diff --git a/bincode-transport/Cargo.toml b/bincode-transport/Cargo.toml index 94a9ffb..916dbbf 100644 --- a/bincode-transport/Cargo.toml +++ b/bincode-transport/Cargo.toml @@ -14,6 +14,7 @@ description = "A bincode-based transport for tarpc services." [dependencies] bincode = "1" +futures-preview = { version = "0.3.0-alpha.14", features = ["compat"] } futures_legacy = { version = "0.1", package = "futures" } pin-utils = "0.1.0-alpha.4" rpc = { package = "tarpc-lib", version = "0.3", path = "../rpc", features = ["serde1"] } @@ -22,11 +23,7 @@ tokio-io = "0.1" async-bincode = "0.4" tokio-tcp = "0.1" -[target.'cfg(not(test))'.dependencies] -futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } - [dev-dependencies] -futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } env_logger = "0.6" humantime = "1.0" libtest = "0.0.1" diff --git a/bincode-transport/src/compat.rs b/bincode-transport/src/compat.rs deleted file mode 100644 index f70348c..0000000 --- a/bincode-transport/src/compat.rs +++ /dev/null @@ -1,150 +0,0 @@ -use futures::{compat::Stream01CompatExt, prelude::*, ready}; -use futures_legacy::{ - executor::{ - self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01, - UnsafeNotify as UnsafeNotify01, - }, - Async as Async01, AsyncSink as AsyncSink01, Sink as Sink01, Stream as Stream01, -}; -use std::{ - pin::Pin, - task::{self, Poll, Waker}, -}; - -/// A shim to convert a 0.1 Sink + Stream to a 0.3 Sink + Stream. -#[derive(Debug)] -pub struct Compat { - staged_item: Option, - inner: S, -} - -impl Compat { - /// Returns a new Compat. - pub fn new(inner: S) -> Self { - Compat { - inner, - staged_item: None, - } - } - - /// Unwraps Compat, returning the inner value. - pub fn into_inner(self) -> S { - self.inner - } - - /// Returns a reference to the value wrapped by Compat. - pub fn get_ref(&self) -> &S { - &self.inner - } -} - -impl Stream for Compat -where - S: Stream01, -{ - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - unsafe { - let inner = &mut Pin::get_unchecked_mut(self).inner; - let mut compat = inner.compat(); - let compat = Pin::new_unchecked(&mut compat); - match ready!(compat.poll_next(waker)) { - None => Poll::Ready(None), - Some(Ok(next)) => Poll::Ready(Some(Ok(next))), - Some(Err(e)) => Poll::Ready(Some(Err(e))), - } - } - } -} - -impl Sink for Compat -where - S: Sink01, -{ - type SinkItem = SinkItem; - type SinkError = S::SinkError; - - fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), S::SinkError> { - let me = unsafe { Pin::get_unchecked_mut(self) }; - assert!(me.staged_item.is_none()); - me.staged_item = Some(item); - Ok(()) - } - - fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - let notify = &WakerToHandle(waker); - - executor01::with_notify(notify, 0, move || { - let me = unsafe { Pin::get_unchecked_mut(self) }; - match me.staged_item.take() { - Some(staged_item) => match me.inner.start_send(staged_item) { - Ok(AsyncSink01::Ready) => Poll::Ready(Ok(())), - Ok(AsyncSink01::NotReady(item)) => { - me.staged_item = Some(item); - Poll::Pending - } - Err(e) => Poll::Ready(Err(e)), - }, - None => Poll::Ready(Ok(())), - } - }) - } - - fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - let notify = &WakerToHandle(waker); - - executor01::with_notify(notify, 0, move || { - let me = unsafe { Pin::get_unchecked_mut(self) }; - match me.inner.poll_complete() { - Ok(Async01::Ready(())) => Poll::Ready(Ok(())), - Ok(Async01::NotReady) => Poll::Pending, - Err(e) => Poll::Ready(Err(e)), - } - }) - } - - fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - let notify = &WakerToHandle(waker); - - executor01::with_notify(notify, 0, move || { - let me = unsafe { Pin::get_unchecked_mut(self) }; - match me.inner.close() { - Ok(Async01::Ready(())) => Poll::Ready(Ok(())), - Ok(Async01::NotReady) => Poll::Pending, - Err(e) => Poll::Ready(Err(e)), - } - }) - } -} - -#[derive(Clone, Debug)] -struct WakerToHandle<'a>(&'a Waker); - -#[derive(Debug)] -struct NotifyWaker(task::Waker); - -impl Notify01 for NotifyWaker { - fn notify(&self, _: usize) { - self.0.wake(); - } -} - -unsafe impl UnsafeNotify01 for NotifyWaker { - unsafe fn clone_raw(&self) -> NotifyHandle01 { - let ptr = Box::new(NotifyWaker(self.0.clone())); - - NotifyHandle01::new(Box::into_raw(ptr)) - } - - unsafe fn drop_raw(&self) { - let ptr: *const dyn UnsafeNotify01 = self; - drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01)); - } -} - -impl<'a> From> for NotifyHandle01 { - fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 { - unsafe { NotifyWaker(handle.0.clone()).clone_raw() } - } -} diff --git a/bincode-transport/src/lib.rs b/bincode-transport/src/lib.rs index 37230dd..cf16dd3 100644 --- a/bincode-transport/src/lib.rs +++ b/bincode-transport/src/lib.rs @@ -9,13 +9,8 @@ #![feature(futures_api, arbitrary_self_types, await_macro, async_await)] #![deny(missing_docs, missing_debug_implementations)] -use self::compat::Compat; use async_bincode::{AsyncBincodeStream, AsyncDestination}; -use futures::{ - compat::{Compat01As03, Future01CompatExt, Stream01CompatExt}, - prelude::*, - ready, -}; +use futures::{compat::*, prelude::*, ready}; use pin_utils::unsafe_pinned; use serde::{Deserialize, Serialize}; use std::{ @@ -24,29 +19,20 @@ use std::{ marker::PhantomData, net::SocketAddr, pin::Pin, - task::{Poll, Waker}, + task::{Context, Poll}, }; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tcp::{TcpListener, TcpStream}; -mod compat; - /// A transport that serializes to, and deserializes from, a [`TcpStream`]. #[derive(Debug)] pub struct Transport { - inner: Compat, SinkItem>, -} - -impl Transport { - /// Returns the transport underlying the bincode transport. - pub fn into_inner(self) -> S { - self.inner.into_inner().into_inner() - } + inner: Compat01As03Sink, SinkItem>, } impl Transport { unsafe_pinned!( - inner: Compat, SinkItem> + inner: Compat01As03Sink, SinkItem> ); } @@ -57,8 +43,8 @@ where { type Item = io::Result; - fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll>> { - match self.inner().poll_next(waker) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + match self.inner().poll_next(cx) { Poll::Pending => Poll::Pending, Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(Ok(next))) => Poll::Ready(Some(Ok(next))), @@ -69,12 +55,11 @@ where } } -impl Sink for Transport +impl Sink for Transport where S: AsyncWrite, SinkItem: Serialize, { - type SinkItem = SinkItem; type SinkError = io::Error; fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> { @@ -83,16 +68,16 @@ where .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } - fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - convert(self.inner().poll_ready(waker)) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + convert(self.inner().poll_ready(cx)) } - fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - convert(self.inner().poll_flush(waker)) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + convert(self.inner().poll_flush(cx)) } - fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - convert(self.inner().poll_close(waker)) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + convert(self.inner().poll_close(cx)) } } @@ -133,7 +118,7 @@ where impl From for Transport { fn from(inner: S) -> Self { Transport { - inner: Compat::new(AsyncBincodeStream::from(inner).for_async()), + inner: Compat01As03Sink::new(AsyncBincodeStream::from(inner).for_async()), } } } @@ -189,8 +174,8 @@ where { type Item = io::Result>; - fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - let next = ready!(self.incoming().poll_next(waker)?); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = ready!(self.incoming().poll_next(cx)?); Poll::Ready(next.map(|conn| Ok(new(conn)))) } } diff --git a/bincode-transport/tests/cancel.rs b/bincode-transport/tests/cancel.rs index fd891ef..c574fbb 100644 --- a/bincode-transport/tests/cancel.rs +++ b/bincode-transport/tests/cancel.rs @@ -11,7 +11,7 @@ use futures::{ compat::{Executor01CompatExt, Future01CompatExt}, prelude::*, - stream, + stream::FuturesUnordered, }; use log::{info, trace}; use rand::distributions::{Distribution, Normal}; @@ -126,7 +126,10 @@ async fn run() -> io::Result<()> { let response = client.call(ctx, "ping".into()); requests.push(response.map(move |r| (trace_id, r))); } - let (fastest_response, _) = await!(stream::futures_unordered(requests).into_future()); + let (fastest_response, _) = await!(requests + .into_iter() + .collect::>() + .into_future()); let (trace_id, resp) = fastest_response.unwrap(); info!("[{}] fastest_response = {:?}", trace_id, resp); diff --git a/example-service/Cargo.toml b/example-service/Cargo.toml index 92fc4f9..bf5f8b8 100644 --- a/example-service/Cargo.toml +++ b/example-service/Cargo.toml @@ -15,7 +15,7 @@ description = "An example server built on tarpc." [dependencies] bincode-transport = { package = "tarpc-bincode-transport", version = "0.4", path = "../bincode-transport" } clap = "2.0" -futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } +futures-preview = { version = "0.3.0-alpha.14", features = ["compat"] } serde = { version = "1.0" } tarpc = { version = "0.15", path = "../tarpc", features = ["serde1"] } tokio = "0.1" diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 2c4f568..a68f7fb 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -18,6 +18,7 @@ serde1 = ["trace/serde", "serde", "serde/derive"] [dependencies] fnv = "1.0" +futures-preview = { version = "0.3.0-alpha.14", features = ["compat"] } humantime = "1.0" log = "0.4" pin-utils = "0.1.0-alpha.4" @@ -26,11 +27,7 @@ tokio-timer = "0.2" trace = { package = "tarpc-trace", version = "0.2", path = "../trace" } serde = { optional = true, version = "1.0" } -[target.'cfg(not(test))'.dependencies] -futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } - [dev-dependencies] -futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } -futures-test-preview = { version = "0.3.0-alpha.13" } +futures-test-preview = { version = "0.3.0-alpha.14" } env_logger = "0.6" tokio = "0.1" diff --git a/rpc/src/client/channel.rs b/rpc/src/client/channel.rs index 6b75a18..0dbc572 100644 --- a/rpc/src/client/channel.rs +++ b/rpc/src/client/channel.rs @@ -15,7 +15,7 @@ use futures::{ prelude::*, ready, stream::Fuse, - task::Waker, + task::Context, Poll, }; use humantime::format_rfc3339; @@ -65,14 +65,19 @@ struct Send<'a, Req, Resp> { fut: MapOkDispatchResponse, Resp>, } -type SendMapErrConnectionReset<'a, Req, Resp> = - MapErrConnectionReset>>>; +type SendMapErrConnectionReset<'a, Req, Resp> = MapErrConnectionReset< + futures::sink::Send<'a, mpsc::Sender>, DispatchRequest>, +>; impl<'a, Req, Resp> Send<'a, Req, Resp> { unsafe_pinned!( fut: MapOkDispatchResponse< MapErrConnectionReset< - futures::sink::Send<'a, mpsc::Sender>>, + futures::sink::Send< + 'a, + mpsc::Sender>, + DispatchRequest, + >, >, Resp, > @@ -82,8 +87,8 @@ impl<'a, Req, Resp> Send<'a, Req, Resp> { impl<'a, Req, Resp> Future for Send<'a, Req, Resp> { type Output = io::Result>; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { - self.as_mut().fut().poll(waker) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.as_mut().fut().poll(cx) } } @@ -101,8 +106,8 @@ impl<'a, Req, Resp> Call<'a, Req, Resp> { impl<'a, Req, Resp> Future for Call<'a, Req, Resp> { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { - self.as_mut().fut().poll(waker) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.as_mut().fut().poll(cx) } } @@ -177,8 +182,8 @@ impl DispatchResponse { impl Future for DispatchResponse { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll> { - let resp = ready!(self.response.poll_unpin(waker)); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let resp = ready!(self.response.poll_unpin(cx)); self.complete = true; @@ -318,8 +323,8 @@ where unsafe_pinned!(pending_requests: Fuse>>); unsafe_pinned!(transport: Fuse); - fn pump_read(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> { - Poll::Ready(match ready!(self.as_mut().transport().poll_next(waker)?) { + fn pump_read(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> { + Poll::Ready(match ready!(self.as_mut().transport().poll_next(cx)?) { Some(response) => { self.complete(response); Some(Ok(())) @@ -331,13 +336,13 @@ where }) } - fn pump_write(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> { + fn pump_write(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> { enum ReceiverStatus { NotReady, Closed, } - let pending_requests_status = match self.poll_next_request(waker)? { + let pending_requests_status = match self.poll_next_request(cx)? { Poll::Ready(Some(dispatch_request)) => { self.write_request(dispatch_request)?; return Poll::Ready(Some(Ok(()))); @@ -346,7 +351,7 @@ where Poll::Pending => ReceiverStatus::NotReady, }; - let canceled_requests_status = match self.poll_next_cancellation(waker)? { + let canceled_requests_status = match self.poll_next_cancellation(cx)? { Poll::Ready(Some((context, request_id))) => { self.write_cancel(context, request_id)?; return Poll::Ready(Some(Ok(()))); @@ -357,12 +362,12 @@ where match (pending_requests_status, canceled_requests_status) { (ReceiverStatus::Closed, ReceiverStatus::Closed) => { - ready!(self.as_mut().transport().poll_flush(waker)?); + ready!(self.as_mut().transport().poll_flush(cx)?); Poll::Ready(None) } (ReceiverStatus::NotReady, _) | (_, ReceiverStatus::NotReady) => { // No more messages to process, so flush any messages buffered in the transport. - ready!(self.as_mut().transport().poll_flush(waker)?); + ready!(self.as_mut().transport().poll_flush(cx)?); // Even if we fully-flush, we return Pending, because we have no more requests // or cancellations right now. @@ -374,7 +379,7 @@ where /// Yields the next pending request, if one is ready to be sent. fn poll_next_request( self: &mut Pin<&mut Self>, - waker: &Waker, + cx: &mut Context<'_>, ) -> PollIo> { if self.as_mut().in_flight_requests().len() >= self.config.max_in_flight_requests { info!( @@ -388,13 +393,13 @@ where return Poll::Pending; } - while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? { + while let Poll::Pending = self.as_mut().transport().poll_ready(cx)? { // We can't yield a request-to-be-sent before the transport is capable of buffering it. - ready!(self.as_mut().transport().poll_flush(waker)?); + ready!(self.as_mut().transport().poll_flush(cx)?); } loop { - match ready!(self.as_mut().pending_requests().poll_next_unpin(waker)) { + match ready!(self.as_mut().pending_requests().poll_next_unpin(cx)) { Some(request) => { if request.response_completion.is_canceled() { trace!( @@ -417,14 +422,14 @@ where /// Yields the next pending cancellation, and, if one is ready, cancels the associated request. fn poll_next_cancellation( self: &mut Pin<&mut Self>, - waker: &Waker, + cx: &mut Context<'_>, ) -> PollIo<(context::Context, u64)> { - while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? { - ready!(self.as_mut().transport().poll_flush(waker)?); + while let Poll::Pending = self.as_mut().transport().poll_ready(cx)? { + ready!(self.as_mut().transport().poll_flush(cx)?); } loop { - let cancellation = self.as_mut().canceled_requests().poll_next_unpin(waker); + let cancellation = self.as_mut().canceled_requests().poll_next_unpin(cx); match ready!(cancellation) { Some(request_id) => { if let Some(in_flight_data) = @@ -532,10 +537,10 @@ where { type Output = io::Result<()>; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { trace!("[{}] RequestDispatch::poll", self.as_mut().server_addr()); loop { - match (self.pump_read(waker)?, self.pump_write(waker)?) { + match (self.pump_read(cx)?, self.pump_write(cx)?) { (read, write @ Poll::Ready(None)) => { if self.as_mut().in_flight_requests().is_empty() { info!( @@ -622,8 +627,8 @@ impl RequestCancellation { impl Stream for CanceledRequests { type Item = u64; - fn poll_next(mut self: Pin<&mut Self>, waker: &Waker) -> Poll> { - self.0.poll_next_unpin(waker) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.0.poll_next_unpin(cx) } } @@ -654,8 +659,8 @@ where { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { - match self.as_mut().future().try_poll(waker) { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().future().try_poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(result) => { self.finished().take().expect( @@ -694,8 +699,8 @@ where { type Output = Result, Fut::Error>; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { - match self.as_mut().future().try_poll(waker) { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().future().try_poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(result) => { let response = self @@ -737,8 +742,8 @@ where { type Output = Result; - fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll { - self.try_chain().poll(waker, |result| match result { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.try_chain().poll(cx, |result| match result { Ok(ok) => TryChainAction::Future(ok), Err(err) => TryChainAction::Output(Err(err)), }) @@ -770,7 +775,11 @@ where TryChain::First(fut1) } - fn poll(self: Pin<&mut Self>, waker: &Waker, f: F) -> Poll> + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + f: F, + ) -> Poll> where F: FnOnce(Result) -> TryChainAction, { @@ -783,14 +792,14 @@ where let output = match this { TryChain::First(fut1) => { // Poll the first future - match unsafe { Pin::new_unchecked(fut1) }.try_poll(waker) { + match unsafe { Pin::new_unchecked(fut1) }.try_poll(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(output) => output, } } TryChain::Second(fut2) => { // Poll the second future - return unsafe { Pin::new_unchecked(fut2) }.try_poll(waker); + return unsafe { Pin::new_unchecked(fut2) }.try_poll(cx); } TryChain::Empty => { panic!("future must not be polled after it returned `Poll::Ready`"); @@ -819,7 +828,7 @@ mod tests { ClientMessage, Response, }; use fnv::FnvHashMap; - use futures::{channel::mpsc, prelude::*, Poll}; + use futures::{channel::mpsc, prelude::*, task::Context, Poll}; use futures_test::task::noop_waker_ref; use std::{ marker, @@ -833,11 +842,11 @@ mod tests { fn stage_request() { let (mut dispatch, mut channel, _server_channel) = set_up(); let mut dispatch = Pin::new(&mut dispatch); - let waker = &noop_waker_ref(); + let cx = &mut Context::from_waker(&noop_waker_ref()); let _resp = send_request(&mut channel, "hi"); - let req = dispatch.poll_next_request(waker).ready(); + let req = dispatch.poll_next_request(cx).ready(); assert!(req.is_some()); let req = req.unwrap(); @@ -850,12 +859,12 @@ mod tests { fn stage_request_channel_dropped_doesnt_panic() { let (mut dispatch, mut channel, mut server_channel) = set_up(); let mut dispatch = Pin::new(&mut dispatch); - let waker = &noop_waker_ref(); + let cx = &mut Context::from_waker(&noop_waker_ref()); let _ = send_request(&mut channel, "hi"); drop(channel); - assert!(dispatch.as_mut().poll(waker).is_ready()); + assert!(dispatch.as_mut().poll(cx).is_ready()); send_response( &mut server_channel, Response { @@ -870,36 +879,36 @@ mod tests { fn stage_request_response_future_dropped_is_canceled_before_sending() { let (mut dispatch, mut channel, _server_channel) = set_up(); let mut dispatch = Pin::new(&mut dispatch); - let waker = &noop_waker_ref(); + let cx = &mut Context::from_waker(&noop_waker_ref()); let _ = send_request(&mut channel, "hi"); - // Drop the channel so polling returns none if no requests are currently ready. drop(channel); // Test that a request future dropped before it's processed by dispatch will cause the request // to not be added to the in-flight request map. - assert!(dispatch.poll_next_request(waker).ready().is_none()); + assert!(dispatch.poll_next_request(cx).ready().is_none()); } #[test] fn stage_request_response_future_dropped_is_canceled_after_sending() { let (mut dispatch, mut channel, _server_channel) = set_up(); - let waker = &noop_waker_ref(); + let cx = &mut Context::from_waker(&noop_waker_ref()); let mut dispatch = Pin::new(&mut dispatch); let req = send_request(&mut channel, "hi"); - assert!(dispatch.as_mut().pump_write(waker).ready().is_some()); + assert!(dispatch.as_mut().pump_write(cx).ready().is_some()); assert!(!dispatch.as_mut().in_flight_requests().is_empty()); - // Test that a request future dropped after it's processed by dispatch will cause the request // to be removed from the in-flight request map. drop(req); - if let Poll::Ready(Some(_)) = dispatch.as_mut().poll_next_cancellation(waker).unwrap() { + if let Poll::Ready(Some(_)) = dispatch.as_mut().poll_next_cancellation(cx).unwrap() { // ok - } else { panic!("Expected request to be cancelled")}; + } else { + panic!("Expected request to be cancelled") + }; assert!(dispatch.in_flight_requests().is_empty()); } @@ -907,7 +916,7 @@ mod tests { fn stage_request_response_closed_skipped() { let (mut dispatch, mut channel, _server_channel) = set_up(); let mut dispatch = Pin::new(&mut dispatch); - let waker = &noop_waker_ref(); + let cx = &mut Context::from_waker(&noop_waker_ref()); // Test that a request future that's closed its receiver but not yet canceled its request -- // i.e. still in `drop fn` -- will cause the request to not be added to the in-flight request @@ -915,7 +924,7 @@ mod tests { let mut resp = send_request(&mut channel, "hi"); resp.response.get_mut().close(); - assert!(dispatch.poll_next_request(waker).is_pending()); + assert!(dispatch.poll_next_request(cx).is_pending()); } fn set_up() -> ( @@ -958,7 +967,8 @@ mod tests { .send(context::current(), request.to_string()) .boxed() .compat(), - ).unwrap() + ) + .unwrap() } fn send_response( diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index dc87d8c..46ff8b2 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -156,7 +156,7 @@ thread_local! { // INIT must always be called before accessing SPAWN. // Otherwise, accessing SPAWN can trigger undefined behavior due to race conditions. INIT.call_once(|| {}); - RefCell::new(SEED_SPAWN.clone().expect("init() must be called.")) + RefCell::new(SEED_SPAWN.as_ref().expect("init() must be called.").box_clone()) } }; } @@ -182,12 +182,6 @@ trait CloneSpawn: Spawn { fn box_clone(&self) -> Box; } -impl Clone for Box { - fn clone(&self) -> Self { - self.box_clone() - } -} - impl CloneSpawn for S { fn box_clone(&self) -> Box { Box::new(self.clone()) diff --git a/rpc/src/server/filter.rs b/rpc/src/server/filter.rs index 5f01c22..5d24d61 100644 --- a/rpc/src/server/filter.rs +++ b/rpc/src/server/filter.rs @@ -15,7 +15,7 @@ use futures::{ prelude::*, ready, stream::Fuse, - task::{Poll, Waker}, + task::{Context, Poll}, }; use log::{debug, error, info, trace, warn}; use pin_utils::unsafe_pinned; @@ -197,7 +197,10 @@ impl ConnectionFilter { } } - fn poll_listener(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo> + fn poll_listener( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> PollIo> where S: Stream>, C: Transport, SinkItem = Response> + Send, @@ -208,7 +211,10 @@ impl ConnectionFilter { } } - fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &Waker) -> Poll> { + fn poll_closed_connections( + self: &mut Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { match ready!(self.as_mut().closed_connections_rx().poll_next_unpin(cx)) { Some(addr) => { self.handle_closed_connection(&addr); @@ -226,7 +232,7 @@ where { type Item = io::Result>; - fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo> { loop { match ( self.as_mut().poll_listener(cx)?, diff --git a/rpc/src/server/mod.rs b/rpc/src/server/mod.rs index 76b8aab..9b112e2 100644 --- a/rpc/src/server/mod.rs +++ b/rpc/src/server/mod.rs @@ -7,7 +7,7 @@ //! Provides a server that concurrently handles many connections sending multiplexed requests. use crate::{ - context::Context, util::deadline_compat, util::AsDuration, util::Compact, ClientMessage, + context, util::deadline_compat, util::AsDuration, util::Compact, ClientMessage, ClientMessageKind, PollIo, Request, Response, ServerError, Transport, }; use fnv::FnvHashMap; @@ -17,7 +17,7 @@ use futures::{ prelude::*, ready, stream::Fuse, - task::{Poll, Waker}, + task::{Context, Poll}, try_ready, }; use humantime::format_rfc3339; @@ -128,12 +128,12 @@ where Req: Send + 'static, Resp: Send + 'static, T: Transport, SinkItem = Response> + Send + 'static, - F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, + F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone, Fut: Future> + Send + 'static, { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<()> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) { match channel { Ok(channel) => { @@ -165,7 +165,7 @@ where /// Responds to all requests with `request_handler`. fn respond_with(self, request_handler: F) -> Running where - F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, + F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone, Fut: Future> + Send + 'static, { Running { @@ -234,15 +234,24 @@ where self.as_mut().transport().start_send(response) } - pub(crate) fn poll_ready(mut self: Pin<&mut Self>, cx: &Waker) -> Poll> { + pub(crate) fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { self.as_mut().transport().poll_ready(cx) } - pub(crate) fn poll_flush(mut self: Pin<&mut Self>, cx: &Waker) -> Poll> { + pub(crate) fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { self.as_mut().transport().poll_flush(cx) } - pub(crate) fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo> { + pub(crate) fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> PollIo> { self.as_mut().transport().poll_next(cx) } @@ -255,7 +264,7 @@ where /// responses and resolves when the connection is closed. pub fn respond_with(self, f: F) -> impl Future where - F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, + F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone, Fut: Future> + Send + 'static, Req: 'static, Resp: 'static, @@ -281,9 +290,9 @@ where struct ClientHandler { channel: Channel, /// Responses waiting to be written to the wire. - pending_responses: Fuse)>>, + pending_responses: Fuse)>>, /// Handed out to request handlers to fan in responses. - responses_tx: mpsc::Sender<(Context, Response)>, + responses_tx: mpsc::Sender<(context::Context, Response)>, /// Number of requests currently being responded to. in_flight_requests: FnvHashMap, /// Request handler. @@ -293,8 +302,8 @@ struct ClientHandler { impl ClientHandler { unsafe_pinned!(channel: Channel); unsafe_pinned!(in_flight_requests: FnvHashMap); - unsafe_pinned!(pending_responses: Fuse)>>); - unsafe_pinned!(responses_tx: mpsc::Sender<(Context, Response)>); + unsafe_pinned!(pending_responses: Fuse)>>); + unsafe_pinned!(responses_tx: mpsc::Sender<(context::Context, Response)>); // For this to be safe, field f must be private, and code in this module must never // construct PinMut. unsafe_unpinned!(f: F); @@ -305,12 +314,15 @@ where Req: Send + 'static, Resp: Send + 'static, T: Transport, SinkItem = Response> + Send, - F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, + F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone, Fut: Future> + Send + 'static, { /// If at max in-flight requests, check that there's room to immediately write a throttled /// response. - fn poll_ready_if_throttling(mut self: Pin<&mut Self>, cx: &Waker) -> Poll> { + fn poll_ready_if_throttling( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { if self.in_flight_requests.len() >= self.channel.config.max_in_flight_requests_per_connection { @@ -328,7 +340,7 @@ where Poll::Ready(Ok(())) } - fn pump_read(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<()> { + fn pump_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> { ready!(self.as_mut().poll_ready_if_throttling(cx)?); Poll::Ready(match ready!(self.as_mut().channel().poll_next(cx)?) { @@ -350,7 +362,11 @@ where }) } - fn pump_write(mut self: Pin<&mut Self>, cx: &Waker, read_half_closed: bool) -> PollIo<()> { + fn pump_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + read_half_closed: bool, + ) -> PollIo<()> { match self.as_mut().poll_next_response(cx)? { Poll::Ready(Some((_, response))) => { self.as_mut().channel().start_send(response)?; @@ -379,8 +395,8 @@ where fn poll_next_response( mut self: Pin<&mut Self>, - cx: &Waker, - ) -> PollIo<(Context, Response)> { + cx: &mut Context<'_>, + ) -> PollIo<(context::Context, Response)> { // Ensure there's room to write a response. while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? { ready!(self.as_mut().channel().poll_flush(cx)?); @@ -421,7 +437,7 @@ where ) -> io::Result<()> { let request_id = request.id; let peer = self.as_mut().channel().client_addr; - let ctx = Context { + let ctx = context::Context { deadline: request.deadline, trace_context, }; @@ -527,12 +543,12 @@ where Req: Send + 'static, Resp: Send + 'static, T: Transport, SinkItem = Response> + Send, - F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, + F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone, Fut: Future> + Send + 'static, { type Output = io::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { trace!("[{}] ClientHandler::poll", self.channel.client_addr); loop { let read = self.as_mut().pump_read(cx)?; diff --git a/rpc/src/transport/channel.rs b/rpc/src/transport/channel.rs index 592b765..4d14421 100644 --- a/rpc/src/transport/channel.rs +++ b/rpc/src/transport/channel.rs @@ -7,7 +7,7 @@ //! Transports backed by in-memory channels. use crate::{PollIo, Transport}; -use futures::{channel::mpsc, task::Waker, Poll, Sink, Stream}; +use futures::{channel::mpsc, task::Context, Poll, Sink, Stream}; use pin_utils::unsafe_pinned; use std::pin::Pin; use std::{ @@ -45,16 +45,15 @@ impl UnboundedChannel { impl Stream for UnboundedChannel { type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &Waker) -> PollIo { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo { self.rx().poll_next(cx).map(|option| option.map(Ok)) } } -impl Sink for UnboundedChannel { - type SinkItem = SinkItem; +impl Sink for UnboundedChannel { type SinkError = io::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &Waker) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.tx() .poll_ready(cx) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) @@ -66,13 +65,13 @@ impl Sink for UnboundedChannel { .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) } - fn poll_flush(self: Pin<&mut Self>, cx: &Waker) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.tx() .poll_flush(cx) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) } - fn poll_close(self: Pin<&mut Self>, cx: &Waker) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.tx() .poll_close(cx) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) @@ -80,8 +79,8 @@ impl Sink for UnboundedChannel { } impl Transport for UnboundedChannel { - type Item = Item; type SinkItem = SinkItem; + type Item = Item; fn peer_addr(&self) -> io::Result { Ok(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)) @@ -130,8 +129,11 @@ mod tests { Ok::<_, io::Error>((response1, response2)) }; - let (response1, response2) = - run_future(server.join(responses.unwrap_or_else(|e| panic!(e)))).1; + let (response1, response2) = run_future(future::join( + server, + responses.unwrap_or_else(|e| panic!(e)), + )) + .1; trace!("response1: {:?}, response2: {:?}", response1, response2); diff --git a/rpc/src/transport/mod.rs b/rpc/src/transport/mod.rs index d72ffed..b926450 100644 --- a/rpc/src/transport/mod.rs +++ b/rpc/src/transport/mod.rs @@ -12,9 +12,10 @@ use futures::prelude::*; use std::{ io, + marker::PhantomData, net::SocketAddr, pin::Pin, - task::{Poll, Waker}, + task::{Context, Poll}, }; pub mod channel; @@ -23,7 +24,7 @@ pub mod channel; pub trait Transport where Self: Stream::Item>>, - Self: Sink::SinkItem, SinkError = io::Error>, + Self: Sink<::SinkItem, SinkError = io::Error>, { /// The type read off the transport. type Item; @@ -37,77 +38,78 @@ where } /// Returns a new Transport backed by the given Stream + Sink and connecting addresses. -pub fn new( +pub fn new( inner: S, peer_addr: SocketAddr, local_addr: SocketAddr, -) -> impl Transport +) -> impl Transport where S: Stream>, - S: Sink, + S: Sink, { TransportShim { inner, peer_addr, local_addr, + _marker: PhantomData, } } /// A transport created by adding peers to a Stream + Sink. #[derive(Debug)] -struct TransportShim { +struct TransportShim { peer_addr: SocketAddr, local_addr: SocketAddr, inner: S, + _marker: PhantomData, } -impl TransportShim { +impl TransportShim { pin_utils::unsafe_pinned!(inner: S); } -impl Stream for TransportShim +impl Stream for TransportShim where S: Stream, { type Item = S::Item; - fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - self.inner().poll_next(waker) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner().poll_next(cx) } } -impl Sink for TransportShim +impl Sink for TransportShim where - S: Sink, + S: Sink, { - type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(self: Pin<&mut Self>, item: S::SinkItem) -> Result<(), S::SinkError> { + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), S::SinkError> { self.inner().start_send(item) } - fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - self.inner().poll_ready(waker) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner().poll_ready(cx) } - fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - self.inner().poll_flush(waker) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner().poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - self.inner().poll_close(waker) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner().poll_close(cx) } } -impl Transport for TransportShim +impl Transport for TransportShim where - S: Stream + Sink, + S: Stream + Sink, Self: Stream>, - Self: Sink, + Self: Sink, { type Item = Item; - type SinkItem = S::SinkItem; + type SinkItem = SinkItem; /// The address of the remote peer this transport is in communication with. fn peer_addr(&self) -> io::Result { diff --git a/rpc/src/util/deadline_compat.rs b/rpc/src/util/deadline_compat.rs index 2a44fbc..c91f20d 100644 --- a/rpc/src/util/deadline_compat.rs +++ b/rpc/src/util/deadline_compat.rs @@ -5,10 +5,10 @@ // https://opensource.org/licenses/MIT. use futures::{ - compat::{Compat01As03, Future01CompatExt}, + compat::*, prelude::*, ready, - task::{Poll, Waker}, + task::{Context, Poll}, }; use pin_utils::unsafe_pinned; use std::pin::Pin; @@ -50,15 +50,15 @@ where { type Output = Result>; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // First, try polling the future - match self.as_mut().future().try_poll(waker) { + match self.as_mut().future().try_poll(cx) { Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), Poll::Pending => {} Poll::Ready(Err(e)) => return Poll::Ready(Err(timeout::Error::inner(e))), } - let delay = self.delay().poll_unpin(waker); + let delay = self.delay().poll_unpin(cx); // Now check the timer match ready!(delay) { diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 7392079..9c24fe7 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -19,19 +19,16 @@ serde1 = ["rpc/serde1", "serde", "serde/derive"] travis-ci = { repository = "google/tarpc" } [dependencies] +futures-preview = { version = "0.3.0-alpha.14", features = ["compat"] } log = "0.4" serde = { optional = true, version = "1.0" } -tarpc-plugins = { path = "../plugins", version = "0.5.0" } rpc = { package = "tarpc-lib", path = "../rpc", version = "0.3" } - -[target.'cfg(not(test))'.dependencies] -futures-preview = "0.3.0-alpha.13" +tarpc-plugins = { path = "../plugins", version = "0.5.0" } [dev-dependencies] bincode = "1" bytes = { version = "0.4", features = ["serde"] } humantime = "1.0" -futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } bincode-transport = { package = "tarpc-bincode-transport", version = "0.4", path = "../bincode-transport" } env_logger = "0.6" libtest = "0.0.1" diff --git a/tarpc/examples/service_registry.rs b/tarpc/examples/service_registry.rs index 97f97a3..59fa61b 100644 --- a/tarpc/examples/service_registry.rs +++ b/tarpc/examples/service_registry.rs @@ -18,7 +18,7 @@ mod registry { io, pin::Pin, sync::Arc, - task::{Poll, Waker}, + task::{Context, Poll}, }; use tarpc::{ client::{self, Client}, @@ -213,11 +213,11 @@ mod registry { { type Output = Output; - fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { unsafe { match Pin::get_unchecked_mut(self) { - Either::Left(car) => Pin::new_unchecked(car).poll(waker), - Either::Right(cdr) => Pin::new_unchecked(cdr).poll(waker), + Either::Left(car) => Pin::new_unchecked(car).poll(cx), + Either::Right(cdr) => Pin::new_unchecked(cdr).poll(cx), } } } diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index e8b2bfb..fddde44 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -177,7 +177,7 @@ macro_rules! service { impl ::std::future::Future for ResponseFut { type Output = ::std::io::Result; - fn poll(self: ::std::pin::Pin<&mut Self>, waker: &::std::task::Waker) + fn poll(self: ::std::pin::Pin<&mut Self>, cx: &mut ::std::task::Context<'_>) -> ::std::task::Poll<::std::io::Result> { unsafe { @@ -185,7 +185,7 @@ macro_rules! service { $( ResponseFut::$fn_name(resp) => ::std::pin::Pin::new_unchecked(resp) - .poll(waker) + .poll(cx) .map(Response::$fn_name) .map(Ok), )*