The major breaking change is that Channel::execute no longer internally
spawns RPC handlers, because it is no longer possible to place a Send
bound on the return type of Serve::serve. Instead, Channel::execute
returns a stream of RPC handler futures.
Service authors can reproduce the old behavior by spawning each response
handler (the compiler knows whether or not the futures can be spawned;
it's just that the bounds can't be expressed generically):
```
channel.execute(server.serve())
    .for_each(|rpc| async move { tokio::spawn(rpc); })
```
* Make client::InFlightRequests generic over result.
Previously, InFlightRequests required the client response type to be a
server response. However, this prevented injection of non-server
responses: for example, if the client fails to send a request, it should
complete the request with an IO error rather than a server error.
* Gracefully handle client-side send errors.
Previously, a client channel would immediately disconnect when
encountering an error in Transport::try_send. One kind of error that can
occur in try_send is message validation, e.g. validating a message is
not larger than a configured frame size. The problem with shutting down
the client immediately is that debuggability suffers: it can be hard to
understand what caused the client to fail. Also, these errors are not
always fatal, as with frame size limits, so complete shutdown was
extreme.
By bubbling up errors, it's now possible for the caller to handle them
programmatically. For example, the error chain could be walked and
reported via anyhow::Error, producing output like:
```
2023-01-10T02:49:32.528939Z WARN client: the client failed to send the request
Caused by:
0: could not write to the transport
1: frame size too big
```
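A minimal sketch of handling such an error at the call site; `client`, its `hello` stub, and the use of anyhow here are illustrative, not tarpc APIs:
```
match client.hello(context::current(), "world".to_string()).await {
    Ok(response) => println!("{response}"),
    Err(e) => {
        // Walk the error chain to find the root cause (e.g. a frame-size limit)
        // instead of the whole client shutting down.
        for (i, cause) in anyhow::Error::new(e).chain().enumerate() {
            tracing::warn!("{}: {}", i, cause);
        }
    }
}
```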
* Some follow-up work: right now, read errors will bubble up to all pending RPCs. However, on the write side, only `start_send` bubbles up. `poll_ready`, `poll_flush`, and `poll_close` do not propagate back to pending RPCs. This is probably okay in most circumstances, because fatal write errors likely coincide with fatal read errors, which *do* propagate back to clients. But it might still be worth unifying this logic.
---------
Co-authored-by: Tim Kuehn <tikue@google.com>
## Problem
Library users might get stuck or run into issues while using tarpc because of incompatible third-party libraries, in particular tokio_serde and tokio_util.
## Solution
This PR does the following:
1. Re-export tokio_serde as part of the serde-transport feature, because the end user imports it to use some serde-transport APIs.
2. Update third-party packages to their latest releases and fix the issues resulting from that.
## Important Notes
tokio_util 0.7.3 changed the DelayQueue::poll_expired API [0]; therefore, InFlightRequests::poll_expired now returns Poll<Option<u64>>
[0] https://docs.rs/tokio-util/latest/tokio_util/time/delay_queue/struct.DelayQueue.html#method.poll_expired
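For reference, a minimal sketch of the adaptation, mirroring the described signature rather than the exact tarpc internals (the stored request-id type is an assumption):
```
use std::task::{Context, Poll};
use tokio_util::time::delay_queue::DelayQueue;

struct InFlightRequests {
    deadlines: DelayQueue<u64>, // assumed: request ids keyed by their deadlines
}

impl InFlightRequests {
    // tokio_util 0.7 no longer wraps expired items in a Result, so the expired
    // request id can be mapped out directly.
    fn poll_expired(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        self.deadlines
            .poll_expired(cx)
            .map(|expired| expired.map(|e| e.into_inner()))
    }
}
```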
In the interest of the user's attention, some ancillary APIs have been
moved to new submodules:
- server::limits contains what was previously called Throttler and
ChannelFilter. Both of those names were very generic, whereas the behavior
these types apply is quite specific (and also simplistic). The following
renames have occurred:
- ThrottlerStream => MaxRequestsPerChannel
- Throttler => MaxRequests
- ChannelFilter => MaxChannelsPerKey
- server::incoming contains the Incoming trait.
- server::tokio contains the tokio-specific helper types.
The 5 structs and 1 enum remaining in the base server module are all
core to the functioning of the server.
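A minimal sketch of the resulting imports, assuming the renamed items are exported directly at the module paths named above (the exact nesting inside server::limits is an assumption, not verified):
```
use tarpc::server::incoming::Incoming;
use tarpc::server::limits::{MaxChannelsPerKey, MaxRequests, MaxRequestsPerChannel};
```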
tarpc is now instrumented with tracing primitives extended with
OpenTelemetry traces. Using a tracing-opentelemetry subscriber that
exports to a compatible backend like Jaeger, each RPC can be traced
through the client, server, and other dependencies downstream of the
server. Even for applications not connected to a distributed tracing
collector, the instrumentation can be ingested by regular loggers like env_logger.
# Breaking Changes
## Logging
Logged events are now structured using tracing. For applications using a
logger and not a tracing subscriber, these logs may look different or be
harder to consume. The easiest solution is to add a tracing subscriber
that logs to stdout, such as tracing_subscriber::fmt.
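A minimal sketch of that solution, assuming the tracing-subscriber crate with its default fmt feature enabled:
```
fn main() {
    // Emit tarpc's structured tracing events to stdout, roughly replacing the
    // old log output for applications without a dedicated tracing pipeline.
    tracing_subscriber::fmt::init();

    // ... construct clients/servers as usual ...
}
```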
## Context
- Context no longer has parent_span, which was actually never needed,
because the context sent in an RPC is inherently the parent context.
For purposes of distributed tracing, the client side of the RPC has all
necessary information to link the span to its parent; the server side
need do nothing more than export the (trace ID, span ID) tuple.
- Context has a new field of type SamplingDecision, which has two variants,
Sampled and Unsampled. This field can be used by downstream systems to
determine whether a trace needs to be exported. If the parent span is
sampled, the expectation is that all child spans are exported as well;
to do otherwise could result in lossy traces being exported. Note that
if an OpenTelemetry tracing subscriber is not installed, the fallback
context will still be used, but the Context's sampling decision will
always be inherited from the parent Context's sampling decision.
- Context::scope has been removed. Context propagation is now done via
tracing's task-local spans. Spans can be propagated across tasks via
Span::in_scope. When a service receives a request, it attaches an
OpenTelemetry context to the local Span created before request handling,
and this context contains the request deadline. This span-local deadline
is retrieved by Context::current, but it cannot be modified in a way that
makes future Context::current calls return a different deadline. However,
the deadline in the context passed into an RPC call will override it, so
users can retrieve the current context and then modify the deadline
field, as has historically been possible (see the sketch after this list).
- Context propagation precedence changes: when an RPC is initiated, the
current Span's OpenTelemetry context takes precedence over the trace
context passed into the RPC method. If there is no current Span, then
the trace context argument is used as it has been historically. Note
that OpenTelemetry context propagation requires an OpenTelemetry
tracing subscriber to be installed.
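A minimal sketch of the deadline override mentioned in the Context::scope item above, assuming the public deadline field supports adding a Duration; `client.hello` is an illustrative generated stub, not a tarpc API:
```
use std::time::Duration;

// Start from the current (span-local) context, then override its deadline for
// this one call; later Context::current() calls are unaffected.
let mut ctx = tarpc::context::current();
ctx.deadline += Duration::from_secs(5);
let response = client.hello(ctx, "world".to_string()).await?;
```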
## Server
- The server::Channel trait now has an additional required associated
type and method that returns the underlying transport. This makes it
more ergonomic for users to retrieve transport-specific information,
like the peer's IP address. BaseChannel implements Channel::transport by
returning the underlying transport, and channel decorators like Throttler
just delegate to the Channel::transport method of the wrapped channel.
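A minimal sketch of the new accessor in use, assuming a TCP-backed transport whose peer_addr method is available (transport-specific, not part of the Channel trait itself):
```
use tarpc::server::Channel;

// Inside connection-handling code, where `channel` implements server::Channel.
let peer_addr = channel.transport().peer_addr()?;
tracing::info!(%peer_addr, "accepted connection");
```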
# References
[1] https://github.com/tokio-rs/tracing
[2] https://opentelemetry.io
[3] https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger
[4] https://github.com/env-logger-rs/env_logger
This required the breaking change of removing the Client trait. The
intent of the Client trait was to facilitate the decorator pattern by
allowing users to create their own Clients that added behavior on top of
the base client. Unfortunately, this trait had become a maintenance
burden, consistently causing issues with lifetimes and the lack of
generic associated types. Specifically, it meant that Client impls could
not use async fns, which is no longer tenable today.
1. Renames
Some of the items in this module were renamed to be less generic:
- Handler => Incoming
- ClientHandler => Requests
- ResponseHandler => InFlightRequest
- Channel::{respond_with => requests}
In the case of Handler: handler of *what*? Now it's a bit clearer that
this is a stream of Channels (aka *incoming* connections).
Similarly, ClientHandler was a stream of requests over a single
connection. Hopefully Requests better reflects that.
ResponseHandler was renamed InFlightRequest because it no longer
contains the serving function. Instead, it is just the request, plus
the response channel and an abort hook. As a result of this,
Channel::respond_with underwent a big change: it used to take the
serving function and return a ClientHandler; now it has been renamed
Channel::requests and does not take any args.
2. Execute methods
All methods that actually result in responses being generated
have been consolidated into methods named `execute`:
- InFlightRequest::execute returns a future that completes when a
response has been generated and sent to the server Channel.
- Requests::execute automatically spawns response handlers for all
requests over a single channel.
- Channel::execute is a convenience for `channel.requests().execute()`.
- Incoming::execute automatically spawns response handlers for all
requests over all channels.
3. Removal of Server.
server::Server was removed, as it provided no value over the Incoming/Channel
abstractions. Additionally, server::new was removed, since it just
returned a Server.
I don't know what the intention was behind using u32::MAX + 1, but since the
argument's type is usize, this is the only giant value that makes sense to me.
Instead, serde_transport::tcp::connect returns a future named Connect
that has methods to directly access the framing config. This is
consistent with how serde_transport::tcp::listen returns a future with
methods to access the framing config. In addition to this consistency,
it reduces the API surface and provides a simpler user transition from
"zero config" to "some config".
This PR obsoletes the JSON and Bincode transports and instead introduces a unified transport that
is generic over any tokio-serde serialization format as well as any AsyncRead + AsyncWrite medium.
This comes with a slight usability hit (having to manually specify the underlying transport
and codec), but it can be alleviated by writing custom freestanding connect and listen fns.