diff --git a/bincode-transport/src/compat.rs b/bincode-transport/src/compat.rs index f70348c..8a30593 100644 --- a/bincode-transport/src/compat.rs +++ b/bincode-transport/src/compat.rs @@ -1,150 +1,17 @@ -use futures::{compat::Stream01CompatExt, prelude::*, ready}; -use futures_legacy::{ - executor::{ - self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01, - UnsafeNotify as UnsafeNotify01, - }, - Async as Async01, AsyncSink as AsyncSink01, Sink as Sink01, Stream as Stream01, -}; -use std::{ - pin::Pin, - task::{self, Poll, Waker}, -}; +use futures::compat::*; +use futures_legacy::task::Spawn as Spawn01; -/// A shim to convert a 0.1 Sink + Stream to a 0.3 Sink + Stream. -#[derive(Debug)] -pub struct Compat { - staged_item: Option, - inner: S, +#[allow(dead_code)] +struct Compat01As03SinkExposed { + inner: Spawn01, + buffer: Option, + close_started: bool, } -impl Compat { - /// Returns a new Compat. - pub fn new(inner: S) -> Self { - Compat { - inner, - staged_item: None, - } - } - - /// Unwraps Compat, returning the inner value. - pub fn into_inner(self) -> S { - self.inner - } - - /// Returns a reference to the value wrapped by Compat. - pub fn get_ref(&self) -> &S { - &self.inner - } -} - -impl Stream for Compat +pub fn exposed_compat_exec(input: &Compat01As03Sink, f: F) -> T where - S: Stream01, + F: FnOnce(&S) -> T, { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - unsafe { - let inner = &mut Pin::get_unchecked_mut(self).inner; - let mut compat = inner.compat(); - let compat = Pin::new_unchecked(&mut compat); - match ready!(compat.poll_next(waker)) { - None => Poll::Ready(None), - Some(Ok(next)) => Poll::Ready(Some(Ok(next))), - Some(Err(e)) => Poll::Ready(Some(Err(e))), - } - } - } -} - -impl Sink for Compat -where - S: Sink01, -{ - type SinkItem = SinkItem; - type SinkError = S::SinkError; - - fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), S::SinkError> { - let me = unsafe { Pin::get_unchecked_mut(self) }; - assert!(me.staged_item.is_none()); - me.staged_item = Some(item); - Ok(()) - } - - fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - let notify = &WakerToHandle(waker); - - executor01::with_notify(notify, 0, move || { - let me = unsafe { Pin::get_unchecked_mut(self) }; - match me.staged_item.take() { - Some(staged_item) => match me.inner.start_send(staged_item) { - Ok(AsyncSink01::Ready) => Poll::Ready(Ok(())), - Ok(AsyncSink01::NotReady(item)) => { - me.staged_item = Some(item); - Poll::Pending - } - Err(e) => Poll::Ready(Err(e)), - }, - None => Poll::Ready(Ok(())), - } - }) - } - - fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - let notify = &WakerToHandle(waker); - - executor01::with_notify(notify, 0, move || { - let me = unsafe { Pin::get_unchecked_mut(self) }; - match me.inner.poll_complete() { - Ok(Async01::Ready(())) => Poll::Ready(Ok(())), - Ok(Async01::NotReady) => Poll::Pending, - Err(e) => Poll::Ready(Err(e)), - } - }) - } - - fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - let notify = &WakerToHandle(waker); - - executor01::with_notify(notify, 0, move || { - let me = unsafe { Pin::get_unchecked_mut(self) }; - match me.inner.close() { - Ok(Async01::Ready(())) => Poll::Ready(Ok(())), - Ok(Async01::NotReady) => Poll::Pending, - Err(e) => Poll::Ready(Err(e)), - } - }) - } -} - -#[derive(Clone, Debug)] -struct WakerToHandle<'a>(&'a Waker); - -#[derive(Debug)] -struct NotifyWaker(task::Waker); - -impl Notify01 for NotifyWaker { - fn notify(&self, _: usize) { - self.0.wake(); - } -} - -unsafe impl UnsafeNotify01 for NotifyWaker { - unsafe fn clone_raw(&self) -> NotifyHandle01 { - let ptr = Box::new(NotifyWaker(self.0.clone())); - - NotifyHandle01::new(Box::into_raw(ptr)) - } - - unsafe fn drop_raw(&self) { - let ptr: *const dyn UnsafeNotify01 = self; - drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01)); - } -} - -impl<'a> From> for NotifyHandle01 { - fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 { - unsafe { NotifyWaker(handle.0.clone()).clone_raw() } - } + let exposed = unsafe { std::mem::transmute::<_, &Compat01As03SinkExposed>(input) }; + f(exposed.inner.get_ref()) } diff --git a/bincode-transport/src/lib.rs b/bincode-transport/src/lib.rs index 37230dd..6a49fd0 100644 --- a/bincode-transport/src/lib.rs +++ b/bincode-transport/src/lib.rs @@ -9,13 +9,8 @@ #![feature(futures_api, arbitrary_self_types, await_macro, async_await)] #![deny(missing_docs, missing_debug_implementations)] -use self::compat::Compat; use async_bincode::{AsyncBincodeStream, AsyncDestination}; -use futures::{ - compat::{Compat01As03, Future01CompatExt, Stream01CompatExt}, - prelude::*, - ready, -}; +use futures::{compat::*, prelude::*, ready}; use pin_utils::unsafe_pinned; use serde::{Deserialize, Serialize}; use std::{ @@ -34,19 +29,12 @@ mod compat; /// A transport that serializes to, and deserializes from, a [`TcpStream`]. #[derive(Debug)] pub struct Transport { - inner: Compat, SinkItem>, -} - -impl Transport { - /// Returns the transport underlying the bincode transport. - pub fn into_inner(self) -> S { - self.inner.into_inner().into_inner() - } + inner: Compat01As03Sink, SinkItem>, } impl Transport { unsafe_pinned!( - inner: Compat, SinkItem> + inner: Compat01As03Sink, SinkItem> ); } @@ -113,11 +101,11 @@ where type SinkItem = SinkItem; fn peer_addr(&self) -> io::Result { - self.inner.get_ref().get_ref().peer_addr() + compat::exposed_compat_exec(&self.inner, |conn| conn.get_ref().peer_addr()) } fn local_addr(&self) -> io::Result { - self.inner.get_ref().get_ref().local_addr() + compat::exposed_compat_exec(&self.inner, |conn| conn.get_ref().local_addr()) } } @@ -133,7 +121,7 @@ where impl From for Transport { fn from(inner: S) -> Self { Transport { - inner: Compat::new(AsyncBincodeStream::from(inner).for_async()), + inner: Compat01As03Sink::new(AsyncBincodeStream::from(inner).for_async()), } } }