Re-work concurrency example to not use wait() everywhere.

This commit is contained in:
Tim Kuehn
2016-10-29 02:02:11 -07:00
parent b6e9d61286
commit a1de4c1b05

View File

@@ -13,6 +13,7 @@ extern crate futures;
extern crate log;
#[macro_use]
extern crate tarpc;
extern crate tokio_core;
extern crate futures_cpupool;
use futures::Future;
@@ -21,6 +22,7 @@ use std::thread;
use std::time::{Duration, Instant, SystemTime};
use tarpc::future::{Connect};
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
service! {
rpc read(size: u32) -> Vec<u8>;
@@ -50,41 +52,40 @@ impl FutureService for Server {
}
}
fn run_once(clients: &[FutureClient], concurrency: u32, print: bool) {
let _ = env_logger::init();
fn run_once<'a>(clients: &'a [FutureClient], concurrency: u32, print: bool)
-> Box<Future<Item=(), Error=()> + 'a>
{
let start = Instant::now();
let futures: Vec<_> = clients.iter()
let futs = clients.iter()
.cycle()
.take(concurrency as usize)
.map(|client| {
let start = SystemTime::now();
info!("Reading...");
let future = client.read(CHUNK_SIZE).map(move |_| start.elapsed().unwrap());
info!("Yielding...");
thread::yield_now();
future
})
.collect();
});
let futs = futures::collect(futs);
let latencies: Vec<_> = futures.into_iter()
.map(|future| {
future.wait().unwrap()
})
.collect();
let total_time = start.elapsed();
Box::new(futs.map(move |latencies| {
let total_time = start.elapsed();
let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur);
let mean = sum_latencies / latencies.len() as u32;
let min_latency = *latencies.iter().min().unwrap();
let max_latency = *latencies.iter().max().unwrap();
let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur);
let mean = sum_latencies / latencies.len() as u32;
let min_latency = *latencies.iter().min().unwrap();
let max_latency = *latencies.iter().max().unwrap();
if print {
println!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
latencies.len(),
mean.microseconds(),
min_latency.microseconds(),
max_latency.microseconds(),
total_time.microseconds());
}
if print {
println!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
latencies.len(),
mean.microseconds(),
min_latency.microseconds(),
max_latency.microseconds(),
total_time.microseconds());
}
}).map_err(|e| panic!(e)))
}
trait Microseconds {
@@ -105,18 +106,20 @@ const MAX_CONCURRENCY: u32 = 100;
fn main() {
let _ = env_logger::init();
let server = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap();
println!("Server listening on {}.", server.local_addr());
let clients: Vec<_> = (1...5)
let mut core = reactor::Core::new().unwrap();
let handle = core.handle();
let clients = (1...5)
.map(|i| {
println!("Client {} connecting...", i);
FutureClient::connect(server.local_addr()).wait().unwrap()
})
.collect();
FutureClient::connect_with(server.local_addr(), &handle).map_err(|e| panic!(e))
});
let clients = core.run(futures::collect(clients)).unwrap();
println!("Starting...");
run_once(&clients, MAX_CONCURRENCY, false);
for concurrency in 1...MAX_CONCURRENCY {
run_once(&clients, concurrency, true);
}
let runs = ::std::iter::once(run_once(&clients, MAX_CONCURRENCY, false))
.chain((1...MAX_CONCURRENCY).map(|concurrency| run_once(&clients, concurrency, true)));
core.run(futures::collect(runs)).unwrap();
}