From fe4eab38f1467c6d56bf1e33eaf1c8de95a45d03 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Tue, 7 Feb 2017 19:58:11 -0800 Subject: [PATCH] Change FutureService's associated types to be bounded by IntoFuture rather than Future. It's strictly more flexible, because everything that impls Future impls IntoFuture, and it additionally allows returning types like Result. Which is nice. --- examples/concurrency.rs | 20 ++++++++------------ examples/pubsub.rs | 4 ++-- examples/readme_futures.rs | 4 ++-- examples/server_calling_server.rs | 4 ++-- examples/throughput.rs | 9 +++++---- examples/two_clients.rs | 8 ++++---- src/macros.rs | 19 ++++++++++++------- 7 files changed, 35 insertions(+), 33 deletions(-) diff --git a/examples/concurrency.rs b/examples/concurrency.rs index 7cfb84f..f6c420f 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -12,6 +12,7 @@ extern crate env_logger; extern crate futures; #[macro_use] extern crate log; +extern crate serde; #[macro_use] extern crate tarpc; extern crate tokio_core; @@ -30,7 +31,7 @@ use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; service! { - rpc read(size: u32) -> Vec; + rpc read(size: u32) -> serde::bytes::ByteBuf; } #[derive(Clone)] @@ -49,19 +50,19 @@ impl Server { } impl FutureService for Server { - type ReadFut = CpuFuture, Never>; + type ReadFut = CpuFuture; fn read(&self, size: u32) -> Self::ReadFut { let request_number = self.request_count.fetch_add(1, Ordering::SeqCst); debug!("Server received read({}) no. {}", size, request_number); self.pool .spawn(futures::lazy(move || { - let mut vec: Vec = Vec::with_capacity(size as usize); + let mut vec = Vec::with_capacity(size as usize); for i in 0..size { vec.push(((i % 2) << 8) as u8); } debug!("Server sending response no. {}", request_number); - futures::finished(vec) + Ok(vec.into()) })) } } @@ -106,11 +107,9 @@ fn run_once(clients: Vec, concurrency: u32) -> impl Future + 'static { let start = Instant::now(); - let num_clients = clients.len(); futures::stream::futures_unordered((0..concurrency as usize) - .map(|iteration| (iteration + 1, iteration % num_clients)) - .map(|(iteration, client_idx)| { - let client = &clients[client_idx]; + .zip(clients.iter().enumerate().cycle()) + .map(|(iteration, (client_idx, client))| { let start = Instant::now(); debug!("Client {} reading (iteration {})...", client_idx, iteration); client.read(CHUNK_SIZE) @@ -181,10 +180,7 @@ fn main() { info!("Client {} connecting...", i); FutureClient::connect(addr, client::Options::default().remote(remote)) .map_err(|e| panic!(e)) - }) - // Need an intermediate collection to connect the clients in parallel, - // because `futures::collect` iterates sequentially. - .collect::>(); + }); let run = futures::collect(clients).and_then(|clients| run_once(clients, concurrency)); diff --git a/examples/pubsub.rs b/examples/pubsub.rs index e2f9e1e..e6d5ca1 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -48,11 +48,11 @@ struct Subscriber { } impl subscriber::FutureService for Subscriber { - type ReceiveFut = futures::Finished<(), Never>; + type ReceiveFut = Result<(), Never>; fn receive(&self, message: String) -> Self::ReceiveFut { println!("{} received message: {}", self.id, message); - futures::finished(()) + Ok(()) } } diff --git a/examples/readme_futures.rs b/examples/readme_futures.rs index 7bf66d2..5553aed 100644 --- a/examples/readme_futures.rs +++ b/examples/readme_futures.rs @@ -25,10 +25,10 @@ service! { struct HelloServer; impl FutureService for HelloServer { - type HelloFut = futures::Finished; + type HelloFut = Result; fn hello(&self, name: String) -> Self::HelloFut { - futures::finished(format!("Hello, {}!", name)) + Ok(format!("Hello, {}!", name)) } } diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index a1ac41c..dcf5c9f 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -40,10 +40,10 @@ pub mod double { struct AddServer; impl AddFutureService for AddServer { - type AddFut = futures::Finished; + type AddFut = Result; fn add(&self, x: i32, y: i32) -> Self::AddFut { - futures::finished(x + y) + Ok(x + y) } } diff --git a/examples/throughput.rs b/examples/throughput.rs index 0c99c52..d260b98 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -12,6 +12,7 @@ extern crate lazy_static; extern crate tarpc; extern crate env_logger; extern crate futures; +extern crate serde; use futures::Future; use std::io::{Read, Write, stdout}; @@ -24,7 +25,7 @@ use tarpc::client::sync::Connect; use tarpc::util::{FirstSocketAddr, Never}; lazy_static! { - static ref BUF: Arc> = Arc::new(gen_vec(CHUNK_SIZE as usize)); + static ref BUF: Arc = Arc::new(gen_vec(CHUNK_SIZE as usize).into()); } fn gen_vec(size: usize) -> Vec { @@ -36,17 +37,17 @@ fn gen_vec(size: usize) -> Vec { } service! { - rpc read() -> Arc>; + rpc read() -> Arc; } #[derive(Clone)] struct Server; impl FutureService for Server { - type ReadFut = futures::Finished>, Never>; + type ReadFut = Result, Never>; fn read(&self) -> Self::ReadFut { - futures::finished(BUF.clone()) + Ok(BUF.clone()) } } diff --git a/examples/two_clients.rs b/examples/two_clients.rs index 0e56ce4..3ad0366 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -30,10 +30,10 @@ mod bar { #[derive(Clone)] struct Bar; impl bar::FutureService for Bar { - type BarFut = futures::Finished; + type BarFut = Result; fn bar(&self, i: i32) -> Self::BarFut { - futures::finished(i) + Ok(i) } } @@ -46,10 +46,10 @@ mod baz { #[derive(Clone)] struct Baz; impl baz::FutureService for Baz { - type BazFut = futures::Finished; + type BazFut = Result; fn baz(&self, s: String) -> Self::BazFut { - futures::finished(format!("Hello, {}!", s)) + Ok(format!("Hello, {}!", s)) } } diff --git a/src/macros.rs b/src/macros.rs index b5e9edd..c6e27d3 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -107,7 +107,9 @@ macro_rules! impl_deserialize { impl $crate::serde::de::Visitor for Visitor { type Value = $impler; - fn expecting(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + fn expecting(&self, formatter: &mut ::std::fmt::Formatter) + -> ::std::fmt::Result + { formatter.write_str("an enum variant") } @@ -328,7 +330,7 @@ macro_rules! service { snake_to_camel! { /// The type of future returned by `{}`. - type $fn_name: $crate::futures::Future; + type $fn_name: $crate::futures::IntoFuture; } $(#[$attr])* @@ -371,10 +373,12 @@ macro_rules! service { enum tarpc_service_FutureReply__ { DeserializeError(tarpc_service_Future__), $($fn_name( - $crate::futures::Then) - -> tarpc_service_Future__>)),* + $crate::futures::Then< + ::Future, + tarpc_service_Future__, + fn(::std::result::Result<$out, $error>) + -> tarpc_service_Future__>)),* } impl $crate::futures::Future for tarpc_service_FutureReply__ { @@ -447,7 +451,8 @@ macro_rules! service { } return tarpc_service_FutureReply__::$fn_name( $crate::futures::Future::then( - FutureService::$fn_name(&self.0, $($arg),*), + $crate::futures::IntoFuture::into_future( + FutureService::$fn_name(&self.0, $($arg),*)), tarpc_service_wrap__)); } )*