18 Commits

Author SHA1 Message Date
Tim Kuehn
a3a6404a30 Prepare release of 0.28.0 2022-04-06 22:07:07 -07:00
Tim Kuehn
b36eac80b1 Bump minimum rust version to 1.58.0 2022-04-06 21:53:56 -07:00
Bruno
d7070e4bc3 Update opentelemetry and related dependencies (#362) 2022-04-03 14:09:14 -07:00
Tim Kuehn
b5d1828308 Use captured identifiers in format strings.
This was stabilized in Rust 1.58.0: https://blog.rust-lang.org/2022/01/13/Rust-1.58.0.html
2022-01-13 15:00:44 -08:00
Zak Cutner
92cfe63c4f Use single-threaded Tokio runtime (#360) 2022-01-06 20:59:51 -08:00
Tim Kuehn
839a2f067c Update example to latest version of Clap 2021-12-27 22:56:17 -08:00
David Kleingeld
b5d593488c Derive more traits for RpcError (#359)
Makes RpcError derive Clone, PartialEq, Eq, Hash, Serialize and Deserialize.
2021-12-27 22:00:58 -08:00
Tim Kuehn
eea38b8bf4 Simplify code with const assert!.
The code that prevents compilation on systems where usize is larger than
u64 previously used a const index-out-of-bounds trick. That code can now
be replaced with assert!, as const panic! has landed in 1.57.0 stable.
2021-12-03 15:20:33 -08:00
Shi Yan
70493c15f4 Fix a compiling issue of the official example (#358)
Fix a compiling issue of the official example because of the following error :

```
error[E0599]: the method `execute` exists for struct `BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>`, but its trait bounds were not satisfied
  --> src/main.rs:39:25
   |
39 |     tokio::spawn(server.execute(HelloServer.serve()));
   |                         ^^^^^^^ method cannot be called on `BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>` due to unsatisfied trait bounds
   |
   = note: the following trait bounds were not satisfied:
           `<&BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>> as futures::Stream>::Item = _`
           which is required by `&BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>: tarpc::server::incoming::Incoming<_>`
           `&BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>: futures::Stream`
           which is required by `&BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>: tarpc::server::incoming::Incoming<_>`
   = help: items from traits can only be used if the trait is in scope
help: the following trait is implemented but not in scope; perhaps add a `use` for it:
   |
1  | use tarpc::server::Channel;
   |
```

See https://github.com/google/tarpc/pull/358#issuecomment-981953193 for the root cause.
2021-11-29 17:01:16 -08:00
baptiste0928
f7c5d6a7c3 Fix example-service (#355)
Fixes the compilation of the example-service crate (the Clap trait has been renamed Parser in clap-rs/clap@d840d56).
2021-11-15 08:39:04 -08:00
Scott Kirkpatrick
98c5d2a18b Re-add typo fixes (#353)
The typo fixes that were added by commit b5d9aaa
were accidentally reverted by commit 1e680e3, this
will add them back
2021-11-08 10:07:21 -08:00
Tim Kuehn
46b534f7c6 Use HashMap::shrink_to in impl of Comapct::compact. 2021-10-21 17:03:57 -07:00
Tim Kuehn
42b4fc52b1 Set rust-version to 1.56 2021-10-21 16:08:15 -07:00
Tim Kuehn
350dbcdad0 Upgrade to Rust 2021! 2021-10-21 14:10:21 -07:00
Tim Kuehn
b1b4461d89 Prepare release of 0.27.2 2021-10-08 22:31:56 -07:00
Tim Kuehn
f694b7573a Close TcpStream when client disconnects.
An attempt at a clean shutdown helps the server to drop its connections
more quickly.

Testing this uncovered a latent bug in DelayQueue wherein `poll_expired`
yields `Pending` when empty. A workaround was added to
`InFlightRequests::poll_expired`: check if there are actually any
outstanding requests before calling `DelayQueue::poll_expired`.
2021-10-08 22:13:24 -07:00
Tim Kuehn
1e680e3a5a Fix typos in docs.
Fixes https://github.com/google/tarpc/issues/352.
2021-10-08 19:19:50 -07:00
Tim Kuehn
2591d21e94 Update release notes to mention io::Error = 2021-09-23 13:57:43 -07:00
22 changed files with 191 additions and 75 deletions

View File

@@ -67,7 +67,7 @@ Some other features of tarpc:
Add to your `Cargo.toml` dependencies:
```toml
tarpc = "0.27"
tarpc = "0.28"
```
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -81,8 +81,8 @@ your `Cargo.toml`:
```toml
anyhow = "1.0"
futures = "1.0"
tarpc = { version = "0.27", features = ["tokio1"] }
futures = "0.3"
tarpc = { version = "0.28", features = ["tokio1"] }
tokio = { version = "1.0", features = ["macros"] }
```
@@ -100,7 +100,7 @@ use futures::{
};
use tarpc::{
client, context,
server::{self, incoming::Incoming},
server::{self, incoming::Incoming, Channel},
};
// This is the service definition. It looks a lot like a trait definition.
@@ -128,7 +128,7 @@ impl World for HelloServer {
type HelloFut = Ready<String>;
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name))
future::ready(format!("Hello, {name}!"))
}
}
```
@@ -155,7 +155,7 @@ async fn main() -> anyhow::Result<()> {
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), "Stim".to_string()).await?;
println!("{}", hello);
println!("{hello}");
Ok(())
}

View File

@@ -1,7 +1,43 @@
## 0.28.0 (2022-04-06)
### Breaking Changes
- The minimum supported Rust version has increased to 1.58.0.
- The version of opentelemetry depended on by tarpc has increased to 0.17.0.
## 0.27.2 (2021-10-08)
### Fixes
Clients will now close their transport before dropping it. An attempt at a clean shutdown can help
the server drop its connections more quickly.
## 0.27.1 (2021-09-22)
### Breaking Changes
### RPC error type is changing
RPC return types are changing from `Result<Response, io::Error>` to `Result<Response,
tarpc::client::RpcError>`.
Becaue tarpc is a library, not an application, it should strive to
use structured errors in its API so that users have maximal flexibility
in how they handle errors. io::Error makes that hard, because it is a
kitchen-sink error type.
RPCs in particular only have 3 classes of errors:
- The connection breaks.
- The request expires.
- The server decides not to process the request.
RPC responses can also contain application-specific errors, but from the
perspective of the RPC library, those are opaque to the framework, classified
as successful responsees.
### Open Telemetry
The Opentelemetry dependency is updated to version 0.16.x.
## 0.27.0 (2021-09-22)

View File

@@ -1,8 +1,9 @@
[package]
name = "tarpc-example-service"
version = "0.10.0"
rust-version = "1.56"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = "2018"
edition = "2021"
license = "MIT"
documentation = "https://docs.rs/tarpc-example-service"
homepage = "https://github.com/google/tarpc"
@@ -14,13 +15,13 @@ description = "An example server built on tarpc."
[dependencies]
anyhow = "1.0"
clap = "3.0.0-beta.2"
clap = { version = "3.0.0-rc.9", features = ["derive"] }
log = "0.4"
futures = "0.3"
opentelemetry = { version = "0.16", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] }
rand = "0.8"
tarpc = { version = "0.27", path = "../tarpc", features = ["full"] }
tarpc = { version = "0.28", path = "../tarpc", features = ["full"] }
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] }
tracing = { version = "0.1" }
tracing-opentelemetry = "0.15"

View File

@@ -4,14 +4,14 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
use clap::Clap;
use clap::Parser;
use service::{init_tracing, WorldClient};
use std::{net::SocketAddr, time::Duration};
use tarpc::{client, context, tokio_serde::formats::Json};
use tokio::time::sleep;
use tracing::Instrument;
#[derive(Clap)]
#[derive(Parser)]
struct Flags {
/// Sets the server address to connect to.
#[clap(long)]

View File

@@ -4,7 +4,7 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
use clap::Clap;
use clap::Parser;
use futures::{future, prelude::*};
use rand::{
distributions::{Distribution, Uniform},
@@ -22,7 +22,7 @@ use tarpc::{
};
use tokio::time;
#[derive(Clap)]
#[derive(Parser)]
struct Flags {
/// Sets the port number to listen on.
#[clap(long)]
@@ -40,7 +40,7 @@ impl World for HelloServer {
let sleep_time =
Duration::from_millis(Uniform::new_inclusive(1, 10).sample(&mut thread_rng()));
time::sleep(sleep_time).await;
format!("Hello, {}! You are connected from {}", name, self.0)
format!("Hello, {name}! You are connected from {}", self.0)
}
}

View File

@@ -1,8 +1,9 @@
[package]
name = "tarpc-plugins"
version = "0.12.0"
rust-version = "1.56"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018"
edition = "2021"
license = "MIT"
documentation = "https://docs.rs/tarpc-plugins"
homepage = "https://github.com/google/tarpc"

View File

@@ -83,7 +83,7 @@ impl Parse for Service {
ident_errors,
syn::Error::new(
rpc.ident.span(),
format!("method name conflicts with generated fn `{}::serve`", ident)
format!("method name conflicts with generated fn `{ident}::serve`")
)
);
}
@@ -270,7 +270,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
let methods = rpcs.iter().map(|rpc| &rpc.ident).collect::<Vec<_>>();
let request_names = methods
.iter()
.map(|m| format!("{}.{}", ident, m))
.map(|m| format!("{ident}.{m}"))
.collect::<Vec<_>>();
ServiceGenerator {
@@ -306,7 +306,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
.collect::<Vec<_>>(),
future_types: &camel_case_fn_names
.iter()
.map(|name| parse_str(&format!("{}Fut", name)).unwrap())
.map(|name| parse_str(&format!("{name}Fut")).unwrap())
.collect::<Vec<_>>(),
derive_serialize: derive_serialize.as_ref(),
}
@@ -409,7 +409,7 @@ fn verify_types_were_provided(
if !provided.iter().any(|typedecl| typedecl.ident == expected) {
let mut e = syn::Error::new(
span,
format!("not all trait items implemented, missing: `{}`", expected),
format!("not all trait items implemented, missing: `{expected}`"),
);
let fn_span = method.sig.fn_token.span();
e.extend(syn::Error::new(
@@ -479,7 +479,7 @@ impl<'a> ServiceGenerator<'a> {
),
output,
)| {
let ty_doc = format!("The response future returned by [`{}::{}`].", service_ident, ident);
let ty_doc = format!("The response future returned by [`{service_ident}::{ident}`].");
quote! {
#[doc = #ty_doc]
type #future_type: std::future::Future<Output = #output>;

View File

@@ -1,8 +1,12 @@
[package]
name = "tarpc"
version = "0.27.1"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018"
version = "0.28.0"
rust-version = "1.58.0"
authors = [
"Adam Wright <adam.austin.wright@gmail.com>",
"Tim Kuehn <timothy.j.kuehn@gmail.com>",
]
edition = "2021"
license = "MIT"
documentation = "https://docs.rs/tarpc"
homepage = "https://github.com/google/tarpc"
@@ -16,13 +20,20 @@ description = "An RPC framework for Rust with a focus on ease of use."
default = []
serde1 = ["tarpc-plugins/serde1", "serde", "serde/derive"]
tokio1 = ["tokio/rt-multi-thread"]
tokio1 = ["tokio/rt"]
serde-transport = ["serde1", "tokio1", "tokio-serde", "tokio-util/codec"]
serde-transport-json = ["tokio-serde/json"]
serde-transport-bincode = ["tokio-serde/bincode"]
tcp = ["tokio/net"]
full = ["serde1", "tokio1", "serde-transport", "serde-transport-json", "serde-transport-bincode", "tcp"]
full = [
"serde1",
"tokio1",
"serde-transport",
"serde-transport-json",
"serde-transport-bincode",
"tcp",
]
[badges]
travis-ci = { repository = "google/tarpc" }
@@ -39,11 +50,14 @@ static_assertions = "1.1.0"
tarpc-plugins = { path = "../plugins", version = "0.12" }
thiserror = "1.0"
tokio = { version = "1", features = ["time"] }
tokio-util = { version = "0.6.3", features = ["time"] }
tokio-util = { version = "0.6.9", features = ["time"] }
tokio-serde = { optional = true, version = "0.8" }
tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] }
tracing-opentelemetry = { version = "0.15", default-features = false }
opentelemetry = { version = "0.16", default-features = false }
tracing = { version = "0.1", default-features = false, features = [
"attributes",
"log",
] }
tracing-opentelemetry = { version = "0.17.2", default-features = false }
opentelemetry = { version = "0.17.0", default-features = false }
[dev-dependencies]
@@ -52,11 +66,13 @@ bincode = "1.3"
bytes = { version = "1", features = ["serde"] }
flate2 = "1.0"
futures-test = "0.3"
opentelemetry = { version = "0.16", default-features = false, features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] }
opentelemetry = { version = "0.17.0", default-features = false, features = [
"rt-tokio",
] }
opentelemetry-jaeger = { version = "0.16.0", features = ["rt-tokio"] }
pin-utils = "0.1.0-alpha"
serde_bytes = "0.11"
tracing-subscriber = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tokio = { version = "1", features = ["full", "test-util"] }
tokio-serde = { version = "0.8", features = ["json", "bincode"] }
trybuild = "1.0"

View File

@@ -54,7 +54,7 @@ where
if algorithm != CompressionAlgorithm::Deflate {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Compression algorithm {:?} not supported", algorithm),
format!("Compression algorithm {algorithm:?} not supported"),
));
}
let mut deflater = DeflateDecoder::new(payload.as_slice());
@@ -102,7 +102,7 @@ struct HelloServer;
#[tarpc::server]
impl World for HelloServer {
async fn hello(self, _: context::Context, name: String) -> String {
format!("Hey, {}!", name)
format!("Hey, {name}!")
}
}

View File

@@ -290,7 +290,7 @@ fn init_tracing(service_name: &str) -> anyhow::Result<()> {
.install_batch(opentelemetry::runtime::Tokio)?;
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::filter::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer())
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()?;

View File

@@ -29,7 +29,7 @@ impl World for HelloServer {
type HelloFut = Ready<String>;
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name))
future::ready(format!("Hello, {name}!"))
}
}
@@ -49,7 +49,7 @@ async fn main() -> anyhow::Result<()> {
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), "Stim".to_string()).await?;
println!("{}", hello);
println!("{hello}");
Ok(())
}

View File

@@ -81,14 +81,10 @@ impl<C, D> fmt::Debug for NewClient<C, D> {
}
}
#[allow(dead_code)]
#[allow(clippy::no_effect)]
const CHECK_USIZE: () = {
if std::mem::size_of::<usize>() > std::mem::size_of::<u64>() {
// TODO: replace this with panic!() as soon as RFC 2345 gets stabilized
["usize is too big to fit in u64"][42];
}
};
const _CHECK_USIZE: () = assert!(
std::mem::size_of::<usize>() <= std::mem::size_of::<u64>(),
"usize is too big to fit in u64"
);
/// Handles communication from the client to request dispatch.
#[derive(Debug)]
@@ -172,7 +168,8 @@ struct ResponseGuard<'a, Resp> {
/// An error that can occur in the processing of an RPC. This is not request-specific errors but
/// rather cross-cutting errors that can always occur.
#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub enum RpcError {
/// The client disconnected from the server.
#[error("the client disconnected from the server")]
@@ -291,6 +288,9 @@ where
/// Could not flush the transport.
#[error("could not flush the transport")]
Flush(#[source] E),
/// Could not close the write end of the transport.
#[error("could not close the write end of the transport")]
Close(#[source] E),
/// Could not poll expired requests.
#[error("could not poll expired requests")]
Timer(#[source] tokio::time::error::Error),
@@ -335,6 +335,15 @@ where
.map_err(ChannelError::Flush)
}
fn poll_close<'a>(
self: &'a mut Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), ChannelError<C::Error>>> {
self.transport_pin_mut()
.poll_close(cx)
.map_err(ChannelError::Close)
}
fn canceled_requests_mut<'a>(self: &'a mut Pin<&mut Self>) -> &'a mut CanceledRequests {
self.as_mut().project().canceled_requests
}
@@ -394,7 +403,7 @@ where
match (pending_requests_status, canceled_requests_status) {
(ReceiverStatus::Closed, ReceiverStatus::Closed) => {
ready!(self.poll_flush(cx)?);
ready!(self.poll_close(cx)?);
Poll::Ready(None)
}
(ReceiverStatus::Pending, _) | (_, ReceiverStatus::Pending) => {

View File

@@ -27,7 +27,7 @@
//! process, and no context switching between different languages.
//!
//! Some other features of tarpc:
//! - Pluggable transport: any type impling `Stream<Item = Request> + Sink<Response>` can be
//! - Pluggable transport: any type implementing `Stream<Item = Request> + Sink<Response>` can be
//! used as a transport to connect the client and server.
//! - `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
//! - Cascading cancellation: dropping a request will send a cancellation message to the server.
@@ -42,7 +42,7 @@
//! [tracing](https://github.com/tokio-rs/tracing) primitives extended with
//! [OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like
//! [Jaeger](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger),
//! each RPC can be traced through the client, server, amd other dependencies downstream of the
//! each RPC can be traced through the client, server, and other dependencies downstream of the
//! server. Even for applications not connected to a distributed tracing collector, the
//! instrumentation can also be ingested by regular loggers like
//! [env_logger](https://github.com/env-logger-rs/env_logger/).
@@ -54,7 +54,7 @@
//! Add to your `Cargo.toml` dependencies:
//!
//! ```toml
//! tarpc = "0.27"
//! tarpc = "0.28"
//! ```
//!
//! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -68,8 +68,8 @@
//!
//! ```toml
//! anyhow = "1.0"
//! futures = "1.0"
//! tarpc = { version = "0.27", features = ["tokio1"] }
//! futures = "0.3"
//! tarpc = { version = "0.28", features = ["tokio1"] }
//! tokio = { version = "1.0", features = ["macros"] }
//! ```
//!
@@ -88,7 +88,7 @@
//! };
//! use tarpc::{
//! client, context,
//! server::{self, incoming::Incoming},
//! server::{self, incoming::Incoming, Channel},
//! };
//!
//! // This is the service definition. It looks a lot like a trait definition.
@@ -132,7 +132,7 @@
//! type HelloFut = Ready<String>;
//!
//! fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
//! future::ready(format!("Hello, {}!", name))
//! future::ready(format!("Hello, {name}!"))
//! }
//! }
//! ```
@@ -168,7 +168,7 @@
//! # // an associated type representing the future output by the fn.
//! # type HelloFut = Ready<String>;
//! # fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
//! # future::ready(format!("Hello, {}!", name))
//! # future::ready(format!("Hello, {name}!"))
//! # }
//! # }
//! # #[cfg(not(feature = "tokio1"))]
@@ -190,7 +190,7 @@
//! // specifies a deadline and trace information which can be helpful in debugging requests.
//! let hello = client.hello(context::current(), "Stim".to_string()).await?;
//!
//! println!("{}", hello);
//! println!("{hello}");
//!
//! Ok(())
//! }
@@ -264,7 +264,7 @@ pub use tarpc_plugins::service;
/// #[tarpc::server]
/// impl World for HelloServer {
/// async fn hello(self, _: context::Context, name: String) -> String {
/// format!("Hello, {}! You are connected from {:?}.", name, self.0)
/// format!("Hello, {name}! You are connected from {:?}.", self.0)
/// }
/// }
/// ```
@@ -290,7 +290,7 @@ pub use tarpc_plugins::service;
/// fn hello(self, _: context::Context, name: String) -> Pin<Box<dyn Future<Output = String>
/// + Send>> {
/// Box::pin(async move {
/// format!("Hello, {}! You are connected from {:?}.", name, self.0)
/// format!("Hello, {name}! You are connected from {:?}.", self.0)
/// })
/// }
/// }

View File

@@ -333,6 +333,7 @@ where
type Item = Result<TrackedRequest<Req>, ChannelError<T::Error>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
#[derive(Debug)]
enum ReceiverStatus {
Ready,
Pending,
@@ -388,6 +389,11 @@ where
Poll::Pending => Pending,
};
tracing::trace!(
"Expired requests: {:?}, Inbound: {:?}",
expiration_status,
request_status
);
match (expiration_status, request_status) {
(Ready, _) | (_, Ready) => continue,
(Closed, Closed) => return Poll::Ready(None),

View File

@@ -98,6 +98,11 @@ impl InFlightRequests {
&mut self,
cx: &mut Context,
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
if self.deadlines.is_empty() {
// TODO(https://github.com/tokio-rs/tokio/issues/4161)
// This is a workaround for DelayQueue not always treating this case correctly.
return Poll::Ready(None);
}
self.deadlines.poll_expired(cx).map_ok(|expired| {
if let Some(RequestData {
abort_handle, span, ..
@@ -184,12 +189,31 @@ mod tests {
#[tokio::test]
async fn remove_request_doesnt_abort() {
let mut in_flight_requests = InFlightRequests::default();
assert!(in_flight_requests.deadlines.is_empty());
let abort_registration = in_flight_requests
.start_request(0, SystemTime::now(), Span::current())
.start_request(
0,
SystemTime::now() + std::time::Duration::from_secs(10),
Span::current(),
)
.unwrap();
let mut abortable_future = Box::new(Abortable::new(pending::<()>(), abort_registration));
// Precondition: Pending expiration
assert_matches!(
in_flight_requests.poll_expired(&mut noop_context()),
Poll::Pending
);
assert!(!in_flight_requests.deadlines.is_empty());
assert_matches!(in_flight_requests.remove_request(0), Some(_));
// Postcondition: No pending expirations
assert!(in_flight_requests.deadlines.is_empty());
assert_matches!(
in_flight_requests.poll_expired(&mut noop_context()),
Poll::Ready(None)
);
assert_matches!(
abortable_future.poll_unpin(&mut noop_context()),
Poll::Pending

View File

@@ -8,7 +8,7 @@ use std::{fmt, hash::Hash};
#[cfg(feature = "tokio1")]
use super::{tokio::TokioServerExecutor, Serve};
/// An extension trait for [streams](Stream) of [`Channels`](Channel).
/// An extension trait for [streams](futures::prelude::Stream) of [`Channels`](Channel).
pub trait Incoming<C>
where
Self: Sized + Stream<Item = C>,

View File

@@ -138,25 +138,25 @@ impl From<u64> for SpanId {
impl From<opentelemetry::trace::TraceId> for TraceId {
fn from(trace_id: opentelemetry::trace::TraceId) -> Self {
Self::from(trace_id.to_u128())
Self::from(u128::from_be_bytes(trace_id.to_bytes()))
}
}
impl From<TraceId> for opentelemetry::trace::TraceId {
fn from(trace_id: TraceId) -> Self {
Self::from_u128(trace_id.into())
Self::from_bytes(u128::from(trace_id).to_be_bytes())
}
}
impl From<opentelemetry::trace::SpanId> for SpanId {
fn from(span_id: opentelemetry::trace::SpanId) -> Self {
Self::from(span_id.to_u64())
Self::from(u64::from_be_bytes(span_id.to_bytes()))
}
}
impl From<SpanId> for opentelemetry::trace::SpanId {
fn from(span_id: SpanId) -> Self {
Self::from_u64(span_id.0)
Self::from_bytes(u64::from(span_id).to_be_bytes())
}
}

View File

@@ -181,7 +181,7 @@ mod tests {
future::ready(request.parse::<u64>().map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("{:?} is not an int", request),
format!("{request:?} is not an int"),
)
}))
}),

View File

@@ -38,11 +38,34 @@ where
H: BuildHasher,
{
fn compact(&mut self, usage_ratio_threshold: f64) {
if self.capacity() > 1000 {
let usage_ratio = self.len() as f64 / self.capacity() as f64;
if usage_ratio < usage_ratio_threshold {
self.shrink_to_fit();
}
}
let usage_ratio_threshold = usage_ratio_threshold.clamp(f64::MIN_POSITIVE, 1.);
let cap = f64::max(1000., self.len() as f64 / usage_ratio_threshold);
self.shrink_to(cap as usize);
}
}
#[test]
fn test_compact() {
let mut map = HashMap::with_capacity(2048);
assert_eq!(map.capacity(), 3584);
// Make usage ratio 25%
for i in 0..896 {
map.insert(format!("k{i}"), "v");
}
map.compact(-1.0);
assert_eq!(map.capacity(), 3584);
map.compact(0.25);
assert_eq!(map.capacity(), 3584);
map.compact(0.50);
assert_eq!(map.capacity(), 1792);
map.compact(1.0);
assert_eq!(map.capacity(), 1792);
map.compact(2.0);
assert_eq!(map.capacity(), 1792);
}

View File

@@ -7,8 +7,8 @@ struct HelloServer;
#[tarpc::server]
impl World for HelloServer {
fn hello(name: String) -> String {
format!("Hello, {}!", name)
fn hello(name: String) -> String {
format!("Hello, {name}!", name)
}
}

View File

@@ -7,5 +7,5 @@ error: not all trait items implemented, missing: `HelloFut`
error: hint: `#[tarpc::server]` only rewrites async fns, and `fn hello` is not async
--> $DIR/tarpc_server_missing_async.rs:10:5
|
10 | fn hello(name: String) -> String {
10 | fn hello(name: String) -> String {
| ^^

View File

@@ -31,7 +31,7 @@ impl Service for Server {
type HeyFut = Ready<String>;
fn hey(self, _: context::Context, name: String) -> Self::HeyFut {
ready(format!("Hey, {}.", name))
ready(format!("Hey, {name}."))
}
}