Merge branch 'master' of github.com:google/tarpc into pre-push-bench

This commit is contained in:
Tim Kuehn
2017-02-16 00:59:34 -08:00
9 changed files with 43 additions and 39 deletions

View File

@@ -6,10 +6,14 @@ license = "MIT"
documentation = "https://docs.rs/tarpc"
homepage = "https://github.com/google/tarpc"
repository = "https://github.com/google/tarpc"
keywords = ["rpc", "protocol", "remote", "procedure", "serialize", "tls"]
keywords = ["rpc", "network", "server", "api", "tls"]
categories = ["asynchronous", "network-programming"]
readme = "README.md"
description = "An RPC framework for Rust with a focus on ease of use."
[badges]
travis-ci = { repository = "google/tarpc" }
[dependencies]
bincode = "1.0.0-alpha2"
byteorder = "1.0"

View File

@@ -117,10 +117,10 @@ service! {
struct HelloServer;
impl FutureService for HelloServer {
type HelloFut = futures::Finished<String, Never>;
type HelloFut = Result<String, Never>;
fn hello(&self, name: String) -> Self::HelloFut {
futures::finished(format!("Hello, {}!", name))
Ok(format!("Hello, {}!", name))
}
}
@@ -187,10 +187,10 @@ service! {
struct HelloServer;
impl FutureService for HelloServer {
type HelloFut = futures::Finished<String, Never>;
type HelloFut = Result<String, Never>;
fn hello(&mut self, name: String) -> Self::HelloFut {
futures::finished(format!("Hello, {}!", name))
Ok(format!("Hello, {}!", name))
}
}
@@ -264,10 +264,10 @@ and the following future-based trait:
```rust
impl FutureService for HelloServer {
type HelloFut = futures::Finished<String, Message>;
type HelloFut = Result<String, Message>;
fn hello(&mut self, name: String) -> Self::HelloFut {
futures::finished(format!("Hello, {}!", name))
Ok(format!("Hello, {}!", name))
}
}
```

View File

@@ -12,6 +12,7 @@ extern crate env_logger;
extern crate futures;
#[macro_use]
extern crate log;
extern crate serde;
#[macro_use]
extern crate tarpc;
extern crate tokio_core;
@@ -30,7 +31,7 @@ use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
service! {
rpc read(size: u32) -> Vec<u8>;
rpc read(size: u32) -> serde::bytes::ByteBuf;
}
#[derive(Clone)]
@@ -49,19 +50,19 @@ impl Server {
}
impl FutureService for Server {
type ReadFut = CpuFuture<Vec<u8>, Never>;
type ReadFut = CpuFuture<serde::bytes::ByteBuf, Never>;
fn read(&self, size: u32) -> Self::ReadFut {
let request_number = self.request_count.fetch_add(1, Ordering::SeqCst);
debug!("Server received read({}) no. {}", size, request_number);
self.pool
.spawn(futures::lazy(move || {
let mut vec: Vec<u8> = Vec::with_capacity(size as usize);
let mut vec = Vec::with_capacity(size as usize);
for i in 0..size {
vec.push(((i % 2) << 8) as u8);
}
debug!("Server sending response no. {}", request_number);
futures::finished(vec)
Ok(vec.into())
}))
}
}
@@ -106,11 +107,9 @@ fn run_once(clients: Vec<FutureClient>,
concurrency: u32)
-> impl Future<Item = (), Error = ()> + 'static {
let start = Instant::now();
let num_clients = clients.len();
futures::stream::futures_unordered((0..concurrency as usize)
.map(|iteration| (iteration + 1, iteration % num_clients))
.map(|(iteration, client_idx)| {
let client = &clients[client_idx];
.zip(clients.iter().enumerate().cycle())
.map(|(iteration, (client_idx, client))| {
let start = Instant::now();
debug!("Client {} reading (iteration {})...", client_idx, iteration);
client.read(CHUNK_SIZE)
@@ -182,10 +181,7 @@ fn main() {
info!("Client {} connecting...", i);
FutureClient::connect(addr, client::Options::default().remote(remote))
.map_err(|e| panic!(e))
})
// Need an intermediate collection to connect the clients in parallel,
// because `futures::collect` iterates sequentially.
.collect::<Vec<_>>();
});
let run = futures::collect(clients).and_then(|clients| run_once(clients, concurrency));

View File

@@ -49,11 +49,11 @@ struct Subscriber {
}
impl subscriber::FutureService for Subscriber {
type ReceiveFut = futures::Finished<(), Never>;
type ReceiveFut = Result<(), Never>;
fn receive(&self, message: String) -> Self::ReceiveFut {
println!("{} received message: {}", self.id, message);
futures::finished(())
Ok(())
}
}

View File

@@ -25,10 +25,10 @@ service! {
struct HelloServer;
impl FutureService for HelloServer {
type HelloFut = futures::Finished<String, Never>;
type HelloFut = Result<String, Never>;
fn hello(&self, name: String) -> Self::HelloFut {
futures::finished(format!("Hello, {}!", name))
Ok(format!("Hello, {}!", name))
}
}

View File

@@ -40,10 +40,10 @@ pub mod double {
struct AddServer;
impl AddFutureService for AddServer {
type AddFut = futures::Finished<i32, Never>;
type AddFut = Result<i32, Never>;
fn add(&self, x: i32, y: i32) -> Self::AddFut {
futures::finished(x + y)
Ok(x + y)
}
}

View File

@@ -12,6 +12,7 @@ extern crate lazy_static;
extern crate tarpc;
extern crate env_logger;
extern crate futures;
extern crate serde;
extern crate tokio_core;
use std::io::{Read, Write, stdout};
@@ -25,7 +26,7 @@ use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
lazy_static! {
static ref BUF: Arc<Vec<u8>> = Arc::new(gen_vec(CHUNK_SIZE as usize));
static ref BUF: Arc<serde::bytes::ByteBuf> = Arc::new(gen_vec(CHUNK_SIZE as usize).into());
}
fn gen_vec(size: usize) -> Vec<u8> {
@@ -37,17 +38,17 @@ fn gen_vec(size: usize) -> Vec<u8> {
}
service! {
rpc read() -> Arc<Vec<u8>>;
rpc read() -> Arc<serde::bytes::ByteBuf>;
}
#[derive(Clone)]
struct Server;
impl FutureService for Server {
type ReadFut = futures::Finished<Arc<Vec<u8>>, Never>;
type ReadFut = Result<Arc<serde::bytes::ByteBuf>, Never>;
fn read(&self) -> Self::ReadFut {
futures::finished(BUF.clone())
Ok(BUF.clone())
}
}

View File

@@ -31,10 +31,10 @@ mod bar {
#[derive(Clone)]
struct Bar;
impl bar::FutureService for Bar {
type BarFut = futures::Finished<i32, Never>;
type BarFut = Result<i32, Never>;
fn bar(&self, i: i32) -> Self::BarFut {
futures::finished(i)
Ok(i)
}
}
@@ -47,10 +47,10 @@ mod baz {
#[derive(Clone)]
struct Baz;
impl baz::FutureService for Baz {
type BazFut = futures::Finished<String, Never>;
type BazFut = Result<String, Never>;
fn baz(&self, s: String) -> Self::BazFut {
futures::finished(format!("Hello, {}!", s))
Ok(format!("Hello, {}!", s))
}
}

View File

@@ -331,7 +331,7 @@ macro_rules! service {
snake_to_camel! {
/// The type of future returned by `{}`.
type $fn_name: $crate::futures::Future<Item=$out, Error=$error>;
type $fn_name: $crate::futures::IntoFuture<Item=$out, Error=$error>;
}
$(#[$attr])*
@@ -375,10 +375,12 @@ macro_rules! service {
enum tarpc_service_FutureReply__<tarpc_service_S__: FutureService> {
DeserializeError(tarpc_service_Future__),
$($fn_name(
$crate::futures::Then<ty_snake_to_camel!(tarpc_service_S__::$fn_name),
tarpc_service_Future__,
fn(::std::result::Result<$out, $error>)
-> tarpc_service_Future__>)),*
$crate::futures::Then<
<ty_snake_to_camel!(tarpc_service_S__::$fn_name)
as $crate::futures::IntoFuture>::Future,
tarpc_service_Future__,
fn(::std::result::Result<$out, $error>)
-> tarpc_service_Future__>)),*
}
impl<S: FutureService> $crate::futures::Future for tarpc_service_FutureReply__<S> {
@@ -451,7 +453,8 @@ macro_rules! service {
}
return tarpc_service_FutureReply__::$fn_name(
$crate::futures::Future::then(
FutureService::$fn_name(&self.0, $($arg),*),
$crate::futures::IntoFuture::into_future(
FutureService::$fn_name(&self.0, $($arg),*)),
tarpc_service_wrap__));
}
)*