From f4e22bdc2e49cdebe1987696761e974a1cd34be1 Mon Sep 17 00:00:00 2001 From: Artem Vorotnikov Date: Mon, 15 Apr 2019 18:56:23 +0300 Subject: [PATCH] Port to std::task::Context --- bincode-transport/src/lib.rs | 22 +++---- rpc/src/client/channel.rs | 98 ++++++++++++++++-------------- rpc/src/server/filter.rs | 14 +++-- rpc/src/server/mod.rs | 60 +++++++++++------- rpc/src/transport/channel.rs | 10 +-- rpc/src/transport/mod.rs | 18 +++--- rpc/src/util/deadline_compat.rs | 10 +-- tarpc/examples/service_registry.rs | 8 +-- tarpc/src/macros.rs | 4 +- 9 files changed, 135 insertions(+), 109 deletions(-) diff --git a/bincode-transport/src/lib.rs b/bincode-transport/src/lib.rs index cf347b6..7a5e657 100644 --- a/bincode-transport/src/lib.rs +++ b/bincode-transport/src/lib.rs @@ -19,7 +19,7 @@ use std::{ marker::PhantomData, net::SocketAddr, pin::Pin, - task::{Poll, Waker}, + task::{Context, Poll}, }; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tcp::{TcpListener, TcpStream}; @@ -45,8 +45,8 @@ where { type Item = io::Result; - fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll>> { - match self.inner().poll_next(waker) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + match self.inner().poll_next(cx) { Poll::Pending => Poll::Pending, Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(Ok(next))) => Poll::Ready(Some(Ok(next))), @@ -70,16 +70,16 @@ where .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } - fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - convert(self.inner().poll_ready(waker)) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + convert(self.inner().poll_ready(cx)) } - fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - convert(self.inner().poll_flush(waker)) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + convert(self.inner().poll_flush(cx)) } - fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - convert(self.inner().poll_close(waker)) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + convert(self.inner().poll_close(cx)) } } @@ -176,8 +176,8 @@ where { type Item = io::Result>; - fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - let next = ready!(self.incoming().poll_next(waker)?); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = ready!(self.incoming().poll_next(cx)?); Poll::Ready(next.map(|conn| Ok(new(conn)))) } } diff --git a/rpc/src/client/channel.rs b/rpc/src/client/channel.rs index 5c42133..0dbc572 100644 --- a/rpc/src/client/channel.rs +++ b/rpc/src/client/channel.rs @@ -15,7 +15,7 @@ use futures::{ prelude::*, ready, stream::Fuse, - task::Waker, + task::Context, Poll, }; use humantime::format_rfc3339; @@ -87,8 +87,8 @@ impl<'a, Req, Resp> Send<'a, Req, Resp> { impl<'a, Req, Resp> Future for Send<'a, Req, Resp> { type Output = io::Result>; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { - self.as_mut().fut().poll(waker) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.as_mut().fut().poll(cx) } } @@ -106,8 +106,8 @@ impl<'a, Req, Resp> Call<'a, Req, Resp> { impl<'a, Req, Resp> Future for Call<'a, Req, Resp> { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { - self.as_mut().fut().poll(waker) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.as_mut().fut().poll(cx) } } @@ -182,8 +182,8 @@ impl DispatchResponse { impl Future for DispatchResponse { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll> { - let resp = ready!(self.response.poll_unpin(waker)); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let resp = ready!(self.response.poll_unpin(cx)); self.complete = true; @@ -323,8 +323,8 @@ where unsafe_pinned!(pending_requests: Fuse>>); unsafe_pinned!(transport: Fuse); - fn pump_read(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> { - Poll::Ready(match ready!(self.as_mut().transport().poll_next(waker)?) { + fn pump_read(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> { + Poll::Ready(match ready!(self.as_mut().transport().poll_next(cx)?) { Some(response) => { self.complete(response); Some(Ok(())) @@ -336,13 +336,13 @@ where }) } - fn pump_write(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> { + fn pump_write(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> { enum ReceiverStatus { NotReady, Closed, } - let pending_requests_status = match self.poll_next_request(waker)? { + let pending_requests_status = match self.poll_next_request(cx)? { Poll::Ready(Some(dispatch_request)) => { self.write_request(dispatch_request)?; return Poll::Ready(Some(Ok(()))); @@ -351,7 +351,7 @@ where Poll::Pending => ReceiverStatus::NotReady, }; - let canceled_requests_status = match self.poll_next_cancellation(waker)? { + let canceled_requests_status = match self.poll_next_cancellation(cx)? { Poll::Ready(Some((context, request_id))) => { self.write_cancel(context, request_id)?; return Poll::Ready(Some(Ok(()))); @@ -362,12 +362,12 @@ where match (pending_requests_status, canceled_requests_status) { (ReceiverStatus::Closed, ReceiverStatus::Closed) => { - ready!(self.as_mut().transport().poll_flush(waker)?); + ready!(self.as_mut().transport().poll_flush(cx)?); Poll::Ready(None) } (ReceiverStatus::NotReady, _) | (_, ReceiverStatus::NotReady) => { // No more messages to process, so flush any messages buffered in the transport. - ready!(self.as_mut().transport().poll_flush(waker)?); + ready!(self.as_mut().transport().poll_flush(cx)?); // Even if we fully-flush, we return Pending, because we have no more requests // or cancellations right now. @@ -379,7 +379,7 @@ where /// Yields the next pending request, if one is ready to be sent. fn poll_next_request( self: &mut Pin<&mut Self>, - waker: &Waker, + cx: &mut Context<'_>, ) -> PollIo> { if self.as_mut().in_flight_requests().len() >= self.config.max_in_flight_requests { info!( @@ -393,13 +393,13 @@ where return Poll::Pending; } - while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? { + while let Poll::Pending = self.as_mut().transport().poll_ready(cx)? { // We can't yield a request-to-be-sent before the transport is capable of buffering it. - ready!(self.as_mut().transport().poll_flush(waker)?); + ready!(self.as_mut().transport().poll_flush(cx)?); } loop { - match ready!(self.as_mut().pending_requests().poll_next_unpin(waker)) { + match ready!(self.as_mut().pending_requests().poll_next_unpin(cx)) { Some(request) => { if request.response_completion.is_canceled() { trace!( @@ -422,14 +422,14 @@ where /// Yields the next pending cancellation, and, if one is ready, cancels the associated request. fn poll_next_cancellation( self: &mut Pin<&mut Self>, - waker: &Waker, + cx: &mut Context<'_>, ) -> PollIo<(context::Context, u64)> { - while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? { - ready!(self.as_mut().transport().poll_flush(waker)?); + while let Poll::Pending = self.as_mut().transport().poll_ready(cx)? { + ready!(self.as_mut().transport().poll_flush(cx)?); } loop { - let cancellation = self.as_mut().canceled_requests().poll_next_unpin(waker); + let cancellation = self.as_mut().canceled_requests().poll_next_unpin(cx); match ready!(cancellation) { Some(request_id) => { if let Some(in_flight_data) = @@ -537,10 +537,10 @@ where { type Output = io::Result<()>; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { trace!("[{}] RequestDispatch::poll", self.as_mut().server_addr()); loop { - match (self.pump_read(waker)?, self.pump_write(waker)?) { + match (self.pump_read(cx)?, self.pump_write(cx)?) { (read, write @ Poll::Ready(None)) => { if self.as_mut().in_flight_requests().is_empty() { info!( @@ -627,8 +627,8 @@ impl RequestCancellation { impl Stream for CanceledRequests { type Item = u64; - fn poll_next(mut self: Pin<&mut Self>, waker: &Waker) -> Poll> { - self.0.poll_next_unpin(waker) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.0.poll_next_unpin(cx) } } @@ -659,8 +659,8 @@ where { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { - match self.as_mut().future().try_poll(waker) { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().future().try_poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(result) => { self.finished().take().expect( @@ -699,8 +699,8 @@ where { type Output = Result, Fut::Error>; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { - match self.as_mut().future().try_poll(waker) { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().future().try_poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(result) => { let response = self @@ -742,8 +742,8 @@ where { type Output = Result; - fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll { - self.try_chain().poll(waker, |result| match result { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.try_chain().poll(cx, |result| match result { Ok(ok) => TryChainAction::Future(ok), Err(err) => TryChainAction::Output(Err(err)), }) @@ -775,7 +775,11 @@ where TryChain::First(fut1) } - fn poll(self: Pin<&mut Self>, waker: &Waker, f: F) -> Poll> + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + f: F, + ) -> Poll> where F: FnOnce(Result) -> TryChainAction, { @@ -788,14 +792,14 @@ where let output = match this { TryChain::First(fut1) => { // Poll the first future - match unsafe { Pin::new_unchecked(fut1) }.try_poll(waker) { + match unsafe { Pin::new_unchecked(fut1) }.try_poll(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(output) => output, } } TryChain::Second(fut2) => { // Poll the second future - return unsafe { Pin::new_unchecked(fut2) }.try_poll(waker); + return unsafe { Pin::new_unchecked(fut2) }.try_poll(cx); } TryChain::Empty => { panic!("future must not be polled after it returned `Poll::Ready`"); @@ -824,7 +828,7 @@ mod tests { ClientMessage, Response, }; use fnv::FnvHashMap; - use futures::{channel::mpsc, prelude::*, Poll}; + use futures::{channel::mpsc, prelude::*, task::Context, Poll}; use futures_test::task::noop_waker_ref; use std::{ marker, @@ -838,11 +842,11 @@ mod tests { fn stage_request() { let (mut dispatch, mut channel, _server_channel) = set_up(); let mut dispatch = Pin::new(&mut dispatch); - let waker = &noop_waker_ref(); + let cx = &mut Context::from_waker(&noop_waker_ref()); let _resp = send_request(&mut channel, "hi"); - let req = dispatch.poll_next_request(waker).ready(); + let req = dispatch.poll_next_request(cx).ready(); assert!(req.is_some()); let req = req.unwrap(); @@ -855,12 +859,12 @@ mod tests { fn stage_request_channel_dropped_doesnt_panic() { let (mut dispatch, mut channel, mut server_channel) = set_up(); let mut dispatch = Pin::new(&mut dispatch); - let waker = &noop_waker_ref(); + let cx = &mut Context::from_waker(&noop_waker_ref()); let _ = send_request(&mut channel, "hi"); drop(channel); - assert!(dispatch.as_mut().poll(waker).is_ready()); + assert!(dispatch.as_mut().poll(cx).is_ready()); send_response( &mut server_channel, Response { @@ -875,7 +879,7 @@ mod tests { fn stage_request_response_future_dropped_is_canceled_before_sending() { let (mut dispatch, mut channel, _server_channel) = set_up(); let mut dispatch = Pin::new(&mut dispatch); - let waker = &noop_waker_ref(); + let cx = &mut Context::from_waker(&noop_waker_ref()); let _ = send_request(&mut channel, "hi"); @@ -883,24 +887,24 @@ mod tests { drop(channel); // Test that a request future dropped before it's processed by dispatch will cause the request // to not be added to the in-flight request map. - assert!(dispatch.poll_next_request(waker).ready().is_none()); + assert!(dispatch.poll_next_request(cx).ready().is_none()); } #[test] fn stage_request_response_future_dropped_is_canceled_after_sending() { let (mut dispatch, mut channel, _server_channel) = set_up(); - let waker = &noop_waker_ref(); + let cx = &mut Context::from_waker(&noop_waker_ref()); let mut dispatch = Pin::new(&mut dispatch); let req = send_request(&mut channel, "hi"); - assert!(dispatch.as_mut().pump_write(waker).ready().is_some()); + assert!(dispatch.as_mut().pump_write(cx).ready().is_some()); assert!(!dispatch.as_mut().in_flight_requests().is_empty()); // Test that a request future dropped after it's processed by dispatch will cause the request // to be removed from the in-flight request map. drop(req); - if let Poll::Ready(Some(_)) = dispatch.as_mut().poll_next_cancellation(waker).unwrap() { + if let Poll::Ready(Some(_)) = dispatch.as_mut().poll_next_cancellation(cx).unwrap() { // ok } else { panic!("Expected request to be cancelled") @@ -912,7 +916,7 @@ mod tests { fn stage_request_response_closed_skipped() { let (mut dispatch, mut channel, _server_channel) = set_up(); let mut dispatch = Pin::new(&mut dispatch); - let waker = &noop_waker_ref(); + let cx = &mut Context::from_waker(&noop_waker_ref()); // Test that a request future that's closed its receiver but not yet canceled its request -- // i.e. still in `drop fn` -- will cause the request to not be added to the in-flight request @@ -920,7 +924,7 @@ mod tests { let mut resp = send_request(&mut channel, "hi"); resp.response.get_mut().close(); - assert!(dispatch.poll_next_request(waker).is_pending()); + assert!(dispatch.poll_next_request(cx).is_pending()); } fn set_up() -> ( diff --git a/rpc/src/server/filter.rs b/rpc/src/server/filter.rs index 5f01c22..5d24d61 100644 --- a/rpc/src/server/filter.rs +++ b/rpc/src/server/filter.rs @@ -15,7 +15,7 @@ use futures::{ prelude::*, ready, stream::Fuse, - task::{Poll, Waker}, + task::{Context, Poll}, }; use log::{debug, error, info, trace, warn}; use pin_utils::unsafe_pinned; @@ -197,7 +197,10 @@ impl ConnectionFilter { } } - fn poll_listener(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo> + fn poll_listener( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> PollIo> where S: Stream>, C: Transport, SinkItem = Response> + Send, @@ -208,7 +211,10 @@ impl ConnectionFilter { } } - fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &Waker) -> Poll> { + fn poll_closed_connections( + self: &mut Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { match ready!(self.as_mut().closed_connections_rx().poll_next_unpin(cx)) { Some(addr) => { self.handle_closed_connection(&addr); @@ -226,7 +232,7 @@ where { type Item = io::Result>; - fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo> { loop { match ( self.as_mut().poll_listener(cx)?, diff --git a/rpc/src/server/mod.rs b/rpc/src/server/mod.rs index 76b8aab..9b112e2 100644 --- a/rpc/src/server/mod.rs +++ b/rpc/src/server/mod.rs @@ -7,7 +7,7 @@ //! Provides a server that concurrently handles many connections sending multiplexed requests. use crate::{ - context::Context, util::deadline_compat, util::AsDuration, util::Compact, ClientMessage, + context, util::deadline_compat, util::AsDuration, util::Compact, ClientMessage, ClientMessageKind, PollIo, Request, Response, ServerError, Transport, }; use fnv::FnvHashMap; @@ -17,7 +17,7 @@ use futures::{ prelude::*, ready, stream::Fuse, - task::{Poll, Waker}, + task::{Context, Poll}, try_ready, }; use humantime::format_rfc3339; @@ -128,12 +128,12 @@ where Req: Send + 'static, Resp: Send + 'static, T: Transport, SinkItem = Response> + Send + 'static, - F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, + F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone, Fut: Future> + Send + 'static, { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<()> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) { match channel { Ok(channel) => { @@ -165,7 +165,7 @@ where /// Responds to all requests with `request_handler`. fn respond_with(self, request_handler: F) -> Running where - F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, + F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone, Fut: Future> + Send + 'static, { Running { @@ -234,15 +234,24 @@ where self.as_mut().transport().start_send(response) } - pub(crate) fn poll_ready(mut self: Pin<&mut Self>, cx: &Waker) -> Poll> { + pub(crate) fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { self.as_mut().transport().poll_ready(cx) } - pub(crate) fn poll_flush(mut self: Pin<&mut Self>, cx: &Waker) -> Poll> { + pub(crate) fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { self.as_mut().transport().poll_flush(cx) } - pub(crate) fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo> { + pub(crate) fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> PollIo> { self.as_mut().transport().poll_next(cx) } @@ -255,7 +264,7 @@ where /// responses and resolves when the connection is closed. pub fn respond_with(self, f: F) -> impl Future where - F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, + F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone, Fut: Future> + Send + 'static, Req: 'static, Resp: 'static, @@ -281,9 +290,9 @@ where struct ClientHandler { channel: Channel, /// Responses waiting to be written to the wire. - pending_responses: Fuse)>>, + pending_responses: Fuse)>>, /// Handed out to request handlers to fan in responses. - responses_tx: mpsc::Sender<(Context, Response)>, + responses_tx: mpsc::Sender<(context::Context, Response)>, /// Number of requests currently being responded to. in_flight_requests: FnvHashMap, /// Request handler. @@ -293,8 +302,8 @@ struct ClientHandler { impl ClientHandler { unsafe_pinned!(channel: Channel); unsafe_pinned!(in_flight_requests: FnvHashMap); - unsafe_pinned!(pending_responses: Fuse)>>); - unsafe_pinned!(responses_tx: mpsc::Sender<(Context, Response)>); + unsafe_pinned!(pending_responses: Fuse)>>); + unsafe_pinned!(responses_tx: mpsc::Sender<(context::Context, Response)>); // For this to be safe, field f must be private, and code in this module must never // construct PinMut. unsafe_unpinned!(f: F); @@ -305,12 +314,15 @@ where Req: Send + 'static, Resp: Send + 'static, T: Transport, SinkItem = Response> + Send, - F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, + F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone, Fut: Future> + Send + 'static, { /// If at max in-flight requests, check that there's room to immediately write a throttled /// response. - fn poll_ready_if_throttling(mut self: Pin<&mut Self>, cx: &Waker) -> Poll> { + fn poll_ready_if_throttling( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { if self.in_flight_requests.len() >= self.channel.config.max_in_flight_requests_per_connection { @@ -328,7 +340,7 @@ where Poll::Ready(Ok(())) } - fn pump_read(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<()> { + fn pump_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> { ready!(self.as_mut().poll_ready_if_throttling(cx)?); Poll::Ready(match ready!(self.as_mut().channel().poll_next(cx)?) { @@ -350,7 +362,11 @@ where }) } - fn pump_write(mut self: Pin<&mut Self>, cx: &Waker, read_half_closed: bool) -> PollIo<()> { + fn pump_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + read_half_closed: bool, + ) -> PollIo<()> { match self.as_mut().poll_next_response(cx)? { Poll::Ready(Some((_, response))) => { self.as_mut().channel().start_send(response)?; @@ -379,8 +395,8 @@ where fn poll_next_response( mut self: Pin<&mut Self>, - cx: &Waker, - ) -> PollIo<(Context, Response)> { + cx: &mut Context<'_>, + ) -> PollIo<(context::Context, Response)> { // Ensure there's room to write a response. while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? { ready!(self.as_mut().channel().poll_flush(cx)?); @@ -421,7 +437,7 @@ where ) -> io::Result<()> { let request_id = request.id; let peer = self.as_mut().channel().client_addr; - let ctx = Context { + let ctx = context::Context { deadline: request.deadline, trace_context, }; @@ -527,12 +543,12 @@ where Req: Send + 'static, Resp: Send + 'static, T: Transport, SinkItem = Response> + Send, - F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, + F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone, Fut: Future> + Send + 'static, { type Output = io::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { trace!("[{}] ClientHandler::poll", self.channel.client_addr); loop { let read = self.as_mut().pump_read(cx)?; diff --git a/rpc/src/transport/channel.rs b/rpc/src/transport/channel.rs index a5c3d39..704391c 100644 --- a/rpc/src/transport/channel.rs +++ b/rpc/src/transport/channel.rs @@ -7,7 +7,7 @@ //! Transports backed by in-memory channels. use crate::{PollIo, Transport}; -use futures::{channel::mpsc, task::Waker, Poll, Sink, Stream}; +use futures::{channel::mpsc, task::Context, Poll, Sink, Stream}; use pin_utils::unsafe_pinned; use std::pin::Pin; use std::{ @@ -45,7 +45,7 @@ impl UnboundedChannel { impl Stream for UnboundedChannel { type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &Waker) -> PollIo { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo { self.rx().poll_next(cx).map(|option| option.map(Ok)) } } @@ -53,7 +53,7 @@ impl Stream for UnboundedChannel { impl Sink for UnboundedChannel { type SinkError = io::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &Waker) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.tx() .poll_ready(cx) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) @@ -65,13 +65,13 @@ impl Sink for UnboundedChannel { .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) } - fn poll_flush(self: Pin<&mut Self>, cx: &Waker) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.tx() .poll_flush(cx) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) } - fn poll_close(self: Pin<&mut Self>, cx: &Waker) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.tx() .poll_close(cx) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) diff --git a/rpc/src/transport/mod.rs b/rpc/src/transport/mod.rs index c368660..b926450 100644 --- a/rpc/src/transport/mod.rs +++ b/rpc/src/transport/mod.rs @@ -15,7 +15,7 @@ use std::{ marker::PhantomData, net::SocketAddr, pin::Pin, - task::{Poll, Waker}, + task::{Context, Poll}, }; pub mod channel; @@ -74,8 +74,8 @@ where { type Item = S::Item; - fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - self.inner().poll_next(waker) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner().poll_next(cx) } } @@ -89,16 +89,16 @@ where self.inner().start_send(item) } - fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - self.inner().poll_ready(waker) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner().poll_ready(cx) } - fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - self.inner().poll_flush(waker) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner().poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll> { - self.inner().poll_close(waker) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner().poll_close(cx) } } diff --git a/rpc/src/util/deadline_compat.rs b/rpc/src/util/deadline_compat.rs index 2a44fbc..c91f20d 100644 --- a/rpc/src/util/deadline_compat.rs +++ b/rpc/src/util/deadline_compat.rs @@ -5,10 +5,10 @@ // https://opensource.org/licenses/MIT. use futures::{ - compat::{Compat01As03, Future01CompatExt}, + compat::*, prelude::*, ready, - task::{Poll, Waker}, + task::{Context, Poll}, }; use pin_utils::unsafe_pinned; use std::pin::Pin; @@ -50,15 +50,15 @@ where { type Output = Result>; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // First, try polling the future - match self.as_mut().future().try_poll(waker) { + match self.as_mut().future().try_poll(cx) { Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), Poll::Pending => {} Poll::Ready(Err(e)) => return Poll::Ready(Err(timeout::Error::inner(e))), } - let delay = self.delay().poll_unpin(waker); + let delay = self.delay().poll_unpin(cx); // Now check the timer match ready!(delay) { diff --git a/tarpc/examples/service_registry.rs b/tarpc/examples/service_registry.rs index 97f97a3..59fa61b 100644 --- a/tarpc/examples/service_registry.rs +++ b/tarpc/examples/service_registry.rs @@ -18,7 +18,7 @@ mod registry { io, pin::Pin, sync::Arc, - task::{Poll, Waker}, + task::{Context, Poll}, }; use tarpc::{ client::{self, Client}, @@ -213,11 +213,11 @@ mod registry { { type Output = Output; - fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { unsafe { match Pin::get_unchecked_mut(self) { - Either::Left(car) => Pin::new_unchecked(car).poll(waker), - Either::Right(cdr) => Pin::new_unchecked(cdr).poll(waker), + Either::Left(car) => Pin::new_unchecked(car).poll(cx), + Either::Right(cdr) => Pin::new_unchecked(cdr).poll(cx), } } } diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index e8b2bfb..fddde44 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -177,7 +177,7 @@ macro_rules! service { impl ::std::future::Future for ResponseFut { type Output = ::std::io::Result; - fn poll(self: ::std::pin::Pin<&mut Self>, waker: &::std::task::Waker) + fn poll(self: ::std::pin::Pin<&mut Self>, cx: &mut ::std::task::Context<'_>) -> ::std::task::Poll<::std::io::Result> { unsafe { @@ -185,7 +185,7 @@ macro_rules! service { $( ResponseFut::$fn_name(resp) => ::std::pin::Pin::new_unchecked(resp) - .poll(waker) + .poll(cx) .map(Response::$fn_name) .map(Ok), )*