From d0d65c413a8d2ab6336947ca4ae76163bd6633cf Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sat, 25 Feb 2017 01:30:58 -0800 Subject: [PATCH 1/2] Make Client Send again (and Clone too!). The basic strategy is to start a reactor on a dedicated thread running a request stream. Requests are spawned onto the reactor, allowing multiple requests to be processed concurrently. For example, if you clone the client to make requests from multiple threads, they won't have to wait for each others' requests to complete before theirs start being sent out. Also, client rpcs only take &self now, which was also required for clients to be usable in a service. Also added a test to prevent regressions. --- README.md | 2 +- examples/readme_errors.rs | 2 +- examples/readme_sync.rs | 2 +- examples/sync_server_calling_server.rs | 95 ++++++++++++++++++++++++++ examples/throughput.rs | 2 +- examples/two_clients.rs | 4 +- src/client.rs | 86 +++++++++++++++++------ src/macros.rs | 17 ++--- 8 files changed, 174 insertions(+), 36 deletions(-) create mode 100644 examples/sync_server_calling_server.rs diff --git a/README.md b/README.md index de47c22..da525aa 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ fn main() { tx.send(handle.addr()).unwrap(); handle.run(); }); - let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); + let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } ``` diff --git a/examples/readme_errors.rs b/examples/readme_errors.rs index c823732..dc3bf19 100644 --- a/examples/readme_errors.rs +++ b/examples/readme_errors.rs @@ -59,7 +59,7 @@ fn main() { tx.send(handle.addr()).unwrap(); handle.run(); }); - let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); + let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); println!("{}", client.hello("".to_string()).unwrap_err()); } diff --git a/examples/readme_sync.rs b/examples/readme_sync.rs index 5ded1a4..44b3591 100644 --- a/examples/readme_sync.rs +++ b/examples/readme_sync.rs @@ -38,6 +38,6 @@ fn main() { tx.send(handle.addr()).unwrap(); handle.run(); }); - let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); + let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } diff --git a/examples/sync_server_calling_server.rs b/examples/sync_server_calling_server.rs new file mode 100644 index 0000000..6441b1f --- /dev/null +++ b/examples/sync_server_calling_server.rs @@ -0,0 +1,95 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +#![feature(plugin)] +#![plugin(tarpc_plugins)] + +extern crate env_logger; +#[macro_use] +extern crate tarpc; +extern crate futures; +extern crate tokio_core; + +use add::{SyncService as AddSyncService, SyncServiceExt as AddExt}; +use double::{SyncService as DoubleSyncService, SyncServiceExt as DoubleExt}; +use std::sync::mpsc; +use std::thread; +use tarpc::{client, server}; +use tarpc::client::sync::ClientExt as Fc; +use tarpc::util::{FirstSocketAddr, Message, Never}; + +pub mod add { + service! { + /// Add two ints together. + rpc add(x: i32, y: i32) -> i32; + } +} + +pub mod double { + use tarpc::util::Message; + + service! { + /// 2 * x + rpc double(x: i32) -> i32 | Message; + } +} + +#[derive(Clone)] +struct AddServer; + +impl AddSyncService for AddServer { + fn add(&self, x: i32, y: i32) -> Result { + Ok(x + y) + } +} + +#[derive(Clone)] +struct DoubleServer { + client: add::SyncClient, +} + +impl DoubleServer { + fn new(client: add::SyncClient) -> Self { + DoubleServer { client: client } + } +} + +impl DoubleSyncService for DoubleServer { + fn double(&self, x: i32) -> Result { + self.client + .add(x, x) + .map_err(|e| e.to_string().into()) + } +} + +fn main() { + let _ = env_logger::init(); + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let handle = AddServer.listen("localhost:0".first_socket_addr(), + server::Options::default()).unwrap(); + tx.send(handle.addr()).unwrap(); + handle.run(); + }); + + + let add = rx.recv().unwrap(); + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let add_client = add::SyncClient::connect(add, client::Options::default()).unwrap(); + let handle = DoubleServer::new(add_client) + .listen("localhost:0".first_socket_addr(), server::Options::default()) + .unwrap(); + tx.send(handle.addr()).unwrap(); + handle.run(); + }); + let double = rx.recv().unwrap(); + + let double_client = double::SyncClient::connect(double, client::Options::default()).unwrap(); + for i in 0..5 { + let doubled = double_client.double(i).unwrap(); + println!("{:?}", doubled); + } +} diff --git a/examples/throughput.rs b/examples/throughput.rs index 421bc53..fd5e26f 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -66,7 +66,7 @@ fn bench_tarpc(target: u64) { tx.send(addr).unwrap(); reactor.run(server).unwrap(); }); - let mut client = SyncClient::connect(rx.recv().unwrap().addr(), client::Options::default()) + let client = SyncClient::connect(rx.recv().unwrap().addr(), client::Options::default()) .unwrap(); let start = time::Instant::now(); let mut nread = 0; diff --git a/examples/two_clients.rs b/examples/two_clients.rs index 55dd0ba..df0eb11 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -62,7 +62,7 @@ macro_rules! pos { fn main() { let _ = env_logger::init(); - let mut bar_client = { + let bar_client = { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let mut reactor = reactor::Core::new().unwrap(); @@ -77,7 +77,7 @@ fn main() { bar::SyncClient::connect(handle.addr(), client::Options::default()).unwrap() }; - let mut baz_client = { + let baz_client = { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let mut reactor = reactor::Core::new().unwrap(); diff --git a/src/client.rs b/src/client.rs index 00543c0..b0e9fd3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -264,32 +264,33 @@ pub mod future { /// Exposes a trait for connecting synchronously to servers. pub mod sync { + use futures::{self, Future, Stream}; use super::Options; - use super::Reactor; use super::future::{Client as FutureClient, ClientExt as FutureClientExt}; use serde::{Deserialize, Serialize}; use std::fmt; use std::io; use std::net::ToSocketAddrs; + use std::sync::mpsc; + use std::thread; use tokio_core::reactor; use tokio_service::Service; use util::FirstSocketAddr; #[doc(hidden)] - pub struct Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static - { - inner: FutureClient, - reactor: reactor::Core, + pub struct Client { + request: futures::sync::mpsc::UnboundedSender<(Req, mpsc::Sender>>)>, } - impl fmt::Debug for Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static - { + impl Clone for Client { + fn clone(&self) -> Self { + Client { + request: self.request.clone(), + } + } + } + + impl fmt::Debug for Client { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { write!(f, "Client {{ .. }}") } @@ -301,8 +302,10 @@ pub mod sync { E: Deserialize + Sync + Send + 'static { /// Drives an RPC call for the given request. - pub fn call(&mut self, request: Req) -> Result> { - self.reactor.run(self.inner.call(request)) + pub fn call(&self, request: Req) -> Result> { + let (tx, rx) = mpsc::channel(); + self.request.send((request, tx)).unwrap(); + rx.recv().unwrap() } } @@ -317,16 +320,55 @@ pub mod sync { Resp: Deserialize + Sync + Send + 'static, E: Deserialize + Sync + Send + 'static { - fn connect(addr: A, mut options: Options) -> io::Result + fn connect(addr: A, _options: Options) -> io::Result where A: ToSocketAddrs { - let mut reactor = reactor::Core::new()?; let addr = addr.try_first_socket_addr()?; - options.reactor = Some(Reactor::Handle(reactor.handle())); - Ok(Client { - inner: reactor.run(FutureClient::connect(addr, options))?, - reactor: reactor, - }) + let (connect_tx, connect_rx) = mpsc::channel(); + let (request, request_rx) = futures::sync::mpsc::unbounded(); + #[cfg(feature = "tls")] + let tls_ctx = _options.tls_ctx; + thread::spawn(move || { + let mut reactor = match reactor::Core::new() { + Ok(reactor) => reactor, + Err(e) => { + connect_tx.send(Err(e)).unwrap(); + return; + } + }; + let options; + #[cfg(feature = "tls")] + { + let mut opts = Options::default().handle(reactor.handle()); + opts.tls_ctx = tls_ctx; + options = opts; + } + #[cfg(not(feature = "tls"))] + { + options = Options::default().handle(reactor.handle()); + } + let client = match reactor.run(FutureClient::connect(addr, options)) { + Ok(client) => { + connect_tx.send(Ok(())).unwrap(); + client + } + Err(e) => { + connect_tx.send(Err(e)).unwrap(); + return; + } + }; + let handle = reactor.handle(); + let requests = request_rx.for_each(|(request, response_tx): (_, mpsc::Sender<_>)| { + handle.spawn(client.call(request) + .then(move |response| { + Ok(response_tx.send(response).unwrap()) + })); + Ok(()) + }); + reactor.run(requests).unwrap(); + }); + connect_rx.recv().unwrap()?; + Ok(Client { request }) } } } diff --git a/src/macros.rs b/src/macros.rs index 6a07784..3e23edf 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -588,8 +588,9 @@ macro_rules! service { impl FutureServiceExt for A where A: FutureService {} impl SyncServiceExt for S where S: SyncService {} - #[allow(unused)] /// The client stub that makes RPC calls to the server. Exposes a blocking interface. + #[allow(unused)] + #[derive(Clone)] pub struct SyncClient { inner: tarpc_service_SyncClient__, } @@ -616,7 +617,7 @@ macro_rules! service { $( #[allow(unused)] $(#[$attr])* - pub fn $fn_name(&mut self, $($arg: $in_),*) + pub fn $fn_name(&self, $($arg: $in_),*) -> ::std::result::Result<$out, $crate::Error<$error>> { return then__(self.inner.call(tarpc_service_Request__::$fn_name(($($arg,)*)))); @@ -934,7 +935,7 @@ mod functional_test { -> io::Result<(server::future::Handle, reactor::Core, Listen)> where S: FutureServiceExt { - let mut reactor = reactor::Core::new()?; + let reactor = reactor::Core::new()?; let server_options = get_tls_server_options(); let (handle, server) = server.listen("localhost:0".first_socket_addr(), &reactor.handle(), @@ -1056,7 +1057,7 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let (_, mut client, _) = unwrap!(start_server_with_sync_client::(Server)); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); @@ -1067,7 +1068,7 @@ mod functional_test { use futures::Future; let _ = env_logger::init(); - let (addr, mut client, shutdown) = + let (addr, client, shutdown) = unwrap!(start_server_with_sync_client::(Server)); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); @@ -1078,7 +1079,7 @@ mod functional_test { let (tx2, rx2) = ::std::sync::mpsc::channel(); let shutdown2 = shutdown.clone(); ::std::thread::spawn(move || { - let mut client = get_sync_client::(addr).unwrap(); + let client = get_sync_client::(addr).unwrap(); tx.send(()).unwrap(); let add = client.add(3, 2).unwrap(); drop(client); @@ -1098,7 +1099,7 @@ mod functional_test { #[test] fn no_shutdown() { let _ = env_logger::init(); - let (addr, mut client, shutdown) = + let (addr, client, shutdown) = unwrap!(start_server_with_sync_client::(Server)); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); @@ -1114,7 +1115,7 @@ mod functional_test { #[test] fn other_service() { let _ = env_logger::init(); - let (_, mut client, _) = + let (_, client, _) = unwrap!(start_server_with_sync_client::(Server)); match client.foo().err().expect("failed unwrap") { From 073bc25e186489dcdd1a01b5c11c377446a2d0fd Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sat, 25 Feb 2017 01:56:25 -0800 Subject: [PATCH 2/2] Derive debug rather than manually impl --- src/macros.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/macros.rs b/src/macros.rs index 3e23edf..6679ae8 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -590,17 +590,11 @@ macro_rules! service { /// The client stub that makes RPC calls to the server. Exposes a blocking interface. #[allow(unused)] - #[derive(Clone)] + #[derive(Clone, Debug)] pub struct SyncClient { inner: tarpc_service_SyncClient__, } - impl ::std::fmt::Debug for SyncClient { - fn fmt(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - write!(formatter, "SyncClient {{ inner: {:?}, .. }}", self.inner) - } - } - impl $crate::client::sync::ClientExt for SyncClient { fn connect(addr_: A, options_: $crate::client::Options) -> ::std::io::Result where A: ::std::net::ToSocketAddrs,