mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b9868250f8 | ||
|
|
a3f1064efe | ||
|
|
026083d653 | ||
|
|
d27f341bde | ||
|
|
2264ebecfc | ||
|
|
3207affb4a | ||
|
|
0602afd50c | ||
|
|
4343e12217 | ||
|
|
7fda862fb8 | ||
|
|
aa7b875b1a | ||
|
|
54d6e0e3b6 | ||
|
|
bea3b442aa | ||
|
|
954a2502e7 |
36
README.md
36
README.md
@@ -14,6 +14,8 @@
|
|||||||
|
|
||||||
# tarpc
|
# tarpc
|
||||||
|
|
||||||
|
<!-- cargo-sync-readme start -->
|
||||||
|
|
||||||
*Disclaimer*: This is not an official Google product.
|
*Disclaimer*: This is not an official Google product.
|
||||||
|
|
||||||
tarpc is an RPC framework for rust with a focus on ease of use. Defining a
|
tarpc is an RPC framework for rust with a focus on ease of use. Defining a
|
||||||
@@ -22,7 +24,7 @@ writing a server is taken care of for you.
|
|||||||
|
|
||||||
[Documentation](https://docs.rs/crate/tarpc/)
|
[Documentation](https://docs.rs/crate/tarpc/)
|
||||||
|
|
||||||
### What is an RPC framework?
|
## What is an RPC framework?
|
||||||
"RPC" stands for "Remote Procedure Call," a function call where the work of
|
"RPC" stands for "Remote Procedure Call," a function call where the work of
|
||||||
producing the return value is being done somewhere else. When an rpc function is
|
producing the return value is being done somewhere else. When an rpc function is
|
||||||
invoked, behind the scenes the function contacts some other process somewhere
|
invoked, behind the scenes the function contacts some other process somewhere
|
||||||
@@ -40,7 +42,7 @@ process, and no context switching between different languages.
|
|||||||
Some other features of tarpc:
|
Some other features of tarpc:
|
||||||
- Pluggable transport: any type impling `Stream<Item = Request> + Sink<Response>` can be
|
- Pluggable transport: any type impling `Stream<Item = Request> + Sink<Response>` can be
|
||||||
used as a transport to connect the client and server.
|
used as a transport to connect the client and server.
|
||||||
- `Send` optional: if the transport doesn't require it, neither does tarpc!
|
- `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
|
||||||
- Cascading cancellation: dropping a request will send a cancellation message to the server.
|
- Cascading cancellation: dropping a request will send a cancellation message to the server.
|
||||||
The server will cease any unfinished work on the request, subsequently cancelling any of its
|
The server will cease any unfinished work on the request, subsequently cancelling any of its
|
||||||
own requests, repeating for the entire chain of transitive dependencies.
|
own requests, repeating for the entire chain of transitive dependencies.
|
||||||
@@ -53,25 +55,25 @@ Some other features of tarpc:
|
|||||||
responses `Serialize + Deserialize`. It's entirely optional, though: in-memory transports can
|
responses `Serialize + Deserialize`. It's entirely optional, though: in-memory transports can
|
||||||
be used, as well, so the price of serialization doesn't have to be paid when it's not needed.
|
be used, as well, so the price of serialization doesn't have to be paid when it's not needed.
|
||||||
|
|
||||||
### Usage
|
## Usage
|
||||||
Add to your `Cargo.toml` dependencies:
|
Add to your `Cargo.toml` dependencies:
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
tarpc = { version = "0.21.0", features = ["full"] }
|
tarpc = "0.23.0"
|
||||||
```
|
```
|
||||||
|
|
||||||
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
||||||
These generated types make it easy and ergonomic to write servers with less boilerplate.
|
These generated types make it easy and ergonomic to write servers with less boilerplate.
|
||||||
Simply implement the generated service trait, and you're off to the races!
|
Simply implement the generated service trait, and you're off to the races!
|
||||||
|
|
||||||
### Example
|
## Example
|
||||||
|
|
||||||
For this example, in addition to tarpc, also add two other dependencies to
|
For this example, in addition to tarpc, also add two other dependencies to
|
||||||
your `Cargo.toml`:
|
your `Cargo.toml`:
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
tokio = "0.2"
|
tokio = "0.3"
|
||||||
```
|
```
|
||||||
|
|
||||||
In the following example, we use an in-process channel for communication between
|
In the following example, we use an in-process channel for communication between
|
||||||
@@ -81,6 +83,7 @@ For a more real-world example, see [example-service](example-service).
|
|||||||
First, let's set up the dependencies and service definition.
|
First, let's set up the dependencies and service definition.
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
|
|
||||||
use futures::{
|
use futures::{
|
||||||
future::{self, Ready},
|
future::{self, Ready},
|
||||||
prelude::*,
|
prelude::*,
|
||||||
@@ -109,19 +112,22 @@ implement it for our Server struct.
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct HelloServer;
|
struct HelloServer;
|
||||||
|
|
||||||
#[tarpc::server]
|
|
||||||
impl World for HelloServer {
|
impl World for HelloServer {
|
||||||
async fn hello(self, _: context::Context, name: String) -> String {
|
// Each defined rpc generates two items in the trait, a fn that serves the RPC, and
|
||||||
format!("Hello, {}!", name)
|
// an associated type representing the future output by the fn.
|
||||||
|
|
||||||
|
type HelloFut = Ready<String>;
|
||||||
|
|
||||||
|
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
|
||||||
|
future::ready(format!("Hello, {}!", name))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
Lastly let's write our `main` that will start the server. While this example uses an
|
Lastly let's write our `main` that will start the server. While this example uses an
|
||||||
[in-process
|
[in-process channel](rpc::transport::channel), tarpc also ships a generic [`serde_transport`]
|
||||||
channel](https://docs.rs/tarpc/0.18.0/tarpc/transport/channel/struct.UnboundedChannel.html),
|
behind the `serde-transport` feature, with additional [TCP](serde_transport::tcp) functionality
|
||||||
tarpc also ships bincode and JSON
|
available behind the `tcp` feature.
|
||||||
tokio-net based TCP transports that are generic over all serializable types.
|
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@@ -151,9 +157,11 @@ async fn main() -> io::Result<()> {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
### Service Documentation
|
## Service Documentation
|
||||||
|
|
||||||
Use `cargo doc` as you normally would to see the documentation created for all
|
Use `cargo doc` as you normally would to see the documentation created for all
|
||||||
items expanded by a `service!` invocation.
|
items expanded by a `service!` invocation.
|
||||||
|
|
||||||
|
<!-- cargo-sync-readme end -->
|
||||||
|
|
||||||
License: MIT
|
License: MIT
|
||||||
|
|||||||
36
RELEASES.md
36
RELEASES.md
@@ -1,3 +1,35 @@
|
|||||||
|
## 0.23.0 (2020-10-19)
|
||||||
|
|
||||||
|
### Breaking Changes
|
||||||
|
|
||||||
|
Upgrades tokio to 0.3.
|
||||||
|
|
||||||
|
## 0.22.0 (2020-08-02)
|
||||||
|
|
||||||
|
This release adds some flexibility and consistency to `serde_transport`, with one new feature and
|
||||||
|
one small breaking change.
|
||||||
|
|
||||||
|
### New Features
|
||||||
|
|
||||||
|
`serde_transport::tcp` now exposes framing configuration on `connect()` and `listen()`. This is
|
||||||
|
useful if, for instance, you want to send requests or responses that are larger than the maximum
|
||||||
|
payload allowed by default:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
let mut transport = tarpc::serde_transport::tcp::connect(server_addr, Json::default);
|
||||||
|
transport.config_mut().max_frame_length(4294967296);
|
||||||
|
let mut client = MyClient::new(client::Config::default(), transport.await?).spawn()?;
|
||||||
|
```
|
||||||
|
|
||||||
|
### Breaking Changes
|
||||||
|
|
||||||
|
The codec argument to `serde_transport::tcp::connect` changed from a Codec to impl Fn() -> Codec,
|
||||||
|
to be consistent with `serde_transport::tcp::listen`. While only one Codec is needed, more than one
|
||||||
|
person has been tripped up by the inconsistency between `connect` and `listen`. Unfortunately, the
|
||||||
|
compiler errors are not much help in this case, so it was decided to simply do the more intuitive
|
||||||
|
thing so that the compiler doesn't need to step in in the first place.
|
||||||
|
|
||||||
|
|
||||||
## 0.21.1 (2020-08-02)
|
## 0.21.1 (2020-08-02)
|
||||||
|
|
||||||
### New Features
|
### New Features
|
||||||
@@ -62,7 +94,7 @@ nameable futures and will just be boxing the return type anyway. This macro does
|
|||||||
### Bug Fixes
|
### Bug Fixes
|
||||||
|
|
||||||
- https://github.com/google/tarpc/issues/304
|
- https://github.com/google/tarpc/issues/304
|
||||||
|
|
||||||
A race condition in code that limits number of connections per client caused occasional panics.
|
A race condition in code that limits number of connections per client caused occasional panics.
|
||||||
|
|
||||||
- https://github.com/google/tarpc/pull/295
|
- https://github.com/google/tarpc/pull/295
|
||||||
@@ -82,7 +114,7 @@ nameable futures and will just be boxing the return type anyway. This macro does
|
|||||||
|
|
||||||
## 0.13.0 (2018-10-16)
|
## 0.13.0 (2018-10-16)
|
||||||
|
|
||||||
### Breaking Changes
|
### Breaking Changes
|
||||||
|
|
||||||
Version 0.13 marks a significant departure from previous versions of tarpc. The
|
Version 0.13 marks a significant departure from previous versions of tarpc. The
|
||||||
API has changed significantly. The tokio-proto crate has been torn out and
|
API has changed significantly. The tokio-proto crate has been torn out and
|
||||||
|
|||||||
@@ -13,13 +13,14 @@ readme = "../README.md"
|
|||||||
description = "An example server built on tarpc."
|
description = "An example server built on tarpc."
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
clap = "2.0"
|
clap = "2.33"
|
||||||
|
env_logger = "0.8"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
serde = { version = "1.0" }
|
serde = { version = "1.0" }
|
||||||
tarpc = { version = "0.21", path = "../tarpc", features = ["full"] }
|
tarpc = { version = "0.23", path = "../tarpc", features = ["full"] }
|
||||||
tokio = { version = "0.2", features = ["full"] }
|
tokio = { version = "0.3", features = ["full"] }
|
||||||
tokio-serde = { version = "0.6", features = ["json"] }
|
tokio-serde = { version = "0.6", features = ["json"] }
|
||||||
env_logger = "0.6"
|
tokio-util = { version = "0.4", features = ["codec"] }
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
name = "service"
|
name = "service"
|
||||||
|
|||||||
@@ -43,11 +43,13 @@ async fn main() -> io::Result<()> {
|
|||||||
|
|
||||||
let name = flags.value_of("name").unwrap().into();
|
let name = flags.value_of("name").unwrap().into();
|
||||||
|
|
||||||
let transport = tarpc::serde_transport::tcp::connect(server_addr, Json::default()).await?;
|
let mut transport = tarpc::serde_transport::tcp::connect(server_addr, Json::default);
|
||||||
|
transport.config_mut().max_frame_length(4294967296);
|
||||||
|
|
||||||
// WorldClient is generated by the service attribute. It has a constructor `new` that takes a
|
// WorldClient is generated by the service attribute. It has a constructor `new` that takes a
|
||||||
// config and any Transport as input.
|
// config and any Transport as input.
|
||||||
let mut client = service::WorldClient::new(client::Config::default(), transport).spawn()?;
|
let mut client =
|
||||||
|
service::WorldClient::new(client::Config::default(), transport.await?).spawn()?;
|
||||||
|
|
||||||
// The client has an RPC method for each RPC defined in the annotated trait. It takes the same
|
// The client has an RPC method for each RPC defined in the annotated trait. It takes the same
|
||||||
// args as defined, with the addition of a Context, which is always the first arg. The Context
|
// args as defined, with the addition of a Context, which is always the first arg. The Context
|
||||||
|
|||||||
@@ -57,8 +57,9 @@ async fn main() -> io::Result<()> {
|
|||||||
|
|
||||||
// JSON transport is provided by the json_transport tarpc module. It makes it easy
|
// JSON transport is provided by the json_transport tarpc module. It makes it easy
|
||||||
// to start up a serde-powered json serialization strategy over TCP.
|
// to start up a serde-powered json serialization strategy over TCP.
|
||||||
tarpc::serde_transport::tcp::listen(&server_addr, Json::default)
|
let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?;
|
||||||
.await?
|
listener.config_mut().max_frame_length(4294967296);
|
||||||
|
listener
|
||||||
// Ignore accept errors.
|
// Ignore accept errors.
|
||||||
.filter_map(|r| future::ready(r.ok()))
|
.filter_map(|r| future::ready(r.ok()))
|
||||||
.map(server::BaseChannel::with_defaults)
|
.map(server::BaseChannel::with_defaults)
|
||||||
|
|||||||
@@ -93,7 +93,7 @@ diff=""
|
|||||||
for file in $(git diff --name-only --cached);
|
for file in $(git diff --name-only --cached);
|
||||||
do
|
do
|
||||||
if [ ${file: -3} == ".rs" ]; then
|
if [ ${file: -3} == ".rs" ]; then
|
||||||
diff="$diff$(cargo fmt -- --skip-children --write-mode=diff $file)"
|
diff="$diff$(cargo fmt -- --unstable-features --skip-children --check $file)"
|
||||||
fi
|
fi
|
||||||
done
|
done
|
||||||
if grep --quiet "^[-+]" <<< "$diff"; then
|
if grep --quiet "^[-+]" <<< "$diff"; then
|
||||||
|
|||||||
@@ -19,15 +19,15 @@ serde1 = []
|
|||||||
travis-ci = { repository = "google/tarpc" }
|
travis-ci = { repository = "google/tarpc" }
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
syn = { version = "1.0.11", features = ["full"] }
|
proc-macro2 = "1.0"
|
||||||
quote = "1.0.2"
|
quote = "1.0"
|
||||||
proc-macro2 = "1.0.6"
|
syn = { version = "1.0", features = ["full"] }
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
proc-macro = true
|
proc-macro = true
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
assert-type-eq = "0.1.0"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
tarpc = { path = "../tarpc" }
|
tarpc = { path = "../tarpc" }
|
||||||
assert-type-eq = "0.1.0"
|
|
||||||
|
|||||||
@@ -328,60 +328,6 @@ fn transform_method(method: &mut ImplItemMethod) -> ImplItemType {
|
|||||||
t
|
t
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Syntactic sugar to make using async functions in the server implementation
|
|
||||||
/// easier. It does this by rewriting code like this, which would normally not
|
|
||||||
/// compile because async functions are disallowed in trait implementations:
|
|
||||||
///
|
|
||||||
/// ```rust
|
|
||||||
/// # extern crate tarpc;
|
|
||||||
/// # use tarpc::context;
|
|
||||||
/// # use std::net::SocketAddr;
|
|
||||||
/// #[tarpc_plugins::service]
|
|
||||||
/// trait World {
|
|
||||||
/// async fn hello(name: String) -> String;
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// #[derive(Clone)]
|
|
||||||
/// struct HelloServer(SocketAddr);
|
|
||||||
///
|
|
||||||
/// #[tarpc_plugins::server]
|
|
||||||
/// impl World for HelloServer {
|
|
||||||
/// async fn hello(self, _: context::Context, name: String) -> String {
|
|
||||||
/// format!("Hello, {}! You are connected from {:?}.", name, self.0)
|
|
||||||
/// }
|
|
||||||
/// }
|
|
||||||
/// ```
|
|
||||||
///
|
|
||||||
/// Into code like this, which matches the service trait definition:
|
|
||||||
///
|
|
||||||
/// ```rust
|
|
||||||
/// # extern crate tarpc;
|
|
||||||
/// # use tarpc::context;
|
|
||||||
/// # use std::pin::Pin;
|
|
||||||
/// # use futures::Future;
|
|
||||||
/// # use std::net::SocketAddr;
|
|
||||||
/// #[tarpc_plugins::service]
|
|
||||||
/// trait World {
|
|
||||||
/// async fn hello(name: String) -> String;
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// #[derive(Clone)]
|
|
||||||
/// struct HelloServer(SocketAddr);
|
|
||||||
///
|
|
||||||
/// impl World for HelloServer {
|
|
||||||
/// type HelloFut = Pin<Box<dyn Future<Output = String> + Send>>;
|
|
||||||
///
|
|
||||||
/// fn hello(self, _: context::Context, name: String) -> Pin<Box<dyn Future<Output = String>
|
|
||||||
/// + Send>> {
|
|
||||||
/// Box::pin(async move {
|
|
||||||
/// format!("Hello, {}! You are connected from {:?}.", name, self.0)
|
|
||||||
/// })
|
|
||||||
/// }
|
|
||||||
/// }
|
|
||||||
/// ```
|
|
||||||
///
|
|
||||||
/// Note that this won't touch functions unless they have been annotated with
|
|
||||||
/// `async`, meaning that this should not break existing code.
|
|
||||||
#[proc_macro_attribute]
|
#[proc_macro_attribute]
|
||||||
pub fn server(_attr: TokenStream, input: TokenStream) -> TokenStream {
|
pub fn server(_attr: TokenStream, input: TokenStream) -> TokenStream {
|
||||||
let mut item = syn::parse_macro_input!(input as ItemImpl);
|
let mut item = syn::parse_macro_input!(input as ItemImpl);
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "tarpc"
|
name = "tarpc"
|
||||||
version = "0.21.1"
|
version = "0.23.0"
|
||||||
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
@@ -29,29 +29,27 @@ travis-ci = { repository = "google/tarpc" }
|
|||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
fnv = "1.0"
|
fnv = "1.0"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
humantime = "1.0"
|
humantime = "2.0"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
pin-project = "0.4.17"
|
pin-project = "1.0"
|
||||||
rand = "0.7"
|
rand = "0.7"
|
||||||
tokio = { version = "0.2", features = ["time"] }
|
|
||||||
serde = { optional = true, version = "1.0", features = ["derive"] }
|
serde = { optional = true, version = "1.0", features = ["derive"] }
|
||||||
static_assertions = "1.1.0"
|
static_assertions = "1.1.0"
|
||||||
tokio-util = { optional = true, version = "0.2" }
|
|
||||||
tarpc-plugins = { path = "../plugins", version = "0.8" }
|
tarpc-plugins = { path = "../plugins", version = "0.8" }
|
||||||
|
tokio = { version = "0.3" }
|
||||||
|
tokio-util = { optional = true, version = "0.4" }
|
||||||
tokio-serde = { optional = true, version = "0.6" }
|
tokio-serde = { optional = true, version = "0.6" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
assert_matches = "1.0"
|
assert_matches = "1.4"
|
||||||
bincode = "1.3"
|
bincode = "1.3"
|
||||||
bytes = { version = "0.5", features = ["serde"] }
|
bytes = { version = "0.5", features = ["serde"] }
|
||||||
env_logger = "0.6"
|
env_logger = "0.8"
|
||||||
flate2 = "1.0.16"
|
flate2 = "1.0"
|
||||||
futures = "0.3"
|
|
||||||
humantime = "1.0"
|
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
pin-utils = "0.1.0-alpha"
|
pin-utils = "0.1.0-alpha"
|
||||||
serde_bytes = "0.11"
|
serde_bytes = "0.11"
|
||||||
tokio = { version = "0.2", features = ["full"] }
|
tokio = { version = "0.3", features = ["full"] }
|
||||||
tokio-serde = { version = "0.6", features = ["json", "bincode"] }
|
tokio-serde = { version = "0.6", features = ["json", "bincode"] }
|
||||||
trybuild = "1.0"
|
trybuild = "1.0"
|
||||||
|
|
||||||
|
|||||||
@@ -118,7 +118,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.await;
|
.await;
|
||||||
});
|
});
|
||||||
|
|
||||||
let transport = tcp::connect(addr, Bincode::default()).await?;
|
let transport = tcp::connect(addr, Bincode::default).await?;
|
||||||
let mut client =
|
let mut client =
|
||||||
WorldClient::new(client::Config::default(), add_compression(transport)).spawn()?;
|
WorldClient::new(client::Config::default(), add_compression(transport)).spawn()?;
|
||||||
|
|
||||||
|
|||||||
@@ -18,7 +18,7 @@
|
|||||||
/// messages to all clients subscribed to the topic of that message.
|
/// messages to all clients subscribed to the topic of that message.
|
||||||
///
|
///
|
||||||
/// Subscriber Publisher PubSub Server
|
/// Subscriber Publisher PubSub Server
|
||||||
/// T1 | | |
|
/// T1 | | |
|
||||||
/// T2 |-----Connect------------------------------------------------------>|
|
/// T2 |-----Connect------------------------------------------------------>|
|
||||||
/// T3 | | |
|
/// T3 | | |
|
||||||
/// T2 |<-------------------------------------------------------Topics-----|
|
/// T2 |<-------------------------------------------------------Topics-----|
|
||||||
@@ -103,7 +103,7 @@ impl Subscriber {
|
|||||||
publisher_addr: impl ToSocketAddrs,
|
publisher_addr: impl ToSocketAddrs,
|
||||||
topics: Vec<String>,
|
topics: Vec<String>,
|
||||||
) -> anyhow::Result<SubscriberHandle> {
|
) -> anyhow::Result<SubscriberHandle> {
|
||||||
let publisher = tcp::connect(publisher_addr, Json::default()).await?;
|
let publisher = tcp::connect(publisher_addr, Json::default).await?;
|
||||||
let local_addr = publisher.local_addr()?;
|
let local_addr = publisher.local_addr()?;
|
||||||
let mut handler = server::BaseChannel::with_defaults(publisher)
|
let mut handler = server::BaseChannel::with_defaults(publisher)
|
||||||
.respond_with(Subscriber { local_addr, topics }.serve());
|
.respond_with(Subscriber { local_addr, topics }.serve());
|
||||||
@@ -308,7 +308,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
let mut publisher = publisher::PublisherClient::new(
|
let mut publisher = publisher::PublisherClient::new(
|
||||||
client::Config::default(),
|
client::Config::default(),
|
||||||
tcp::connect(addrs.publisher, Json::default()).await?,
|
tcp::connect(addrs.publisher, Json::default).await?,
|
||||||
)
|
)
|
||||||
.spawn()?;
|
.spawn()?;
|
||||||
|
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ async fn main() -> io::Result<()> {
|
|||||||
};
|
};
|
||||||
tokio::spawn(server);
|
tokio::spawn(server);
|
||||||
|
|
||||||
let transport = tarpc::serde_transport::tcp::connect(addr, Json::default()).await?;
|
let transport = tarpc::serde_transport::tcp::connect(addr, Json::default).await?;
|
||||||
|
|
||||||
// WorldClient is generated by the tarpc::service attribute. It has a constructor `new` that
|
// WorldClient is generated by the tarpc::service attribute. It has a constructor `new` that
|
||||||
// takes a config and any Transport as input.
|
// takes a config and any Transport as input.
|
||||||
|
|||||||
@@ -68,7 +68,7 @@ async fn main() -> io::Result<()> {
|
|||||||
.respond_with(AddServer.serve());
|
.respond_with(AddServer.serve());
|
||||||
tokio::spawn(add_server);
|
tokio::spawn(add_server);
|
||||||
|
|
||||||
let to_add_server = tarpc::serde_transport::tcp::connect(addr, Json::default()).await?;
|
let to_add_server = tarpc::serde_transport::tcp::connect(addr, Json::default).await?;
|
||||||
let add_client = add::AddClient::new(client::Config::default(), to_add_server).spawn()?;
|
let add_client = add::AddClient::new(client::Config::default(), to_add_server).spawn()?;
|
||||||
|
|
||||||
let double_listener = tarpc::serde_transport::tcp::listen("localhost:0", Json::default)
|
let double_listener = tarpc::serde_transport::tcp::listen("localhost:0", Json::default)
|
||||||
@@ -81,7 +81,7 @@ async fn main() -> io::Result<()> {
|
|||||||
.respond_with(DoubleServer { add_client }.serve());
|
.respond_with(DoubleServer { add_client }.serve());
|
||||||
tokio::spawn(double_server);
|
tokio::spawn(double_server);
|
||||||
|
|
||||||
let to_double_server = tarpc::serde_transport::tcp::connect(addr, Json::default()).await?;
|
let to_double_server = tarpc::serde_transport::tcp::connect(addr, Json::default).await?;
|
||||||
let mut double_client =
|
let mut double_client =
|
||||||
double::DoubleClient::new(client::Config::default(), to_double_server).spawn()?;
|
double::DoubleClient::new(client::Config::default(), to_double_server).spawn()?;
|
||||||
|
|
||||||
|
|||||||
@@ -4,9 +4,6 @@
|
|||||||
// license that can be found in the LICENSE file or at
|
// license that can be found in the LICENSE file or at
|
||||||
// https://opensource.org/licenses/MIT.
|
// https://opensource.org/licenses/MIT.
|
||||||
|
|
||||||
//! [](https://crates.io/crates/tarpc)
|
|
||||||
//! [](https://gitter.im/tarpc/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
|
|
||||||
//!
|
|
||||||
//! *Disclaimer*: This is not an official Google product.
|
//! *Disclaimer*: This is not an official Google product.
|
||||||
//!
|
//!
|
||||||
//! tarpc is an RPC framework for rust with a focus on ease of use. Defining a
|
//! tarpc is an RPC framework for rust with a focus on ease of use. Defining a
|
||||||
@@ -50,7 +47,7 @@
|
|||||||
//! Add to your `Cargo.toml` dependencies:
|
//! Add to your `Cargo.toml` dependencies:
|
||||||
//!
|
//!
|
||||||
//! ```toml
|
//! ```toml
|
||||||
//! tarpc = "0.21.0"
|
//! tarpc = "0.23.0"
|
||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
//! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
//! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
||||||
@@ -64,7 +61,7 @@
|
|||||||
//!
|
//!
|
||||||
//! ```toml
|
//! ```toml
|
||||||
//! futures = "0.3"
|
//! futures = "0.3"
|
||||||
//! tokio = "0.2"
|
//! tokio = "0.3"
|
||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
//! In the following example, we use an in-process channel for communication between
|
//! In the following example, we use an in-process channel for communication between
|
||||||
|
|||||||
@@ -88,7 +88,7 @@ impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
|
|||||||
let resp = ready!(self.as_mut().project().fut.poll(cx));
|
let resp = ready!(self.as_mut().project().fut.poll(cx));
|
||||||
Poll::Ready(match resp {
|
Poll::Ready(match resp {
|
||||||
Ok(resp) => resp,
|
Ok(resp) => resp,
|
||||||
Err(tokio::time::Elapsed { .. }) => Err(io::Error::new(
|
Err(tokio::time::error::Elapsed { .. }) => Err(io::Error::new(
|
||||||
io::ErrorKind::TimedOut,
|
io::ErrorKind::TimedOut,
|
||||||
"Client dropped expired request.".to_string(),
|
"Client dropped expired request.".to_string(),
|
||||||
)),
|
)),
|
||||||
@@ -723,7 +723,7 @@ mod tests {
|
|||||||
};
|
};
|
||||||
use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc};
|
use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc};
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn dispatch_response_cancels_on_drop() {
|
async fn dispatch_response_cancels_on_drop() {
|
||||||
let (cancellation, mut canceled_requests) = cancellations();
|
let (cancellation, mut canceled_requests) = cancellations();
|
||||||
let (_, response) = oneshot::channel();
|
let (_, response) = oneshot::channel();
|
||||||
@@ -738,7 +738,7 @@ mod tests {
|
|||||||
assert_eq!(canceled_requests.0.try_next().unwrap(), Some(3));
|
assert_eq!(canceled_requests.0.try_next().unwrap(), Some(3));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn stage_request() {
|
async fn stage_request() {
|
||||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||||
let dispatch = Pin::new(&mut dispatch);
|
let dispatch = Pin::new(&mut dispatch);
|
||||||
@@ -755,7 +755,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Regression test for https://github.com/google/tarpc/issues/220
|
// Regression test for https://github.com/google/tarpc/issues/220
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn stage_request_channel_dropped_doesnt_panic() {
|
async fn stage_request_channel_dropped_doesnt_panic() {
|
||||||
let (mut dispatch, mut channel, mut server_channel) = set_up();
|
let (mut dispatch, mut channel, mut server_channel) = set_up();
|
||||||
let mut dispatch = Pin::new(&mut dispatch);
|
let mut dispatch = Pin::new(&mut dispatch);
|
||||||
@@ -776,7 +776,7 @@ mod tests {
|
|||||||
dispatch.await.unwrap();
|
dispatch.await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn stage_request_response_future_dropped_is_canceled_before_sending() {
|
async fn stage_request_response_future_dropped_is_canceled_before_sending() {
|
||||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||||
let dispatch = Pin::new(&mut dispatch);
|
let dispatch = Pin::new(&mut dispatch);
|
||||||
@@ -791,7 +791,7 @@ mod tests {
|
|||||||
assert!(dispatch.poll_next_request(cx).ready().is_none());
|
assert!(dispatch.poll_next_request(cx).ready().is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn stage_request_response_future_dropped_is_canceled_after_sending() {
|
async fn stage_request_response_future_dropped_is_canceled_after_sending() {
|
||||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||||
@@ -813,7 +813,7 @@ mod tests {
|
|||||||
assert!(dispatch.project().in_flight_requests.is_empty());
|
assert!(dispatch.project().in_flight_requests.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn stage_request_response_closed_skipped() {
|
async fn stage_request_response_closed_skipped() {
|
||||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||||
let dispatch = Pin::new(&mut dispatch);
|
let dispatch = Pin::new(&mut dispatch);
|
||||||
|
|||||||
@@ -565,7 +565,7 @@ where
|
|||||||
request_id: self.request_id,
|
request_id: self.request_id,
|
||||||
message: match result {
|
message: match result {
|
||||||
Ok(message) => Ok(message),
|
Ok(message) => Ok(message),
|
||||||
Err(tokio::time::Elapsed { .. }) => {
|
Err(tokio::time::error::Elapsed { .. }) => {
|
||||||
debug!(
|
debug!(
|
||||||
"[{}] Response did not complete before deadline of {}s.",
|
"[{}] Response did not complete before deadline of {}s.",
|
||||||
self.ctx.trace_id(),
|
self.ctx.trace_id(),
|
||||||
@@ -624,11 +624,7 @@ where
|
|||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
loop {
|
loop {
|
||||||
let read = self.as_mut().pump_read(cx)?;
|
let read = self.as_mut().pump_read(cx)?;
|
||||||
let read_closed = if let Poll::Ready(None) = read {
|
let read_closed = matches!(read, Poll::Ready(None));
|
||||||
true
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
};
|
|
||||||
match (read, self.as_mut().pump_write(cx, read_closed)?) {
|
match (read, self.as_mut().pump_write(cx, read_closed)?) {
|
||||||
(Poll::Ready(None), Poll::Ready(None)) => {
|
(Poll::Ready(None), Poll::Ready(None)) => {
|
||||||
return Poll::Ready(None);
|
return Poll::Ready(None);
|
||||||
@@ -1,3 +1,9 @@
|
|||||||
|
// Copyright 2020 Google LLC
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by an MIT-style
|
||||||
|
// license that can be found in the LICENSE file or at
|
||||||
|
// https://opensource.org/licenses/MIT.
|
||||||
|
|
||||||
use crate::server::{Channel, Config};
|
use crate::server::{Channel, Config};
|
||||||
use crate::{context, Request, Response};
|
use crate::{context, Request, Response};
|
||||||
use fnv::FnvHashSet;
|
use fnv::FnvHashSet;
|
||||||
@@ -111,10 +117,7 @@ pub trait PollExt {
|
|||||||
|
|
||||||
impl<T> PollExt for Poll<Option<T>> {
|
impl<T> PollExt for Poll<Option<T>> {
|
||||||
fn is_done(&self) -> bool {
|
fn is_done(&self) -> bool {
|
||||||
match self {
|
matches!(self, Poll::Ready(None))
|
||||||
Poll::Ready(None) => true,
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,9 @@
|
|||||||
|
// Copyright 2020 Google LLC
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by an MIT-style
|
||||||
|
// license that can be found in the LICENSE file or at
|
||||||
|
// https://opensource.org/licenses/MIT.
|
||||||
|
|
||||||
use super::{Channel, Config};
|
use super::{Channel, Config};
|
||||||
use crate::{Response, ServerError};
|
use crate::{Response, ServerError};
|
||||||
use futures::{future::AbortRegistration, prelude::*, ready, task::*};
|
use futures::{future::AbortRegistration, prelude::*, ready, task::*};
|
||||||
|
|||||||
@@ -90,7 +90,7 @@ mod tests {
|
|||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
#[cfg(feature = "tokio1")]
|
#[cfg(feature = "tokio1")]
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn integration() -> io::Result<()> {
|
async fn integration() -> io::Result<()> {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
|
|||||||
@@ -14,7 +14,10 @@ use serde::{Deserialize, Serialize};
|
|||||||
use std::{error::Error, io, pin::Pin};
|
use std::{error::Error, io, pin::Pin};
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
use tokio_serde::{Framed as SerdeFramed, *};
|
use tokio_serde::{Framed as SerdeFramed, *};
|
||||||
use tokio_util::codec::{length_delimited::LengthDelimitedCodec, Framed};
|
use tokio_util::codec::{
|
||||||
|
length_delimited::{self, LengthDelimitedCodec},
|
||||||
|
Framed,
|
||||||
|
};
|
||||||
|
|
||||||
/// A transport that serializes to, and deserializes from, a byte stream.
|
/// A transport that serializes to, and deserializes from, a byte stream.
|
||||||
#[pin_project]
|
#[pin_project]
|
||||||
@@ -90,6 +93,22 @@ fn convert<E: Into<Box<dyn Error + Send + Sync>>>(
|
|||||||
poll.map(|ready| ready.map_err(|e| io::Error::new(io::ErrorKind::Other, e)))
|
poll.map(|ready| ready.map_err(|e| io::Error::new(io::ErrorKind::Other, e)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Constructs a new transport from a framed transport and a serialization codec.
|
||||||
|
pub fn new<S, Item, SinkItem, Codec>(
|
||||||
|
framed_io: Framed<S, LengthDelimitedCodec>,
|
||||||
|
codec: Codec,
|
||||||
|
) -> Transport<S, Item, SinkItem, Codec>
|
||||||
|
where
|
||||||
|
S: AsyncWrite + AsyncRead,
|
||||||
|
Item: for<'de> Deserialize<'de>,
|
||||||
|
SinkItem: Serialize,
|
||||||
|
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
||||||
|
{
|
||||||
|
Transport {
|
||||||
|
inner: SerdeFramed::new(framed_io, codec),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<S, Item, SinkItem, Codec> From<(S, Codec)> for Transport<S, Item, SinkItem, Codec>
|
impl<S, Item, SinkItem, Codec> From<(S, Codec)> for Transport<S, Item, SinkItem, Codec>
|
||||||
where
|
where
|
||||||
S: AsyncWrite + AsyncRead,
|
S: AsyncWrite + AsyncRead,
|
||||||
@@ -97,10 +116,8 @@ where
|
|||||||
SinkItem: Serialize,
|
SinkItem: Serialize,
|
||||||
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
||||||
{
|
{
|
||||||
fn from((inner, codec): (S, Codec)) -> Self {
|
fn from((io, codec): (S, Codec)) -> Self {
|
||||||
Transport {
|
new(Framed::new(io, LengthDelimitedCodec::new()), codec)
|
||||||
inner: SerdeFramed::new(Framed::new(inner, LengthDelimitedCodec::new()), codec),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -134,34 +151,65 @@ pub mod tcp {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a new JSON transport that reads from and writes to `io`.
|
/// A connection Future that also exposes the length-delimited framing config.
|
||||||
pub fn new<Item, SinkItem, Codec>(
|
#[pin_project]
|
||||||
io: TcpStream,
|
pub struct Connect<T, Item, SinkItem, CodecFn> {
|
||||||
codec: Codec,
|
#[pin]
|
||||||
) -> Transport<TcpStream, Item, SinkItem, Codec>
|
inner: T,
|
||||||
|
codec_fn: CodecFn,
|
||||||
|
config: length_delimited::Builder,
|
||||||
|
ghost: PhantomData<(fn(SinkItem), fn() -> Item)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, Item, SinkItem, Codec, CodecFn> Future for Connect<T, Item, SinkItem, CodecFn>
|
||||||
where
|
where
|
||||||
|
T: Future<Output = io::Result<TcpStream>>,
|
||||||
Item: for<'de> Deserialize<'de>,
|
Item: for<'de> Deserialize<'de>,
|
||||||
SinkItem: Serialize,
|
SinkItem: Serialize,
|
||||||
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
||||||
|
CodecFn: Fn() -> Codec,
|
||||||
{
|
{
|
||||||
Transport::from((io, codec))
|
type Output = io::Result<Transport<TcpStream, Item, SinkItem, Codec>>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||||
|
let io = ready!(self.as_mut().project().inner.poll(cx))?;
|
||||||
|
Poll::Ready(Ok(new(self.config.new_framed(io), (self.codec_fn)())))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Connects to `addr`, wrapping the connection in a JSON transport.
|
impl<T, Item, SinkItem, CodecFn> Connect<T, Item, SinkItem, CodecFn> {
|
||||||
pub async fn connect<A, Item, SinkItem, Codec>(
|
/// Returns an immutable reference to the length-delimited codec's config.
|
||||||
|
pub fn config(&self) -> &length_delimited::Builder {
|
||||||
|
&self.config
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a mutable reference to the length-delimited codec's config.
|
||||||
|
pub fn config_mut(&mut self) -> &mut length_delimited::Builder {
|
||||||
|
&mut self.config
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Connects to `addr`, wrapping the connection in a TCP transport.
|
||||||
|
pub fn connect<A, Item, SinkItem, Codec, CodecFn>(
|
||||||
addr: A,
|
addr: A,
|
||||||
codec: Codec,
|
codec_fn: CodecFn,
|
||||||
) -> io::Result<Transport<TcpStream, Item, SinkItem, Codec>>
|
) -> Connect<impl Future<Output = io::Result<TcpStream>>, Item, SinkItem, CodecFn>
|
||||||
where
|
where
|
||||||
A: ToSocketAddrs,
|
A: ToSocketAddrs,
|
||||||
Item: for<'de> Deserialize<'de>,
|
Item: for<'de> Deserialize<'de>,
|
||||||
SinkItem: Serialize,
|
SinkItem: Serialize,
|
||||||
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
||||||
|
CodecFn: Fn() -> Codec,
|
||||||
{
|
{
|
||||||
Ok(new(TcpStream::connect(addr).await?, codec))
|
Connect {
|
||||||
|
inner: TcpStream::connect(addr),
|
||||||
|
codec_fn,
|
||||||
|
config: LengthDelimitedCodec::builder(),
|
||||||
|
ghost: PhantomData,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Listens on `addr`, wrapping accepted connections in JSON transports.
|
/// Listens on `addr`, wrapping accepted connections in TCP transports.
|
||||||
pub async fn listen<A, Item, SinkItem, Codec, CodecFn>(
|
pub async fn listen<A, Item, SinkItem, Codec, CodecFn>(
|
||||||
addr: A,
|
addr: A,
|
||||||
codec_fn: CodecFn,
|
codec_fn: CodecFn,
|
||||||
@@ -178,6 +226,7 @@ pub mod tcp {
|
|||||||
listener,
|
listener,
|
||||||
codec_fn,
|
codec_fn,
|
||||||
local_addr,
|
local_addr,
|
||||||
|
config: LengthDelimitedCodec::builder(),
|
||||||
ghost: PhantomData,
|
ghost: PhantomData,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -189,7 +238,8 @@ pub mod tcp {
|
|||||||
listener: TcpListener,
|
listener: TcpListener,
|
||||||
local_addr: SocketAddr,
|
local_addr: SocketAddr,
|
||||||
codec_fn: CodecFn,
|
codec_fn: CodecFn,
|
||||||
ghost: PhantomData<(Item, SinkItem, Codec)>,
|
config: length_delimited::Builder,
|
||||||
|
ghost: PhantomData<(fn() -> Item, fn(SinkItem), Codec)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Item, SinkItem, Codec, CodecFn> Incoming<Item, SinkItem, Codec, CodecFn> {
|
impl<Item, SinkItem, Codec, CodecFn> Incoming<Item, SinkItem, Codec, CodecFn> {
|
||||||
@@ -197,6 +247,16 @@ pub mod tcp {
|
|||||||
pub fn local_addr(&self) -> SocketAddr {
|
pub fn local_addr(&self) -> SocketAddr {
|
||||||
self.local_addr
|
self.local_addr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns an immutable reference to the length-delimited codec's config.
|
||||||
|
pub fn config(&self) -> &length_delimited::Builder {
|
||||||
|
&self.config
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a mutable reference to the length-delimited codec's config.
|
||||||
|
pub fn config_mut(&mut self) -> &mut length_delimited::Builder {
|
||||||
|
&mut self.config
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Item, SinkItem, Codec, CodecFn> Stream for Incoming<Item, SinkItem, Codec, CodecFn>
|
impl<Item, SinkItem, Codec, CodecFn> Stream for Incoming<Item, SinkItem, Codec, CodecFn>
|
||||||
@@ -209,9 +269,12 @@ pub mod tcp {
|
|||||||
type Item = io::Result<Transport<TcpStream, Item, SinkItem, Codec>>;
|
type Item = io::Result<Transport<TcpStream, Item, SinkItem, Codec>>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
let next =
|
let conn: TcpStream =
|
||||||
ready!(Pin::new(&mut self.as_mut().project().listener.incoming()).poll_next(cx)?);
|
ready!(Pin::new(&mut self.as_mut().project().listener).poll_accept(cx)?).0;
|
||||||
Poll::Ready(next.map(|conn| Ok(new(conn, (self.codec_fn)()))))
|
Poll::Ready(Some(Ok(new(
|
||||||
|
self.config.new_framed(conn),
|
||||||
|
(self.codec_fn)(),
|
||||||
|
))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -226,7 +289,7 @@ mod tests {
|
|||||||
io::{self, Cursor},
|
io::{self, Cursor},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
};
|
};
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
use tokio_serde::formats::SymmetricalJson;
|
use tokio_serde::formats::SymmetricalJson;
|
||||||
|
|
||||||
fn ctx() -> Context<'static> {
|
fn ctx() -> Context<'static> {
|
||||||
@@ -241,8 +304,8 @@ mod tests {
|
|||||||
fn poll_read(
|
fn poll_read(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
buf: &mut [u8],
|
buf: &mut ReadBuf<'_>,
|
||||||
) -> Poll<io::Result<usize>> {
|
) -> Poll<io::Result<()>> {
|
||||||
AsyncRead::poll_read(Pin::new(self.0.get_mut()), cx, buf)
|
AsyncRead::poll_read(Pin::new(self.0.get_mut()), cx, buf)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -285,8 +348,8 @@ mod tests {
|
|||||||
fn poll_read(
|
fn poll_read(
|
||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
_cx: &mut Context<'_>,
|
_cx: &mut Context<'_>,
|
||||||
_buf: &mut [u8],
|
_buf: &mut ReadBuf<'_>,
|
||||||
) -> Poll<io::Result<usize>> {
|
) -> Poll<io::Result<()>> {
|
||||||
unreachable!()
|
unreachable!()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -36,7 +36,7 @@ impl Service for Server {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn sequential() -> io::Result<()> {
|
async fn sequential() -> io::Result<()> {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
@@ -59,7 +59,7 @@ async fn sequential() -> io::Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "serde1")]
|
#[cfg(feature = "serde1")]
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn serde() -> io::Result<()> {
|
async fn serde() -> io::Result<()> {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
@@ -71,7 +71,7 @@ async fn serde() -> io::Result<()> {
|
|||||||
.respond_with(Server.serve()),
|
.respond_with(Server.serve()),
|
||||||
);
|
);
|
||||||
|
|
||||||
let transport = serde_transport::tcp::connect(addr, Json::default()).await?;
|
let transport = serde_transport::tcp::connect(addr, Json::default).await?;
|
||||||
let mut client = ServiceClient::new(client::Config::default(), transport).spawn()?;
|
let mut client = ServiceClient::new(client::Config::default(), transport).spawn()?;
|
||||||
|
|
||||||
assert_matches!(client.add(context::current(), 1, 2).await, Ok(3));
|
assert_matches!(client.add(context::current(), 1, 2).await, Ok(3));
|
||||||
@@ -83,7 +83,7 @@ async fn serde() -> io::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn concurrent() -> io::Result<()> {
|
async fn concurrent() -> io::Result<()> {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
@@ -112,7 +112,7 @@ async fn concurrent() -> io::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn concurrent_join() -> io::Result<()> {
|
async fn concurrent_join() -> io::Result<()> {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
@@ -142,7 +142,7 @@ async fn concurrent_join() -> io::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn concurrent_join_all() -> io::Result<()> {
|
async fn concurrent_join_all() -> io::Result<()> {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user