mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Track latest tokio changes
This commit is contained in:
@@ -18,11 +18,12 @@ extern crate tokio_core;
|
||||
extern crate futures_cpupool;
|
||||
|
||||
use clap::{Arg, App};
|
||||
use futures::Future;
|
||||
use futures::{Future, Stream};
|
||||
use futures_cpupool::{CpuFuture, CpuPool};
|
||||
use std::cmp;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::time::{Duration, Instant};
|
||||
use tarpc::future::{Connect};
|
||||
use tarpc::util::{FirstSocketAddr, Never, spawn_core};
|
||||
use tokio_core::reactor;
|
||||
@@ -49,7 +50,7 @@ impl Server {
|
||||
impl FutureService for Server {
|
||||
type ReadFut = CpuFuture<Vec<u8>, Never>;
|
||||
|
||||
fn read(&self, size: u32) -> Self::ReadFut {
|
||||
fn read(&mut self, size: u32) -> Self::ReadFut {
|
||||
let request_number = self.request_count.fetch_add(1, Ordering::SeqCst);
|
||||
debug!("Server received read({}) no. {}", size, request_number);
|
||||
self.pool
|
||||
@@ -79,44 +80,48 @@ impl Microseconds for Duration {
|
||||
}
|
||||
}
|
||||
|
||||
fn run_once(clients: Vec<FutureClient>, concurrency: u32) -> impl Future<Item=(), Error=()> {
|
||||
#[derive(Default)]
|
||||
struct Stats {
|
||||
sum: Duration,
|
||||
count: u64,
|
||||
min: Option<Duration>,
|
||||
max: Option<Duration>,
|
||||
}
|
||||
|
||||
fn run_once(mut clients: Vec<FutureClient>, concurrency: u32) -> impl Future<Item=(), Error=()> + 'static {
|
||||
let start = Instant::now();
|
||||
let futs = clients.iter()
|
||||
.enumerate()
|
||||
.cycle()
|
||||
.enumerate()
|
||||
.take(concurrency as usize)
|
||||
.map(|(iteration, (client_id, client))| {
|
||||
let iteration = iteration + 1;
|
||||
let start = SystemTime::now();
|
||||
debug!("Client {} reading (iteration {})...", client_id, iteration);
|
||||
let future = client.read(CHUNK_SIZE).map(move |_| {
|
||||
let elapsed = start.elapsed().unwrap();
|
||||
debug!("Client {} received reply (iteration {}).", client_id, iteration);
|
||||
elapsed
|
||||
});
|
||||
future
|
||||
let num_clients = clients.len();
|
||||
futures::stream::futures_unordered((0..concurrency as usize)
|
||||
.map(|iteration| (iteration + 1, iteration % num_clients))
|
||||
.map(|(iteration, client_idx)| {
|
||||
let mut client = &mut clients[client_idx];
|
||||
let start = Instant::now();
|
||||
debug!("Client {} reading (iteration {})...", client_idx, iteration);
|
||||
client.read(CHUNK_SIZE)
|
||||
.map(move |_| (client_idx, iteration, start))
|
||||
}))
|
||||
//.collect::<FuturesUnordered<_>>()
|
||||
.map(|(client_idx, iteration, start)| {
|
||||
let elapsed = start.elapsed();
|
||||
debug!("Client {} received reply (iteration {}).", client_idx, iteration);
|
||||
elapsed
|
||||
})
|
||||
.map_err(|e| panic!(e))
|
||||
.fold(Stats::default(), move |mut stats, elapsed| {
|
||||
stats.sum += elapsed;
|
||||
stats.count += 1;
|
||||
stats.min = Some(cmp::min(stats.min.unwrap_or(elapsed), elapsed));
|
||||
stats.max = Some(cmp::max(stats.max.unwrap_or(elapsed), elapsed));
|
||||
Ok(stats)
|
||||
})
|
||||
.map(move |stats| {
|
||||
info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
|
||||
stats.count,
|
||||
stats.sum.microseconds() as f64 / stats.count as f64,
|
||||
stats.min.unwrap().microseconds(),
|
||||
stats.max.unwrap().microseconds(),
|
||||
start.elapsed().microseconds());
|
||||
})
|
||||
// Need an intermediate collection to kick off each future,
|
||||
// because futures::collect will iterate sequentially.
|
||||
.collect::<Vec<_>>();
|
||||
let futs = futures::collect(futs);
|
||||
|
||||
futs.map(move |latencies| {
|
||||
let total_time = start.elapsed();
|
||||
|
||||
let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur);
|
||||
let mean = sum_latencies / latencies.len() as u32;
|
||||
let min_latency = *latencies.iter().min().unwrap();
|
||||
let max_latency = *latencies.iter().max().unwrap();
|
||||
|
||||
info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
|
||||
latencies.len(),
|
||||
mean.microseconds(),
|
||||
min_latency.microseconds(),
|
||||
max_latency.microseconds(),
|
||||
total_time.microseconds());
|
||||
}).map_err(|e| panic!(e))
|
||||
}
|
||||
|
||||
fn main() {
|
||||
|
||||
@@ -49,7 +49,7 @@ struct Subscriber {
|
||||
impl subscriber::FutureService for Subscriber {
|
||||
type ReceiveFut = futures::Finished<(), Never>;
|
||||
|
||||
fn receive(&self, message: String) -> Self::ReceiveFut {
|
||||
fn receive(&mut self, message: String) -> Self::ReceiveFut {
|
||||
println!("{} received message: {}", self.id, message);
|
||||
futures::finished(())
|
||||
}
|
||||
@@ -80,11 +80,11 @@ impl Publisher {
|
||||
impl publisher::FutureService for Publisher {
|
||||
type BroadcastFut = BoxFuture<(), Never>;
|
||||
|
||||
fn broadcast(&self, message: String) -> Self::BroadcastFut {
|
||||
fn broadcast(&mut self, message: String) -> Self::BroadcastFut {
|
||||
futures::collect(self.clients
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.values_mut()
|
||||
// Ignore failing subscribers.
|
||||
.map(move |client| client.receive(message.clone()).then(|_| Ok(())))
|
||||
.collect::<Vec<_>>())
|
||||
@@ -94,7 +94,7 @@ impl publisher::FutureService for Publisher {
|
||||
|
||||
type SubscribeFut = BoxFuture<(), Message>;
|
||||
|
||||
fn subscribe(&self, id: u32, address: SocketAddr) -> Self::SubscribeFut {
|
||||
fn subscribe(&mut self, id: u32, address: SocketAddr) -> Self::SubscribeFut {
|
||||
let clients = self.clients.clone();
|
||||
subscriber::FutureClient::connect(&address)
|
||||
.map(move |subscriber| {
|
||||
@@ -108,7 +108,7 @@ impl publisher::FutureService for Publisher {
|
||||
|
||||
type UnsubscribeFut = BoxFuture<(), Never>;
|
||||
|
||||
fn unsubscribe(&self, id: u32) -> Self::UnsubscribeFut {
|
||||
fn unsubscribe(&mut self, id: u32) -> Self::UnsubscribeFut {
|
||||
println!("Unsubscribing {}", id);
|
||||
self.clients.lock().unwrap().remove(&id).unwrap();
|
||||
futures::finished(()).boxed()
|
||||
@@ -122,7 +122,7 @@ fn main() {
|
||||
.wait()
|
||||
.unwrap();
|
||||
|
||||
let publisher_client = publisher::SyncClient::connect(publisher_addr).unwrap();
|
||||
let mut publisher_client = publisher::SyncClient::connect(publisher_addr).unwrap();
|
||||
|
||||
let subscriber1 = Subscriber::new(0);
|
||||
publisher_client.subscribe(0, subscriber1).unwrap();
|
||||
|
||||
@@ -1,55 +0,0 @@
|
||||
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate env_logger;
|
||||
extern crate futures;
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
extern crate tokio_core;
|
||||
|
||||
use futures::Future;
|
||||
use tarpc::util::Never;
|
||||
use tarpc::future::Connect;
|
||||
|
||||
service! {
|
||||
rpc hello(name: String) -> String;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct HelloServer;
|
||||
|
||||
impl SyncService for HelloServer {
|
||||
fn hello(&self, name: String) -> Result<String, Never> {
|
||||
info!("Got request: {}", name);
|
||||
Ok(format!("Hello, {}!", name))
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
let mut core = tokio_core::reactor::Core::new().unwrap();
|
||||
let addr = HelloServer.listen("localhost:10000").unwrap();
|
||||
let f = FutureClient::connect(&addr)
|
||||
.map_err(tarpc::Error::from)
|
||||
.and_then(|client| {
|
||||
let resp1 = client.hello("Mom".to_string());
|
||||
info!("Sent first request.");
|
||||
/*
|
||||
let resp2 = client.hello("Dad".to_string());
|
||||
info!("Sent second request.");
|
||||
*/
|
||||
futures::collect(vec![resp1, /*resp2*/])
|
||||
}).map(|responses| {
|
||||
for resp in responses {
|
||||
println!("{}", resp);
|
||||
}
|
||||
});
|
||||
core.run(f).unwrap();
|
||||
}
|
||||
@@ -39,7 +39,7 @@ impl Error for NoNameGiven {
|
||||
struct HelloServer;
|
||||
|
||||
impl SyncService for HelloServer {
|
||||
fn hello(&self, name: String) -> Result<String, NoNameGiven> {
|
||||
fn hello(&mut self, name: String) -> Result<String, NoNameGiven> {
|
||||
if name == "" {
|
||||
Err(NoNameGiven)
|
||||
} else {
|
||||
@@ -50,7 +50,7 @@ impl SyncService for HelloServer {
|
||||
|
||||
fn main() {
|
||||
let addr = HelloServer.listen("localhost:10000").unwrap();
|
||||
let client = SyncClient::connect(addr).unwrap();
|
||||
let mut client = SyncClient::connect(addr).unwrap();
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
println!("{}", client.hello("".to_string()).unwrap_err());
|
||||
}
|
||||
@@ -46,7 +46,7 @@ impl Service for HelloServer {
|
||||
type Error = io::Error;
|
||||
type Future = Box<Future<Item = tarpc::Response<String, Never>, Error = io::Error>>;
|
||||
|
||||
fn call(&self, request: Self::Request) -> Self::Future {
|
||||
fn call(&mut self, request: Self::Request) -> Self::Future {
|
||||
Ok(Ok(format!("Hello, {}!", request.unwrap()))).into_future().boxed()
|
||||
}
|
||||
}
|
||||
@@ -60,7 +60,7 @@ impl FutureClient {
|
||||
tarpc::Client::connect_remotely(addr, &tarpc::REMOTE).map(FutureClient)
|
||||
}
|
||||
|
||||
pub fn hello(&self, name: String)
|
||||
pub fn hello(&mut self, name: String)
|
||||
-> impl Future<Item = String, Error = tarpc::Error<Never>> + 'static
|
||||
{
|
||||
self.0.call(name).then(|msg| msg.unwrap())
|
||||
@@ -73,7 +73,7 @@ fn main() {
|
||||
let addr = HelloServer::listen("localhost:10000".first_socket_addr()).wait().unwrap();
|
||||
let f = FutureClient::connect(&addr)
|
||||
.map_err(tarpc::Error::from)
|
||||
.and_then(|client| {
|
||||
.and_then(|mut client| {
|
||||
let resp1 = client.hello("Mom".to_string());
|
||||
info!("Sent first request.");
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ extern crate futures;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
|
||||
use futures::Future;
|
||||
use tarpc::util::{FirstSocketAddr, Never};
|
||||
use tarpc::sync::Connect;
|
||||
|
||||
@@ -24,13 +23,14 @@ struct HelloServer;
|
||||
impl FutureService for HelloServer {
|
||||
type HelloFut = futures::Finished<String, Never>;
|
||||
|
||||
fn hello(&self, name: String) -> Self::HelloFut {
|
||||
fn hello(&mut self, name: String) -> Self::HelloFut {
|
||||
futures::finished(format!("Hello, {}!", name))
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let addr = HelloServer.listen("localhost:10000".first_socket_addr()).wait().unwrap();
|
||||
let client = SyncClient::connect(addr).unwrap();
|
||||
let addr = "localhost:10000";
|
||||
let _server = HelloServer.listen(addr.first_socket_addr());
|
||||
let mut client = SyncClient::connect(addr).unwrap();
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
}
|
||||
35
examples/readme_sync.rs
Normal file
35
examples/readme_sync.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
// required by `FutureClient` (not used in this example)
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
|
||||
use tarpc::util::Never;
|
||||
use tarpc::sync::Connect;
|
||||
|
||||
service! {
|
||||
rpc hello(name: String) -> String;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct HelloServer;
|
||||
|
||||
impl SyncService for HelloServer {
|
||||
fn hello(&mut self, name: String) -> Result<String, Never> {
|
||||
Ok(format!("Hello, {}!", name))
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let addr = "localhost:10000";
|
||||
HelloServer.listen(addr).unwrap();
|
||||
let mut client = SyncClient::connect(addr).unwrap();
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
}
|
||||
@@ -41,7 +41,7 @@ struct AddServer;
|
||||
impl AddFutureService for AddServer {
|
||||
type AddFut = futures::Finished<i32, Never>;
|
||||
|
||||
fn add(&self, x: i32, y: i32) -> Self::AddFut {
|
||||
fn add(&mut self, x: i32, y: i32) -> Self::AddFut {
|
||||
futures::finished(x + y)
|
||||
}
|
||||
}
|
||||
@@ -54,7 +54,7 @@ struct DoubleServer {
|
||||
impl DoubleServer {
|
||||
fn new(client: add::FutureClient) -> Self {
|
||||
DoubleServer {
|
||||
client: Arc::new(Mutex::new(client))
|
||||
client: Arc::new(Mutex::new(client)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -62,7 +62,7 @@ impl DoubleServer {
|
||||
impl DoubleFutureService for DoubleServer {
|
||||
type DoubleFut = BoxFuture<i32, Message>;
|
||||
|
||||
fn double(&self, x: i32) -> Self::DoubleFut {
|
||||
fn double(&mut self, x: i32) -> Self::DoubleFut {
|
||||
self.client
|
||||
.lock()
|
||||
.unwrap()
|
||||
@@ -80,7 +80,7 @@ fn main() {
|
||||
let double = DoubleServer::new(add_client);
|
||||
let double_addr = double.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
|
||||
let double_client = double::SyncClient::connect(&double_addr).unwrap();
|
||||
let mut double_client = double::SyncClient::connect(&double_addr).unwrap();
|
||||
for i in 0..5 {
|
||||
println!("{:?}", double_client.double(i).unwrap());
|
||||
}
|
||||
|
||||
@@ -44,7 +44,7 @@ struct Server;
|
||||
impl FutureService for Server {
|
||||
type ReadFut = futures::Finished<Arc<Vec<u8>>, Never>;
|
||||
|
||||
fn read(&self) -> Self::ReadFut {
|
||||
fn read(&mut self) -> Self::ReadFut {
|
||||
futures::finished(BUF.clone())
|
||||
}
|
||||
}
|
||||
@@ -53,7 +53,7 @@ const CHUNK_SIZE: u32 = 1 << 19;
|
||||
|
||||
fn bench_tarpc(target: u64) {
|
||||
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let client = SyncClient::connect(&addr).unwrap();
|
||||
let mut client = SyncClient::connect(&addr).unwrap();
|
||||
let start = time::Instant::now();
|
||||
let mut nread = 0;
|
||||
while nread < target {
|
||||
|
||||
@@ -31,7 +31,7 @@ struct Bar;
|
||||
impl bar::FutureService for Bar {
|
||||
type BarFut = futures::Finished<i32, Never>;
|
||||
|
||||
fn bar(&self, i: i32) -> Self::BarFut {
|
||||
fn bar(&mut self, i: i32) -> Self::BarFut {
|
||||
futures::finished(i)
|
||||
}
|
||||
}
|
||||
@@ -47,7 +47,7 @@ struct Baz;
|
||||
impl baz::FutureService for Baz {
|
||||
type BazFut = futures::Finished<String, Never>;
|
||||
|
||||
fn baz(&self, s: String) -> Self::BazFut {
|
||||
fn baz(&mut self, s: String) -> Self::BazFut {
|
||||
futures::finished(format!("Hello, {}!", s))
|
||||
}
|
||||
}
|
||||
@@ -61,8 +61,8 @@ fn main() {
|
||||
let bar_addr = Bar.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let baz_addr = Baz.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
|
||||
let bar_client = bar::SyncClient::connect(&bar_addr).unwrap();
|
||||
let baz_client = baz::SyncClient::connect(&baz_addr).unwrap();
|
||||
let mut bar_client = bar::SyncClient::connect(&bar_addr).unwrap();
|
||||
let mut baz_client = baz::SyncClient::connect(&baz_addr).unwrap();
|
||||
|
||||
info!("Result: {:?}", bar_client.bar(17));
|
||||
|
||||
|
||||
Reference in New Issue
Block a user