diff --git a/examples/concurrency.rs b/examples/concurrency.rs index d6a8611..4415bb9 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -18,6 +18,7 @@ extern crate futures_cpupool; use futures::Future; use futures_cpupool::{CpuFuture, CpuPool}; +use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; use tarpc::future::{Connect}; use tarpc::util::{FirstSocketAddr, Never, spawn_core}; @@ -46,20 +47,22 @@ impl FutureService for Server { for i in 0..size { vec.push((i % 1 << 8) as u8); } - futures::finished::<_, Never>(vec) + futures::finished(vec) })) } } -fn run_once<'a>(clients: &'a [FutureClient], concurrency: u32, print: bool) - -> Box + 'a> +fn run_once(clients: Arc>, concurrency: u32) -> Box> { let start = Instant::now(); let futs = clients.iter() + .enumerate() .cycle() + .enumerate() .take(concurrency as usize) - .map(|client| { + .map(|(iteration, (client_id, client))| { let start = SystemTime::now(); + debug!("Client {} reading (iteration {})...", client_id, iteration); let future = client.read(CHUNK_SIZE).map(move |_| start.elapsed().unwrap()); future }) @@ -76,14 +79,12 @@ fn run_once<'a>(clients: &'a [FutureClient], concurrency: u32, print: bool) let min_latency = *latencies.iter().min().unwrap(); let max_latency = *latencies.iter().max().unwrap(); - if print { - println!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", - latencies.len(), - mean.microseconds(), - min_latency.microseconds(), - max_latency.microseconds(), - total_time.microseconds()); - } + info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", + latencies.len(), + mean.microseconds(), + min_latency.microseconds(), + max_latency.microseconds(), + total_time.microseconds()); }).map_err(|e| panic!(e))) } @@ -107,16 +108,16 @@ fn main() { let _ = env_logger::init(); let server = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap(); - println!("Server listening on {}.", server.local_addr()); + info!("Server listening on {}.", server.local_addr()); // The driver of the main future. let mut core = reactor::Core::new().unwrap(); - let clients = (1...5) + let clients = (0..4) // Spin up a couple threads to drive the clients. .map(|i| (i, spawn_core())) .map(|(i, remote)| { - println!("Client {} connecting...", i); + info!("Client {} connecting...", i); FutureClient::connect_remotely(server.local_addr(), &remote) .map_err(|e| panic!(e)) }) @@ -124,9 +125,13 @@ fn main() { // because `futures::collect` iterates sequentially. .collect::>(); - let clients = core.run(futures::collect(clients)).unwrap(); - println!("Starting..."); - let runs = ::std::iter::once(run_once(&clients, MAX_CONCURRENCY, false)) - .chain((1...MAX_CONCURRENCY).map(|concurrency| run_once(&clients, concurrency, true))); - core.run(futures::collect(runs)).unwrap(); + let runs = futures::collect(clients).and_then(|clients| { + let clients = Arc::new(clients); + let runs = (1...MAX_CONCURRENCY) + .map(move |concurrency| run_once(clients.clone(), concurrency)); + futures::collect(runs) + }); + + info!("Starting..."); + core.run(runs).unwrap(); }