Add transport::channel::bounded.

This is like transport::channel::unbounded but with a fixed buffer size.
This commit is contained in:
Tim Kuehn
2021-03-08 22:23:46 -08:00
parent 27aacab432
commit 0964fc51ff
2 changed files with 77 additions and 10 deletions

View File

@@ -196,8 +196,8 @@ impl<Req, Resp, T> fmt::Debug for BaseChannel<Req, Resp, T> {
/// do not have specific scheduling needs and whose services are `Send + 'static`.
/// 2. [`Channel::requests`] - This method is best for those who need direct access to individual
/// requests, or are not using `tokio`, or want control over [futures](Future) scheduling.
/// [`Requests`] is a stream of [`InFlightRequests`](InFlightRequest), which each have an
/// [`execute`](InFightRequest::execute) method. If using `execute`, request processing will
/// [`Requests`] is a stream of [`InFlightRequests`](InFlightRequest), each which has an
/// [`execute`](InFlightRequest::execute) method. If using `execute`, request processing will
/// automatically cease when either the request deadline is reached or when a corresponding
/// cancellation message is received by the Channel.
/// 3. [`Sink::start_send`] - A user is free to manually send responses to requests produced by a

View File

@@ -29,20 +29,17 @@ pub fn unbounded<SinkItem, Item>() -> (
/// A bi-directional channel backed by an [`UnboundedSender`](mpsc::UnboundedSender)
/// and [`UnboundedReceiver`](mpsc::UnboundedReceiver).
#[pin_project]
#[derive(Debug)]
pub struct UnboundedChannel<Item, SinkItem> {
#[pin]
rx: mpsc::UnboundedReceiver<Item>,
#[pin]
tx: mpsc::UnboundedSender<SinkItem>,
}
impl<Item, SinkItem> Stream for UnboundedChannel<Item, SinkItem> {
type Item = Result<Item, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<Item> {
self.project().rx.poll_recv(cx).map(|option| option.map(Ok))
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<Item> {
self.rx.poll_recv(cx).map(|option| option.map(Ok))
}
}
@@ -50,7 +47,7 @@ impl<Item, SinkItem> Sink<SinkItem> for UnboundedChannel<Item, SinkItem> {
type Error = io::Error;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(if self.project().tx.is_closed() {
Poll::Ready(if self.tx.is_closed() {
Err(io::Error::from(io::ErrorKind::NotConnected))
} else {
Ok(())
@@ -58,8 +55,7 @@ impl<Item, SinkItem> Sink<SinkItem> for UnboundedChannel<Item, SinkItem> {
}
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
self.project()
.tx
self.tx
.send(item)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
}
@@ -75,6 +71,77 @@ impl<Item, SinkItem> Sink<SinkItem> for UnboundedChannel<Item, SinkItem> {
}
}
/// Returns two channel peers with buffer equal to `capacity`. Each [`Stream`] yields items sent
/// through the other's [`Sink`].
pub fn bounded<SinkItem, Item>(
capacity: usize,
) -> (Channel<SinkItem, Item>, Channel<Item, SinkItem>) {
let (tx1, rx2) = futures::channel::mpsc::channel(capacity);
let (tx2, rx1) = futures::channel::mpsc::channel(capacity);
(Channel { tx: tx1, rx: rx1 }, Channel { tx: tx2, rx: rx2 })
}
/// A bi-directional channel backed by a [`Sender`](futures::channel::mpsc::Sender)
/// and [`Receiver`](futures::channel::mpsc::Receiver).
#[pin_project]
#[derive(Debug)]
pub struct Channel<Item, SinkItem> {
#[pin]
rx: futures::channel::mpsc::Receiver<Item>,
#[pin]
tx: futures::channel::mpsc::Sender<SinkItem>,
}
impl<Item, SinkItem> Stream for Channel<Item, SinkItem> {
type Item = Result<Item, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<Item> {
self.project().rx.poll_next(cx).map(|option| option.map(Ok))
}
}
impl<Item, SinkItem> Sink<SinkItem> for Channel<Item, SinkItem> {
type Error = io::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project()
.tx
.poll_ready(cx)
.map_err(convert_send_err_to_io)
}
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
self.project()
.tx
.start_send(item)
.map_err(convert_send_err_to_io)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project()
.tx
.poll_flush(cx)
.map_err(convert_send_err_to_io)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project()
.tx
.poll_close(cx)
.map_err(convert_send_err_to_io)
}
}
fn convert_send_err_to_io(e: futures::channel::mpsc::SendError) -> io::Error {
if e.is_disconnected() {
io::Error::from(io::ErrorKind::NotConnected)
} else if e.is_full() {
io::Error::from(io::ErrorKind::WouldBlock)
} else {
io::Error::new(io::ErrorKind::Other, e)
}
}
#[cfg(test)]
#[cfg(feature = "tokio1")]
mod tests {