mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Compare commits
22 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bc982c5584 | ||
|
|
d440e12c19 | ||
|
|
bc8128af69 | ||
|
|
1d87c14262 | ||
|
|
ca929c2178 | ||
|
|
569039734b | ||
|
|
3d43310e6a | ||
|
|
d21cbddb0d | ||
|
|
25aa857edf | ||
|
|
0bb2e2bbbe | ||
|
|
dc376343d6 | ||
|
|
2e7d1f8a88 | ||
|
|
6314591c65 | ||
|
|
7dd7494420 | ||
|
|
6c10e3649f | ||
|
|
4c6dee13d2 | ||
|
|
e45abe953a | ||
|
|
dec3e491b5 | ||
|
|
6ce341cf79 | ||
|
|
b9868250f8 | ||
|
|
a3f1064efe | ||
|
|
026083d653 |
19
.github/workflows/main.yml
vendored
19
.github/workflows/main.yml
vendored
@@ -28,6 +28,25 @@ jobs:
|
|||||||
profile: minimal
|
profile: minimal
|
||||||
toolchain: stable
|
toolchain: stable
|
||||||
override: true
|
override: true
|
||||||
|
- uses: actions-rs/cargo@v1
|
||||||
|
with:
|
||||||
|
command: test
|
||||||
|
- uses: actions-rs/cargo@v1
|
||||||
|
with:
|
||||||
|
command: test
|
||||||
|
args: --manifest-path tarpc/Cargo.toml --features serde1
|
||||||
|
- uses: actions-rs/cargo@v1
|
||||||
|
with:
|
||||||
|
command: test
|
||||||
|
args: --manifest-path tarpc/Cargo.toml --features tokio1
|
||||||
|
- uses: actions-rs/cargo@v1
|
||||||
|
with:
|
||||||
|
command: test
|
||||||
|
args: --manifest-path tarpc/Cargo.toml --features serde-transport
|
||||||
|
- uses: actions-rs/cargo@v1
|
||||||
|
with:
|
||||||
|
command: test
|
||||||
|
args: --manifest-path tarpc/Cargo.toml --features tcp
|
||||||
- uses: actions-rs/cargo@v1
|
- uses: actions-rs/cargo@v1
|
||||||
with:
|
with:
|
||||||
command: test
|
command: test
|
||||||
|
|||||||
@@ -59,7 +59,7 @@ Some other features of tarpc:
|
|||||||
Add to your `Cargo.toml` dependencies:
|
Add to your `Cargo.toml` dependencies:
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
tarpc = "0.22.0"
|
tarpc = "0.24"
|
||||||
```
|
```
|
||||||
|
|
||||||
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
||||||
@@ -68,12 +68,13 @@ Simply implement the generated service trait, and you're off to the races!
|
|||||||
|
|
||||||
## Example
|
## Example
|
||||||
|
|
||||||
For this example, in addition to tarpc, also add two other dependencies to
|
This example uses [tokio](https://tokio.rs), so add the following dependencies to
|
||||||
your `Cargo.toml`:
|
your `Cargo.toml`:
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
futures = "0.3"
|
futures = "1.0"
|
||||||
tokio = "0.2"
|
tarpc = { version = "0.24", features = ["tokio1"] }
|
||||||
|
tokio = { version = "1.0", features = ["macros"] }
|
||||||
```
|
```
|
||||||
|
|
||||||
In the following example, we use an in-process channel for communication between
|
In the following example, we use an in-process channel for communication between
|
||||||
|
|||||||
16
RELEASES.md
16
RELEASES.md
@@ -1,3 +1,19 @@
|
|||||||
|
## 0.24.1 (2020-12-28)
|
||||||
|
|
||||||
|
### Breaking Changes
|
||||||
|
|
||||||
|
Upgrades tokio to 1.0.
|
||||||
|
|
||||||
|
## 0.24.0 (2020-12-28)
|
||||||
|
|
||||||
|
This release was yanked.
|
||||||
|
|
||||||
|
## 0.23.0 (2020-10-19)
|
||||||
|
|
||||||
|
### Breaking Changes
|
||||||
|
|
||||||
|
Upgrades tokio to 0.3.
|
||||||
|
|
||||||
## 0.22.0 (2020-08-02)
|
## 0.22.0 (2020-08-02)
|
||||||
|
|
||||||
This release adds some flexibility and consistency to `serde_transport`, with one new feature and
|
This release adds some flexibility and consistency to `serde_transport`, with one new feature and
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "tarpc-example-service"
|
name = "tarpc-example-service"
|
||||||
version = "0.6.0"
|
version = "0.8.0"
|
||||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
@@ -13,14 +13,12 @@ readme = "../README.md"
|
|||||||
description = "An example server built on tarpc."
|
description = "An example server built on tarpc."
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
clap = "2.0"
|
clap = "2.33"
|
||||||
|
env_logger = "0.8"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
serde = { version = "1.0" }
|
serde = { version = "1.0" }
|
||||||
tarpc = { version = "0.22", path = "../tarpc", features = ["full"] }
|
tarpc = { version = "0.24", path = "../tarpc", features = ["full"] }
|
||||||
tokio = { version = "0.2", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
tokio-serde = { version = "0.6", features = ["json"] }
|
|
||||||
tokio-util = { version = "0.3", features = ["codec"] }
|
|
||||||
env_logger = "0.6"
|
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
name = "service"
|
name = "service"
|
||||||
|
|||||||
@@ -6,8 +6,7 @@
|
|||||||
|
|
||||||
use clap::{App, Arg};
|
use clap::{App, Arg};
|
||||||
use std::{io, net::SocketAddr};
|
use std::{io, net::SocketAddr};
|
||||||
use tarpc::{client, context};
|
use tarpc::{client, context, tokio_serde::formats::Json};
|
||||||
use tokio_serde::formats::Json;
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> io::Result<()> {
|
async fn main() -> io::Result<()> {
|
||||||
|
|||||||
@@ -14,8 +14,8 @@ use std::{
|
|||||||
use tarpc::{
|
use tarpc::{
|
||||||
context,
|
context,
|
||||||
server::{self, Channel, Handler},
|
server::{self, Channel, Handler},
|
||||||
|
tokio_serde::formats::Json,
|
||||||
};
|
};
|
||||||
use tokio_serde::formats::Json;
|
|
||||||
|
|
||||||
// This is the type that implements the generated World trait. It is the business logic
|
// This is the type that implements the generated World trait. It is the business logic
|
||||||
// and is used to start the server.
|
// and is used to start the server.
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "tarpc-plugins"
|
name = "tarpc-plugins"
|
||||||
version = "0.8.0"
|
version = "0.9.0"
|
||||||
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
@@ -19,15 +19,15 @@ serde1 = []
|
|||||||
travis-ci = { repository = "google/tarpc" }
|
travis-ci = { repository = "google/tarpc" }
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
syn = { version = "1.0.11", features = ["full"] }
|
proc-macro2 = "1.0"
|
||||||
quote = "1.0.2"
|
quote = "1.0"
|
||||||
proc-macro2 = "1.0.6"
|
syn = { version = "1.0", features = ["full"] }
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
proc-macro = true
|
proc-macro = true
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
assert-type-eq = "0.1.0"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
tarpc = { path = "../tarpc" }
|
tarpc = { path = "../tarpc" }
|
||||||
assert-type-eq = "0.1.0"
|
|
||||||
|
|||||||
@@ -215,6 +215,19 @@ impl Parse for DeriveSerde {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Generates:
|
||||||
|
/// - derive of Debug, serde Serialize & Deserialize
|
||||||
|
/// - serde crate annotation
|
||||||
|
#[proc_macro_attribute]
|
||||||
|
pub fn derive_serde(_attr: TokenStream, item: TokenStream) -> TokenStream {
|
||||||
|
let mut gen: proc_macro2::TokenStream = quote! {
|
||||||
|
#[derive(tarpc::serde::Serialize, tarpc::serde::Deserialize)]
|
||||||
|
#[serde(crate = "tarpc::serde")]
|
||||||
|
};
|
||||||
|
gen.extend(proc_macro2::TokenStream::from(item));
|
||||||
|
proc_macro::TokenStream::from(gen)
|
||||||
|
}
|
||||||
|
|
||||||
/// Generates:
|
/// Generates:
|
||||||
/// - service trait
|
/// - service trait
|
||||||
/// - serve fn
|
/// - serve fn
|
||||||
@@ -240,7 +253,10 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
|
|||||||
let args: &[&[PatType]] = &rpcs.iter().map(|rpc| &*rpc.args).collect::<Vec<_>>();
|
let args: &[&[PatType]] = &rpcs.iter().map(|rpc| &*rpc.args).collect::<Vec<_>>();
|
||||||
let response_fut_name = &format!("{}ResponseFut", ident.unraw());
|
let response_fut_name = &format!("{}ResponseFut", ident.unraw());
|
||||||
let derive_serialize = if derive_serde.0 {
|
let derive_serialize = if derive_serde.0 {
|
||||||
Some(quote!(#[derive(serde::Serialize, serde::Deserialize)]))
|
Some(
|
||||||
|
quote! {#[derive(tarpc::serde::Serialize, tarpc::serde::Deserialize)]
|
||||||
|
#[serde(crate = "tarpc::serde")]},
|
||||||
|
)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "tarpc"
|
name = "tarpc"
|
||||||
version = "0.22.0"
|
version = "0.24.1"
|
||||||
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
@@ -16,9 +16,9 @@ description = "An RPC framework for Rust with a focus on ease of use."
|
|||||||
default = []
|
default = []
|
||||||
|
|
||||||
serde1 = ["tarpc-plugins/serde1", "serde", "serde/derive"]
|
serde1 = ["tarpc-plugins/serde1", "serde", "serde/derive"]
|
||||||
tokio1 = []
|
tokio1 = ["tokio/rt-multi-thread"]
|
||||||
serde-transport = ["tokio-serde", "tokio-util/codec"]
|
serde-transport = ["serde1", "tokio1", "tokio-serde/json", "tokio-util/codec"]
|
||||||
tcp = ["tokio/net", "tokio/stream"]
|
tcp = ["tokio/net"]
|
||||||
|
|
||||||
full = ["serde1", "tokio1", "serde-transport", "tcp"]
|
full = ["serde1", "tokio1", "serde-transport", "tcp"]
|
||||||
|
|
||||||
@@ -29,36 +29,38 @@ travis-ci = { repository = "google/tarpc" }
|
|||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
fnv = "1.0"
|
fnv = "1.0"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
humantime = "1.0"
|
humantime = "2.0"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
pin-project = "0.4.17"
|
pin-project = "1.0"
|
||||||
rand = "0.7"
|
rand = "0.7"
|
||||||
tokio = { version = "0.2", features = ["time"] }
|
|
||||||
serde = { optional = true, version = "1.0", features = ["derive"] }
|
serde = { optional = true, version = "1.0", features = ["derive"] }
|
||||||
static_assertions = "1.1.0"
|
static_assertions = "1.1.0"
|
||||||
tarpc-plugins = { path = "../plugins", version = "0.8" }
|
tarpc-plugins = { path = "../plugins", version = "0.9" }
|
||||||
tokio-util = { optional = true, version = "0.3" }
|
tokio = { version = "1", features = ["time"] }
|
||||||
tokio-serde = { optional = true, version = "0.6" }
|
tokio-util = { optional = true, version = "0.6" }
|
||||||
|
tokio-serde = { optional = true, version = "0.8" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
assert_matches = "1.0"
|
assert_matches = "1.4"
|
||||||
bincode = "1.3"
|
bincode = "1.3"
|
||||||
bytes = { version = "0.5", features = ["serde"] }
|
bytes = { version = "1", features = ["serde"] }
|
||||||
env_logger = "0.6"
|
env_logger = "0.8"
|
||||||
flate2 = "1.0.16"
|
flate2 = "1.0"
|
||||||
futures = "0.3"
|
|
||||||
humantime = "1.0"
|
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
pin-utils = "0.1.0-alpha"
|
pin-utils = "0.1.0-alpha"
|
||||||
serde_bytes = "0.11"
|
serde_bytes = "0.11"
|
||||||
tokio = { version = "0.2", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
tokio-serde = { version = "0.6", features = ["json", "bincode"] }
|
tokio-serde = { version = "0.8", features = ["json", "bincode"] }
|
||||||
trybuild = "1.0"
|
trybuild = "1.0"
|
||||||
|
|
||||||
[package.metadata.docs.rs]
|
[package.metadata.docs.rs]
|
||||||
all-features = true
|
all-features = true
|
||||||
rustdoc-args = ["--cfg", "docsrs"]
|
rustdoc-args = ["--cfg", "docsrs"]
|
||||||
|
|
||||||
|
[[example]]
|
||||||
|
name = "compression"
|
||||||
|
required-features = ["serde-transport", "tcp"]
|
||||||
|
|
||||||
[[example]]
|
[[example]]
|
||||||
name = "server_calling_server"
|
name = "server_calling_server"
|
||||||
required-features = ["full"]
|
required-features = ["full"]
|
||||||
@@ -70,3 +72,15 @@ required-features = ["full"]
|
|||||||
[[example]]
|
[[example]]
|
||||||
name = "pubsub"
|
name = "pubsub"
|
||||||
required-features = ["full"]
|
required-features = ["full"]
|
||||||
|
|
||||||
|
[[example]]
|
||||||
|
name = "custom_transport"
|
||||||
|
required-features = ["serde1", "tokio1", "serde-transport"]
|
||||||
|
|
||||||
|
[[test]]
|
||||||
|
name = "service_functional"
|
||||||
|
required-features = ["serde-transport"]
|
||||||
|
|
||||||
|
[[test]]
|
||||||
|
name = "dataservice"
|
||||||
|
required-features = ["serde-transport", "tcp"]
|
||||||
|
|||||||
52
tarpc/examples/custom_transport.rs
Normal file
52
tarpc/examples/custom_transport.rs
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
use futures::future;
|
||||||
|
use tarpc::context::Context;
|
||||||
|
use tarpc::serde_transport as transport;
|
||||||
|
use tarpc::server::{BaseChannel, Channel};
|
||||||
|
use tokio::net::{UnixListener, UnixStream};
|
||||||
|
use tokio_serde::formats::Bincode;
|
||||||
|
use tokio_util::codec::length_delimited::LengthDelimitedCodec;
|
||||||
|
|
||||||
|
#[tarpc::service]
|
||||||
|
pub trait PingService {
|
||||||
|
async fn ping();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct Service;
|
||||||
|
|
||||||
|
impl PingService for Service {
|
||||||
|
type PingFut = future::Ready<()>;
|
||||||
|
|
||||||
|
fn ping(self, _: Context) -> Self::PingFut {
|
||||||
|
future::ready(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> std::io::Result<()> {
|
||||||
|
let bind_addr = "/tmp/tarpc_on_unix_example.sock";
|
||||||
|
|
||||||
|
let _ = std::fs::remove_file(bind_addr);
|
||||||
|
|
||||||
|
let listener = UnixListener::bind(bind_addr).unwrap();
|
||||||
|
let codec_builder = LengthDelimitedCodec::builder();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
let (conn, _addr) = listener.accept().await.unwrap();
|
||||||
|
let framed = codec_builder.new_framed(conn);
|
||||||
|
let transport = transport::new(framed, Bincode::default());
|
||||||
|
|
||||||
|
let fut = BaseChannel::with_defaults(transport)
|
||||||
|
.respond_with(Service.serve())
|
||||||
|
.execute();
|
||||||
|
tokio::spawn(fut);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let conn = UnixStream::connect(bind_addr).await?;
|
||||||
|
let transport = transport::new(codec_builder.new_framed(conn), Bincode::default());
|
||||||
|
PingServiceClient::new(Default::default(), transport)
|
||||||
|
.spawn()?
|
||||||
|
.ping(tarpc::context::current())
|
||||||
|
.await
|
||||||
|
}
|
||||||
@@ -3,7 +3,6 @@
|
|||||||
// Use of this source code is governed by an MIT-style
|
// Use of this source code is governed by an MIT-style
|
||||||
// license that can be found in the LICENSE file or at
|
// license that can be found in the LICENSE file or at
|
||||||
// https://opensource.org/licenses/MIT.
|
// https://opensource.org/licenses/MIT.
|
||||||
|
|
||||||
//! *Disclaimer*: This is not an official Google product.
|
//! *Disclaimer*: This is not an official Google product.
|
||||||
//!
|
//!
|
||||||
//! tarpc is an RPC framework for rust with a focus on ease of use. Defining a
|
//! tarpc is an RPC framework for rust with a focus on ease of use. Defining a
|
||||||
@@ -47,7 +46,7 @@
|
|||||||
//! Add to your `Cargo.toml` dependencies:
|
//! Add to your `Cargo.toml` dependencies:
|
||||||
//!
|
//!
|
||||||
//! ```toml
|
//! ```toml
|
||||||
//! tarpc = "0.22.0"
|
//! tarpc = "0.24"
|
||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
//! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
//! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
||||||
@@ -56,12 +55,13 @@
|
|||||||
//!
|
//!
|
||||||
//! ## Example
|
//! ## Example
|
||||||
//!
|
//!
|
||||||
//! For this example, in addition to tarpc, also add two other dependencies to
|
//! This example uses [tokio](https://tokio.rs), so add the following dependencies to
|
||||||
//! your `Cargo.toml`:
|
//! your `Cargo.toml`:
|
||||||
//!
|
//!
|
||||||
//! ```toml
|
//! ```toml
|
||||||
//! futures = "0.3"
|
//! futures = "1.0"
|
||||||
//! tokio = "0.2"
|
//! tarpc = { version = "0.24", features = ["tokio1"] }
|
||||||
|
//! tokio = { version = "1.0", features = ["macros"] }
|
||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
//! In the following example, we use an in-process channel for communication between
|
//! In the following example, we use an in-process channel for communication between
|
||||||
@@ -165,6 +165,9 @@
|
|||||||
//! # future::ready(format!("Hello, {}!", name))
|
//! # future::ready(format!("Hello, {}!", name))
|
||||||
//! # }
|
//! # }
|
||||||
//! # }
|
//! # }
|
||||||
|
//! # #[cfg(not(feature = "tokio1"))]
|
||||||
|
//! # fn main() {}
|
||||||
|
//! # #[cfg(feature = "tokio1")]
|
||||||
//! #[tokio::main]
|
//! #[tokio::main]
|
||||||
//! async fn main() -> io::Result<()> {
|
//! async fn main() -> io::Result<()> {
|
||||||
//! let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
|
//! let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
|
||||||
@@ -203,12 +206,21 @@
|
|||||||
pub mod rpc;
|
pub mod rpc;
|
||||||
pub use rpc::*;
|
pub use rpc::*;
|
||||||
|
|
||||||
|
#[cfg(feature = "serde1")]
|
||||||
|
pub use serde;
|
||||||
|
|
||||||
|
#[cfg(feature = "serde-transport")]
|
||||||
|
pub use tokio_serde;
|
||||||
|
|
||||||
#[cfg(feature = "serde-transport")]
|
#[cfg(feature = "serde-transport")]
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))]
|
#[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))]
|
||||||
pub mod serde_transport;
|
pub mod serde_transport;
|
||||||
|
|
||||||
pub mod trace;
|
pub mod trace;
|
||||||
|
|
||||||
|
#[cfg(feature = "serde1")]
|
||||||
|
pub use tarpc_plugins::derive_serde;
|
||||||
|
|
||||||
/// The main macro that creates RPC services.
|
/// The main macro that creates RPC services.
|
||||||
///
|
///
|
||||||
/// Rpc methods are specified, mirroring trait syntax:
|
/// Rpc methods are specified, mirroring trait syntax:
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ pub enum ClientMessage<T> {
|
|||||||
Cancel {
|
Cancel {
|
||||||
/// The trace context associates the message with a specific chain of causally-related actions,
|
/// The trace context associates the message with a specific chain of causally-related actions,
|
||||||
/// possibly orchestrated across many distributed systems.
|
/// possibly orchestrated across many distributed systems.
|
||||||
#[cfg_attr(feature = "serde", serde(default))]
|
#[cfg_attr(feature = "serde1", serde(default))]
|
||||||
trace_context: trace::Context,
|
trace_context: trace::Context,
|
||||||
/// The ID of the request to cancel.
|
/// The ID of the request to cancel.
|
||||||
request_id: u64,
|
request_id: u64,
|
||||||
|
|||||||
@@ -8,6 +8,7 @@
|
|||||||
|
|
||||||
use crate::context;
|
use crate::context;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
|
use std::fmt;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
/// Provides a [`Client`] backed by a transport.
|
/// Provides a [`Client`] backed by a transport.
|
||||||
@@ -127,7 +128,6 @@ impl Default for Config {
|
|||||||
|
|
||||||
/// A channel and dispatch pair. The dispatch drives the sending and receiving of requests
|
/// A channel and dispatch pair. The dispatch drives the sending and receiving of requests
|
||||||
/// and must be polled continuously or spawned.
|
/// and must be polled continuously or spawned.
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct NewClient<C, D> {
|
pub struct NewClient<C, D> {
|
||||||
/// The new client.
|
/// The new client.
|
||||||
pub client: C,
|
pub client: C,
|
||||||
@@ -153,3 +153,9 @@ where
|
|||||||
Ok(self.client)
|
Ok(self.client)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<C, D> fmt::Debug for NewClient<C, D> {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(fmt, "NewClient")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -88,7 +88,7 @@ impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
|
|||||||
let resp = ready!(self.as_mut().project().fut.poll(cx));
|
let resp = ready!(self.as_mut().project().fut.poll(cx));
|
||||||
Poll::Ready(match resp {
|
Poll::Ready(match resp {
|
||||||
Ok(resp) => resp,
|
Ok(resp) => resp,
|
||||||
Err(tokio::time::Elapsed { .. }) => Err(io::Error::new(
|
Err(tokio::time::error::Elapsed { .. }) => Err(io::Error::new(
|
||||||
io::ErrorKind::TimedOut,
|
io::ErrorKind::TimedOut,
|
||||||
"Client dropped expired request.".to_string(),
|
"Client dropped expired request.".to_string(),
|
||||||
)),
|
)),
|
||||||
@@ -723,7 +723,7 @@ mod tests {
|
|||||||
};
|
};
|
||||||
use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc};
|
use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc};
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn dispatch_response_cancels_on_drop() {
|
async fn dispatch_response_cancels_on_drop() {
|
||||||
let (cancellation, mut canceled_requests) = cancellations();
|
let (cancellation, mut canceled_requests) = cancellations();
|
||||||
let (_, response) = oneshot::channel();
|
let (_, response) = oneshot::channel();
|
||||||
@@ -738,7 +738,7 @@ mod tests {
|
|||||||
assert_eq!(canceled_requests.0.try_next().unwrap(), Some(3));
|
assert_eq!(canceled_requests.0.try_next().unwrap(), Some(3));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn stage_request() {
|
async fn stage_request() {
|
||||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||||
let dispatch = Pin::new(&mut dispatch);
|
let dispatch = Pin::new(&mut dispatch);
|
||||||
@@ -755,7 +755,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Regression test for https://github.com/google/tarpc/issues/220
|
// Regression test for https://github.com/google/tarpc/issues/220
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn stage_request_channel_dropped_doesnt_panic() {
|
async fn stage_request_channel_dropped_doesnt_panic() {
|
||||||
let (mut dispatch, mut channel, mut server_channel) = set_up();
|
let (mut dispatch, mut channel, mut server_channel) = set_up();
|
||||||
let mut dispatch = Pin::new(&mut dispatch);
|
let mut dispatch = Pin::new(&mut dispatch);
|
||||||
@@ -776,7 +776,7 @@ mod tests {
|
|||||||
dispatch.await.unwrap();
|
dispatch.await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn stage_request_response_future_dropped_is_canceled_before_sending() {
|
async fn stage_request_response_future_dropped_is_canceled_before_sending() {
|
||||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||||
let dispatch = Pin::new(&mut dispatch);
|
let dispatch = Pin::new(&mut dispatch);
|
||||||
@@ -791,7 +791,7 @@ mod tests {
|
|||||||
assert!(dispatch.poll_next_request(cx).ready().is_none());
|
assert!(dispatch.poll_next_request(cx).ready().is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn stage_request_response_future_dropped_is_canceled_after_sending() {
|
async fn stage_request_response_future_dropped_is_canceled_after_sending() {
|
||||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||||
@@ -813,7 +813,7 @@ mod tests {
|
|||||||
assert!(dispatch.project().in_flight_requests.is_empty());
|
assert!(dispatch.project().in_flight_requests.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn stage_request_response_closed_skipped() {
|
async fn stage_request_response_closed_skipped() {
|
||||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||||
let dispatch = Pin::new(&mut dispatch);
|
let dispatch = Pin::new(&mut dispatch);
|
||||||
|
|||||||
@@ -36,7 +36,6 @@ pub use self::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
/// Manages clients, serving multiplexed requests over each connection.
|
/// Manages clients, serving multiplexed requests over each connection.
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Server<Req, Resp> {
|
pub struct Server<Req, Resp> {
|
||||||
config: Config,
|
config: Config,
|
||||||
ghost: PhantomData<(Req, Resp)>,
|
ghost: PhantomData<(Req, Resp)>,
|
||||||
@@ -48,6 +47,12 @@ impl<Req, Resp> Default for Server<Req, Resp> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<Req, Resp> fmt::Debug for Server<Req, Resp> {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(fmt, "Server")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Settings that control the behavior of the server.
|
/// Settings that control the behavior of the server.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
@@ -167,7 +172,6 @@ where
|
|||||||
|
|
||||||
/// BaseChannel lifts a Transport to a Channel by tracking in-flight requests.
|
/// BaseChannel lifts a Transport to a Channel by tracking in-flight requests.
|
||||||
#[pin_project]
|
#[pin_project]
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct BaseChannel<Req, Resp, T> {
|
pub struct BaseChannel<Req, Resp, T> {
|
||||||
config: Config,
|
config: Config,
|
||||||
/// Writes responses to the wire and reads requests off the wire.
|
/// Writes responses to the wire and reads requests off the wire.
|
||||||
@@ -236,6 +240,12 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<Req, Resp, T> fmt::Debug for BaseChannel<Req, Resp, T> {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(f, "BaseChannel")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// The server end of an open connection with a client, streaming in requests from, and sinking
|
/// The server end of an open connection with a client, streaming in requests from, and sinking
|
||||||
/// responses to, the client.
|
/// responses to, the client.
|
||||||
///
|
///
|
||||||
@@ -384,7 +394,6 @@ where
|
|||||||
|
|
||||||
/// A running handler serving all requests coming over a channel.
|
/// A running handler serving all requests coming over a channel.
|
||||||
#[pin_project]
|
#[pin_project]
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct ClientHandler<C, S>
|
pub struct ClientHandler<C, S>
|
||||||
where
|
where
|
||||||
C: Channel,
|
C: Channel,
|
||||||
@@ -508,9 +517,17 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<C, S> fmt::Debug for ClientHandler<C, S>
|
||||||
|
where
|
||||||
|
C: Channel,
|
||||||
|
{
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(fmt, "ClientHandler")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A future fulfilling a single client request.
|
/// A future fulfilling a single client request.
|
||||||
#[pin_project]
|
#[pin_project]
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct RequestHandler<F, R> {
|
pub struct RequestHandler<F, R> {
|
||||||
#[pin]
|
#[pin]
|
||||||
resp: Abortable<Resp<F, R>>,
|
resp: Abortable<Resp<F, R>>,
|
||||||
@@ -528,8 +545,13 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<F, R> fmt::Debug for RequestHandler<F, R> {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(fmt, "RequestHandler")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[pin_project]
|
#[pin_project]
|
||||||
#[derive(Debug)]
|
|
||||||
struct Resp<F, R> {
|
struct Resp<F, R> {
|
||||||
state: RespState,
|
state: RespState,
|
||||||
request_id: u64,
|
request_id: u64,
|
||||||
@@ -565,7 +587,7 @@ where
|
|||||||
request_id: self.request_id,
|
request_id: self.request_id,
|
||||||
message: match result {
|
message: match result {
|
||||||
Ok(message) => Ok(message),
|
Ok(message) => Ok(message),
|
||||||
Err(tokio::time::Elapsed { .. }) => {
|
Err(tokio::time::error::Elapsed { .. }) => {
|
||||||
debug!(
|
debug!(
|
||||||
"[{}] Response did not complete before deadline of {}s.",
|
"[{}] Response did not complete before deadline of {}s.",
|
||||||
self.ctx.trace_id(),
|
self.ctx.trace_id(),
|
||||||
@@ -614,6 +636,12 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<F, R> fmt::Debug for Resp<F, R> {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(fmt, "Resp")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<C, S> Stream for ClientHandler<C, S>
|
impl<C, S> Stream for ClientHandler<C, S>
|
||||||
where
|
where
|
||||||
C: Channel,
|
C: Channel,
|
||||||
@@ -624,11 +652,7 @@ where
|
|||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
loop {
|
loop {
|
||||||
let read = self.as_mut().pump_read(cx)?;
|
let read = self.as_mut().pump_read(cx)?;
|
||||||
let read_closed = if let Poll::Ready(None) = read {
|
let read_closed = matches!(read, Poll::Ready(None));
|
||||||
true
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
};
|
|
||||||
match (read, self.as_mut().pump_write(cx, read_closed)?) {
|
match (read, self.as_mut().pump_write(cx, read_closed)?) {
|
||||||
(Poll::Ready(None), Poll::Ready(None)) => {
|
(Poll::Ready(None), Poll::Ready(None)) => {
|
||||||
return Poll::Ready(None);
|
return Poll::Ready(None);
|
||||||
|
|||||||
@@ -117,10 +117,7 @@ pub trait PollExt {
|
|||||||
|
|
||||||
impl<T> PollExt for Poll<Option<T>> {
|
impl<T> PollExt for Poll<Option<T>> {
|
||||||
fn is_done(&self) -> bool {
|
fn is_done(&self) -> bool {
|
||||||
match self {
|
matches!(self, Poll::Ready(None))
|
||||||
Poll::Ready(None) => true,
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -78,6 +78,7 @@ impl<Item, SinkItem> Sink<SinkItem> for UnboundedChannel<Item, SinkItem> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
#[cfg(feature = "tokio1")]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::{
|
use crate::{
|
||||||
client, context,
|
client, context,
|
||||||
@@ -89,8 +90,7 @@ mod tests {
|
|||||||
use log::trace;
|
use log::trace;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
#[cfg(feature = "tokio1")]
|
#[tokio::test]
|
||||||
#[tokio::test(threaded_scheduler)]
|
|
||||||
async fn integration() -> io::Result<()> {
|
async fn integration() -> io::Result<()> {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
|
|||||||
@@ -10,8 +10,8 @@ use std::{
|
|||||||
time::{Duration, SystemTime},
|
time::{Duration, SystemTime},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(feature = "serde")]
|
#[cfg(feature = "serde1")]
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "serde")))]
|
#[cfg_attr(docsrs, doc(cfg(feature = "serde1")))]
|
||||||
pub mod serde;
|
pub mod serde;
|
||||||
|
|
||||||
/// Extension trait for [SystemTimes](SystemTime) in the future, i.e. deadlines.
|
/// Extension trait for [SystemTimes](SystemTime) in the future, i.e. deadlines.
|
||||||
|
|||||||
@@ -14,10 +14,7 @@ use serde::{Deserialize, Serialize};
|
|||||||
use std::{error::Error, io, pin::Pin};
|
use std::{error::Error, io, pin::Pin};
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
use tokio_serde::{Framed as SerdeFramed, *};
|
use tokio_serde::{Framed as SerdeFramed, *};
|
||||||
use tokio_util::codec::{
|
use tokio_util::codec::{length_delimited::LengthDelimitedCodec, Framed};
|
||||||
length_delimited::{self, LengthDelimitedCodec},
|
|
||||||
Framed,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// A transport that serializes to, and deserializes from, a byte stream.
|
/// A transport that serializes to, and deserializes from, a byte stream.
|
||||||
#[pin_project]
|
#[pin_project]
|
||||||
@@ -130,6 +127,7 @@ pub mod tcp {
|
|||||||
futures::ready,
|
futures::ready,
|
||||||
std::{marker::PhantomData, net::SocketAddr},
|
std::{marker::PhantomData, net::SocketAddr},
|
||||||
tokio::net::{TcpListener, TcpStream, ToSocketAddrs},
|
tokio::net::{TcpListener, TcpStream, ToSocketAddrs},
|
||||||
|
tokio_util::codec::length_delimited,
|
||||||
};
|
};
|
||||||
|
|
||||||
mod private {
|
mod private {
|
||||||
@@ -269,9 +267,12 @@ pub mod tcp {
|
|||||||
type Item = io::Result<Transport<TcpStream, Item, SinkItem, Codec>>;
|
type Item = io::Result<Transport<TcpStream, Item, SinkItem, Codec>>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
let next =
|
let conn: TcpStream =
|
||||||
ready!(Pin::new(&mut self.as_mut().project().listener.incoming()).poll_next(cx)?);
|
ready!(Pin::new(&mut self.as_mut().project().listener).poll_accept(cx)?).0;
|
||||||
Poll::Ready(next.map(|conn| Ok(new(self.config.new_framed(conn), (self.codec_fn)()))))
|
Poll::Ready(Some(Ok(new(
|
||||||
|
self.config.new_framed(conn),
|
||||||
|
(self.codec_fn)(),
|
||||||
|
))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -286,7 +287,7 @@ mod tests {
|
|||||||
io::{self, Cursor},
|
io::{self, Cursor},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
};
|
};
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
use tokio_serde::formats::SymmetricalJson;
|
use tokio_serde::formats::SymmetricalJson;
|
||||||
|
|
||||||
fn ctx() -> Context<'static> {
|
fn ctx() -> Context<'static> {
|
||||||
@@ -301,8 +302,8 @@ mod tests {
|
|||||||
fn poll_read(
|
fn poll_read(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
buf: &mut [u8],
|
buf: &mut ReadBuf<'_>,
|
||||||
) -> Poll<io::Result<usize>> {
|
) -> Poll<io::Result<()>> {
|
||||||
AsyncRead::poll_read(Pin::new(self.0.get_mut()), cx, buf)
|
AsyncRead::poll_read(Pin::new(self.0.get_mut()), cx, buf)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -345,8 +346,8 @@ mod tests {
|
|||||||
fn poll_read(
|
fn poll_read(
|
||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
_cx: &mut Context<'_>,
|
_cx: &mut Context<'_>,
|
||||||
_buf: &mut [u8],
|
_buf: &mut ReadBuf<'_>,
|
||||||
) -> Poll<io::Result<usize>> {
|
) -> Poll<io::Result<()>> {
|
||||||
unreachable!()
|
unreachable!()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ use std::{
|
|||||||
/// Consists of a span identifying an event, an optional parent span identifying a causal event
|
/// Consists of a span identifying an event, an optional parent span identifying a causal event
|
||||||
/// that triggered the current span, and a trace with which all related spans are associated.
|
/// that triggered the current span, and a trace with which all related spans are associated.
|
||||||
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy)]
|
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy)]
|
||||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
|
||||||
pub struct Context {
|
pub struct Context {
|
||||||
/// An identifier of the trace associated with the current context. A trace ID is typically
|
/// An identifier of the trace associated with the current context. A trace ID is typically
|
||||||
/// created at a root span and passed along through all causal events.
|
/// created at a root span and passed along through all causal events.
|
||||||
@@ -47,12 +47,12 @@ pub struct Context {
|
|||||||
/// A 128-bit UUID identifying a trace. All spans caused by the same originating span share the
|
/// A 128-bit UUID identifying a trace. All spans caused by the same originating span share the
|
||||||
/// same trace ID.
|
/// same trace ID.
|
||||||
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy)]
|
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy)]
|
||||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
|
||||||
pub struct TraceId(u128);
|
pub struct TraceId(u128);
|
||||||
|
|
||||||
/// A 64-bit identifier of a span within a trace. The identifier is unique within the span's trace.
|
/// A 64-bit identifier of a span within a trace. The identifier is unique within the span's trace.
|
||||||
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy)]
|
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy)]
|
||||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
|
||||||
pub struct SpanId(u64);
|
pub struct SpanId(u64);
|
||||||
|
|
||||||
impl Context {
|
impl Context {
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
#[tarpc::service]
|
#[tarpc::service(derive_serde = false)]
|
||||||
trait World {
|
trait World {
|
||||||
async fn hello(name: String) -> String;
|
async fn hello(name: String) -> String;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,11 +9,3 @@ error: hint: `#[tarpc::server]` only rewrites async fns, and `fn hello` is not a
|
|||||||
|
|
|
|
||||||
10 | fn hello(name: String) -> String {
|
10 | fn hello(name: String) -> String {
|
||||||
| ^^
|
| ^^
|
||||||
|
|
||||||
error[E0433]: failed to resolve: use of undeclared type or module `serde`
|
|
||||||
--> $DIR/tarpc_server_missing_async.rs:1:1
|
|
||||||
|
|
|
||||||
1 | #[tarpc::service]
|
|
||||||
| ^^^^^^^^^^^^^^^^^ use of undeclared type or module `serde`
|
|
||||||
|
|
|
||||||
= note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)
|
|
||||||
|
|||||||
51
tarpc/tests/dataservice.rs
Normal file
51
tarpc/tests/dataservice.rs
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
use futures::prelude::*;
|
||||||
|
use std::io;
|
||||||
|
use tarpc::serde_transport;
|
||||||
|
use tarpc::{client, context, server::Handler};
|
||||||
|
use tokio_serde::formats::Json;
|
||||||
|
|
||||||
|
#[tarpc::derive_serde]
|
||||||
|
#[derive(Debug, PartialEq)]
|
||||||
|
pub enum TestData {
|
||||||
|
Black,
|
||||||
|
White,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tarpc::service]
|
||||||
|
pub trait ColorProtocol {
|
||||||
|
async fn get_opposite_color(color: TestData) -> TestData;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct ColorServer;
|
||||||
|
|
||||||
|
#[tarpc::server]
|
||||||
|
impl ColorProtocol for ColorServer {
|
||||||
|
async fn get_opposite_color(self, _: context::Context, color: TestData) -> TestData {
|
||||||
|
match color {
|
||||||
|
TestData::White => TestData::Black,
|
||||||
|
TestData::Black => TestData::White,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_call() -> io::Result<()> {
|
||||||
|
let transport = tarpc::serde_transport::tcp::listen("localhost:56797", Json::default).await?;
|
||||||
|
let addr = transport.local_addr();
|
||||||
|
tokio::spawn(
|
||||||
|
tarpc::Server::default()
|
||||||
|
.incoming(transport.take(1).filter_map(|r| async { r.ok() }))
|
||||||
|
.respond_with(ColorServer.serve()),
|
||||||
|
);
|
||||||
|
|
||||||
|
let transport = serde_transport::tcp::connect(addr, Json::default).await?;
|
||||||
|
let mut client = ColorProtocolClient::new(client::Config::default(), transport).spawn()?;
|
||||||
|
|
||||||
|
let color = client
|
||||||
|
.get_opposite_color(context::current(), TestData::White)
|
||||||
|
.await?;
|
||||||
|
assert_eq!(color, TestData::Black);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -6,12 +6,11 @@ use futures::{
|
|||||||
use std::io;
|
use std::io;
|
||||||
use tarpc::{
|
use tarpc::{
|
||||||
client::{self},
|
client::{self},
|
||||||
context, serde_transport,
|
context,
|
||||||
server::{self, BaseChannel, Channel, Handler},
|
server::{self, BaseChannel, Channel, Handler},
|
||||||
transport::channel,
|
transport::channel,
|
||||||
};
|
};
|
||||||
use tokio::join;
|
use tokio::join;
|
||||||
use tokio_serde::formats::Json;
|
|
||||||
|
|
||||||
#[tarpc_plugins::service]
|
#[tarpc_plugins::service]
|
||||||
trait Service {
|
trait Service {
|
||||||
@@ -36,7 +35,7 @@ impl Service for Server {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn sequential() -> io::Result<()> {
|
async fn sequential() -> io::Result<()> {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
@@ -58,12 +57,15 @@ async fn sequential() -> io::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "serde1")]
|
#[cfg(all(feature = "serde-transport", feature = "tcp"))]
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn serde() -> io::Result<()> {
|
async fn serde() -> io::Result<()> {
|
||||||
|
use tarpc::serde_transport;
|
||||||
|
use tokio_serde::formats::Json;
|
||||||
|
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
let transport = serde_transport::tcp::listen("localhost:56789", Json::default).await?;
|
let transport = tarpc::serde_transport::tcp::listen("localhost:56789", Json::default).await?;
|
||||||
let addr = transport.local_addr();
|
let addr = transport.local_addr();
|
||||||
tokio::spawn(
|
tokio::spawn(
|
||||||
tarpc::Server::default()
|
tarpc::Server::default()
|
||||||
@@ -83,7 +85,7 @@ async fn serde() -> io::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn concurrent() -> io::Result<()> {
|
async fn concurrent() -> io::Result<()> {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
@@ -112,7 +114,7 @@ async fn concurrent() -> io::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn concurrent_join() -> io::Result<()> {
|
async fn concurrent_join() -> io::Result<()> {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
@@ -142,7 +144,7 @@ async fn concurrent_join() -> io::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(threaded_scheduler)]
|
#[tokio::test]
|
||||||
async fn concurrent_join_all() -> io::Result<()> {
|
async fn concurrent_join_all() -> io::Result<()> {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user