mirror of
https://github.com/OMGeeky/tarpc.git
synced 2025-12-29 23:55:59 +01:00
Make Client Send again (and Clone too!).
The basic strategy is to start a reactor on a dedicated thread running a request stream. Requests are spawned onto the reactor, allowing multiple requests to be processed concurrently. For example, if you clone the client to make requests from multiple threads, they won't have to wait for each others' requests to complete before theirs start being sent out. Also, client rpcs only take &self now, which was also required for clients to be usable in a service. Also added a test to prevent regressions.
This commit is contained in:
@@ -80,7 +80,7 @@ fn main() {
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
});
|
||||
let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
|
||||
let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
}
|
||||
```
|
||||
|
||||
@@ -59,7 +59,7 @@ fn main() {
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
});
|
||||
let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
|
||||
let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
println!("{}", client.hello("".to_string()).unwrap_err());
|
||||
}
|
||||
|
||||
@@ -38,6 +38,6 @@ fn main() {
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
});
|
||||
let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
|
||||
let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
}
|
||||
|
||||
95
examples/sync_server_calling_server.rs
Normal file
95
examples/sync_server_calling_server.rs
Normal file
@@ -0,0 +1,95 @@
|
||||
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate env_logger;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
extern crate futures;
|
||||
extern crate tokio_core;
|
||||
|
||||
use add::{SyncService as AddSyncService, SyncServiceExt as AddExt};
|
||||
use double::{SyncService as DoubleSyncService, SyncServiceExt as DoubleExt};
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use tarpc::{client, server};
|
||||
use tarpc::client::sync::ClientExt as Fc;
|
||||
use tarpc::util::{FirstSocketAddr, Message, Never};
|
||||
|
||||
pub mod add {
|
||||
service! {
|
||||
/// Add two ints together.
|
||||
rpc add(x: i32, y: i32) -> i32;
|
||||
}
|
||||
}
|
||||
|
||||
pub mod double {
|
||||
use tarpc::util::Message;
|
||||
|
||||
service! {
|
||||
/// 2 * x
|
||||
rpc double(x: i32) -> i32 | Message;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct AddServer;
|
||||
|
||||
impl AddSyncService for AddServer {
|
||||
fn add(&self, x: i32, y: i32) -> Result<i32, Never> {
|
||||
Ok(x + y)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct DoubleServer {
|
||||
client: add::SyncClient,
|
||||
}
|
||||
|
||||
impl DoubleServer {
|
||||
fn new(client: add::SyncClient) -> Self {
|
||||
DoubleServer { client: client }
|
||||
}
|
||||
}
|
||||
|
||||
impl DoubleSyncService for DoubleServer {
|
||||
fn double(&self, x: i32) -> Result<i32, Message> {
|
||||
self.client
|
||||
.add(x, x)
|
||||
.map_err(|e| e.to_string().into())
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let handle = AddServer.listen("localhost:0".first_socket_addr(),
|
||||
server::Options::default()).unwrap();
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
});
|
||||
|
||||
|
||||
let add = rx.recv().unwrap();
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let add_client = add::SyncClient::connect(add, client::Options::default()).unwrap();
|
||||
let handle = DoubleServer::new(add_client)
|
||||
.listen("localhost:0".first_socket_addr(), server::Options::default())
|
||||
.unwrap();
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
});
|
||||
let double = rx.recv().unwrap();
|
||||
|
||||
let double_client = double::SyncClient::connect(double, client::Options::default()).unwrap();
|
||||
for i in 0..5 {
|
||||
let doubled = double_client.double(i).unwrap();
|
||||
println!("{:?}", doubled);
|
||||
}
|
||||
}
|
||||
@@ -66,7 +66,7 @@ fn bench_tarpc(target: u64) {
|
||||
tx.send(addr).unwrap();
|
||||
reactor.run(server).unwrap();
|
||||
});
|
||||
let mut client = SyncClient::connect(rx.recv().unwrap().addr(), client::Options::default())
|
||||
let client = SyncClient::connect(rx.recv().unwrap().addr(), client::Options::default())
|
||||
.unwrap();
|
||||
let start = time::Instant::now();
|
||||
let mut nread = 0;
|
||||
|
||||
@@ -62,7 +62,7 @@ macro_rules! pos {
|
||||
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
let mut bar_client = {
|
||||
let bar_client = {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
@@ -77,7 +77,7 @@ fn main() {
|
||||
bar::SyncClient::connect(handle.addr(), client::Options::default()).unwrap()
|
||||
};
|
||||
|
||||
let mut baz_client = {
|
||||
let baz_client = {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
|
||||
@@ -264,32 +264,33 @@ pub mod future {
|
||||
|
||||
/// Exposes a trait for connecting synchronously to servers.
|
||||
pub mod sync {
|
||||
use futures::{self, Future, Stream};
|
||||
use super::Options;
|
||||
use super::Reactor;
|
||||
use super::future::{Client as FutureClient, ClientExt as FutureClientExt};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::net::ToSocketAddrs;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use tokio_core::reactor;
|
||||
use tokio_service::Service;
|
||||
use util::FirstSocketAddr;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub struct Client<Req, Resp, E>
|
||||
where Req: Serialize + 'static,
|
||||
Resp: Deserialize + 'static,
|
||||
E: Deserialize + 'static
|
||||
{
|
||||
inner: FutureClient<Req, Resp, E>,
|
||||
reactor: reactor::Core,
|
||||
pub struct Client<Req, Resp, E> {
|
||||
request: futures::sync::mpsc::UnboundedSender<(Req, mpsc::Sender<Result<Resp, ::Error<E>>>)>,
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
|
||||
where Req: Serialize + 'static,
|
||||
Resp: Deserialize + 'static,
|
||||
E: Deserialize + 'static
|
||||
{
|
||||
impl<Req, Resp, E> Clone for Client<Req, Resp, E> {
|
||||
fn clone(&self) -> Self {
|
||||
Client {
|
||||
request: self.request.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
write!(f, "Client {{ .. }}")
|
||||
}
|
||||
@@ -301,8 +302,10 @@ pub mod sync {
|
||||
E: Deserialize + Sync + Send + 'static
|
||||
{
|
||||
/// Drives an RPC call for the given request.
|
||||
pub fn call(&mut self, request: Req) -> Result<Resp, ::Error<E>> {
|
||||
self.reactor.run(self.inner.call(request))
|
||||
pub fn call(&self, request: Req) -> Result<Resp, ::Error<E>> {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
self.request.send((request, tx)).unwrap();
|
||||
rx.recv().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -317,16 +320,55 @@ pub mod sync {
|
||||
Resp: Deserialize + Sync + Send + 'static,
|
||||
E: Deserialize + Sync + Send + 'static
|
||||
{
|
||||
fn connect<A>(addr: A, mut options: Options) -> io::Result<Self>
|
||||
fn connect<A>(addr: A, _options: Options) -> io::Result<Self>
|
||||
where A: ToSocketAddrs
|
||||
{
|
||||
let mut reactor = reactor::Core::new()?;
|
||||
let addr = addr.try_first_socket_addr()?;
|
||||
options.reactor = Some(Reactor::Handle(reactor.handle()));
|
||||
Ok(Client {
|
||||
inner: reactor.run(FutureClient::connect(addr, options))?,
|
||||
reactor: reactor,
|
||||
})
|
||||
let (connect_tx, connect_rx) = mpsc::channel();
|
||||
let (request, request_rx) = futures::sync::mpsc::unbounded();
|
||||
#[cfg(feature = "tls")]
|
||||
let tls_ctx = _options.tls_ctx;
|
||||
thread::spawn(move || {
|
||||
let mut reactor = match reactor::Core::new() {
|
||||
Ok(reactor) => reactor,
|
||||
Err(e) => {
|
||||
connect_tx.send(Err(e)).unwrap();
|
||||
return;
|
||||
}
|
||||
};
|
||||
let options;
|
||||
#[cfg(feature = "tls")]
|
||||
{
|
||||
let mut opts = Options::default().handle(reactor.handle());
|
||||
opts.tls_ctx = tls_ctx;
|
||||
options = opts;
|
||||
}
|
||||
#[cfg(not(feature = "tls"))]
|
||||
{
|
||||
options = Options::default().handle(reactor.handle());
|
||||
}
|
||||
let client = match reactor.run(FutureClient::connect(addr, options)) {
|
||||
Ok(client) => {
|
||||
connect_tx.send(Ok(())).unwrap();
|
||||
client
|
||||
}
|
||||
Err(e) => {
|
||||
connect_tx.send(Err(e)).unwrap();
|
||||
return;
|
||||
}
|
||||
};
|
||||
let handle = reactor.handle();
|
||||
let requests = request_rx.for_each(|(request, response_tx): (_, mpsc::Sender<_>)| {
|
||||
handle.spawn(client.call(request)
|
||||
.then(move |response| {
|
||||
Ok(response_tx.send(response).unwrap())
|
||||
}));
|
||||
Ok(())
|
||||
});
|
||||
reactor.run(requests).unwrap();
|
||||
});
|
||||
connect_rx.recv().unwrap()?;
|
||||
Ok(Client { request })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -588,8 +588,9 @@ macro_rules! service {
|
||||
impl<A> FutureServiceExt for A where A: FutureService {}
|
||||
impl<S> SyncServiceExt for S where S: SyncService {}
|
||||
|
||||
#[allow(unused)]
|
||||
/// The client stub that makes RPC calls to the server. Exposes a blocking interface.
|
||||
#[allow(unused)]
|
||||
#[derive(Clone)]
|
||||
pub struct SyncClient {
|
||||
inner: tarpc_service_SyncClient__,
|
||||
}
|
||||
@@ -616,7 +617,7 @@ macro_rules! service {
|
||||
$(
|
||||
#[allow(unused)]
|
||||
$(#[$attr])*
|
||||
pub fn $fn_name(&mut self, $($arg: $in_),*)
|
||||
pub fn $fn_name(&self, $($arg: $in_),*)
|
||||
-> ::std::result::Result<$out, $crate::Error<$error>>
|
||||
{
|
||||
return then__(self.inner.call(tarpc_service_Request__::$fn_name(($($arg,)*))));
|
||||
@@ -934,7 +935,7 @@ mod functional_test {
|
||||
-> io::Result<(server::future::Handle, reactor::Core, Listen<S>)>
|
||||
where S: FutureServiceExt
|
||||
{
|
||||
let mut reactor = reactor::Core::new()?;
|
||||
let reactor = reactor::Core::new()?;
|
||||
let server_options = get_tls_server_options();
|
||||
let (handle, server) = server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
@@ -1056,7 +1057,7 @@ mod functional_test {
|
||||
#[test]
|
||||
fn simple() {
|
||||
let _ = env_logger::init();
|
||||
let (_, mut client, _) = unwrap!(start_server_with_sync_client::<SyncClient,
|
||||
let (_, client, _) = unwrap!(start_server_with_sync_client::<SyncClient,
|
||||
Server>(Server));
|
||||
assert_eq!(3, client.add(1, 2).unwrap());
|
||||
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
|
||||
@@ -1067,7 +1068,7 @@ mod functional_test {
|
||||
use futures::Future;
|
||||
|
||||
let _ = env_logger::init();
|
||||
let (addr, mut client, shutdown) =
|
||||
let (addr, client, shutdown) =
|
||||
unwrap!(start_server_with_sync_client::<SyncClient, Server>(Server));
|
||||
assert_eq!(3, client.add(1, 2).unwrap());
|
||||
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
|
||||
@@ -1078,7 +1079,7 @@ mod functional_test {
|
||||
let (tx2, rx2) = ::std::sync::mpsc::channel();
|
||||
let shutdown2 = shutdown.clone();
|
||||
::std::thread::spawn(move || {
|
||||
let mut client = get_sync_client::<SyncClient>(addr).unwrap();
|
||||
let client = get_sync_client::<SyncClient>(addr).unwrap();
|
||||
tx.send(()).unwrap();
|
||||
let add = client.add(3, 2).unwrap();
|
||||
drop(client);
|
||||
@@ -1098,7 +1099,7 @@ mod functional_test {
|
||||
#[test]
|
||||
fn no_shutdown() {
|
||||
let _ = env_logger::init();
|
||||
let (addr, mut client, shutdown) =
|
||||
let (addr, client, shutdown) =
|
||||
unwrap!(start_server_with_sync_client::<SyncClient, Server>(Server));
|
||||
assert_eq!(3, client.add(1, 2).unwrap());
|
||||
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
|
||||
@@ -1114,7 +1115,7 @@ mod functional_test {
|
||||
#[test]
|
||||
fn other_service() {
|
||||
let _ = env_logger::init();
|
||||
let (_, mut client, _) =
|
||||
let (_, client, _) =
|
||||
unwrap!(start_server_with_sync_client::<super::other_service::SyncClient,
|
||||
Server>(Server));
|
||||
match client.foo().err().expect("failed unwrap") {
|
||||
|
||||
Reference in New Issue
Block a user