feat: re-export used packages (#371)

## Problem
Library users might get stuck with or ran into issues while using tarpc because of incompatible third party libraries. in particular, tokio_serde and tokio_util.

## Solution
This PR does the following:

1. re-export tokio_serde as part of feature serde-transport, because the end user imports it to use some serde-transport APIs.
2. Update third library packages to latest release and fix resulting issues from that.

## Important Notes
tokio_util 7.3 DelayQueue::poll_expired API changed [0] therefore, InFlightRequests::poll_expired now returns Poll<Option<u64>>

[0] https://docs.rs/tokio-util/latest/tokio_util/time/delay_queue/struct.DelayQueue.html#method.poll_expired
This commit is contained in:
kkharji
2022-07-15 20:14:49 +03:00
committed by GitHub
parent 4c8ba41b2f
commit 0e102288a5
10 changed files with 22 additions and 34 deletions

View File

@@ -18,14 +18,14 @@ anyhow = "1.0"
clap = { version = "3.0.0-rc.9", features = ["derive"] }
log = "0.4"
futures = "0.3"
opentelemetry = { version = "0.16", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] }
opentelemetry = { version = "0.17", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] }
rand = "0.8"
tarpc = { version = "0.29", path = "../tarpc", features = ["full"] }
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] }
tracing = { version = "0.1" }
tracing-opentelemetry = "0.15"
tracing-subscriber = "0.2"
tracing-opentelemetry = "0.17"
tracing-subscriber = {version = "0.3", features = ["env-filter"]}
[lib]
name = "service"

View File

@@ -50,7 +50,7 @@ static_assertions = "1.1.0"
tarpc-plugins = { path = "../plugins", version = "0.12" }
thiserror = "1.0"
tokio = { version = "1", features = ["time"] }
tokio-util = { version = "0.6.9", features = ["time"] }
tokio-util = { version = "0.7.3", features = ["time"] }
tokio-serde = { optional = true, version = "0.8" }
tracing = { version = "0.1", default-features = false, features = [
"attributes",

View File

@@ -1,8 +1,9 @@
use tarpc::context::Context;
use tarpc::serde_transport as transport;
use tarpc::server::{BaseChannel, Channel};
use tarpc::{context::Context, tokio_serde::formats::Bincode};
use tarpc::tokio_serde::formats::Bincode;
use tarpc::tokio_util::codec::length_delimited::LengthDelimitedCodec;
use tokio::net::{UnixListener, UnixStream};
use tokio_util::codec::length_delimited::LengthDelimitedCodec;
#[tarpc::service]
pub trait PingService {

View File

@@ -52,9 +52,9 @@ use tarpc::{
client, context,
serde_transport::tcp,
server::{self, Channel},
tokio_serde::formats::Json,
};
use tokio::net::ToSocketAddrs;
use tokio_serde::formats::Json;
use tracing::info;
use tracing_subscriber::prelude::*;

View File

@@ -9,8 +9,8 @@ use futures::{future, prelude::*};
use tarpc::{
client, context,
server::{incoming::Incoming, BaseChannel},
tokio_serde::formats::Json,
};
use tokio_serde::formats::Json;
use tracing_subscriber::prelude::*;
pub mod add {

View File

@@ -395,11 +395,7 @@ where
// Receiving Poll::Ready(None) when polling expired requests never indicates "Closed",
// because there can temporarily be zero in-flight rquests. Therefore, there is no need to
// track the status like is done with pending and cancelled requests.
if let Poll::Ready(Some(_)) = self
.in_flight_requests()
.poll_expired(cx)
.map_err(ChannelError::Timer)?
{
if let Poll::Ready(Some(_)) = self.in_flight_requests().poll_expired(cx) {
// Expired requests are considered complete; there is no compelling reason to send a
// cancellation message to the server, since it will have already exhausted its
// allotted processing time.

View File

@@ -117,12 +117,9 @@ impl<Resp> InFlightRequests<Resp> {
/// Yields a request that has expired, completing it with a TimedOut error.
/// The caller should send cancellation messages for any yielded request ID.
pub fn poll_expired(
&mut self,
cx: &mut Context,
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
self.deadlines.poll_expired(cx).map_ok(|expired| {
let request_id = expired.into_inner();
pub fn poll_expired(&mut self, cx: &mut Context) -> Poll<Option<u64>> {
self.deadlines.poll_expired(cx).map(|expired| {
let request_id = expired?.into_inner();
if let Some(request_data) = self.request_data.remove(&request_id) {
let _entered = request_data.span.enter();
tracing::error!("DeadlineExceeded");
@@ -131,7 +128,7 @@ impl<Resp> InFlightRequests<Resp> {
.response_completion
.send(Err(DeadlineExceededError));
}
request_id
Some(request_id)
})
}
}

View File

@@ -209,7 +209,7 @@
pub use serde;
#[cfg(feature = "serde-transport")]
pub use tokio_serde;
pub use {tokio_serde, tokio_util};
#[cfg(feature = "serde-transport")]
#[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))]

View File

@@ -393,11 +393,7 @@ where
Poll::Pending | Poll::Ready(None) => Closed,
};
let expiration_status = match self
.in_flight_requests_mut()
.poll_expired(cx)
.map_err(ChannelError::Timer)?
{
let expiration_status = match self.in_flight_requests_mut().poll_expired(cx) {
// No need to send a response, since the client wouldn't be waiting for one
// anymore.
Poll::Ready(Some(_)) => Ready,

View File

@@ -94,16 +94,14 @@ impl InFlightRequests {
}
/// Yields a request that has expired, aborting any ongoing processing of that request.
pub fn poll_expired(
&mut self,
cx: &mut Context,
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
pub fn poll_expired(&mut self, cx: &mut Context) -> Poll<Option<u64>> {
if self.deadlines.is_empty() {
// TODO(https://github.com/tokio-rs/tokio/issues/4161)
// This is a workaround for DelayQueue not always treating this case correctly.
return Poll::Ready(None);
}
self.deadlines.poll_expired(cx).map_ok(|expired| {
self.deadlines.poll_expired(cx).map(|expired| {
let expired = expired?;
if let Some(RequestData {
abort_handle, span, ..
}) = self.request_data.remove(expired.get_ref())
@@ -113,7 +111,7 @@ impl InFlightRequests {
abort_handle.abort();
tracing::error!("DeadlineExceeded");
}
expired.into_inner()
Some(expired.into_inner())
})
}
}
@@ -161,7 +159,7 @@ mod tests {
assert_matches!(
in_flight_requests.poll_expired(&mut noop_context()),
Poll::Ready(Some(Ok(_)))
Poll::Ready(Some(_))
);
assert_matches!(
abortable_future.poll_unpin(&mut noop_context()),