mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Compare commits
5 Commits
v0.27.1
...
client-clo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9e4c3a2b8b | ||
|
|
d78b24b631 | ||
|
|
49900d7a35 | ||
|
|
1e680e3a5a | ||
|
|
2591d21e94 |
@@ -40,7 +40,7 @@ rather than in a separate language such as .proto. This means there's no separat
|
|||||||
process, and no context switching between different languages.
|
process, and no context switching between different languages.
|
||||||
|
|
||||||
Some other features of tarpc:
|
Some other features of tarpc:
|
||||||
- Pluggable transport: any type implementing `Stream<Item = Request> + Sink<Response>` can be
|
- Pluggable transport: any type impling `Stream<Item = Request> + Sink<Response>` can be
|
||||||
used as a transport to connect the client and server.
|
used as a transport to connect the client and server.
|
||||||
- `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
|
- `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
|
||||||
- Cascading cancellation: dropping a request will send a cancellation message to the server.
|
- Cascading cancellation: dropping a request will send a cancellation message to the server.
|
||||||
@@ -55,7 +55,7 @@ Some other features of tarpc:
|
|||||||
[tracing](https://github.com/tokio-rs/tracing) primitives extended with
|
[tracing](https://github.com/tokio-rs/tracing) primitives extended with
|
||||||
[OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like
|
[OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like
|
||||||
[Jaeger](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger),
|
[Jaeger](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger),
|
||||||
each RPC can be traced through the client, server, and other dependencies downstream of the
|
each RPC can be traced through the client, server, amd other dependencies downstream of the
|
||||||
server. Even for applications not connected to a distributed tracing collector, the
|
server. Even for applications not connected to a distributed tracing collector, the
|
||||||
instrumentation can also be ingested by regular loggers like
|
instrumentation can also be ingested by regular loggers like
|
||||||
[env_logger](https://github.com/env-logger-rs/env_logger/).
|
[env_logger](https://github.com/env-logger-rs/env_logger/).
|
||||||
@@ -81,7 +81,7 @@ your `Cargo.toml`:
|
|||||||
|
|
||||||
```toml
|
```toml
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
futures = "1.0"
|
futures = "0.3"
|
||||||
tarpc = { version = "0.27", features = ["tokio1"] }
|
tarpc = { version = "0.27", features = ["tokio1"] }
|
||||||
tokio = { version = "1.0", features = ["macros"] }
|
tokio = { version = "1.0", features = ["macros"] }
|
||||||
```
|
```
|
||||||
|
|||||||
22
RELEASES.md
22
RELEASES.md
@@ -2,6 +2,28 @@
|
|||||||
|
|
||||||
### Breaking Changes
|
### Breaking Changes
|
||||||
|
|
||||||
|
### RPC error type is changing
|
||||||
|
|
||||||
|
RPC return types are changing from `Result<Response, io::Error>` to `Result<Response,
|
||||||
|
tarpc::client::RpcError>`.
|
||||||
|
|
||||||
|
Becaue tarpc is a library, not an application, it should strive to
|
||||||
|
use structured errors in its API so that users have maximal flexibility
|
||||||
|
in how they handle errors. io::Error makes that hard, because it is a
|
||||||
|
kitchen-sink error type.
|
||||||
|
|
||||||
|
RPCs in particular only have 3 classes of errors:
|
||||||
|
|
||||||
|
- The connection breaks.
|
||||||
|
- The request expires.
|
||||||
|
- The server decides not to process the request.
|
||||||
|
|
||||||
|
RPC responses can also contain application-specific errors, but from the
|
||||||
|
perspective of the RPC library, those are opaque to the framework, classified
|
||||||
|
as successful responsees.
|
||||||
|
|
||||||
|
### Open Telemetry
|
||||||
|
|
||||||
The Opentelemetry dependency is updated to version 0.16.x.
|
The Opentelemetry dependency is updated to version 0.16.x.
|
||||||
|
|
||||||
## 0.27.0 (2021-09-22)
|
## 0.27.0 (2021-09-22)
|
||||||
|
|||||||
@@ -291,6 +291,9 @@ where
|
|||||||
/// Could not flush the transport.
|
/// Could not flush the transport.
|
||||||
#[error("could not flush the transport")]
|
#[error("could not flush the transport")]
|
||||||
Flush(#[source] E),
|
Flush(#[source] E),
|
||||||
|
/// Could not close the write end of the transport.
|
||||||
|
#[error("could not close the write end of the transport")]
|
||||||
|
Close(#[source] E),
|
||||||
/// Could not poll expired requests.
|
/// Could not poll expired requests.
|
||||||
#[error("could not poll expired requests")]
|
#[error("could not poll expired requests")]
|
||||||
Timer(#[source] tokio::time::error::Error),
|
Timer(#[source] tokio::time::error::Error),
|
||||||
@@ -335,6 +338,15 @@ where
|
|||||||
.map_err(ChannelError::Flush)
|
.map_err(ChannelError::Flush)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_close<'a>(
|
||||||
|
self: &'a mut Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Result<(), ChannelError<C::Error>>> {
|
||||||
|
self.transport_pin_mut()
|
||||||
|
.poll_close(cx)
|
||||||
|
.map_err(ChannelError::Close)
|
||||||
|
}
|
||||||
|
|
||||||
fn canceled_requests_mut<'a>(self: &'a mut Pin<&mut Self>) -> &'a mut CanceledRequests {
|
fn canceled_requests_mut<'a>(self: &'a mut Pin<&mut Self>) -> &'a mut CanceledRequests {
|
||||||
self.as_mut().project().canceled_requests
|
self.as_mut().project().canceled_requests
|
||||||
}
|
}
|
||||||
@@ -394,7 +406,7 @@ where
|
|||||||
|
|
||||||
match (pending_requests_status, canceled_requests_status) {
|
match (pending_requests_status, canceled_requests_status) {
|
||||||
(ReceiverStatus::Closed, ReceiverStatus::Closed) => {
|
(ReceiverStatus::Closed, ReceiverStatus::Closed) => {
|
||||||
ready!(self.poll_flush(cx)?);
|
ready!(self.poll_close(cx)?);
|
||||||
Poll::Ready(None)
|
Poll::Ready(None)
|
||||||
}
|
}
|
||||||
(ReceiverStatus::Pending, _) | (_, ReceiverStatus::Pending) => {
|
(ReceiverStatus::Pending, _) | (_, ReceiverStatus::Pending) => {
|
||||||
|
|||||||
@@ -68,7 +68,7 @@
|
|||||||
//!
|
//!
|
||||||
//! ```toml
|
//! ```toml
|
||||||
//! anyhow = "1.0"
|
//! anyhow = "1.0"
|
||||||
//! futures = "1.0"
|
//! futures = "0.3"
|
||||||
//! tarpc = { version = "0.27", features = ["tokio1"] }
|
//! tarpc = { version = "0.27", features = ["tokio1"] }
|
||||||
//! tokio = { version = "1.0", features = ["macros"] }
|
//! tokio = { version = "1.0", features = ["macros"] }
|
||||||
//! ```
|
//! ```
|
||||||
|
|||||||
@@ -333,6 +333,7 @@ where
|
|||||||
type Item = Result<TrackedRequest<Req>, ChannelError<T::Error>>;
|
type Item = Result<TrackedRequest<Req>, ChannelError<T::Error>>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
|
#[derive(Debug)]
|
||||||
enum ReceiverStatus {
|
enum ReceiverStatus {
|
||||||
Ready,
|
Ready,
|
||||||
Pending,
|
Pending,
|
||||||
@@ -388,6 +389,11 @@ where
|
|||||||
Poll::Pending => Pending,
|
Poll::Pending => Pending,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
tracing::trace!(
|
||||||
|
"Expired requests: {:?}, Inbound: {:?}",
|
||||||
|
expiration_status,
|
||||||
|
request_status
|
||||||
|
);
|
||||||
match (expiration_status, request_status) {
|
match (expiration_status, request_status) {
|
||||||
(Ready, _) | (_, Ready) => continue,
|
(Ready, _) | (_, Ready) => continue,
|
||||||
(Closed, Closed) => return Poll::Ready(None),
|
(Closed, Closed) => return Poll::Ready(None),
|
||||||
|
|||||||
@@ -98,6 +98,11 @@ impl InFlightRequests {
|
|||||||
&mut self,
|
&mut self,
|
||||||
cx: &mut Context,
|
cx: &mut Context,
|
||||||
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
|
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
|
||||||
|
if self.deadlines.is_empty() {
|
||||||
|
// TODO(https://github.com/tokio-rs/tokio/issues/4161)
|
||||||
|
// This is a workaround for DelayQueue not always treating this case correctly.
|
||||||
|
return Poll::Ready(None);
|
||||||
|
}
|
||||||
self.deadlines.poll_expired(cx).map_ok(|expired| {
|
self.deadlines.poll_expired(cx).map_ok(|expired| {
|
||||||
if let Some(RequestData {
|
if let Some(RequestData {
|
||||||
abort_handle, span, ..
|
abort_handle, span, ..
|
||||||
@@ -184,12 +189,31 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn remove_request_doesnt_abort() {
|
async fn remove_request_doesnt_abort() {
|
||||||
let mut in_flight_requests = InFlightRequests::default();
|
let mut in_flight_requests = InFlightRequests::default();
|
||||||
|
assert!(in_flight_requests.deadlines.is_empty());
|
||||||
|
|
||||||
let abort_registration = in_flight_requests
|
let abort_registration = in_flight_requests
|
||||||
.start_request(0, SystemTime::now(), Span::current())
|
.start_request(
|
||||||
|
0,
|
||||||
|
SystemTime::now() + std::time::Duration::from_secs(10),
|
||||||
|
Span::current(),
|
||||||
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mut abortable_future = Box::new(Abortable::new(pending::<()>(), abort_registration));
|
let mut abortable_future = Box::new(Abortable::new(pending::<()>(), abort_registration));
|
||||||
|
|
||||||
|
// Precondition: Pending expiration
|
||||||
|
assert_matches!(
|
||||||
|
in_flight_requests.poll_expired(&mut noop_context()),
|
||||||
|
Poll::Pending
|
||||||
|
);
|
||||||
|
assert!(!in_flight_requests.deadlines.is_empty());
|
||||||
|
|
||||||
assert_matches!(in_flight_requests.remove_request(0), Some(_));
|
assert_matches!(in_flight_requests.remove_request(0), Some(_));
|
||||||
|
// Postcondition: No pending expirations
|
||||||
|
assert!(in_flight_requests.deadlines.is_empty());
|
||||||
|
assert_matches!(
|
||||||
|
in_flight_requests.poll_expired(&mut noop_context()),
|
||||||
|
Poll::Ready(None)
|
||||||
|
);
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
abortable_future.poll_unpin(&mut noop_context()),
|
abortable_future.poll_unpin(&mut noop_context()),
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ use std::{fmt, hash::Hash};
|
|||||||
#[cfg(feature = "tokio1")]
|
#[cfg(feature = "tokio1")]
|
||||||
use super::{tokio::TokioServerExecutor, Serve};
|
use super::{tokio::TokioServerExecutor, Serve};
|
||||||
|
|
||||||
/// An extension trait for [streams](Stream) of [`Channels`](Channel).
|
/// An extension trait for [streams](futures::prelude::Stream) of [`Channels`](Channel).
|
||||||
pub trait Incoming<C>
|
pub trait Incoming<C>
|
||||||
where
|
where
|
||||||
Self: Sized + Stream<Item = C>,
|
Self: Sized + Stream<Item = C>,
|
||||||
|
|||||||
Reference in New Issue
Block a user