diff --git a/README.md b/README.md index b267d5d..e34b6eb 100644 --- a/README.md +++ b/README.md @@ -60,9 +60,8 @@ For this example, in addition to tarpc, also add two other dependencies to your `Cargo.toml`: ```toml -futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } -tokio = "0.1" -runtime-tokio = "0.3.0-alpha.5" +futures-preview = { version = "0.3.0-alpha.17" } +tokio = "0.2.0-alpha.1" ``` In the following example, we use an in-process channel for communication between @@ -75,7 +74,6 @@ First, let's set up the dependencies and service definition. #![feature(async_await)] use futures::{ - compat::Executor01CompatExt, future::{self, Ready}, prelude::*, }; @@ -123,7 +121,7 @@ tarpc also ships a that uses bincode over TCP. ```rust -#[runtime::main(runtime_tokio::Tokio)] +#[tokio::main] async fn main() -> io::Result<()> { let (client_transport, server_transport) = tarpc::transport::channel::unbounded(); @@ -133,7 +131,7 @@ async fn main() -> io::Result<()> { .incoming(stream::once(future::ready(server_transport))) .respond_with(HelloServer.serve()); - let _ = runtime::spawn(server); + tokio::spawn(server); // WorldClient is generated by the macro. It has a constructor `new` that takes a config and // any Transport as input diff --git a/example-service/Cargo.toml b/example-service/Cargo.toml index e2d5296..e287e92 100644 --- a/example-service/Cargo.toml +++ b/example-service/Cargo.toml @@ -15,11 +15,10 @@ description = "An example server built on tarpc." [dependencies] json-transport = { package = "tarpc-json-transport", version = "0.1", path = "../json-transport" } clap = "2.0" -futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } -runtime = "0.3.0-alpha.6" -runtime-tokio = "0.3.0-alpha.5" +futures-preview = { version = "0.3.0-alpha.17" } serde = { version = "1.0" } -tarpc = { version = "0.18", path = "../tarpc", features = ["serde1"] } +tarpc = { version = "0.18", path = "../tarpc", features = ["serde1", "tokio1"] } +tokio = "0.2.0-alpha.1" env_logger = "0.6" [lib] diff --git a/example-service/src/client.rs b/example-service/src/client.rs index c0e8615..609447f 100644 --- a/example-service/src/client.rs +++ b/example-service/src/client.rs @@ -10,7 +10,7 @@ use clap::{App, Arg}; use std::io; use tarpc::{client, context}; -#[runtime::main(runtime_tokio::Tokio)] +#[tokio::main] async fn main() -> io::Result<()> { let flags = App::new("Hello Client") .version("0.1") diff --git a/example-service/src/server.rs b/example-service/src/server.rs index acd6766..14991c3 100644 --- a/example-service/src/server.rs +++ b/example-service/src/server.rs @@ -37,7 +37,7 @@ impl World for HelloServer { } } -#[runtime::main(runtime_tokio::Tokio)] +#[tokio::main] async fn main() -> io::Result<()> { env_logger::init(); diff --git a/json-transport/Cargo.toml b/json-transport/Cargo.toml index 19d6185..f3cac61 100644 --- a/json-transport/Cargo.toml +++ b/json-transport/Cargo.toml @@ -13,7 +13,7 @@ readme = "../README.md" description = "A JSON-based transport for tarpc services." [dependencies] -futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +futures-preview = { version = "0.3.0-alpha.17" } futures_legacy = { version = "0.1", package = "futures" } pin-utils = "0.1.0-alpha.4" serde = "1.0" diff --git a/plugins/Cargo.toml b/plugins/Cargo.toml index c3dc776..3901d90 100644 --- a/plugins/Cargo.toml +++ b/plugins/Cargo.toml @@ -27,6 +27,6 @@ proc-macro2 = "0.4" proc-macro = true [dev-dependencies] -futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +futures-preview = { version = "0.3.0-alpha.17" } serde = { version = "1.0", features = ["derive"] } tarpc = { path = "../tarpc" } diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index ab680fe..bd24f8f 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -15,25 +15,21 @@ description = "An RPC framework for Rust with a focus on ease of use." [features] default = [] serde1 = ["trace/serde", "serde", "serde/derive"] +tokio1 = ["tokio"] [dependencies] fnv = "1.0" -futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +futures-preview = { version = "0.3.0-alpha.17" } humantime = "1.0" log = "0.4" -once_cell = "0.2" pin-utils = "0.1.0-alpha.4" rand = "0.7" -runtime = "0.3.0-alpha.6" -runtime-raw = "0.3.0-alpha.4" -tokio-timer = "0.2" +tokio-timer = "0.3.0-alpha.1" trace = { package = "tarpc-trace", version = "0.2", path = "../trace" } serde = { optional = true, version = "1.0" } +tokio = { optional = true, version = "0.2.0-alpha.1" } [dev-dependencies] futures-test-preview = { version = "0.3.0-alpha.17" } env_logger = "0.6" -runtime-tokio = "0.3.0-alpha.5" -tokio = "0.1" -tokio-executor = "0.1" assert_matches = "1.0" diff --git a/rpc/src/client/channel.rs b/rpc/src/client/channel.rs index 41be17b..f974b5d 100644 --- a/rpc/src/client/channel.rs +++ b/rpc/src/client/channel.rs @@ -6,7 +6,7 @@ use crate::{ context, - util::{deadline_compat, Compact, TimeUntil}, + util::{Compact, TimeUntil}, ClientMessage, PollIo, Request, Response, Transport, }; use fnv::FnvHashMap; @@ -18,7 +18,6 @@ use futures::{ task::Context, Poll, }; -use humantime::format_rfc3339; use log::{debug, info, trace}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::{ @@ -29,9 +28,9 @@ use std::{ atomic::{AtomicU64, Ordering}, Arc, }, - time::Instant, }; use trace::SpanId; +use tokio_timer::{timeout, Timeout}; use super::{Config, NewClient}; @@ -117,11 +116,9 @@ impl Channel { ctx.trace_context.span_id = SpanId::random(&mut rand::thread_rng()); let timeout = ctx.deadline.time_until(); - let deadline = Instant::now() + timeout; trace!( - "[{}] Queuing request with deadline {} (timeout {:?}).", + "[{}] Queuing request with timeout {:?}.", ctx.trace_id(), - format_rfc3339(ctx.deadline), timeout, ); @@ -137,7 +134,7 @@ impl Channel { response_completion, })), DispatchResponse { - response: deadline_compat::Deadline::new(response, deadline), + response: Timeout::new(response, timeout), complete: false, request_id, cancellation, @@ -160,7 +157,7 @@ impl Channel { /// arrives off the wire. #[derive(Debug)] struct DispatchResponse { - response: deadline_compat::Deadline>>, + response: Timeout>>, ctx: context::Context, complete: bool, cancellation: RequestCancellation, @@ -190,38 +187,12 @@ impl Future for DispatchResponse { } } } - Err(e) => Err({ - let trace_id = *self.as_mut().ctx().trace_id(); - - if e.is_elapsed() { - io::Error::new( - io::ErrorKind::TimedOut, - "Client dropped expired request.".to_string(), - ) - } else if e.is_timer() { - let e = e.into_timer().unwrap(); - if e.is_at_capacity() { - io::Error::new( - io::ErrorKind::Other, - "Cancelling request because an expiration could not be set \ - due to the timer being at capacity." - .to_string(), - ) - } else if e.is_shutdown() { - panic!("[{}] Timer was shutdown", trace_id) - } else { - panic!("[{}] Unrecognized timer error: {}", trace_id, e) - } - } else if e.is_inner() { - // The oneshot is Canceled when the dispatch task ends. In that case, - // there's nothing listening on the other side, so there's no point in - // propagating cancellation. - self.complete = true; - io::Error::from(io::ErrorKind::ConnectionReset) - } else { - panic!("[{}] Unrecognized deadline error: {:?}", trace_id, e) - } - }), + Err(timeout::Elapsed{..}) => Err( + io::Error::new( + io::ErrorKind::TimedOut, + "Client dropped expired request.".to_string(), + ) + ), }) } } @@ -752,7 +723,6 @@ mod tests { client::Config, context, transport::{self, channel::UnboundedChannel}, - util::deadline_compat, ClientMessage, Response, }; use fnv::FnvHashMap; @@ -764,16 +734,17 @@ mod tests { }; use futures_test::task::noop_waker_ref; use std::time::Duration; - use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc, time::Instant}; + use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc}; + use tokio_timer::Timeout; + use tokio::runtime::current_thread; #[test] fn dispatch_response_cancels_on_timeout() { - let past_deadline = Instant::now() - Duration::from_secs(1); let (_response_completion, response) = oneshot::channel(); let (cancellation, mut canceled_requests) = cancellations(); let resp = DispatchResponse:: { - // Deadline in the past should cause resp to error out when polled. - response: deadline_compat::Deadline::new(response, past_deadline), + // Timeout in the past should cause resp to error out when polled. + response: Timeout::new(response, Duration::from_secs(0)), complete: false, request_id: 3, cancellation, @@ -784,8 +755,7 @@ mod tests { let timer = tokio_timer::Timer::default(); tokio_timer::with_default( &timer.handle(), - &mut tokio_executor::enter().unwrap(), - |_| { + || { let _ = resp .as_mut() .poll(&mut Context::from_waker(&noop_waker_ref())); @@ -812,6 +782,10 @@ mod tests { assert_eq!(req.request, "hi".to_string()); } + fn block_on(f: F) -> F::Output { + current_thread::Runtime::new().unwrap().block_on(f) + } + // Regression test for https://github.com/google/tarpc/issues/220 #[test] fn stage_request_channel_dropped_doesnt_panic() { @@ -830,7 +804,7 @@ mod tests { message: Ok("hello".into()), }, ); - tokio::runtime::current_thread::block_on_all(dispatch.boxed().compat()).unwrap(); + block_on(dispatch).unwrap(); } #[test] @@ -918,21 +892,14 @@ mod tests { channel: &mut Channel, request: &str, ) -> DispatchResponse { - tokio::runtime::current_thread::block_on_all( - channel - .send(context::current(), request.to_string()) - .boxed() - .compat(), - ) - .unwrap() + block_on(channel.send(context::current(), request.to_string())).unwrap() } fn send_response( channel: &mut UnboundedChannel, Response>, response: Response, ) { - tokio::runtime::current_thread::block_on_all(channel.send(response).boxed().compat()) - .unwrap(); + block_on(channel.send(response)).unwrap(); } trait PollTest { diff --git a/rpc/src/client/mod.rs b/rpc/src/client/mod.rs index fdec802..0ad466a 100644 --- a/rpc/src/client/mod.rs +++ b/rpc/src/client/mod.rs @@ -141,19 +141,12 @@ where D: Future> + Send + 'static, { /// Helper method to spawn the dispatch on the default executor. + #[cfg(feature = "tokio1")] pub fn spawn(self) -> io::Result { let dispatch = self .dispatch .unwrap_or_else(move |e| error!("Connection broken: {}", e)); - crate::spawn(dispatch).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!( - "Could not spawn client dispatch task. Is shutdown: {}", - e.is_shutdown() - ), - ) - })?; + tokio::spawn(dispatch); Ok(self.client) } } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index d2d72e0..4eac3a5 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -34,11 +34,9 @@ pub(crate) mod util; pub use crate::{client::Client, server::Server, transport::Transport}; use futures::{ - task::{Poll, Spawn, SpawnError, SpawnExt}, - Future, FutureExt, + task::Poll, }; -use once_cell::sync::OnceCell; -use std::{cell::RefCell, io, time::SystemTime}; +use std::{io, time::SystemTime}; /// A message from a client to a server. #[derive(Debug)] @@ -123,37 +121,3 @@ impl Request { } pub(crate) type PollIo = Poll>>; - -static SEED_SPAWN: OnceCell> = OnceCell::new(); - -thread_local! { - static SPAWN: RefCell> = - RefCell::new(SEED_SPAWN.get().expect("init() must be called.").box_clone()); -} - -/// Initializes the RPC library with a mechanism to spawn futures on the user's runtime. -/// Client stubs and servers both use the initialized spawn. -/// -/// Init only has an effect the first time it is called. If called previously, successive calls to -/// init are noops. -pub fn init(spawn: impl Spawn + Clone + Send + Sync + 'static) { - let _ = SEED_SPAWN.set(Box::new(spawn)); -} - -pub(crate) fn spawn(future: impl Future + Send + 'static) -> Result<(), SpawnError> { - if SEED_SPAWN.get().is_some() { - SPAWN.with(|spawn| spawn.borrow_mut().spawn(future)) - } else { - runtime_raw::current_runtime().spawn_boxed(future.boxed()) - } -} - -trait CloneSpawn: Spawn { - fn box_clone(&self) -> Box; -} - -impl CloneSpawn for S { - fn box_clone(&self) -> Box { - Box::new(self.clone()) - } -} diff --git a/rpc/src/server/mod.rs b/rpc/src/server/mod.rs index 217972a..6558c6c 100644 --- a/rpc/src/server/mod.rs +++ b/rpc/src/server/mod.rs @@ -7,7 +7,7 @@ //! Provides a server that concurrently handles many connections sending multiplexed requests. use crate::{ - context, util::deadline_compat, util::Compact, util::TimeUntil, ClientMessage, PollIo, Request, + context, util::Compact, util::TimeUntil, ClientMessage, PollIo, Request, Response, ServerError, Transport, }; use fnv::FnvHashMap; @@ -20,7 +20,7 @@ use futures::{ task::{Context, Poll}, }; use humantime::format_rfc3339; -use log::{debug, error, info, trace, warn}; +use log::{debug, info, trace}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::{ fmt, @@ -28,10 +28,9 @@ use std::{ io, marker::PhantomData, pin::Pin, - time::{Instant, SystemTime}, + time::SystemTime, }; -use tokio_timer::timeout; -use trace::{self, TraceId}; +use tokio_timer::{Timeout, timeout}; mod filter; #[cfg(test)] @@ -154,6 +153,7 @@ where } /// Responds to all requests with `server`. + #[cfg(feature = "tokio1")] fn respond_with(self, server: S) -> Running where S: Serve, @@ -502,7 +502,7 @@ where request_id, ctx, deadline, - f: deadline_compat::Deadline::new(response, Instant::now() + timeout), + f: Timeout::new(response, timeout), response: None, response_tx: self.as_mut().responses_tx().clone(), }; @@ -541,7 +541,7 @@ struct Resp { request_id: u64, ctx: context::Context, deadline: SystemTime, - f: deadline_compat::Deadline, + f: Timeout, response: Option>, response_tx: mpsc::Sender<(context::Context, Response)>, } @@ -554,7 +554,7 @@ enum RespState { } impl Resp { - unsafe_pinned!(f: deadline_compat::Deadline); + unsafe_pinned!(f: Timeout); unsafe_pinned!(response_tx: mpsc::Sender<(context::Context, Response)>); unsafe_unpinned!(response: Option>); unsafe_unpinned!(state: RespState); @@ -575,8 +575,21 @@ where request_id: self.request_id, message: match result { Ok(message) => Ok(message), - Err(e) => { - Err(make_server_error(e, *self.ctx.trace_id(), self.deadline)) + Err(timeout::Elapsed{..}) => { + debug!( + "[{}] Response did not complete before deadline of {}s.", + self.ctx.trace_id(), + format_rfc3339(self.deadline) + ); + // No point in responding, since the client will have dropped the + // request. + Err(ServerError { + kind: io::ErrorKind::TimedOut, + detail: Some(format!( + "Response did not complete before deadline of {}s.", + format_rfc3339(self.deadline) + )), + }) } }, }); @@ -636,45 +649,6 @@ where } } -fn make_server_error( - e: timeout::Error<()>, - trace_id: TraceId, - deadline: SystemTime, -) -> ServerError { - if e.is_elapsed() { - debug!( - "[{}] Response did not complete before deadline of {}s.", - trace_id, - format_rfc3339(deadline) - ); - // No point in responding, since the client will have dropped the request. - ServerError { - kind: io::ErrorKind::TimedOut, - detail: Some(format!( - "Response did not complete before deadline of {}s.", - format_rfc3339(deadline) - )), - } - } else if e.is_timer() { - error!( - "[{}] Response failed because of an issue with a timer: {:?}", - trace_id, e - ); - - ServerError { - kind: io::ErrorKind::Other, - detail: Some(format!("{:?}", e)), - } - } else { - error!("[{}] Unexpected response failure: {:?}", trace_id, e); - - ServerError { - kind: io::ErrorKind::Other, - detail: Some(format!("Server unexpectedly failed to respond: {:?}", e)), - } - } -} - // Send + 'static execution helper methods. impl ClientHandler @@ -687,18 +661,12 @@ where { /// Runs the client handler until completion by spawning each /// request handler onto the default executor. + #[cfg(feature = "tokio1")] pub fn execute(self) -> impl Future { self.try_for_each(|request_handler| { async { - crate::spawn(request_handler).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!( - "Could not spawn response task. Is shutdown: {}", - e.is_shutdown() - ), - ) - }) + tokio::spawn(request_handler); + Ok(()) } }) .unwrap_or_else(|e| info!("ClientHandler errored out: {}", e)) @@ -708,16 +676,19 @@ where /// A future that drives the server by spawning channels and request handlers on the default /// executor. #[derive(Debug)] +#[cfg(feature = "tokio1")] pub struct Running { incoming: St, server: Se, } +#[cfg(feature = "tokio1")] impl Running { unsafe_pinned!(incoming: St); unsafe_unpinned!(server: Se); } +#[cfg(feature = "tokio1")] impl Future for Running where St: Sized + Stream, @@ -731,13 +702,11 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) { - if let Err(e) = crate::spawn( + tokio::spawn( channel .respond_with(self.as_mut().server().clone()) .execute(), - ) { - warn!("Failed to spawn channel handler: {:?}", e); - } + ); } info!("Server shutting down."); Poll::Ready(()) diff --git a/rpc/src/transport/channel.rs b/rpc/src/transport/channel.rs index c38c28f..76bd5b8 100644 --- a/rpc/src/transport/channel.rs +++ b/rpc/src/transport/channel.rs @@ -87,12 +87,12 @@ mod tests { use log::trace; use std::io; - #[runtime::test(runtime_tokio::Tokio)] + #[tokio::test] async fn integration() -> io::Result<()> { let _ = env_logger::try_init(); let (client_channel, server_channel) = transport::channel::unbounded(); - crate::spawn( + tokio::spawn( Server::default() .incoming(stream::once(future::ready(server_channel))) .respond_with(|_ctx, request: String| { @@ -103,8 +103,7 @@ mod tests { ) })) }), - ) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + ); let mut client = client::new(client::Config::default(), client_channel).spawn()?; diff --git a/rpc/src/util/deadline_compat.rs b/rpc/src/util/deadline_compat.rs deleted file mode 100644 index 7898df8..0000000 --- a/rpc/src/util/deadline_compat.rs +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2018 Google LLC -// -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -use futures::{ - compat::*, - prelude::*, - ready, - task::{Context, Poll}, -}; -use pin_utils::unsafe_pinned; -use std::pin::Pin; -use std::time::Instant; -use tokio_timer::{timeout, Delay}; - -#[must_use = "futures do nothing unless polled"] -#[derive(Debug)] -pub struct Deadline { - future: T, - delay: Compat01As03, -} - -impl Deadline { - unsafe_pinned!(future: T); - unsafe_pinned!(delay: Compat01As03); - - /// Create a new `Deadline` that completes when `future` completes or when - /// `deadline` is reached. - pub fn new(future: T, deadline: Instant) -> Deadline { - Deadline::new_with_delay(future, Delay::new(deadline)) - } - - pub(crate) fn new_with_delay(future: T, delay: Delay) -> Deadline { - Deadline { - future, - delay: delay.compat(), - } - } - - /// Gets a mutable reference to the underlying future in this deadline. - pub fn get_mut(&mut self) -> &mut T { - &mut self.future - } -} -impl Future for Deadline -where - T: Future, -{ - type Output = Result>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // First, try polling the future - match self.as_mut().future().poll(cx) { - Poll::Ready(v) => return Poll::Ready(Ok(v)), - Poll::Pending => {} - } - - let delay = self.delay().poll_unpin(cx); - - // Now check the timer - match ready!(delay) { - Ok(_) => Poll::Ready(Err(timeout::Error::elapsed())), - Err(e) => Poll::Ready(Err(timeout::Error::timer(e))), - } - } -} diff --git a/rpc/src/util/mod.rs b/rpc/src/util/mod.rs index e97d7ac..12a69b6 100644 --- a/rpc/src/util/mod.rs +++ b/rpc/src/util/mod.rs @@ -10,7 +10,6 @@ use std::{ time::{Duration, SystemTime}, }; -pub mod deadline_compat; #[cfg(feature = "serde")] pub mod serde; diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 47b9d36..032b6ba 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -14,6 +14,7 @@ description = "An RPC framework for Rust with a focus on ease of use." [features] serde1 = ["rpc/serde1", "tarpc-plugins/serde1", "serde", "serde/derive"] +tokio1 = ["rpc/tokio1"] [badges] travis-ci = { repository = "google/tarpc" } @@ -29,14 +30,12 @@ bincode = "1.0" bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" } bytes = { version = "0.4", features = ["serde"] } env_logger = "0.6" -futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +futures-preview = { version = "0.3.0-alpha.17" } humantime = "1.0" log = "0.4" -runtime = "0.3.0-alpha.6" -runtime-tokio = "0.3.0-alpha.5" -tokio-tcp = "0.1" +tokio-tcp = "0.2.0-alpha.1" pin-utils = "0.1.0-alpha.4" -tokio = "0.1" +tokio = "0.2.0-alpha.1" [[example]] name = "server_calling_server" diff --git a/tarpc/examples/pubsub.rs b/tarpc/examples/pubsub.rs index 9444f78..e12baaa 100644 --- a/tarpc/examples/pubsub.rs +++ b/tarpc/examples/pubsub.rs @@ -63,7 +63,7 @@ impl Subscriber { let incoming = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? .filter_map(|r| future::ready(r.ok())); let addr = incoming.get_ref().local_addr(); - let _ = runtime::spawn( + tokio::spawn( server::new(config) .incoming(incoming) .take(1) @@ -140,14 +140,14 @@ impl publisher::Publisher for Publisher { } } -#[runtime::main(runtime_tokio::Tokio)] +#[tokio::main] async fn main() -> io::Result<()> { env_logger::init(); let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? .filter_map(|r| future::ready(r.ok())); let publisher_addr = transport.get_ref().local_addr(); - let _ = runtime::spawn( + tokio::spawn( transport .take(1) .map(server::BaseChannel::with_defaults) diff --git a/tarpc/examples/readme.rs b/tarpc/examples/readme.rs index c4c7669..ef073b4 100644 --- a/tarpc/examples/readme.rs +++ b/tarpc/examples/readme.rs @@ -39,7 +39,7 @@ impl World for HelloServer { } } -#[runtime::main(runtime_tokio::Tokio)] +#[tokio::main] async fn main() -> io::Result<()> { // bincode_transport is provided by the associated crate bincode-transport. It makes it easy // to start up a serde-powered bincode serialization strategy over TCP. @@ -61,7 +61,7 @@ async fn main() -> io::Result<()> { .execute() .await; }; - let _ = runtime::spawn(server); + tokio::spawn(server); let transport = bincode_transport::connect(&addr).await?; diff --git a/tarpc/examples/server_calling_server.rs b/tarpc/examples/server_calling_server.rs index 400c69a..b53965b 100644 --- a/tarpc/examples/server_calling_server.rs +++ b/tarpc/examples/server_calling_server.rs @@ -64,7 +64,7 @@ impl DoubleService for DoubleServer { } } -#[runtime::main(runtime_tokio::Tokio)] +#[tokio::main] async fn main() -> io::Result<()> { env_logger::init(); @@ -75,7 +75,7 @@ async fn main() -> io::Result<()> { .incoming(add_listener) .take(1) .respond_with(AddServer.serve()); - let _ = runtime::spawn(add_server); + tokio::spawn(add_server); let to_add_server = bincode_transport::connect(&addr).await?; let add_client = add::AddClient::new(client::Config::default(), to_add_server).spawn()?; @@ -87,7 +87,7 @@ async fn main() -> io::Result<()> { .incoming(double_listener) .take(1) .respond_with(DoubleServer { add_client }.serve()); - let _ = runtime::spawn(double_server); + tokio::spawn(double_server); let to_double_server = bincode_transport::connect(&addr).await?; let mut double_client = diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 0c49242..5522e08 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -63,9 +63,8 @@ //! your `Cargo.toml`: //! //! ```toml -//! futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } -//! tokio = "0.1" -//! runtime-tokio = "0.3.0-alpha.5" +//! futures-preview = { version = "0.3.0-alpha.17" } +//! tokio = "0.2.0-alpha.1" //! ``` //! //! In the following example, we use an in-process channel for communication between @@ -79,7 +78,6 @@ //! # extern crate futures; //! //! use futures::{ -//! compat::Executor01CompatExt, //! future::{self, Ready}, //! prelude::*, //! }; @@ -105,7 +103,6 @@ //! # #![feature(async_await)] //! # extern crate futures; //! # use futures::{ -//! # compat::Executor01CompatExt, //! # future::{self, Ready}, //! # prelude::*, //! # }; @@ -176,7 +173,7 @@ //! # future::ready(format!("Hello, {}!", name)) //! # } //! # } -//! #[runtime::main(runtime_tokio::Tokio)] +//! #[tokio::main] //! async fn main() -> io::Result<()> { //! let (client_transport, server_transport) = tarpc::transport::channel::unbounded(); //! @@ -186,7 +183,7 @@ //! .incoming(stream::once(future::ready(server_transport))) //! .respond_with(HelloServer.serve()); //! -//! let _ = runtime::spawn(server); +//! tokio::spawn(server); //! //! // WorldClient is generated by the macro. It has a constructor `new` that takes a config and //! // any Transport as input diff --git a/tarpc/tests/service_functional.rs b/tarpc/tests/service_functional.rs index 5d3db88..9d8542f 100644 --- a/tarpc/tests/service_functional.rs +++ b/tarpc/tests/service_functional.rs @@ -13,26 +13,6 @@ use tarpc::{ transport::channel, }; -trait RuntimeExt { - fn exec_bg(&mut self, future: impl Future + 'static); - fn exec(&mut self, future: F) -> Result - where - F: Future>; -} - -impl RuntimeExt for tokio::runtime::current_thread::Runtime { - fn exec_bg(&mut self, future: impl Future + 'static) { - self.spawn(Box::pin(future.unit_error()).compat()); - } - - fn exec(&mut self, future: F) -> Result - where - F: Future>, - { - self.block_on(futures::compat::Compat::new(Box::pin(future))) - } -} - #[tarpc_plugins::service] trait Service { async fn add(x: i32, y: i32) -> i32; @@ -56,13 +36,13 @@ impl Service for Server { } } -#[runtime::test(runtime_tokio::TokioCurrentThread)] +#[tokio::test] async fn sequential() -> io::Result<()> { let _ = env_logger::try_init(); let (tx, rx) = channel::unbounded(); - let _ = runtime::spawn( + tokio::spawn( BaseChannel::new(server::Config::default(), rx) .respond_with(Server.serve()) .execute() @@ -79,13 +59,13 @@ async fn sequential() -> io::Result<()> { } #[cfg(feature = "serde1")] -#[runtime::test(runtime_tokio::TokioCurrentThread)] +#[tokio::test] async fn serde() -> io::Result<()> { let _ = env_logger::try_init(); let transport = bincode_transport::listen(&([0, 0, 0, 0], 56789).into())?; let addr = transport.local_addr(); - let _ = runtime::spawn( + tokio::spawn( tarpc::Server::default() .incoming(transport.take(1).filter_map(|r| async { r.ok() })) .respond_with(Server.serve()), @@ -103,12 +83,12 @@ async fn serde() -> io::Result<()> { Ok(()) } -#[runtime::test(runtime_tokio::TokioCurrentThread)] +#[tokio::test] async fn concurrent() -> io::Result<()> { let _ = env_logger::try_init(); let (tx, rx) = channel::unbounded(); - let _ = runtime::spawn( + tokio::spawn( rpc::Server::default() .incoming(stream::once(ready(rx))) .respond_with(Server.serve()), @@ -162,14 +142,14 @@ fn in_memory_single_threaded() -> io::Result<()> { let server = BaseChannel::new(server::Config::default(), rx) .respond_with(().serve()) .try_for_each(|r| async move { Ok(r.await) }); - runtime.exec_bg(async { + runtime.spawn(async { if let Err(e) = server.await { warn!("Error while running server: {}", e); } }); let NewClient{mut client, dispatch} = InMemoryClient::new(client::Config::default(), tx); - runtime.exec_bg(async move { + runtime.spawn(async move { if let Err(e) = dispatch.await { warn!("Error while running client dispatch: {}", e) } @@ -177,13 +157,13 @@ fn in_memory_single_threaded() -> io::Result<()> { let rc = Rc::new(()); assert_matches!( - runtime.exec(client.strong_count(context::current(), rc.clone())), + runtime.block_on(client.strong_count(context::current(), rc.clone())), Ok(2) ); let _weak = Rc::downgrade(&rc); assert_matches!( - runtime.exec(client.weak_count(context::current(), rc)), + runtime.block_on(client.weak_count(context::current(), rc)), Ok(1) );