diff --git a/README.md b/README.md index 45a6574..271213d 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ Here's a small service. use futures::{ - compat::TokioDefaultSpawner, + compat::Executor01CompatExt, future::{self, Ready}, prelude::*, }; @@ -121,7 +121,7 @@ async fn run() -> io::Result<()> { } fn main() { - tarpc::init(TokioDefaultSpawner); + tarpc::init(tokio::executor::DefaultExecutor::current().compat()); tokio::run(run() .map_err(|e| eprintln!("Oh no: {}", e)) .boxed() diff --git a/bincode-transport/Cargo.toml b/bincode-transport/Cargo.toml index c3430e3..ecda19c 100644 --- a/bincode-transport/Cargo.toml +++ b/bincode-transport/Cargo.toml @@ -23,10 +23,10 @@ async-bincode = "0.4" tokio-tcp = "0.1" [target.'cfg(not(test))'.dependencies] -futures-preview = { version = "0.3.0-alpha.12", features = ["compat"] } +futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } [dev-dependencies] -futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] } +futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } env_logger = "0.6" humantime = "1.0" log = "0.4" diff --git a/bincode-transport/src/compat.rs b/bincode-transport/src/compat.rs index 9156f9b..f70348c 100644 --- a/bincode-transport/src/compat.rs +++ b/bincode-transport/src/compat.rs @@ -8,7 +8,7 @@ use futures_legacy::{ }; use std::{ pin::Pin, - task::{self, LocalWaker, Poll}, + task::{self, Poll, Waker}, }; /// A shim to convert a 0.1 Sink + Stream to a 0.3 Sink + Stream. @@ -44,7 +44,7 @@ where { type Item = Result; - fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll> { unsafe { let inner = &mut Pin::get_unchecked_mut(self).inner; let mut compat = inner.compat(); @@ -72,7 +72,7 @@ where Ok(()) } - fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll> { let notify = &WakerToHandle(waker); executor01::with_notify(notify, 0, move || { @@ -91,7 +91,7 @@ where }) } - fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll> { let notify = &WakerToHandle(waker); executor01::with_notify(notify, 0, move || { @@ -104,7 +104,7 @@ where }) } - fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll> { let notify = &WakerToHandle(waker); executor01::with_notify(notify, 0, move || { @@ -119,7 +119,7 @@ where } #[derive(Clone, Debug)] -struct WakerToHandle<'a>(&'a LocalWaker); +struct WakerToHandle<'a>(&'a Waker); #[derive(Debug)] struct NotifyWaker(task::Waker); @@ -145,6 +145,6 @@ unsafe impl UnsafeNotify01 for NotifyWaker { impl<'a> From> for NotifyHandle01 { fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 { - unsafe { NotifyWaker(handle.0.clone().into_waker()).clone_raw() } + unsafe { NotifyWaker(handle.0.clone()).clone_raw() } } } diff --git a/bincode-transport/src/lib.rs b/bincode-transport/src/lib.rs index cb8e981..37230dd 100644 --- a/bincode-transport/src/lib.rs +++ b/bincode-transport/src/lib.rs @@ -24,7 +24,7 @@ use std::{ marker::PhantomData, net::SocketAddr, pin::Pin, - task::{LocalWaker, Poll}, + task::{Poll, Waker}, }; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tcp::{TcpListener, TcpStream}; @@ -57,7 +57,7 @@ where { type Item = io::Result; - fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll>> { + fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll>> { match self.inner().poll_next(waker) { Poll::Pending => Poll::Pending, Poll::Ready(None) => Poll::Ready(None), @@ -83,15 +83,15 @@ where .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } - fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll> { convert(self.inner().poll_ready(waker)) } - fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll> { convert(self.inner().poll_flush(waker)) } - fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll> { convert(self.inner().poll_close(waker)) } } @@ -189,7 +189,7 @@ where { type Item = io::Result>; - fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll> { let next = ready!(self.incoming().poll_next(waker)?); Poll::Ready(next.map(|conn| Ok(new(conn)))) } diff --git a/bincode-transport/tests/bench.rs b/bincode-transport/tests/bench.rs index 5e80f12..743968a 100644 --- a/bincode-transport/tests/bench.rs +++ b/bincode-transport/tests/bench.rs @@ -18,7 +18,7 @@ extern crate test; use self::test::stats::Stats; -use futures::{compat::TokioDefaultSpawner, prelude::*}; +use futures::{compat::Executor01CompatExt, prelude::*}; use rpc::{ client, context, server::{Handler, Server}, @@ -101,7 +101,7 @@ async fn bench() -> io::Result<()> { #[test] fn bench_small_packet() -> io::Result<()> { env_logger::init(); - rpc::init(TokioDefaultSpawner); + rpc::init(tokio::executor::DefaultExecutor::current().compat()); tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat()); println!("done"); diff --git a/bincode-transport/tests/cancel.rs b/bincode-transport/tests/cancel.rs index 6008eaa..fd891ef 100644 --- a/bincode-transport/tests/cancel.rs +++ b/bincode-transport/tests/cancel.rs @@ -9,7 +9,7 @@ #![feature(generators, await_macro, async_await, futures_api)] use futures::{ - compat::{Future01CompatExt, TokioDefaultSpawner}, + compat::{Executor01CompatExt, Future01CompatExt}, prelude::*, stream, }; @@ -136,7 +136,7 @@ async fn run() -> io::Result<()> { #[test] fn cancel_slower() -> io::Result<()> { env_logger::init(); - rpc::init(TokioDefaultSpawner); + rpc::init(tokio::executor::DefaultExecutor::current().compat()); tokio::run(run().boxed().map_err(|e| panic!(e)).compat()); Ok(()) diff --git a/bincode-transport/tests/pushback.rs b/bincode-transport/tests/pushback.rs index 4431bea..a59be02 100644 --- a/bincode-transport/tests/pushback.rs +++ b/bincode-transport/tests/pushback.rs @@ -9,7 +9,7 @@ #![feature(generators, await_macro, async_await, futures_api)] use futures::{ - compat::{Future01CompatExt, TokioDefaultSpawner}, + compat::{Executor01CompatExt, Future01CompatExt}, prelude::*, }; use log::{error, info, trace}; @@ -105,7 +105,7 @@ async fn run() -> io::Result<()> { #[test] fn ping_pong() -> io::Result<()> { env_logger::init(); - rpc::init(TokioDefaultSpawner); + rpc::init(tokio::executor::DefaultExecutor::current().compat()); tokio::run( run() diff --git a/example-service/Cargo.toml b/example-service/Cargo.toml index 0941874..8e16657 100644 --- a/example-service/Cargo.toml +++ b/example-service/Cargo.toml @@ -15,7 +15,7 @@ description = "An example server built on tarpc." [dependencies] bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" } clap = "2.0" -futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] } +futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } serde = { version = "1.0" } tarpc = { version = "0.14", path = "../tarpc", features = ["serde1"] } tokio = "0.1" diff --git a/example-service/src/client.rs b/example-service/src/client.rs index 31b1c39..788a041 100644 --- a/example-service/src/client.rs +++ b/example-service/src/client.rs @@ -7,7 +7,7 @@ #![feature(futures_api, arbitrary_self_types, await_macro, async_await)] use clap::{App, Arg}; -use futures::{compat::TokioDefaultSpawner, prelude::*}; +use futures::{compat::Executor01CompatExt, prelude::*}; use std::{io, net::SocketAddr}; use tarpc::{client, context}; @@ -53,7 +53,7 @@ fn main() { ) .get_matches(); - tarpc::init(TokioDefaultSpawner); + tarpc::init(tokio::executor::DefaultExecutor::current().compat()); let server_addr = flags.value_of("server_addr").unwrap(); let server_addr = server_addr @@ -62,7 +62,7 @@ fn main() { let name = flags.value_of("name").unwrap(); - tarpc::init(TokioDefaultSpawner); + tarpc::init(tokio::executor::DefaultExecutor::current().compat()); tokio::run( run(server_addr, name.into()) diff --git a/example-service/src/server.rs b/example-service/src/server.rs index 7d1c2ef..61f5638 100644 --- a/example-service/src/server.rs +++ b/example-service/src/server.rs @@ -8,7 +8,7 @@ use clap::{App, Arg}; use futures::{ - compat::TokioDefaultSpawner, + compat::Executor01CompatExt, future::{self, Ready}, prelude::*, }; @@ -73,7 +73,7 @@ fn main() { .parse() .unwrap_or_else(|e| panic!(r#"--port value "{}" invalid: {}"#, port, e)); - tarpc::init(TokioDefaultSpawner); + tarpc::init(tokio::executor::DefaultExecutor::current().compat()); tokio::run( run(([0, 0, 0, 0], port).into()) diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 7ec91cb..777c863 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -27,10 +27,10 @@ trace = { package = "tarpc-trace", version = "0.1", path = "../trace" } serde = { optional = true, version = "1.0" } [target.'cfg(not(test))'.dependencies] -futures-preview = { version = "0.3.0-alpha.12", features = ["compat"] } +futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } [dev-dependencies] -futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] } -futures-test-preview = { version = "0.3.0-alpha.12" } +futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } +futures-test-preview = { version = "0.3.0-alpha.13" } env_logger = "0.6" tokio = "0.1" diff --git a/rpc/src/client/channel.rs b/rpc/src/client/channel.rs index d987541..b888efc 100644 --- a/rpc/src/client/channel.rs +++ b/rpc/src/client/channel.rs @@ -15,7 +15,7 @@ use futures::{ prelude::*, ready, stream::Fuse, - task::LocalWaker, + task::Waker, Poll, }; use humantime::format_rfc3339; @@ -82,8 +82,8 @@ impl<'a, Req, Resp> Send<'a, Req, Resp> { impl<'a, Req, Resp> Future for Send<'a, Req, Resp> { type Output = io::Result>; - fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { - self.as_mut().fut().poll(lw) + fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { + self.as_mut().fut().poll(waker) } } @@ -101,8 +101,8 @@ impl<'a, Req, Resp> Call<'a, Req, Resp> { impl<'a, Req, Resp> Future for Call<'a, Req, Resp> { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { - self.as_mut().fut().poll(lw) + fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { + self.as_mut().fut().poll(waker) } } @@ -177,7 +177,7 @@ impl DispatchResponse { impl Future for DispatchResponse { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll> { let resp = ready!(self.response.poll_unpin(waker)); self.complete = true; @@ -317,7 +317,7 @@ where unsafe_pinned!(pending_requests: Fuse>>); unsafe_pinned!(transport: Fuse); - fn pump_read(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> PollIo<()> { + fn pump_read(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> { Poll::Ready(match ready!(self.as_mut().transport().poll_next(waker)?) { Some(response) => { self.complete(response); @@ -330,7 +330,7 @@ where }) } - fn pump_write(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> PollIo<()> { + fn pump_write(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> { enum ReceiverStatus { NotReady, Closed, @@ -373,7 +373,7 @@ where /// Yields the next pending request, if one is ready to be sent. fn poll_next_request( self: &mut Pin<&mut Self>, - waker: &LocalWaker, + waker: &Waker, ) -> PollIo> { if self.as_mut().in_flight_requests().len() >= self.config.max_in_flight_requests { info!( @@ -416,7 +416,7 @@ where /// Yields the next pending cancellation, and, if one is ready, cancels the associated request. fn poll_next_cancellation( self: &mut Pin<&mut Self>, - waker: &LocalWaker, + waker: &Waker, ) -> PollIo<(context::Context, u64)> { while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? { ready!(self.as_mut().transport().poll_flush(waker)?); @@ -530,7 +530,7 @@ where { type Output = io::Result<()>; - fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll> { trace!("[{}] RequestDispatch::poll", self.as_mut().server_addr()); loop { match (self.pump_read(waker)?, self.pump_write(waker)?) { @@ -620,7 +620,7 @@ impl RequestCancellation { impl Stream for CanceledRequests { type Item = u64; - fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, waker: &Waker) -> Poll> { self.0.poll_next_unpin(waker) } } @@ -652,8 +652,8 @@ where { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { - match self.as_mut().future().try_poll(lw) { + fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { + match self.as_mut().future().try_poll(waker) { Poll::Pending => Poll::Pending, Poll::Ready(result) => { self.finished().take().expect( @@ -692,8 +692,8 @@ where { type Output = Result, Fut::Error>; - fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { - match self.as_mut().future().try_poll(lw) { + fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { + match self.as_mut().future().try_poll(waker) { Poll::Pending => Poll::Pending, Poll::Ready(result) => { let response = self @@ -735,8 +735,8 @@ where { type Output = Result; - fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { - self.try_chain().poll(lw, |result| match result { + fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll { + self.try_chain().poll(waker, |result| match result { Ok(ok) => TryChainAction::Future(ok), Err(err) => TryChainAction::Output(Err(err)), }) @@ -768,7 +768,7 @@ where TryChain::First(fut1) } - fn poll(self: Pin<&mut Self>, lw: &LocalWaker, f: F) -> Poll> + fn poll(self: Pin<&mut Self>, waker: &Waker, f: F) -> Poll> where F: FnOnce(Result) -> TryChainAction, { @@ -781,14 +781,14 @@ where let output = match this { TryChain::First(fut1) => { // Poll the first future - match unsafe { Pin::new_unchecked(fut1) }.try_poll(lw) { + match unsafe { Pin::new_unchecked(fut1) }.try_poll(waker) { Poll::Pending => return Poll::Pending, Poll::Ready(output) => output, } } TryChain::Second(fut2) => { // Poll the second future - return unsafe { Pin::new_unchecked(fut2) }.try_poll(lw); + return unsafe { Pin::new_unchecked(fut2) }.try_poll(waker); } TryChain::Empty => { panic!("future must not be polled after it returned `Poll::Ready`"); @@ -816,7 +816,7 @@ mod tests { }; use fnv::FnvHashMap; use futures::{channel::mpsc, prelude::*, Poll}; - use futures_test::task::noop_local_waker_ref; + use futures_test::task::noop_waker_ref; use std::{ marker, net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -839,7 +839,7 @@ mod tests { ); let mut dispatch = Pin::new(&mut dispatch); - let waker = &noop_local_waker_ref(); + let waker = &noop_waker_ref(); let req = dispatch.poll_next_request(waker).ready(); assert!(req.is_some()); @@ -866,7 +866,7 @@ mod tests { drop(channel); let mut dispatch = Pin::new(&mut dispatch); - let waker = &noop_local_waker_ref(); + let waker = &noop_waker_ref(); dispatch.poll_next_cancellation(waker).unwrap(); assert!(dispatch.poll_next_request(waker).ready().is_none()); @@ -890,7 +890,7 @@ mod tests { drop(channel); let mut dispatch = Pin::new(&mut dispatch); - let waker = &noop_local_waker_ref(); + let waker = &noop_waker_ref(); assert!(dispatch.poll_next_request(waker).ready().is_none()); } diff --git a/rpc/src/server/filter.rs b/rpc/src/server/filter.rs index 3107d05..5f01c22 100644 --- a/rpc/src/server/filter.rs +++ b/rpc/src/server/filter.rs @@ -15,7 +15,7 @@ use futures::{ prelude::*, ready, stream::Fuse, - task::{LocalWaker, Poll}, + task::{Poll, Waker}, }; use log::{debug, error, info, trace, warn}; use pin_utils::unsafe_pinned; @@ -197,10 +197,7 @@ impl ConnectionFilter { } } - fn poll_listener( - mut self: Pin<&mut Self>, - cx: &LocalWaker, - ) -> PollIo> + fn poll_listener(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo> where S: Stream>, C: Transport, SinkItem = Response> + Send, @@ -211,7 +208,7 @@ impl ConnectionFilter { } } - fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll> { + fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &Waker) -> Poll> { match ready!(self.as_mut().closed_connections_rx().poll_next_unpin(cx)) { Some(addr) => { self.handle_closed_connection(&addr); @@ -229,7 +226,7 @@ where { type Item = io::Result>; - fn poll_next(mut self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo> { + fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo> { loop { match ( self.as_mut().poll_listener(cx)?, diff --git a/rpc/src/server/mod.rs b/rpc/src/server/mod.rs index 52d3dae..76b8aab 100644 --- a/rpc/src/server/mod.rs +++ b/rpc/src/server/mod.rs @@ -17,7 +17,7 @@ use futures::{ prelude::*, ready, stream::Fuse, - task::{LocalWaker, Poll}, + task::{Poll, Waker}, try_ready, }; use humantime::format_rfc3339; @@ -133,7 +133,7 @@ where { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<()> { + fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<()> { while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) { match channel { Ok(channel) => { @@ -234,18 +234,15 @@ where self.as_mut().transport().start_send(response) } - pub(crate) fn poll_ready(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { + pub(crate) fn poll_ready(mut self: Pin<&mut Self>, cx: &Waker) -> Poll> { self.as_mut().transport().poll_ready(cx) } - pub(crate) fn poll_flush(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { + pub(crate) fn poll_flush(mut self: Pin<&mut Self>, cx: &Waker) -> Poll> { self.as_mut().transport().poll_flush(cx) } - pub(crate) fn poll_next( - mut self: Pin<&mut Self>, - cx: &LocalWaker, - ) -> PollIo> { + pub(crate) fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo> { self.as_mut().transport().poll_next(cx) } @@ -313,7 +310,7 @@ where { /// If at max in-flight requests, check that there's room to immediately write a throttled /// response. - fn poll_ready_if_throttling(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { + fn poll_ready_if_throttling(mut self: Pin<&mut Self>, cx: &Waker) -> Poll> { if self.in_flight_requests.len() >= self.channel.config.max_in_flight_requests_per_connection { @@ -331,7 +328,7 @@ where Poll::Ready(Ok(())) } - fn pump_read(mut self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo<()> { + fn pump_read(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<()> { ready!(self.as_mut().poll_ready_if_throttling(cx)?); Poll::Ready(match ready!(self.as_mut().channel().poll_next(cx)?) { @@ -353,7 +350,7 @@ where }) } - fn pump_write(mut self: Pin<&mut Self>, cx: &LocalWaker, read_half_closed: bool) -> PollIo<()> { + fn pump_write(mut self: Pin<&mut Self>, cx: &Waker, read_half_closed: bool) -> PollIo<()> { match self.as_mut().poll_next_response(cx)? { Poll::Ready(Some((_, response))) => { self.as_mut().channel().start_send(response)?; @@ -382,7 +379,7 @@ where fn poll_next_response( mut self: Pin<&mut Self>, - cx: &LocalWaker, + cx: &Waker, ) -> PollIo<(Context, Response)> { // Ensure there's room to write a response. while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? { @@ -535,7 +532,7 @@ where { type Output = io::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { + fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll> { trace!("[{}] ClientHandler::poll", self.channel.client_addr); loop { let read = self.as_mut().pump_read(cx)?; diff --git a/rpc/src/transport/channel.rs b/rpc/src/transport/channel.rs index 92471f7..592b765 100644 --- a/rpc/src/transport/channel.rs +++ b/rpc/src/transport/channel.rs @@ -7,7 +7,7 @@ //! Transports backed by in-memory channels. use crate::{PollIo, Transport}; -use futures::{channel::mpsc, task::LocalWaker, Poll, Sink, Stream}; +use futures::{channel::mpsc, task::Waker, Poll, Sink, Stream}; use pin_utils::unsafe_pinned; use std::pin::Pin; use std::{ @@ -45,7 +45,7 @@ impl UnboundedChannel { impl Stream for UnboundedChannel { type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo { + fn poll_next(self: Pin<&mut Self>, cx: &Waker) -> PollIo { self.rx().poll_next(cx).map(|option| option.map(Ok)) } } @@ -54,7 +54,7 @@ impl Sink for UnboundedChannel { type SinkItem = SinkItem; type SinkError = io::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &Waker) -> Poll> { self.tx() .poll_ready(cx) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) @@ -66,13 +66,13 @@ impl Sink for UnboundedChannel { .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) } - fn poll_flush(self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &Waker) -> Poll> { self.tx() .poll_flush(cx) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) } - fn poll_close(self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &Waker) -> Poll> { self.tx() .poll_close(cx) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) @@ -99,14 +99,15 @@ mod tests { server::{Handler, Server}, transport, }; - use futures::{compat::TokioDefaultSpawner, prelude::*, stream}; + use futures::compat::Executor01CompatExt; + use futures::{prelude::*, stream}; use log::trace; use std::io; #[test] fn integration() { let _ = env_logger::try_init(); - crate::init(TokioDefaultSpawner); + crate::init(tokio::executor::DefaultExecutor::current().compat()); let (client_channel, server_channel) = transport::channel::unbounded(); let server = Server::::default() diff --git a/rpc/src/transport/mod.rs b/rpc/src/transport/mod.rs index 998ee69..d72ffed 100644 --- a/rpc/src/transport/mod.rs +++ b/rpc/src/transport/mod.rs @@ -14,7 +14,7 @@ use std::{ io, net::SocketAddr, pin::Pin, - task::{LocalWaker, Poll}, + task::{Poll, Waker}, }; pub mod channel; @@ -71,7 +71,7 @@ where { type Item = S::Item; - fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll> { self.inner().poll_next(waker) } } @@ -87,15 +87,15 @@ where self.inner().start_send(item) } - fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll> { self.inner().poll_ready(waker) } - fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll> { self.inner().poll_flush(waker) } - fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll> { self.inner().poll_close(waker) } } diff --git a/rpc/src/util/deadline_compat.rs b/rpc/src/util/deadline_compat.rs index d175614..2a44fbc 100644 --- a/rpc/src/util/deadline_compat.rs +++ b/rpc/src/util/deadline_compat.rs @@ -8,7 +8,7 @@ use futures::{ compat::{Compat01As03, Future01CompatExt}, prelude::*, ready, - task::{LocalWaker, Poll}, + task::{Poll, Waker}, }; use pin_utils::unsafe_pinned; use std::pin::Pin; @@ -50,7 +50,7 @@ where { type Output = Result>; - fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll { + fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { // First, try polling the future match self.as_mut().future().try_poll(waker) { Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 7d084ae..1b49211 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -25,13 +25,13 @@ tarpc-plugins = { path = "../plugins", version = "0.5.0" } rpc = { package = "tarpc-lib", path = "../rpc", version = "0.2" } [target.'cfg(not(test))'.dependencies] -futures-preview = "0.3.0-alpha.12" +futures-preview = "0.3.0-alpha.13" [dev-dependencies] bincode = "1.0" bytes = { version = "0.4", features = ["serde"] } humantime = "1.0" -futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] } +futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" } env_logger = "0.6" tokio = "0.1" diff --git a/tarpc/examples/readme.rs b/tarpc/examples/readme.rs index 2675b78..16f97ab 100644 --- a/tarpc/examples/readme.rs +++ b/tarpc/examples/readme.rs @@ -13,7 +13,7 @@ )] use futures::{ - compat::TokioDefaultSpawner, + compat::Executor01CompatExt, future::{self, Ready}, prelude::*, }; @@ -82,7 +82,7 @@ async fn run() -> io::Result<()> { } fn main() { - tarpc::init(TokioDefaultSpawner); + tarpc::init(tokio::executor::DefaultExecutor::current().compat()); tokio::run( run() diff --git a/tarpc/examples/server_calling_server.rs b/tarpc/examples/server_calling_server.rs index 00ee4ab..c8f318d 100644 --- a/tarpc/examples/server_calling_server.rs +++ b/tarpc/examples/server_calling_server.rs @@ -15,7 +15,7 @@ use crate::{add::Service as AddService, double::Service as DoubleService}; use futures::{ - compat::TokioDefaultSpawner, + compat::Executor01CompatExt, future::{self, Ready}, prelude::*, }; @@ -102,6 +102,6 @@ async fn run() -> io::Result<()> { fn main() { env_logger::init(); - tarpc::init(TokioDefaultSpawner); + tarpc::init(tokio::executor::DefaultExecutor::current().compat()); tokio::run(run().map_err(|e| panic!(e)).boxed().compat()); } diff --git a/tarpc/examples/service_registry.rs b/tarpc/examples/service_registry.rs index 7cee6fb..97f97a3 100644 --- a/tarpc/examples/service_registry.rs +++ b/tarpc/examples/service_registry.rs @@ -18,7 +18,7 @@ mod registry { io, pin::Pin, sync::Arc, - task::{LocalWaker, Poll}, + task::{Poll, Waker}, }; use tarpc::{ client::{self, Client}, @@ -213,7 +213,7 @@ mod registry { { type Output = Output; - fn poll(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll { + fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll { unsafe { match Pin::get_unchecked_mut(self) { Either::Left(car) => Pin::new_unchecked(car).poll(waker), @@ -239,6 +239,7 @@ mod registry { // Example use bytes::Bytes; use futures::{ + compat::Executor01CompatExt, future::{ready, Ready}, prelude::*, }; @@ -408,6 +409,6 @@ async fn run() -> io::Result<()> { } fn main() { - tarpc::init(futures::compat::TokioDefaultSpawner); + tarpc::init(tokio::executor::DefaultExecutor::current().compat()); tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat()); } diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index 7551ed2..e8b2bfb 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -177,7 +177,7 @@ macro_rules! service { impl ::std::future::Future for ResponseFut { type Output = ::std::io::Result; - fn poll(self: ::std::pin::Pin<&mut Self>, waker: &::std::task::LocalWaker) + fn poll(self: ::std::pin::Pin<&mut Self>, waker: &::std::task::Waker) -> ::std::task::Poll<::std::io::Result> { unsafe { @@ -282,7 +282,7 @@ mod syntax_test { #[cfg(test)] mod functional_test { use futures::{ - compat::TokioDefaultSpawner, + compat::Executor01CompatExt, future::{ready, Ready}, prelude::*, }; @@ -315,7 +315,7 @@ mod functional_test { #[test] fn sequential() { let _ = env_logger::try_init(); - rpc::init(TokioDefaultSpawner); + rpc::init(tokio::executor::DefaultExecutor::current().compat()); let test = async { let (tx, rx) = channel::unbounded(); @@ -344,7 +344,7 @@ mod functional_test { #[test] fn concurrent() { let _ = env_logger::try_init(); - rpc::init(TokioDefaultSpawner); + rpc::init(tokio::executor::DefaultExecutor::current().compat()); let test = async { let (tx, rx) = channel::unbounded(); diff --git a/tarpc/tests/latency.rs b/tarpc/tests/latency.rs index a2b9c54..7a35b67 100644 --- a/tarpc/tests/latency.rs +++ b/tarpc/tests/latency.rs @@ -18,7 +18,7 @@ extern crate test; use self::test::stats::Stats; -use futures::{compat::TokioDefaultSpawner, future, prelude::*}; +use futures::{compat::Executor01CompatExt, future, prelude::*}; use rpc::{ client, context, server::{Handler, Server}, @@ -119,7 +119,7 @@ async fn bench() -> io::Result<()> { #[test] fn bench_small_packet() { env_logger::init(); - tarpc::init(TokioDefaultSpawner); + tarpc::init(tokio::executor::DefaultExecutor::current().compat()); tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat()) }