From 05c6be192d2fa41e4deb201a7ee50d5984ede901 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 11 Jan 2017 22:26:12 -0800 Subject: [PATCH] Rework the future Connect trait to only have one method, which takes an Options arg. --- benches/latency.rs | 4 +- examples/concurrency.rs | 74 +++++++------ examples/pubsub.rs | 27 +++-- examples/readme_errors.rs | 2 +- examples/readme_expanded.rs | 18 ++-- examples/readme_futures.rs | 10 +- examples/readme_sync.rs | 2 +- examples/server_calling_server.rs | 10 +- examples/throughput.rs | 8 +- examples/two_clients.rs | 2 +- src/client.rs | 171 ++++++++++++++++-------------- src/lib.rs | 59 +++++------ src/macros.rs | 75 +++++-------- src/protocol.rs | 33 +++--- src/server.rs | 36 +++---- src/util.rs | 8 +- 16 files changed, 257 insertions(+), 282 deletions(-) diff --git a/benches/latency.rs b/benches/latency.rs index 4de77a4..a203af1 100644 --- a/benches/latency.rs +++ b/benches/latency.rs @@ -14,10 +14,10 @@ extern crate env_logger; extern crate futures; use futures::Future; -#[cfg(test)] -use test::Bencher; use tarpc::sync::Connect; use tarpc::util::{FirstSocketAddr, Never}; +#[cfg(test)] +use test::Bencher; service! { rpc ack(); diff --git a/examples/concurrency.rs b/examples/concurrency.rs index ab21a7d..e4a37bf 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -24,7 +24,7 @@ use std::{cmp, thread}; use std::sync::{Arc, mpsc}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant}; -use tarpc::future::{Connect}; +use tarpc::client::future::{Connect, Options}; use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; @@ -73,10 +73,10 @@ trait Microseconds { impl Microseconds for Duration { fn microseconds(&self) -> i64 { - chrono::Duration::from_std(*self) - .unwrap() - .num_microseconds() - .unwrap() + chrono::Duration::from_std(*self) + .unwrap() + .num_microseconds() + .unwrap() } } @@ -101,21 +101,25 @@ fn spawn_core() -> reactor::Remote { rx.recv().unwrap() } -fn run_once(clients: Vec, concurrency: u32) -> impl Future + 'static { +fn run_once(clients: Vec, + concurrency: u32) + -> impl Future + 'static { let start = Instant::now(); let num_clients = clients.len(); futures::stream::futures_unordered((0..concurrency as usize) - .map(|iteration| (iteration + 1, iteration % num_clients)) - .map(|(iteration, client_idx)| { - let client = &clients[client_idx]; - let start = Instant::now(); - debug!("Client {} reading (iteration {})...", client_idx, iteration); - client.read(CHUNK_SIZE) - .map(move |_| (client_idx, iteration, start)) - })) + .map(|iteration| (iteration + 1, iteration % num_clients)) + .map(|(iteration, client_idx)| { + let client = &clients[client_idx]; + let start = Instant::now(); + debug!("Client {} reading (iteration {})...", client_idx, iteration); + client.read(CHUNK_SIZE) + .map(move |_| (client_idx, iteration, start)) + })) .map(|(client_idx, iteration, start)| { let elapsed = start.elapsed(); - debug!("Client {} received reply (iteration {}).", client_idx, iteration); + debug!("Client {} received reply (iteration {}).", + client_idx, + iteration); elapsed }) .map_err(|e| panic!(e)) @@ -128,31 +132,31 @@ fn run_once(clients: Vec, concurrency: u32) -> impl Future Mean={}µs, Min={}µs, Max={}µs, Total={}µs", - stats.count, - stats.sum.microseconds() as f64 / stats.count as f64, - stats.min.unwrap().microseconds(), - stats.max.unwrap().microseconds(), - start.elapsed().microseconds()); + stats.count, + stats.sum.microseconds() as f64 / stats.count as f64, + stats.min.unwrap().microseconds(), + stats.max.unwrap().microseconds(), + start.elapsed().microseconds()); }) } fn main() { let _ = env_logger::init(); let matches = App::new("Tarpc Concurrency") - .about("Demonstrates making concurrent requests to a tarpc service.") - .arg(Arg::with_name("concurrency") - .short("c") - .long("concurrency") - .value_name("LEVEL") - .help("Sets a custom concurrency level") - .takes_value(true)) - .arg(Arg::with_name("clients") - .short("n") - .long("num_clients") - .value_name("AMOUNT") - .help("How many clients to distribute requests between") - .takes_value(true)) - .get_matches(); + .about("Demonstrates making concurrent requests to a tarpc service.") + .arg(Arg::with_name("concurrency") + .short("c") + .long("concurrency") + .value_name("LEVEL") + .help("Sets a custom concurrency level") + .takes_value(true)) + .arg(Arg::with_name("clients") + .short("n") + .long("num_clients") + .value_name("AMOUNT") + .help("How many clients to distribute requests between") + .takes_value(true)) + .get_matches(); let concurrency = matches.value_of("concurrency") .map(&str::parse) .map(Result::unwrap) @@ -170,7 +174,7 @@ fn main() { .map(|i| (i, spawn_core())) .map(|(i, remote)| { info!("Client {} connecting...", i); - FutureClient::connect_remotely(&addr, &remote) + FutureClient::connect(addr, Options::default().remote(remote)) .map_err(|e| panic!(e)) }) // Need an intermediate collection to connect the clients in parallel, diff --git a/examples/pubsub.rs b/examples/pubsub.rs index 26fea69..34a0a09 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -14,14 +14,14 @@ extern crate tokio_proto as tokio; use futures::{BoxFuture, Future}; use publisher::FutureServiceExt as PublisherExt; -use subscriber::FutureServiceExt as SubscriberExt; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; -use tarpc::future::Connect as Fc; -use tarpc::sync::Connect as Sc; +use subscriber::FutureServiceExt as SubscriberExt; +use tarpc::client::future::{Connect as Fc, Options}; +use tarpc::client::sync::Connect as Sc; use tarpc::util::{FirstSocketAddr, Message, Never}; pub mod subscriber { @@ -57,12 +57,10 @@ impl subscriber::FutureService for Subscriber { impl Subscriber { fn new(id: u32) -> SocketAddr { - Subscriber { - id: id, - } - .listen("localhost:0".first_socket_addr()) - .wait() - .unwrap() + Subscriber { id: id } + .listen("localhost:0".first_socket_addr()) + .wait() + .unwrap() } } @@ -88,22 +86,21 @@ impl publisher::FutureService for Publisher { // Ignore failing subscribers. .map(move |client| client.receive(message.clone()).then(|_| Ok(()))) .collect::>()) - .map(|_| ()) - .boxed() + .map(|_| ()) + .boxed() } - type SubscribeFut = BoxFuture<(), Message>; + type SubscribeFut = Box>; fn subscribe(&self, id: u32, address: SocketAddr) -> Self::SubscribeFut { let clients = self.clients.clone(); - subscriber::FutureClient::connect(&address) + Box::new(subscriber::FutureClient::connect(address, Options::default()) .map(move |subscriber| { println!("Subscribing {}.", id); clients.lock().unwrap().insert(id, subscriber); () }) - .map_err(|e| e.to_string().into()) - .boxed() + .map_err(|e| e.to_string().into())) } type UnsubscribeFut = BoxFuture<(), Never>; diff --git a/examples/readme_errors.rs b/examples/readme_errors.rs index 37ef07b..67d4958 100644 --- a/examples/readme_errors.rs +++ b/examples/readme_errors.rs @@ -14,7 +14,7 @@ extern crate serde_derive; use std::error::Error; use std::fmt; -use tarpc::sync::Connect; +use tarpc::client::sync::Connect; service! { rpc hello(name: String) -> String | NoNameGiven; diff --git a/examples/readme_expanded.rs b/examples/readme_expanded.rs index e6fd975..3187481 100644 --- a/examples/readme_expanded.rs +++ b/examples/readme_expanded.rs @@ -22,7 +22,7 @@ use bincode::serde::DeserializeError; use futures::{Future, IntoFuture}; use std::io; use std::net::SocketAddr; -use tarpc::future::Connect; +use tarpc::client::future::{Connect, Options}; use tarpc::util::FirstSocketAddr; use tarpc::util::Never; use tokio_service::Service; @@ -31,9 +31,9 @@ use tokio_service::Service; struct HelloServer; impl HelloServer { - fn listen(addr: SocketAddr) -> impl Future { + fn listen(addr: SocketAddr) -> impl Future { let (tx, rx) = futures::oneshot(); - tarpc::future::REMOTE.spawn(move |handle| { + tarpc::REMOTE.spawn(move |handle| { Ok(tx.complete(tarpc::listen_with(addr, move || Ok(HelloServer), handle.clone()))) }); rx.map_err(|e| panic!(e)).and_then(|result| result) @@ -56,13 +56,13 @@ impl Service for HelloServer { pub struct FutureClient(tarpc::Client); impl FutureClient { - fn connect(addr: &SocketAddr) -> impl Future { - tarpc::Client::connect_remotely(addr, &tarpc::future::REMOTE).map(FutureClient) + fn connect(addr: SocketAddr) -> impl Future { + tarpc::Client::connect(addr, Options::default()).map(FutureClient) } - pub fn hello(&self, name: String) - -> impl Future> + 'static - { + pub fn hello(&self, + name: String) + -> impl Future> + 'static { self.0.call(name).then(|msg| msg.unwrap()) } } @@ -71,7 +71,7 @@ fn main() { let _ = env_logger::init(); let mut core = tokio_core::reactor::Core::new().unwrap(); let addr = HelloServer::listen("localhost:10000".first_socket_addr()).wait().unwrap(); - let f = FutureClient::connect(&addr) + let f = FutureClient::connect(addr) .map_err(tarpc::Error::from) .and_then(|client| { let resp1 = client.hello("Mom".to_string()); diff --git a/examples/readme_futures.rs b/examples/readme_futures.rs index 0afda18..e8a04f3 100644 --- a/examples/readme_futures.rs +++ b/examples/readme_futures.rs @@ -12,7 +12,7 @@ extern crate tarpc; extern crate tokio_core; use futures::Future; -use tarpc::future::Connect; +use tarpc::client::future::{Connect, Options}; use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; @@ -34,11 +34,11 @@ impl FutureService for HelloServer { fn main() { let addr = "localhost:10000".first_socket_addr(); let mut core = reactor::Core::new().unwrap(); + let handle = core.handle(); HelloServer.listen_with(addr, core.handle()).unwrap(); - core.run( - FutureClient::connect(&addr) + core.run(FutureClient::connect(addr, Options::default().handle(handle)) .map_err(tarpc::Error::from) .and_then(|client| client.hello("Mom".to_string())) - .map(|resp| println!("{}", resp)) - ).unwrap(); + .map(|resp| println!("{}", resp))) + .unwrap(); } diff --git a/examples/readme_sync.rs b/examples/readme_sync.rs index 9e26cb5..4d42d29 100644 --- a/examples/readme_sync.rs +++ b/examples/readme_sync.rs @@ -11,8 +11,8 @@ extern crate futures; #[macro_use] extern crate tarpc; +use tarpc::client::sync::Connect; use tarpc::util::Never; -use tarpc::sync::Connect; service! { rpc hello(name: String) -> String; diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index 8f01d62..180793b 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -15,9 +15,9 @@ use add::{FutureService as AddFutureService, FutureServiceExt as AddExt}; use double::{FutureService as DoubleFutureService, FutureServiceExt as DoubleExt}; use futures::{BoxFuture, Future}; use std::sync::{Arc, Mutex}; +use tarpc::client::future::{Connect as Fc, Options}; +use tarpc::client::sync::Connect as Sc; use tarpc::util::{FirstSocketAddr, Message, Never}; -use tarpc::future::Connect as Fc; -use tarpc::sync::Connect as Sc; pub mod add { service! { @@ -53,9 +53,7 @@ struct DoubleServer { impl DoubleServer { fn new(client: add::FutureClient) -> Self { - DoubleServer { - client: Arc::new(Mutex::new(client)), - } + DoubleServer { client: Arc::new(Mutex::new(client)) } } } @@ -75,7 +73,7 @@ impl DoubleFutureService for DoubleServer { fn main() { let _ = env_logger::init(); let add_addr = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let add_client = add::FutureClient::connect(&add_addr).wait().unwrap(); + let add_client = add::FutureClient::connect(add_addr, Options::default()).wait().unwrap(); let double = DoubleServer::new(add_client); let double_addr = double.listen("localhost:0".first_socket_addr()).wait().unwrap(); diff --git a/examples/throughput.rs b/examples/throughput.rs index ad63fc6..6afff01 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -13,13 +13,13 @@ extern crate tarpc; extern crate env_logger; extern crate futures; -use std::sync::Arc; +use futures::Future; +use std::io::{Read, Write, stdout}; use std::net; +use std::sync::Arc; use std::thread; use std::time; -use std::io::{Read, Write, stdout}; -use futures::Future; -use tarpc::sync::Connect; +use tarpc::client::sync::Connect; use tarpc::util::{FirstSocketAddr, Never}; lazy_static! { diff --git a/examples/two_clients.rs b/examples/two_clients.rs index c8a06b1..be405b1 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -17,8 +17,8 @@ extern crate futures; use bar::FutureServiceExt as BarExt; use baz::FutureServiceExt as BazExt; use futures::Future; +use tarpc::client::sync::Connect; use tarpc::util::{FirstSocketAddr, Never}; -use tarpc::sync::Connect; mod bar { service! { diff --git a/src/client.rs b/src/client.rs index 7afdee7..1c80360 100644 --- a/src/client.rs +++ b/src/client.rs @@ -25,10 +25,11 @@ type BindClient = /// /// Typically, this would be combined with a serialization pre-processing step /// and a deserialization post-processing step. +#[doc(hidden)] pub struct Client where Req: Serialize + 'static, Resp: Deserialize + 'static, - E: Deserialize + 'static, + E: Deserialize + 'static { inner: BindClient, } @@ -36,12 +37,10 @@ pub struct Client impl Clone for Client where Req: Serialize + 'static, Resp: Deserialize + 'static, - E: Deserialize + 'static, + E: Deserialize + 'static { fn clone(&self) -> Self { - Client { - inner: self.inner.clone(), - } + Client { inner: self.inner.clone() } } } @@ -63,16 +62,14 @@ impl Service for Client impl Client where Req: Serialize + 'static, Resp: Deserialize + 'static, - E: Deserialize + 'static, + E: Deserialize + 'static { fn new(inner: BindClient) -> Self where Req: Serialize + Sync + Send + 'static, Resp: Deserialize + Sync + Send + 'static, E: Deserialize + Sync + Send + 'static { - Client { - inner: inner, - } + Client { inner: inner } } fn map_err(resp: WireResponse) -> Result> { @@ -85,7 +82,7 @@ impl Client impl fmt::Debug for Client where Req: Serialize + 'static, Resp: Deserialize + 'static, - E: Deserialize + 'static, + E: Deserialize + 'static { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { write!(f, "Client {{ .. }}") @@ -94,73 +91,70 @@ impl fmt::Debug for Client /// Exposes a trait for connecting asynchronously to servers. pub mod future { - use future::REMOTE; - use futures::{self, Async, Future}; + use REMOTE; + use futures::{self, Async, Future, future}; use protocol::Proto; use serde::{Deserialize, Serialize}; use std::io; use std::marker::PhantomData; use std::net::SocketAddr; use super::Client; - use tokio_core::net::TcpStream; use tokio_core::{self, reactor}; + use tokio_core::net::TcpStream; use tokio_proto::BindClient; - /// Types that can connect to a server asynchronously. - pub trait Connect<'a>: Sized { - /// The type of the future returned when calling `connect`. - type ConnectFut: Future + 'static; + /// Additional options to configure how the client connects. + #[derive(Clone, Default)] + pub struct Options { + reactor: Option, + } - /// The type of the future returned when calling `connect_with`. - type ConnectWithFut: Future + 'a; - - /// Connects to a server located at the given address, using a remote to the default - /// reactor. - fn connect(addr: &SocketAddr) -> Self::ConnectFut { - Self::connect_remotely(addr, &REMOTE) + impl Options { + /// Connect using the given reactor handle. + pub fn handle(mut self, handle: reactor::Handle) -> Self { + self.reactor = Some(Reactor::Handle(handle)); + self } - /// Connects to a server located at the given address, using the given reactor remote. - fn connect_remotely(addr: &SocketAddr, remote: &reactor::Remote) -> Self::ConnectFut; + /// Connect using the given reactor remote. + pub fn remote(mut self, remote: reactor::Remote) -> Self { + self.reactor = Some(Reactor::Remote(remote)); + self + } + } - /// Connects to a server located at the given address, using the given reactor handle. - fn connect_with(addr: &SocketAddr, handle: &'a reactor::Handle) -> Self::ConnectWithFut; + #[derive(Clone)] + enum Reactor { + Handle(reactor::Handle), + Remote(reactor::Remote), + } + + /// Types that can connect to a server asynchronously. + pub trait Connect: Sized { + /// The type of the future returned when calling `connect`. + type ConnectFut: Future; + + /// Connects to a server located at the given address, using the given options. + fn connect(addr: SocketAddr, options: Options) -> Self::ConnectFut; } /// A future that resolves to a `Client` or an `io::Error`. + #[doc(hidden)] pub struct ConnectFuture where Req: Serialize + 'static, Resp: Deserialize + 'static, E: Deserialize + 'static, { - inner: futures::Oneshot>>, + inner: + future::Either< + futures::Map>, + futures::Flatten< + futures::MapErr< + futures::Oneshot>>, + fn(futures::Canceled) -> io::Error>>>, } impl Future for ConnectFuture - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static, - { - type Item = Client; - type Error = io::Error; - - fn poll(&mut self) -> futures::Poll { - // Ok to unwrap because we ensure the oneshot is always completed. - match self.inner.poll().unwrap() { - Async::Ready(Ok(client)) => Ok(Async::Ready(client)), - Async::Ready(Err(err)) => Err(err), - Async::NotReady => Ok(Async::NotReady), - } - } - } - - /// A future that resolves to a `Client` or an `io::Error`. - pub struct ConnectWithFuture<'a, Req, Resp, E> { - inner: futures::Map>, - } - - impl<'a, Req, Resp, E> Future for ConnectWithFuture<'a, Req, Resp, E> where Req: Serialize + Sync + Send + 'static, Resp: Deserialize + Sync + Send + 'static, E: Deserialize + Sync + Send + 'static @@ -169,19 +163,23 @@ pub mod future { type Error = io::Error; fn poll(&mut self) -> futures::Poll { - self.inner.poll() + // Ok to unwrap because we ensure the oneshot is always completed. + match Future::poll(&mut self.inner)? { + Async::Ready(client) => Ok(Async::Ready(client)), + Async::NotReady => Ok(Async::NotReady), + } } } - struct MultiplexConnect<'a, Req, Resp, E>(&'a reactor::Handle, PhantomData<(Req, Resp, E)>); + struct MultiplexConnect(reactor::Handle, PhantomData<(Req, Resp, E)>); - impl<'a, Req, Resp, E> MultiplexConnect<'a, Req, Resp, E> { - fn new(handle: &'a reactor::Handle) -> Self { + impl MultiplexConnect { + fn new(handle: reactor::Handle) -> Self { MultiplexConnect(handle, PhantomData) } } - impl<'a, Req, Resp, E> FnOnce<(TcpStream,)> for MultiplexConnect<'a, Req, Resp, E> + impl FnOnce<(TcpStream,)> for MultiplexConnect where Req: Serialize + Sync + Send + 'static, Resp: Deserialize + Sync + Send + 'static, E: Deserialize + Sync + Send + 'static @@ -189,37 +187,49 @@ pub mod future { type Output = Client; extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Client { - Client::new(Proto::new().bind_client(self.0, tcp)) + Client::new(Proto::new().bind_client(&self.0, tcp)) } } - impl<'a, Req, Resp, E> Connect<'a> for Client + impl Connect for Client where Req: Serialize + Sync + Send + 'static, Resp: Deserialize + Sync + Send + 'static, E: Deserialize + Sync + Send + 'static { type ConnectFut = ConnectFuture; - type ConnectWithFut = ConnectWithFuture<'a, Req, Resp, E>; - fn connect_remotely(addr: &SocketAddr, remote: &reactor::Remote) -> Self::ConnectFut { - let addr = *addr; - let (tx, rx) = futures::oneshot(); - remote.spawn(move |handle| { - let handle2 = handle.clone(); - TcpStream::connect(&addr, handle) - .map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp))) - .then(move |result| { - tx.complete(result); - Ok(()) - }) - }); - ConnectFuture { inner: rx } - } - - fn connect_with(addr: &SocketAddr, handle: &'a reactor::Handle) -> Self::ConnectWithFut { - ConnectWithFuture { - inner: TcpStream::connect(addr, handle).map(MultiplexConnect::new(handle)) + fn connect(addr: SocketAddr, options: Options) -> Self::ConnectFut { + let setup = move |tx: futures::sync::oneshot::Sender<_>| { + move |handle: &reactor::Handle| { + let handle2 = handle.clone(); + TcpStream::connect(&addr, handle) + .map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp))) + .then(move |result| { + tx.complete(result); + Ok(()) + }) + } + }; + let rx = match options.reactor { + Some(Reactor::Handle(handle)) => { + let tcp = TcpStream::connect(&addr, &handle).map(MultiplexConnect::new(handle)); + return ConnectFuture { inner: future::Either::A(tcp) }; + } + Some(Reactor::Remote(remote)) => { + let (tx, rx) = futures::oneshot(); + remote.spawn(setup(tx)); + rx + } + None => { + let (tx, rx) = futures::oneshot(); + REMOTE.spawn(setup(tx)); + rx + } + }; + fn panic(canceled: futures::Canceled) -> io::Error { + unreachable!(canceled) } + ConnectFuture { inner: future::Either::B(rx.map_err(panic as fn(_) -> _).flatten()) } } } } @@ -253,7 +263,8 @@ pub mod sync { "`ToSocketAddrs::to_socket_addrs` returned an empty \ iterator.")); }; - ::connect(&addr).wait() + ::connect(addr, super::future::Options::default()) + .wait() } } } diff --git a/src/lib.rs b/src/lib.rs index 41a2992..0b4678b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,7 +34,7 @@ //! #[macro_use] //! extern crate tarpc; //! -//! use tarpc::sync::Connect; +//! use tarpc::client::sync::Connect; //! use tarpc::util::Never; //! //! service! { @@ -59,7 +59,8 @@ //! ``` //! #![deny(missing_docs)] -#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, fn_traits, specialization)] +#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, fn_traits, + specialization)] #![plugin(tarpc_plugins)] extern crate byteorder; @@ -89,7 +90,7 @@ pub extern crate tokio_service; #[doc(hidden)] pub use client::Client; #[doc(hidden)] -pub use client::future::{ConnectFuture, ConnectWithFuture}; +pub use client::future::ConnectFuture; pub use errors::Error; #[doc(hidden)] pub use errors::WireError; @@ -104,7 +105,7 @@ pub mod util; #[macro_use] mod macros; /// Provides the base client stubs used by the service macro. -mod client; +pub mod client; /// Provides the base server boilerplate used by service implementations. mod server; /// Provides implementations of `ClientProto` and `ServerProto` that implement the tarpc protocol. @@ -113,37 +114,27 @@ mod protocol; /// Provides a few different error types. mod errors; -/// Utility specific to synchronous implementation. -pub mod sync { - pub use client::sync::*; +use std::sync::mpsc; +use std::thread; +use tokio_core::reactor; + +lazy_static! { + /// The `Remote` for the default reactor core. + #[doc(hidden)] + pub static ref REMOTE: reactor::Remote = { + spawn_core() + }; } -/// Utility specific to futures implementation. -pub mod future { - pub use client::future::*; - use futures; - use tokio_core::reactor; - use std::thread; - use std::sync::mpsc; - - lazy_static! { - /// The `Remote` for the default reactor core. - pub static ref REMOTE: reactor::Remote = { - spawn_core() - }; - } - - /// Spawns a `reactor::Core` running forever on a new thread. - fn spawn_core() -> reactor::Remote { - let (tx, rx) = mpsc::channel(); - thread::spawn(move || { - let mut core = reactor::Core::new().unwrap(); - tx.send(core.handle().remote().clone()).unwrap(); - - // Run forever - core.run(futures::empty::<(), !>()).unwrap(); - }); - rx.recv().unwrap() - } +/// Spawns a `reactor::Core` running forever on a new thread. +fn spawn_core() -> reactor::Remote { + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let mut core = reactor::Core::new().unwrap(); + tx.send(core.handle().remote().clone()).unwrap(); + // Run forever + core.run(futures::empty::<(), !>()).unwrap(); + }); + rx.recv().unwrap() } diff --git a/src/macros.rs b/src/macros.rs index 8d463ee..ac3af6d 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -330,7 +330,7 @@ macro_rules! service { fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture { let (tx, rx) = $crate::futures::oneshot(); - $crate::future::REMOTE.spawn(move |handle| + $crate::REMOTE.spawn(move |handle| Ok(tx.complete(Self::listen_with(self, addr, handle.clone())))); @@ -476,7 +476,9 @@ macro_rules! service { { let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?; let (tx, rx) = $crate::futures::oneshot(); - $crate::future::REMOTE.spawn(move |handle| Ok(tx.complete(Self::listen_with(self, addr, handle.clone())))); + $crate::REMOTE.spawn(move |handle| { + Ok(tx.complete(Self::listen_with(self, addr, handle.clone()))) + }); $crate::futures::Future::wait($crate::ListenFuture::from_oneshot(rx)) } @@ -547,12 +549,13 @@ macro_rules! service { /// The client stub that makes RPC calls to the server. Exposes a blocking interface. pub struct SyncClient(FutureClient); - impl $crate::sync::Connect for SyncClient { + impl $crate::client::sync::Connect for SyncClient { fn connect(addr: A) -> ::std::result::Result where A: ::std::net::ToSocketAddrs, { let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?; - let client = ::connect(&addr); + let client = ::connect( + addr, $crate::client::future::Options::default()); let client = $crate::futures::Future::wait(client)?; let client = SyncClient(client); ::std::result::Result::Ok(client) @@ -596,57 +599,25 @@ macro_rules! service { } } - #[allow(non_camel_case_types)] - /// Implementation detail: Pending connection. - pub struct __tarpc_service_ConnectWithFuture<'a, T> { - inner: $crate::futures::Map<$crate::ConnectWithFuture<'a, - __tarpc_service_Request, - __tarpc_service_Response, - __tarpc_service_Error>, - fn(__tarpc_service_Client) -> T>, - } - - impl<'a, T> $crate::futures::Future for __tarpc_service_ConnectWithFuture<'a, T> { - type Item = T; - type Error = ::std::io::Error; - - fn poll(&mut self) -> $crate::futures::Poll { - $crate::futures::Future::poll(&mut self.inner) - } - } - #[allow(unused)] #[derive(Clone, Debug)] /// The client stub that makes RPC calls to the server. Exposes a Future interface. pub struct FutureClient(__tarpc_service_Client); - impl<'a> $crate::future::Connect<'a> for FutureClient { + impl<'a> $crate::client::future::Connect for FutureClient { type ConnectFut = __tarpc_service_ConnectFuture; - type ConnectWithFut = __tarpc_service_ConnectWithFuture<'a, Self>; - fn connect_remotely(__tarpc_service_addr: &::std::net::SocketAddr, - __tarpc_service_remote: &$crate::tokio_core::reactor::Remote) + fn connect(__tarpc_service_addr: ::std::net::SocketAddr, + __tarpc_service_options: $crate::client::future::Options) -> Self::ConnectFut { - let client = <__tarpc_service_Client as $crate::future::Connect>::connect_remotely( - __tarpc_service_addr, __tarpc_service_remote); + let client = <__tarpc_service_Client as $crate::client::future::Connect>::connect( + __tarpc_service_addr, __tarpc_service_options); __tarpc_service_ConnectFuture { inner: $crate::futures::Future::map(client, FutureClient) } } - - fn connect_with(__tarpc_service_addr: &::std::net::SocketAddr, - __tarpc_service_handle: &'a $crate::tokio_core::reactor::Handle) - -> Self::ConnectWithFut - { - let client = <__tarpc_service_Client as $crate::future::Connect>::connect_with( - __tarpc_service_addr, __tarpc_service_handle); - - __tarpc_service_ConnectWithFuture { - inner: $crate::futures::Future::map(client, FutureClient) - } - } } impl FutureClient { @@ -746,9 +717,9 @@ mod functional_test { } mod sync { + use client::sync::Connect; use super::{SyncClient, SyncService, SyncServiceExt}; use super::env_logger; - use sync::Connect; use util::FirstSocketAddr; use util::Never; @@ -777,7 +748,8 @@ mod functional_test { fn other_service() { let _ = env_logger::init(); let addr = Server.listen("localhost:0".first_socket_addr()).unwrap(); - let client = super::other_service::SyncClient::connect(addr).expect("Could not connect!"); + let client = super::other_service::SyncClient::connect(addr) + .expect("Could not connect!"); match client.foo().err().unwrap() { ::Error::ServerDeserialize(_) => {} // good bad => panic!("Expected Error::ServerDeserialize but got {}", bad), @@ -786,7 +758,7 @@ mod functional_test { } mod future { - use future::Connect; + use client::future::{Connect, Options}; use futures::{Finished, Future, finished}; use super::{FutureClient, FutureService, FutureServiceExt}; use super::env_logger; @@ -814,7 +786,7 @@ mod functional_test { fn simple() { let _ = env_logger::init(); let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = FutureClient::connect(&addr).wait().unwrap(); + let client = FutureClient::connect(addr, Options::default()).wait().unwrap(); assert_eq!(3, client.add(1, 2).wait().unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap()); } @@ -823,7 +795,7 @@ mod functional_test { fn concurrent() { let _ = env_logger::init(); let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = FutureClient::connect(&addr).wait().unwrap(); + let client = FutureClient::connect(addr, Options::default()).wait().unwrap(); let req1 = client.add(1, 2); let req2 = client.add(3, 4); let req3 = client.hey("Tim".to_string()); @@ -836,8 +808,9 @@ mod functional_test { fn other_service() { let _ = env_logger::init(); let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = - super::other_service::FutureClient::connect(&addr).wait().unwrap(); + let client = super::other_service::FutureClient::connect(addr, Options::default()) + .wait() + .unwrap(); match client.foo().wait().err().unwrap() { ::Error::ServerDeserialize(_) => {} // good bad => panic!(r#"Expected Error::ServerDeserialize but got "{}""#, bad), @@ -865,14 +838,14 @@ mod functional_test { #[test] fn error() { - use future::Connect as Fc; - use sync::Connect as Sc; + use client::future::{Connect as Fc, Options}; + use client::sync::Connect as Sc; use std::error::Error as E; use self::error_service::*; let _ = env_logger::init(); let addr = ErrorServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = FutureClient::connect(&addr).wait().unwrap(); + let client = FutureClient::connect(addr, Options::default()).wait().unwrap(); client.bar() .then(move |result| { match result.err().unwrap() { diff --git a/src/protocol.rs b/src/protocol.rs index e3fc61e..b973b38 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -10,8 +10,8 @@ use std::io::{self, Cursor}; use std::marker::PhantomData; use std::mem; use tokio_core::io::{EasyBuf, Framed, Io}; -use tokio_proto::streaming::multiplex::RequestId; use tokio_proto::multiplex::{ClientProto, ServerProto}; +use tokio_proto::streaming::multiplex::RequestId; use util::Debugger; // `Encode` is the type that `Codec` encodes. `Decode` is the type it decodes. @@ -37,7 +37,7 @@ impl Codec { impl tokio_core::io::Codec for Codec where Encode: serde::Serialize, - Decode: serde::Deserialize, + Decode: serde::Deserialize { type Out = (RequestId, Encode); type In = (RequestId, Result); @@ -62,7 +62,7 @@ impl tokio_core::io::Codec for Codec match self.state { Id if buf.len() < mem::size_of::() => { trace!("--> Buf len is {}; waiting for 8 to parse id.", buf.len()); - return Ok(None) + return Ok(None); } Id => { let mut id_buf = buf.drain_to(mem::size_of::()); @@ -71,22 +71,23 @@ impl tokio_core::io::Codec for Codec self.state = Len { id: id }; } Len { .. } if buf.len() < mem::size_of::() => { - trace!("--> Buf len is {}; waiting for 8 to parse packet length.", buf.len()); - return Ok(None) + trace!("--> Buf len is {}; waiting for 8 to parse packet length.", + buf.len()); + return Ok(None); } Len { id } => { let len_buf = buf.drain_to(mem::size_of::()); let len = Cursor::new(len_buf).read_u64::()?; trace!("--> Parsed payload length = {}, remaining buffer length = {}", - len, buf.len()); - self.state = Payload { - id: id, - len: len, - }; + len, + buf.len()); + self.state = Payload { id: id, len: len }; } Payload { len, .. } if buf.len() < len as usize => { - trace!("--> Buf len is {}; waiting for {} to parse payload.", buf.len(), len); - return Ok(None) + trace!("--> Buf len is {}; waiting for {} to parse payload.", + buf.len(), + len); + return Ok(None); } Payload { id, len } => { let payload = buf.drain_to(len as usize); @@ -117,7 +118,7 @@ impl Proto { impl ServerProto for Proto where T: Io + 'static, Encode: serde::Serialize + 'static, - Decode: serde::Deserialize + 'static, + Decode: serde::Deserialize + 'static { type Response = Encode; type Request = Result; @@ -132,7 +133,7 @@ impl ServerProto for Proto impl ClientProto for Proto where T: Io + 'static, Encode: serde::Serialize + 'static, - Decode: serde::Deserialize + 'static, + Decode: serde::Deserialize + 'static { type Response = Result; type Request = Encode; @@ -157,8 +158,8 @@ fn serialize() { let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new(); codec.encode(MSG, &mut vec).unwrap(); buf.get_mut().append(&mut vec); - let actual: Result)>, io::Error> = - codec.decode(&mut buf); + let actual: Result)>, + io::Error> = codec.decode(&mut buf); match actual { Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {} diff --git a/src/server.rs b/src/server.rs index 54bd951..10b3c83 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,12 +3,12 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use net2; +use REMOTE; use bincode::serde::DeserializeError; use errors::WireError; -use future::REMOTE; -use protocol::Proto; use futures::{self, Async, Future, Stream}; +use net2; +use protocol::Proto; use serde::{Deserialize, Serialize}; use std::io; use std::net::SocketAddr; @@ -50,29 +50,29 @@ pub fn listen_with(addr: SocketAddr, let addr = listener.local_addr()?; let handle2 = handle.clone(); - let server = listener.incoming().for_each(move |(socket, _)| { - Proto::new().bind_server(&handle2, socket, new_service.new_service()?); + let server = listener.incoming() + .for_each(move |(socket, _)| { + Proto::new().bind_server(&handle2, socket, new_service.new_service()?); - Ok(()) - }).map_err(|e| error!("While processing incoming connections: {}", e)); + Ok(()) + }) + .map_err(|e| error!("While processing incoming connections: {}", e)); handle.spawn(server); Ok(addr) } -fn listener(addr: &SocketAddr, - handle: &Handle) -> io::Result { +fn listener(addr: &SocketAddr, handle: &Handle) -> io::Result { const PENDING_CONNECTION_BACKLOG: i32 = 1024; match *addr { - SocketAddr::V4(_) => net2::TcpBuilder::new_v4(), - SocketAddr::V6(_) => net2::TcpBuilder::new_v6() - }? - .reuse_address(true)? - .bind(addr)? - .listen(PENDING_CONNECTION_BACKLOG) - .and_then(|l| { - TcpListener::from_listener(l, addr, handle) - }) + SocketAddr::V4(_) => net2::TcpBuilder::new_v4(), + SocketAddr::V6(_) => net2::TcpBuilder::new_v6(), + } + ? + .reuse_address(true)? + .bind(addr)? + .listen(PENDING_CONNECTION_BACKLOG) + .and_then(|l| TcpListener::from_listener(l, addr, handle)) } /// A future that resolves to a `ServerHandle`. diff --git a/src/util.rs b/src/util.rs index 64a4113..7525c9b 100644 --- a/src/util.rs +++ b/src/util.rs @@ -6,8 +6,8 @@ use futures::{Future, Poll}; use futures::stream::Stream; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use std::error::Error; use std::{fmt, io}; +use std::error::Error; use std::net::{SocketAddr, ToSocketAddrs}; /// A bottom type that impls `Error`, `Serialize`, and `Deserialize`. It is impossible to @@ -102,10 +102,10 @@ pub trait FirstSocketAddr: ToSocketAddrs { /// Returns the first resolved `SocketAddr`, if one exists. fn try_first_socket_addr(&self) -> io::Result { if let Some(a) = self.to_socket_addrs()?.next() { - Ok(a) + Ok(a) } else { - Err(io::Error::new(io::ErrorKind::AddrNotAvailable, - "`ToSocketAddrs::to_socket_addrs` returned an empty iterator.")) + Err(io::Error::new(io::ErrorKind::AddrNotAvailable, + "`ToSocketAddrs::to_socket_addrs` returned an empty iterator.")) } }