From 608be5372bd268d71c4017b9d39627fc6dbd90c0 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sat, 5 Nov 2016 19:28:13 -0700 Subject: [PATCH 01/13] Try to fix concurrency example --- examples/concurrency.rs | 45 +++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/examples/concurrency.rs b/examples/concurrency.rs index d6a8611..4415bb9 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -18,6 +18,7 @@ extern crate futures_cpupool; use futures::Future; use futures_cpupool::{CpuFuture, CpuPool}; +use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; use tarpc::future::{Connect}; use tarpc::util::{FirstSocketAddr, Never, spawn_core}; @@ -46,20 +47,22 @@ impl FutureService for Server { for i in 0..size { vec.push((i % 1 << 8) as u8); } - futures::finished::<_, Never>(vec) + futures::finished(vec) })) } } -fn run_once<'a>(clients: &'a [FutureClient], concurrency: u32, print: bool) - -> Box + 'a> +fn run_once(clients: Arc>, concurrency: u32) -> Box> { let start = Instant::now(); let futs = clients.iter() + .enumerate() .cycle() + .enumerate() .take(concurrency as usize) - .map(|client| { + .map(|(iteration, (client_id, client))| { let start = SystemTime::now(); + debug!("Client {} reading (iteration {})...", client_id, iteration); let future = client.read(CHUNK_SIZE).map(move |_| start.elapsed().unwrap()); future }) @@ -76,14 +79,12 @@ fn run_once<'a>(clients: &'a [FutureClient], concurrency: u32, print: bool) let min_latency = *latencies.iter().min().unwrap(); let max_latency = *latencies.iter().max().unwrap(); - if print { - println!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", - latencies.len(), - mean.microseconds(), - min_latency.microseconds(), - max_latency.microseconds(), - total_time.microseconds()); - } + info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", + latencies.len(), + mean.microseconds(), + min_latency.microseconds(), + max_latency.microseconds(), + total_time.microseconds()); }).map_err(|e| panic!(e))) } @@ -107,16 +108,16 @@ fn main() { let _ = env_logger::init(); let server = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap(); - println!("Server listening on {}.", server.local_addr()); + info!("Server listening on {}.", server.local_addr()); // The driver of the main future. let mut core = reactor::Core::new().unwrap(); - let clients = (1...5) + let clients = (0..4) // Spin up a couple threads to drive the clients. .map(|i| (i, spawn_core())) .map(|(i, remote)| { - println!("Client {} connecting...", i); + info!("Client {} connecting...", i); FutureClient::connect_remotely(server.local_addr(), &remote) .map_err(|e| panic!(e)) }) @@ -124,9 +125,13 @@ fn main() { // because `futures::collect` iterates sequentially. .collect::>(); - let clients = core.run(futures::collect(clients)).unwrap(); - println!("Starting..."); - let runs = ::std::iter::once(run_once(&clients, MAX_CONCURRENCY, false)) - .chain((1...MAX_CONCURRENCY).map(|concurrency| run_once(&clients, concurrency, true))); - core.run(futures::collect(runs)).unwrap(); + let runs = futures::collect(clients).and_then(|clients| { + let clients = Arc::new(clients); + let runs = (1...MAX_CONCURRENCY) + .map(move |concurrency| run_once(clients.clone(), concurrency)); + futures::collect(runs) + }); + + info!("Starting..."); + core.run(runs).unwrap(); } From 13e56481bb2ba18a2472109d19f9d34963ba2385 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sat, 3 Dec 2016 17:00:12 -0800 Subject: [PATCH 02/13] Track latest changes to tokio-proto. --- Cargo.toml | 6 +- examples/concurrency.rs | 111 +++++++++++---------- examples/pubsub.rs | 19 ++-- examples/readme.rs | 3 +- examples/readme2.rs | 3 +- examples/readme_future.rs | 4 +- examples/server_calling_server.rs | 11 +- examples/throughput.rs | 4 +- examples/two_clients.rs | 9 +- src/client.rs | 104 +++++++++++-------- src/errors.rs | 11 +- src/framed.rs | 160 ++++++++++++++---------------- src/lib.rs | 3 +- src/macros.rs | 35 +++---- src/plugins/src/lib.rs | 25 ++--- src/server.rs | 47 ++++++--- src/util.rs | 1 + 17 files changed, 283 insertions(+), 273 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d5cd548..f3b8df8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,15 +25,17 @@ take = "0.1" tokio-service = { git = "https://github.com/tokio-rs/tokio-service" } tokio-proto = { git = "https://github.com/tokio-rs/tokio-proto" } tokio-core = { git = "https://github.com/tokio-rs/tokio-core" } +net2 = "0.2" [replace] -"tokio-core:0.1.0" = { git = "https://github.com/tokio-rs/tokio-core" } -"futures:0.1.3" = { git = "https://github.com/alexcrichton/futures-rs" } +"tokio-core:0.1.1" = { git = "https://github.com/tokio-rs/tokio-core" } +"futures:0.1.6" = { git = "https://github.com/alexcrichton/futures-rs" } [dev-dependencies] chrono = "0.2" env_logger = "0.3" futures-cpupool = "0.1" +clap = "2.0" [features] unstable = ["serde/unstable"] diff --git a/examples/concurrency.rs b/examples/concurrency.rs index 4415bb9..eb692d0 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -7,6 +7,7 @@ #![plugin(tarpc_plugins)] extern crate chrono; +extern crate clap; extern crate env_logger; extern crate futures; #[macro_use] @@ -16,9 +17,9 @@ extern crate tarpc; extern crate tokio_core; extern crate futures_cpupool; +use clap::{Arg, App}; use futures::Future; use futures_cpupool::{CpuFuture, CpuPool}; -use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; use tarpc::future::{Connect}; use tarpc::util::{FirstSocketAddr, Never, spawn_core}; @@ -41,6 +42,7 @@ impl FutureService for Server { type ReadFut = CpuFuture, Never>; fn read(&self, size: u32) -> Self::ReadFut { + debug!("Server received read({})", size); self.0 .spawn(futures::lazy(move || { let mut vec: Vec = Vec::with_capacity(size as usize); @@ -52,41 +54,7 @@ impl FutureService for Server { } } -fn run_once(clients: Arc>, concurrency: u32) -> Box> -{ - let start = Instant::now(); - let futs = clients.iter() - .enumerate() - .cycle() - .enumerate() - .take(concurrency as usize) - .map(|(iteration, (client_id, client))| { - let start = SystemTime::now(); - debug!("Client {} reading (iteration {})...", client_id, iteration); - let future = client.read(CHUNK_SIZE).map(move |_| start.elapsed().unwrap()); - future - }) - // Need an intermediate collection to kick off each future, - // because futures::collect will iterate sequentially. - .collect::>(); - let futs = futures::collect(futs); - - Box::new(futs.map(move |latencies| { - let total_time = start.elapsed(); - - let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur); - let mean = sum_latencies / latencies.len() as u32; - let min_latency = *latencies.iter().min().unwrap(); - let max_latency = *latencies.iter().max().unwrap(); - - info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", - latencies.len(), - mean.microseconds(), - min_latency.microseconds(), - max_latency.microseconds(), - total_time.microseconds()); - }).map_err(|e| panic!(e))) -} +const CHUNK_SIZE: u32 = 1 << 10; trait Microseconds { fn microseconds(&self) -> i64; @@ -101,37 +69,76 @@ impl Microseconds for Duration { } } -const CHUNK_SIZE: u32 = 1 << 10; -const MAX_CONCURRENCY: u32 = 100; +fn run_once(clients: Vec, concurrency: u32) -> impl Future { + let start = Instant::now(); + let futs = clients.iter() + .enumerate() + .cycle() + .enumerate() + .take(concurrency as usize) + .map(|(iteration, (client_id, client))| { + let start = SystemTime::now(); + debug!("Client {} reading (iteration {})...", client_id, iteration + 1); + let future = client.read(CHUNK_SIZE).map(move |_| start.elapsed().unwrap()); + future + }) + // Need an intermediate collection to kick off each future, + // because futures::collect will iterate sequentially. + .collect::>(); + let futs = futures::collect(futs); + + futs.map(move |latencies| { + let total_time = start.elapsed(); + + let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur); + let mean = sum_latencies / latencies.len() as u32; + let min_latency = *latencies.iter().min().unwrap(); + let max_latency = *latencies.iter().max().unwrap(); + + info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", + latencies.len(), + mean.microseconds(), + min_latency.microseconds(), + max_latency.microseconds(), + total_time.microseconds()); + }).map_err(|e| panic!(e)) +} fn main() { let _ = env_logger::init(); - - let server = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap(); - info!("Server listening on {}.", server.local_addr()); - - // The driver of the main future. - let mut core = reactor::Core::new().unwrap(); + let matches = App::new("Tarpc Concurrency") + .about("Demonstrates making concurrent requests to a tarpc service.") + .arg(Arg::with_name("concurrency") + .short("c") + .long("concurrency") + .value_name("LEVEL") + .help("Sets a custom concurrency level") + .takes_value(true)) + .get_matches(); + let concurrency = matches.value_of("concurrency") + .map(&str::parse) + .map(Result::unwrap) + .unwrap_or(10); + let addr = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap(); + info!("Server listening on {}.", addr); let clients = (0..4) // Spin up a couple threads to drive the clients. .map(|i| (i, spawn_core())) .map(|(i, remote)| { info!("Client {} connecting...", i); - FutureClient::connect_remotely(server.local_addr(), &remote) + FutureClient::connect_remotely(&addr, &remote) .map_err(|e| panic!(e)) }) // Need an intermediate collection to connect the clients in parallel, // because `futures::collect` iterates sequentially. .collect::>(); - let runs = futures::collect(clients).and_then(|clients| { - let clients = Arc::new(clients); - let runs = (1...MAX_CONCURRENCY) - .map(move |concurrency| run_once(clients.clone(), concurrency)); - futures::collect(runs) - }); + let run = futures::collect(clients).and_then(|clients| run_once(clients, concurrency)); info!("Starting..."); - core.run(runs).unwrap(); + + // The driver of the main future. + let mut core = reactor::Core::new().unwrap(); + core.run(run).unwrap(); } diff --git a/examples/pubsub.rs b/examples/pubsub.rs index 0314577..92cdfee 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -56,13 +56,13 @@ impl subscriber::FutureService for Subscriber { } impl Subscriber { - fn new(id: u32) -> tokio::server::ServerHandle { + fn new(id: u32) -> SocketAddr { Subscriber { - id: id, - } - .listen("localhost:0".first_socket_addr()) - .wait() - .unwrap() + id: id, + } + .listen("localhost:0".first_socket_addr()) + .wait() + .unwrap() } } @@ -117,19 +117,18 @@ impl publisher::FutureService for Publisher { fn main() { let _ = env_logger::init(); - let publisher_server = Publisher::new() + let publisher_addr = Publisher::new() .listen("localhost:0".first_socket_addr()) .wait() .unwrap(); - let publisher_addr = publisher_server.local_addr(); let publisher_client = publisher::SyncClient::connect(publisher_addr).unwrap(); let subscriber1 = Subscriber::new(0); - publisher_client.subscribe(0, *subscriber1.local_addr()).unwrap(); + publisher_client.subscribe(0, subscriber1).unwrap(); let subscriber2 = Subscriber::new(1); - publisher_client.subscribe(1, *subscriber2.local_addr()).unwrap(); + publisher_client.subscribe(1, subscriber2).unwrap(); println!("Broadcasting..."); diff --git a/examples/readme.rs b/examples/readme.rs index a16c651..3613332 100644 --- a/examples/readme.rs +++ b/examples/readme.rs @@ -27,8 +27,7 @@ impl SyncService for HelloServer { } fn main() { - let addr = "localhost:10000"; - HelloServer.listen(addr).unwrap(); + let addr = HelloServer.listen("localhost:10000").unwrap(); let client = SyncClient::connect(addr).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } diff --git a/examples/readme2.rs b/examples/readme2.rs index d7e964b..fd7e81e 100644 --- a/examples/readme2.rs +++ b/examples/readme2.rs @@ -49,8 +49,7 @@ impl SyncService for HelloServer { } fn main() { - let addr = "localhost:10000"; - HelloServer.listen(addr).unwrap(); + let addr = HelloServer.listen("localhost:10000").unwrap(); let client = SyncClient::connect(addr).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); println!("{}", client.hello("".to_string()).unwrap_err()); diff --git a/examples/readme_future.rs b/examples/readme_future.rs index 103568a..a0bb26f 100644 --- a/examples/readme_future.rs +++ b/examples/readme_future.rs @@ -10,6 +10,7 @@ extern crate futures; #[macro_use] extern crate tarpc; +use futures::Future; use tarpc::util::{FirstSocketAddr, Never}; use tarpc::sync::Connect; @@ -29,8 +30,7 @@ impl FutureService for HelloServer { } fn main() { - let addr = "localhost:10000"; - let _server = HelloServer.listen(addr.first_socket_addr()); + let addr = HelloServer.listen("localhost:10000".first_socket_addr()).wait().unwrap(); let client = SyncClient::connect(addr).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index a9d7ffe..120a5f2 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -74,12 +74,13 @@ impl DoubleFutureService for DoubleServer { fn main() { let _ = env_logger::init(); - let add = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let add_client = add::FutureClient::connect(add.local_addr()).wait().unwrap(); - let double = DoubleServer::new(add_client); - let double = double.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let add_addr = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let add_client = add::FutureClient::connect(&add_addr).wait().unwrap(); - let double_client = double::SyncClient::connect(double.local_addr()).unwrap(); + let double = DoubleServer::new(add_client); + let double_addr = double.listen("localhost:0".first_socket_addr()).wait().unwrap(); + + let double_client = double::SyncClient::connect(&double_addr).unwrap(); for i in 0..5 { println!("{:?}", double_client.double(i).unwrap()); } diff --git a/examples/throughput.rs b/examples/throughput.rs index f82b5cb..ad63fc6 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -52,8 +52,8 @@ impl FutureService for Server { const CHUNK_SIZE: u32 = 1 << 19; fn bench_tarpc(target: u64) { - let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = SyncClient::connect(handle.local_addr()).unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let client = SyncClient::connect(&addr).unwrap(); let start = time::Instant::now(); let mut nread = 0; while nread < target { diff --git a/examples/two_clients.rs b/examples/two_clients.rs index b26e0d0..c8a06b1 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -58,10 +58,11 @@ macro_rules! pos { fn main() { let _ = env_logger::init(); - let bar = Bar.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let baz = Baz.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let bar_client = bar::SyncClient::connect(bar.local_addr()).unwrap(); - let baz_client = baz::SyncClient::connect(baz.local_addr()).unwrap(); + let bar_addr = Bar.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let baz_addr = Baz.listen("localhost:0".first_socket_addr()).wait().unwrap(); + + let bar_client = bar::SyncClient::connect(&bar_addr).unwrap(); + let baz_client = baz::SyncClient::connect(&baz_addr).unwrap(); info!("Result: {:?}", bar_client.bar(17)); diff --git a/src/client.rs b/src/client.rs index c17f0e9..b902bb9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,57 +3,62 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use WireError; +use {WireError, framed}; use bincode::serde::DeserializeError; -use framed::Framed; -use futures::{self, Async, Future}; +use futures::{self, Future}; use serde::{Deserialize, Serialize}; use std::fmt; use std::io; use tokio_core::net::TcpStream; -use tokio_core::reactor; -use tokio_proto::easy::{EasyClient, EasyResponse, multiplex}; +use tokio_proto::BindClient as ProtoBindClient; +use tokio_proto::multiplex::Multiplex; use tokio_service::Service; +type WireResponse = Result>, DeserializeError>; +type ResponseFuture = futures::Map< as Service>::Future, + fn(WireResponse) -> Result>>; +type BindClient = + >> as ProtoBindClient>::BindClient; + /// A client that impls `tokio_service::Service` that writes and reads bytes. /// /// Typically, this would be combined with a serialization pre-processing step /// and a deserialization post-processing step. -pub struct Client { - inner: EasyClient>, +pub struct Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static, +{ + inner: BindClient, } -type WireResponse = Result>, DeserializeError>; -type ResponseFuture = futures::Map>, - fn(WireResponse) -> Result>>; - impl Service for Client - where Req: Send + 'static, - Resp: Send + 'static, - E: Send + 'static + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static { type Request = Req; type Response = Result>; type Error = io::Error; - type Future = ResponseFuture; - - fn poll_ready(&self) -> Async<()> { - self.inner.poll_ready() - } + type Future = ResponseFuture; fn call(&self, request: Self::Request) -> Self::Future { self.inner.call(request).map(Self::map_err) } } -impl Client { - fn new(tcp: TcpStream, handle: &reactor::Handle) -> Self - where Req: Serialize + Send + 'static, - Resp: Deserialize + Send + 'static, - E: Deserialize + Send + 'static +impl Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static, +{ + fn new(inner: BindClient) -> Self + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static { Client { - inner: multiplex::connect(Framed::new(tcp), handle), + inner: inner, } } @@ -64,7 +69,11 @@ impl Client { } } -impl fmt::Debug for Client { +impl fmt::Debug for Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static, +{ fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { write!(f, "Client {{ .. }}") } @@ -72,7 +81,7 @@ impl fmt::Debug for Client { /// Exposes a trait for connecting asynchronously to servers. pub mod future { - use REMOTE; + use {REMOTE, framed}; use futures::{self, Async, Future}; use serde::{Deserialize, Serialize}; use std::io; @@ -81,6 +90,7 @@ pub mod future { use super::Client; use tokio_core::net::TcpStream; use tokio_core::{self, reactor}; + use tokio_proto::BindClient; /// Types that can connect to a server asynchronously. pub trait Connect<'a>: Sized { @@ -104,11 +114,19 @@ pub mod future { } /// A future that resolves to a `Client` or an `io::Error`. - pub struct ConnectFuture { + pub struct ConnectFuture + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static, + { inner: futures::Oneshot>>, } - impl Future for ConnectFuture { + impl Future for ConnectFuture + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static, + { type Item = Client; type Error = io::Error; @@ -129,9 +147,9 @@ pub mod future { } impl<'a, Req, Resp, E> Future for ConnectWithFuture<'a, Req, Resp, E> - where Req: Serialize + Send + 'static, - Resp: Deserialize + Send + 'static, - E: Deserialize + Send + 'static + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static { type Item = Client; type Error = io::Error; @@ -150,21 +168,21 @@ pub mod future { } impl<'a, Req, Resp, E> FnOnce<(TcpStream,)> for MultiplexConnect<'a, Req, Resp, E> - where Req: Serialize + Send + 'static, - Resp: Deserialize + Send + 'static, - E: Deserialize + Send + 'static + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static { type Output = Client; extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Client { - Client::new(tcp, self.0) + Client::new(framed::Proto::new().bind_client(self.0, tcp)) } } impl<'a, Req, Resp, E> Connect<'a> for Client - where Req: Serialize + Send + 'static, - Resp: Deserialize + Send + 'static, - E: Deserialize + Send + 'static + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static { type ConnectFut = ConnectFuture; type ConnectWithFut = ConnectWithFuture<'a, Req, Resp, E>; @@ -175,7 +193,7 @@ pub mod future { remote.spawn(move |handle| { let handle2 = handle.clone(); TcpStream::connect(&addr, handle) - .map(move |tcp| Client::new(tcp, &handle2)) + .map(move |tcp| Client::new(framed::Proto::new().bind_client(&handle2, tcp))) .then(move |result| { tx.complete(result); Ok(()) @@ -207,9 +225,9 @@ pub mod sync { } impl Connect for Client - where Req: Serialize + Send + 'static, - Resp: Deserialize + Send + 'static, - E: Deserialize + Send + 'static + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static { fn connect(addr: A) -> Result where A: ToSocketAddrs diff --git a/src/errors.rs b/src/errors.rs index 1668ec9..78bc44c 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -3,7 +3,7 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use {bincode, tokio_proto as proto}; +use bincode; use serde::{Deserialize, Serialize}; use std::{fmt, io}; use std::error::Error as StdError; @@ -75,15 +75,6 @@ impl StdError for Error { } } -impl From>> for Error { - fn from(err: proto::Error>) -> Self { - match err { - proto::Error::Transport(e) => e, - proto::Error::Io(e) => e.into(), - } - } -} - impl From for Error { fn from(err: io::Error) -> Self { Error::Io(err) diff --git a/src/framed.rs b/src/framed.rs index ae6481a..f4ad2ae 100644 --- a/src/framed.rs +++ b/src/framed.rs @@ -3,104 +3,71 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. +use {serde, tokio_core}; use bincode::{SizeLimit, serde as bincode}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use futures::{Async, Poll}; -use serde; use std::io::{self, Cursor}; use std::marker::PhantomData; use std::mem; -use tokio_core::easy::{self, EasyBuf, EasyFramed}; -use tokio_core::io::{FramedIo, Io}; -use tokio_proto::multiplex::{self, RequestId}; +use tokio_core::io::{EasyBuf, Framed, Io}; +use tokio_proto::streaming::multiplex::{self, RequestId}; +use tokio_proto::multiplex::{ClientProto, ServerProto}; use util::Never; -/// Handles the IO of tarpc messages. Similar to `tokio_core::easy::EasyFramed` except that it -/// hardcodes a parser and serializer suitable for tarpc messages. -pub struct Framed { - inner: EasyFramed, Serializer>, -} - -impl Framed { - /// Constructs a new tarpc FramedIo - pub fn new(upstream: I) -> Framed - where I: Io, - In: serde::Serialize, - Out: serde::Deserialize - { - Framed { inner: EasyFramed::new(upstream, Parser::new(), Serializer::new()) } - } -} - /// The type of message sent and received by the transport. pub type Frame = multiplex::Frame; -impl FramedIo for Framed - where I: Io, - In: serde::Serialize, - Out: serde::Deserialize -{ - type In = (RequestId, In); - type Out = Option<(RequestId, Result)>; - fn poll_read(&mut self) -> Async<()> { - self.inner.poll_read() - } - - fn poll_write(&mut self) -> Async<()> { - self.inner.poll_write() - } - - fn read(&mut self) -> Poll { - self.inner.read() - } - - fn write(&mut self, req: Self::In) -> Poll<(), io::Error> { - self.inner.write(req) - } - - fn flush(&mut self) -> Poll<(), io::Error> { - self.inner.flush() - } +// `T` is the type that `Codec` parses. +pub struct Codec { + state: CodecState, + _phantom_data: PhantomData<(Req, Resp)>, } -// `T` is the type that `Parser` parses. -struct Parser { - state: ParserState, - _phantom_data: PhantomData, -} - -enum ParserState { +enum CodecState { Id, Len { id: u64 }, Payload { id: u64, len: u64 }, } -impl Parser { +impl Codec { fn new() -> Self { - Parser { - state: ParserState::Id, + Codec { + state: CodecState::Id, _phantom_data: PhantomData, } } } -impl easy::Parse for Parser - where T: serde::Deserialize +impl tokio_core::io::Codec for Codec + where Req: serde::Deserialize, + Resp: serde::Serialize, { - type Out = (RequestId, Result); + type Out = (RequestId, Resp); + type In = (RequestId, Result); - fn parse(&mut self, buf: &mut EasyBuf) -> Poll { - use self::ParserState::*; + fn encode(&mut self, (id, message): Self::Out, buf: &mut Vec) -> io::Result<()> { + buf.write_u64::(id).unwrap(); + buf.write_u64::(bincode::serialized_size(&message)).unwrap(); + bincode::serialize_into(buf, + &message, + SizeLimit::Infinite) + // TODO(tikue): handle err + .expect("In bincode::serialize_into"); + Ok(()) + } + + fn decode(&mut self, buf: &mut EasyBuf) -> Result, io::Error> { + use self::CodecState::*; loop { match self.state { - Id if buf.len() < mem::size_of::() => return Ok(Async::NotReady), + Id if buf.len() < mem::size_of::() => return Ok(None), Id => { self.state = Len { id: Cursor::new(&*buf.get_mut()).read_u64::()? }; *buf = buf.split_off(mem::size_of::()); } - Len { .. } if buf.len() < mem::size_of::() => return Ok(Async::NotReady), + Len { .. } if buf.len() < mem::size_of::() => return Ok(None), Len { id } => { self.state = Payload { id: id, @@ -108,7 +75,7 @@ impl easy::Parse for Parser }; *buf = buf.split_off(mem::size_of::()); } - Payload { len, .. } if buf.len() < len as usize => return Ok(Async::NotReady), + Payload { len, .. } if buf.len() < len as usize => return Ok(None), Payload { id, .. } => { let mut buf = buf.get_mut(); let result = bincode::deserialize_from(&mut Cursor::new(&mut *buf), @@ -119,40 +86,58 @@ impl easy::Parse for Parser // message. self.state = Id; - return Ok(Async::Ready((id, result))); + return Ok(Some((id, result))); } } } } } -struct Serializer(PhantomData); +/// Implements the `multiplex::ServerProto` trait. +pub struct Proto(PhantomData<(Req, Resp)>); -impl Serializer { - fn new() -> Self { - Serializer(PhantomData) +impl Proto { + /// Returns a new `Proto`. + pub fn new() -> Self { + Proto(PhantomData) } } -impl easy::Serialize for Serializer - where T: serde::Serialize +impl ServerProto for Proto + where T: Io + 'static, + Req: serde::Deserialize + 'static, + Resp: serde::Serialize + 'static, { - type In = (RequestId, T); + type Response = Resp; + type Request = Result; + type Error = io::Error; + type Transport = Framed>; + type BindTransport = Result; - fn serialize(&mut self, (id, message): Self::In, buf: &mut Vec) { - buf.write_u64::(id).unwrap(); - buf.write_u64::(bincode::serialized_size(&message)).unwrap(); - bincode::serialize_into(buf, - &message, - SizeLimit::Infinite) - // TODO(tikue): handle err - .expect("In bincode::serialize_into"); + fn bind_transport(&self, io: T) -> Self::BindTransport { + Ok(io.framed(Codec::new())) + } +} + +impl ClientProto for Proto + where T: Io + 'static, + Req: serde::Serialize + 'static, + Resp: serde::Deserialize + 'static, +{ + type Response = Result; + type Request = Req; + type Error = io::Error; + type Transport = Framed>; + type BindTransport = Result; + + fn bind_transport(&self, io: T) -> Self::BindTransport { + Ok(io.framed(Codec::new())) } } #[test] fn serialize() { - use tokio_core::easy::{Parse, Serialize}; + use tokio_core::io::Codec as TokioCodec; const MSG: (u64, (char, char, char)) = (4, ('a', 'b', 'c')); let mut buf = EasyBuf::new(); @@ -160,13 +145,14 @@ fn serialize() { // Serialize twice to check for idempotence. for _ in 0..2 { - Serializer::new().serialize(MSG, &mut vec); + let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new(); + codec.encode(MSG, &mut vec).unwrap(); buf.get_mut().append(&mut vec); - let actual: Poll<(u64, Result<(char, char, char), bincode::DeserializeError>), io::Error> = - Parser::new().parse(&mut buf); + let actual: Result)>, io::Error> = + codec.decode(&mut buf); match actual { - Ok(Async::Ready((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {} + Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {} bad => panic!("Expected {:?}, but got {:?}", Some(MSG), bad), } diff --git a/src/lib.rs b/src/lib.rs index 2e5846b..6aa202a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -68,6 +68,7 @@ extern crate bytes; extern crate lazy_static; #[macro_use] extern crate log; +extern crate net2; #[macro_use] extern crate serde_derive; extern crate take; @@ -95,8 +96,6 @@ pub use errors::{Error, SerializableError}; #[doc(hidden)] pub use errors::WireError; #[doc(hidden)] -pub use framed::Framed; -#[doc(hidden)] pub use server::{ListenFuture, Response, listen, listen_with}; /// Provides some utility error types, as well as a trait for spawning futures on the default event diff --git a/src/macros.rs b/src/macros.rs index d6ea8d9..7e167b9 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -396,7 +396,7 @@ macro_rules! service { /// Spawns the service, binding to the given address and running on /// the default tokio `Loop`. fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture { - return $crate::listen(addr, __tarpc_service_AsyncServer(self)); + return $crate::listen(addr, move || Ok(__tarpc_service_AsyncServer(self.clone()))); #[allow(non_camel_case_types)] #[derive(Clone)] @@ -461,10 +461,6 @@ macro_rules! service { type Error = ::std::io::Error; type Future = __tarpc_service_FutureReply<__tarpc_service_S>; - fn poll_ready(&self) -> $crate::futures::Async<()> { - $crate::futures::Async::Ready(()) - } - fn call(&self, __tarpc_service_request: Self::Request) -> Self::Future { let __tarpc_service_request = match __tarpc_service_request { Ok(__tarpc_service_request) => __tarpc_service_request, @@ -528,15 +524,16 @@ macro_rules! service { /// Spawns the service, binding to the given address and running on /// the default tokio `Loop`. fn listen(self, addr: L) - -> ::std::io::Result<$crate::tokio_proto::server::ServerHandle> + -> ::std::io::Result<::std::net::SocketAddr> where L: ::std::net::ToSocketAddrs { - let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?; let __tarpc_service_service = __SyncServer { service: self, }; return $crate::futures::Future::wait( - FutureServiceExt::listen(__tarpc_service_service, addr)); + FutureServiceExt::listen( + __tarpc_service_service, + $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?)); #[derive(Clone)] struct __SyncServer { @@ -810,8 +807,8 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0".first_socket_addr()).unwrap(); - let client = SyncClient::connect(handle.local_addr()).unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr()).unwrap(); + let client = SyncClient::connect(addr).unwrap(); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); } @@ -819,8 +816,8 @@ mod functional_test { #[test] fn other_service() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0".first_socket_addr()).unwrap(); - let client = super::other_service::SyncClient::connect(handle.local_addr()).unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr()).unwrap(); + let client = super::other_service::SyncClient::connect(addr).expect("Could not connect!"); match client.foo().err().unwrap() { ::Error::ServerDeserialize(_) => {} // good bad => panic!("Expected Error::ServerDeserialize but got {}", bad), @@ -856,8 +853,8 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = FutureClient::connect(handle.local_addr()).wait().unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let client = FutureClient::connect(&addr).wait().unwrap(); assert_eq!(3, client.add(1, 2).wait().unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap()); } @@ -865,9 +862,9 @@ mod functional_test { #[test] fn other_service() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); let client = - super::other_service::FutureClient::connect(handle.local_addr()).wait().unwrap(); + super::other_service::FutureClient::connect(&addr).wait().unwrap(); match client.foo().wait().err().unwrap() { ::Error::ServerDeserialize(_) => {} // good bad => panic!(r#"Expected Error::ServerDeserialize but got "{}""#, bad), @@ -901,8 +898,8 @@ mod functional_test { use self::error_service::*; let _ = env_logger::init(); - let handle = ErrorServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = FutureClient::connect(handle.local_addr()).wait().unwrap(); + let addr = ErrorServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let client = FutureClient::connect(&addr).wait().unwrap(); client.bar() .then(move |result| { match result.err().unwrap() { @@ -916,7 +913,7 @@ mod functional_test { .wait() .unwrap(); - let client = SyncClient::connect(handle.local_addr()).unwrap(); + let client = SyncClient::connect(&addr).unwrap(); match client.bar().err().unwrap() { ::Error::App(e) => { assert_eq!(e.description(), "lol jk"); diff --git a/src/plugins/src/lib.rs b/src/plugins/src/lib.rs index 0747c8d..95f22be 100644 --- a/src/plugins/src/lib.rs +++ b/src/plugins/src/lib.rs @@ -15,7 +15,7 @@ use syntax::ext::base::{ExtCtxt, MacResult, DummyResult, MacEager}; use syntax::ext::quote::rt::Span; use syntax::parse::{self, token, PResult}; use syntax::parse::parser::{Parser, PathStyle}; -use syntax::parse::token::intern_and_get_ident; +use syntax::symbol::Symbol; use syntax::ptr::P; use syntax::tokenstream::TokenTree; use syntax::util::small_vector::SmallVector; @@ -46,23 +46,12 @@ fn snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box { - let mut updated = (**meta_item).clone(); - if let NameValue(_, Spanned { node: Str(ref mut doc, _), .. }) = updated.node { - let updated_doc = doc.replace("{}", &old_ident); - *doc = intern_and_get_ident(&updated_doc); - } else { - unreachable!() - }; - Some(P(updated)) - } - _ => None, + for attr in item.attrs.iter_mut().filter(|attr| attr.is_sugared_doc) { + if let NameValue(Spanned { node: Str(ref mut doc, _), .. }) = attr.value.node { + *doc = Symbol::intern(&doc.as_str().replace("{}", &old_ident)); + } else { + unreachable!() }; - if let Some(updated) = updated { - *meta_item = updated; - } } MacEager::trait_items(SmallVector::one(item)) @@ -158,7 +147,7 @@ fn convert(ident: &mut Ident) -> String { // The Fut suffix is hardcoded right now; this macro isn't really meant to be general-purpose. camel_ty.push_str("Fut"); - *ident = Ident::with_empty_ctxt(token::intern(&camel_ty)); + *ident = Ident::with_empty_ctxt(Symbol::intern(&camel_ty)); ident_str } diff --git a/src/server.rs b/src/server.rs index 667452e..2825c28 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,17 +3,17 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use REMOTE; +use {REMOTE, net2}; use bincode::serde::DeserializeError; use errors::WireError; -use framed::Framed; -use futures::{self, Async, Future}; +use framed::Proto; +use futures::{self, Async, Future, Stream}; use serde::{Deserialize, Serialize}; use std::io; use std::net::SocketAddr; +use tokio_core::net::TcpListener; use tokio_core::reactor::Handle; -use tokio_proto::easy::multiplex; -use tokio_proto::server::{self, ServerHandle}; +use tokio_proto::BindServer; use tokio_service::NewService; /// A message from server to client. @@ -29,15 +29,15 @@ pub fn listen(addr: SocketAddr, new_service: S) -> ListenFuture E: Serialize + 'static { let (tx, rx) = futures::oneshot(); - REMOTE.spawn(move |handle| Ok(tx.complete(listen_with(addr, new_service, handle)))); + REMOTE.spawn(move |handle| Ok(tx.complete(listen_with(addr, new_service, handle.clone())))); ListenFuture { inner: rx } } /// Spawns a service that binds to the given address using the given handle. pub fn listen_with(addr: SocketAddr, new_service: S, - handle: &Handle) - -> io::Result + handle: Handle) + -> io::Result where S: NewService, Response = Response, Error = io::Error> + Send + 'static, @@ -45,19 +45,40 @@ pub fn listen_with(addr: SocketAddr, Resp: Serialize + 'static, E: Serialize + 'static { - server::listen(handle, addr, move |stream| { - Ok(multiplex::EasyServer::new(new_service.new_service()?, Framed::new(stream)) - .map_err(|()| panic!("What do we do here"))) + let listener = listener(&addr, &handle)?; + let addr = listener.local_addr()?; + + let handle2 = handle.clone(); + let server = listener.incoming().for_each(move |(socket, _)| { + Proto::new().bind_server(&handle2, socket, new_service.new_service()?); + + Ok(()) + }).map_err(|e| error!("While processing incoming connections: {}", e)); + handle.spawn(server); + Ok(addr) +} + +fn listener(addr: &SocketAddr, + handle: &Handle) -> io::Result { + match *addr { + SocketAddr::V4(_) => net2::TcpBuilder::new_v4(), + SocketAddr::V6(_) => net2::TcpBuilder::new_v6() + }? + .reuse_address(true)? + .bind(addr)? + .listen(1024) + .and_then(|l| { + TcpListener::from_listener(l, addr, handle) }) } /// A future that resolves to a `ServerHandle`. pub struct ListenFuture { - inner: futures::Oneshot>, + inner: futures::Oneshot>, } impl Future for ListenFuture { - type Item = ServerHandle; + type Item = SocketAddr; type Error = io::Error; fn poll(&mut self) -> futures::Poll { diff --git a/src/util.rs b/src/util.rs index 4166d65..f6dd550 100644 --- a/src/util.rs +++ b/src/util.rs @@ -14,6 +14,7 @@ use tokio_core::reactor; /// A bottom type that impls `Error`, `Serialize`, and `Deserialize`. It is impossible to /// instantiate this type. +#[allow(unreachable_code)] #[derive(Debug)] pub struct Never(!); From 5c17ffacae7b1a1a426dc1fe8a9dacd95886e5bb Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Mon, 5 Dec 2016 15:23:07 -0800 Subject: [PATCH 03/13] Add `listen_with` fns. --- src/macros.rs | 40 +++++++++++++++++++++++++++++++++------- src/server.rs | 7 +++++++ 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/src/macros.rs b/src/macros.rs index 7e167b9..6c2cb39 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -393,10 +393,26 @@ macro_rules! service { /// Provides a function for starting the service. This is a separate trait from /// `FutureService` to prevent collisions with the names of RPCs. pub trait FutureServiceExt: FutureService { + fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture + { + let (tx, rx) = $crate::futures::oneshot(); + $crate::REMOTE.spawn(move |handle| + Ok(tx.complete(Self::listen_with(self, + addr, + handle.clone())))); + $crate::ListenFuture::from_oneshot(rx) + } + /// Spawns the service, binding to the given address and running on /// the default tokio `Loop`. - fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture { - return $crate::listen(addr, move || Ok(__tarpc_service_AsyncServer(self.clone()))); + fn listen_with(self, + addr: ::std::net::SocketAddr, + handle: $crate::tokio_core::reactor::Handle) + -> ::std::io::Result<::std::net::SocketAddr> + { + return $crate::listen_with(addr, + move || Ok(__tarpc_service_AsyncServer(self.clone())), + handle); #[allow(non_camel_case_types)] #[derive(Clone)] @@ -521,19 +537,29 @@ macro_rules! service { /// Provides a function for starting the service. This is a separate trait from /// `SyncService` to prevent collisions with the names of RPCs. pub trait SyncServiceExt: SyncService { + fn listen(self, addr: L) + -> ::std::io::Result<::std::net::SocketAddr> + where L: ::std::net::ToSocketAddrs + { + let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?; + let (tx, rx) = $crate::futures::oneshot(); + $crate::REMOTE.spawn(move |handle| Ok(tx.complete(Self::listen_with(self, addr, handle.clone())))); + $crate::futures::Future::wait($crate::ListenFuture::from_oneshot(rx)) + } + /// Spawns the service, binding to the given address and running on /// the default tokio `Loop`. - fn listen(self, addr: L) + fn listen_with(self, addr: L, handle: $crate::tokio_core::reactor::Handle) -> ::std::io::Result<::std::net::SocketAddr> where L: ::std::net::ToSocketAddrs { let __tarpc_service_service = __SyncServer { service: self, }; - return $crate::futures::Future::wait( - FutureServiceExt::listen( - __tarpc_service_service, - $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?)); + return FutureServiceExt::listen_with( + __tarpc_service_service, + $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?, + handle); #[derive(Clone)] struct __SyncServer { diff --git a/src/server.rs b/src/server.rs index 2825c28..de2506e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -77,6 +77,13 @@ pub struct ListenFuture { inner: futures::Oneshot>, } +impl ListenFuture { + #[doc(hidden)] + pub fn from_oneshot(rx: futures::Oneshot>) -> Self { + ListenFuture { inner: rx } + } +} + impl Future for ListenFuture { type Item = SocketAddr; type Error = io::Error; From f6b16600924faeb767e8f6cc303e44290015d611 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Mon, 5 Dec 2016 15:32:26 -0800 Subject: [PATCH 04/13] Count requests in concurrency example --- examples/concurrency.rs | 41 ++++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/examples/concurrency.rs b/examples/concurrency.rs index eb692d0..ff1de85 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -20,6 +20,8 @@ extern crate futures_cpupool; use clap::{Arg, App}; use futures::Future; use futures_cpupool::{CpuFuture, CpuPool}; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant, SystemTime}; use tarpc::future::{Connect}; use tarpc::util::{FirstSocketAddr, Never, spawn_core}; @@ -30,11 +32,17 @@ service! { } #[derive(Clone)] -struct Server(CpuPool); +struct Server { + pool: CpuPool, + request_count: Arc, +} impl Server { fn new() -> Self { - Server(CpuPool::new_num_cpus()) + Server { + pool: CpuPool::new_num_cpus(), + request_count: Arc::new(AtomicUsize::new(1)), + } } } @@ -42,13 +50,15 @@ impl FutureService for Server { type ReadFut = CpuFuture, Never>; fn read(&self, size: u32) -> Self::ReadFut { - debug!("Server received read({})", size); - self.0 + let request_number = self.request_count.fetch_add(1, Ordering::SeqCst); + debug!("Server received read({}) no. {}", size, request_number); + self.pool .spawn(futures::lazy(move || { let mut vec: Vec = Vec::with_capacity(size as usize); for i in 0..size { vec.push((i % 1 << 8) as u8); } + debug!("Server sending response no. {}", request_number); futures::finished(vec) })) } @@ -77,9 +87,14 @@ fn run_once(clients: Vec, concurrency: u32) -> impl Future Date: Tue, 6 Dec 2016 11:17:26 -0800 Subject: [PATCH 05/13] Change readme example to exhibit deadlock... --- examples/readme.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/examples/readme.rs b/examples/readme.rs index 3613332..cbf206d 100644 --- a/examples/readme.rs +++ b/examples/readme.rs @@ -9,9 +9,11 @@ extern crate futures; #[macro_use] extern crate tarpc; +extern crate tokio_core; +use futures::Future; use tarpc::util::Never; -use tarpc::sync::Connect; +use tarpc::future::Connect; service! { rpc hello(name: String) -> String; @@ -27,7 +29,18 @@ impl SyncService for HelloServer { } fn main() { + let mut core = tokio_core::reactor::Core::new().unwrap(); let addr = HelloServer.listen("localhost:10000").unwrap(); - let client = SyncClient::connect(addr).unwrap(); - println!("{}", client.hello("Mom".to_string()).unwrap()); + let f = FutureClient::connect(&addr) + .map_err(tarpc::Error::from) + .and_then(|client| { + let resp1 = client.hello("Mom".to_string()); + let resp2 = client.hello("Dad".to_string()); + futures::collect(vec![resp1, resp2]) + }).map(|responses| { + for resp in responses { + println!("{}", resp); + } + }); + core.run(f).unwrap(); } From 8a29aa29b2771f6917f8dff18e9e7ecff5b9994a Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Fri, 9 Dec 2016 22:23:05 -0800 Subject: [PATCH 06/13] Add an version of the readme where all macros are expanded (with slight modifications to make it prettier). --- examples/readme.rs | 11 +- examples/readme_expanded.rs | 289 ++++++++++++++++++++++++++++++++++++ 2 files changed, 299 insertions(+), 1 deletion(-) create mode 100644 examples/readme_expanded.rs diff --git a/examples/readme.rs b/examples/readme.rs index cbf206d..ae2f4de 100644 --- a/examples/readme.rs +++ b/examples/readme.rs @@ -6,8 +6,11 @@ #![feature(conservative_impl_trait, plugin)] #![plugin(tarpc_plugins)] +extern crate env_logger; extern crate futures; #[macro_use] +extern crate log; +#[macro_use] extern crate tarpc; extern crate tokio_core; @@ -24,19 +27,25 @@ struct HelloServer; impl SyncService for HelloServer { fn hello(&self, name: String) -> Result { + info!("Got request: {}", name); Ok(format!("Hello, {}!", name)) } } fn main() { + let _ = env_logger::init(); let mut core = tokio_core::reactor::Core::new().unwrap(); let addr = HelloServer.listen("localhost:10000").unwrap(); let f = FutureClient::connect(&addr) .map_err(tarpc::Error::from) .and_then(|client| { let resp1 = client.hello("Mom".to_string()); + info!("Sent first request."); + /* let resp2 = client.hello("Dad".to_string()); - futures::collect(vec![resp1, resp2]) + info!("Sent second request."); + */ + futures::collect(vec![resp1, /*resp2*/]) }).map(|responses| { for resp in responses { println!("{}", resp); diff --git a/examples/readme_expanded.rs b/examples/readme_expanded.rs new file mode 100644 index 0000000..e01f16f --- /dev/null +++ b/examples/readme_expanded.rs @@ -0,0 +1,289 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +#![feature(conservative_impl_trait, plugin, proc_macro)] +#![plugin(tarpc_plugins)] + +extern crate bincode; +extern crate env_logger; +extern crate futures; +#[macro_use] +extern crate log; +#[macro_use] +extern crate serde_derive; +#[macro_use] +extern crate tarpc; +extern crate tokio_core; +extern crate tokio_service; + +use bincode::serde::DeserializeError; +use futures::Future; +use std::io; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::thread; +use tarpc::WireError; +use tarpc::future::Connect; +use tarpc::util::FirstSocketAddr; +use tarpc::util::Never; +use tokio_core::reactor::{Handle, Remote}; +use tokio_service::Service; + +#[derive(Debug, Serialize, Deserialize)] +enum Request { + Hello(String), +} + +#[derive(Debug, Serialize, Deserialize)] +enum Response { + Hello(String), +} + +#[derive(Debug, Serialize, Deserialize)] +enum Error { + Hello(Never), +} + +/// Defines the `Future` RPC service. Implementors must be `Clone`, `Send`, and `'static`, +/// as required by `tokio_proto::NewService`. This is required so that the service can be used +/// to respond to multiple requests concurrently. +pub trait FutureService: Send + Clone + 'static { + type HelloFut: Future; + fn hello(&self, name: String) -> Self::HelloFut; +} + +/// Provides a function for starting the service. This is a separate trait from +/// `FutureService` to prevent collisions with the names of RPCs. +pub trait FutureServiceExt: FutureService { + fn listen(self, addr: SocketAddr) -> tarpc::ListenFuture { + let (tx, rx) = futures::oneshot(); + tarpc::REMOTE.spawn(move |handle| Ok(tx.complete(Self::listen_with(self, addr, handle.clone())))); + tarpc::ListenFuture::from_oneshot(rx) + } + + /// Spawns the service, binding to the given address and running on the default tokio `Loop`. + fn listen_with(self, addr: SocketAddr, handle: Handle) -> io::Result { + return tarpc::listen_with(addr, move || Ok(AsyncServer(self.clone())), handle); + + #[derive(Clone, Debug)] + struct AsyncServer(S); + + type Fut = futures::Finished, io::Error>; + + enum FutureReply { + DeserializeError(Fut), + Hello(futures::Then) -> Fut>), + } + + impl Future for FutureReply { + type Item = tarpc::Response; + type Error = io::Error; + + fn poll(&mut self) -> futures::Poll { + match *self { + FutureReply::DeserializeError(ref mut future) => future.poll(), + FutureReply::Hello(ref mut future) => future.poll(), + } + } + } + + impl Service for AsyncServer + where S: FutureService + { + type Request = Result; + type Response = tarpc::Response; + type Error = io::Error; + type Future = FutureReply; + + fn call(&self, request: Self::Request) -> Self::Future { + let request = match request { + Ok(request) => request, + Err(deserialize_err) => { + let err = Err(WireError::ServerDeserialize(deserialize_err.to_string())); + return FutureReply::DeserializeError(futures::finished(err)); + } + }; + + match request { + Request::Hello(name) => { + fn wrap(response: Result) -> Fut { + let fut = response.map(Response::Hello) + .map_err(|error| WireError::App(Error::Hello(error))); + futures::finished(fut) + } + return FutureReply::Hello(self.0.hello(name).then(wrap)); + } + } + } + } + } +} + +/// Defines the blocking RPC service. Must be `Clone`, `Send`, and `'static`, +/// as required by `tokio_proto::NewService`. This is required so that the service can be used +/// to respond to multiple requests concurrently. +pub trait SyncService: Send + Clone + 'static { + fn hello(&self, name: String) -> Result; +} + +/// Provides a function for starting the service. This is a separate trait from +/// `SyncService` to prevent collisions with the names of RPCs. +pub trait SyncServiceExt: SyncService { + fn listen(self, addr: L) -> io::Result + where L: ToSocketAddrs + { + let addr = addr.try_first_socket_addr()?; + let (tx, rx) = futures::oneshot(); + tarpc::REMOTE.spawn(move |handle| { + Ok(tx.complete(Self::listen_with(self, addr, handle.clone()))) + }); + tarpc::ListenFuture::from_oneshot(rx).wait() + } + + /// Spawns the service, binding to the given address and running on + /// the default tokio `Loop`. + fn listen_with(self, addr: L, handle: Handle) -> io::Result + where L: ToSocketAddrs + { + let service = SyncServer { service: self }; + return service.listen_with(addr.try_first_socket_addr()?, handle); + + #[derive(Clone)] + struct SyncServer { + service: S, + } + + impl FutureService for SyncServer + where S: SyncService + { + type HelloFut = futures::Flatten>, fn(futures::Canceled) -> Never>>; + + fn hello(&self, name: String) -> Self::HelloFut { + fn unimplemented(_: futures::Canceled) -> Never { + unimplemented!() + } + + let (complete, promise) = futures::oneshot(); + let service = self.clone(); + const UNIMPLEMENTED: fn(futures::Canceled) -> Never = unimplemented; + thread::spawn(move || { + let reply = SyncService::hello(&service.service, name); + complete.complete(futures::IntoFuture::into_future(reply)); + }); + promise.map_err(UNIMPLEMENTED).flatten() + } + } + } +} +impl FutureServiceExt for A where A: FutureService {} +impl SyncServiceExt for S where S: SyncService {} + +type Client = tarpc::Client; + +/// Implementation detail: Pending connection. +pub struct ConnectFuture { + inner: futures::Map, fn(Client) -> T>, +} + +impl Future for ConnectFuture { + type Item = T; + type Error = io::Error; + + fn poll(&mut self) -> futures::Poll { + self.inner.poll() + } +} + +/// Implementation detail: Pending connection. +pub struct ConnectWithFuture<'a, T> { + inner: futures::Map, fn(Client) -> T>, +} + +impl<'a, T> Future for ConnectWithFuture<'a, T> { + type Item = T; + type Error = io::Error; + fn poll(&mut self) -> futures::Poll { + self.inner.poll() + } +} + +/// The client stub that makes RPC calls to the server. Exposes a Future interface. +#[derive(Debug)] +pub struct FutureClient(Client); + +impl<'a> tarpc::future::Connect<'a> for FutureClient { + type ConnectFut = ConnectFuture; + type ConnectWithFut = ConnectWithFuture<'a, Self>; + + fn connect_remotely(addr: &SocketAddr, remote: &Remote) -> Self::ConnectFut { + let client = Client::connect_remotely(addr, remote); + ConnectFuture { inner: client.map(FutureClient) } + } + + fn connect_with(addr: &SocketAddr, handle: &'a Handle) -> Self::ConnectWithFut { + let client = Client::connect_with(addr, handle); + ConnectWithFuture { inner: client.map(FutureClient) } + } +} + +impl FutureClient { + pub fn hello(&self, name: String) + -> impl Future> + 'static + { + let request = Request::Hello(name); + + self.0.call(request).then(move |msg| { + match msg? { + Ok(Response::Hello(msg)) => Ok(msg), + Err(err) => { + Err(match err { + tarpc::Error::App(Error::Hello(err)) => tarpc::Error::App(err), + tarpc::Error::ServerDeserialize(err) => { + tarpc::Error::ServerDeserialize(err) + } + tarpc::Error::ServerSerialize(err) => tarpc::Error::ServerSerialize(err), + tarpc::Error::ClientDeserialize(err) => { + tarpc::Error::ClientDeserialize(err) + } + tarpc::Error::ClientSerialize(err) => tarpc::Error::ClientSerialize(err), + tarpc::Error::Io(error) => tarpc::Error::Io(error), + }) + } + } + }) + } +} + +#[derive(Clone)] +struct HelloServer; + +impl SyncService for HelloServer { + fn hello(&self, name: String) -> Result { + info!("Got request: {}", name); + Ok(format!("Hello, {}!", name)) + } +} + +fn main() { + let _ = env_logger::init(); + let mut core = tokio_core::reactor::Core::new().unwrap(); + let addr = HelloServer.listen("localhost:10000").unwrap(); + let f = FutureClient::connect(&addr) + .map_err(tarpc::Error::from) + .and_then(|client| { + let resp1 = client.hello("Mom".to_string()); + info!("Sent first request."); + + let resp2 = client.hello("Dad".to_string()); + info!("Sent second request."); + + futures::collect(vec![resp1, resp2]) + }) + .map(|responses| { + for resp in responses { + println!("{}", resp); + } + }); + core.run(f).unwrap(); +} From 35f8aefb30835ce7a024bf8e768af9c92c4b71bb Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 11 Dec 2016 16:43:41 -0800 Subject: [PATCH 07/13] Small refactor --- src/framed.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/framed.rs b/src/framed.rs index f4ad2ae..502b584 100644 --- a/src/framed.rs +++ b/src/framed.rs @@ -64,16 +64,16 @@ impl tokio_core::io::Codec for Codec match self.state { Id if buf.len() < mem::size_of::() => return Ok(None), Id => { - self.state = Len { id: Cursor::new(&*buf.get_mut()).read_u64::()? }; - *buf = buf.split_off(mem::size_of::()); + let id_buf = buf.drain_to(mem::size_of::()); + self.state = Len { id: Cursor::new(id_buf).read_u64::()? }; } Len { .. } if buf.len() < mem::size_of::() => return Ok(None), Len { id } => { + let len_buf = buf.drain_to(mem::size_of::()); self.state = Payload { id: id, - len: Cursor::new(&*buf.get_mut()).read_u64::()?, + len: Cursor::new(len_buf).read_u64::()?, }; - *buf = buf.split_off(mem::size_of::()); } Payload { len, .. } if buf.len() < len as usize => return Ok(None), Payload { id, .. } => { From be156f4d6bcdf0d056625acc0940356afeba78e4 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 11 Dec 2016 17:23:43 -0800 Subject: [PATCH 08/13] Cut down size of readme_expanded. --- examples/readme_expanded.rs | 240 ++++-------------------------------- 1 file changed, 21 insertions(+), 219 deletions(-) diff --git a/examples/readme_expanded.rs b/examples/readme_expanded.rs index e01f16f..566e49b 100644 --- a/examples/readme_expanded.rs +++ b/examples/readme_expanded.rs @@ -19,256 +19,58 @@ extern crate tokio_core; extern crate tokio_service; use bincode::serde::DeserializeError; -use futures::Future; +use futures::{Future, IntoFuture}; use std::io; -use std::net::{SocketAddr, ToSocketAddrs}; -use std::thread; -use tarpc::WireError; +use std::net::SocketAddr; use tarpc::future::Connect; use tarpc::util::FirstSocketAddr; use tarpc::util::Never; -use tokio_core::reactor::{Handle, Remote}; use tokio_service::Service; -#[derive(Debug, Serialize, Deserialize)] -enum Request { - Hello(String), -} +#[derive(Clone, Copy)] +struct HelloServer; -#[derive(Debug, Serialize, Deserialize)] -enum Response { - Hello(String), -} - -#[derive(Debug, Serialize, Deserialize)] -enum Error { - Hello(Never), -} - -/// Defines the `Future` RPC service. Implementors must be `Clone`, `Send`, and `'static`, -/// as required by `tokio_proto::NewService`. This is required so that the service can be used -/// to respond to multiple requests concurrently. -pub trait FutureService: Send + Clone + 'static { - type HelloFut: Future; - fn hello(&self, name: String) -> Self::HelloFut; -} - -/// Provides a function for starting the service. This is a separate trait from -/// `FutureService` to prevent collisions with the names of RPCs. -pub trait FutureServiceExt: FutureService { - fn listen(self, addr: SocketAddr) -> tarpc::ListenFuture { - let (tx, rx) = futures::oneshot(); - tarpc::REMOTE.spawn(move |handle| Ok(tx.complete(Self::listen_with(self, addr, handle.clone())))); - tarpc::ListenFuture::from_oneshot(rx) - } - - /// Spawns the service, binding to the given address and running on the default tokio `Loop`. - fn listen_with(self, addr: SocketAddr, handle: Handle) -> io::Result { - return tarpc::listen_with(addr, move || Ok(AsyncServer(self.clone())), handle); - - #[derive(Clone, Debug)] - struct AsyncServer(S); - - type Fut = futures::Finished, io::Error>; - - enum FutureReply { - DeserializeError(Fut), - Hello(futures::Then) -> Fut>), - } - - impl Future for FutureReply { - type Item = tarpc::Response; - type Error = io::Error; - - fn poll(&mut self) -> futures::Poll { - match *self { - FutureReply::DeserializeError(ref mut future) => future.poll(), - FutureReply::Hello(ref mut future) => future.poll(), - } - } - } - - impl Service for AsyncServer - where S: FutureService - { - type Request = Result; - type Response = tarpc::Response; - type Error = io::Error; - type Future = FutureReply; - - fn call(&self, request: Self::Request) -> Self::Future { - let request = match request { - Ok(request) => request, - Err(deserialize_err) => { - let err = Err(WireError::ServerDeserialize(deserialize_err.to_string())); - return FutureReply::DeserializeError(futures::finished(err)); - } - }; - - match request { - Request::Hello(name) => { - fn wrap(response: Result) -> Fut { - let fut = response.map(Response::Hello) - .map_err(|error| WireError::App(Error::Hello(error))); - futures::finished(fut) - } - return FutureReply::Hello(self.0.hello(name).then(wrap)); - } - } - } - } - } -} - -/// Defines the blocking RPC service. Must be `Clone`, `Send`, and `'static`, -/// as required by `tokio_proto::NewService`. This is required so that the service can be used -/// to respond to multiple requests concurrently. -pub trait SyncService: Send + Clone + 'static { - fn hello(&self, name: String) -> Result; -} - -/// Provides a function for starting the service. This is a separate trait from -/// `SyncService` to prevent collisions with the names of RPCs. -pub trait SyncServiceExt: SyncService { - fn listen(self, addr: L) -> io::Result - where L: ToSocketAddrs - { - let addr = addr.try_first_socket_addr()?; +impl HelloServer { + fn listen(addr: SocketAddr) -> impl Future { let (tx, rx) = futures::oneshot(); tarpc::REMOTE.spawn(move |handle| { - Ok(tx.complete(Self::listen_with(self, addr, handle.clone()))) + Ok(tx.complete(tarpc::listen_with(addr, move || Ok(HelloServer), handle.clone()))) }); - tarpc::ListenFuture::from_oneshot(rx).wait() - } - - /// Spawns the service, binding to the given address and running on - /// the default tokio `Loop`. - fn listen_with(self, addr: L, handle: Handle) -> io::Result - where L: ToSocketAddrs - { - let service = SyncServer { service: self }; - return service.listen_with(addr.try_first_socket_addr()?, handle); - - #[derive(Clone)] - struct SyncServer { - service: S, - } - - impl FutureService for SyncServer - where S: SyncService - { - type HelloFut = futures::Flatten>, fn(futures::Canceled) -> Never>>; - - fn hello(&self, name: String) -> Self::HelloFut { - fn unimplemented(_: futures::Canceled) -> Never { - unimplemented!() - } - - let (complete, promise) = futures::oneshot(); - let service = self.clone(); - const UNIMPLEMENTED: fn(futures::Canceled) -> Never = unimplemented; - thread::spawn(move || { - let reply = SyncService::hello(&service.service, name); - complete.complete(futures::IntoFuture::into_future(reply)); - }); - promise.map_err(UNIMPLEMENTED).flatten() - } - } + rx.map_err(|e| panic!(e)).and_then(|result| result) } } -impl FutureServiceExt for A where A: FutureService {} -impl SyncServiceExt for S where S: SyncService {} -type Client = tarpc::Client; - -/// Implementation detail: Pending connection. -pub struct ConnectFuture { - inner: futures::Map, fn(Client) -> T>, -} - -impl Future for ConnectFuture { - type Item = T; +impl Service for HelloServer { + type Request = Result; + type Response = tarpc::Response; type Error = io::Error; + type Future = Box, Error = io::Error>>; - fn poll(&mut self) -> futures::Poll { - self.inner.poll() - } -} - -/// Implementation detail: Pending connection. -pub struct ConnectWithFuture<'a, T> { - inner: futures::Map, fn(Client) -> T>, -} - -impl<'a, T> Future for ConnectWithFuture<'a, T> { - type Item = T; - type Error = io::Error; - fn poll(&mut self) -> futures::Poll { - self.inner.poll() + fn call(&self, request: Self::Request) -> Self::Future { + Ok(Ok(format!("Hello, {}!", request.unwrap()))).into_future().boxed() } } /// The client stub that makes RPC calls to the server. Exposes a Future interface. #[derive(Debug)] -pub struct FutureClient(Client); - -impl<'a> tarpc::future::Connect<'a> for FutureClient { - type ConnectFut = ConnectFuture; - type ConnectWithFut = ConnectWithFuture<'a, Self>; - - fn connect_remotely(addr: &SocketAddr, remote: &Remote) -> Self::ConnectFut { - let client = Client::connect_remotely(addr, remote); - ConnectFuture { inner: client.map(FutureClient) } - } - - fn connect_with(addr: &SocketAddr, handle: &'a Handle) -> Self::ConnectWithFut { - let client = Client::connect_with(addr, handle); - ConnectWithFuture { inner: client.map(FutureClient) } - } -} +pub struct FutureClient(tarpc::Client); impl FutureClient { + fn connect(addr: &SocketAddr) -> impl Future { + tarpc::Client::connect_remotely(addr, &tarpc::REMOTE).map(FutureClient) + } + pub fn hello(&self, name: String) -> impl Future> + 'static { - let request = Request::Hello(name); - - self.0.call(request).then(move |msg| { - match msg? { - Ok(Response::Hello(msg)) => Ok(msg), - Err(err) => { - Err(match err { - tarpc::Error::App(Error::Hello(err)) => tarpc::Error::App(err), - tarpc::Error::ServerDeserialize(err) => { - tarpc::Error::ServerDeserialize(err) - } - tarpc::Error::ServerSerialize(err) => tarpc::Error::ServerSerialize(err), - tarpc::Error::ClientDeserialize(err) => { - tarpc::Error::ClientDeserialize(err) - } - tarpc::Error::ClientSerialize(err) => tarpc::Error::ClientSerialize(err), - tarpc::Error::Io(error) => tarpc::Error::Io(error), - }) - } - } - }) - } -} - -#[derive(Clone)] -struct HelloServer; - -impl SyncService for HelloServer { - fn hello(&self, name: String) -> Result { - info!("Got request: {}", name); - Ok(format!("Hello, {}!", name)) + self.0.call(name).then(|msg| msg.unwrap()) } } fn main() { let _ = env_logger::init(); let mut core = tokio_core::reactor::Core::new().unwrap(); - let addr = HelloServer.listen("localhost:10000").unwrap(); + let addr = HelloServer::listen("localhost:10000".first_socket_addr()).wait().unwrap(); let f = FutureClient::connect(&addr) .map_err(tarpc::Error::from) .and_then(|client| { From f2bf1adf8b8ea784790cd74d04e173dbe9ff6139 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Fri, 16 Dec 2016 14:15:31 -0800 Subject: [PATCH 09/13] Fix bug wherein the Codec was clearing the buf after decoding a message. Don't do thatgit stash pop! --- src/framed.rs | 42 +++++++++++++++++++++++++++++------------- src/lib.rs | 2 +- src/util.rs | 15 +++++++++++++++ 3 files changed, 45 insertions(+), 14 deletions(-) diff --git a/src/framed.rs b/src/framed.rs index 502b584..3437561 100644 --- a/src/framed.rs +++ b/src/framed.rs @@ -12,7 +12,7 @@ use std::mem; use tokio_core::io::{EasyBuf, Framed, Io}; use tokio_proto::streaming::multiplex::{self, RequestId}; use tokio_proto::multiplex::{ClientProto, ServerProto}; -use util::Never; +use util::{Debugger, Never}; /// The type of message sent and received by the transport. pub type Frame = multiplex::Frame; @@ -48,44 +48,60 @@ impl tokio_core::io::Codec for Codec fn encode(&mut self, (id, message): Self::Out, buf: &mut Vec) -> io::Result<()> { buf.write_u64::(id).unwrap(); + trace!("Encoded request id = {} as {:?}", id, buf); buf.write_u64::(bincode::serialized_size(&message)).unwrap(); bincode::serialize_into(buf, &message, SizeLimit::Infinite) // TODO(tikue): handle err .expect("In bincode::serialize_into"); + trace!("Encoded buffer: {:?}", buf); Ok(()) } fn decode(&mut self, buf: &mut EasyBuf) -> Result, io::Error> { use self::CodecState::*; + trace!("Codec::decode: {:?}", buf.as_slice()); loop { match self.state { - Id if buf.len() < mem::size_of::() => return Ok(None), - Id => { - let id_buf = buf.drain_to(mem::size_of::()); - self.state = Len { id: Cursor::new(id_buf).read_u64::()? }; + Id if buf.len() < mem::size_of::() => { + trace!("--> Buf len is {}; waiting for 8 to parse id.", buf.len()); + return Ok(None) + } + Id => { + let mut id_buf = buf.drain_to(mem::size_of::()); + let id = Cursor::new(&mut id_buf).read_u64::()?; + trace!("--> Parsed id = {} from {:?}", id, id_buf.as_slice()); + self.state = Len { id: id }; + } + Len { .. } if buf.len() < mem::size_of::() => { + trace!("--> Buf len is {}; waiting for 8 to parse packet length.", buf.len()); + return Ok(None) } - Len { .. } if buf.len() < mem::size_of::() => return Ok(None), Len { id } => { let len_buf = buf.drain_to(mem::size_of::()); + let len = Cursor::new(len_buf).read_u64::()?; + trace!("--> Parsed payload length = {}, remaining buffer length = {}", + len, buf.len()); self.state = Payload { id: id, - len: Cursor::new(len_buf).read_u64::()?, + len: len, }; } - Payload { len, .. } if buf.len() < len as usize => return Ok(None), - Payload { id, .. } => { - let mut buf = buf.get_mut(); - let result = bincode::deserialize_from(&mut Cursor::new(&mut *buf), + Payload { len, .. } if buf.len() < len as usize => { + trace!("--> Buf len is {}; waiting for {} to parse payload.", buf.len(), len); + return Ok(None) + } + Payload { id, len } => { + let payload = buf.drain_to(len as usize); + let result = bincode::deserialize_from(&mut Cursor::new(payload), SizeLimit::Infinite); - // Clear any unread bytes so we don't read garbage on next request. - buf.clear(); // Reset the state machine because, either way, we're done processing this // message. self.state = Id; + trace!("--> Parsed message: {:?}", Debugger(&result)); return Ok(Some((id, result))); } } diff --git a/src/lib.rs b/src/lib.rs index 6aa202a..abc14e0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,7 +59,7 @@ //! ``` //! #![deny(missing_docs)] -#![feature(plugin, conservative_impl_trait, never_type, proc_macro, unboxed_closures, fn_traits)] +#![feature(plugin, conservative_impl_trait, never_type, proc_macro, unboxed_closures, fn_traits, specialization)] #![plugin(tarpc_plugins)] extern crate byteorder; diff --git a/src/util.rs b/src/util.rs index f6dd550..6fd9f2e 100644 --- a/src/util.rs +++ b/src/util.rs @@ -136,3 +136,18 @@ pub fn spawn_core() -> reactor::Remote { }); rx.recv().unwrap() } + +/// A struct that will format as the contained type if the type impls Debug. +pub struct Debugger<'a, T: 'a>(pub &'a T); + +impl<'a, T: fmt::Debug> fmt::Debug for Debugger<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "{:?}", self.0) + } +} + +impl<'a, T> fmt::Debug for Debugger<'a, T> { + default fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "{{not debuggable}}") + } +} From bdd6737914887b6f276790a8fc3334a2b08677a0 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Fri, 16 Dec 2016 14:22:25 -0800 Subject: [PATCH 10/13] Add a test for concurrent requests --- src/macros.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/macros.rs b/src/macros.rs index 6c2cb39..5aa8849 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -885,6 +885,19 @@ mod functional_test { assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap()); } + #[test] + fn concurrent() { + let _ = env_logger::init(); + let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let client = FutureClient::connect(&addr).wait().unwrap(); + let req1 = client.add(1, 2); + let req2 = client.add(3, 4); + let req3 = client.hey("Tim".to_string()); + assert_eq!(3, req1.wait().unwrap()); + assert_eq!(7, req2.wait().unwrap()); + assert_eq!("Hey, Tim.", req3.wait().unwrap()); + } + #[test] fn other_service() { let _ = env_logger::init(); From 200407a4c9d0b0c0a5929c1508749c41a9a36872 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 8 Jan 2017 17:30:31 -0800 Subject: [PATCH 11/13] Make connection backlog arg to listen a const --- src/server.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/server.rs b/src/server.rs index de2506e..7b0a722 100644 --- a/src/server.rs +++ b/src/server.rs @@ -60,13 +60,15 @@ pub fn listen_with(addr: SocketAddr, fn listener(addr: &SocketAddr, handle: &Handle) -> io::Result { + const PENDING_CONNECTION_BACKLOG = 1024; + match *addr { SocketAddr::V4(_) => net2::TcpBuilder::new_v4(), SocketAddr::V6(_) => net2::TcpBuilder::new_v6() }? .reuse_address(true)? .bind(addr)? - .listen(1024) + .listen(PENDING_CONNECTION_BACKLOG) .and_then(|l| { TcpListener::from_listener(l, addr, handle) }) From ef41d4349c7a8f9886422d83a13a33654d86c6d2 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 8 Jan 2017 17:34:44 -0800 Subject: [PATCH 12/13] Make connection backlog arg to listen a const --- src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.rs b/src/server.rs index 7b0a722..6f02464 100644 --- a/src/server.rs +++ b/src/server.rs @@ -60,7 +60,7 @@ pub fn listen_with(addr: SocketAddr, fn listener(addr: &SocketAddr, handle: &Handle) -> io::Result { - const PENDING_CONNECTION_BACKLOG = 1024; + const PENDING_CONNECTION_BACKLOG: i32 = 1024; match *addr { SocketAddr::V4(_) => net2::TcpBuilder::new_v4(), From 3719564efc71f9f3e881440c942d08d75a403362 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 8 Jan 2017 17:38:05 -0800 Subject: [PATCH 13/13] Fix stale comment --- src/framed.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/framed.rs b/src/framed.rs index 3437561..6791753 100644 --- a/src/framed.rs +++ b/src/framed.rs @@ -18,7 +18,7 @@ use util::{Debugger, Never}; pub type Frame = multiplex::Frame; -// `T` is the type that `Codec` parses. +// `Req` is the type that `Codec` parses. `Resp` is the type it serializes. pub struct Codec { state: CodecState, _phantom_data: PhantomData<(Req, Resp)>,