5 Commits

Author SHA1 Message Date
Tim Kuehn
9e4c3a2b8b Fix poll_expired incorrectly returning Pending when there are no outstanding requests. 2021-10-08 22:01:57 -07:00
Tim Kuehn
d78b24b631 Assert that poll_expired yields None when DelayQueue is empty 2021-10-08 21:28:43 -07:00
Tim Kuehn
49900d7a35 Close TcpStream when client disconnects 2021-10-08 20:10:16 -07:00
Tim Kuehn
1e680e3a5a Fix typos in docs.
Fixes https://github.com/google/tarpc/issues/352.
2021-10-08 19:19:50 -07:00
Tim Kuehn
2591d21e94 Update release notes to mention io::Error = 2021-09-23 13:57:43 -07:00
7 changed files with 71 additions and 7 deletions

View File

@@ -40,7 +40,7 @@ rather than in a separate language such as .proto. This means there's no separat
process, and no context switching between different languages.
Some other features of tarpc:
- Pluggable transport: any type implementing `Stream<Item = Request> + Sink<Response>` can be
- Pluggable transport: any type impling `Stream<Item = Request> + Sink<Response>` can be
used as a transport to connect the client and server.
- `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
- Cascading cancellation: dropping a request will send a cancellation message to the server.
@@ -55,7 +55,7 @@ Some other features of tarpc:
[tracing](https://github.com/tokio-rs/tracing) primitives extended with
[OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like
[Jaeger](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger),
each RPC can be traced through the client, server, and other dependencies downstream of the
each RPC can be traced through the client, server, amd other dependencies downstream of the
server. Even for applications not connected to a distributed tracing collector, the
instrumentation can also be ingested by regular loggers like
[env_logger](https://github.com/env-logger-rs/env_logger/).
@@ -81,7 +81,7 @@ your `Cargo.toml`:
```toml
anyhow = "1.0"
futures = "1.0"
futures = "0.3"
tarpc = { version = "0.27", features = ["tokio1"] }
tokio = { version = "1.0", features = ["macros"] }
```

View File

@@ -2,6 +2,28 @@
### Breaking Changes
### RPC error type is changing
RPC return types are changing from `Result<Response, io::Error>` to `Result<Response,
tarpc::client::RpcError>`.
Becaue tarpc is a library, not an application, it should strive to
use structured errors in its API so that users have maximal flexibility
in how they handle errors. io::Error makes that hard, because it is a
kitchen-sink error type.
RPCs in particular only have 3 classes of errors:
- The connection breaks.
- The request expires.
- The server decides not to process the request.
RPC responses can also contain application-specific errors, but from the
perspective of the RPC library, those are opaque to the framework, classified
as successful responsees.
### Open Telemetry
The Opentelemetry dependency is updated to version 0.16.x.
## 0.27.0 (2021-09-22)

View File

@@ -291,6 +291,9 @@ where
/// Could not flush the transport.
#[error("could not flush the transport")]
Flush(#[source] E),
/// Could not close the write end of the transport.
#[error("could not close the write end of the transport")]
Close(#[source] E),
/// Could not poll expired requests.
#[error("could not poll expired requests")]
Timer(#[source] tokio::time::error::Error),
@@ -335,6 +338,15 @@ where
.map_err(ChannelError::Flush)
}
fn poll_close<'a>(
self: &'a mut Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), ChannelError<C::Error>>> {
self.transport_pin_mut()
.poll_close(cx)
.map_err(ChannelError::Close)
}
fn canceled_requests_mut<'a>(self: &'a mut Pin<&mut Self>) -> &'a mut CanceledRequests {
self.as_mut().project().canceled_requests
}
@@ -394,7 +406,7 @@ where
match (pending_requests_status, canceled_requests_status) {
(ReceiverStatus::Closed, ReceiverStatus::Closed) => {
ready!(self.poll_flush(cx)?);
ready!(self.poll_close(cx)?);
Poll::Ready(None)
}
(ReceiverStatus::Pending, _) | (_, ReceiverStatus::Pending) => {

View File

@@ -68,7 +68,7 @@
//!
//! ```toml
//! anyhow = "1.0"
//! futures = "1.0"
//! futures = "0.3"
//! tarpc = { version = "0.27", features = ["tokio1"] }
//! tokio = { version = "1.0", features = ["macros"] }
//! ```

View File

@@ -333,6 +333,7 @@ where
type Item = Result<TrackedRequest<Req>, ChannelError<T::Error>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
#[derive(Debug)]
enum ReceiverStatus {
Ready,
Pending,
@@ -388,6 +389,11 @@ where
Poll::Pending => Pending,
};
tracing::trace!(
"Expired requests: {:?}, Inbound: {:?}",
expiration_status,
request_status
);
match (expiration_status, request_status) {
(Ready, _) | (_, Ready) => continue,
(Closed, Closed) => return Poll::Ready(None),

View File

@@ -98,6 +98,11 @@ impl InFlightRequests {
&mut self,
cx: &mut Context,
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
if self.deadlines.is_empty() {
// TODO(https://github.com/tokio-rs/tokio/issues/4161)
// This is a workaround for DelayQueue not always treating this case correctly.
return Poll::Ready(None);
}
self.deadlines.poll_expired(cx).map_ok(|expired| {
if let Some(RequestData {
abort_handle, span, ..
@@ -184,12 +189,31 @@ mod tests {
#[tokio::test]
async fn remove_request_doesnt_abort() {
let mut in_flight_requests = InFlightRequests::default();
assert!(in_flight_requests.deadlines.is_empty());
let abort_registration = in_flight_requests
.start_request(0, SystemTime::now(), Span::current())
.start_request(
0,
SystemTime::now() + std::time::Duration::from_secs(10),
Span::current(),
)
.unwrap();
let mut abortable_future = Box::new(Abortable::new(pending::<()>(), abort_registration));
// Precondition: Pending expiration
assert_matches!(
in_flight_requests.poll_expired(&mut noop_context()),
Poll::Pending
);
assert!(!in_flight_requests.deadlines.is_empty());
assert_matches!(in_flight_requests.remove_request(0), Some(_));
// Postcondition: No pending expirations
assert!(in_flight_requests.deadlines.is_empty());
assert_matches!(
in_flight_requests.poll_expired(&mut noop_context()),
Poll::Ready(None)
);
assert_matches!(
abortable_future.poll_unpin(&mut noop_context()),
Poll::Pending

View File

@@ -8,7 +8,7 @@ use std::{fmt, hash::Hash};
#[cfg(feature = "tokio1")]
use super::{tokio::TokioServerExecutor, Serve};
/// An extension trait for [streams](Stream) of [`Channels`](Channel).
/// An extension trait for [streams](futures::prelude::Stream) of [`Channels`](Channel).
pub trait Incoming<C>
where
Self: Sized + Stream<Item = C>,