Return transport errors to the caller (#399)

* Make client::InFlightRequests generic over the result type.

Previously, InFlightRequests required the client response type to be a
server response. However, this prevented injection of non-server
responses: for example, if the client fails to send a request, it should
complete the request with an IO error rather than a server error.
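
A minimal, self-contained sketch of the new shape (the real type also tracks contexts, spans, and deadline timers; names here are simplified):

```rust
use std::collections::HashMap;
use tokio::sync::oneshot;

/// Simplified stand-in for client::InFlightRequests: generic over an
/// arbitrary result type `Res` instead of a server `Response<Resp>`.
struct InFlightRequests<Res> {
    request_data: HashMap<u64, oneshot::Sender<Res>>,
}

impl<Res> InFlightRequests<Res> {
    /// The dispatch task can now complete a request with any result,
    /// e.g. a client-side send error when the request never hit the wire.
    fn complete_request(&mut self, request_id: u64, result: Res) -> bool {
        if let Some(completion) = self.request_data.remove(&request_id) {
            let _ = completion.send(result);
            return true;
        }
        false
    }
}
```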

* Gracefully handle client-side send errors.

Previously, a client channel would immediately disconnect when
encountering an error in Transport::try_send. One kind of error that can
occur in try_send is a message validation failure, e.g. a message that
exceeds the configured maximum frame size. The problem with shutting down
the client immediately is that debuggability suffers: it can be hard to
understand what caused the client to fail. Also, these errors are not
always fatal, as with frame size limits, so complete shutdown was
extreme.

By bubbling up errors, it's now possible for the caller to handle them
programmatically. For example, the error chain can be walked via
anyhow::Error, producing output like:

```
    2023-01-10T02:49:32.528939Z  WARN client: the client failed to send the request

    Caused by:
        0: could not write to the transport
        1: frame size too big
```
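
For instance, a caller might treat send errors as retryable while keeping everything else fatal. A sketch, with an illustrative helper that is not part of this commit:

```rust
use tarpc::client::RpcError;

/// Illustrative helper: a send failure (e.g. "frame size too big") no longer
/// tears down the channel, so the caller can log it and retry; other errors
/// are surfaced as fatal.
fn triage(result: Result<String, RpcError>) -> anyhow::Result<Option<String>> {
    match result {
        Ok(resp) => Ok(Some(resp)),
        Err(e @ RpcError::Send(_)) => {
            // Logs the full cause chain, like the output above.
            tracing::warn!("{:?}", anyhow::Error::from(e));
            Ok(None)
        }
        Err(e) => Err(e.into()),
    }
}
```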

* Some follow-up work: right now, read errors bubble up to all pending RPCs, but on the write side only `start_send` errors do; `poll_ready`, `poll_flush`, and `poll_close` failures do not propagate back to pending RPCs. This is probably okay in most circumstances, because fatal write errors likely coincide with fatal read errors, which *do* propagate back to clients. But it might still be worth unifying this logic, as sketched below.
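
As a rough sketch of what that unification could look like (stand-in types so it compiles on its own; none of this is in the commit):

```rust
use std::sync::Arc;
use tokio::sync::oneshot;

// Stand-ins for the crate-private dispatch state.
type RpcResult = Result<String, Arc<dyn std::error::Error + Send + Sync>>;
struct InFlight {
    completions: Vec<oneshot::Sender<RpcResult>>,
}

impl InFlight {
    /// Mirrors complete_all_requests: drain every pending request and
    /// complete each one with the result produced by the closure.
    fn complete_all(&mut self, mut result: impl FnMut() -> RpcResult) {
        for completion in self.completions.drain(..) {
            let _ = completion.send(result());
        }
    }
}

/// A fatal error from poll_ready/poll_flush/poll_close could fan out to
/// every pending RPC, just as read errors already do in pump_read.
fn fail_all_pending(in_flight: &mut InFlight, e: Arc<dyn std::error::Error + Send + Sync>) {
    in_flight.complete_all(|| Err(e.clone()));
}
```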

---------

Co-authored-by: Tim Kuehn <tikue@google.com>
Author: Bruno
Date: 2023-03-24 22:31:25 +01:00
Committer: GitHub
Parent: 878f594d5b
Commit: 93f3880025
5 changed files with 311 additions and 114 deletions

File 1 of 5 (example client):

@@ -26,7 +26,8 @@ async fn main() -> anyhow::Result<()> {
     let flags = Flags::parse();
     init_tracing("Tarpc Example Client")?;
-    let transport = tarpc::serde_transport::tcp::connect(flags.server_addr, Json::default);
+    let mut transport = tarpc::serde_transport::tcp::connect(flags.server_addr, Json::default);
+    transport.config_mut().max_frame_length(usize::MAX);
     // WorldClient is generated by the service attribute. It has a constructor `new` that takes a
     // config and any Transport as input.
@@ -42,7 +43,10 @@ async fn main() -> anyhow::Result<()> {
    .instrument(tracing::info_span!("Two Hellos"))
    .await;
-    tracing::info!("{:?}", hello);
+    match hello {
+        Ok(hello) => tracing::info!("{hello:?}"),
+        Err(e) => tracing::warn!("{:?}", anyhow::Error::from(e)),
+    }
     // Let the background span processor finish.
     sleep(Duration::from_micros(1)).await;

File 2 of 5 (client dispatch):

@@ -10,14 +10,13 @@ mod in_flight_requests;
 use crate::{
     cancellations::{cancellations, CanceledRequests, RequestCancellation},
-    context, trace, ClientMessage, Request, Response, ServerError, Transport,
+    context, trace, ChannelError, ClientMessage, Request, Response, ServerError, Transport,
 };
 use futures::{prelude::*, ready, stream::Fuse, task::*};
-use in_flight_requests::{DeadlineExceededError, InFlightRequests};
+use in_flight_requests::InFlightRequests;
 use pin_project::pin_project;
 use std::{
     convert::TryFrom,
-    error::Error,
     fmt,
     pin::Pin,
     sync::{
@@ -158,7 +157,7 @@ impl<Req, Resp> Channel<Req, Resp> {
                 response_completion,
             })
             .await
-            .map_err(|mpsc::error::SendError(_)| RpcError::Disconnected)?;
+            .map_err(|mpsc::error::SendError(_)| RpcError::Shutdown)?;
         response_guard.response().await
     }
 }
@@ -166,7 +165,7 @@ impl<Req, Resp> Channel<Req, Resp> {
 /// A server response that is completed by request dispatch when the corresponding response
 /// arrives off the wire.
 struct ResponseGuard<'a, Resp> {
-    response: &'a mut oneshot::Receiver<Result<Response<Resp>, DeadlineExceededError>>,
+    response: &'a mut oneshot::Receiver<Result<Resp, RpcError>>,
     cancellation: &'a RequestCancellation,
     request_id: u64,
     cancel: bool,
@@ -174,12 +173,17 @@ struct ResponseGuard<'a, Resp> {
 /// An error that can occur in the processing of an RPC. These are not request-specific errors
 /// but rather cross-cutting errors that can always occur.
-#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Hash)]
-#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
+#[derive(thiserror::Error, Debug)]
 pub enum RpcError {
     /// The client disconnected from the server.
-    #[error("the client disconnected from the server")]
-    Disconnected,
+    #[error("the connection to the server was already shutdown")]
+    Shutdown,
+    /// The client failed to send the request.
+    #[error("the client failed to send the request")]
+    Send(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
+    /// An error occurred while waiting for the server response.
+    #[error("an error occurred while waiting for the server response")]
+    Receive(#[source] Arc<dyn std::error::Error + Send + Sync + 'static>),
     /// The request exceeded its deadline.
     #[error("the request exceeded its deadline")]
     DeadlineExceeded,
@@ -188,24 +192,18 @@ pub enum RpcError {
     Server(#[from] ServerError),
 }
-impl From<DeadlineExceededError> for RpcError {
-    fn from(_: DeadlineExceededError) -> Self {
-        RpcError::DeadlineExceeded
-    }
-}
 impl<Resp> ResponseGuard<'_, Resp> {
     async fn response(mut self) -> Result<Resp, RpcError> {
         let response = (&mut self.response).await;
         // Cancel drop logic once a response has been received.
         self.cancel = false;
         match response {
-            Ok(resp) => Ok(resp?.message?),
+            Ok(response) => response,
             Err(oneshot::error::RecvError { .. }) => {
                 // The oneshot is Canceled when the dispatch task ends. In that case,
                 // there's nothing listening on the other side, so there's no point in
                 // propagating cancellation.
-                Err(RpcError::Disconnected)
+                Err(RpcError::Shutdown)
             }
         }
     }
@@ -242,7 +240,6 @@ where
 {
     let (to_dispatch, pending_requests) = mpsc::channel(config.pending_request_buffer);
     let (cancellation, canceled_requests) = cancellations();
-    let canceled_requests = canceled_requests;
     NewClient {
         client: Channel {
@@ -274,42 +271,18 @@ pub struct RequestDispatch<Req, Resp, C> {
     /// Requests that were dropped.
     canceled_requests: CanceledRequests,
     /// Requests already written to the wire that haven't yet received responses.
-    in_flight_requests: InFlightRequests<Resp>,
+    in_flight_requests: InFlightRequests<Result<Resp, RpcError>>,
     /// Configures limits to prevent unlimited resource usage.
     config: Config,
 }
-/// Critical errors that result in a Channel disconnecting.
-#[derive(thiserror::Error, Debug)]
-pub enum ChannelError<E>
-where
-    E: Error + Send + Sync + 'static,
-{
-    /// Could not read from the transport.
-    #[error("could not read from the transport")]
-    Read(#[source] E),
-    /// Could not ready the transport for writes.
-    #[error("could not ready the transport for writes")]
-    Ready(#[source] E),
-    /// Could not write to the transport.
-    #[error("could not write to the transport")]
-    Write(#[source] E),
-    /// Could not flush the transport.
-    #[error("could not flush the transport")]
-    Flush(#[source] E),
-    /// Could not close the write end of the transport.
-    #[error("could not close the write end of the transport")]
-    Close(#[source] E),
-    /// Could not poll expired requests.
-    #[error("could not poll expired requests")]
-    Timer(#[source] tokio::time::error::Error),
-}
 impl<Req, Resp, C> RequestDispatch<Req, Resp, C>
 where
     C: Transport<ClientMessage<Req>, Response<Resp>>,
 {
-    fn in_flight_requests<'a>(self: &'a mut Pin<&mut Self>) -> &'a mut InFlightRequests<Resp> {
+    fn in_flight_requests<'a>(
+        self: &'a mut Pin<&mut Self>,
+    ) -> &'a mut InFlightRequests<Result<Resp, RpcError>> {
         self.as_mut().project().in_flight_requests
     }
@@ -369,7 +342,17 @@ where
     ) -> Poll<Option<Result<(), ChannelError<C::Error>>>> {
         self.transport_pin_mut()
             .poll_next(cx)
-            .map_err(ChannelError::Read)
+            .map_err(|e| {
+                let e = Arc::new(e);
+                for span in self
+                    .in_flight_requests()
+                    .complete_all_requests(|| Err(RpcError::Receive(e.clone())))
+                {
+                    let _entered = span.enter();
+                    tracing::info!("ReceiveError");
+                }
+                ChannelError::Read(e)
+            })
             .map_ok(|response| {
                 self.complete(response);
             })
@@ -399,7 +382,10 @@ where
         // Receiving Poll::Ready(None) when polling expired requests never indicates "Closed",
         // because there can temporarily be zero in-flight requests. Therefore, there is no need to
         // track the status like is done with pending and cancelled requests.
-        if let Poll::Ready(Some(_)) = self.in_flight_requests().poll_expired(cx) {
+        if let Poll::Ready(Some(_)) = self
+            .in_flight_requests()
+            .poll_expired(cx, || Err(RpcError::DeadlineExceeded))
+        {
             // Expired requests are considered complete; there is no compelling reason to send a
             // cancellation message to the server, since it will have already exhausted its
             // allotted processing time.
@@ -510,7 +496,7 @@ where
             Some(dispatch_request) => dispatch_request,
             None => return Poll::Ready(None),
         };
-        let entered = span.enter();
+        let _entered = span.enter();
         // poll_next_request only returns Ready if there is room to buffer another request.
         // Therefore, we can call write_request without fear of erroring due to a full
         // buffer.
@@ -523,13 +509,16 @@ where
                 trace_context: ctx.trace_context,
             },
         });
-        self.start_send(request)?;
-        tracing::info!("SendRequest");
-        drop(entered);
         self.in_flight_requests()
-            .insert_request(request_id, ctx, span, response_completion)
+            .insert_request(request_id, ctx, span.clone(), response_completion)
             .expect("Request IDs should be unique");
+        match self.start_send(request) {
+            Ok(()) => tracing::info!("SendRequest"),
+            Err(e) => {
+                self.in_flight_requests()
+                    .complete_request(request_id, Err(RpcError::Send(Box::new(e))));
+            }
+        }
         Poll::Ready(Some(Ok(())))
     }
@@ -554,7 +543,10 @@ where
     /// Sends a server response to the client task that initiated the associated request.
     fn complete(mut self: Pin<&mut Self>, response: Response<Resp>) -> bool {
-        self.in_flight_requests().complete_request(response)
+        self.in_flight_requests().complete_request(
+            response.request_id,
+            response.message.map_err(RpcError::Server),
+        )
     }
 }
@@ -603,30 +595,37 @@ struct DispatchRequest<Req, Resp> {
     pub span: Span,
     pub request_id: u64,
     pub request: Req,
-    pub response_completion: oneshot::Sender<Result<Response<Resp>, DeadlineExceededError>>,
+    pub response_completion: oneshot::Sender<Result<Resp, RpcError>>,
 }
 #[cfg(test)]
 mod tests {
-    use super::{cancellations, Channel, DispatchRequest, RequestDispatch, ResponseGuard};
+    use super::{
+        cancellations, Channel, DispatchRequest, RequestDispatch, ResponseGuard, RpcError,
+    };
     use crate::{
-        client::{
-            in_flight_requests::{DeadlineExceededError, InFlightRequests},
-            Config,
-        },
-        context,
+        client::{in_flight_requests::InFlightRequests, Config},
+        context::{self, current},
         transport::{self, channel::UnboundedChannel},
-        ClientMessage, Response,
+        ChannelError, ClientMessage, Response,
     };
     use assert_matches::assert_matches;
     use futures::{prelude::*, task::*};
     use std::{
         convert::TryFrom,
+        fmt::Display,
+        marker::PhantomData,
         pin::Pin,
-        sync::atomic::{AtomicUsize, Ordering},
-        sync::Arc,
+        sync::{
+            atomic::{AtomicUsize, Ordering},
+            Arc,
+        },
     };
+    use thiserror::Error;
-    use tokio::sync::{
-        mpsc::{self},
-        oneshot,
-    };
+    use tokio::sync::{mpsc, oneshot};
     use tracing::Span;
     #[tokio::test]
@@ -647,7 +646,7 @@ mod tests {
            .await
            .unwrap();
         assert_matches!(dispatch.as_mut().poll(cx), Poll::Pending);
-        assert_matches!(rx.try_recv(), Ok(Ok(Response { request_id: 0, message: Ok(resp) })) if resp == "Resp");
+        assert_matches!(rx.try_recv(), Ok(Ok(resp)) if resp == "Resp");
     }
     #[tokio::test]
@@ -781,6 +780,185 @@ mod tests {
         assert!(dispatch.as_mut().poll_next_request(cx).is_pending());
     }
+    #[tokio::test]
+    async fn test_shutdown_error() {
+        let _ = tracing_subscriber::fmt().with_test_writer().try_init();
+        let (dispatch, mut channel, _) = set_up();
+        let (tx, mut rx) = oneshot::channel();
+        // send succeeds
+        let resp = send_request(&mut channel, "hi", tx, &mut rx).await;
+        drop(dispatch);
+        // error on receive
+        assert_matches!(resp.response().await, Err(RpcError::Shutdown));
+        let (dispatch, channel, _) = set_up();
+        drop(dispatch);
+        // error on send
+        let resp = channel
+            .call(current(), "test_request", "hi".to_string())
+            .await;
+        assert_matches!(resp, Err(RpcError::Shutdown));
+    }
+    #[tokio::test]
+    async fn test_transport_error_write() {
+        let cause = TransportError::Write;
+        let (mut dispatch, mut channel, mut cx) = setup_always_err(cause);
+        let (tx, mut rx) = oneshot::channel();
+        let resp = send_request(&mut channel, "hi", tx, &mut rx).await;
+        assert!(dispatch.as_mut().poll(&mut cx).is_pending());
+        let res = resp.response().await;
+        assert_matches!(res, Err(RpcError::Send(_)));
+        let client_error: anyhow::Error = res.unwrap_err().into();
+        let mut chain = client_error.chain();
+        chain.next(); // original RpcError
+        assert_eq!(
+            chain
+                .next()
+                .unwrap()
+                .downcast_ref::<ChannelError<TransportError>>(),
+            Some(&ChannelError::Write(cause))
+        );
+        assert_eq!(
+            client_error.root_cause().downcast_ref::<TransportError>(),
+            Some(&cause)
+        );
+    }
+    #[tokio::test]
+    async fn test_transport_error_read() {
+        let cause = TransportError::Read;
+        let (mut dispatch, mut channel, mut cx) = setup_always_err(cause);
+        let (tx, mut rx) = oneshot::channel();
+        let resp = send_request(&mut channel, "hi", tx, &mut rx).await;
+        assert_eq!(
+            dispatch.as_mut().pump_write(&mut cx),
+            Poll::Ready(Some(Ok(())))
+        );
+        assert_eq!(
+            dispatch.as_mut().pump_read(&mut cx),
+            Poll::Ready(Some(Err(ChannelError::Read(Arc::new(cause)))))
+        );
+        assert_matches!(resp.response().await, Err(RpcError::Receive(_)));
+    }
+    #[tokio::test]
+    async fn test_transport_error_ready() {
+        let cause = TransportError::Ready;
+        let (mut dispatch, _, mut cx) = setup_always_err(cause);
+        assert_eq!(
+            dispatch.as_mut().poll(&mut cx),
+            Poll::Ready(Err(ChannelError::Ready(cause)))
+        );
+    }
+    #[tokio::test]
+    async fn test_transport_error_flush() {
+        let cause = TransportError::Flush;
+        let (mut dispatch, _, mut cx) = setup_always_err(cause);
+        assert_eq!(
+            dispatch.as_mut().poll(&mut cx),
+            Poll::Ready(Err(ChannelError::Flush(cause)))
+        );
+    }
+    #[tokio::test]
+    async fn test_transport_error_close() {
+        let cause = TransportError::Close;
+        let (mut dispatch, channel, mut cx) = setup_always_err(cause);
+        drop(channel);
+        assert_eq!(
+            dispatch.as_mut().poll(&mut cx),
+            Poll::Ready(Err(ChannelError::Close(cause)))
+        );
+    }
+    fn setup_always_err(
+        cause: TransportError,
+    ) -> (
+        Pin<Box<RequestDispatch<String, String, AlwaysErrorTransport<String>>>>,
+        Channel<String, String>,
+        Context<'static>,
+    ) {
+        let (to_dispatch, pending_requests) = mpsc::channel(1);
+        let (cancellation, canceled_requests) = cancellations();
+        let transport: AlwaysErrorTransport<String> = AlwaysErrorTransport(cause, PhantomData);
+        let dispatch = Box::pin(RequestDispatch::<String, String, _> {
+            transport: transport.fuse(),
+            pending_requests,
+            canceled_requests,
+            in_flight_requests: InFlightRequests::default(),
+            config: Config::default(),
+        });
+        let channel = Channel {
+            to_dispatch,
+            cancellation,
+            next_request_id: Arc::new(AtomicUsize::new(0)),
+        };
+        let cx = Context::from_waker(noop_waker_ref());
+        (dispatch, channel, cx)
+    }
+    struct AlwaysErrorTransport<I>(TransportError, PhantomData<I>);
+    #[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
+    enum TransportError {
+        Read,
+        Ready,
+        Write,
+        Flush,
+        Close,
+    }
+    impl Display for TransportError {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            f.write_str(&format!("{self:?}"))
+        }
+    }
+    impl<I: Clone, S> Sink<S> for AlwaysErrorTransport<I> {
+        type Error = TransportError;
+        fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+            match self.0 {
+                TransportError::Ready => Poll::Ready(Err(self.0)),
+                TransportError::Flush => Poll::Pending,
+                _ => Poll::Ready(Ok(())),
+            }
+        }
+        fn start_send(self: Pin<&mut Self>, _: S) -> Result<(), Self::Error> {
+            if matches!(self.0, TransportError::Write) {
+                Err(self.0)
+            } else {
+                Ok(())
+            }
+        }
+        fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+            if matches!(self.0, TransportError::Flush) {
+                Poll::Ready(Err(self.0))
+            } else {
+                Poll::Ready(Ok(()))
+            }
+        }
+        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+            if matches!(self.0, TransportError::Close) {
+                Poll::Ready(Err(self.0))
+            } else {
+                Poll::Ready(Ok(()))
+            }
+        }
+    }
+    impl<I: Clone> Stream for AlwaysErrorTransport<I> {
+        type Item = Result<Response<I>, TransportError>;
+        fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+            if matches!(self.0, TransportError::Read) {
+                Poll::Ready(Some(Err(self.0)))
+            } else {
+                Poll::Pending
+            }
+        }
+    }
     fn set_up() -> (
         Pin<
             Box<
@@ -820,8 +998,8 @@ mod tests {
     async fn send_request<'a>(
         channel: &'a mut Channel<String, String>,
         request: &str,
-        response_completion: oneshot::Sender<Result<Response<String>, DeadlineExceededError>>,
-        response: &'a mut oneshot::Receiver<Result<Response<String>, DeadlineExceededError>>,
+        response_completion: oneshot::Sender<Result<String, RpcError>>,
+        response: &'a mut oneshot::Receiver<Result<String, RpcError>>,
     ) -> ResponseGuard<'a, String> {
         let request_id =
             u64::try_from(channel.next_request_id.fetch_add(1, Ordering::Relaxed)).unwrap();

File 3 of 5 (client in-flight request tracking):

@@ -1,7 +1,6 @@
 use crate::{
     context,
     util::{Compact, TimeUntil},
-    Response,
 };
 use fnv::FnvHashMap;
 use std::{
@@ -28,17 +27,11 @@ impl<Resp> Default for InFlightRequests<Resp> {
     }
 }
-/// The request exceeded its deadline.
-#[derive(thiserror::Error, Debug)]
-#[non_exhaustive]
-#[error("the request exceeded its deadline")]
-pub struct DeadlineExceededError;
 #[derive(Debug)]
-struct RequestData<Resp> {
+struct RequestData<Res> {
     ctx: context::Context,
     span: Span,
-    response_completion: oneshot::Sender<Result<Response<Resp>, DeadlineExceededError>>,
+    response_completion: oneshot::Sender<Res>,
     /// The key to remove the timer for the request's deadline.
     deadline_key: delay_queue::Key,
 }
@@ -48,7 +41,7 @@ struct RequestData<Resp> {
 #[derive(Debug)]
 pub struct AlreadyExistsError;
-impl<Resp> InFlightRequests<Resp> {
+impl<Res> InFlightRequests<Res> {
     /// Returns the number of in-flight requests.
     pub fn len(&self) -> usize {
         self.request_data.len()
@@ -65,7 +58,7 @@ impl<Resp> InFlightRequests<Resp> {
         request_id: u64,
         ctx: context::Context,
         span: Span,
-        response_completion: oneshot::Sender<Result<Response<Resp>, DeadlineExceededError>>,
+        response_completion: oneshot::Sender<Res>,
     ) -> Result<(), AlreadyExistsError> {
         match self.request_data.entry(request_id) {
             hash_map::Entry::Vacant(vacant) => {
@@ -84,25 +77,35 @@ impl<Resp> InFlightRequests<Resp> {
     }
     /// Removes a request without aborting. Returns true iff the request was found.
-    pub fn complete_request(&mut self, response: Response<Resp>) -> bool {
-        if let Some(request_data) = self.request_data.remove(&response.request_id) {
+    pub fn complete_request(&mut self, request_id: u64, result: Res) -> bool {
+        if let Some(request_data) = self.request_data.remove(&request_id) {
             let _entered = request_data.span.enter();
             tracing::info!("ReceiveResponse");
             self.request_data.compact(0.1);
             self.deadlines.remove(&request_data.deadline_key);
-            let _ = request_data.response_completion.send(Ok(response));
+            let _ = request_data.response_completion.send(result);
             return true;
         }
-        tracing::debug!(
-            "No in-flight request found for request_id = {}.",
-            response.request_id
-        );
+        tracing::debug!("No in-flight request found for request_id = {request_id}.");
         // If the response completion was absent, then the request was already canceled.
         false
     }
+    /// Completes all requests using the provided function.
+    /// Returns Spans for all completed requests.
+    pub fn complete_all_requests<'a>(
+        &'a mut self,
+        mut result: impl FnMut() -> Res + 'a,
+    ) -> impl Iterator<Item = Span> + 'a {
+        self.deadlines.clear();
+        self.request_data.drain().map(move |(_, request_data)| {
+            let _ = request_data.response_completion.send(result());
+            request_data.span
+        })
+    }
     /// Cancels a request without completing (typically used when a request handle was dropped
     /// before the request completed).
     pub fn cancel_request(&mut self, request_id: u64) -> Option<(context::Context, Span)> {
@@ -117,16 +120,18 @@ impl<Resp> InFlightRequests<Resp> {
     /// Yields a request that has expired, completing it with a TimedOut error.
     /// The caller should send cancellation messages for any yielded request ID.
-    pub fn poll_expired(&mut self, cx: &mut Context) -> Poll<Option<u64>> {
+    pub fn poll_expired(
+        &mut self,
+        cx: &mut Context,
+        expired_error: impl Fn() -> Res,
+    ) -> Poll<Option<u64>> {
         self.deadlines.poll_expired(cx).map(|expired| {
             let request_id = expired?.into_inner();
             if let Some(request_data) = self.request_data.remove(&request_id) {
                 let _entered = request_data.span.enter();
                 tracing::error!("DeadlineExceeded");
                 self.request_data.compact(0.1);
-                let _ = request_data
-                    .response_completion
-                    .send(Err(DeadlineExceededError));
+                let _ = request_data.response_completion.send(expired_error());
             }
             Some(request_id)
         })

File 4 of 5 (crate root):

@@ -311,6 +311,7 @@ pub use crate::transport::sealed::Transport;
 use anyhow::Context as _;
 use futures::task::*;
+use std::sync::Arc;
 use std::{error::Error, fmt::Display, io, time::SystemTime};
 /// A message from a client to a server.
@@ -383,6 +384,29 @@ pub struct ServerError {
     pub detail: String,
 }
+/// Critical errors that result in a Channel disconnecting.
+#[derive(thiserror::Error, Debug, PartialEq, Eq)]
+pub enum ChannelError<E>
+where
+    E: Error + Send + Sync + 'static,
+{
+    /// Could not read from the transport.
+    #[error("could not read from the transport")]
+    Read(#[source] Arc<E>),
+    /// Could not ready the transport for writes.
+    #[error("could not ready the transport for writes")]
+    Ready(#[source] E),
+    /// Could not write to the transport.
+    #[error("could not write to the transport")]
+    Write(#[source] E),
+    /// Could not flush the transport.
+    #[error("could not flush the transport")]
+    Flush(#[source] E),
+    /// Could not close the write end of the transport.
+    #[error("could not close the write end of the transport")]
+    Close(#[source] E),
+}
 impl<T> Request<T> {
     /// Returns the deadline for this request.
     pub fn deadline(&self) -> &SystemTime {

File 5 of 5 (server channel):

@@ -9,7 +9,7 @@
 use crate::{
     cancellations::{cancellations, CanceledRequests, RequestCancellation},
     context::{self, SpanExt},
-    trace, ClientMessage, Request, Response, Transport,
+    trace, ChannelError, ClientMessage, Request, Response, Transport,
 };
 use ::tokio::sync::mpsc;
 use futures::{
@@ -21,7 +21,7 @@ use futures::{
 };
 use in_flight_requests::{AlreadyExistsError, InFlightRequests};
 use pin_project::pin_project;
-use std::{convert::TryFrom, error::Error, fmt, marker::PhantomData, pin::Pin};
+use std::{convert::TryFrom, error::Error, fmt, marker::PhantomData, pin::Pin, sync::Arc};
 use tracing::{info_span, instrument::Instrument, Span};
 mod in_flight_requests;
@@ -337,20 +337,6 @@ where
     }
 }
-/// Critical errors that result in a Channel disconnecting.
-#[derive(thiserror::Error, Debug)]
-pub enum ChannelError<E>
-where
-    E: Error + Send + Sync + 'static,
-{
-    /// An error occurred reading from, or writing to, the transport.
-    #[error("an error occurred in the transport: {0}")]
-    Transport(#[source] E),
-    /// An error occurred while polling expired requests.
-    #[error("an error occurred while polling expired requests: {0}")]
-    Timer(#[source] ::tokio::time::error::Error),
-}
 impl<Req, Resp, T> Stream for BaseChannel<Req, Resp, T>
 where
     T: Transport<Response<Resp>, ClientMessage<Req>>,
@@ -407,7 +393,7 @@ where
         let request_status = match self
             .transport_pin_mut()
             .poll_next(cx)
-            .map_err(ChannelError::Transport)?
+            .map_err(|e| ChannelError::Read(Arc::new(e)))?
         {
             Poll::Ready(Some(message)) => match message {
                 ClientMessage::Request(request) => {
@@ -467,7 +453,7 @@ where
         self.project()
             .transport
             .poll_ready(cx)
-            .map_err(ChannelError::Transport)
+            .map_err(ChannelError::Ready)
     }
     fn start_send(mut self: Pin<&mut Self>, response: Response<Resp>) -> Result<(), Self::Error> {
@@ -480,7 +466,7 @@ where
             self.project()
                 .transport
                 .start_send(response)
-                .map_err(ChannelError::Transport)
+                .map_err(ChannelError::Write)
         } else {
             // If the request isn't tracked anymore, there's no need to send the response.
             Ok(())
@@ -492,14 +478,14 @@ where
         self.project()
             .transport
             .poll_flush(cx)
-            .map_err(ChannelError::Transport)
+            .map_err(ChannelError::Flush)
     }
     fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
         self.project()
             .transport
             .poll_close(cx)
-            .map_err(ChannelError::Transport)
+            .map_err(ChannelError::Close)
     }
 }