diff --git a/example-service/Cargo.toml b/example-service/Cargo.toml index 2b62139..8b5b4f0 100644 --- a/example-service/Cargo.toml +++ b/example-service/Cargo.toml @@ -18,14 +18,14 @@ anyhow = "1.0" clap = { version = "3.0.0-rc.9", features = ["derive"] } log = "0.4" futures = "0.3" -opentelemetry = { version = "0.16", features = ["rt-tokio"] } -opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] } +opentelemetry = { version = "0.17", features = ["rt-tokio"] } +opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] } rand = "0.8" tarpc = { version = "0.29", path = "../tarpc", features = ["full"] } tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] } tracing = { version = "0.1" } -tracing-opentelemetry = "0.15" -tracing-subscriber = "0.2" +tracing-opentelemetry = "0.17" +tracing-subscriber = {version = "0.3", features = ["env-filter"]} [lib] name = "service" diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 4c6e157..e6b723e 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -50,7 +50,7 @@ static_assertions = "1.1.0" tarpc-plugins = { path = "../plugins", version = "0.12" } thiserror = "1.0" tokio = { version = "1", features = ["time"] } -tokio-util = { version = "0.6.9", features = ["time"] } +tokio-util = { version = "0.7.3", features = ["time"] } tokio-serde = { optional = true, version = "0.8" } tracing = { version = "0.1", default-features = false, features = [ "attributes", diff --git a/tarpc/examples/custom_transport.rs b/tarpc/examples/custom_transport.rs index 67b4c0e..e7e2ce3 100644 --- a/tarpc/examples/custom_transport.rs +++ b/tarpc/examples/custom_transport.rs @@ -1,8 +1,9 @@ +use tarpc::context::Context; use tarpc::serde_transport as transport; use tarpc::server::{BaseChannel, Channel}; -use tarpc::{context::Context, tokio_serde::formats::Bincode}; +use tarpc::tokio_serde::formats::Bincode; +use tarpc::tokio_util::codec::length_delimited::LengthDelimitedCodec; use tokio::net::{UnixListener, UnixStream}; -use tokio_util::codec::length_delimited::LengthDelimitedCodec; #[tarpc::service] pub trait PingService { diff --git a/tarpc/examples/pubsub.rs b/tarpc/examples/pubsub.rs index b90aa67..8140cb3 100644 --- a/tarpc/examples/pubsub.rs +++ b/tarpc/examples/pubsub.rs @@ -52,9 +52,9 @@ use tarpc::{ client, context, serde_transport::tcp, server::{self, Channel}, + tokio_serde::formats::Json, }; use tokio::net::ToSocketAddrs; -use tokio_serde::formats::Json; use tracing::info; use tracing_subscriber::prelude::*; diff --git a/tarpc/examples/tracing.rs b/tarpc/examples/tracing.rs index 4b9338b..5b4b8fd 100644 --- a/tarpc/examples/tracing.rs +++ b/tarpc/examples/tracing.rs @@ -9,8 +9,8 @@ use futures::{future, prelude::*}; use tarpc::{ client, context, server::{incoming::Incoming, BaseChannel}, + tokio_serde::formats::Json, }; -use tokio_serde::formats::Json; use tracing_subscriber::prelude::*; pub mod add { diff --git a/tarpc/src/client.rs b/tarpc/src/client.rs index c884abd..8545695 100644 --- a/tarpc/src/client.rs +++ b/tarpc/src/client.rs @@ -395,11 +395,7 @@ where // Receiving Poll::Ready(None) when polling expired requests never indicates "Closed", // because there can temporarily be zero in-flight rquests. Therefore, there is no need to // track the status like is done with pending and cancelled requests. - if let Poll::Ready(Some(_)) = self - .in_flight_requests() - .poll_expired(cx) - .map_err(ChannelError::Timer)? - { + if let Poll::Ready(Some(_)) = self.in_flight_requests().poll_expired(cx) { // Expired requests are considered complete; there is no compelling reason to send a // cancellation message to the server, since it will have already exhausted its // allotted processing time. diff --git a/tarpc/src/client/in_flight_requests.rs b/tarpc/src/client/in_flight_requests.rs index 0758691..a7e5fb5 100644 --- a/tarpc/src/client/in_flight_requests.rs +++ b/tarpc/src/client/in_flight_requests.rs @@ -117,12 +117,9 @@ impl InFlightRequests { /// Yields a request that has expired, completing it with a TimedOut error. /// The caller should send cancellation messages for any yielded request ID. - pub fn poll_expired( - &mut self, - cx: &mut Context, - ) -> Poll>> { - self.deadlines.poll_expired(cx).map_ok(|expired| { - let request_id = expired.into_inner(); + pub fn poll_expired(&mut self, cx: &mut Context) -> Poll> { + self.deadlines.poll_expired(cx).map(|expired| { + let request_id = expired?.into_inner(); if let Some(request_data) = self.request_data.remove(&request_id) { let _entered = request_data.span.enter(); tracing::error!("DeadlineExceeded"); @@ -131,7 +128,7 @@ impl InFlightRequests { .response_completion .send(Err(DeadlineExceededError)); } - request_id + Some(request_id) }) } } diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index ebcbd8f..891efdd 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -209,7 +209,7 @@ pub use serde; #[cfg(feature = "serde-transport")] -pub use tokio_serde; +pub use {tokio_serde, tokio_util}; #[cfg(feature = "serde-transport")] #[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))] diff --git a/tarpc/src/server.rs b/tarpc/src/server.rs index 52bcd4c..3a0aae5 100644 --- a/tarpc/src/server.rs +++ b/tarpc/src/server.rs @@ -393,11 +393,7 @@ where Poll::Pending | Poll::Ready(None) => Closed, }; - let expiration_status = match self - .in_flight_requests_mut() - .poll_expired(cx) - .map_err(ChannelError::Timer)? - { + let expiration_status = match self.in_flight_requests_mut().poll_expired(cx) { // No need to send a response, since the client wouldn't be waiting for one // anymore. Poll::Ready(Some(_)) => Ready, diff --git a/tarpc/src/server/in_flight_requests.rs b/tarpc/src/server/in_flight_requests.rs index 912243c..07ed1d6 100644 --- a/tarpc/src/server/in_flight_requests.rs +++ b/tarpc/src/server/in_flight_requests.rs @@ -94,16 +94,14 @@ impl InFlightRequests { } /// Yields a request that has expired, aborting any ongoing processing of that request. - pub fn poll_expired( - &mut self, - cx: &mut Context, - ) -> Poll>> { + pub fn poll_expired(&mut self, cx: &mut Context) -> Poll> { if self.deadlines.is_empty() { // TODO(https://github.com/tokio-rs/tokio/issues/4161) // This is a workaround for DelayQueue not always treating this case correctly. return Poll::Ready(None); } - self.deadlines.poll_expired(cx).map_ok(|expired| { + self.deadlines.poll_expired(cx).map(|expired| { + let expired = expired?; if let Some(RequestData { abort_handle, span, .. }) = self.request_data.remove(expired.get_ref()) @@ -113,7 +111,7 @@ impl InFlightRequests { abort_handle.abort(); tracing::error!("DeadlineExceeded"); } - expired.into_inner() + Some(expired.into_inner()) }) } } @@ -161,7 +159,7 @@ mod tests { assert_matches!( in_flight_requests.poll_expired(&mut noop_context()), - Poll::Ready(Some(Ok(_))) + Poll::Ready(Some(_)) ); assert_matches!( abortable_future.poll_unpin(&mut noop_context()),