Use upstream sink compat shims

This commit is contained in:
Artem Vorotnikov
2019-03-28 18:02:04 +03:00
parent a7fb4d22cc
commit 06c420b60c
2 changed files with 17 additions and 162 deletions

View File

@@ -1,150 +1,17 @@
use futures::{compat::Stream01CompatExt, prelude::*, ready};
use futures_legacy::{
executor::{
self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01,
UnsafeNotify as UnsafeNotify01,
},
Async as Async01, AsyncSink as AsyncSink01, Sink as Sink01, Stream as Stream01,
};
use std::{
pin::Pin,
task::{self, Poll, Waker},
};
use futures::compat::*;
use futures_legacy::task::Spawn as Spawn01;
/// A shim to convert a 0.1 Sink + Stream to a 0.3 Sink + Stream.
#[derive(Debug)]
pub struct Compat<S, SinkItem> {
staged_item: Option<SinkItem>,
inner: S,
#[allow(dead_code)]
struct Compat01As03SinkExposed<S, SinkItem> {
inner: Spawn01<S>,
buffer: Option<SinkItem>,
close_started: bool,
}
impl<S, SinkItem> Compat<S, SinkItem> {
/// Returns a new Compat.
pub fn new(inner: S) -> Self {
Compat {
inner,
staged_item: None,
}
}
/// Unwraps Compat, returning the inner value.
pub fn into_inner(self) -> S {
self.inner
}
/// Returns a reference to the value wrapped by Compat.
pub fn get_ref(&self) -> &S {
&self.inner
}
}
impl<S, SinkItem> Stream for Compat<S, SinkItem>
pub fn exposed_compat_exec<S, SinkItem, F, T>(input: &Compat01As03Sink<S, SinkItem>, f: F) -> T
where
S: Stream01,
F: FnOnce(&S) -> T,
{
type Item = Result<S::Item, S::Error>;
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<Self::Item>> {
unsafe {
let inner = &mut Pin::get_unchecked_mut(self).inner;
let mut compat = inner.compat();
let compat = Pin::new_unchecked(&mut compat);
match ready!(compat.poll_next(waker)) {
None => Poll::Ready(None),
Some(Ok(next)) => Poll::Ready(Some(Ok(next))),
Some(Err(e)) => Poll::Ready(Some(Err(e))),
}
}
}
}
impl<S, SinkItem> Sink for Compat<S, SinkItem>
where
S: Sink01<SinkItem = SinkItem>,
{
type SinkItem = SinkItem;
type SinkError = S::SinkError;
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), S::SinkError> {
let me = unsafe { Pin::get_unchecked_mut(self) };
assert!(me.staged_item.is_none());
me.staged_item = Some(item);
Ok(())
}
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_unchecked_mut(self) };
match me.staged_item.take() {
Some(staged_item) => match me.inner.start_send(staged_item) {
Ok(AsyncSink01::Ready) => Poll::Ready(Ok(())),
Ok(AsyncSink01::NotReady(item)) => {
me.staged_item = Some(item);
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
},
None => Poll::Ready(Ok(())),
}
})
}
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_unchecked_mut(self) };
match me.inner.poll_complete() {
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
Ok(Async01::NotReady) => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
}
})
}
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_unchecked_mut(self) };
match me.inner.close() {
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
Ok(Async01::NotReady) => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
}
})
}
}
#[derive(Clone, Debug)]
struct WakerToHandle<'a>(&'a Waker);
#[derive(Debug)]
struct NotifyWaker(task::Waker);
impl Notify01 for NotifyWaker {
fn notify(&self, _: usize) {
self.0.wake();
}
}
unsafe impl UnsafeNotify01 for NotifyWaker {
unsafe fn clone_raw(&self) -> NotifyHandle01 {
let ptr = Box::new(NotifyWaker(self.0.clone()));
NotifyHandle01::new(Box::into_raw(ptr))
}
unsafe fn drop_raw(&self) {
let ptr: *const dyn UnsafeNotify01 = self;
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
}
}
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
unsafe { NotifyWaker(handle.0.clone()).clone_raw() }
}
let exposed = unsafe { std::mem::transmute::<_, &Compat01As03SinkExposed<S, SinkItem>>(input) };
f(exposed.inner.get_ref())
}

View File

@@ -9,13 +9,8 @@
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
#![deny(missing_docs, missing_debug_implementations)]
use self::compat::Compat;
use async_bincode::{AsyncBincodeStream, AsyncDestination};
use futures::{
compat::{Compat01As03, Future01CompatExt, Stream01CompatExt},
prelude::*,
ready,
};
use futures::{compat::*, prelude::*, ready};
use pin_utils::unsafe_pinned;
use serde::{Deserialize, Serialize};
use std::{
@@ -34,19 +29,12 @@ mod compat;
/// A transport that serializes to, and deserializes from, a [`TcpStream`].
#[derive(Debug)]
pub struct Transport<S, Item, SinkItem> {
inner: Compat<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>,
}
impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
/// Returns the transport underlying the bincode transport.
pub fn into_inner(self) -> S {
self.inner.into_inner().into_inner()
}
inner: Compat01As03Sink<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>,
}
impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
unsafe_pinned!(
inner: Compat<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>
inner: Compat01As03Sink<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>
);
}
@@ -113,11 +101,11 @@ where
type SinkItem = SinkItem;
fn peer_addr(&self) -> io::Result<SocketAddr> {
self.inner.get_ref().get_ref().peer_addr()
compat::exposed_compat_exec(&self.inner, |conn| conn.get_ref().peer_addr())
}
fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner.get_ref().get_ref().local_addr()
compat::exposed_compat_exec(&self.inner, |conn| conn.get_ref().local_addr())
}
}
@@ -133,7 +121,7 @@ where
impl<S, Item, SinkItem> From<S> for Transport<S, Item, SinkItem> {
fn from(inner: S) -> Self {
Transport {
inner: Compat::new(AsyncBincodeStream::from(inner).for_async()),
inner: Compat01As03Sink::new(AsyncBincodeStream::from(inner).for_async()),
}
}
}