diff --git a/examples/concurrency.rs b/examples/concurrency.rs index 7cfb84f..f6c420f 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -12,6 +12,7 @@ extern crate env_logger; extern crate futures; #[macro_use] extern crate log; +extern crate serde; #[macro_use] extern crate tarpc; extern crate tokio_core; @@ -30,7 +31,7 @@ use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; service! { - rpc read(size: u32) -> Vec; + rpc read(size: u32) -> serde::bytes::ByteBuf; } #[derive(Clone)] @@ -49,19 +50,19 @@ impl Server { } impl FutureService for Server { - type ReadFut = CpuFuture, Never>; + type ReadFut = CpuFuture; fn read(&self, size: u32) -> Self::ReadFut { let request_number = self.request_count.fetch_add(1, Ordering::SeqCst); debug!("Server received read({}) no. {}", size, request_number); self.pool .spawn(futures::lazy(move || { - let mut vec: Vec = Vec::with_capacity(size as usize); + let mut vec = Vec::with_capacity(size as usize); for i in 0..size { vec.push(((i % 2) << 8) as u8); } debug!("Server sending response no. {}", request_number); - futures::finished(vec) + Ok(vec.into()) })) } } @@ -106,11 +107,9 @@ fn run_once(clients: Vec, concurrency: u32) -> impl Future + 'static { let start = Instant::now(); - let num_clients = clients.len(); futures::stream::futures_unordered((0..concurrency as usize) - .map(|iteration| (iteration + 1, iteration % num_clients)) - .map(|(iteration, client_idx)| { - let client = &clients[client_idx]; + .zip(clients.iter().enumerate().cycle()) + .map(|(iteration, (client_idx, client))| { let start = Instant::now(); debug!("Client {} reading (iteration {})...", client_idx, iteration); client.read(CHUNK_SIZE) @@ -181,10 +180,7 @@ fn main() { info!("Client {} connecting...", i); FutureClient::connect(addr, client::Options::default().remote(remote)) .map_err(|e| panic!(e)) - }) - // Need an intermediate collection to connect the clients in parallel, - // because `futures::collect` iterates sequentially. - .collect::>(); + }); let run = futures::collect(clients).and_then(|clients| run_once(clients, concurrency)); diff --git a/examples/pubsub.rs b/examples/pubsub.rs index e2f9e1e..e6d5ca1 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -48,11 +48,11 @@ struct Subscriber { } impl subscriber::FutureService for Subscriber { - type ReceiveFut = futures::Finished<(), Never>; + type ReceiveFut = Result<(), Never>; fn receive(&self, message: String) -> Self::ReceiveFut { println!("{} received message: {}", self.id, message); - futures::finished(()) + Ok(()) } } diff --git a/examples/readme_futures.rs b/examples/readme_futures.rs index 7bf66d2..5553aed 100644 --- a/examples/readme_futures.rs +++ b/examples/readme_futures.rs @@ -25,10 +25,10 @@ service! { struct HelloServer; impl FutureService for HelloServer { - type HelloFut = futures::Finished; + type HelloFut = Result; fn hello(&self, name: String) -> Self::HelloFut { - futures::finished(format!("Hello, {}!", name)) + Ok(format!("Hello, {}!", name)) } } diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index a1ac41c..dcf5c9f 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -40,10 +40,10 @@ pub mod double { struct AddServer; impl AddFutureService for AddServer { - type AddFut = futures::Finished; + type AddFut = Result; fn add(&self, x: i32, y: i32) -> Self::AddFut { - futures::finished(x + y) + Ok(x + y) } } diff --git a/examples/throughput.rs b/examples/throughput.rs index 0c99c52..d260b98 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -12,6 +12,7 @@ extern crate lazy_static; extern crate tarpc; extern crate env_logger; extern crate futures; +extern crate serde; use futures::Future; use std::io::{Read, Write, stdout}; @@ -24,7 +25,7 @@ use tarpc::client::sync::Connect; use tarpc::util::{FirstSocketAddr, Never}; lazy_static! { - static ref BUF: Arc> = Arc::new(gen_vec(CHUNK_SIZE as usize)); + static ref BUF: Arc = Arc::new(gen_vec(CHUNK_SIZE as usize).into()); } fn gen_vec(size: usize) -> Vec { @@ -36,17 +37,17 @@ fn gen_vec(size: usize) -> Vec { } service! { - rpc read() -> Arc>; + rpc read() -> Arc; } #[derive(Clone)] struct Server; impl FutureService for Server { - type ReadFut = futures::Finished>, Never>; + type ReadFut = Result, Never>; fn read(&self) -> Self::ReadFut { - futures::finished(BUF.clone()) + Ok(BUF.clone()) } } diff --git a/examples/two_clients.rs b/examples/two_clients.rs index 0e56ce4..3ad0366 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -30,10 +30,10 @@ mod bar { #[derive(Clone)] struct Bar; impl bar::FutureService for Bar { - type BarFut = futures::Finished; + type BarFut = Result; fn bar(&self, i: i32) -> Self::BarFut { - futures::finished(i) + Ok(i) } } @@ -46,10 +46,10 @@ mod baz { #[derive(Clone)] struct Baz; impl baz::FutureService for Baz { - type BazFut = futures::Finished; + type BazFut = Result; fn baz(&self, s: String) -> Self::BazFut { - futures::finished(format!("Hello, {}!", s)) + Ok(format!("Hello, {}!", s)) } } diff --git a/src/macros.rs b/src/macros.rs index b5e9edd..c6e27d3 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -107,7 +107,9 @@ macro_rules! impl_deserialize { impl $crate::serde::de::Visitor for Visitor { type Value = $impler; - fn expecting(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + fn expecting(&self, formatter: &mut ::std::fmt::Formatter) + -> ::std::fmt::Result + { formatter.write_str("an enum variant") } @@ -328,7 +330,7 @@ macro_rules! service { snake_to_camel! { /// The type of future returned by `{}`. - type $fn_name: $crate::futures::Future; + type $fn_name: $crate::futures::IntoFuture; } $(#[$attr])* @@ -371,10 +373,12 @@ macro_rules! service { enum tarpc_service_FutureReply__ { DeserializeError(tarpc_service_Future__), $($fn_name( - $crate::futures::Then) - -> tarpc_service_Future__>)),* + $crate::futures::Then< + ::Future, + tarpc_service_Future__, + fn(::std::result::Result<$out, $error>) + -> tarpc_service_Future__>)),* } impl $crate::futures::Future for tarpc_service_FutureReply__ { @@ -447,7 +451,8 @@ macro_rules! service { } return tarpc_service_FutureReply__::$fn_name( $crate::futures::Future::then( - FutureService::$fn_name(&self.0, $($arg),*), + $crate::futures::IntoFuture::into_future( + FutureService::$fn_name(&self.0, $($arg),*)), tarpc_service_wrap__)); } )*