diff --git a/rpc/src/transport/mod.rs b/rpc/src/transport/mod.rs index 41a8fe6..4b1e147 100644 --- a/rpc/src/transport/mod.rs +++ b/rpc/src/transport/mod.rs @@ -10,7 +10,7 @@ //! can be plugged in, using whatever protocol it wants. use futures::prelude::*; -use std::{io, net::SocketAddr}; +use std::{io, net::SocketAddr, pin::Pin, task::{Poll, LocalWaker}}; pub mod channel; @@ -30,3 +30,81 @@ where /// The address of the local half of this transport. fn local_addr(&self) -> io::Result; } + +/// Returns a new Transport backed by the given Stream + Sink and connecting addresses. +pub fn new(inner: S, peer_addr: SocketAddr, local_addr: SocketAddr) -> impl Transport +where + S: Stream>, + S: Sink, +{ + TransportShim { inner, peer_addr, local_addr } +} + +/// A transport created by adding peers to a Stream + Sink. +#[derive(Debug)] +struct TransportShim { + peer_addr: SocketAddr, + local_addr: SocketAddr, + inner: S, +} + + +impl TransportShim { + pin_utils::unsafe_pinned!(inner: S); + +} + +impl Stream for TransportShim +where + S: Stream, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + self.inner().poll_next(waker) + } +} + +impl Sink for TransportShim +where + S: Sink +{ + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(mut self: Pin<&mut Self>, item: S::SinkItem) -> Result<(), S::SinkError> { + self.inner().start_send(item) + } + + fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + self.inner().poll_ready(waker) + } + + fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + self.inner().poll_flush(waker) + } + + fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + self.inner().poll_close(waker) + } +} + +impl Transport for TransportShim +where + S: Stream + Sink, + Self: Stream>, + Self: Sink, +{ + type Item = Item; + type SinkItem = S::SinkItem; + + /// The address of the remote peer this transport is in communication with. + fn peer_addr(&self) -> io::Result { + Ok(self.peer_addr) + } + + /// The address of the local half of this transport. + fn local_addr(&self) -> io::Result { + Ok(self.local_addr) + } +}