13 Commits

Author SHA1 Message Date
Tim Kuehn
b9868250f8 Prepare release of v0.23.0 2020-10-19 11:12:43 -07:00
Urhengulas
a3f1064efe Cargo.toml: Clean + update dependencies 2020-10-18 16:03:04 -07:00
Johann Hemmann
026083d653 Bump tokio from 0.2 to 0.3 (#319)
# Bump `tokio` from 0.2 to 0.3

* `Cargo.toml`:
    * bump `tokio` from 0.2 to 0.3
    * bump `tokio-util` from 0.3 to 0.4
    * remove feature `time` from `tokio`
    * fix alphabetical order of dependencies
* `tarpc::rpc`:
    * `client, server`: `tokio::time::Elapsed` -> `tokio::time::error::Elapsed`
    * `client, transport`, `::tests`: Fix `#[tokio::test]` macro usage
* `tarpc::serde_transport`:
    * `TcpListener.incoming().poll_next(...)` -> `TcpListener.poll_accept(...)`
      -> https://github.com/tokio-rs/tokio/discussions/2983
    * Adapt `AsyncRead`, `AsynWrite` implements in tests
* `README.md`, `tarpc::lib`: Adapt tokio version in docs

# Satisfy clippy

* replace `match`-statements with `matches!(...)`-macro
2020-10-17 17:33:08 -07:00
Tim Kuehn
d27f341bde Prepare release of v0.22.0 2020-08-19 18:35:36 -07:00
Tim Kuehn
2264ebecfc Remove serde_transport::tcp::connect_with.
Instead, serde_transport::tcp::connect returns a future named Connect
that has methods to directly access the framing config. This is
consistent with how serde_transport::tcp::listen returns a future with
methods to access the framing config. In addition to this consistency,
it reduces the API surface and provides a simpler user transition from
"zero config" to "some config".
2020-08-19 17:51:53 -07:00
Tim Kuehn
3207affb4a Update pre-commit for changes to cargo fmt.
--write-mode is now --check.
2020-08-19 17:51:20 -07:00
Andre B. Reis
0602afd50c Make connect() and connect_with() take a FnOnce for the codec (#315) 2020-08-19 16:15:26 -07:00
Tim Kuehn
4343e12217 Fix incorrect documentation 2020-08-18 02:58:11 -07:00
Tim Kuehn
7fda862fb8 Run cargo fmt 2020-08-18 02:55:24 -07:00
Tim Kuehn
aa7b875b1a Expose framing config in serde_transport. 2020-08-18 02:47:41 -07:00
Tim Kuehn
54d6e0e3b6 Add license headers 2020-08-04 17:33:41 -07:00
Tim Kuehn
bea3b442aa Move mod.rs files up one directory.
It's easier in IDEs if the files aren't all named the same.
2020-08-04 17:25:53 -07:00
Tim Kuehn
954a2502e7 Remove duplicate rustdoc 2020-08-02 22:24:09 -07:00
25 changed files with 209 additions and 156 deletions

View File

@@ -14,6 +14,8 @@
# tarpc # tarpc
<!-- cargo-sync-readme start -->
*Disclaimer*: This is not an official Google product. *Disclaimer*: This is not an official Google product.
tarpc is an RPC framework for rust with a focus on ease of use. Defining a tarpc is an RPC framework for rust with a focus on ease of use. Defining a
@@ -22,7 +24,7 @@ writing a server is taken care of for you.
[Documentation](https://docs.rs/crate/tarpc/) [Documentation](https://docs.rs/crate/tarpc/)
### What is an RPC framework? ## What is an RPC framework?
"RPC" stands for "Remote Procedure Call," a function call where the work of "RPC" stands for "Remote Procedure Call," a function call where the work of
producing the return value is being done somewhere else. When an rpc function is producing the return value is being done somewhere else. When an rpc function is
invoked, behind the scenes the function contacts some other process somewhere invoked, behind the scenes the function contacts some other process somewhere
@@ -40,7 +42,7 @@ process, and no context switching between different languages.
Some other features of tarpc: Some other features of tarpc:
- Pluggable transport: any type impling `Stream<Item = Request> + Sink<Response>` can be - Pluggable transport: any type impling `Stream<Item = Request> + Sink<Response>` can be
used as a transport to connect the client and server. used as a transport to connect the client and server.
- `Send` optional: if the transport doesn't require it, neither does tarpc! - `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
- Cascading cancellation: dropping a request will send a cancellation message to the server. - Cascading cancellation: dropping a request will send a cancellation message to the server.
The server will cease any unfinished work on the request, subsequently cancelling any of its The server will cease any unfinished work on the request, subsequently cancelling any of its
own requests, repeating for the entire chain of transitive dependencies. own requests, repeating for the entire chain of transitive dependencies.
@@ -53,25 +55,25 @@ Some other features of tarpc:
responses `Serialize + Deserialize`. It's entirely optional, though: in-memory transports can responses `Serialize + Deserialize`. It's entirely optional, though: in-memory transports can
be used, as well, so the price of serialization doesn't have to be paid when it's not needed. be used, as well, so the price of serialization doesn't have to be paid when it's not needed.
### Usage ## Usage
Add to your `Cargo.toml` dependencies: Add to your `Cargo.toml` dependencies:
```toml ```toml
tarpc = { version = "0.21.0", features = ["full"] } tarpc = "0.23.0"
``` ```
The `tarpc::service` attribute expands to a collection of items that form an rpc service. The `tarpc::service` attribute expands to a collection of items that form an rpc service.
These generated types make it easy and ergonomic to write servers with less boilerplate. These generated types make it easy and ergonomic to write servers with less boilerplate.
Simply implement the generated service trait, and you're off to the races! Simply implement the generated service trait, and you're off to the races!
### Example ## Example
For this example, in addition to tarpc, also add two other dependencies to For this example, in addition to tarpc, also add two other dependencies to
your `Cargo.toml`: your `Cargo.toml`:
```toml ```toml
futures = "0.3" futures = "0.3"
tokio = "0.2" tokio = "0.3"
``` ```
In the following example, we use an in-process channel for communication between In the following example, we use an in-process channel for communication between
@@ -81,6 +83,7 @@ For a more real-world example, see [example-service](example-service).
First, let's set up the dependencies and service definition. First, let's set up the dependencies and service definition.
```rust ```rust
use futures::{ use futures::{
future::{self, Ready}, future::{self, Ready},
prelude::*, prelude::*,
@@ -109,19 +112,22 @@ implement it for our Server struct.
#[derive(Clone)] #[derive(Clone)]
struct HelloServer; struct HelloServer;
#[tarpc::server]
impl World for HelloServer { impl World for HelloServer {
async fn hello(self, _: context::Context, name: String) -> String { // Each defined rpc generates two items in the trait, a fn that serves the RPC, and
format!("Hello, {}!", name) // an associated type representing the future output by the fn.
type HelloFut = Ready<String>;
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name))
} }
} }
``` ```
Lastly let's write our `main` that will start the server. While this example uses an Lastly let's write our `main` that will start the server. While this example uses an
[in-process [in-process channel](rpc::transport::channel), tarpc also ships a generic [`serde_transport`]
channel](https://docs.rs/tarpc/0.18.0/tarpc/transport/channel/struct.UnboundedChannel.html), behind the `serde-transport` feature, with additional [TCP](serde_transport::tcp) functionality
tarpc also ships bincode and JSON available behind the `tcp` feature.
tokio-net based TCP transports that are generic over all serializable types.
```rust ```rust
#[tokio::main] #[tokio::main]
@@ -151,9 +157,11 @@ async fn main() -> io::Result<()> {
} }
``` ```
### Service Documentation ## Service Documentation
Use `cargo doc` as you normally would to see the documentation created for all Use `cargo doc` as you normally would to see the documentation created for all
items expanded by a `service!` invocation. items expanded by a `service!` invocation.
<!-- cargo-sync-readme end -->
License: MIT License: MIT

View File

@@ -1,3 +1,35 @@
## 0.23.0 (2020-10-19)
### Breaking Changes
Upgrades tokio to 0.3.
## 0.22.0 (2020-08-02)
This release adds some flexibility and consistency to `serde_transport`, with one new feature and
one small breaking change.
### New Features
`serde_transport::tcp` now exposes framing configuration on `connect()` and `listen()`. This is
useful if, for instance, you want to send requests or responses that are larger than the maximum
payload allowed by default:
```rust
let mut transport = tarpc::serde_transport::tcp::connect(server_addr, Json::default);
transport.config_mut().max_frame_length(4294967296);
let mut client = MyClient::new(client::Config::default(), transport.await?).spawn()?;
```
### Breaking Changes
The codec argument to `serde_transport::tcp::connect` changed from a Codec to impl Fn() -> Codec,
to be consistent with `serde_transport::tcp::listen`. While only one Codec is needed, more than one
person has been tripped up by the inconsistency between `connect` and `listen`. Unfortunately, the
compiler errors are not much help in this case, so it was decided to simply do the more intuitive
thing so that the compiler doesn't need to step in in the first place.
## 0.21.1 (2020-08-02) ## 0.21.1 (2020-08-02)
### New Features ### New Features
@@ -62,7 +94,7 @@ nameable futures and will just be boxing the return type anyway. This macro does
### Bug Fixes ### Bug Fixes
- https://github.com/google/tarpc/issues/304 - https://github.com/google/tarpc/issues/304
A race condition in code that limits number of connections per client caused occasional panics. A race condition in code that limits number of connections per client caused occasional panics.
- https://github.com/google/tarpc/pull/295 - https://github.com/google/tarpc/pull/295
@@ -82,7 +114,7 @@ nameable futures and will just be boxing the return type anyway. This macro does
## 0.13.0 (2018-10-16) ## 0.13.0 (2018-10-16)
### Breaking Changes ### Breaking Changes
Version 0.13 marks a significant departure from previous versions of tarpc. The Version 0.13 marks a significant departure from previous versions of tarpc. The
API has changed significantly. The tokio-proto crate has been torn out and API has changed significantly. The tokio-proto crate has been torn out and

View File

@@ -13,13 +13,14 @@ readme = "../README.md"
description = "An example server built on tarpc." description = "An example server built on tarpc."
[dependencies] [dependencies]
clap = "2.0" clap = "2.33"
env_logger = "0.8"
futures = "0.3" futures = "0.3"
serde = { version = "1.0" } serde = { version = "1.0" }
tarpc = { version = "0.21", path = "../tarpc", features = ["full"] } tarpc = { version = "0.23", path = "../tarpc", features = ["full"] }
tokio = { version = "0.2", features = ["full"] } tokio = { version = "0.3", features = ["full"] }
tokio-serde = { version = "0.6", features = ["json"] } tokio-serde = { version = "0.6", features = ["json"] }
env_logger = "0.6" tokio-util = { version = "0.4", features = ["codec"] }
[lib] [lib]
name = "service" name = "service"

View File

@@ -43,11 +43,13 @@ async fn main() -> io::Result<()> {
let name = flags.value_of("name").unwrap().into(); let name = flags.value_of("name").unwrap().into();
let transport = tarpc::serde_transport::tcp::connect(server_addr, Json::default()).await?; let mut transport = tarpc::serde_transport::tcp::connect(server_addr, Json::default);
transport.config_mut().max_frame_length(4294967296);
// WorldClient is generated by the service attribute. It has a constructor `new` that takes a // WorldClient is generated by the service attribute. It has a constructor `new` that takes a
// config and any Transport as input. // config and any Transport as input.
let mut client = service::WorldClient::new(client::Config::default(), transport).spawn()?; let mut client =
service::WorldClient::new(client::Config::default(), transport.await?).spawn()?;
// The client has an RPC method for each RPC defined in the annotated trait. It takes the same // The client has an RPC method for each RPC defined in the annotated trait. It takes the same
// args as defined, with the addition of a Context, which is always the first arg. The Context // args as defined, with the addition of a Context, which is always the first arg. The Context

View File

@@ -57,8 +57,9 @@ async fn main() -> io::Result<()> {
// JSON transport is provided by the json_transport tarpc module. It makes it easy // JSON transport is provided by the json_transport tarpc module. It makes it easy
// to start up a serde-powered json serialization strategy over TCP. // to start up a serde-powered json serialization strategy over TCP.
tarpc::serde_transport::tcp::listen(&server_addr, Json::default) let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?;
.await? listener.config_mut().max_frame_length(4294967296);
listener
// Ignore accept errors. // Ignore accept errors.
.filter_map(|r| future::ready(r.ok())) .filter_map(|r| future::ready(r.ok()))
.map(server::BaseChannel::with_defaults) .map(server::BaseChannel::with_defaults)

View File

@@ -93,7 +93,7 @@ diff=""
for file in $(git diff --name-only --cached); for file in $(git diff --name-only --cached);
do do
if [ ${file: -3} == ".rs" ]; then if [ ${file: -3} == ".rs" ]; then
diff="$diff$(cargo fmt -- --skip-children --write-mode=diff $file)" diff="$diff$(cargo fmt -- --unstable-features --skip-children --check $file)"
fi fi
done done
if grep --quiet "^[-+]" <<< "$diff"; then if grep --quiet "^[-+]" <<< "$diff"; then

View File

@@ -19,15 +19,15 @@ serde1 = []
travis-ci = { repository = "google/tarpc" } travis-ci = { repository = "google/tarpc" }
[dependencies] [dependencies]
syn = { version = "1.0.11", features = ["full"] } proc-macro2 = "1.0"
quote = "1.0.2" quote = "1.0"
proc-macro2 = "1.0.6" syn = { version = "1.0", features = ["full"] }
[lib] [lib]
proc-macro = true proc-macro = true
[dev-dependencies] [dev-dependencies]
assert-type-eq = "0.1.0"
futures = "0.3" futures = "0.3"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
tarpc = { path = "../tarpc" } tarpc = { path = "../tarpc" }
assert-type-eq = "0.1.0"

View File

@@ -328,60 +328,6 @@ fn transform_method(method: &mut ImplItemMethod) -> ImplItemType {
t t
} }
/// Syntactic sugar to make using async functions in the server implementation
/// easier. It does this by rewriting code like this, which would normally not
/// compile because async functions are disallowed in trait implementations:
///
/// ```rust
/// # extern crate tarpc;
/// # use tarpc::context;
/// # use std::net::SocketAddr;
/// #[tarpc_plugins::service]
/// trait World {
/// async fn hello(name: String) -> String;
/// }
///
/// #[derive(Clone)]
/// struct HelloServer(SocketAddr);
///
/// #[tarpc_plugins::server]
/// impl World for HelloServer {
/// async fn hello(self, _: context::Context, name: String) -> String {
/// format!("Hello, {}! You are connected from {:?}.", name, self.0)
/// }
/// }
/// ```
///
/// Into code like this, which matches the service trait definition:
///
/// ```rust
/// # extern crate tarpc;
/// # use tarpc::context;
/// # use std::pin::Pin;
/// # use futures::Future;
/// # use std::net::SocketAddr;
/// #[tarpc_plugins::service]
/// trait World {
/// async fn hello(name: String) -> String;
/// }
///
/// #[derive(Clone)]
/// struct HelloServer(SocketAddr);
///
/// impl World for HelloServer {
/// type HelloFut = Pin<Box<dyn Future<Output = String> + Send>>;
///
/// fn hello(self, _: context::Context, name: String) -> Pin<Box<dyn Future<Output = String>
/// + Send>> {
/// Box::pin(async move {
/// format!("Hello, {}! You are connected from {:?}.", name, self.0)
/// })
/// }
/// }
/// ```
///
/// Note that this won't touch functions unless they have been annotated with
/// `async`, meaning that this should not break existing code.
#[proc_macro_attribute] #[proc_macro_attribute]
pub fn server(_attr: TokenStream, input: TokenStream) -> TokenStream { pub fn server(_attr: TokenStream, input: TokenStream) -> TokenStream {
let mut item = syn::parse_macro_input!(input as ItemImpl); let mut item = syn::parse_macro_input!(input as ItemImpl);

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tarpc" name = "tarpc"
version = "0.21.1" version = "0.23.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"] authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018" edition = "2018"
license = "MIT" license = "MIT"
@@ -29,29 +29,27 @@ travis-ci = { repository = "google/tarpc" }
anyhow = "1.0" anyhow = "1.0"
fnv = "1.0" fnv = "1.0"
futures = "0.3" futures = "0.3"
humantime = "1.0" humantime = "2.0"
log = "0.4" log = "0.4"
pin-project = "0.4.17" pin-project = "1.0"
rand = "0.7" rand = "0.7"
tokio = { version = "0.2", features = ["time"] }
serde = { optional = true, version = "1.0", features = ["derive"] } serde = { optional = true, version = "1.0", features = ["derive"] }
static_assertions = "1.1.0" static_assertions = "1.1.0"
tokio-util = { optional = true, version = "0.2" }
tarpc-plugins = { path = "../plugins", version = "0.8" } tarpc-plugins = { path = "../plugins", version = "0.8" }
tokio = { version = "0.3" }
tokio-util = { optional = true, version = "0.4" }
tokio-serde = { optional = true, version = "0.6" } tokio-serde = { optional = true, version = "0.6" }
[dev-dependencies] [dev-dependencies]
assert_matches = "1.0" assert_matches = "1.4"
bincode = "1.3" bincode = "1.3"
bytes = { version = "0.5", features = ["serde"] } bytes = { version = "0.5", features = ["serde"] }
env_logger = "0.6" env_logger = "0.8"
flate2 = "1.0.16" flate2 = "1.0"
futures = "0.3"
humantime = "1.0"
log = "0.4" log = "0.4"
pin-utils = "0.1.0-alpha" pin-utils = "0.1.0-alpha"
serde_bytes = "0.11" serde_bytes = "0.11"
tokio = { version = "0.2", features = ["full"] } tokio = { version = "0.3", features = ["full"] }
tokio-serde = { version = "0.6", features = ["json", "bincode"] } tokio-serde = { version = "0.6", features = ["json", "bincode"] }
trybuild = "1.0" trybuild = "1.0"

View File

@@ -118,7 +118,7 @@ async fn main() -> anyhow::Result<()> {
.await; .await;
}); });
let transport = tcp::connect(addr, Bincode::default()).await?; let transport = tcp::connect(addr, Bincode::default).await?;
let mut client = let mut client =
WorldClient::new(client::Config::default(), add_compression(transport)).spawn()?; WorldClient::new(client::Config::default(), add_compression(transport)).spawn()?;

View File

@@ -18,7 +18,7 @@
/// messages to all clients subscribed to the topic of that message. /// messages to all clients subscribed to the topic of that message.
/// ///
/// Subscriber Publisher PubSub Server /// Subscriber Publisher PubSub Server
/// T1 | | | /// T1 | | |
/// T2 |-----Connect------------------------------------------------------>| /// T2 |-----Connect------------------------------------------------------>|
/// T3 | | | /// T3 | | |
/// T2 |<-------------------------------------------------------Topics-----| /// T2 |<-------------------------------------------------------Topics-----|
@@ -103,7 +103,7 @@ impl Subscriber {
publisher_addr: impl ToSocketAddrs, publisher_addr: impl ToSocketAddrs,
topics: Vec<String>, topics: Vec<String>,
) -> anyhow::Result<SubscriberHandle> { ) -> anyhow::Result<SubscriberHandle> {
let publisher = tcp::connect(publisher_addr, Json::default()).await?; let publisher = tcp::connect(publisher_addr, Json::default).await?;
let local_addr = publisher.local_addr()?; let local_addr = publisher.local_addr()?;
let mut handler = server::BaseChannel::with_defaults(publisher) let mut handler = server::BaseChannel::with_defaults(publisher)
.respond_with(Subscriber { local_addr, topics }.serve()); .respond_with(Subscriber { local_addr, topics }.serve());
@@ -308,7 +308,7 @@ async fn main() -> anyhow::Result<()> {
let mut publisher = publisher::PublisherClient::new( let mut publisher = publisher::PublisherClient::new(
client::Config::default(), client::Config::default(),
tcp::connect(addrs.publisher, Json::default()).await?, tcp::connect(addrs.publisher, Json::default).await?,
) )
.spawn()?; .spawn()?;

View File

@@ -62,7 +62,7 @@ async fn main() -> io::Result<()> {
}; };
tokio::spawn(server); tokio::spawn(server);
let transport = tarpc::serde_transport::tcp::connect(addr, Json::default()).await?; let transport = tarpc::serde_transport::tcp::connect(addr, Json::default).await?;
// WorldClient is generated by the tarpc::service attribute. It has a constructor `new` that // WorldClient is generated by the tarpc::service attribute. It has a constructor `new` that
// takes a config and any Transport as input. // takes a config and any Transport as input.

View File

@@ -68,7 +68,7 @@ async fn main() -> io::Result<()> {
.respond_with(AddServer.serve()); .respond_with(AddServer.serve());
tokio::spawn(add_server); tokio::spawn(add_server);
let to_add_server = tarpc::serde_transport::tcp::connect(addr, Json::default()).await?; let to_add_server = tarpc::serde_transport::tcp::connect(addr, Json::default).await?;
let add_client = add::AddClient::new(client::Config::default(), to_add_server).spawn()?; let add_client = add::AddClient::new(client::Config::default(), to_add_server).spawn()?;
let double_listener = tarpc::serde_transport::tcp::listen("localhost:0", Json::default) let double_listener = tarpc::serde_transport::tcp::listen("localhost:0", Json::default)
@@ -81,7 +81,7 @@ async fn main() -> io::Result<()> {
.respond_with(DoubleServer { add_client }.serve()); .respond_with(DoubleServer { add_client }.serve());
tokio::spawn(double_server); tokio::spawn(double_server);
let to_double_server = tarpc::serde_transport::tcp::connect(addr, Json::default()).await?; let to_double_server = tarpc::serde_transport::tcp::connect(addr, Json::default).await?;
let mut double_client = let mut double_client =
double::DoubleClient::new(client::Config::default(), to_double_server).spawn()?; double::DoubleClient::new(client::Config::default(), to_double_server).spawn()?;

View File

@@ -4,9 +4,6 @@
// license that can be found in the LICENSE file or at // license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT. // https://opensource.org/licenses/MIT.
//! [![Latest Version](https://img.shields.io/crates/v/tarpc.svg)](https://crates.io/crates/tarpc)
//! [![Join the chat at https://gitter.im/tarpc/Lobby](https://badges.gitter.im/tarpc/Lobby.svg)](https://gitter.im/tarpc/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
//!
//! *Disclaimer*: This is not an official Google product. //! *Disclaimer*: This is not an official Google product.
//! //!
//! tarpc is an RPC framework for rust with a focus on ease of use. Defining a //! tarpc is an RPC framework for rust with a focus on ease of use. Defining a
@@ -50,7 +47,7 @@
//! Add to your `Cargo.toml` dependencies: //! Add to your `Cargo.toml` dependencies:
//! //!
//! ```toml //! ```toml
//! tarpc = "0.21.0" //! tarpc = "0.23.0"
//! ``` //! ```
//! //!
//! The `tarpc::service` attribute expands to a collection of items that form an rpc service. //! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -64,7 +61,7 @@
//! //!
//! ```toml //! ```toml
//! futures = "0.3" //! futures = "0.3"
//! tokio = "0.2" //! tokio = "0.3"
//! ``` //! ```
//! //!
//! In the following example, we use an in-process channel for communication between //! In the following example, we use an in-process channel for communication between

View File

@@ -88,7 +88,7 @@ impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
let resp = ready!(self.as_mut().project().fut.poll(cx)); let resp = ready!(self.as_mut().project().fut.poll(cx));
Poll::Ready(match resp { Poll::Ready(match resp {
Ok(resp) => resp, Ok(resp) => resp,
Err(tokio::time::Elapsed { .. }) => Err(io::Error::new( Err(tokio::time::error::Elapsed { .. }) => Err(io::Error::new(
io::ErrorKind::TimedOut, io::ErrorKind::TimedOut,
"Client dropped expired request.".to_string(), "Client dropped expired request.".to_string(),
)), )),
@@ -723,7 +723,7 @@ mod tests {
}; };
use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc}; use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc};
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn dispatch_response_cancels_on_drop() { async fn dispatch_response_cancels_on_drop() {
let (cancellation, mut canceled_requests) = cancellations(); let (cancellation, mut canceled_requests) = cancellations();
let (_, response) = oneshot::channel(); let (_, response) = oneshot::channel();
@@ -738,7 +738,7 @@ mod tests {
assert_eq!(canceled_requests.0.try_next().unwrap(), Some(3)); assert_eq!(canceled_requests.0.try_next().unwrap(), Some(3));
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn stage_request() { async fn stage_request() {
let (mut dispatch, mut channel, _server_channel) = set_up(); let (mut dispatch, mut channel, _server_channel) = set_up();
let dispatch = Pin::new(&mut dispatch); let dispatch = Pin::new(&mut dispatch);
@@ -755,7 +755,7 @@ mod tests {
} }
// Regression test for https://github.com/google/tarpc/issues/220 // Regression test for https://github.com/google/tarpc/issues/220
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn stage_request_channel_dropped_doesnt_panic() { async fn stage_request_channel_dropped_doesnt_panic() {
let (mut dispatch, mut channel, mut server_channel) = set_up(); let (mut dispatch, mut channel, mut server_channel) = set_up();
let mut dispatch = Pin::new(&mut dispatch); let mut dispatch = Pin::new(&mut dispatch);
@@ -776,7 +776,7 @@ mod tests {
dispatch.await.unwrap(); dispatch.await.unwrap();
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn stage_request_response_future_dropped_is_canceled_before_sending() { async fn stage_request_response_future_dropped_is_canceled_before_sending() {
let (mut dispatch, mut channel, _server_channel) = set_up(); let (mut dispatch, mut channel, _server_channel) = set_up();
let dispatch = Pin::new(&mut dispatch); let dispatch = Pin::new(&mut dispatch);
@@ -791,7 +791,7 @@ mod tests {
assert!(dispatch.poll_next_request(cx).ready().is_none()); assert!(dispatch.poll_next_request(cx).ready().is_none());
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn stage_request_response_future_dropped_is_canceled_after_sending() { async fn stage_request_response_future_dropped_is_canceled_after_sending() {
let (mut dispatch, mut channel, _server_channel) = set_up(); let (mut dispatch, mut channel, _server_channel) = set_up();
let cx = &mut Context::from_waker(&noop_waker_ref()); let cx = &mut Context::from_waker(&noop_waker_ref());
@@ -813,7 +813,7 @@ mod tests {
assert!(dispatch.project().in_flight_requests.is_empty()); assert!(dispatch.project().in_flight_requests.is_empty());
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn stage_request_response_closed_skipped() { async fn stage_request_response_closed_skipped() {
let (mut dispatch, mut channel, _server_channel) = set_up(); let (mut dispatch, mut channel, _server_channel) = set_up();
let dispatch = Pin::new(&mut dispatch); let dispatch = Pin::new(&mut dispatch);

View File

@@ -565,7 +565,7 @@ where
request_id: self.request_id, request_id: self.request_id,
message: match result { message: match result {
Ok(message) => Ok(message), Ok(message) => Ok(message),
Err(tokio::time::Elapsed { .. }) => { Err(tokio::time::error::Elapsed { .. }) => {
debug!( debug!(
"[{}] Response did not complete before deadline of {}s.", "[{}] Response did not complete before deadline of {}s.",
self.ctx.trace_id(), self.ctx.trace_id(),
@@ -624,11 +624,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop { loop {
let read = self.as_mut().pump_read(cx)?; let read = self.as_mut().pump_read(cx)?;
let read_closed = if let Poll::Ready(None) = read { let read_closed = matches!(read, Poll::Ready(None));
true
} else {
false
};
match (read, self.as_mut().pump_write(cx, read_closed)?) { match (read, self.as_mut().pump_write(cx, read_closed)?) {
(Poll::Ready(None), Poll::Ready(None)) => { (Poll::Ready(None), Poll::Ready(None)) => {
return Poll::Ready(None); return Poll::Ready(None);

View File

@@ -1,3 +1,9 @@
// Copyright 2020 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
use crate::server::{Channel, Config}; use crate::server::{Channel, Config};
use crate::{context, Request, Response}; use crate::{context, Request, Response};
use fnv::FnvHashSet; use fnv::FnvHashSet;
@@ -111,10 +117,7 @@ pub trait PollExt {
impl<T> PollExt for Poll<Option<T>> { impl<T> PollExt for Poll<Option<T>> {
fn is_done(&self) -> bool { fn is_done(&self) -> bool {
match self { matches!(self, Poll::Ready(None))
Poll::Ready(None) => true,
_ => false,
}
} }
} }

View File

@@ -1,3 +1,9 @@
// Copyright 2020 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
use super::{Channel, Config}; use super::{Channel, Config};
use crate::{Response, ServerError}; use crate::{Response, ServerError};
use futures::{future::AbortRegistration, prelude::*, ready, task::*}; use futures::{future::AbortRegistration, prelude::*, ready, task::*};

View File

@@ -90,7 +90,7 @@ mod tests {
use std::io; use std::io;
#[cfg(feature = "tokio1")] #[cfg(feature = "tokio1")]
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn integration() -> io::Result<()> { async fn integration() -> io::Result<()> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();

View File

@@ -14,7 +14,10 @@ use serde::{Deserialize, Serialize};
use std::{error::Error, io, pin::Pin}; use std::{error::Error, io, pin::Pin};
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio_serde::{Framed as SerdeFramed, *}; use tokio_serde::{Framed as SerdeFramed, *};
use tokio_util::codec::{length_delimited::LengthDelimitedCodec, Framed}; use tokio_util::codec::{
length_delimited::{self, LengthDelimitedCodec},
Framed,
};
/// A transport that serializes to, and deserializes from, a byte stream. /// A transport that serializes to, and deserializes from, a byte stream.
#[pin_project] #[pin_project]
@@ -90,6 +93,22 @@ fn convert<E: Into<Box<dyn Error + Send + Sync>>>(
poll.map(|ready| ready.map_err(|e| io::Error::new(io::ErrorKind::Other, e))) poll.map(|ready| ready.map_err(|e| io::Error::new(io::ErrorKind::Other, e)))
} }
/// Constructs a new transport from a framed transport and a serialization codec.
pub fn new<S, Item, SinkItem, Codec>(
framed_io: Framed<S, LengthDelimitedCodec>,
codec: Codec,
) -> Transport<S, Item, SinkItem, Codec>
where
S: AsyncWrite + AsyncRead,
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>,
{
Transport {
inner: SerdeFramed::new(framed_io, codec),
}
}
impl<S, Item, SinkItem, Codec> From<(S, Codec)> for Transport<S, Item, SinkItem, Codec> impl<S, Item, SinkItem, Codec> From<(S, Codec)> for Transport<S, Item, SinkItem, Codec>
where where
S: AsyncWrite + AsyncRead, S: AsyncWrite + AsyncRead,
@@ -97,10 +116,8 @@ where
SinkItem: Serialize, SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>, Codec: Serializer<SinkItem> + Deserializer<Item>,
{ {
fn from((inner, codec): (S, Codec)) -> Self { fn from((io, codec): (S, Codec)) -> Self {
Transport { new(Framed::new(io, LengthDelimitedCodec::new()), codec)
inner: SerdeFramed::new(Framed::new(inner, LengthDelimitedCodec::new()), codec),
}
} }
} }
@@ -134,34 +151,65 @@ pub mod tcp {
} }
} }
/// Returns a new JSON transport that reads from and writes to `io`. /// A connection Future that also exposes the length-delimited framing config.
pub fn new<Item, SinkItem, Codec>( #[pin_project]
io: TcpStream, pub struct Connect<T, Item, SinkItem, CodecFn> {
codec: Codec, #[pin]
) -> Transport<TcpStream, Item, SinkItem, Codec> inner: T,
codec_fn: CodecFn,
config: length_delimited::Builder,
ghost: PhantomData<(fn(SinkItem), fn() -> Item)>,
}
impl<T, Item, SinkItem, Codec, CodecFn> Future for Connect<T, Item, SinkItem, CodecFn>
where where
T: Future<Output = io::Result<TcpStream>>,
Item: for<'de> Deserialize<'de>, Item: for<'de> Deserialize<'de>,
SinkItem: Serialize, SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>, Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{ {
Transport::from((io, codec)) type Output = io::Result<Transport<TcpStream, Item, SinkItem, Codec>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let io = ready!(self.as_mut().project().inner.poll(cx))?;
Poll::Ready(Ok(new(self.config.new_framed(io), (self.codec_fn)())))
}
} }
/// Connects to `addr`, wrapping the connection in a JSON transport. impl<T, Item, SinkItem, CodecFn> Connect<T, Item, SinkItem, CodecFn> {
pub async fn connect<A, Item, SinkItem, Codec>( /// Returns an immutable reference to the length-delimited codec's config.
pub fn config(&self) -> &length_delimited::Builder {
&self.config
}
/// Returns a mutable reference to the length-delimited codec's config.
pub fn config_mut(&mut self) -> &mut length_delimited::Builder {
&mut self.config
}
}
/// Connects to `addr`, wrapping the connection in a TCP transport.
pub fn connect<A, Item, SinkItem, Codec, CodecFn>(
addr: A, addr: A,
codec: Codec, codec_fn: CodecFn,
) -> io::Result<Transport<TcpStream, Item, SinkItem, Codec>> ) -> Connect<impl Future<Output = io::Result<TcpStream>>, Item, SinkItem, CodecFn>
where where
A: ToSocketAddrs, A: ToSocketAddrs,
Item: for<'de> Deserialize<'de>, Item: for<'de> Deserialize<'de>,
SinkItem: Serialize, SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>, Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{ {
Ok(new(TcpStream::connect(addr).await?, codec)) Connect {
inner: TcpStream::connect(addr),
codec_fn,
config: LengthDelimitedCodec::builder(),
ghost: PhantomData,
}
} }
/// Listens on `addr`, wrapping accepted connections in JSON transports. /// Listens on `addr`, wrapping accepted connections in TCP transports.
pub async fn listen<A, Item, SinkItem, Codec, CodecFn>( pub async fn listen<A, Item, SinkItem, Codec, CodecFn>(
addr: A, addr: A,
codec_fn: CodecFn, codec_fn: CodecFn,
@@ -178,6 +226,7 @@ pub mod tcp {
listener, listener,
codec_fn, codec_fn,
local_addr, local_addr,
config: LengthDelimitedCodec::builder(),
ghost: PhantomData, ghost: PhantomData,
}) })
} }
@@ -189,7 +238,8 @@ pub mod tcp {
listener: TcpListener, listener: TcpListener,
local_addr: SocketAddr, local_addr: SocketAddr,
codec_fn: CodecFn, codec_fn: CodecFn,
ghost: PhantomData<(Item, SinkItem, Codec)>, config: length_delimited::Builder,
ghost: PhantomData<(fn() -> Item, fn(SinkItem), Codec)>,
} }
impl<Item, SinkItem, Codec, CodecFn> Incoming<Item, SinkItem, Codec, CodecFn> { impl<Item, SinkItem, Codec, CodecFn> Incoming<Item, SinkItem, Codec, CodecFn> {
@@ -197,6 +247,16 @@ pub mod tcp {
pub fn local_addr(&self) -> SocketAddr { pub fn local_addr(&self) -> SocketAddr {
self.local_addr self.local_addr
} }
/// Returns an immutable reference to the length-delimited codec's config.
pub fn config(&self) -> &length_delimited::Builder {
&self.config
}
/// Returns a mutable reference to the length-delimited codec's config.
pub fn config_mut(&mut self) -> &mut length_delimited::Builder {
&mut self.config
}
} }
impl<Item, SinkItem, Codec, CodecFn> Stream for Incoming<Item, SinkItem, Codec, CodecFn> impl<Item, SinkItem, Codec, CodecFn> Stream for Incoming<Item, SinkItem, Codec, CodecFn>
@@ -209,9 +269,12 @@ pub mod tcp {
type Item = io::Result<Transport<TcpStream, Item, SinkItem, Codec>>; type Item = io::Result<Transport<TcpStream, Item, SinkItem, Codec>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = let conn: TcpStream =
ready!(Pin::new(&mut self.as_mut().project().listener.incoming()).poll_next(cx)?); ready!(Pin::new(&mut self.as_mut().project().listener).poll_accept(cx)?).0;
Poll::Ready(next.map(|conn| Ok(new(conn, (self.codec_fn)())))) Poll::Ready(Some(Ok(new(
self.config.new_framed(conn),
(self.codec_fn)(),
))))
} }
} }
} }
@@ -226,7 +289,7 @@ mod tests {
io::{self, Cursor}, io::{self, Cursor},
pin::Pin, pin::Pin,
}; };
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_serde::formats::SymmetricalJson; use tokio_serde::formats::SymmetricalJson;
fn ctx() -> Context<'static> { fn ctx() -> Context<'static> {
@@ -241,8 +304,8 @@ mod tests {
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut [u8], buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<()>> {
AsyncRead::poll_read(Pin::new(self.0.get_mut()), cx, buf) AsyncRead::poll_read(Pin::new(self.0.get_mut()), cx, buf)
} }
} }
@@ -285,8 +348,8 @@ mod tests {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
_buf: &mut [u8], _buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<()>> {
unreachable!() unreachable!()
} }
} }

View File

@@ -36,7 +36,7 @@ impl Service for Server {
} }
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn sequential() -> io::Result<()> { async fn sequential() -> io::Result<()> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
@@ -59,7 +59,7 @@ async fn sequential() -> io::Result<()> {
} }
#[cfg(feature = "serde1")] #[cfg(feature = "serde1")]
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn serde() -> io::Result<()> { async fn serde() -> io::Result<()> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
@@ -71,7 +71,7 @@ async fn serde() -> io::Result<()> {
.respond_with(Server.serve()), .respond_with(Server.serve()),
); );
let transport = serde_transport::tcp::connect(addr, Json::default()).await?; let transport = serde_transport::tcp::connect(addr, Json::default).await?;
let mut client = ServiceClient::new(client::Config::default(), transport).spawn()?; let mut client = ServiceClient::new(client::Config::default(), transport).spawn()?;
assert_matches!(client.add(context::current(), 1, 2).await, Ok(3)); assert_matches!(client.add(context::current(), 1, 2).await, Ok(3));
@@ -83,7 +83,7 @@ async fn serde() -> io::Result<()> {
Ok(()) Ok(())
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn concurrent() -> io::Result<()> { async fn concurrent() -> io::Result<()> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
@@ -112,7 +112,7 @@ async fn concurrent() -> io::Result<()> {
Ok(()) Ok(())
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn concurrent_join() -> io::Result<()> { async fn concurrent_join() -> io::Result<()> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
@@ -142,7 +142,7 @@ async fn concurrent_join() -> io::Result<()> {
Ok(()) Ok(())
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn concurrent_join_all() -> io::Result<()> { async fn concurrent_join_all() -> io::Result<()> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();