18 Commits

Author SHA1 Message Date
Tim Kuehn
a3a6404a30 Prepare release of 0.28.0 2022-04-06 22:07:07 -07:00
Tim Kuehn
b36eac80b1 Bump minimum rust version to 1.58.0 2022-04-06 21:53:56 -07:00
Bruno
d7070e4bc3 Update opentelemetry and related dependencies (#362) 2022-04-03 14:09:14 -07:00
Tim Kuehn
b5d1828308 Use captured identifiers in format strings.
This was stabilized in Rust 1.58.0: https://blog.rust-lang.org/2022/01/13/Rust-1.58.0.html
2022-01-13 15:00:44 -08:00
Zak Cutner
92cfe63c4f Use single-threaded Tokio runtime (#360) 2022-01-06 20:59:51 -08:00
Tim Kuehn
839a2f067c Update example to latest version of Clap 2021-12-27 22:56:17 -08:00
David Kleingeld
b5d593488c Derive more traits for RpcError (#359)
Makes RpcError derive Clone, PartialEq, Eq, Hash, Serialize and Deserialize.
2021-12-27 22:00:58 -08:00
Tim Kuehn
eea38b8bf4 Simplify code with const assert!.
The code that prevents compilation on systems where usize is larger than
u64 previously used a const index-out-of-bounds trick. That code can now
be replaced with assert!, as const panic! has landed in 1.57.0 stable.
2021-12-03 15:20:33 -08:00
Shi Yan
70493c15f4 Fix a compiling issue of the official example (#358)
Fix a compiling issue of the official example because of the following error :

```
error[E0599]: the method `execute` exists for struct `BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>`, but its trait bounds were not satisfied
  --> src/main.rs:39:25
   |
39 |     tokio::spawn(server.execute(HelloServer.serve()));
   |                         ^^^^^^^ method cannot be called on `BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>` due to unsatisfied trait bounds
   |
   = note: the following trait bounds were not satisfied:
           `<&BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>> as futures::Stream>::Item = _`
           which is required by `&BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>: tarpc::server::incoming::Incoming<_>`
           `&BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>: futures::Stream`
           which is required by `&BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>: tarpc::server::incoming::Incoming<_>`
   = help: items from traits can only be used if the trait is in scope
help: the following trait is implemented but not in scope; perhaps add a `use` for it:
   |
1  | use tarpc::server::Channel;
   |
```

See https://github.com/google/tarpc/pull/358#issuecomment-981953193 for the root cause.
2021-11-29 17:01:16 -08:00
baptiste0928
f7c5d6a7c3 Fix example-service (#355)
Fixes the compilation of the example-service crate (the Clap trait has been renamed Parser in clap-rs/clap@d840d56).
2021-11-15 08:39:04 -08:00
Scott Kirkpatrick
98c5d2a18b Re-add typo fixes (#353)
The typo fixes that were added by commit b5d9aaa
were accidentally reverted by commit 1e680e3, this
will add them back
2021-11-08 10:07:21 -08:00
Tim Kuehn
46b534f7c6 Use HashMap::shrink_to in impl of Comapct::compact. 2021-10-21 17:03:57 -07:00
Tim Kuehn
42b4fc52b1 Set rust-version to 1.56 2021-10-21 16:08:15 -07:00
Tim Kuehn
350dbcdad0 Upgrade to Rust 2021! 2021-10-21 14:10:21 -07:00
Tim Kuehn
b1b4461d89 Prepare release of 0.27.2 2021-10-08 22:31:56 -07:00
Tim Kuehn
f694b7573a Close TcpStream when client disconnects.
An attempt at a clean shutdown helps the server to drop its connections
more quickly.

Testing this uncovered a latent bug in DelayQueue wherein `poll_expired`
yields `Pending` when empty. A workaround was added to
`InFlightRequests::poll_expired`: check if there are actually any
outstanding requests before calling `DelayQueue::poll_expired`.
2021-10-08 22:13:24 -07:00
Tim Kuehn
1e680e3a5a Fix typos in docs.
Fixes https://github.com/google/tarpc/issues/352.
2021-10-08 19:19:50 -07:00
Tim Kuehn
2591d21e94 Update release notes to mention io::Error = 2021-09-23 13:57:43 -07:00
22 changed files with 191 additions and 75 deletions

View File

@@ -67,7 +67,7 @@ Some other features of tarpc:
Add to your `Cargo.toml` dependencies: Add to your `Cargo.toml` dependencies:
```toml ```toml
tarpc = "0.27" tarpc = "0.28"
``` ```
The `tarpc::service` attribute expands to a collection of items that form an rpc service. The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -81,8 +81,8 @@ your `Cargo.toml`:
```toml ```toml
anyhow = "1.0" anyhow = "1.0"
futures = "1.0" futures = "0.3"
tarpc = { version = "0.27", features = ["tokio1"] } tarpc = { version = "0.28", features = ["tokio1"] }
tokio = { version = "1.0", features = ["macros"] } tokio = { version = "1.0", features = ["macros"] }
``` ```
@@ -100,7 +100,7 @@ use futures::{
}; };
use tarpc::{ use tarpc::{
client, context, client, context,
server::{self, incoming::Incoming}, server::{self, incoming::Incoming, Channel},
}; };
// This is the service definition. It looks a lot like a trait definition. // This is the service definition. It looks a lot like a trait definition.
@@ -128,7 +128,7 @@ impl World for HelloServer {
type HelloFut = Ready<String>; type HelloFut = Ready<String>;
fn hello(self, _: context::Context, name: String) -> Self::HelloFut { fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name)) future::ready(format!("Hello, {name}!"))
} }
} }
``` ```
@@ -155,7 +155,7 @@ async fn main() -> anyhow::Result<()> {
// specifies a deadline and trace information which can be helpful in debugging requests. // specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), "Stim".to_string()).await?; let hello = client.hello(context::current(), "Stim".to_string()).await?;
println!("{}", hello); println!("{hello}");
Ok(()) Ok(())
} }

View File

@@ -1,7 +1,43 @@
## 0.28.0 (2022-04-06)
### Breaking Changes
- The minimum supported Rust version has increased to 1.58.0.
- The version of opentelemetry depended on by tarpc has increased to 0.17.0.
## 0.27.2 (2021-10-08)
### Fixes
Clients will now close their transport before dropping it. An attempt at a clean shutdown can help
the server drop its connections more quickly.
## 0.27.1 (2021-09-22) ## 0.27.1 (2021-09-22)
### Breaking Changes ### Breaking Changes
### RPC error type is changing
RPC return types are changing from `Result<Response, io::Error>` to `Result<Response,
tarpc::client::RpcError>`.
Becaue tarpc is a library, not an application, it should strive to
use structured errors in its API so that users have maximal flexibility
in how they handle errors. io::Error makes that hard, because it is a
kitchen-sink error type.
RPCs in particular only have 3 classes of errors:
- The connection breaks.
- The request expires.
- The server decides not to process the request.
RPC responses can also contain application-specific errors, but from the
perspective of the RPC library, those are opaque to the framework, classified
as successful responsees.
### Open Telemetry
The Opentelemetry dependency is updated to version 0.16.x. The Opentelemetry dependency is updated to version 0.16.x.
## 0.27.0 (2021-09-22) ## 0.27.0 (2021-09-22)

View File

@@ -1,8 +1,9 @@
[package] [package]
name = "tarpc-example-service" name = "tarpc-example-service"
version = "0.10.0" version = "0.10.0"
rust-version = "1.56"
authors = ["Tim Kuehn <tikue@google.com>"] authors = ["Tim Kuehn <tikue@google.com>"]
edition = "2018" edition = "2021"
license = "MIT" license = "MIT"
documentation = "https://docs.rs/tarpc-example-service" documentation = "https://docs.rs/tarpc-example-service"
homepage = "https://github.com/google/tarpc" homepage = "https://github.com/google/tarpc"
@@ -14,13 +15,13 @@ description = "An example server built on tarpc."
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
clap = "3.0.0-beta.2" clap = { version = "3.0.0-rc.9", features = ["derive"] }
log = "0.4" log = "0.4"
futures = "0.3" futures = "0.3"
opentelemetry = { version = "0.16", features = ["rt-tokio"] } opentelemetry = { version = "0.16", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] } opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] }
rand = "0.8" rand = "0.8"
tarpc = { version = "0.27", path = "../tarpc", features = ["full"] } tarpc = { version = "0.28", path = "../tarpc", features = ["full"] }
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] } tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] }
tracing = { version = "0.1" } tracing = { version = "0.1" }
tracing-opentelemetry = "0.15" tracing-opentelemetry = "0.15"

View File

@@ -4,14 +4,14 @@
// license that can be found in the LICENSE file or at // license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT. // https://opensource.org/licenses/MIT.
use clap::Clap; use clap::Parser;
use service::{init_tracing, WorldClient}; use service::{init_tracing, WorldClient};
use std::{net::SocketAddr, time::Duration}; use std::{net::SocketAddr, time::Duration};
use tarpc::{client, context, tokio_serde::formats::Json}; use tarpc::{client, context, tokio_serde::formats::Json};
use tokio::time::sleep; use tokio::time::sleep;
use tracing::Instrument; use tracing::Instrument;
#[derive(Clap)] #[derive(Parser)]
struct Flags { struct Flags {
/// Sets the server address to connect to. /// Sets the server address to connect to.
#[clap(long)] #[clap(long)]

View File

@@ -4,7 +4,7 @@
// license that can be found in the LICENSE file or at // license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT. // https://opensource.org/licenses/MIT.
use clap::Clap; use clap::Parser;
use futures::{future, prelude::*}; use futures::{future, prelude::*};
use rand::{ use rand::{
distributions::{Distribution, Uniform}, distributions::{Distribution, Uniform},
@@ -22,7 +22,7 @@ use tarpc::{
}; };
use tokio::time; use tokio::time;
#[derive(Clap)] #[derive(Parser)]
struct Flags { struct Flags {
/// Sets the port number to listen on. /// Sets the port number to listen on.
#[clap(long)] #[clap(long)]
@@ -40,7 +40,7 @@ impl World for HelloServer {
let sleep_time = let sleep_time =
Duration::from_millis(Uniform::new_inclusive(1, 10).sample(&mut thread_rng())); Duration::from_millis(Uniform::new_inclusive(1, 10).sample(&mut thread_rng()));
time::sleep(sleep_time).await; time::sleep(sleep_time).await;
format!("Hello, {}! You are connected from {}", name, self.0) format!("Hello, {name}! You are connected from {}", self.0)
} }
} }

View File

@@ -1,8 +1,9 @@
[package] [package]
name = "tarpc-plugins" name = "tarpc-plugins"
version = "0.12.0" version = "0.12.0"
rust-version = "1.56"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"] authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018" edition = "2021"
license = "MIT" license = "MIT"
documentation = "https://docs.rs/tarpc-plugins" documentation = "https://docs.rs/tarpc-plugins"
homepage = "https://github.com/google/tarpc" homepage = "https://github.com/google/tarpc"

View File

@@ -83,7 +83,7 @@ impl Parse for Service {
ident_errors, ident_errors,
syn::Error::new( syn::Error::new(
rpc.ident.span(), rpc.ident.span(),
format!("method name conflicts with generated fn `{}::serve`", ident) format!("method name conflicts with generated fn `{ident}::serve`")
) )
); );
} }
@@ -270,7 +270,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
let methods = rpcs.iter().map(|rpc| &rpc.ident).collect::<Vec<_>>(); let methods = rpcs.iter().map(|rpc| &rpc.ident).collect::<Vec<_>>();
let request_names = methods let request_names = methods
.iter() .iter()
.map(|m| format!("{}.{}", ident, m)) .map(|m| format!("{ident}.{m}"))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
ServiceGenerator { ServiceGenerator {
@@ -306,7 +306,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
future_types: &camel_case_fn_names future_types: &camel_case_fn_names
.iter() .iter()
.map(|name| parse_str(&format!("{}Fut", name)).unwrap()) .map(|name| parse_str(&format!("{name}Fut")).unwrap())
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
derive_serialize: derive_serialize.as_ref(), derive_serialize: derive_serialize.as_ref(),
} }
@@ -409,7 +409,7 @@ fn verify_types_were_provided(
if !provided.iter().any(|typedecl| typedecl.ident == expected) { if !provided.iter().any(|typedecl| typedecl.ident == expected) {
let mut e = syn::Error::new( let mut e = syn::Error::new(
span, span,
format!("not all trait items implemented, missing: `{}`", expected), format!("not all trait items implemented, missing: `{expected}`"),
); );
let fn_span = method.sig.fn_token.span(); let fn_span = method.sig.fn_token.span();
e.extend(syn::Error::new( e.extend(syn::Error::new(
@@ -479,7 +479,7 @@ impl<'a> ServiceGenerator<'a> {
), ),
output, output,
)| { )| {
let ty_doc = format!("The response future returned by [`{}::{}`].", service_ident, ident); let ty_doc = format!("The response future returned by [`{service_ident}::{ident}`].");
quote! { quote! {
#[doc = #ty_doc] #[doc = #ty_doc]
type #future_type: std::future::Future<Output = #output>; type #future_type: std::future::Future<Output = #output>;

View File

@@ -1,8 +1,12 @@
[package] [package]
name = "tarpc" name = "tarpc"
version = "0.27.1" version = "0.28.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"] rust-version = "1.58.0"
edition = "2018" authors = [
"Adam Wright <adam.austin.wright@gmail.com>",
"Tim Kuehn <timothy.j.kuehn@gmail.com>",
]
edition = "2021"
license = "MIT" license = "MIT"
documentation = "https://docs.rs/tarpc" documentation = "https://docs.rs/tarpc"
homepage = "https://github.com/google/tarpc" homepage = "https://github.com/google/tarpc"
@@ -16,13 +20,20 @@ description = "An RPC framework for Rust with a focus on ease of use."
default = [] default = []
serde1 = ["tarpc-plugins/serde1", "serde", "serde/derive"] serde1 = ["tarpc-plugins/serde1", "serde", "serde/derive"]
tokio1 = ["tokio/rt-multi-thread"] tokio1 = ["tokio/rt"]
serde-transport = ["serde1", "tokio1", "tokio-serde", "tokio-util/codec"] serde-transport = ["serde1", "tokio1", "tokio-serde", "tokio-util/codec"]
serde-transport-json = ["tokio-serde/json"] serde-transport-json = ["tokio-serde/json"]
serde-transport-bincode = ["tokio-serde/bincode"] serde-transport-bincode = ["tokio-serde/bincode"]
tcp = ["tokio/net"] tcp = ["tokio/net"]
full = ["serde1", "tokio1", "serde-transport", "serde-transport-json", "serde-transport-bincode", "tcp"] full = [
"serde1",
"tokio1",
"serde-transport",
"serde-transport-json",
"serde-transport-bincode",
"tcp",
]
[badges] [badges]
travis-ci = { repository = "google/tarpc" } travis-ci = { repository = "google/tarpc" }
@@ -39,11 +50,14 @@ static_assertions = "1.1.0"
tarpc-plugins = { path = "../plugins", version = "0.12" } tarpc-plugins = { path = "../plugins", version = "0.12" }
thiserror = "1.0" thiserror = "1.0"
tokio = { version = "1", features = ["time"] } tokio = { version = "1", features = ["time"] }
tokio-util = { version = "0.6.3", features = ["time"] } tokio-util = { version = "0.6.9", features = ["time"] }
tokio-serde = { optional = true, version = "0.8" } tokio-serde = { optional = true, version = "0.8" }
tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] } tracing = { version = "0.1", default-features = false, features = [
tracing-opentelemetry = { version = "0.15", default-features = false } "attributes",
opentelemetry = { version = "0.16", default-features = false } "log",
] }
tracing-opentelemetry = { version = "0.17.2", default-features = false }
opentelemetry = { version = "0.17.0", default-features = false }
[dev-dependencies] [dev-dependencies]
@@ -52,11 +66,13 @@ bincode = "1.3"
bytes = { version = "1", features = ["serde"] } bytes = { version = "1", features = ["serde"] }
flate2 = "1.0" flate2 = "1.0"
futures-test = "0.3" futures-test = "0.3"
opentelemetry = { version = "0.16", default-features = false, features = ["rt-tokio"] } opentelemetry = { version = "0.17.0", default-features = false, features = [
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] } "rt-tokio",
] }
opentelemetry-jaeger = { version = "0.16.0", features = ["rt-tokio"] }
pin-utils = "0.1.0-alpha" pin-utils = "0.1.0-alpha"
serde_bytes = "0.11" serde_bytes = "0.11"
tracing-subscriber = "0.2" tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tokio = { version = "1", features = ["full", "test-util"] } tokio = { version = "1", features = ["full", "test-util"] }
tokio-serde = { version = "0.8", features = ["json", "bincode"] } tokio-serde = { version = "0.8", features = ["json", "bincode"] }
trybuild = "1.0" trybuild = "1.0"

View File

@@ -54,7 +54,7 @@ where
if algorithm != CompressionAlgorithm::Deflate { if algorithm != CompressionAlgorithm::Deflate {
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::InvalidData, io::ErrorKind::InvalidData,
format!("Compression algorithm {:?} not supported", algorithm), format!("Compression algorithm {algorithm:?} not supported"),
)); ));
} }
let mut deflater = DeflateDecoder::new(payload.as_slice()); let mut deflater = DeflateDecoder::new(payload.as_slice());
@@ -102,7 +102,7 @@ struct HelloServer;
#[tarpc::server] #[tarpc::server]
impl World for HelloServer { impl World for HelloServer {
async fn hello(self, _: context::Context, name: String) -> String { async fn hello(self, _: context::Context, name: String) -> String {
format!("Hey, {}!", name) format!("Hey, {name}!")
} }
} }

View File

@@ -290,7 +290,7 @@ fn init_tracing(service_name: &str) -> anyhow::Result<()> {
.install_batch(opentelemetry::runtime::Tokio)?; .install_batch(opentelemetry::runtime::Tokio)?;
tracing_subscriber::registry() tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env()) .with(tracing_subscriber::filter::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer()) .with(tracing_subscriber::fmt::layer())
.with(tracing_opentelemetry::layer().with_tracer(tracer)) .with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()?; .try_init()?;

View File

@@ -29,7 +29,7 @@ impl World for HelloServer {
type HelloFut = Ready<String>; type HelloFut = Ready<String>;
fn hello(self, _: context::Context, name: String) -> Self::HelloFut { fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name)) future::ready(format!("Hello, {name}!"))
} }
} }
@@ -49,7 +49,7 @@ async fn main() -> anyhow::Result<()> {
// specifies a deadline and trace information which can be helpful in debugging requests. // specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), "Stim".to_string()).await?; let hello = client.hello(context::current(), "Stim".to_string()).await?;
println!("{}", hello); println!("{hello}");
Ok(()) Ok(())
} }

View File

@@ -81,14 +81,10 @@ impl<C, D> fmt::Debug for NewClient<C, D> {
} }
} }
#[allow(dead_code)] const _CHECK_USIZE: () = assert!(
#[allow(clippy::no_effect)] std::mem::size_of::<usize>() <= std::mem::size_of::<u64>(),
const CHECK_USIZE: () = { "usize is too big to fit in u64"
if std::mem::size_of::<usize>() > std::mem::size_of::<u64>() { );
// TODO: replace this with panic!() as soon as RFC 2345 gets stabilized
["usize is too big to fit in u64"][42];
}
};
/// Handles communication from the client to request dispatch. /// Handles communication from the client to request dispatch.
#[derive(Debug)] #[derive(Debug)]
@@ -172,7 +168,8 @@ struct ResponseGuard<'a, Resp> {
/// An error that can occur in the processing of an RPC. This is not request-specific errors but /// An error that can occur in the processing of an RPC. This is not request-specific errors but
/// rather cross-cutting errors that can always occur. /// rather cross-cutting errors that can always occur.
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub enum RpcError { pub enum RpcError {
/// The client disconnected from the server. /// The client disconnected from the server.
#[error("the client disconnected from the server")] #[error("the client disconnected from the server")]
@@ -291,6 +288,9 @@ where
/// Could not flush the transport. /// Could not flush the transport.
#[error("could not flush the transport")] #[error("could not flush the transport")]
Flush(#[source] E), Flush(#[source] E),
/// Could not close the write end of the transport.
#[error("could not close the write end of the transport")]
Close(#[source] E),
/// Could not poll expired requests. /// Could not poll expired requests.
#[error("could not poll expired requests")] #[error("could not poll expired requests")]
Timer(#[source] tokio::time::error::Error), Timer(#[source] tokio::time::error::Error),
@@ -335,6 +335,15 @@ where
.map_err(ChannelError::Flush) .map_err(ChannelError::Flush)
} }
fn poll_close<'a>(
self: &'a mut Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), ChannelError<C::Error>>> {
self.transport_pin_mut()
.poll_close(cx)
.map_err(ChannelError::Close)
}
fn canceled_requests_mut<'a>(self: &'a mut Pin<&mut Self>) -> &'a mut CanceledRequests { fn canceled_requests_mut<'a>(self: &'a mut Pin<&mut Self>) -> &'a mut CanceledRequests {
self.as_mut().project().canceled_requests self.as_mut().project().canceled_requests
} }
@@ -394,7 +403,7 @@ where
match (pending_requests_status, canceled_requests_status) { match (pending_requests_status, canceled_requests_status) {
(ReceiverStatus::Closed, ReceiverStatus::Closed) => { (ReceiverStatus::Closed, ReceiverStatus::Closed) => {
ready!(self.poll_flush(cx)?); ready!(self.poll_close(cx)?);
Poll::Ready(None) Poll::Ready(None)
} }
(ReceiverStatus::Pending, _) | (_, ReceiverStatus::Pending) => { (ReceiverStatus::Pending, _) | (_, ReceiverStatus::Pending) => {

View File

@@ -27,7 +27,7 @@
//! process, and no context switching between different languages. //! process, and no context switching between different languages.
//! //!
//! Some other features of tarpc: //! Some other features of tarpc:
//! - Pluggable transport: any type impling `Stream<Item = Request> + Sink<Response>` can be //! - Pluggable transport: any type implementing `Stream<Item = Request> + Sink<Response>` can be
//! used as a transport to connect the client and server. //! used as a transport to connect the client and server.
//! - `Send + 'static` optional: if the transport doesn't require it, neither does tarpc! //! - `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
//! - Cascading cancellation: dropping a request will send a cancellation message to the server. //! - Cascading cancellation: dropping a request will send a cancellation message to the server.
@@ -42,7 +42,7 @@
//! [tracing](https://github.com/tokio-rs/tracing) primitives extended with //! [tracing](https://github.com/tokio-rs/tracing) primitives extended with
//! [OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like //! [OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like
//! [Jaeger](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger), //! [Jaeger](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger),
//! each RPC can be traced through the client, server, amd other dependencies downstream of the //! each RPC can be traced through the client, server, and other dependencies downstream of the
//! server. Even for applications not connected to a distributed tracing collector, the //! server. Even for applications not connected to a distributed tracing collector, the
//! instrumentation can also be ingested by regular loggers like //! instrumentation can also be ingested by regular loggers like
//! [env_logger](https://github.com/env-logger-rs/env_logger/). //! [env_logger](https://github.com/env-logger-rs/env_logger/).
@@ -54,7 +54,7 @@
//! Add to your `Cargo.toml` dependencies: //! Add to your `Cargo.toml` dependencies:
//! //!
//! ```toml //! ```toml
//! tarpc = "0.27" //! tarpc = "0.28"
//! ``` //! ```
//! //!
//! The `tarpc::service` attribute expands to a collection of items that form an rpc service. //! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -68,8 +68,8 @@
//! //!
//! ```toml //! ```toml
//! anyhow = "1.0" //! anyhow = "1.0"
//! futures = "1.0" //! futures = "0.3"
//! tarpc = { version = "0.27", features = ["tokio1"] } //! tarpc = { version = "0.28", features = ["tokio1"] }
//! tokio = { version = "1.0", features = ["macros"] } //! tokio = { version = "1.0", features = ["macros"] }
//! ``` //! ```
//! //!
@@ -88,7 +88,7 @@
//! }; //! };
//! use tarpc::{ //! use tarpc::{
//! client, context, //! client, context,
//! server::{self, incoming::Incoming}, //! server::{self, incoming::Incoming, Channel},
//! }; //! };
//! //!
//! // This is the service definition. It looks a lot like a trait definition. //! // This is the service definition. It looks a lot like a trait definition.
@@ -132,7 +132,7 @@
//! type HelloFut = Ready<String>; //! type HelloFut = Ready<String>;
//! //!
//! fn hello(self, _: context::Context, name: String) -> Self::HelloFut { //! fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
//! future::ready(format!("Hello, {}!", name)) //! future::ready(format!("Hello, {name}!"))
//! } //! }
//! } //! }
//! ``` //! ```
@@ -168,7 +168,7 @@
//! # // an associated type representing the future output by the fn. //! # // an associated type representing the future output by the fn.
//! # type HelloFut = Ready<String>; //! # type HelloFut = Ready<String>;
//! # fn hello(self, _: context::Context, name: String) -> Self::HelloFut { //! # fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
//! # future::ready(format!("Hello, {}!", name)) //! # future::ready(format!("Hello, {name}!"))
//! # } //! # }
//! # } //! # }
//! # #[cfg(not(feature = "tokio1"))] //! # #[cfg(not(feature = "tokio1"))]
@@ -190,7 +190,7 @@
//! // specifies a deadline and trace information which can be helpful in debugging requests. //! // specifies a deadline and trace information which can be helpful in debugging requests.
//! let hello = client.hello(context::current(), "Stim".to_string()).await?; //! let hello = client.hello(context::current(), "Stim".to_string()).await?;
//! //!
//! println!("{}", hello); //! println!("{hello}");
//! //!
//! Ok(()) //! Ok(())
//! } //! }
@@ -264,7 +264,7 @@ pub use tarpc_plugins::service;
/// #[tarpc::server] /// #[tarpc::server]
/// impl World for HelloServer { /// impl World for HelloServer {
/// async fn hello(self, _: context::Context, name: String) -> String { /// async fn hello(self, _: context::Context, name: String) -> String {
/// format!("Hello, {}! You are connected from {:?}.", name, self.0) /// format!("Hello, {name}! You are connected from {:?}.", self.0)
/// } /// }
/// } /// }
/// ``` /// ```
@@ -290,7 +290,7 @@ pub use tarpc_plugins::service;
/// fn hello(self, _: context::Context, name: String) -> Pin<Box<dyn Future<Output = String> /// fn hello(self, _: context::Context, name: String) -> Pin<Box<dyn Future<Output = String>
/// + Send>> { /// + Send>> {
/// Box::pin(async move { /// Box::pin(async move {
/// format!("Hello, {}! You are connected from {:?}.", name, self.0) /// format!("Hello, {name}! You are connected from {:?}.", self.0)
/// }) /// })
/// } /// }
/// } /// }

View File

@@ -333,6 +333,7 @@ where
type Item = Result<TrackedRequest<Req>, ChannelError<T::Error>>; type Item = Result<TrackedRequest<Req>, ChannelError<T::Error>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
#[derive(Debug)]
enum ReceiverStatus { enum ReceiverStatus {
Ready, Ready,
Pending, Pending,
@@ -388,6 +389,11 @@ where
Poll::Pending => Pending, Poll::Pending => Pending,
}; };
tracing::trace!(
"Expired requests: {:?}, Inbound: {:?}",
expiration_status,
request_status
);
match (expiration_status, request_status) { match (expiration_status, request_status) {
(Ready, _) | (_, Ready) => continue, (Ready, _) | (_, Ready) => continue,
(Closed, Closed) => return Poll::Ready(None), (Closed, Closed) => return Poll::Ready(None),

View File

@@ -98,6 +98,11 @@ impl InFlightRequests {
&mut self, &mut self,
cx: &mut Context, cx: &mut Context,
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> { ) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
if self.deadlines.is_empty() {
// TODO(https://github.com/tokio-rs/tokio/issues/4161)
// This is a workaround for DelayQueue not always treating this case correctly.
return Poll::Ready(None);
}
self.deadlines.poll_expired(cx).map_ok(|expired| { self.deadlines.poll_expired(cx).map_ok(|expired| {
if let Some(RequestData { if let Some(RequestData {
abort_handle, span, .. abort_handle, span, ..
@@ -184,12 +189,31 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn remove_request_doesnt_abort() { async fn remove_request_doesnt_abort() {
let mut in_flight_requests = InFlightRequests::default(); let mut in_flight_requests = InFlightRequests::default();
assert!(in_flight_requests.deadlines.is_empty());
let abort_registration = in_flight_requests let abort_registration = in_flight_requests
.start_request(0, SystemTime::now(), Span::current()) .start_request(
0,
SystemTime::now() + std::time::Duration::from_secs(10),
Span::current(),
)
.unwrap(); .unwrap();
let mut abortable_future = Box::new(Abortable::new(pending::<()>(), abort_registration)); let mut abortable_future = Box::new(Abortable::new(pending::<()>(), abort_registration));
// Precondition: Pending expiration
assert_matches!(
in_flight_requests.poll_expired(&mut noop_context()),
Poll::Pending
);
assert!(!in_flight_requests.deadlines.is_empty());
assert_matches!(in_flight_requests.remove_request(0), Some(_)); assert_matches!(in_flight_requests.remove_request(0), Some(_));
// Postcondition: No pending expirations
assert!(in_flight_requests.deadlines.is_empty());
assert_matches!(
in_flight_requests.poll_expired(&mut noop_context()),
Poll::Ready(None)
);
assert_matches!( assert_matches!(
abortable_future.poll_unpin(&mut noop_context()), abortable_future.poll_unpin(&mut noop_context()),
Poll::Pending Poll::Pending

View File

@@ -8,7 +8,7 @@ use std::{fmt, hash::Hash};
#[cfg(feature = "tokio1")] #[cfg(feature = "tokio1")]
use super::{tokio::TokioServerExecutor, Serve}; use super::{tokio::TokioServerExecutor, Serve};
/// An extension trait for [streams](Stream) of [`Channels`](Channel). /// An extension trait for [streams](futures::prelude::Stream) of [`Channels`](Channel).
pub trait Incoming<C> pub trait Incoming<C>
where where
Self: Sized + Stream<Item = C>, Self: Sized + Stream<Item = C>,

View File

@@ -138,25 +138,25 @@ impl From<u64> for SpanId {
impl From<opentelemetry::trace::TraceId> for TraceId { impl From<opentelemetry::trace::TraceId> for TraceId {
fn from(trace_id: opentelemetry::trace::TraceId) -> Self { fn from(trace_id: opentelemetry::trace::TraceId) -> Self {
Self::from(trace_id.to_u128()) Self::from(u128::from_be_bytes(trace_id.to_bytes()))
} }
} }
impl From<TraceId> for opentelemetry::trace::TraceId { impl From<TraceId> for opentelemetry::trace::TraceId {
fn from(trace_id: TraceId) -> Self { fn from(trace_id: TraceId) -> Self {
Self::from_u128(trace_id.into()) Self::from_bytes(u128::from(trace_id).to_be_bytes())
} }
} }
impl From<opentelemetry::trace::SpanId> for SpanId { impl From<opentelemetry::trace::SpanId> for SpanId {
fn from(span_id: opentelemetry::trace::SpanId) -> Self { fn from(span_id: opentelemetry::trace::SpanId) -> Self {
Self::from(span_id.to_u64()) Self::from(u64::from_be_bytes(span_id.to_bytes()))
} }
} }
impl From<SpanId> for opentelemetry::trace::SpanId { impl From<SpanId> for opentelemetry::trace::SpanId {
fn from(span_id: SpanId) -> Self { fn from(span_id: SpanId) -> Self {
Self::from_u64(span_id.0) Self::from_bytes(u64::from(span_id).to_be_bytes())
} }
} }

View File

@@ -181,7 +181,7 @@ mod tests {
future::ready(request.parse::<u64>().map_err(|_| { future::ready(request.parse::<u64>().map_err(|_| {
io::Error::new( io::Error::new(
io::ErrorKind::InvalidInput, io::ErrorKind::InvalidInput,
format!("{:?} is not an int", request), format!("{request:?} is not an int"),
) )
})) }))
}), }),

View File

@@ -38,11 +38,34 @@ where
H: BuildHasher, H: BuildHasher,
{ {
fn compact(&mut self, usage_ratio_threshold: f64) { fn compact(&mut self, usage_ratio_threshold: f64) {
if self.capacity() > 1000 { let usage_ratio_threshold = usage_ratio_threshold.clamp(f64::MIN_POSITIVE, 1.);
let usage_ratio = self.len() as f64 / self.capacity() as f64; let cap = f64::max(1000., self.len() as f64 / usage_ratio_threshold);
if usage_ratio < usage_ratio_threshold { self.shrink_to(cap as usize);
self.shrink_to_fit();
}
}
} }
} }
#[test]
fn test_compact() {
let mut map = HashMap::with_capacity(2048);
assert_eq!(map.capacity(), 3584);
// Make usage ratio 25%
for i in 0..896 {
map.insert(format!("k{i}"), "v");
}
map.compact(-1.0);
assert_eq!(map.capacity(), 3584);
map.compact(0.25);
assert_eq!(map.capacity(), 3584);
map.compact(0.50);
assert_eq!(map.capacity(), 1792);
map.compact(1.0);
assert_eq!(map.capacity(), 1792);
map.compact(2.0);
assert_eq!(map.capacity(), 1792);
}

View File

@@ -7,8 +7,8 @@ struct HelloServer;
#[tarpc::server] #[tarpc::server]
impl World for HelloServer { impl World for HelloServer {
fn hello(name: String) -> String { fn hello(name: String) -> String {
format!("Hello, {}!", name) format!("Hello, {name}!", name)
} }
} }

View File

@@ -7,5 +7,5 @@ error: not all trait items implemented, missing: `HelloFut`
error: hint: `#[tarpc::server]` only rewrites async fns, and `fn hello` is not async error: hint: `#[tarpc::server]` only rewrites async fns, and `fn hello` is not async
--> $DIR/tarpc_server_missing_async.rs:10:5 --> $DIR/tarpc_server_missing_async.rs:10:5
| |
10 | fn hello(name: String) -> String { 10 | fn hello(name: String) -> String {
| ^^ | ^^

View File

@@ -31,7 +31,7 @@ impl Service for Server {
type HeyFut = Ready<String>; type HeyFut = Ready<String>;
fn hey(self, _: context::Context, name: String) -> Self::HeyFut { fn hey(self, _: context::Context, name: String) -> Self::HeyFut {
ready(format!("Hey, {}.", name)) ready(format!("Hey, {name}."))
} }
} }