13 Commits

Author SHA1 Message Date
Tim Kuehn
b9868250f8 Prepare release of v0.23.0 2020-10-19 11:12:43 -07:00
Urhengulas
a3f1064efe Cargo.toml: Clean + update dependencies 2020-10-18 16:03:04 -07:00
Johann Hemmann
026083d653 Bump tokio from 0.2 to 0.3 (#319)
# Bump `tokio` from 0.2 to 0.3

* `Cargo.toml`:
    * bump `tokio` from 0.2 to 0.3
    * bump `tokio-util` from 0.3 to 0.4
    * remove feature `time` from `tokio`
    * fix alphabetical order of dependencies
* `tarpc::rpc`:
    * `client, server`: `tokio::time::Elapsed` -> `tokio::time::error::Elapsed`
    * `client, transport`, `::tests`: Fix `#[tokio::test]` macro usage
* `tarpc::serde_transport`:
    * `TcpListener.incoming().poll_next(...)` -> `TcpListener.poll_accept(...)`
      -> https://github.com/tokio-rs/tokio/discussions/2983
    * Adapt `AsyncRead`, `AsynWrite` implements in tests
* `README.md`, `tarpc::lib`: Adapt tokio version in docs

# Satisfy clippy

* replace `match`-statements with `matches!(...)`-macro
2020-10-17 17:33:08 -07:00
Tim Kuehn
d27f341bde Prepare release of v0.22.0 2020-08-19 18:35:36 -07:00
Tim Kuehn
2264ebecfc Remove serde_transport::tcp::connect_with.
Instead, serde_transport::tcp::connect returns a future named Connect
that has methods to directly access the framing config. This is
consistent with how serde_transport::tcp::listen returns a future with
methods to access the framing config. In addition to this consistency,
it reduces the API surface and provides a simpler user transition from
"zero config" to "some config".
2020-08-19 17:51:53 -07:00
Tim Kuehn
3207affb4a Update pre-commit for changes to cargo fmt.
--write-mode is now --check.
2020-08-19 17:51:20 -07:00
Andre B. Reis
0602afd50c Make connect() and connect_with() take a FnOnce for the codec (#315) 2020-08-19 16:15:26 -07:00
Tim Kuehn
4343e12217 Fix incorrect documentation 2020-08-18 02:58:11 -07:00
Tim Kuehn
7fda862fb8 Run cargo fmt 2020-08-18 02:55:24 -07:00
Tim Kuehn
aa7b875b1a Expose framing config in serde_transport. 2020-08-18 02:47:41 -07:00
Tim Kuehn
54d6e0e3b6 Add license headers 2020-08-04 17:33:41 -07:00
Tim Kuehn
bea3b442aa Move mod.rs files up one directory.
It's easier in IDEs if the files aren't all named the same.
2020-08-04 17:25:53 -07:00
Tim Kuehn
954a2502e7 Remove duplicate rustdoc 2020-08-02 22:24:09 -07:00
25 changed files with 209 additions and 156 deletions

View File

@@ -14,6 +14,8 @@
# tarpc
<!-- cargo-sync-readme start -->
*Disclaimer*: This is not an official Google product.
tarpc is an RPC framework for rust with a focus on ease of use. Defining a
@@ -22,7 +24,7 @@ writing a server is taken care of for you.
[Documentation](https://docs.rs/crate/tarpc/)
### What is an RPC framework?
## What is an RPC framework?
"RPC" stands for "Remote Procedure Call," a function call where the work of
producing the return value is being done somewhere else. When an rpc function is
invoked, behind the scenes the function contacts some other process somewhere
@@ -40,7 +42,7 @@ process, and no context switching between different languages.
Some other features of tarpc:
- Pluggable transport: any type impling `Stream<Item = Request> + Sink<Response>` can be
used as a transport to connect the client and server.
- `Send` optional: if the transport doesn't require it, neither does tarpc!
- `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
- Cascading cancellation: dropping a request will send a cancellation message to the server.
The server will cease any unfinished work on the request, subsequently cancelling any of its
own requests, repeating for the entire chain of transitive dependencies.
@@ -53,25 +55,25 @@ Some other features of tarpc:
responses `Serialize + Deserialize`. It's entirely optional, though: in-memory transports can
be used, as well, so the price of serialization doesn't have to be paid when it's not needed.
### Usage
## Usage
Add to your `Cargo.toml` dependencies:
```toml
tarpc = { version = "0.21.0", features = ["full"] }
tarpc = "0.23.0"
```
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
These generated types make it easy and ergonomic to write servers with less boilerplate.
Simply implement the generated service trait, and you're off to the races!
### Example
## Example
For this example, in addition to tarpc, also add two other dependencies to
your `Cargo.toml`:
```toml
futures = "0.3"
tokio = "0.2"
tokio = "0.3"
```
In the following example, we use an in-process channel for communication between
@@ -81,6 +83,7 @@ For a more real-world example, see [example-service](example-service).
First, let's set up the dependencies and service definition.
```rust
use futures::{
future::{self, Ready},
prelude::*,
@@ -109,19 +112,22 @@ implement it for our Server struct.
#[derive(Clone)]
struct HelloServer;
#[tarpc::server]
impl World for HelloServer {
async fn hello(self, _: context::Context, name: String) -> String {
format!("Hello, {}!", name)
// Each defined rpc generates two items in the trait, a fn that serves the RPC, and
// an associated type representing the future output by the fn.
type HelloFut = Ready<String>;
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name))
}
}
```
Lastly let's write our `main` that will start the server. While this example uses an
[in-process
channel](https://docs.rs/tarpc/0.18.0/tarpc/transport/channel/struct.UnboundedChannel.html),
tarpc also ships bincode and JSON
tokio-net based TCP transports that are generic over all serializable types.
[in-process channel](rpc::transport::channel), tarpc also ships a generic [`serde_transport`]
behind the `serde-transport` feature, with additional [TCP](serde_transport::tcp) functionality
available behind the `tcp` feature.
```rust
#[tokio::main]
@@ -151,9 +157,11 @@ async fn main() -> io::Result<()> {
}
```
### Service Documentation
## Service Documentation
Use `cargo doc` as you normally would to see the documentation created for all
items expanded by a `service!` invocation.
<!-- cargo-sync-readme end -->
License: MIT

View File

@@ -1,3 +1,35 @@
## 0.23.0 (2020-10-19)
### Breaking Changes
Upgrades tokio to 0.3.
## 0.22.0 (2020-08-02)
This release adds some flexibility and consistency to `serde_transport`, with one new feature and
one small breaking change.
### New Features
`serde_transport::tcp` now exposes framing configuration on `connect()` and `listen()`. This is
useful if, for instance, you want to send requests or responses that are larger than the maximum
payload allowed by default:
```rust
let mut transport = tarpc::serde_transport::tcp::connect(server_addr, Json::default);
transport.config_mut().max_frame_length(4294967296);
let mut client = MyClient::new(client::Config::default(), transport.await?).spawn()?;
```
### Breaking Changes
The codec argument to `serde_transport::tcp::connect` changed from a Codec to impl Fn() -> Codec,
to be consistent with `serde_transport::tcp::listen`. While only one Codec is needed, more than one
person has been tripped up by the inconsistency between `connect` and `listen`. Unfortunately, the
compiler errors are not much help in this case, so it was decided to simply do the more intuitive
thing so that the compiler doesn't need to step in in the first place.
## 0.21.1 (2020-08-02)
### New Features
@@ -62,7 +94,7 @@ nameable futures and will just be boxing the return type anyway. This macro does
### Bug Fixes
- https://github.com/google/tarpc/issues/304
A race condition in code that limits number of connections per client caused occasional panics.
- https://github.com/google/tarpc/pull/295
@@ -82,7 +114,7 @@ nameable futures and will just be boxing the return type anyway. This macro does
## 0.13.0 (2018-10-16)
### Breaking Changes
### Breaking Changes
Version 0.13 marks a significant departure from previous versions of tarpc. The
API has changed significantly. The tokio-proto crate has been torn out and

View File

@@ -13,13 +13,14 @@ readme = "../README.md"
description = "An example server built on tarpc."
[dependencies]
clap = "2.0"
clap = "2.33"
env_logger = "0.8"
futures = "0.3"
serde = { version = "1.0" }
tarpc = { version = "0.21", path = "../tarpc", features = ["full"] }
tokio = { version = "0.2", features = ["full"] }
tarpc = { version = "0.23", path = "../tarpc", features = ["full"] }
tokio = { version = "0.3", features = ["full"] }
tokio-serde = { version = "0.6", features = ["json"] }
env_logger = "0.6"
tokio-util = { version = "0.4", features = ["codec"] }
[lib]
name = "service"

View File

@@ -43,11 +43,13 @@ async fn main() -> io::Result<()> {
let name = flags.value_of("name").unwrap().into();
let transport = tarpc::serde_transport::tcp::connect(server_addr, Json::default()).await?;
let mut transport = tarpc::serde_transport::tcp::connect(server_addr, Json::default);
transport.config_mut().max_frame_length(4294967296);
// WorldClient is generated by the service attribute. It has a constructor `new` that takes a
// config and any Transport as input.
let mut client = service::WorldClient::new(client::Config::default(), transport).spawn()?;
let mut client =
service::WorldClient::new(client::Config::default(), transport.await?).spawn()?;
// The client has an RPC method for each RPC defined in the annotated trait. It takes the same
// args as defined, with the addition of a Context, which is always the first arg. The Context

View File

@@ -57,8 +57,9 @@ async fn main() -> io::Result<()> {
// JSON transport is provided by the json_transport tarpc module. It makes it easy
// to start up a serde-powered json serialization strategy over TCP.
tarpc::serde_transport::tcp::listen(&server_addr, Json::default)
.await?
let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?;
listener.config_mut().max_frame_length(4294967296);
listener
// Ignore accept errors.
.filter_map(|r| future::ready(r.ok()))
.map(server::BaseChannel::with_defaults)

View File

@@ -93,7 +93,7 @@ diff=""
for file in $(git diff --name-only --cached);
do
if [ ${file: -3} == ".rs" ]; then
diff="$diff$(cargo fmt -- --skip-children --write-mode=diff $file)"
diff="$diff$(cargo fmt -- --unstable-features --skip-children --check $file)"
fi
done
if grep --quiet "^[-+]" <<< "$diff"; then

View File

@@ -19,15 +19,15 @@ serde1 = []
travis-ci = { repository = "google/tarpc" }
[dependencies]
syn = { version = "1.0.11", features = ["full"] }
quote = "1.0.2"
proc-macro2 = "1.0.6"
proc-macro2 = "1.0"
quote = "1.0"
syn = { version = "1.0", features = ["full"] }
[lib]
proc-macro = true
[dev-dependencies]
assert-type-eq = "0.1.0"
futures = "0.3"
serde = { version = "1.0", features = ["derive"] }
tarpc = { path = "../tarpc" }
assert-type-eq = "0.1.0"

View File

@@ -328,60 +328,6 @@ fn transform_method(method: &mut ImplItemMethod) -> ImplItemType {
t
}
/// Syntactic sugar to make using async functions in the server implementation
/// easier. It does this by rewriting code like this, which would normally not
/// compile because async functions are disallowed in trait implementations:
///
/// ```rust
/// # extern crate tarpc;
/// # use tarpc::context;
/// # use std::net::SocketAddr;
/// #[tarpc_plugins::service]
/// trait World {
/// async fn hello(name: String) -> String;
/// }
///
/// #[derive(Clone)]
/// struct HelloServer(SocketAddr);
///
/// #[tarpc_plugins::server]
/// impl World for HelloServer {
/// async fn hello(self, _: context::Context, name: String) -> String {
/// format!("Hello, {}! You are connected from {:?}.", name, self.0)
/// }
/// }
/// ```
///
/// Into code like this, which matches the service trait definition:
///
/// ```rust
/// # extern crate tarpc;
/// # use tarpc::context;
/// # use std::pin::Pin;
/// # use futures::Future;
/// # use std::net::SocketAddr;
/// #[tarpc_plugins::service]
/// trait World {
/// async fn hello(name: String) -> String;
/// }
///
/// #[derive(Clone)]
/// struct HelloServer(SocketAddr);
///
/// impl World for HelloServer {
/// type HelloFut = Pin<Box<dyn Future<Output = String> + Send>>;
///
/// fn hello(self, _: context::Context, name: String) -> Pin<Box<dyn Future<Output = String>
/// + Send>> {
/// Box::pin(async move {
/// format!("Hello, {}! You are connected from {:?}.", name, self.0)
/// })
/// }
/// }
/// ```
///
/// Note that this won't touch functions unless they have been annotated with
/// `async`, meaning that this should not break existing code.
#[proc_macro_attribute]
pub fn server(_attr: TokenStream, input: TokenStream) -> TokenStream {
let mut item = syn::parse_macro_input!(input as ItemImpl);

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc"
version = "0.21.1"
version = "0.23.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018"
license = "MIT"
@@ -29,29 +29,27 @@ travis-ci = { repository = "google/tarpc" }
anyhow = "1.0"
fnv = "1.0"
futures = "0.3"
humantime = "1.0"
humantime = "2.0"
log = "0.4"
pin-project = "0.4.17"
pin-project = "1.0"
rand = "0.7"
tokio = { version = "0.2", features = ["time"] }
serde = { optional = true, version = "1.0", features = ["derive"] }
static_assertions = "1.1.0"
tokio-util = { optional = true, version = "0.2" }
tarpc-plugins = { path = "../plugins", version = "0.8" }
tokio = { version = "0.3" }
tokio-util = { optional = true, version = "0.4" }
tokio-serde = { optional = true, version = "0.6" }
[dev-dependencies]
assert_matches = "1.0"
assert_matches = "1.4"
bincode = "1.3"
bytes = { version = "0.5", features = ["serde"] }
env_logger = "0.6"
flate2 = "1.0.16"
futures = "0.3"
humantime = "1.0"
env_logger = "0.8"
flate2 = "1.0"
log = "0.4"
pin-utils = "0.1.0-alpha"
serde_bytes = "0.11"
tokio = { version = "0.2", features = ["full"] }
tokio = { version = "0.3", features = ["full"] }
tokio-serde = { version = "0.6", features = ["json", "bincode"] }
trybuild = "1.0"

View File

@@ -118,7 +118,7 @@ async fn main() -> anyhow::Result<()> {
.await;
});
let transport = tcp::connect(addr, Bincode::default()).await?;
let transport = tcp::connect(addr, Bincode::default).await?;
let mut client =
WorldClient::new(client::Config::default(), add_compression(transport)).spawn()?;

View File

@@ -18,7 +18,7 @@
/// messages to all clients subscribed to the topic of that message.
///
/// Subscriber Publisher PubSub Server
/// T1 | | |
/// T1 | | |
/// T2 |-----Connect------------------------------------------------------>|
/// T3 | | |
/// T2 |<-------------------------------------------------------Topics-----|
@@ -103,7 +103,7 @@ impl Subscriber {
publisher_addr: impl ToSocketAddrs,
topics: Vec<String>,
) -> anyhow::Result<SubscriberHandle> {
let publisher = tcp::connect(publisher_addr, Json::default()).await?;
let publisher = tcp::connect(publisher_addr, Json::default).await?;
let local_addr = publisher.local_addr()?;
let mut handler = server::BaseChannel::with_defaults(publisher)
.respond_with(Subscriber { local_addr, topics }.serve());
@@ -308,7 +308,7 @@ async fn main() -> anyhow::Result<()> {
let mut publisher = publisher::PublisherClient::new(
client::Config::default(),
tcp::connect(addrs.publisher, Json::default()).await?,
tcp::connect(addrs.publisher, Json::default).await?,
)
.spawn()?;

View File

@@ -62,7 +62,7 @@ async fn main() -> io::Result<()> {
};
tokio::spawn(server);
let transport = tarpc::serde_transport::tcp::connect(addr, Json::default()).await?;
let transport = tarpc::serde_transport::tcp::connect(addr, Json::default).await?;
// WorldClient is generated by the tarpc::service attribute. It has a constructor `new` that
// takes a config and any Transport as input.

View File

@@ -68,7 +68,7 @@ async fn main() -> io::Result<()> {
.respond_with(AddServer.serve());
tokio::spawn(add_server);
let to_add_server = tarpc::serde_transport::tcp::connect(addr, Json::default()).await?;
let to_add_server = tarpc::serde_transport::tcp::connect(addr, Json::default).await?;
let add_client = add::AddClient::new(client::Config::default(), to_add_server).spawn()?;
let double_listener = tarpc::serde_transport::tcp::listen("localhost:0", Json::default)
@@ -81,7 +81,7 @@ async fn main() -> io::Result<()> {
.respond_with(DoubleServer { add_client }.serve());
tokio::spawn(double_server);
let to_double_server = tarpc::serde_transport::tcp::connect(addr, Json::default()).await?;
let to_double_server = tarpc::serde_transport::tcp::connect(addr, Json::default).await?;
let mut double_client =
double::DoubleClient::new(client::Config::default(), to_double_server).spawn()?;

View File

@@ -4,9 +4,6 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
//! [![Latest Version](https://img.shields.io/crates/v/tarpc.svg)](https://crates.io/crates/tarpc)
//! [![Join the chat at https://gitter.im/tarpc/Lobby](https://badges.gitter.im/tarpc/Lobby.svg)](https://gitter.im/tarpc/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
//!
//! *Disclaimer*: This is not an official Google product.
//!
//! tarpc is an RPC framework for rust with a focus on ease of use. Defining a
@@ -50,7 +47,7 @@
//! Add to your `Cargo.toml` dependencies:
//!
//! ```toml
//! tarpc = "0.21.0"
//! tarpc = "0.23.0"
//! ```
//!
//! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -64,7 +61,7 @@
//!
//! ```toml
//! futures = "0.3"
//! tokio = "0.2"
//! tokio = "0.3"
//! ```
//!
//! In the following example, we use an in-process channel for communication between

View File

@@ -88,7 +88,7 @@ impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
let resp = ready!(self.as_mut().project().fut.poll(cx));
Poll::Ready(match resp {
Ok(resp) => resp,
Err(tokio::time::Elapsed { .. }) => Err(io::Error::new(
Err(tokio::time::error::Elapsed { .. }) => Err(io::Error::new(
io::ErrorKind::TimedOut,
"Client dropped expired request.".to_string(),
)),
@@ -723,7 +723,7 @@ mod tests {
};
use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc};
#[tokio::test(threaded_scheduler)]
#[tokio::test]
async fn dispatch_response_cancels_on_drop() {
let (cancellation, mut canceled_requests) = cancellations();
let (_, response) = oneshot::channel();
@@ -738,7 +738,7 @@ mod tests {
assert_eq!(canceled_requests.0.try_next().unwrap(), Some(3));
}
#[tokio::test(threaded_scheduler)]
#[tokio::test]
async fn stage_request() {
let (mut dispatch, mut channel, _server_channel) = set_up();
let dispatch = Pin::new(&mut dispatch);
@@ -755,7 +755,7 @@ mod tests {
}
// Regression test for https://github.com/google/tarpc/issues/220
#[tokio::test(threaded_scheduler)]
#[tokio::test]
async fn stage_request_channel_dropped_doesnt_panic() {
let (mut dispatch, mut channel, mut server_channel) = set_up();
let mut dispatch = Pin::new(&mut dispatch);
@@ -776,7 +776,7 @@ mod tests {
dispatch.await.unwrap();
}
#[tokio::test(threaded_scheduler)]
#[tokio::test]
async fn stage_request_response_future_dropped_is_canceled_before_sending() {
let (mut dispatch, mut channel, _server_channel) = set_up();
let dispatch = Pin::new(&mut dispatch);
@@ -791,7 +791,7 @@ mod tests {
assert!(dispatch.poll_next_request(cx).ready().is_none());
}
#[tokio::test(threaded_scheduler)]
#[tokio::test]
async fn stage_request_response_future_dropped_is_canceled_after_sending() {
let (mut dispatch, mut channel, _server_channel) = set_up();
let cx = &mut Context::from_waker(&noop_waker_ref());
@@ -813,7 +813,7 @@ mod tests {
assert!(dispatch.project().in_flight_requests.is_empty());
}
#[tokio::test(threaded_scheduler)]
#[tokio::test]
async fn stage_request_response_closed_skipped() {
let (mut dispatch, mut channel, _server_channel) = set_up();
let dispatch = Pin::new(&mut dispatch);

View File

@@ -565,7 +565,7 @@ where
request_id: self.request_id,
message: match result {
Ok(message) => Ok(message),
Err(tokio::time::Elapsed { .. }) => {
Err(tokio::time::error::Elapsed { .. }) => {
debug!(
"[{}] Response did not complete before deadline of {}s.",
self.ctx.trace_id(),
@@ -624,11 +624,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let read = self.as_mut().pump_read(cx)?;
let read_closed = if let Poll::Ready(None) = read {
true
} else {
false
};
let read_closed = matches!(read, Poll::Ready(None));
match (read, self.as_mut().pump_write(cx, read_closed)?) {
(Poll::Ready(None), Poll::Ready(None)) => {
return Poll::Ready(None);

View File

@@ -1,3 +1,9 @@
// Copyright 2020 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
use crate::server::{Channel, Config};
use crate::{context, Request, Response};
use fnv::FnvHashSet;
@@ -111,10 +117,7 @@ pub trait PollExt {
impl<T> PollExt for Poll<Option<T>> {
fn is_done(&self) -> bool {
match self {
Poll::Ready(None) => true,
_ => false,
}
matches!(self, Poll::Ready(None))
}
}

View File

@@ -1,3 +1,9 @@
// Copyright 2020 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
use super::{Channel, Config};
use crate::{Response, ServerError};
use futures::{future::AbortRegistration, prelude::*, ready, task::*};

View File

@@ -90,7 +90,7 @@ mod tests {
use std::io;
#[cfg(feature = "tokio1")]
#[tokio::test(threaded_scheduler)]
#[tokio::test]
async fn integration() -> io::Result<()> {
let _ = env_logger::try_init();

View File

@@ -14,7 +14,10 @@ use serde::{Deserialize, Serialize};
use std::{error::Error, io, pin::Pin};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_serde::{Framed as SerdeFramed, *};
use tokio_util::codec::{length_delimited::LengthDelimitedCodec, Framed};
use tokio_util::codec::{
length_delimited::{self, LengthDelimitedCodec},
Framed,
};
/// A transport that serializes to, and deserializes from, a byte stream.
#[pin_project]
@@ -90,6 +93,22 @@ fn convert<E: Into<Box<dyn Error + Send + Sync>>>(
poll.map(|ready| ready.map_err(|e| io::Error::new(io::ErrorKind::Other, e)))
}
/// Constructs a new transport from a framed transport and a serialization codec.
pub fn new<S, Item, SinkItem, Codec>(
framed_io: Framed<S, LengthDelimitedCodec>,
codec: Codec,
) -> Transport<S, Item, SinkItem, Codec>
where
S: AsyncWrite + AsyncRead,
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>,
{
Transport {
inner: SerdeFramed::new(framed_io, codec),
}
}
impl<S, Item, SinkItem, Codec> From<(S, Codec)> for Transport<S, Item, SinkItem, Codec>
where
S: AsyncWrite + AsyncRead,
@@ -97,10 +116,8 @@ where
SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>,
{
fn from((inner, codec): (S, Codec)) -> Self {
Transport {
inner: SerdeFramed::new(Framed::new(inner, LengthDelimitedCodec::new()), codec),
}
fn from((io, codec): (S, Codec)) -> Self {
new(Framed::new(io, LengthDelimitedCodec::new()), codec)
}
}
@@ -134,34 +151,65 @@ pub mod tcp {
}
}
/// Returns a new JSON transport that reads from and writes to `io`.
pub fn new<Item, SinkItem, Codec>(
io: TcpStream,
codec: Codec,
) -> Transport<TcpStream, Item, SinkItem, Codec>
/// A connection Future that also exposes the length-delimited framing config.
#[pin_project]
pub struct Connect<T, Item, SinkItem, CodecFn> {
#[pin]
inner: T,
codec_fn: CodecFn,
config: length_delimited::Builder,
ghost: PhantomData<(fn(SinkItem), fn() -> Item)>,
}
impl<T, Item, SinkItem, Codec, CodecFn> Future for Connect<T, Item, SinkItem, CodecFn>
where
T: Future<Output = io::Result<TcpStream>>,
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{
Transport::from((io, codec))
type Output = io::Result<Transport<TcpStream, Item, SinkItem, Codec>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let io = ready!(self.as_mut().project().inner.poll(cx))?;
Poll::Ready(Ok(new(self.config.new_framed(io), (self.codec_fn)())))
}
}
/// Connects to `addr`, wrapping the connection in a JSON transport.
pub async fn connect<A, Item, SinkItem, Codec>(
impl<T, Item, SinkItem, CodecFn> Connect<T, Item, SinkItem, CodecFn> {
/// Returns an immutable reference to the length-delimited codec's config.
pub fn config(&self) -> &length_delimited::Builder {
&self.config
}
/// Returns a mutable reference to the length-delimited codec's config.
pub fn config_mut(&mut self) -> &mut length_delimited::Builder {
&mut self.config
}
}
/// Connects to `addr`, wrapping the connection in a TCP transport.
pub fn connect<A, Item, SinkItem, Codec, CodecFn>(
addr: A,
codec: Codec,
) -> io::Result<Transport<TcpStream, Item, SinkItem, Codec>>
codec_fn: CodecFn,
) -> Connect<impl Future<Output = io::Result<TcpStream>>, Item, SinkItem, CodecFn>
where
A: ToSocketAddrs,
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{
Ok(new(TcpStream::connect(addr).await?, codec))
Connect {
inner: TcpStream::connect(addr),
codec_fn,
config: LengthDelimitedCodec::builder(),
ghost: PhantomData,
}
}
/// Listens on `addr`, wrapping accepted connections in JSON transports.
/// Listens on `addr`, wrapping accepted connections in TCP transports.
pub async fn listen<A, Item, SinkItem, Codec, CodecFn>(
addr: A,
codec_fn: CodecFn,
@@ -178,6 +226,7 @@ pub mod tcp {
listener,
codec_fn,
local_addr,
config: LengthDelimitedCodec::builder(),
ghost: PhantomData,
})
}
@@ -189,7 +238,8 @@ pub mod tcp {
listener: TcpListener,
local_addr: SocketAddr,
codec_fn: CodecFn,
ghost: PhantomData<(Item, SinkItem, Codec)>,
config: length_delimited::Builder,
ghost: PhantomData<(fn() -> Item, fn(SinkItem), Codec)>,
}
impl<Item, SinkItem, Codec, CodecFn> Incoming<Item, SinkItem, Codec, CodecFn> {
@@ -197,6 +247,16 @@ pub mod tcp {
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
/// Returns an immutable reference to the length-delimited codec's config.
pub fn config(&self) -> &length_delimited::Builder {
&self.config
}
/// Returns a mutable reference to the length-delimited codec's config.
pub fn config_mut(&mut self) -> &mut length_delimited::Builder {
&mut self.config
}
}
impl<Item, SinkItem, Codec, CodecFn> Stream for Incoming<Item, SinkItem, Codec, CodecFn>
@@ -209,9 +269,12 @@ pub mod tcp {
type Item = io::Result<Transport<TcpStream, Item, SinkItem, Codec>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next =
ready!(Pin::new(&mut self.as_mut().project().listener.incoming()).poll_next(cx)?);
Poll::Ready(next.map(|conn| Ok(new(conn, (self.codec_fn)()))))
let conn: TcpStream =
ready!(Pin::new(&mut self.as_mut().project().listener).poll_accept(cx)?).0;
Poll::Ready(Some(Ok(new(
self.config.new_framed(conn),
(self.codec_fn)(),
))))
}
}
}
@@ -226,7 +289,7 @@ mod tests {
io::{self, Cursor},
pin::Pin,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_serde::formats::SymmetricalJson;
fn ctx() -> Context<'static> {
@@ -241,8 +304,8 @@ mod tests {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
AsyncRead::poll_read(Pin::new(self.0.get_mut()), cx, buf)
}
}
@@ -285,8 +348,8 @@ mod tests {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_buf: &mut [u8],
) -> Poll<io::Result<usize>> {
_buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
unreachable!()
}
}

View File

@@ -36,7 +36,7 @@ impl Service for Server {
}
}
#[tokio::test(threaded_scheduler)]
#[tokio::test]
async fn sequential() -> io::Result<()> {
let _ = env_logger::try_init();
@@ -59,7 +59,7 @@ async fn sequential() -> io::Result<()> {
}
#[cfg(feature = "serde1")]
#[tokio::test(threaded_scheduler)]
#[tokio::test]
async fn serde() -> io::Result<()> {
let _ = env_logger::try_init();
@@ -71,7 +71,7 @@ async fn serde() -> io::Result<()> {
.respond_with(Server.serve()),
);
let transport = serde_transport::tcp::connect(addr, Json::default()).await?;
let transport = serde_transport::tcp::connect(addr, Json::default).await?;
let mut client = ServiceClient::new(client::Config::default(), transport).spawn()?;
assert_matches!(client.add(context::current(), 1, 2).await, Ok(3));
@@ -83,7 +83,7 @@ async fn serde() -> io::Result<()> {
Ok(())
}
#[tokio::test(threaded_scheduler)]
#[tokio::test]
async fn concurrent() -> io::Result<()> {
let _ = env_logger::try_init();
@@ -112,7 +112,7 @@ async fn concurrent() -> io::Result<()> {
Ok(())
}
#[tokio::test(threaded_scheduler)]
#[tokio::test]
async fn concurrent_join() -> io::Result<()> {
let _ = env_logger::try_init();
@@ -142,7 +142,7 @@ async fn concurrent_join() -> io::Result<()> {
Ok(())
}
#[tokio::test(threaded_scheduler)]
#[tokio::test]
async fn concurrent_join_all() -> io::Result<()> {
let _ = env_logger::try_init();