From 41c1aafaf741ecd8b9c7ae12d3d2f11eddc14212 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Thu, 8 Aug 2019 21:49:33 -0700 Subject: [PATCH] Update tokio to v0.2.0-alpha.1 As part of this, I made an optional tokio feature which, when enabled, adds utility functions that spawn on the default tokio executor. This allows for the removal of the runtime crate. On the one hand, this makes the spawning utils slightly less generic. On the other hand: - The fns are just helpers and are easily rewritten by the user. - Tokio is the clear dominant futures executor, so most people will just use these versions. --- README.md | 10 ++- example-service/Cargo.toml | 7 +- example-service/src/client.rs | 2 +- example-service/src/server.rs | 2 +- json-transport/Cargo.toml | 2 +- plugins/Cargo.toml | 2 +- rpc/Cargo.toml | 12 ++-- rpc/src/client/channel.rs | 81 +++++++-------------- rpc/src/client/mod.rs | 11 +-- rpc/src/lib.rs | 40 +---------- rpc/src/server/mod.rs | 93 +++++++++---------------- rpc/src/transport/channel.rs | 7 +- rpc/src/util/deadline_compat.rs | 68 ------------------ rpc/src/util/mod.rs | 1 - tarpc/Cargo.toml | 9 ++- tarpc/examples/pubsub.rs | 6 +- tarpc/examples/readme.rs | 4 +- tarpc/examples/server_calling_server.rs | 6 +- tarpc/src/lib.rs | 11 ++- tarpc/tests/service_functional.rs | 40 +++-------- 20 files changed, 103 insertions(+), 311 deletions(-) delete mode 100644 rpc/src/util/deadline_compat.rs diff --git a/README.md b/README.md index b267d5d..e34b6eb 100644 --- a/README.md +++ b/README.md @@ -60,9 +60,8 @@ For this example, in addition to tarpc, also add two other dependencies to your `Cargo.toml`: ```toml -futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } -tokio = "0.1" -runtime-tokio = "0.3.0-alpha.5" +futures-preview = { version = "0.3.0-alpha.17" } +tokio = "0.2.0-alpha.1" ``` In the following example, we use an in-process channel for communication between @@ -75,7 +74,6 @@ First, let's set up the dependencies and service definition. #![feature(async_await)] use futures::{ - compat::Executor01CompatExt, future::{self, Ready}, prelude::*, }; @@ -123,7 +121,7 @@ tarpc also ships a that uses bincode over TCP. ```rust -#[runtime::main(runtime_tokio::Tokio)] +#[tokio::main] async fn main() -> io::Result<()> { let (client_transport, server_transport) = tarpc::transport::channel::unbounded(); @@ -133,7 +131,7 @@ async fn main() -> io::Result<()> { .incoming(stream::once(future::ready(server_transport))) .respond_with(HelloServer.serve()); - let _ = runtime::spawn(server); + tokio::spawn(server); // WorldClient is generated by the macro. It has a constructor `new` that takes a config and // any Transport as input diff --git a/example-service/Cargo.toml b/example-service/Cargo.toml index e2d5296..e287e92 100644 --- a/example-service/Cargo.toml +++ b/example-service/Cargo.toml @@ -15,11 +15,10 @@ description = "An example server built on tarpc." [dependencies] json-transport = { package = "tarpc-json-transport", version = "0.1", path = "../json-transport" } clap = "2.0" -futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } -runtime = "0.3.0-alpha.6" -runtime-tokio = "0.3.0-alpha.5" +futures-preview = { version = "0.3.0-alpha.17" } serde = { version = "1.0" } -tarpc = { version = "0.18", path = "../tarpc", features = ["serde1"] } +tarpc = { version = "0.18", path = "../tarpc", features = ["serde1", "tokio1"] } +tokio = "0.2.0-alpha.1" env_logger = "0.6" [lib] diff --git a/example-service/src/client.rs b/example-service/src/client.rs index c0e8615..609447f 100644 --- a/example-service/src/client.rs +++ b/example-service/src/client.rs @@ -10,7 +10,7 @@ use clap::{App, Arg}; use std::io; use tarpc::{client, context}; -#[runtime::main(runtime_tokio::Tokio)] +#[tokio::main] async fn main() -> io::Result<()> { let flags = App::new("Hello Client") .version("0.1") diff --git a/example-service/src/server.rs b/example-service/src/server.rs index acd6766..14991c3 100644 --- a/example-service/src/server.rs +++ b/example-service/src/server.rs @@ -37,7 +37,7 @@ impl World for HelloServer { } } -#[runtime::main(runtime_tokio::Tokio)] +#[tokio::main] async fn main() -> io::Result<()> { env_logger::init(); diff --git a/json-transport/Cargo.toml b/json-transport/Cargo.toml index 19d6185..f3cac61 100644 --- a/json-transport/Cargo.toml +++ b/json-transport/Cargo.toml @@ -13,7 +13,7 @@ readme = "../README.md" description = "A JSON-based transport for tarpc services." [dependencies] -futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +futures-preview = { version = "0.3.0-alpha.17" } futures_legacy = { version = "0.1", package = "futures" } pin-utils = "0.1.0-alpha.4" serde = "1.0" diff --git a/plugins/Cargo.toml b/plugins/Cargo.toml index c3dc776..3901d90 100644 --- a/plugins/Cargo.toml +++ b/plugins/Cargo.toml @@ -27,6 +27,6 @@ proc-macro2 = "0.4" proc-macro = true [dev-dependencies] -futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +futures-preview = { version = "0.3.0-alpha.17" } serde = { version = "1.0", features = ["derive"] } tarpc = { path = "../tarpc" } diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index ab680fe..bd24f8f 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -15,25 +15,21 @@ description = "An RPC framework for Rust with a focus on ease of use." [features] default = [] serde1 = ["trace/serde", "serde", "serde/derive"] +tokio1 = ["tokio"] [dependencies] fnv = "1.0" -futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +futures-preview = { version = "0.3.0-alpha.17" } humantime = "1.0" log = "0.4" -once_cell = "0.2" pin-utils = "0.1.0-alpha.4" rand = "0.7" -runtime = "0.3.0-alpha.6" -runtime-raw = "0.3.0-alpha.4" -tokio-timer = "0.2" +tokio-timer = "0.3.0-alpha.1" trace = { package = "tarpc-trace", version = "0.2", path = "../trace" } serde = { optional = true, version = "1.0" } +tokio = { optional = true, version = "0.2.0-alpha.1" } [dev-dependencies] futures-test-preview = { version = "0.3.0-alpha.17" } env_logger = "0.6" -runtime-tokio = "0.3.0-alpha.5" -tokio = "0.1" -tokio-executor = "0.1" assert_matches = "1.0" diff --git a/rpc/src/client/channel.rs b/rpc/src/client/channel.rs index 41be17b..f974b5d 100644 --- a/rpc/src/client/channel.rs +++ b/rpc/src/client/channel.rs @@ -6,7 +6,7 @@ use crate::{ context, - util::{deadline_compat, Compact, TimeUntil}, + util::{Compact, TimeUntil}, ClientMessage, PollIo, Request, Response, Transport, }; use fnv::FnvHashMap; @@ -18,7 +18,6 @@ use futures::{ task::Context, Poll, }; -use humantime::format_rfc3339; use log::{debug, info, trace}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::{ @@ -29,9 +28,9 @@ use std::{ atomic::{AtomicU64, Ordering}, Arc, }, - time::Instant, }; use trace::SpanId; +use tokio_timer::{timeout, Timeout}; use super::{Config, NewClient}; @@ -117,11 +116,9 @@ impl Channel { ctx.trace_context.span_id = SpanId::random(&mut rand::thread_rng()); let timeout = ctx.deadline.time_until(); - let deadline = Instant::now() + timeout; trace!( - "[{}] Queuing request with deadline {} (timeout {:?}).", + "[{}] Queuing request with timeout {:?}.", ctx.trace_id(), - format_rfc3339(ctx.deadline), timeout, ); @@ -137,7 +134,7 @@ impl Channel { response_completion, })), DispatchResponse { - response: deadline_compat::Deadline::new(response, deadline), + response: Timeout::new(response, timeout), complete: false, request_id, cancellation, @@ -160,7 +157,7 @@ impl Channel { /// arrives off the wire. #[derive(Debug)] struct DispatchResponse { - response: deadline_compat::Deadline>>, + response: Timeout>>, ctx: context::Context, complete: bool, cancellation: RequestCancellation, @@ -190,38 +187,12 @@ impl Future for DispatchResponse { } } } - Err(e) => Err({ - let trace_id = *self.as_mut().ctx().trace_id(); - - if e.is_elapsed() { - io::Error::new( - io::ErrorKind::TimedOut, - "Client dropped expired request.".to_string(), - ) - } else if e.is_timer() { - let e = e.into_timer().unwrap(); - if e.is_at_capacity() { - io::Error::new( - io::ErrorKind::Other, - "Cancelling request because an expiration could not be set \ - due to the timer being at capacity." - .to_string(), - ) - } else if e.is_shutdown() { - panic!("[{}] Timer was shutdown", trace_id) - } else { - panic!("[{}] Unrecognized timer error: {}", trace_id, e) - } - } else if e.is_inner() { - // The oneshot is Canceled when the dispatch task ends. In that case, - // there's nothing listening on the other side, so there's no point in - // propagating cancellation. - self.complete = true; - io::Error::from(io::ErrorKind::ConnectionReset) - } else { - panic!("[{}] Unrecognized deadline error: {:?}", trace_id, e) - } - }), + Err(timeout::Elapsed{..}) => Err( + io::Error::new( + io::ErrorKind::TimedOut, + "Client dropped expired request.".to_string(), + ) + ), }) } } @@ -752,7 +723,6 @@ mod tests { client::Config, context, transport::{self, channel::UnboundedChannel}, - util::deadline_compat, ClientMessage, Response, }; use fnv::FnvHashMap; @@ -764,16 +734,17 @@ mod tests { }; use futures_test::task::noop_waker_ref; use std::time::Duration; - use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc, time::Instant}; + use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc}; + use tokio_timer::Timeout; + use tokio::runtime::current_thread; #[test] fn dispatch_response_cancels_on_timeout() { - let past_deadline = Instant::now() - Duration::from_secs(1); let (_response_completion, response) = oneshot::channel(); let (cancellation, mut canceled_requests) = cancellations(); let resp = DispatchResponse:: { - // Deadline in the past should cause resp to error out when polled. - response: deadline_compat::Deadline::new(response, past_deadline), + // Timeout in the past should cause resp to error out when polled. + response: Timeout::new(response, Duration::from_secs(0)), complete: false, request_id: 3, cancellation, @@ -784,8 +755,7 @@ mod tests { let timer = tokio_timer::Timer::default(); tokio_timer::with_default( &timer.handle(), - &mut tokio_executor::enter().unwrap(), - |_| { + || { let _ = resp .as_mut() .poll(&mut Context::from_waker(&noop_waker_ref())); @@ -812,6 +782,10 @@ mod tests { assert_eq!(req.request, "hi".to_string()); } + fn block_on(f: F) -> F::Output { + current_thread::Runtime::new().unwrap().block_on(f) + } + // Regression test for https://github.com/google/tarpc/issues/220 #[test] fn stage_request_channel_dropped_doesnt_panic() { @@ -830,7 +804,7 @@ mod tests { message: Ok("hello".into()), }, ); - tokio::runtime::current_thread::block_on_all(dispatch.boxed().compat()).unwrap(); + block_on(dispatch).unwrap(); } #[test] @@ -918,21 +892,14 @@ mod tests { channel: &mut Channel, request: &str, ) -> DispatchResponse { - tokio::runtime::current_thread::block_on_all( - channel - .send(context::current(), request.to_string()) - .boxed() - .compat(), - ) - .unwrap() + block_on(channel.send(context::current(), request.to_string())).unwrap() } fn send_response( channel: &mut UnboundedChannel, Response>, response: Response, ) { - tokio::runtime::current_thread::block_on_all(channel.send(response).boxed().compat()) - .unwrap(); + block_on(channel.send(response)).unwrap(); } trait PollTest { diff --git a/rpc/src/client/mod.rs b/rpc/src/client/mod.rs index fdec802..0ad466a 100644 --- a/rpc/src/client/mod.rs +++ b/rpc/src/client/mod.rs @@ -141,19 +141,12 @@ where D: Future> + Send + 'static, { /// Helper method to spawn the dispatch on the default executor. + #[cfg(feature = "tokio1")] pub fn spawn(self) -> io::Result { let dispatch = self .dispatch .unwrap_or_else(move |e| error!("Connection broken: {}", e)); - crate::spawn(dispatch).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!( - "Could not spawn client dispatch task. Is shutdown: {}", - e.is_shutdown() - ), - ) - })?; + tokio::spawn(dispatch); Ok(self.client) } } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index d2d72e0..4eac3a5 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -34,11 +34,9 @@ pub(crate) mod util; pub use crate::{client::Client, server::Server, transport::Transport}; use futures::{ - task::{Poll, Spawn, SpawnError, SpawnExt}, - Future, FutureExt, + task::Poll, }; -use once_cell::sync::OnceCell; -use std::{cell::RefCell, io, time::SystemTime}; +use std::{io, time::SystemTime}; /// A message from a client to a server. #[derive(Debug)] @@ -123,37 +121,3 @@ impl Request { } pub(crate) type PollIo = Poll>>; - -static SEED_SPAWN: OnceCell> = OnceCell::new(); - -thread_local! { - static SPAWN: RefCell> = - RefCell::new(SEED_SPAWN.get().expect("init() must be called.").box_clone()); -} - -/// Initializes the RPC library with a mechanism to spawn futures on the user's runtime. -/// Client stubs and servers both use the initialized spawn. -/// -/// Init only has an effect the first time it is called. If called previously, successive calls to -/// init are noops. -pub fn init(spawn: impl Spawn + Clone + Send + Sync + 'static) { - let _ = SEED_SPAWN.set(Box::new(spawn)); -} - -pub(crate) fn spawn(future: impl Future + Send + 'static) -> Result<(), SpawnError> { - if SEED_SPAWN.get().is_some() { - SPAWN.with(|spawn| spawn.borrow_mut().spawn(future)) - } else { - runtime_raw::current_runtime().spawn_boxed(future.boxed()) - } -} - -trait CloneSpawn: Spawn { - fn box_clone(&self) -> Box; -} - -impl CloneSpawn for S { - fn box_clone(&self) -> Box { - Box::new(self.clone()) - } -} diff --git a/rpc/src/server/mod.rs b/rpc/src/server/mod.rs index 217972a..6558c6c 100644 --- a/rpc/src/server/mod.rs +++ b/rpc/src/server/mod.rs @@ -7,7 +7,7 @@ //! Provides a server that concurrently handles many connections sending multiplexed requests. use crate::{ - context, util::deadline_compat, util::Compact, util::TimeUntil, ClientMessage, PollIo, Request, + context, util::Compact, util::TimeUntil, ClientMessage, PollIo, Request, Response, ServerError, Transport, }; use fnv::FnvHashMap; @@ -20,7 +20,7 @@ use futures::{ task::{Context, Poll}, }; use humantime::format_rfc3339; -use log::{debug, error, info, trace, warn}; +use log::{debug, info, trace}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::{ fmt, @@ -28,10 +28,9 @@ use std::{ io, marker::PhantomData, pin::Pin, - time::{Instant, SystemTime}, + time::SystemTime, }; -use tokio_timer::timeout; -use trace::{self, TraceId}; +use tokio_timer::{Timeout, timeout}; mod filter; #[cfg(test)] @@ -154,6 +153,7 @@ where } /// Responds to all requests with `server`. + #[cfg(feature = "tokio1")] fn respond_with(self, server: S) -> Running where S: Serve, @@ -502,7 +502,7 @@ where request_id, ctx, deadline, - f: deadline_compat::Deadline::new(response, Instant::now() + timeout), + f: Timeout::new(response, timeout), response: None, response_tx: self.as_mut().responses_tx().clone(), }; @@ -541,7 +541,7 @@ struct Resp { request_id: u64, ctx: context::Context, deadline: SystemTime, - f: deadline_compat::Deadline, + f: Timeout, response: Option>, response_tx: mpsc::Sender<(context::Context, Response)>, } @@ -554,7 +554,7 @@ enum RespState { } impl Resp { - unsafe_pinned!(f: deadline_compat::Deadline); + unsafe_pinned!(f: Timeout); unsafe_pinned!(response_tx: mpsc::Sender<(context::Context, Response)>); unsafe_unpinned!(response: Option>); unsafe_unpinned!(state: RespState); @@ -575,8 +575,21 @@ where request_id: self.request_id, message: match result { Ok(message) => Ok(message), - Err(e) => { - Err(make_server_error(e, *self.ctx.trace_id(), self.deadline)) + Err(timeout::Elapsed{..}) => { + debug!( + "[{}] Response did not complete before deadline of {}s.", + self.ctx.trace_id(), + format_rfc3339(self.deadline) + ); + // No point in responding, since the client will have dropped the + // request. + Err(ServerError { + kind: io::ErrorKind::TimedOut, + detail: Some(format!( + "Response did not complete before deadline of {}s.", + format_rfc3339(self.deadline) + )), + }) } }, }); @@ -636,45 +649,6 @@ where } } -fn make_server_error( - e: timeout::Error<()>, - trace_id: TraceId, - deadline: SystemTime, -) -> ServerError { - if e.is_elapsed() { - debug!( - "[{}] Response did not complete before deadline of {}s.", - trace_id, - format_rfc3339(deadline) - ); - // No point in responding, since the client will have dropped the request. - ServerError { - kind: io::ErrorKind::TimedOut, - detail: Some(format!( - "Response did not complete before deadline of {}s.", - format_rfc3339(deadline) - )), - } - } else if e.is_timer() { - error!( - "[{}] Response failed because of an issue with a timer: {:?}", - trace_id, e - ); - - ServerError { - kind: io::ErrorKind::Other, - detail: Some(format!("{:?}", e)), - } - } else { - error!("[{}] Unexpected response failure: {:?}", trace_id, e); - - ServerError { - kind: io::ErrorKind::Other, - detail: Some(format!("Server unexpectedly failed to respond: {:?}", e)), - } - } -} - // Send + 'static execution helper methods. impl ClientHandler @@ -687,18 +661,12 @@ where { /// Runs the client handler until completion by spawning each /// request handler onto the default executor. + #[cfg(feature = "tokio1")] pub fn execute(self) -> impl Future { self.try_for_each(|request_handler| { async { - crate::spawn(request_handler).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!( - "Could not spawn response task. Is shutdown: {}", - e.is_shutdown() - ), - ) - }) + tokio::spawn(request_handler); + Ok(()) } }) .unwrap_or_else(|e| info!("ClientHandler errored out: {}", e)) @@ -708,16 +676,19 @@ where /// A future that drives the server by spawning channels and request handlers on the default /// executor. #[derive(Debug)] +#[cfg(feature = "tokio1")] pub struct Running { incoming: St, server: Se, } +#[cfg(feature = "tokio1")] impl Running { unsafe_pinned!(incoming: St); unsafe_unpinned!(server: Se); } +#[cfg(feature = "tokio1")] impl Future for Running where St: Sized + Stream, @@ -731,13 +702,11 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) { - if let Err(e) = crate::spawn( + tokio::spawn( channel .respond_with(self.as_mut().server().clone()) .execute(), - ) { - warn!("Failed to spawn channel handler: {:?}", e); - } + ); } info!("Server shutting down."); Poll::Ready(()) diff --git a/rpc/src/transport/channel.rs b/rpc/src/transport/channel.rs index c38c28f..76bd5b8 100644 --- a/rpc/src/transport/channel.rs +++ b/rpc/src/transport/channel.rs @@ -87,12 +87,12 @@ mod tests { use log::trace; use std::io; - #[runtime::test(runtime_tokio::Tokio)] + #[tokio::test] async fn integration() -> io::Result<()> { let _ = env_logger::try_init(); let (client_channel, server_channel) = transport::channel::unbounded(); - crate::spawn( + tokio::spawn( Server::default() .incoming(stream::once(future::ready(server_channel))) .respond_with(|_ctx, request: String| { @@ -103,8 +103,7 @@ mod tests { ) })) }), - ) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + ); let mut client = client::new(client::Config::default(), client_channel).spawn()?; diff --git a/rpc/src/util/deadline_compat.rs b/rpc/src/util/deadline_compat.rs deleted file mode 100644 index 7898df8..0000000 --- a/rpc/src/util/deadline_compat.rs +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2018 Google LLC -// -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -use futures::{ - compat::*, - prelude::*, - ready, - task::{Context, Poll}, -}; -use pin_utils::unsafe_pinned; -use std::pin::Pin; -use std::time::Instant; -use tokio_timer::{timeout, Delay}; - -#[must_use = "futures do nothing unless polled"] -#[derive(Debug)] -pub struct Deadline { - future: T, - delay: Compat01As03, -} - -impl Deadline { - unsafe_pinned!(future: T); - unsafe_pinned!(delay: Compat01As03); - - /// Create a new `Deadline` that completes when `future` completes or when - /// `deadline` is reached. - pub fn new(future: T, deadline: Instant) -> Deadline { - Deadline::new_with_delay(future, Delay::new(deadline)) - } - - pub(crate) fn new_with_delay(future: T, delay: Delay) -> Deadline { - Deadline { - future, - delay: delay.compat(), - } - } - - /// Gets a mutable reference to the underlying future in this deadline. - pub fn get_mut(&mut self) -> &mut T { - &mut self.future - } -} -impl Future for Deadline -where - T: Future, -{ - type Output = Result>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // First, try polling the future - match self.as_mut().future().poll(cx) { - Poll::Ready(v) => return Poll::Ready(Ok(v)), - Poll::Pending => {} - } - - let delay = self.delay().poll_unpin(cx); - - // Now check the timer - match ready!(delay) { - Ok(_) => Poll::Ready(Err(timeout::Error::elapsed())), - Err(e) => Poll::Ready(Err(timeout::Error::timer(e))), - } - } -} diff --git a/rpc/src/util/mod.rs b/rpc/src/util/mod.rs index e97d7ac..12a69b6 100644 --- a/rpc/src/util/mod.rs +++ b/rpc/src/util/mod.rs @@ -10,7 +10,6 @@ use std::{ time::{Duration, SystemTime}, }; -pub mod deadline_compat; #[cfg(feature = "serde")] pub mod serde; diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 47b9d36..032b6ba 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -14,6 +14,7 @@ description = "An RPC framework for Rust with a focus on ease of use." [features] serde1 = ["rpc/serde1", "tarpc-plugins/serde1", "serde", "serde/derive"] +tokio1 = ["rpc/tokio1"] [badges] travis-ci = { repository = "google/tarpc" } @@ -29,14 +30,12 @@ bincode = "1.0" bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" } bytes = { version = "0.4", features = ["serde"] } env_logger = "0.6" -futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +futures-preview = { version = "0.3.0-alpha.17" } humantime = "1.0" log = "0.4" -runtime = "0.3.0-alpha.6" -runtime-tokio = "0.3.0-alpha.5" -tokio-tcp = "0.1" +tokio-tcp = "0.2.0-alpha.1" pin-utils = "0.1.0-alpha.4" -tokio = "0.1" +tokio = "0.2.0-alpha.1" [[example]] name = "server_calling_server" diff --git a/tarpc/examples/pubsub.rs b/tarpc/examples/pubsub.rs index 9444f78..e12baaa 100644 --- a/tarpc/examples/pubsub.rs +++ b/tarpc/examples/pubsub.rs @@ -63,7 +63,7 @@ impl Subscriber { let incoming = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? .filter_map(|r| future::ready(r.ok())); let addr = incoming.get_ref().local_addr(); - let _ = runtime::spawn( + tokio::spawn( server::new(config) .incoming(incoming) .take(1) @@ -140,14 +140,14 @@ impl publisher::Publisher for Publisher { } } -#[runtime::main(runtime_tokio::Tokio)] +#[tokio::main] async fn main() -> io::Result<()> { env_logger::init(); let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? .filter_map(|r| future::ready(r.ok())); let publisher_addr = transport.get_ref().local_addr(); - let _ = runtime::spawn( + tokio::spawn( transport .take(1) .map(server::BaseChannel::with_defaults) diff --git a/tarpc/examples/readme.rs b/tarpc/examples/readme.rs index c4c7669..ef073b4 100644 --- a/tarpc/examples/readme.rs +++ b/tarpc/examples/readme.rs @@ -39,7 +39,7 @@ impl World for HelloServer { } } -#[runtime::main(runtime_tokio::Tokio)] +#[tokio::main] async fn main() -> io::Result<()> { // bincode_transport is provided by the associated crate bincode-transport. It makes it easy // to start up a serde-powered bincode serialization strategy over TCP. @@ -61,7 +61,7 @@ async fn main() -> io::Result<()> { .execute() .await; }; - let _ = runtime::spawn(server); + tokio::spawn(server); let transport = bincode_transport::connect(&addr).await?; diff --git a/tarpc/examples/server_calling_server.rs b/tarpc/examples/server_calling_server.rs index 400c69a..b53965b 100644 --- a/tarpc/examples/server_calling_server.rs +++ b/tarpc/examples/server_calling_server.rs @@ -64,7 +64,7 @@ impl DoubleService for DoubleServer { } } -#[runtime::main(runtime_tokio::Tokio)] +#[tokio::main] async fn main() -> io::Result<()> { env_logger::init(); @@ -75,7 +75,7 @@ async fn main() -> io::Result<()> { .incoming(add_listener) .take(1) .respond_with(AddServer.serve()); - let _ = runtime::spawn(add_server); + tokio::spawn(add_server); let to_add_server = bincode_transport::connect(&addr).await?; let add_client = add::AddClient::new(client::Config::default(), to_add_server).spawn()?; @@ -87,7 +87,7 @@ async fn main() -> io::Result<()> { .incoming(double_listener) .take(1) .respond_with(DoubleServer { add_client }.serve()); - let _ = runtime::spawn(double_server); + tokio::spawn(double_server); let to_double_server = bincode_transport::connect(&addr).await?; let mut double_client = diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 0c49242..5522e08 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -63,9 +63,8 @@ //! your `Cargo.toml`: //! //! ```toml -//! futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } -//! tokio = "0.1" -//! runtime-tokio = "0.3.0-alpha.5" +//! futures-preview = { version = "0.3.0-alpha.17" } +//! tokio = "0.2.0-alpha.1" //! ``` //! //! In the following example, we use an in-process channel for communication between @@ -79,7 +78,6 @@ //! # extern crate futures; //! //! use futures::{ -//! compat::Executor01CompatExt, //! future::{self, Ready}, //! prelude::*, //! }; @@ -105,7 +103,6 @@ //! # #![feature(async_await)] //! # extern crate futures; //! # use futures::{ -//! # compat::Executor01CompatExt, //! # future::{self, Ready}, //! # prelude::*, //! # }; @@ -176,7 +173,7 @@ //! # future::ready(format!("Hello, {}!", name)) //! # } //! # } -//! #[runtime::main(runtime_tokio::Tokio)] +//! #[tokio::main] //! async fn main() -> io::Result<()> { //! let (client_transport, server_transport) = tarpc::transport::channel::unbounded(); //! @@ -186,7 +183,7 @@ //! .incoming(stream::once(future::ready(server_transport))) //! .respond_with(HelloServer.serve()); //! -//! let _ = runtime::spawn(server); +//! tokio::spawn(server); //! //! // WorldClient is generated by the macro. It has a constructor `new` that takes a config and //! // any Transport as input diff --git a/tarpc/tests/service_functional.rs b/tarpc/tests/service_functional.rs index 5d3db88..9d8542f 100644 --- a/tarpc/tests/service_functional.rs +++ b/tarpc/tests/service_functional.rs @@ -13,26 +13,6 @@ use tarpc::{ transport::channel, }; -trait RuntimeExt { - fn exec_bg(&mut self, future: impl Future + 'static); - fn exec(&mut self, future: F) -> Result - where - F: Future>; -} - -impl RuntimeExt for tokio::runtime::current_thread::Runtime { - fn exec_bg(&mut self, future: impl Future + 'static) { - self.spawn(Box::pin(future.unit_error()).compat()); - } - - fn exec(&mut self, future: F) -> Result - where - F: Future>, - { - self.block_on(futures::compat::Compat::new(Box::pin(future))) - } -} - #[tarpc_plugins::service] trait Service { async fn add(x: i32, y: i32) -> i32; @@ -56,13 +36,13 @@ impl Service for Server { } } -#[runtime::test(runtime_tokio::TokioCurrentThread)] +#[tokio::test] async fn sequential() -> io::Result<()> { let _ = env_logger::try_init(); let (tx, rx) = channel::unbounded(); - let _ = runtime::spawn( + tokio::spawn( BaseChannel::new(server::Config::default(), rx) .respond_with(Server.serve()) .execute() @@ -79,13 +59,13 @@ async fn sequential() -> io::Result<()> { } #[cfg(feature = "serde1")] -#[runtime::test(runtime_tokio::TokioCurrentThread)] +#[tokio::test] async fn serde() -> io::Result<()> { let _ = env_logger::try_init(); let transport = bincode_transport::listen(&([0, 0, 0, 0], 56789).into())?; let addr = transport.local_addr(); - let _ = runtime::spawn( + tokio::spawn( tarpc::Server::default() .incoming(transport.take(1).filter_map(|r| async { r.ok() })) .respond_with(Server.serve()), @@ -103,12 +83,12 @@ async fn serde() -> io::Result<()> { Ok(()) } -#[runtime::test(runtime_tokio::TokioCurrentThread)] +#[tokio::test] async fn concurrent() -> io::Result<()> { let _ = env_logger::try_init(); let (tx, rx) = channel::unbounded(); - let _ = runtime::spawn( + tokio::spawn( rpc::Server::default() .incoming(stream::once(ready(rx))) .respond_with(Server.serve()), @@ -162,14 +142,14 @@ fn in_memory_single_threaded() -> io::Result<()> { let server = BaseChannel::new(server::Config::default(), rx) .respond_with(().serve()) .try_for_each(|r| async move { Ok(r.await) }); - runtime.exec_bg(async { + runtime.spawn(async { if let Err(e) = server.await { warn!("Error while running server: {}", e); } }); let NewClient{mut client, dispatch} = InMemoryClient::new(client::Config::default(), tx); - runtime.exec_bg(async move { + runtime.spawn(async move { if let Err(e) = dispatch.await { warn!("Error while running client dispatch: {}", e) } @@ -177,13 +157,13 @@ fn in_memory_single_threaded() -> io::Result<()> { let rc = Rc::new(()); assert_matches!( - runtime.exec(client.strong_count(context::current(), rc.clone())), + runtime.block_on(client.strong_count(context::current(), rc.clone())), Ok(2) ); let _weak = Rc::downgrade(&rc); assert_matches!( - runtime.exec(client.weak_count(context::current(), rc)), + runtime.block_on(client.weak_count(context::current(), rc)), Ok(1) );