Merge pull request #121 from tikue/send-client

Make SyncClient Send again (and Clone too!).
This commit is contained in:
Tim
2017-03-06 22:12:33 -08:00
committed by GitHub
8 changed files with 174 additions and 42 deletions

View File

@@ -80,7 +80,7 @@ fn main() {
tx.send(handle.addr()).unwrap();
handle.run();
});
let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
}
```

View File

@@ -59,7 +59,7 @@ fn main() {
tx.send(handle.addr()).unwrap();
handle.run();
});
let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
println!("{}", client.hello("".to_string()).unwrap_err());
}

View File

@@ -38,6 +38,6 @@ fn main() {
tx.send(handle.addr()).unwrap();
handle.run();
});
let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
}

View File

@@ -0,0 +1,95 @@
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)]
#![plugin(tarpc_plugins)]
extern crate env_logger;
#[macro_use]
extern crate tarpc;
extern crate futures;
extern crate tokio_core;
use add::{SyncService as AddSyncService, SyncServiceExt as AddExt};
use double::{SyncService as DoubleSyncService, SyncServiceExt as DoubleExt};
use std::sync::mpsc;
use std::thread;
use tarpc::{client, server};
use tarpc::client::sync::ClientExt as Fc;
use tarpc::util::{FirstSocketAddr, Message, Never};
pub mod add {
service! {
/// Add two ints together.
rpc add(x: i32, y: i32) -> i32;
}
}
pub mod double {
use tarpc::util::Message;
service! {
/// 2 * x
rpc double(x: i32) -> i32 | Message;
}
}
#[derive(Clone)]
struct AddServer;
impl AddSyncService for AddServer {
fn add(&self, x: i32, y: i32) -> Result<i32, Never> {
Ok(x + y)
}
}
#[derive(Clone)]
struct DoubleServer {
client: add::SyncClient,
}
impl DoubleServer {
fn new(client: add::SyncClient) -> Self {
DoubleServer { client: client }
}
}
impl DoubleSyncService for DoubleServer {
fn double(&self, x: i32) -> Result<i32, Message> {
self.client
.add(x, x)
.map_err(|e| e.to_string().into())
}
}
fn main() {
let _ = env_logger::init();
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let handle = AddServer.listen("localhost:0".first_socket_addr(),
server::Options::default()).unwrap();
tx.send(handle.addr()).unwrap();
handle.run();
});
let add = rx.recv().unwrap();
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let add_client = add::SyncClient::connect(add, client::Options::default()).unwrap();
let handle = DoubleServer::new(add_client)
.listen("localhost:0".first_socket_addr(), server::Options::default())
.unwrap();
tx.send(handle.addr()).unwrap();
handle.run();
});
let double = rx.recv().unwrap();
let double_client = double::SyncClient::connect(double, client::Options::default()).unwrap();
for i in 0..5 {
let doubled = double_client.double(i).unwrap();
println!("{:?}", doubled);
}
}

View File

@@ -66,7 +66,7 @@ fn bench_tarpc(target: u64) {
tx.send(addr).unwrap();
reactor.run(server).unwrap();
});
let mut client = SyncClient::connect(rx.recv().unwrap().addr(), client::Options::default())
let client = SyncClient::connect(rx.recv().unwrap().addr(), client::Options::default())
.unwrap();
let start = time::Instant::now();
let mut nread = 0;

View File

@@ -62,7 +62,7 @@ macro_rules! pos {
fn main() {
let _ = env_logger::init();
let mut bar_client = {
let bar_client = {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut reactor = reactor::Core::new().unwrap();
@@ -77,7 +77,7 @@ fn main() {
bar::SyncClient::connect(handle.addr(), client::Options::default()).unwrap()
};
let mut baz_client = {
let baz_client = {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut reactor = reactor::Core::new().unwrap();

View File

@@ -264,32 +264,33 @@ pub mod future {
/// Exposes a trait for connecting synchronously to servers.
pub mod sync {
use futures::{self, Future, Stream};
use super::Options;
use super::Reactor;
use super::future::{Client as FutureClient, ClientExt as FutureClientExt};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::io;
use std::net::ToSocketAddrs;
use std::sync::mpsc;
use std::thread;
use tokio_core::reactor;
use tokio_service::Service;
use util::FirstSocketAddr;
#[doc(hidden)]
pub struct Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
inner: FutureClient<Req, Resp, E>,
reactor: reactor::Core,
pub struct Client<Req, Resp, E> {
request: futures::sync::mpsc::UnboundedSender<(Req, mpsc::Sender<Result<Resp, ::Error<E>>>)>,
}
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
impl<Req, Resp, E> Clone for Client<Req, Resp, E> {
fn clone(&self) -> Self {
Client {
request: self.request.clone(),
}
}
}
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Client {{ .. }}")
}
@@ -301,8 +302,10 @@ pub mod sync {
E: Deserialize + Sync + Send + 'static
{
/// Drives an RPC call for the given request.
pub fn call(&mut self, request: Req) -> Result<Resp, ::Error<E>> {
self.reactor.run(self.inner.call(request))
pub fn call(&self, request: Req) -> Result<Resp, ::Error<E>> {
let (tx, rx) = mpsc::channel();
self.request.send((request, tx)).unwrap();
rx.recv().unwrap()
}
}
@@ -317,16 +320,55 @@ pub mod sync {
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
fn connect<A>(addr: A, mut options: Options) -> io::Result<Self>
fn connect<A>(addr: A, _options: Options) -> io::Result<Self>
where A: ToSocketAddrs
{
let mut reactor = reactor::Core::new()?;
let addr = addr.try_first_socket_addr()?;
options.reactor = Some(Reactor::Handle(reactor.handle()));
Ok(Client {
inner: reactor.run(FutureClient::connect(addr, options))?,
reactor: reactor,
})
let (connect_tx, connect_rx) = mpsc::channel();
let (request, request_rx) = futures::sync::mpsc::unbounded();
#[cfg(feature = "tls")]
let tls_ctx = _options.tls_ctx;
thread::spawn(move || {
let mut reactor = match reactor::Core::new() {
Ok(reactor) => reactor,
Err(e) => {
connect_tx.send(Err(e)).unwrap();
return;
}
};
let options;
#[cfg(feature = "tls")]
{
let mut opts = Options::default().handle(reactor.handle());
opts.tls_ctx = tls_ctx;
options = opts;
}
#[cfg(not(feature = "tls"))]
{
options = Options::default().handle(reactor.handle());
}
let client = match reactor.run(FutureClient::connect(addr, options)) {
Ok(client) => {
connect_tx.send(Ok(())).unwrap();
client
}
Err(e) => {
connect_tx.send(Err(e)).unwrap();
return;
}
};
let handle = reactor.handle();
let requests = request_rx.for_each(|(request, response_tx): (_, mpsc::Sender<_>)| {
handle.spawn(client.call(request)
.then(move |response| {
Ok(response_tx.send(response).unwrap())
}));
Ok(())
});
reactor.run(requests).unwrap();
});
connect_rx.recv().unwrap()?;
Ok(Client { request })
}
}
}

View File

@@ -588,18 +588,13 @@ macro_rules! service {
impl<A> FutureServiceExt for A where A: FutureService {}
impl<S> SyncServiceExt for S where S: SyncService {}
#[allow(unused)]
/// The client stub that makes RPC calls to the server. Exposes a blocking interface.
#[allow(unused)]
#[derive(Clone, Debug)]
pub struct SyncClient {
inner: tarpc_service_SyncClient__,
}
impl ::std::fmt::Debug for SyncClient {
fn fmt(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
write!(formatter, "SyncClient {{ inner: {:?}, .. }}", self.inner)
}
}
impl $crate::client::sync::ClientExt for SyncClient {
fn connect<A>(addr_: A, options_: $crate::client::Options) -> ::std::io::Result<Self>
where A: ::std::net::ToSocketAddrs,
@@ -616,7 +611,7 @@ macro_rules! service {
$(
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&mut self, $($arg: $in_),*)
pub fn $fn_name(&self, $($arg: $in_),*)
-> ::std::result::Result<$out, $crate::Error<$error>>
{
return then__(self.inner.call(tarpc_service_Request__::$fn_name(($($arg,)*))));
@@ -934,7 +929,7 @@ mod functional_test {
-> io::Result<(server::future::Handle, reactor::Core, Listen<S>)>
where S: FutureServiceExt
{
let mut reactor = reactor::Core::new()?;
let reactor = reactor::Core::new()?;
let server_options = get_tls_server_options();
let (handle, server) = server.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
@@ -1056,7 +1051,7 @@ mod functional_test {
#[test]
fn simple() {
let _ = env_logger::init();
let (_, mut client, _) = unwrap!(start_server_with_sync_client::<SyncClient,
let (_, client, _) = unwrap!(start_server_with_sync_client::<SyncClient,
Server>(Server));
assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
@@ -1067,7 +1062,7 @@ mod functional_test {
use futures::Future;
let _ = env_logger::init();
let (addr, mut client, shutdown) =
let (addr, client, shutdown) =
unwrap!(start_server_with_sync_client::<SyncClient, Server>(Server));
assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
@@ -1078,7 +1073,7 @@ mod functional_test {
let (tx2, rx2) = ::std::sync::mpsc::channel();
let shutdown2 = shutdown.clone();
::std::thread::spawn(move || {
let mut client = get_sync_client::<SyncClient>(addr).unwrap();
let client = get_sync_client::<SyncClient>(addr).unwrap();
tx.send(()).unwrap();
let add = client.add(3, 2).unwrap();
drop(client);
@@ -1098,7 +1093,7 @@ mod functional_test {
#[test]
fn no_shutdown() {
let _ = env_logger::init();
let (addr, mut client, shutdown) =
let (addr, client, shutdown) =
unwrap!(start_server_with_sync_client::<SyncClient, Server>(Server));
assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
@@ -1114,7 +1109,7 @@ mod functional_test {
#[test]
fn other_service() {
let _ = env_logger::init();
let (_, mut client, _) =
let (_, client, _) =
unwrap!(start_server_with_sync_client::<super::other_service::SyncClient,
Server>(Server));
match client.foo().err().expect("failed unwrap") {