diff --git a/README.md b/README.md index de47c22..da525aa 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ fn main() { tx.send(handle.addr()).unwrap(); handle.run(); }); - let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); + let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } ``` diff --git a/examples/readme_errors.rs b/examples/readme_errors.rs index c823732..dc3bf19 100644 --- a/examples/readme_errors.rs +++ b/examples/readme_errors.rs @@ -59,7 +59,7 @@ fn main() { tx.send(handle.addr()).unwrap(); handle.run(); }); - let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); + let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); println!("{}", client.hello("".to_string()).unwrap_err()); } diff --git a/examples/readme_sync.rs b/examples/readme_sync.rs index 5ded1a4..44b3591 100644 --- a/examples/readme_sync.rs +++ b/examples/readme_sync.rs @@ -38,6 +38,6 @@ fn main() { tx.send(handle.addr()).unwrap(); handle.run(); }); - let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); + let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } diff --git a/examples/sync_server_calling_server.rs b/examples/sync_server_calling_server.rs new file mode 100644 index 0000000..6441b1f --- /dev/null +++ b/examples/sync_server_calling_server.rs @@ -0,0 +1,95 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +#![feature(plugin)] +#![plugin(tarpc_plugins)] + +extern crate env_logger; +#[macro_use] +extern crate tarpc; +extern crate futures; +extern crate tokio_core; + +use add::{SyncService as AddSyncService, SyncServiceExt as AddExt}; +use double::{SyncService as DoubleSyncService, SyncServiceExt as DoubleExt}; +use std::sync::mpsc; +use std::thread; +use tarpc::{client, server}; +use tarpc::client::sync::ClientExt as Fc; +use tarpc::util::{FirstSocketAddr, Message, Never}; + +pub mod add { + service! { + /// Add two ints together. + rpc add(x: i32, y: i32) -> i32; + } +} + +pub mod double { + use tarpc::util::Message; + + service! { + /// 2 * x + rpc double(x: i32) -> i32 | Message; + } +} + +#[derive(Clone)] +struct AddServer; + +impl AddSyncService for AddServer { + fn add(&self, x: i32, y: i32) -> Result { + Ok(x + y) + } +} + +#[derive(Clone)] +struct DoubleServer { + client: add::SyncClient, +} + +impl DoubleServer { + fn new(client: add::SyncClient) -> Self { + DoubleServer { client: client } + } +} + +impl DoubleSyncService for DoubleServer { + fn double(&self, x: i32) -> Result { + self.client + .add(x, x) + .map_err(|e| e.to_string().into()) + } +} + +fn main() { + let _ = env_logger::init(); + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let handle = AddServer.listen("localhost:0".first_socket_addr(), + server::Options::default()).unwrap(); + tx.send(handle.addr()).unwrap(); + handle.run(); + }); + + + let add = rx.recv().unwrap(); + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let add_client = add::SyncClient::connect(add, client::Options::default()).unwrap(); + let handle = DoubleServer::new(add_client) + .listen("localhost:0".first_socket_addr(), server::Options::default()) + .unwrap(); + tx.send(handle.addr()).unwrap(); + handle.run(); + }); + let double = rx.recv().unwrap(); + + let double_client = double::SyncClient::connect(double, client::Options::default()).unwrap(); + for i in 0..5 { + let doubled = double_client.double(i).unwrap(); + println!("{:?}", doubled); + } +} diff --git a/examples/throughput.rs b/examples/throughput.rs index 421bc53..fd5e26f 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -66,7 +66,7 @@ fn bench_tarpc(target: u64) { tx.send(addr).unwrap(); reactor.run(server).unwrap(); }); - let mut client = SyncClient::connect(rx.recv().unwrap().addr(), client::Options::default()) + let client = SyncClient::connect(rx.recv().unwrap().addr(), client::Options::default()) .unwrap(); let start = time::Instant::now(); let mut nread = 0; diff --git a/examples/two_clients.rs b/examples/two_clients.rs index 55dd0ba..df0eb11 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -62,7 +62,7 @@ macro_rules! pos { fn main() { let _ = env_logger::init(); - let mut bar_client = { + let bar_client = { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let mut reactor = reactor::Core::new().unwrap(); @@ -77,7 +77,7 @@ fn main() { bar::SyncClient::connect(handle.addr(), client::Options::default()).unwrap() }; - let mut baz_client = { + let baz_client = { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let mut reactor = reactor::Core::new().unwrap(); diff --git a/src/client.rs b/src/client.rs index 00543c0..b0e9fd3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -264,32 +264,33 @@ pub mod future { /// Exposes a trait for connecting synchronously to servers. pub mod sync { + use futures::{self, Future, Stream}; use super::Options; - use super::Reactor; use super::future::{Client as FutureClient, ClientExt as FutureClientExt}; use serde::{Deserialize, Serialize}; use std::fmt; use std::io; use std::net::ToSocketAddrs; + use std::sync::mpsc; + use std::thread; use tokio_core::reactor; use tokio_service::Service; use util::FirstSocketAddr; #[doc(hidden)] - pub struct Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static - { - inner: FutureClient, - reactor: reactor::Core, + pub struct Client { + request: futures::sync::mpsc::UnboundedSender<(Req, mpsc::Sender>>)>, } - impl fmt::Debug for Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static - { + impl Clone for Client { + fn clone(&self) -> Self { + Client { + request: self.request.clone(), + } + } + } + + impl fmt::Debug for Client { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { write!(f, "Client {{ .. }}") } @@ -301,8 +302,10 @@ pub mod sync { E: Deserialize + Sync + Send + 'static { /// Drives an RPC call for the given request. - pub fn call(&mut self, request: Req) -> Result> { - self.reactor.run(self.inner.call(request)) + pub fn call(&self, request: Req) -> Result> { + let (tx, rx) = mpsc::channel(); + self.request.send((request, tx)).unwrap(); + rx.recv().unwrap() } } @@ -317,16 +320,55 @@ pub mod sync { Resp: Deserialize + Sync + Send + 'static, E: Deserialize + Sync + Send + 'static { - fn connect(addr: A, mut options: Options) -> io::Result + fn connect(addr: A, _options: Options) -> io::Result where A: ToSocketAddrs { - let mut reactor = reactor::Core::new()?; let addr = addr.try_first_socket_addr()?; - options.reactor = Some(Reactor::Handle(reactor.handle())); - Ok(Client { - inner: reactor.run(FutureClient::connect(addr, options))?, - reactor: reactor, - }) + let (connect_tx, connect_rx) = mpsc::channel(); + let (request, request_rx) = futures::sync::mpsc::unbounded(); + #[cfg(feature = "tls")] + let tls_ctx = _options.tls_ctx; + thread::spawn(move || { + let mut reactor = match reactor::Core::new() { + Ok(reactor) => reactor, + Err(e) => { + connect_tx.send(Err(e)).unwrap(); + return; + } + }; + let options; + #[cfg(feature = "tls")] + { + let mut opts = Options::default().handle(reactor.handle()); + opts.tls_ctx = tls_ctx; + options = opts; + } + #[cfg(not(feature = "tls"))] + { + options = Options::default().handle(reactor.handle()); + } + let client = match reactor.run(FutureClient::connect(addr, options)) { + Ok(client) => { + connect_tx.send(Ok(())).unwrap(); + client + } + Err(e) => { + connect_tx.send(Err(e)).unwrap(); + return; + } + }; + let handle = reactor.handle(); + let requests = request_rx.for_each(|(request, response_tx): (_, mpsc::Sender<_>)| { + handle.spawn(client.call(request) + .then(move |response| { + Ok(response_tx.send(response).unwrap()) + })); + Ok(()) + }); + reactor.run(requests).unwrap(); + }); + connect_rx.recv().unwrap()?; + Ok(Client { request }) } } } diff --git a/src/macros.rs b/src/macros.rs index 6a07784..6679ae8 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -588,18 +588,13 @@ macro_rules! service { impl FutureServiceExt for A where A: FutureService {} impl SyncServiceExt for S where S: SyncService {} - #[allow(unused)] /// The client stub that makes RPC calls to the server. Exposes a blocking interface. + #[allow(unused)] + #[derive(Clone, Debug)] pub struct SyncClient { inner: tarpc_service_SyncClient__, } - impl ::std::fmt::Debug for SyncClient { - fn fmt(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - write!(formatter, "SyncClient {{ inner: {:?}, .. }}", self.inner) - } - } - impl $crate::client::sync::ClientExt for SyncClient { fn connect(addr_: A, options_: $crate::client::Options) -> ::std::io::Result where A: ::std::net::ToSocketAddrs, @@ -616,7 +611,7 @@ macro_rules! service { $( #[allow(unused)] $(#[$attr])* - pub fn $fn_name(&mut self, $($arg: $in_),*) + pub fn $fn_name(&self, $($arg: $in_),*) -> ::std::result::Result<$out, $crate::Error<$error>> { return then__(self.inner.call(tarpc_service_Request__::$fn_name(($($arg,)*)))); @@ -934,7 +929,7 @@ mod functional_test { -> io::Result<(server::future::Handle, reactor::Core, Listen)> where S: FutureServiceExt { - let mut reactor = reactor::Core::new()?; + let reactor = reactor::Core::new()?; let server_options = get_tls_server_options(); let (handle, server) = server.listen("localhost:0".first_socket_addr(), &reactor.handle(), @@ -1056,7 +1051,7 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let (_, mut client, _) = unwrap!(start_server_with_sync_client::(Server)); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); @@ -1067,7 +1062,7 @@ mod functional_test { use futures::Future; let _ = env_logger::init(); - let (addr, mut client, shutdown) = + let (addr, client, shutdown) = unwrap!(start_server_with_sync_client::(Server)); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); @@ -1078,7 +1073,7 @@ mod functional_test { let (tx2, rx2) = ::std::sync::mpsc::channel(); let shutdown2 = shutdown.clone(); ::std::thread::spawn(move || { - let mut client = get_sync_client::(addr).unwrap(); + let client = get_sync_client::(addr).unwrap(); tx.send(()).unwrap(); let add = client.add(3, 2).unwrap(); drop(client); @@ -1098,7 +1093,7 @@ mod functional_test { #[test] fn no_shutdown() { let _ = env_logger::init(); - let (addr, mut client, shutdown) = + let (addr, client, shutdown) = unwrap!(start_server_with_sync_client::(Server)); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); @@ -1114,7 +1109,7 @@ mod functional_test { #[test] fn other_service() { let _ = env_logger::init(); - let (_, mut client, _) = + let (_, client, _) = unwrap!(start_server_with_sync_client::(Server)); match client.foo().err().expect("failed unwrap") {