diff --git a/Cargo.toml b/Cargo.toml index d5cd548..f3b8df8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,15 +25,17 @@ take = "0.1" tokio-service = { git = "https://github.com/tokio-rs/tokio-service" } tokio-proto = { git = "https://github.com/tokio-rs/tokio-proto" } tokio-core = { git = "https://github.com/tokio-rs/tokio-core" } +net2 = "0.2" [replace] -"tokio-core:0.1.0" = { git = "https://github.com/tokio-rs/tokio-core" } -"futures:0.1.3" = { git = "https://github.com/alexcrichton/futures-rs" } +"tokio-core:0.1.1" = { git = "https://github.com/tokio-rs/tokio-core" } +"futures:0.1.6" = { git = "https://github.com/alexcrichton/futures-rs" } [dev-dependencies] chrono = "0.2" env_logger = "0.3" futures-cpupool = "0.1" +clap = "2.0" [features] unstable = ["serde/unstable"] diff --git a/examples/concurrency.rs b/examples/concurrency.rs index d6a8611..ff1de85 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -7,6 +7,7 @@ #![plugin(tarpc_plugins)] extern crate chrono; +extern crate clap; extern crate env_logger; extern crate futures; #[macro_use] @@ -16,8 +17,11 @@ extern crate tarpc; extern crate tokio_core; extern crate futures_cpupool; +use clap::{Arg, App}; use futures::Future; use futures_cpupool::{CpuFuture, CpuPool}; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant, SystemTime}; use tarpc::future::{Connect}; use tarpc::util::{FirstSocketAddr, Never, spawn_core}; @@ -28,11 +32,17 @@ service! { } #[derive(Clone)] -struct Server(CpuPool); +struct Server { + pool: CpuPool, + request_count: Arc, +} impl Server { fn new() -> Self { - Server(CpuPool::new_num_cpus()) + Server { + pool: CpuPool::new_num_cpus(), + request_count: Arc::new(AtomicUsize::new(1)), + } } } @@ -40,52 +50,21 @@ impl FutureService for Server { type ReadFut = CpuFuture, Never>; fn read(&self, size: u32) -> Self::ReadFut { - self.0 + let request_number = self.request_count.fetch_add(1, Ordering::SeqCst); + debug!("Server received read({}) no. {}", size, request_number); + self.pool .spawn(futures::lazy(move || { let mut vec: Vec = Vec::with_capacity(size as usize); for i in 0..size { vec.push((i % 1 << 8) as u8); } - futures::finished::<_, Never>(vec) + debug!("Server sending response no. {}", request_number); + futures::finished(vec) })) } } -fn run_once<'a>(clients: &'a [FutureClient], concurrency: u32, print: bool) - -> Box + 'a> -{ - let start = Instant::now(); - let futs = clients.iter() - .cycle() - .take(concurrency as usize) - .map(|client| { - let start = SystemTime::now(); - let future = client.read(CHUNK_SIZE).map(move |_| start.elapsed().unwrap()); - future - }) - // Need an intermediate collection to kick off each future, - // because futures::collect will iterate sequentially. - .collect::>(); - let futs = futures::collect(futs); - - Box::new(futs.map(move |latencies| { - let total_time = start.elapsed(); - - let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur); - let mean = sum_latencies / latencies.len() as u32; - let min_latency = *latencies.iter().min().unwrap(); - let max_latency = *latencies.iter().max().unwrap(); - - if print { - println!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", - latencies.len(), - mean.microseconds(), - min_latency.microseconds(), - max_latency.microseconds(), - total_time.microseconds()); - } - }).map_err(|e| panic!(e))) -} +const CHUNK_SIZE: u32 = 1 << 10; trait Microseconds { fn microseconds(&self) -> i64; @@ -100,33 +79,93 @@ impl Microseconds for Duration { } } -const CHUNK_SIZE: u32 = 1 << 10; -const MAX_CONCURRENCY: u32 = 100; +fn run_once(clients: Vec, concurrency: u32) -> impl Future { + let start = Instant::now(); + let futs = clients.iter() + .enumerate() + .cycle() + .enumerate() + .take(concurrency as usize) + .map(|(iteration, (client_id, client))| { + let iteration = iteration + 1; + let start = SystemTime::now(); + debug!("Client {} reading (iteration {})...", client_id, iteration); + let future = client.read(CHUNK_SIZE).map(move |_| { + let elapsed = start.elapsed().unwrap(); + debug!("Client {} received reply (iteration {}).", client_id, iteration); + elapsed + }); + future + }) + // Need an intermediate collection to kick off each future, + // because futures::collect will iterate sequentially. + .collect::>(); + let futs = futures::collect(futs); + + futs.map(move |latencies| { + let total_time = start.elapsed(); + + let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur); + let mean = sum_latencies / latencies.len() as u32; + let min_latency = *latencies.iter().min().unwrap(); + let max_latency = *latencies.iter().max().unwrap(); + + info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", + latencies.len(), + mean.microseconds(), + min_latency.microseconds(), + max_latency.microseconds(), + total_time.microseconds()); + }).map_err(|e| panic!(e)) +} fn main() { let _ = env_logger::init(); + let matches = App::new("Tarpc Concurrency") + .about("Demonstrates making concurrent requests to a tarpc service.") + .arg(Arg::with_name("concurrency") + .short("c") + .long("concurrency") + .value_name("LEVEL") + .help("Sets a custom concurrency level") + .takes_value(true)) + .arg(Arg::with_name("clients") + .short("n") + .long("num_clients") + .value_name("AMOUNT") + .help("How many clients to distribute requests between") + .takes_value(true)) + .get_matches(); + let concurrency = matches.value_of("concurrency") + .map(&str::parse) + .map(Result::unwrap) + .unwrap_or(10); + let num_clients = matches.value_of("clients") + .map(&str::parse) + .map(Result::unwrap) + .unwrap_or(4); - let server = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap(); - println!("Server listening on {}.", server.local_addr()); + let addr = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap(); + info!("Server listening on {}.", addr); - // The driver of the main future. - let mut core = reactor::Core::new().unwrap(); - - let clients = (1...5) + let clients = (0..num_clients) // Spin up a couple threads to drive the clients. .map(|i| (i, spawn_core())) .map(|(i, remote)| { - println!("Client {} connecting...", i); - FutureClient::connect_remotely(server.local_addr(), &remote) + info!("Client {} connecting...", i); + FutureClient::connect_remotely(&addr, &remote) .map_err(|e| panic!(e)) }) // Need an intermediate collection to connect the clients in parallel, // because `futures::collect` iterates sequentially. .collect::>(); - let clients = core.run(futures::collect(clients)).unwrap(); - println!("Starting..."); - let runs = ::std::iter::once(run_once(&clients, MAX_CONCURRENCY, false)) - .chain((1...MAX_CONCURRENCY).map(|concurrency| run_once(&clients, concurrency, true))); - core.run(futures::collect(runs)).unwrap(); + let run = futures::collect(clients).and_then(|clients| run_once(clients, concurrency)); + + info!("Starting..."); + + // The driver of the main future. + let mut core = reactor::Core::new().unwrap(); + + core.run(run).unwrap(); } diff --git a/examples/pubsub.rs b/examples/pubsub.rs index 0314577..92cdfee 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -56,13 +56,13 @@ impl subscriber::FutureService for Subscriber { } impl Subscriber { - fn new(id: u32) -> tokio::server::ServerHandle { + fn new(id: u32) -> SocketAddr { Subscriber { - id: id, - } - .listen("localhost:0".first_socket_addr()) - .wait() - .unwrap() + id: id, + } + .listen("localhost:0".first_socket_addr()) + .wait() + .unwrap() } } @@ -117,19 +117,18 @@ impl publisher::FutureService for Publisher { fn main() { let _ = env_logger::init(); - let publisher_server = Publisher::new() + let publisher_addr = Publisher::new() .listen("localhost:0".first_socket_addr()) .wait() .unwrap(); - let publisher_addr = publisher_server.local_addr(); let publisher_client = publisher::SyncClient::connect(publisher_addr).unwrap(); let subscriber1 = Subscriber::new(0); - publisher_client.subscribe(0, *subscriber1.local_addr()).unwrap(); + publisher_client.subscribe(0, subscriber1).unwrap(); let subscriber2 = Subscriber::new(1); - publisher_client.subscribe(1, *subscriber2.local_addr()).unwrap(); + publisher_client.subscribe(1, subscriber2).unwrap(); println!("Broadcasting..."); diff --git a/examples/readme.rs b/examples/readme.rs index a16c651..ae2f4de 100644 --- a/examples/readme.rs +++ b/examples/readme.rs @@ -6,12 +6,17 @@ #![feature(conservative_impl_trait, plugin)] #![plugin(tarpc_plugins)] +extern crate env_logger; extern crate futures; #[macro_use] +extern crate log; +#[macro_use] extern crate tarpc; +extern crate tokio_core; +use futures::Future; use tarpc::util::Never; -use tarpc::sync::Connect; +use tarpc::future::Connect; service! { rpc hello(name: String) -> String; @@ -22,13 +27,29 @@ struct HelloServer; impl SyncService for HelloServer { fn hello(&self, name: String) -> Result { + info!("Got request: {}", name); Ok(format!("Hello, {}!", name)) } } fn main() { - let addr = "localhost:10000"; - HelloServer.listen(addr).unwrap(); - let client = SyncClient::connect(addr).unwrap(); - println!("{}", client.hello("Mom".to_string()).unwrap()); + let _ = env_logger::init(); + let mut core = tokio_core::reactor::Core::new().unwrap(); + let addr = HelloServer.listen("localhost:10000").unwrap(); + let f = FutureClient::connect(&addr) + .map_err(tarpc::Error::from) + .and_then(|client| { + let resp1 = client.hello("Mom".to_string()); + info!("Sent first request."); + /* + let resp2 = client.hello("Dad".to_string()); + info!("Sent second request."); + */ + futures::collect(vec![resp1, /*resp2*/]) + }).map(|responses| { + for resp in responses { + println!("{}", resp); + } + }); + core.run(f).unwrap(); } diff --git a/examples/readme2.rs b/examples/readme2.rs index d7e964b..fd7e81e 100644 --- a/examples/readme2.rs +++ b/examples/readme2.rs @@ -49,8 +49,7 @@ impl SyncService for HelloServer { } fn main() { - let addr = "localhost:10000"; - HelloServer.listen(addr).unwrap(); + let addr = HelloServer.listen("localhost:10000").unwrap(); let client = SyncClient::connect(addr).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); println!("{}", client.hello("".to_string()).unwrap_err()); diff --git a/examples/readme_expanded.rs b/examples/readme_expanded.rs new file mode 100644 index 0000000..566e49b --- /dev/null +++ b/examples/readme_expanded.rs @@ -0,0 +1,91 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +#![feature(conservative_impl_trait, plugin, proc_macro)] +#![plugin(tarpc_plugins)] + +extern crate bincode; +extern crate env_logger; +extern crate futures; +#[macro_use] +extern crate log; +#[macro_use] +extern crate serde_derive; +#[macro_use] +extern crate tarpc; +extern crate tokio_core; +extern crate tokio_service; + +use bincode::serde::DeserializeError; +use futures::{Future, IntoFuture}; +use std::io; +use std::net::SocketAddr; +use tarpc::future::Connect; +use tarpc::util::FirstSocketAddr; +use tarpc::util::Never; +use tokio_service::Service; + +#[derive(Clone, Copy)] +struct HelloServer; + +impl HelloServer { + fn listen(addr: SocketAddr) -> impl Future { + let (tx, rx) = futures::oneshot(); + tarpc::REMOTE.spawn(move |handle| { + Ok(tx.complete(tarpc::listen_with(addr, move || Ok(HelloServer), handle.clone()))) + }); + rx.map_err(|e| panic!(e)).and_then(|result| result) + } +} + +impl Service for HelloServer { + type Request = Result; + type Response = tarpc::Response; + type Error = io::Error; + type Future = Box, Error = io::Error>>; + + fn call(&self, request: Self::Request) -> Self::Future { + Ok(Ok(format!("Hello, {}!", request.unwrap()))).into_future().boxed() + } +} + +/// The client stub that makes RPC calls to the server. Exposes a Future interface. +#[derive(Debug)] +pub struct FutureClient(tarpc::Client); + +impl FutureClient { + fn connect(addr: &SocketAddr) -> impl Future { + tarpc::Client::connect_remotely(addr, &tarpc::REMOTE).map(FutureClient) + } + + pub fn hello(&self, name: String) + -> impl Future> + 'static + { + self.0.call(name).then(|msg| msg.unwrap()) + } +} + +fn main() { + let _ = env_logger::init(); + let mut core = tokio_core::reactor::Core::new().unwrap(); + let addr = HelloServer::listen("localhost:10000".first_socket_addr()).wait().unwrap(); + let f = FutureClient::connect(&addr) + .map_err(tarpc::Error::from) + .and_then(|client| { + let resp1 = client.hello("Mom".to_string()); + info!("Sent first request."); + + let resp2 = client.hello("Dad".to_string()); + info!("Sent second request."); + + futures::collect(vec![resp1, resp2]) + }) + .map(|responses| { + for resp in responses { + println!("{}", resp); + } + }); + core.run(f).unwrap(); +} diff --git a/examples/readme_future.rs b/examples/readme_future.rs index 103568a..a0bb26f 100644 --- a/examples/readme_future.rs +++ b/examples/readme_future.rs @@ -10,6 +10,7 @@ extern crate futures; #[macro_use] extern crate tarpc; +use futures::Future; use tarpc::util::{FirstSocketAddr, Never}; use tarpc::sync::Connect; @@ -29,8 +30,7 @@ impl FutureService for HelloServer { } fn main() { - let addr = "localhost:10000"; - let _server = HelloServer.listen(addr.first_socket_addr()); + let addr = HelloServer.listen("localhost:10000".first_socket_addr()).wait().unwrap(); let client = SyncClient::connect(addr).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index a9d7ffe..120a5f2 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -74,12 +74,13 @@ impl DoubleFutureService for DoubleServer { fn main() { let _ = env_logger::init(); - let add = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let add_client = add::FutureClient::connect(add.local_addr()).wait().unwrap(); - let double = DoubleServer::new(add_client); - let double = double.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let add_addr = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let add_client = add::FutureClient::connect(&add_addr).wait().unwrap(); - let double_client = double::SyncClient::connect(double.local_addr()).unwrap(); + let double = DoubleServer::new(add_client); + let double_addr = double.listen("localhost:0".first_socket_addr()).wait().unwrap(); + + let double_client = double::SyncClient::connect(&double_addr).unwrap(); for i in 0..5 { println!("{:?}", double_client.double(i).unwrap()); } diff --git a/examples/throughput.rs b/examples/throughput.rs index f82b5cb..ad63fc6 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -52,8 +52,8 @@ impl FutureService for Server { const CHUNK_SIZE: u32 = 1 << 19; fn bench_tarpc(target: u64) { - let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = SyncClient::connect(handle.local_addr()).unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let client = SyncClient::connect(&addr).unwrap(); let start = time::Instant::now(); let mut nread = 0; while nread < target { diff --git a/examples/two_clients.rs b/examples/two_clients.rs index b26e0d0..c8a06b1 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -58,10 +58,11 @@ macro_rules! pos { fn main() { let _ = env_logger::init(); - let bar = Bar.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let baz = Baz.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let bar_client = bar::SyncClient::connect(bar.local_addr()).unwrap(); - let baz_client = baz::SyncClient::connect(baz.local_addr()).unwrap(); + let bar_addr = Bar.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let baz_addr = Baz.listen("localhost:0".first_socket_addr()).wait().unwrap(); + + let bar_client = bar::SyncClient::connect(&bar_addr).unwrap(); + let baz_client = baz::SyncClient::connect(&baz_addr).unwrap(); info!("Result: {:?}", bar_client.bar(17)); diff --git a/src/client.rs b/src/client.rs index c17f0e9..b902bb9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,57 +3,62 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use WireError; +use {WireError, framed}; use bincode::serde::DeserializeError; -use framed::Framed; -use futures::{self, Async, Future}; +use futures::{self, Future}; use serde::{Deserialize, Serialize}; use std::fmt; use std::io; use tokio_core::net::TcpStream; -use tokio_core::reactor; -use tokio_proto::easy::{EasyClient, EasyResponse, multiplex}; +use tokio_proto::BindClient as ProtoBindClient; +use tokio_proto::multiplex::Multiplex; use tokio_service::Service; +type WireResponse = Result>, DeserializeError>; +type ResponseFuture = futures::Map< as Service>::Future, + fn(WireResponse) -> Result>>; +type BindClient = + >> as ProtoBindClient>::BindClient; + /// A client that impls `tokio_service::Service` that writes and reads bytes. /// /// Typically, this would be combined with a serialization pre-processing step /// and a deserialization post-processing step. -pub struct Client { - inner: EasyClient>, +pub struct Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static, +{ + inner: BindClient, } -type WireResponse = Result>, DeserializeError>; -type ResponseFuture = futures::Map>, - fn(WireResponse) -> Result>>; - impl Service for Client - where Req: Send + 'static, - Resp: Send + 'static, - E: Send + 'static + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static { type Request = Req; type Response = Result>; type Error = io::Error; - type Future = ResponseFuture; - - fn poll_ready(&self) -> Async<()> { - self.inner.poll_ready() - } + type Future = ResponseFuture; fn call(&self, request: Self::Request) -> Self::Future { self.inner.call(request).map(Self::map_err) } } -impl Client { - fn new(tcp: TcpStream, handle: &reactor::Handle) -> Self - where Req: Serialize + Send + 'static, - Resp: Deserialize + Send + 'static, - E: Deserialize + Send + 'static +impl Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static, +{ + fn new(inner: BindClient) -> Self + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static { Client { - inner: multiplex::connect(Framed::new(tcp), handle), + inner: inner, } } @@ -64,7 +69,11 @@ impl Client { } } -impl fmt::Debug for Client { +impl fmt::Debug for Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static, +{ fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { write!(f, "Client {{ .. }}") } @@ -72,7 +81,7 @@ impl fmt::Debug for Client { /// Exposes a trait for connecting asynchronously to servers. pub mod future { - use REMOTE; + use {REMOTE, framed}; use futures::{self, Async, Future}; use serde::{Deserialize, Serialize}; use std::io; @@ -81,6 +90,7 @@ pub mod future { use super::Client; use tokio_core::net::TcpStream; use tokio_core::{self, reactor}; + use tokio_proto::BindClient; /// Types that can connect to a server asynchronously. pub trait Connect<'a>: Sized { @@ -104,11 +114,19 @@ pub mod future { } /// A future that resolves to a `Client` or an `io::Error`. - pub struct ConnectFuture { + pub struct ConnectFuture + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static, + { inner: futures::Oneshot>>, } - impl Future for ConnectFuture { + impl Future for ConnectFuture + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static, + { type Item = Client; type Error = io::Error; @@ -129,9 +147,9 @@ pub mod future { } impl<'a, Req, Resp, E> Future for ConnectWithFuture<'a, Req, Resp, E> - where Req: Serialize + Send + 'static, - Resp: Deserialize + Send + 'static, - E: Deserialize + Send + 'static + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static { type Item = Client; type Error = io::Error; @@ -150,21 +168,21 @@ pub mod future { } impl<'a, Req, Resp, E> FnOnce<(TcpStream,)> for MultiplexConnect<'a, Req, Resp, E> - where Req: Serialize + Send + 'static, - Resp: Deserialize + Send + 'static, - E: Deserialize + Send + 'static + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static { type Output = Client; extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Client { - Client::new(tcp, self.0) + Client::new(framed::Proto::new().bind_client(self.0, tcp)) } } impl<'a, Req, Resp, E> Connect<'a> for Client - where Req: Serialize + Send + 'static, - Resp: Deserialize + Send + 'static, - E: Deserialize + Send + 'static + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static { type ConnectFut = ConnectFuture; type ConnectWithFut = ConnectWithFuture<'a, Req, Resp, E>; @@ -175,7 +193,7 @@ pub mod future { remote.spawn(move |handle| { let handle2 = handle.clone(); TcpStream::connect(&addr, handle) - .map(move |tcp| Client::new(tcp, &handle2)) + .map(move |tcp| Client::new(framed::Proto::new().bind_client(&handle2, tcp))) .then(move |result| { tx.complete(result); Ok(()) @@ -207,9 +225,9 @@ pub mod sync { } impl Connect for Client - where Req: Serialize + Send + 'static, - Resp: Deserialize + Send + 'static, - E: Deserialize + Send + 'static + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static { fn connect(addr: A) -> Result where A: ToSocketAddrs diff --git a/src/errors.rs b/src/errors.rs index 1668ec9..78bc44c 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -3,7 +3,7 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use {bincode, tokio_proto as proto}; +use bincode; use serde::{Deserialize, Serialize}; use std::{fmt, io}; use std::error::Error as StdError; @@ -75,15 +75,6 @@ impl StdError for Error { } } -impl From>> for Error { - fn from(err: proto::Error>) -> Self { - match err { - proto::Error::Transport(e) => e, - proto::Error::Io(e) => e.into(), - } - } -} - impl From for Error { fn from(err: io::Error) -> Self { Error::Io(err) diff --git a/src/framed.rs b/src/framed.rs index ae6481a..6791753 100644 --- a/src/framed.rs +++ b/src/framed.rs @@ -3,156 +3,157 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. +use {serde, tokio_core}; use bincode::{SizeLimit, serde as bincode}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use futures::{Async, Poll}; -use serde; use std::io::{self, Cursor}; use std::marker::PhantomData; use std::mem; -use tokio_core::easy::{self, EasyBuf, EasyFramed}; -use tokio_core::io::{FramedIo, Io}; -use tokio_proto::multiplex::{self, RequestId}; -use util::Never; - -/// Handles the IO of tarpc messages. Similar to `tokio_core::easy::EasyFramed` except that it -/// hardcodes a parser and serializer suitable for tarpc messages. -pub struct Framed { - inner: EasyFramed, Serializer>, -} - -impl Framed { - /// Constructs a new tarpc FramedIo - pub fn new(upstream: I) -> Framed - where I: Io, - In: serde::Serialize, - Out: serde::Deserialize - { - Framed { inner: EasyFramed::new(upstream, Parser::new(), Serializer::new()) } - } -} +use tokio_core::io::{EasyBuf, Framed, Io}; +use tokio_proto::streaming::multiplex::{self, RequestId}; +use tokio_proto::multiplex::{ClientProto, ServerProto}; +use util::{Debugger, Never}; /// The type of message sent and received by the transport. pub type Frame = multiplex::Frame; -impl FramedIo for Framed - where I: Io, - In: serde::Serialize, - Out: serde::Deserialize -{ - type In = (RequestId, In); - type Out = Option<(RequestId, Result)>; - fn poll_read(&mut self) -> Async<()> { - self.inner.poll_read() - } - - fn poll_write(&mut self) -> Async<()> { - self.inner.poll_write() - } - - fn read(&mut self) -> Poll { - self.inner.read() - } - - fn write(&mut self, req: Self::In) -> Poll<(), io::Error> { - self.inner.write(req) - } - - fn flush(&mut self) -> Poll<(), io::Error> { - self.inner.flush() - } +// `Req` is the type that `Codec` parses. `Resp` is the type it serializes. +pub struct Codec { + state: CodecState, + _phantom_data: PhantomData<(Req, Resp)>, } -// `T` is the type that `Parser` parses. -struct Parser { - state: ParserState, - _phantom_data: PhantomData, -} - -enum ParserState { +enum CodecState { Id, Len { id: u64 }, Payload { id: u64, len: u64 }, } -impl Parser { +impl Codec { fn new() -> Self { - Parser { - state: ParserState::Id, + Codec { + state: CodecState::Id, _phantom_data: PhantomData, } } } -impl easy::Parse for Parser - where T: serde::Deserialize +impl tokio_core::io::Codec for Codec + where Req: serde::Deserialize, + Resp: serde::Serialize, { - type Out = (RequestId, Result); + type Out = (RequestId, Resp); + type In = (RequestId, Result); - fn parse(&mut self, buf: &mut EasyBuf) -> Poll { - use self::ParserState::*; - - loop { - match self.state { - Id if buf.len() < mem::size_of::() => return Ok(Async::NotReady), - Id => { - self.state = Len { id: Cursor::new(&*buf.get_mut()).read_u64::()? }; - *buf = buf.split_off(mem::size_of::()); - } - Len { .. } if buf.len() < mem::size_of::() => return Ok(Async::NotReady), - Len { id } => { - self.state = Payload { - id: id, - len: Cursor::new(&*buf.get_mut()).read_u64::()?, - }; - *buf = buf.split_off(mem::size_of::()); - } - Payload { len, .. } if buf.len() < len as usize => return Ok(Async::NotReady), - Payload { id, .. } => { - let mut buf = buf.get_mut(); - let result = bincode::deserialize_from(&mut Cursor::new(&mut *buf), - SizeLimit::Infinite); - // Clear any unread bytes so we don't read garbage on next request. - buf.clear(); - // Reset the state machine because, either way, we're done processing this - // message. - self.state = Id; - - return Ok(Async::Ready((id, result))); - } - } - } - } -} - -struct Serializer(PhantomData); - -impl Serializer { - fn new() -> Self { - Serializer(PhantomData) - } -} - -impl easy::Serialize for Serializer - where T: serde::Serialize -{ - type In = (RequestId, T); - - fn serialize(&mut self, (id, message): Self::In, buf: &mut Vec) { + fn encode(&mut self, (id, message): Self::Out, buf: &mut Vec) -> io::Result<()> { buf.write_u64::(id).unwrap(); + trace!("Encoded request id = {} as {:?}", id, buf); buf.write_u64::(bincode::serialized_size(&message)).unwrap(); bincode::serialize_into(buf, &message, SizeLimit::Infinite) // TODO(tikue): handle err .expect("In bincode::serialize_into"); + trace!("Encoded buffer: {:?}", buf); + Ok(()) + } + + fn decode(&mut self, buf: &mut EasyBuf) -> Result, io::Error> { + use self::CodecState::*; + trace!("Codec::decode: {:?}", buf.as_slice()); + + loop { + match self.state { + Id if buf.len() < mem::size_of::() => { + trace!("--> Buf len is {}; waiting for 8 to parse id.", buf.len()); + return Ok(None) + } + Id => { + let mut id_buf = buf.drain_to(mem::size_of::()); + let id = Cursor::new(&mut id_buf).read_u64::()?; + trace!("--> Parsed id = {} from {:?}", id, id_buf.as_slice()); + self.state = Len { id: id }; + } + Len { .. } if buf.len() < mem::size_of::() => { + trace!("--> Buf len is {}; waiting for 8 to parse packet length.", buf.len()); + return Ok(None) + } + Len { id } => { + let len_buf = buf.drain_to(mem::size_of::()); + let len = Cursor::new(len_buf).read_u64::()?; + trace!("--> Parsed payload length = {}, remaining buffer length = {}", + len, buf.len()); + self.state = Payload { + id: id, + len: len, + }; + } + Payload { len, .. } if buf.len() < len as usize => { + trace!("--> Buf len is {}; waiting for {} to parse payload.", buf.len(), len); + return Ok(None) + } + Payload { id, len } => { + let payload = buf.drain_to(len as usize); + let result = bincode::deserialize_from(&mut Cursor::new(payload), + SizeLimit::Infinite); + // Reset the state machine because, either way, we're done processing this + // message. + self.state = Id; + + trace!("--> Parsed message: {:?}", Debugger(&result)); + return Ok(Some((id, result))); + } + } + } + } +} + +/// Implements the `multiplex::ServerProto` trait. +pub struct Proto(PhantomData<(Req, Resp)>); + +impl Proto { + /// Returns a new `Proto`. + pub fn new() -> Self { + Proto(PhantomData) + } +} + +impl ServerProto for Proto + where T: Io + 'static, + Req: serde::Deserialize + 'static, + Resp: serde::Serialize + 'static, +{ + type Response = Resp; + type Request = Result; + type Error = io::Error; + type Transport = Framed>; + type BindTransport = Result; + + fn bind_transport(&self, io: T) -> Self::BindTransport { + Ok(io.framed(Codec::new())) + } +} + +impl ClientProto for Proto + where T: Io + 'static, + Req: serde::Serialize + 'static, + Resp: serde::Deserialize + 'static, +{ + type Response = Result; + type Request = Req; + type Error = io::Error; + type Transport = Framed>; + type BindTransport = Result; + + fn bind_transport(&self, io: T) -> Self::BindTransport { + Ok(io.framed(Codec::new())) } } #[test] fn serialize() { - use tokio_core::easy::{Parse, Serialize}; + use tokio_core::io::Codec as TokioCodec; const MSG: (u64, (char, char, char)) = (4, ('a', 'b', 'c')); let mut buf = EasyBuf::new(); @@ -160,13 +161,14 @@ fn serialize() { // Serialize twice to check for idempotence. for _ in 0..2 { - Serializer::new().serialize(MSG, &mut vec); + let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new(); + codec.encode(MSG, &mut vec).unwrap(); buf.get_mut().append(&mut vec); - let actual: Poll<(u64, Result<(char, char, char), bincode::DeserializeError>), io::Error> = - Parser::new().parse(&mut buf); + let actual: Result)>, io::Error> = + codec.decode(&mut buf); match actual { - Ok(Async::Ready((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {} + Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {} bad => panic!("Expected {:?}, but got {:?}", Some(MSG), bad), } diff --git a/src/lib.rs b/src/lib.rs index 2e5846b..abc14e0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,7 +59,7 @@ //! ``` //! #![deny(missing_docs)] -#![feature(plugin, conservative_impl_trait, never_type, proc_macro, unboxed_closures, fn_traits)] +#![feature(plugin, conservative_impl_trait, never_type, proc_macro, unboxed_closures, fn_traits, specialization)] #![plugin(tarpc_plugins)] extern crate byteorder; @@ -68,6 +68,7 @@ extern crate bytes; extern crate lazy_static; #[macro_use] extern crate log; +extern crate net2; #[macro_use] extern crate serde_derive; extern crate take; @@ -95,8 +96,6 @@ pub use errors::{Error, SerializableError}; #[doc(hidden)] pub use errors::WireError; #[doc(hidden)] -pub use framed::Framed; -#[doc(hidden)] pub use server::{ListenFuture, Response, listen, listen_with}; /// Provides some utility error types, as well as a trait for spawning futures on the default event diff --git a/src/macros.rs b/src/macros.rs index d6ea8d9..5aa8849 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -393,10 +393,26 @@ macro_rules! service { /// Provides a function for starting the service. This is a separate trait from /// `FutureService` to prevent collisions with the names of RPCs. pub trait FutureServiceExt: FutureService { + fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture + { + let (tx, rx) = $crate::futures::oneshot(); + $crate::REMOTE.spawn(move |handle| + Ok(tx.complete(Self::listen_with(self, + addr, + handle.clone())))); + $crate::ListenFuture::from_oneshot(rx) + } + /// Spawns the service, binding to the given address and running on /// the default tokio `Loop`. - fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture { - return $crate::listen(addr, __tarpc_service_AsyncServer(self)); + fn listen_with(self, + addr: ::std::net::SocketAddr, + handle: $crate::tokio_core::reactor::Handle) + -> ::std::io::Result<::std::net::SocketAddr> + { + return $crate::listen_with(addr, + move || Ok(__tarpc_service_AsyncServer(self.clone())), + handle); #[allow(non_camel_case_types)] #[derive(Clone)] @@ -461,10 +477,6 @@ macro_rules! service { type Error = ::std::io::Error; type Future = __tarpc_service_FutureReply<__tarpc_service_S>; - fn poll_ready(&self) -> $crate::futures::Async<()> { - $crate::futures::Async::Ready(()) - } - fn call(&self, __tarpc_service_request: Self::Request) -> Self::Future { let __tarpc_service_request = match __tarpc_service_request { Ok(__tarpc_service_request) => __tarpc_service_request, @@ -525,18 +537,29 @@ macro_rules! service { /// Provides a function for starting the service. This is a separate trait from /// `SyncService` to prevent collisions with the names of RPCs. pub trait SyncServiceExt: SyncService { - /// Spawns the service, binding to the given address and running on - /// the default tokio `Loop`. fn listen(self, addr: L) - -> ::std::io::Result<$crate::tokio_proto::server::ServerHandle> + -> ::std::io::Result<::std::net::SocketAddr> where L: ::std::net::ToSocketAddrs { let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?; + let (tx, rx) = $crate::futures::oneshot(); + $crate::REMOTE.spawn(move |handle| Ok(tx.complete(Self::listen_with(self, addr, handle.clone())))); + $crate::futures::Future::wait($crate::ListenFuture::from_oneshot(rx)) + } + + /// Spawns the service, binding to the given address and running on + /// the default tokio `Loop`. + fn listen_with(self, addr: L, handle: $crate::tokio_core::reactor::Handle) + -> ::std::io::Result<::std::net::SocketAddr> + where L: ::std::net::ToSocketAddrs + { let __tarpc_service_service = __SyncServer { service: self, }; - return $crate::futures::Future::wait( - FutureServiceExt::listen(__tarpc_service_service, addr)); + return FutureServiceExt::listen_with( + __tarpc_service_service, + $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?, + handle); #[derive(Clone)] struct __SyncServer { @@ -810,8 +833,8 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0".first_socket_addr()).unwrap(); - let client = SyncClient::connect(handle.local_addr()).unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr()).unwrap(); + let client = SyncClient::connect(addr).unwrap(); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); } @@ -819,8 +842,8 @@ mod functional_test { #[test] fn other_service() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0".first_socket_addr()).unwrap(); - let client = super::other_service::SyncClient::connect(handle.local_addr()).unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr()).unwrap(); + let client = super::other_service::SyncClient::connect(addr).expect("Could not connect!"); match client.foo().err().unwrap() { ::Error::ServerDeserialize(_) => {} // good bad => panic!("Expected Error::ServerDeserialize but got {}", bad), @@ -856,18 +879,31 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = FutureClient::connect(handle.local_addr()).wait().unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let client = FutureClient::connect(&addr).wait().unwrap(); assert_eq!(3, client.add(1, 2).wait().unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap()); } + #[test] + fn concurrent() { + let _ = env_logger::init(); + let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let client = FutureClient::connect(&addr).wait().unwrap(); + let req1 = client.add(1, 2); + let req2 = client.add(3, 4); + let req3 = client.hey("Tim".to_string()); + assert_eq!(3, req1.wait().unwrap()); + assert_eq!(7, req2.wait().unwrap()); + assert_eq!("Hey, Tim.", req3.wait().unwrap()); + } + #[test] fn other_service() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); let client = - super::other_service::FutureClient::connect(handle.local_addr()).wait().unwrap(); + super::other_service::FutureClient::connect(&addr).wait().unwrap(); match client.foo().wait().err().unwrap() { ::Error::ServerDeserialize(_) => {} // good bad => panic!(r#"Expected Error::ServerDeserialize but got "{}""#, bad), @@ -901,8 +937,8 @@ mod functional_test { use self::error_service::*; let _ = env_logger::init(); - let handle = ErrorServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = FutureClient::connect(handle.local_addr()).wait().unwrap(); + let addr = ErrorServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let client = FutureClient::connect(&addr).wait().unwrap(); client.bar() .then(move |result| { match result.err().unwrap() { @@ -916,7 +952,7 @@ mod functional_test { .wait() .unwrap(); - let client = SyncClient::connect(handle.local_addr()).unwrap(); + let client = SyncClient::connect(&addr).unwrap(); match client.bar().err().unwrap() { ::Error::App(e) => { assert_eq!(e.description(), "lol jk"); diff --git a/src/plugins/src/lib.rs b/src/plugins/src/lib.rs index 0747c8d..95f22be 100644 --- a/src/plugins/src/lib.rs +++ b/src/plugins/src/lib.rs @@ -15,7 +15,7 @@ use syntax::ext::base::{ExtCtxt, MacResult, DummyResult, MacEager}; use syntax::ext::quote::rt::Span; use syntax::parse::{self, token, PResult}; use syntax::parse::parser::{Parser, PathStyle}; -use syntax::parse::token::intern_and_get_ident; +use syntax::symbol::Symbol; use syntax::ptr::P; use syntax::tokenstream::TokenTree; use syntax::util::small_vector::SmallVector; @@ -46,23 +46,12 @@ fn snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box { - let mut updated = (**meta_item).clone(); - if let NameValue(_, Spanned { node: Str(ref mut doc, _), .. }) = updated.node { - let updated_doc = doc.replace("{}", &old_ident); - *doc = intern_and_get_ident(&updated_doc); - } else { - unreachable!() - }; - Some(P(updated)) - } - _ => None, + for attr in item.attrs.iter_mut().filter(|attr| attr.is_sugared_doc) { + if let NameValue(Spanned { node: Str(ref mut doc, _), .. }) = attr.value.node { + *doc = Symbol::intern(&doc.as_str().replace("{}", &old_ident)); + } else { + unreachable!() }; - if let Some(updated) = updated { - *meta_item = updated; - } } MacEager::trait_items(SmallVector::one(item)) @@ -158,7 +147,7 @@ fn convert(ident: &mut Ident) -> String { // The Fut suffix is hardcoded right now; this macro isn't really meant to be general-purpose. camel_ty.push_str("Fut"); - *ident = Ident::with_empty_ctxt(token::intern(&camel_ty)); + *ident = Ident::with_empty_ctxt(Symbol::intern(&camel_ty)); ident_str } diff --git a/src/server.rs b/src/server.rs index 667452e..6f02464 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,17 +3,17 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use REMOTE; +use {REMOTE, net2}; use bincode::serde::DeserializeError; use errors::WireError; -use framed::Framed; -use futures::{self, Async, Future}; +use framed::Proto; +use futures::{self, Async, Future, Stream}; use serde::{Deserialize, Serialize}; use std::io; use std::net::SocketAddr; +use tokio_core::net::TcpListener; use tokio_core::reactor::Handle; -use tokio_proto::easy::multiplex; -use tokio_proto::server::{self, ServerHandle}; +use tokio_proto::BindServer; use tokio_service::NewService; /// A message from server to client. @@ -29,15 +29,15 @@ pub fn listen(addr: SocketAddr, new_service: S) -> ListenFuture E: Serialize + 'static { let (tx, rx) = futures::oneshot(); - REMOTE.spawn(move |handle| Ok(tx.complete(listen_with(addr, new_service, handle)))); + REMOTE.spawn(move |handle| Ok(tx.complete(listen_with(addr, new_service, handle.clone())))); ListenFuture { inner: rx } } /// Spawns a service that binds to the given address using the given handle. pub fn listen_with(addr: SocketAddr, new_service: S, - handle: &Handle) - -> io::Result + handle: Handle) + -> io::Result where S: NewService, Response = Response, Error = io::Error> + Send + 'static, @@ -45,19 +45,49 @@ pub fn listen_with(addr: SocketAddr, Resp: Serialize + 'static, E: Serialize + 'static { - server::listen(handle, addr, move |stream| { - Ok(multiplex::EasyServer::new(new_service.new_service()?, Framed::new(stream)) - .map_err(|()| panic!("What do we do here"))) + let listener = listener(&addr, &handle)?; + let addr = listener.local_addr()?; + + let handle2 = handle.clone(); + let server = listener.incoming().for_each(move |(socket, _)| { + Proto::new().bind_server(&handle2, socket, new_service.new_service()?); + + Ok(()) + }).map_err(|e| error!("While processing incoming connections: {}", e)); + handle.spawn(server); + Ok(addr) +} + +fn listener(addr: &SocketAddr, + handle: &Handle) -> io::Result { + const PENDING_CONNECTION_BACKLOG: i32 = 1024; + + match *addr { + SocketAddr::V4(_) => net2::TcpBuilder::new_v4(), + SocketAddr::V6(_) => net2::TcpBuilder::new_v6() + }? + .reuse_address(true)? + .bind(addr)? + .listen(PENDING_CONNECTION_BACKLOG) + .and_then(|l| { + TcpListener::from_listener(l, addr, handle) }) } /// A future that resolves to a `ServerHandle`. pub struct ListenFuture { - inner: futures::Oneshot>, + inner: futures::Oneshot>, +} + +impl ListenFuture { + #[doc(hidden)] + pub fn from_oneshot(rx: futures::Oneshot>) -> Self { + ListenFuture { inner: rx } + } } impl Future for ListenFuture { - type Item = ServerHandle; + type Item = SocketAddr; type Error = io::Error; fn poll(&mut self) -> futures::Poll { diff --git a/src/util.rs b/src/util.rs index 4166d65..6fd9f2e 100644 --- a/src/util.rs +++ b/src/util.rs @@ -14,6 +14,7 @@ use tokio_core::reactor; /// A bottom type that impls `Error`, `Serialize`, and `Deserialize`. It is impossible to /// instantiate this type. +#[allow(unreachable_code)] #[derive(Debug)] pub struct Never(!); @@ -135,3 +136,18 @@ pub fn spawn_core() -> reactor::Remote { }); rx.recv().unwrap() } + +/// A struct that will format as the contained type if the type impls Debug. +pub struct Debugger<'a, T: 'a>(pub &'a T); + +impl<'a, T: fmt::Debug> fmt::Debug for Debugger<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "{:?}", self.0) + } +} + +impl<'a, T> fmt::Debug for Debugger<'a, T> { + default fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "{{not debuggable}}") + } +}