From a1de4c1b0599e8da78d3040df3a538d1f2e301c1 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sat, 29 Oct 2016 02:02:11 -0700 Subject: [PATCH] Re-work concurrency example to not use wait() everywhere. --- examples/concurrency.rs | 69 +++++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/examples/concurrency.rs b/examples/concurrency.rs index 841a424..76d0a0e 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -13,6 +13,7 @@ extern crate futures; extern crate log; #[macro_use] extern crate tarpc; +extern crate tokio_core; extern crate futures_cpupool; use futures::Future; @@ -21,6 +22,7 @@ use std::thread; use std::time::{Duration, Instant, SystemTime}; use tarpc::future::{Connect}; use tarpc::util::{FirstSocketAddr, Never}; +use tokio_core::reactor; service! { rpc read(size: u32) -> Vec; @@ -50,41 +52,40 @@ impl FutureService for Server { } } -fn run_once(clients: &[FutureClient], concurrency: u32, print: bool) { - let _ = env_logger::init(); - +fn run_once<'a>(clients: &'a [FutureClient], concurrency: u32, print: bool) + -> Box + 'a> +{ let start = Instant::now(); - let futures: Vec<_> = clients.iter() + let futs = clients.iter() .cycle() .take(concurrency as usize) .map(|client| { let start = SystemTime::now(); + info!("Reading..."); let future = client.read(CHUNK_SIZE).map(move |_| start.elapsed().unwrap()); + info!("Yielding..."); thread::yield_now(); future - }) - .collect(); + }); + let futs = futures::collect(futs); - let latencies: Vec<_> = futures.into_iter() - .map(|future| { - future.wait().unwrap() - }) - .collect(); - let total_time = start.elapsed(); + Box::new(futs.map(move |latencies| { + let total_time = start.elapsed(); - let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur); - let mean = sum_latencies / latencies.len() as u32; - let min_latency = *latencies.iter().min().unwrap(); - let max_latency = *latencies.iter().max().unwrap(); + let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur); + let mean = sum_latencies / latencies.len() as u32; + let min_latency = *latencies.iter().min().unwrap(); + let max_latency = *latencies.iter().max().unwrap(); - if print { - println!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", - latencies.len(), - mean.microseconds(), - min_latency.microseconds(), - max_latency.microseconds(), - total_time.microseconds()); - } + if print { + println!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", + latencies.len(), + mean.microseconds(), + min_latency.microseconds(), + max_latency.microseconds(), + total_time.microseconds()); + } + }).map_err(|e| panic!(e))) } trait Microseconds { @@ -105,18 +106,20 @@ const MAX_CONCURRENCY: u32 = 100; fn main() { let _ = env_logger::init(); + let server = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap(); println!("Server listening on {}.", server.local_addr()); - let clients: Vec<_> = (1...5) + + let mut core = reactor::Core::new().unwrap(); + let handle = core.handle(); + let clients = (1...5) .map(|i| { println!("Client {} connecting...", i); - FutureClient::connect(server.local_addr()).wait().unwrap() - }) - .collect(); + FutureClient::connect_with(server.local_addr(), &handle).map_err(|e| panic!(e)) + }); + let clients = core.run(futures::collect(clients)).unwrap(); println!("Starting..."); - - run_once(&clients, MAX_CONCURRENCY, false); - for concurrency in 1...MAX_CONCURRENCY { - run_once(&clients, concurrency, true); - } + let runs = ::std::iter::once(run_once(&clients, MAX_CONCURRENCY, false)) + .chain((1...MAX_CONCURRENCY).map(|concurrency| run_once(&clients, concurrency, true))); + core.run(futures::collect(runs)).unwrap(); }