From 85fbe411e63d0f431135f130b05f5c2d112a5195 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sat, 29 Oct 2016 10:16:12 -0700 Subject: [PATCH] Run clients in parallel (that's the whole point!). --- examples/concurrency.rs | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/examples/concurrency.rs b/examples/concurrency.rs index 1b3c8cf..d6a8611 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -18,10 +18,9 @@ extern crate futures_cpupool; use futures::Future; use futures_cpupool::{CpuFuture, CpuPool}; -use std::thread; use std::time::{Duration, Instant, SystemTime}; use tarpc::future::{Connect}; -use tarpc::util::{FirstSocketAddr, Never}; +use tarpc::util::{FirstSocketAddr, Never, spawn_core}; use tokio_core::reactor; service! { @@ -63,7 +62,10 @@ fn run_once<'a>(clients: &'a [FutureClient], concurrency: u32, print: bool) let start = SystemTime::now(); let future = client.read(CHUNK_SIZE).map(move |_| start.elapsed().unwrap()); future - }); + }) + // Need an intermediate collection to kick off each future, + // because futures::collect will iterate sequentially. + .collect::>(); let futs = futures::collect(futs); Box::new(futs.map(move |latencies| { @@ -107,13 +109,21 @@ fn main() { let server = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap(); println!("Server listening on {}.", server.local_addr()); + // The driver of the main future. let mut core = reactor::Core::new().unwrap(); - let handle = core.handle(); + let clients = (1...5) - .map(|i| { + // Spin up a couple threads to drive the clients. + .map(|i| (i, spawn_core())) + .map(|(i, remote)| { println!("Client {} connecting...", i); - FutureClient::connect_with(server.local_addr(), &handle).map_err(|e| panic!(e)) - }); + FutureClient::connect_remotely(server.local_addr(), &remote) + .map_err(|e| panic!(e)) + }) + // Need an intermediate collection to connect the clients in parallel, + // because `futures::collect` iterates sequentially. + .collect::>(); + let clients = core.run(futures::collect(clients)).unwrap(); println!("Starting..."); let runs = ::std::iter::once(run_once(&clients, MAX_CONCURRENCY, false))