mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-01-24 11:40:22 +01:00
Count requests in concurrency example
This commit is contained in:
@@ -20,6 +20,8 @@ extern crate futures_cpupool;
|
||||
use clap::{Arg, App};
|
||||
use futures::Future;
|
||||
use futures_cpupool::{CpuFuture, CpuPool};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use tarpc::future::{Connect};
|
||||
use tarpc::util::{FirstSocketAddr, Never, spawn_core};
|
||||
@@ -30,11 +32,17 @@ service! {
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Server(CpuPool);
|
||||
struct Server {
|
||||
pool: CpuPool,
|
||||
request_count: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
fn new() -> Self {
|
||||
Server(CpuPool::new_num_cpus())
|
||||
Server {
|
||||
pool: CpuPool::new_num_cpus(),
|
||||
request_count: Arc::new(AtomicUsize::new(1)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,13 +50,15 @@ impl FutureService for Server {
|
||||
type ReadFut = CpuFuture<Vec<u8>, Never>;
|
||||
|
||||
fn read(&self, size: u32) -> Self::ReadFut {
|
||||
debug!("Server received read({})", size);
|
||||
self.0
|
||||
let request_number = self.request_count.fetch_add(1, Ordering::SeqCst);
|
||||
debug!("Server received read({}) no. {}", size, request_number);
|
||||
self.pool
|
||||
.spawn(futures::lazy(move || {
|
||||
let mut vec: Vec<u8> = Vec::with_capacity(size as usize);
|
||||
for i in 0..size {
|
||||
vec.push((i % 1 << 8) as u8);
|
||||
}
|
||||
debug!("Server sending response no. {}", request_number);
|
||||
futures::finished(vec)
|
||||
}))
|
||||
}
|
||||
@@ -77,9 +87,14 @@ fn run_once(clients: Vec<FutureClient>, concurrency: u32) -> impl Future<Item=()
|
||||
.enumerate()
|
||||
.take(concurrency as usize)
|
||||
.map(|(iteration, (client_id, client))| {
|
||||
let iteration = iteration + 1;
|
||||
let start = SystemTime::now();
|
||||
debug!("Client {} reading (iteration {})...", client_id, iteration + 1);
|
||||
let future = client.read(CHUNK_SIZE).map(move |_| start.elapsed().unwrap());
|
||||
debug!("Client {} reading (iteration {})...", client_id, iteration);
|
||||
let future = client.read(CHUNK_SIZE).map(move |_| {
|
||||
let elapsed = start.elapsed().unwrap();
|
||||
debug!("Client {} received reply (iteration {}).", client_id, iteration);
|
||||
elapsed
|
||||
});
|
||||
future
|
||||
})
|
||||
// Need an intermediate collection to kick off each future,
|
||||
@@ -114,15 +129,26 @@ fn main() {
|
||||
.value_name("LEVEL")
|
||||
.help("Sets a custom concurrency level")
|
||||
.takes_value(true))
|
||||
.arg(Arg::with_name("clients")
|
||||
.short("n")
|
||||
.long("num_clients")
|
||||
.value_name("AMOUNT")
|
||||
.help("How many clients to distribute requests between")
|
||||
.takes_value(true))
|
||||
.get_matches();
|
||||
let concurrency = matches.value_of("concurrency")
|
||||
.map(&str::parse)
|
||||
.map(Result::unwrap)
|
||||
.unwrap_or(10);
|
||||
let num_clients = matches.value_of("clients")
|
||||
.map(&str::parse)
|
||||
.map(Result::unwrap)
|
||||
.unwrap_or(4);
|
||||
|
||||
let addr = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
info!("Server listening on {}.", addr);
|
||||
|
||||
let clients = (0..4)
|
||||
let clients = (0..num_clients)
|
||||
// Spin up a couple threads to drive the clients.
|
||||
.map(|i| (i, spawn_core()))
|
||||
.map(|(i, remote)| {
|
||||
@@ -140,5 +166,6 @@ fn main() {
|
||||
|
||||
// The driver of the main future.
|
||||
let mut core = reactor::Core::new().unwrap();
|
||||
|
||||
core.run(run).unwrap();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user