Rework the future Connect trait to only have one method, which takes an Options arg.

This commit is contained in:
Tim Kuehn
2017-01-11 22:26:12 -08:00
parent 568484f14f
commit 05c6be192d
16 changed files with 257 additions and 282 deletions

View File

@@ -14,10 +14,10 @@ extern crate env_logger;
extern crate futures;
use futures::Future;
#[cfg(test)]
use test::Bencher;
use tarpc::sync::Connect;
use tarpc::util::{FirstSocketAddr, Never};
#[cfg(test)]
use test::Bencher;
service! {
rpc ack();

View File

@@ -24,7 +24,7 @@ use std::{cmp, thread};
use std::sync::{Arc, mpsc};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tarpc::future::{Connect};
use tarpc::client::future::{Connect, Options};
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
@@ -73,10 +73,10 @@ trait Microseconds {
impl Microseconds for Duration {
fn microseconds(&self) -> i64 {
chrono::Duration::from_std(*self)
.unwrap()
.num_microseconds()
.unwrap()
chrono::Duration::from_std(*self)
.unwrap()
.num_microseconds()
.unwrap()
}
}
@@ -101,21 +101,25 @@ fn spawn_core() -> reactor::Remote {
rx.recv().unwrap()
}
fn run_once(clients: Vec<FutureClient>, concurrency: u32) -> impl Future<Item=(), Error=()> + 'static {
fn run_once(clients: Vec<FutureClient>,
concurrency: u32)
-> impl Future<Item = (), Error = ()> + 'static {
let start = Instant::now();
let num_clients = clients.len();
futures::stream::futures_unordered((0..concurrency as usize)
.map(|iteration| (iteration + 1, iteration % num_clients))
.map(|(iteration, client_idx)| {
let client = &clients[client_idx];
let start = Instant::now();
debug!("Client {} reading (iteration {})...", client_idx, iteration);
client.read(CHUNK_SIZE)
.map(move |_| (client_idx, iteration, start))
}))
.map(|iteration| (iteration + 1, iteration % num_clients))
.map(|(iteration, client_idx)| {
let client = &clients[client_idx];
let start = Instant::now();
debug!("Client {} reading (iteration {})...", client_idx, iteration);
client.read(CHUNK_SIZE)
.map(move |_| (client_idx, iteration, start))
}))
.map(|(client_idx, iteration, start)| {
let elapsed = start.elapsed();
debug!("Client {} received reply (iteration {}).", client_idx, iteration);
debug!("Client {} received reply (iteration {}).",
client_idx,
iteration);
elapsed
})
.map_err(|e| panic!(e))
@@ -128,31 +132,31 @@ fn run_once(clients: Vec<FutureClient>, concurrency: u32) -> impl Future<Item=()
})
.map(move |stats| {
info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
stats.count,
stats.sum.microseconds() as f64 / stats.count as f64,
stats.min.unwrap().microseconds(),
stats.max.unwrap().microseconds(),
start.elapsed().microseconds());
stats.count,
stats.sum.microseconds() as f64 / stats.count as f64,
stats.min.unwrap().microseconds(),
stats.max.unwrap().microseconds(),
start.elapsed().microseconds());
})
}
fn main() {
let _ = env_logger::init();
let matches = App::new("Tarpc Concurrency")
.about("Demonstrates making concurrent requests to a tarpc service.")
.arg(Arg::with_name("concurrency")
.short("c")
.long("concurrency")
.value_name("LEVEL")
.help("Sets a custom concurrency level")
.takes_value(true))
.arg(Arg::with_name("clients")
.short("n")
.long("num_clients")
.value_name("AMOUNT")
.help("How many clients to distribute requests between")
.takes_value(true))
.get_matches();
.about("Demonstrates making concurrent requests to a tarpc service.")
.arg(Arg::with_name("concurrency")
.short("c")
.long("concurrency")
.value_name("LEVEL")
.help("Sets a custom concurrency level")
.takes_value(true))
.arg(Arg::with_name("clients")
.short("n")
.long("num_clients")
.value_name("AMOUNT")
.help("How many clients to distribute requests between")
.takes_value(true))
.get_matches();
let concurrency = matches.value_of("concurrency")
.map(&str::parse)
.map(Result::unwrap)
@@ -170,7 +174,7 @@ fn main() {
.map(|i| (i, spawn_core()))
.map(|(i, remote)| {
info!("Client {} connecting...", i);
FutureClient::connect_remotely(&addr, &remote)
FutureClient::connect(addr, Options::default().remote(remote))
.map_err(|e| panic!(e))
})
// Need an intermediate collection to connect the clients in parallel,

View File

@@ -14,14 +14,14 @@ extern crate tokio_proto as tokio;
use futures::{BoxFuture, Future};
use publisher::FutureServiceExt as PublisherExt;
use subscriber::FutureServiceExt as SubscriberExt;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use tarpc::future::Connect as Fc;
use tarpc::sync::Connect as Sc;
use subscriber::FutureServiceExt as SubscriberExt;
use tarpc::client::future::{Connect as Fc, Options};
use tarpc::client::sync::Connect as Sc;
use tarpc::util::{FirstSocketAddr, Message, Never};
pub mod subscriber {
@@ -57,12 +57,10 @@ impl subscriber::FutureService for Subscriber {
impl Subscriber {
fn new(id: u32) -> SocketAddr {
Subscriber {
id: id,
}
.listen("localhost:0".first_socket_addr())
.wait()
.unwrap()
Subscriber { id: id }
.listen("localhost:0".first_socket_addr())
.wait()
.unwrap()
}
}
@@ -88,22 +86,21 @@ impl publisher::FutureService for Publisher {
// Ignore failing subscribers.
.map(move |client| client.receive(message.clone()).then(|_| Ok(())))
.collect::<Vec<_>>())
.map(|_| ())
.boxed()
.map(|_| ())
.boxed()
}
type SubscribeFut = BoxFuture<(), Message>;
type SubscribeFut = Box<Future<Item = (), Error = Message>>;
fn subscribe(&self, id: u32, address: SocketAddr) -> Self::SubscribeFut {
let clients = self.clients.clone();
subscriber::FutureClient::connect(&address)
Box::new(subscriber::FutureClient::connect(address, Options::default())
.map(move |subscriber| {
println!("Subscribing {}.", id);
clients.lock().unwrap().insert(id, subscriber);
()
})
.map_err(|e| e.to_string().into())
.boxed()
.map_err(|e| e.to_string().into()))
}
type UnsubscribeFut = BoxFuture<(), Never>;

View File

@@ -14,7 +14,7 @@ extern crate serde_derive;
use std::error::Error;
use std::fmt;
use tarpc::sync::Connect;
use tarpc::client::sync::Connect;
service! {
rpc hello(name: String) -> String | NoNameGiven;

View File

@@ -22,7 +22,7 @@ use bincode::serde::DeserializeError;
use futures::{Future, IntoFuture};
use std::io;
use std::net::SocketAddr;
use tarpc::future::Connect;
use tarpc::client::future::{Connect, Options};
use tarpc::util::FirstSocketAddr;
use tarpc::util::Never;
use tokio_service::Service;
@@ -31,9 +31,9 @@ use tokio_service::Service;
struct HelloServer;
impl HelloServer {
fn listen(addr: SocketAddr) -> impl Future<Item=SocketAddr, Error=io::Error> {
fn listen(addr: SocketAddr) -> impl Future<Item = SocketAddr, Error = io::Error> {
let (tx, rx) = futures::oneshot();
tarpc::future::REMOTE.spawn(move |handle| {
tarpc::REMOTE.spawn(move |handle| {
Ok(tx.complete(tarpc::listen_with(addr, move || Ok(HelloServer), handle.clone())))
});
rx.map_err(|e| panic!(e)).and_then(|result| result)
@@ -56,13 +56,13 @@ impl Service for HelloServer {
pub struct FutureClient(tarpc::Client<String, String, Never>);
impl FutureClient {
fn connect(addr: &SocketAddr) -> impl Future<Item = FutureClient, Error = io::Error> {
tarpc::Client::connect_remotely(addr, &tarpc::future::REMOTE).map(FutureClient)
fn connect(addr: SocketAddr) -> impl Future<Item = FutureClient, Error = io::Error> {
tarpc::Client::connect(addr, Options::default()).map(FutureClient)
}
pub fn hello(&self, name: String)
-> impl Future<Item = String, Error = tarpc::Error<Never>> + 'static
{
pub fn hello(&self,
name: String)
-> impl Future<Item = String, Error = tarpc::Error<Never>> + 'static {
self.0.call(name).then(|msg| msg.unwrap())
}
}
@@ -71,7 +71,7 @@ fn main() {
let _ = env_logger::init();
let mut core = tokio_core::reactor::Core::new().unwrap();
let addr = HelloServer::listen("localhost:10000".first_socket_addr()).wait().unwrap();
let f = FutureClient::connect(&addr)
let f = FutureClient::connect(addr)
.map_err(tarpc::Error::from)
.and_then(|client| {
let resp1 = client.hello("Mom".to_string());

View File

@@ -12,7 +12,7 @@ extern crate tarpc;
extern crate tokio_core;
use futures::Future;
use tarpc::future::Connect;
use tarpc::client::future::{Connect, Options};
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
@@ -34,11 +34,11 @@ impl FutureService for HelloServer {
fn main() {
let addr = "localhost:10000".first_socket_addr();
let mut core = reactor::Core::new().unwrap();
let handle = core.handle();
HelloServer.listen_with(addr, core.handle()).unwrap();
core.run(
FutureClient::connect(&addr)
core.run(FutureClient::connect(addr, Options::default().handle(handle))
.map_err(tarpc::Error::from)
.and_then(|client| client.hello("Mom".to_string()))
.map(|resp| println!("{}", resp))
).unwrap();
.map(|resp| println!("{}", resp)))
.unwrap();
}

View File

@@ -11,8 +11,8 @@ extern crate futures;
#[macro_use]
extern crate tarpc;
use tarpc::client::sync::Connect;
use tarpc::util::Never;
use tarpc::sync::Connect;
service! {
rpc hello(name: String) -> String;

View File

@@ -15,9 +15,9 @@ use add::{FutureService as AddFutureService, FutureServiceExt as AddExt};
use double::{FutureService as DoubleFutureService, FutureServiceExt as DoubleExt};
use futures::{BoxFuture, Future};
use std::sync::{Arc, Mutex};
use tarpc::client::future::{Connect as Fc, Options};
use tarpc::client::sync::Connect as Sc;
use tarpc::util::{FirstSocketAddr, Message, Never};
use tarpc::future::Connect as Fc;
use tarpc::sync::Connect as Sc;
pub mod add {
service! {
@@ -53,9 +53,7 @@ struct DoubleServer {
impl DoubleServer {
fn new(client: add::FutureClient) -> Self {
DoubleServer {
client: Arc::new(Mutex::new(client)),
}
DoubleServer { client: Arc::new(Mutex::new(client)) }
}
}
@@ -75,7 +73,7 @@ impl DoubleFutureService for DoubleServer {
fn main() {
let _ = env_logger::init();
let add_addr = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap();
let add_client = add::FutureClient::connect(&add_addr).wait().unwrap();
let add_client = add::FutureClient::connect(add_addr, Options::default()).wait().unwrap();
let double = DoubleServer::new(add_client);
let double_addr = double.listen("localhost:0".first_socket_addr()).wait().unwrap();

View File

@@ -13,13 +13,13 @@ extern crate tarpc;
extern crate env_logger;
extern crate futures;
use std::sync::Arc;
use futures::Future;
use std::io::{Read, Write, stdout};
use std::net;
use std::sync::Arc;
use std::thread;
use std::time;
use std::io::{Read, Write, stdout};
use futures::Future;
use tarpc::sync::Connect;
use tarpc::client::sync::Connect;
use tarpc::util::{FirstSocketAddr, Never};
lazy_static! {

View File

@@ -17,8 +17,8 @@ extern crate futures;
use bar::FutureServiceExt as BarExt;
use baz::FutureServiceExt as BazExt;
use futures::Future;
use tarpc::client::sync::Connect;
use tarpc::util::{FirstSocketAddr, Never};
use tarpc::sync::Connect;
mod bar {
service! {

View File

@@ -25,10 +25,11 @@ type BindClient<Req, Resp, E> =
///
/// Typically, this would be combined with a serialization pre-processing step
/// and a deserialization post-processing step.
#[doc(hidden)]
pub struct Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static,
E: Deserialize + 'static
{
inner: BindClient<Req, Resp, E>,
}
@@ -36,12 +37,10 @@ pub struct Client<Req, Resp, E>
impl<Req, Resp, E> Clone for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static,
E: Deserialize + 'static
{
fn clone(&self) -> Self {
Client {
inner: self.inner.clone(),
}
Client { inner: self.inner.clone() }
}
}
@@ -63,16 +62,14 @@ impl<Req, Resp, E> Service for Client<Req, Resp, E>
impl<Req, Resp, E> Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static,
E: Deserialize + 'static
{
fn new(inner: BindClient<Req, Resp, E>) -> Self
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
Client {
inner: inner,
}
Client { inner: inner }
}
fn map_err(resp: WireResponse<Resp, E>) -> Result<Resp, ::Error<E>> {
@@ -85,7 +82,7 @@ impl<Req, Resp, E> Client<Req, Resp, E>
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static,
E: Deserialize + 'static
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Client {{ .. }}")
@@ -94,73 +91,70 @@ impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
/// Exposes a trait for connecting asynchronously to servers.
pub mod future {
use future::REMOTE;
use futures::{self, Async, Future};
use REMOTE;
use futures::{self, Async, Future, future};
use protocol::Proto;
use serde::{Deserialize, Serialize};
use std::io;
use std::marker::PhantomData;
use std::net::SocketAddr;
use super::Client;
use tokio_core::net::TcpStream;
use tokio_core::{self, reactor};
use tokio_core::net::TcpStream;
use tokio_proto::BindClient;
/// Types that can connect to a server asynchronously.
pub trait Connect<'a>: Sized {
/// The type of the future returned when calling `connect`.
type ConnectFut: Future<Item = Self, Error = io::Error> + 'static;
/// Additional options to configure how the client connects.
#[derive(Clone, Default)]
pub struct Options {
reactor: Option<Reactor>,
}
/// The type of the future returned when calling `connect_with`.
type ConnectWithFut: Future<Item = Self, Error = io::Error> + 'a;
/// Connects to a server located at the given address, using a remote to the default
/// reactor.
fn connect(addr: &SocketAddr) -> Self::ConnectFut {
Self::connect_remotely(addr, &REMOTE)
impl Options {
/// Connect using the given reactor handle.
pub fn handle(mut self, handle: reactor::Handle) -> Self {
self.reactor = Some(Reactor::Handle(handle));
self
}
/// Connects to a server located at the given address, using the given reactor remote.
fn connect_remotely(addr: &SocketAddr, remote: &reactor::Remote) -> Self::ConnectFut;
/// Connect using the given reactor remote.
pub fn remote(mut self, remote: reactor::Remote) -> Self {
self.reactor = Some(Reactor::Remote(remote));
self
}
}
/// Connects to a server located at the given address, using the given reactor handle.
fn connect_with(addr: &SocketAddr, handle: &'a reactor::Handle) -> Self::ConnectWithFut;
#[derive(Clone)]
enum Reactor {
Handle(reactor::Handle),
Remote(reactor::Remote),
}
/// Types that can connect to a server asynchronously.
pub trait Connect: Sized {
/// The type of the future returned when calling `connect`.
type ConnectFut: Future<Item = Self, Error = io::Error>;
/// Connects to a server located at the given address, using the given options.
fn connect(addr: SocketAddr, options: Options) -> Self::ConnectFut;
}
/// A future that resolves to a `Client` or an `io::Error`.
#[doc(hidden)]
pub struct ConnectFuture<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static,
{
inner: futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
inner:
future::Either<
futures::Map<tokio_core::net::TcpStreamNew, MultiplexConnect<Req, Resp, E>>,
futures::Flatten<
futures::MapErr<
futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
fn(futures::Canceled) -> io::Error>>>,
}
impl<Req, Resp, E> Future for ConnectFuture<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static,
{
type Item = Client<Req, Resp, E>;
type Error = io::Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
// Ok to unwrap because we ensure the oneshot is always completed.
match self.inner.poll().unwrap() {
Async::Ready(Ok(client)) => Ok(Async::Ready(client)),
Async::Ready(Err(err)) => Err(err),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// A future that resolves to a `Client` or an `io::Error`.
pub struct ConnectWithFuture<'a, Req, Resp, E> {
inner: futures::Map<tokio_core::net::TcpStreamNew,
MultiplexConnect<'a, Req, Resp, E>>,
}
impl<'a, Req, Resp, E> Future for ConnectWithFuture<'a, Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
@@ -169,19 +163,23 @@ pub mod future {
type Error = io::Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
self.inner.poll()
// Ok to unwrap because we ensure the oneshot is always completed.
match Future::poll(&mut self.inner)? {
Async::Ready(client) => Ok(Async::Ready(client)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
struct MultiplexConnect<'a, Req, Resp, E>(&'a reactor::Handle, PhantomData<(Req, Resp, E)>);
struct MultiplexConnect<Req, Resp, E>(reactor::Handle, PhantomData<(Req, Resp, E)>);
impl<'a, Req, Resp, E> MultiplexConnect<'a, Req, Resp, E> {
fn new(handle: &'a reactor::Handle) -> Self {
impl<Req, Resp, E> MultiplexConnect<Req, Resp, E> {
fn new(handle: reactor::Handle) -> Self {
MultiplexConnect(handle, PhantomData)
}
}
impl<'a, Req, Resp, E> FnOnce<(TcpStream,)> for MultiplexConnect<'a, Req, Resp, E>
impl<Req, Resp, E> FnOnce<(TcpStream,)> for MultiplexConnect<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
@@ -189,37 +187,49 @@ pub mod future {
type Output = Client<Req, Resp, E>;
extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Client<Req, Resp, E> {
Client::new(Proto::new().bind_client(self.0, tcp))
Client::new(Proto::new().bind_client(&self.0, tcp))
}
}
impl<'a, Req, Resp, E> Connect<'a> for Client<Req, Resp, E>
impl<Req, Resp, E> Connect for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
type ConnectFut = ConnectFuture<Req, Resp, E>;
type ConnectWithFut = ConnectWithFuture<'a, Req, Resp, E>;
fn connect_remotely(addr: &SocketAddr, remote: &reactor::Remote) -> Self::ConnectFut {
let addr = *addr;
let (tx, rx) = futures::oneshot();
remote.spawn(move |handle| {
let handle2 = handle.clone();
TcpStream::connect(&addr, handle)
.map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp)))
.then(move |result| {
tx.complete(result);
Ok(())
})
});
ConnectFuture { inner: rx }
}
fn connect_with(addr: &SocketAddr, handle: &'a reactor::Handle) -> Self::ConnectWithFut {
ConnectWithFuture {
inner: TcpStream::connect(addr, handle).map(MultiplexConnect::new(handle))
fn connect(addr: SocketAddr, options: Options) -> Self::ConnectFut {
let setup = move |tx: futures::sync::oneshot::Sender<_>| {
move |handle: &reactor::Handle| {
let handle2 = handle.clone();
TcpStream::connect(&addr, handle)
.map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp)))
.then(move |result| {
tx.complete(result);
Ok(())
})
}
};
let rx = match options.reactor {
Some(Reactor::Handle(handle)) => {
let tcp = TcpStream::connect(&addr, &handle).map(MultiplexConnect::new(handle));
return ConnectFuture { inner: future::Either::A(tcp) };
}
Some(Reactor::Remote(remote)) => {
let (tx, rx) = futures::oneshot();
remote.spawn(setup(tx));
rx
}
None => {
let (tx, rx) = futures::oneshot();
REMOTE.spawn(setup(tx));
rx
}
};
fn panic(canceled: futures::Canceled) -> io::Error {
unreachable!(canceled)
}
ConnectFuture { inner: future::Either::B(rx.map_err(panic as fn(_) -> _).flatten()) }
}
}
}
@@ -253,7 +263,8 @@ pub mod sync {
"`ToSocketAddrs::to_socket_addrs` returned an empty \
iterator."));
};
<Self as super::future::Connect>::connect(&addr).wait()
<Self as super::future::Connect>::connect(addr, super::future::Options::default())
.wait()
}
}
}

View File

@@ -34,7 +34,7 @@
//! #[macro_use]
//! extern crate tarpc;
//!
//! use tarpc::sync::Connect;
//! use tarpc::client::sync::Connect;
//! use tarpc::util::Never;
//!
//! service! {
@@ -59,7 +59,8 @@
//! ```
//!
#![deny(missing_docs)]
#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, fn_traits, specialization)]
#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, fn_traits,
specialization)]
#![plugin(tarpc_plugins)]
extern crate byteorder;
@@ -89,7 +90,7 @@ pub extern crate tokio_service;
#[doc(hidden)]
pub use client::Client;
#[doc(hidden)]
pub use client::future::{ConnectFuture, ConnectWithFuture};
pub use client::future::ConnectFuture;
pub use errors::Error;
#[doc(hidden)]
pub use errors::WireError;
@@ -104,7 +105,7 @@ pub mod util;
#[macro_use]
mod macros;
/// Provides the base client stubs used by the service macro.
mod client;
pub mod client;
/// Provides the base server boilerplate used by service implementations.
mod server;
/// Provides implementations of `ClientProto` and `ServerProto` that implement the tarpc protocol.
@@ -113,37 +114,27 @@ mod protocol;
/// Provides a few different error types.
mod errors;
/// Utility specific to synchronous implementation.
pub mod sync {
pub use client::sync::*;
use std::sync::mpsc;
use std::thread;
use tokio_core::reactor;
lazy_static! {
/// The `Remote` for the default reactor core.
#[doc(hidden)]
pub static ref REMOTE: reactor::Remote = {
spawn_core()
};
}
/// Utility specific to futures implementation.
pub mod future {
pub use client::future::*;
use futures;
use tokio_core::reactor;
use std::thread;
use std::sync::mpsc;
lazy_static! {
/// The `Remote` for the default reactor core.
pub static ref REMOTE: reactor::Remote = {
spawn_core()
};
}
/// Spawns a `reactor::Core` running forever on a new thread.
fn spawn_core() -> reactor::Remote {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut core = reactor::Core::new().unwrap();
tx.send(core.handle().remote().clone()).unwrap();
// Run forever
core.run(futures::empty::<(), !>()).unwrap();
});
rx.recv().unwrap()
}
/// Spawns a `reactor::Core` running forever on a new thread.
fn spawn_core() -> reactor::Remote {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut core = reactor::Core::new().unwrap();
tx.send(core.handle().remote().clone()).unwrap();
// Run forever
core.run(futures::empty::<(), !>()).unwrap();
});
rx.recv().unwrap()
}

View File

@@ -330,7 +330,7 @@ macro_rules! service {
fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture
{
let (tx, rx) = $crate::futures::oneshot();
$crate::future::REMOTE.spawn(move |handle|
$crate::REMOTE.spawn(move |handle|
Ok(tx.complete(Self::listen_with(self,
addr,
handle.clone()))));
@@ -476,7 +476,9 @@ macro_rules! service {
{
let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
let (tx, rx) = $crate::futures::oneshot();
$crate::future::REMOTE.spawn(move |handle| Ok(tx.complete(Self::listen_with(self, addr, handle.clone()))));
$crate::REMOTE.spawn(move |handle| {
Ok(tx.complete(Self::listen_with(self, addr, handle.clone())))
});
$crate::futures::Future::wait($crate::ListenFuture::from_oneshot(rx))
}
@@ -547,12 +549,13 @@ macro_rules! service {
/// The client stub that makes RPC calls to the server. Exposes a blocking interface.
pub struct SyncClient(FutureClient);
impl $crate::sync::Connect for SyncClient {
impl $crate::client::sync::Connect for SyncClient {
fn connect<A>(addr: A) -> ::std::result::Result<Self, ::std::io::Error>
where A: ::std::net::ToSocketAddrs,
{
let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
let client = <FutureClient as $crate::future::Connect>::connect(&addr);
let client = <FutureClient as $crate::client::future::Connect>::connect(
addr, $crate::client::future::Options::default());
let client = $crate::futures::Future::wait(client)?;
let client = SyncClient(client);
::std::result::Result::Ok(client)
@@ -596,57 +599,25 @@ macro_rules! service {
}
}
#[allow(non_camel_case_types)]
/// Implementation detail: Pending connection.
pub struct __tarpc_service_ConnectWithFuture<'a, T> {
inner: $crate::futures::Map<$crate::ConnectWithFuture<'a,
__tarpc_service_Request,
__tarpc_service_Response,
__tarpc_service_Error>,
fn(__tarpc_service_Client) -> T>,
}
impl<'a, T> $crate::futures::Future for __tarpc_service_ConnectWithFuture<'a, T> {
type Item = T;
type Error = ::std::io::Error;
fn poll(&mut self) -> $crate::futures::Poll<Self::Item, Self::Error> {
$crate::futures::Future::poll(&mut self.inner)
}
}
#[allow(unused)]
#[derive(Clone, Debug)]
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
pub struct FutureClient(__tarpc_service_Client);
impl<'a> $crate::future::Connect<'a> for FutureClient {
impl<'a> $crate::client::future::Connect for FutureClient {
type ConnectFut = __tarpc_service_ConnectFuture<Self>;
type ConnectWithFut = __tarpc_service_ConnectWithFuture<'a, Self>;
fn connect_remotely(__tarpc_service_addr: &::std::net::SocketAddr,
__tarpc_service_remote: &$crate::tokio_core::reactor::Remote)
fn connect(__tarpc_service_addr: ::std::net::SocketAddr,
__tarpc_service_options: $crate::client::future::Options)
-> Self::ConnectFut
{
let client = <__tarpc_service_Client as $crate::future::Connect>::connect_remotely(
__tarpc_service_addr, __tarpc_service_remote);
let client = <__tarpc_service_Client as $crate::client::future::Connect>::connect(
__tarpc_service_addr, __tarpc_service_options);
__tarpc_service_ConnectFuture {
inner: $crate::futures::Future::map(client, FutureClient)
}
}
fn connect_with(__tarpc_service_addr: &::std::net::SocketAddr,
__tarpc_service_handle: &'a $crate::tokio_core::reactor::Handle)
-> Self::ConnectWithFut
{
let client = <__tarpc_service_Client as $crate::future::Connect>::connect_with(
__tarpc_service_addr, __tarpc_service_handle);
__tarpc_service_ConnectWithFuture {
inner: $crate::futures::Future::map(client, FutureClient)
}
}
}
impl FutureClient {
@@ -746,9 +717,9 @@ mod functional_test {
}
mod sync {
use client::sync::Connect;
use super::{SyncClient, SyncService, SyncServiceExt};
use super::env_logger;
use sync::Connect;
use util::FirstSocketAddr;
use util::Never;
@@ -777,7 +748,8 @@ mod functional_test {
fn other_service() {
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr()).unwrap();
let client = super::other_service::SyncClient::connect(addr).expect("Could not connect!");
let client = super::other_service::SyncClient::connect(addr)
.expect("Could not connect!");
match client.foo().err().unwrap() {
::Error::ServerDeserialize(_) => {} // good
bad => panic!("Expected Error::ServerDeserialize but got {}", bad),
@@ -786,7 +758,7 @@ mod functional_test {
}
mod future {
use future::Connect;
use client::future::{Connect, Options};
use futures::{Finished, Future, finished};
use super::{FutureClient, FutureService, FutureServiceExt};
use super::env_logger;
@@ -814,7 +786,7 @@ mod functional_test {
fn simple() {
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client = FutureClient::connect(&addr).wait().unwrap();
let client = FutureClient::connect(addr, Options::default()).wait().unwrap();
assert_eq!(3, client.add(1, 2).wait().unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap());
}
@@ -823,7 +795,7 @@ mod functional_test {
fn concurrent() {
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client = FutureClient::connect(&addr).wait().unwrap();
let client = FutureClient::connect(addr, Options::default()).wait().unwrap();
let req1 = client.add(1, 2);
let req2 = client.add(3, 4);
let req3 = client.hey("Tim".to_string());
@@ -836,8 +808,9 @@ mod functional_test {
fn other_service() {
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client =
super::other_service::FutureClient::connect(&addr).wait().unwrap();
let client = super::other_service::FutureClient::connect(addr, Options::default())
.wait()
.unwrap();
match client.foo().wait().err().unwrap() {
::Error::ServerDeserialize(_) => {} // good
bad => panic!(r#"Expected Error::ServerDeserialize but got "{}""#, bad),
@@ -865,14 +838,14 @@ mod functional_test {
#[test]
fn error() {
use future::Connect as Fc;
use sync::Connect as Sc;
use client::future::{Connect as Fc, Options};
use client::sync::Connect as Sc;
use std::error::Error as E;
use self::error_service::*;
let _ = env_logger::init();
let addr = ErrorServer.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client = FutureClient::connect(&addr).wait().unwrap();
let client = FutureClient::connect(addr, Options::default()).wait().unwrap();
client.bar()
.then(move |result| {
match result.err().unwrap() {

View File

@@ -10,8 +10,8 @@ use std::io::{self, Cursor};
use std::marker::PhantomData;
use std::mem;
use tokio_core::io::{EasyBuf, Framed, Io};
use tokio_proto::streaming::multiplex::RequestId;
use tokio_proto::multiplex::{ClientProto, ServerProto};
use tokio_proto::streaming::multiplex::RequestId;
use util::Debugger;
// `Encode` is the type that `Codec` encodes. `Decode` is the type it decodes.
@@ -37,7 +37,7 @@ impl<Encode, Decode> Codec<Encode, Decode> {
impl<Encode, Decode> tokio_core::io::Codec for Codec<Encode, Decode>
where Encode: serde::Serialize,
Decode: serde::Deserialize,
Decode: serde::Deserialize
{
type Out = (RequestId, Encode);
type In = (RequestId, Result<Decode, bincode::DeserializeError>);
@@ -62,7 +62,7 @@ impl<Encode, Decode> tokio_core::io::Codec for Codec<Encode, Decode>
match self.state {
Id if buf.len() < mem::size_of::<u64>() => {
trace!("--> Buf len is {}; waiting for 8 to parse id.", buf.len());
return Ok(None)
return Ok(None);
}
Id => {
let mut id_buf = buf.drain_to(mem::size_of::<u64>());
@@ -71,22 +71,23 @@ impl<Encode, Decode> tokio_core::io::Codec for Codec<Encode, Decode>
self.state = Len { id: id };
}
Len { .. } if buf.len() < mem::size_of::<u64>() => {
trace!("--> Buf len is {}; waiting for 8 to parse packet length.", buf.len());
return Ok(None)
trace!("--> Buf len is {}; waiting for 8 to parse packet length.",
buf.len());
return Ok(None);
}
Len { id } => {
let len_buf = buf.drain_to(mem::size_of::<u64>());
let len = Cursor::new(len_buf).read_u64::<BigEndian>()?;
trace!("--> Parsed payload length = {}, remaining buffer length = {}",
len, buf.len());
self.state = Payload {
id: id,
len: len,
};
len,
buf.len());
self.state = Payload { id: id, len: len };
}
Payload { len, .. } if buf.len() < len as usize => {
trace!("--> Buf len is {}; waiting for {} to parse payload.", buf.len(), len);
return Ok(None)
trace!("--> Buf len is {}; waiting for {} to parse payload.",
buf.len(),
len);
return Ok(None);
}
Payload { id, len } => {
let payload = buf.drain_to(len as usize);
@@ -117,7 +118,7 @@ impl<Encode, Decode> Proto<Encode, Decode> {
impl<T, Encode, Decode> ServerProto<T> for Proto<Encode, Decode>
where T: Io + 'static,
Encode: serde::Serialize + 'static,
Decode: serde::Deserialize + 'static,
Decode: serde::Deserialize + 'static
{
type Response = Encode;
type Request = Result<Decode, bincode::DeserializeError>;
@@ -132,7 +133,7 @@ impl<T, Encode, Decode> ServerProto<T> for Proto<Encode, Decode>
impl<T, Encode, Decode> ClientProto<T> for Proto<Encode, Decode>
where T: Io + 'static,
Encode: serde::Serialize + 'static,
Decode: serde::Deserialize + 'static,
Decode: serde::Deserialize + 'static
{
type Response = Result<Decode, bincode::DeserializeError>;
type Request = Encode;
@@ -157,8 +158,8 @@ fn serialize() {
let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new();
codec.encode(MSG, &mut vec).unwrap();
buf.get_mut().append(&mut vec);
let actual: Result<Option<(u64, Result<(char, char, char), bincode::DeserializeError>)>, io::Error> =
codec.decode(&mut buf);
let actual: Result<Option<(u64, Result<(char, char, char), bincode::DeserializeError>)>,
io::Error> = codec.decode(&mut buf);
match actual {
Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {}

View File

@@ -3,12 +3,12 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
use net2;
use REMOTE;
use bincode::serde::DeserializeError;
use errors::WireError;
use future::REMOTE;
use protocol::Proto;
use futures::{self, Async, Future, Stream};
use net2;
use protocol::Proto;
use serde::{Deserialize, Serialize};
use std::io;
use std::net::SocketAddr;
@@ -50,29 +50,29 @@ pub fn listen_with<S, Req, Resp, E>(addr: SocketAddr,
let addr = listener.local_addr()?;
let handle2 = handle.clone();
let server = listener.incoming().for_each(move |(socket, _)| {
Proto::new().bind_server(&handle2, socket, new_service.new_service()?);
let server = listener.incoming()
.for_each(move |(socket, _)| {
Proto::new().bind_server(&handle2, socket, new_service.new_service()?);
Ok(())
}).map_err(|e| error!("While processing incoming connections: {}", e));
Ok(())
})
.map_err(|e| error!("While processing incoming connections: {}", e));
handle.spawn(server);
Ok(addr)
}
fn listener(addr: &SocketAddr,
handle: &Handle) -> io::Result<TcpListener> {
fn listener(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
const PENDING_CONNECTION_BACKLOG: i32 = 1024;
match *addr {
SocketAddr::V4(_) => net2::TcpBuilder::new_v4(),
SocketAddr::V6(_) => net2::TcpBuilder::new_v6()
}?
.reuse_address(true)?
.bind(addr)?
.listen(PENDING_CONNECTION_BACKLOG)
.and_then(|l| {
TcpListener::from_listener(l, addr, handle)
})
SocketAddr::V4(_) => net2::TcpBuilder::new_v4(),
SocketAddr::V6(_) => net2::TcpBuilder::new_v6(),
}
?
.reuse_address(true)?
.bind(addr)?
.listen(PENDING_CONNECTION_BACKLOG)
.and_then(|l| TcpListener::from_listener(l, addr, handle))
}
/// A future that resolves to a `ServerHandle`.

View File

@@ -6,8 +6,8 @@
use futures::{Future, Poll};
use futures::stream::Stream;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::error::Error;
use std::{fmt, io};
use std::error::Error;
use std::net::{SocketAddr, ToSocketAddrs};
/// A bottom type that impls `Error`, `Serialize`, and `Deserialize`. It is impossible to
@@ -102,10 +102,10 @@ pub trait FirstSocketAddr: ToSocketAddrs {
/// Returns the first resolved `SocketAddr`, if one exists.
fn try_first_socket_addr(&self) -> io::Result<SocketAddr> {
if let Some(a) = self.to_socket_addrs()?.next() {
Ok(a)
Ok(a)
} else {
Err(io::Error::new(io::ErrorKind::AddrNotAvailable,
"`ToSocketAddrs::to_socket_addrs` returned an empty iterator."))
Err(io::Error::new(io::ErrorKind::AddrNotAvailable,
"`ToSocketAddrs::to_socket_addrs` returned an empty iterator."))
}
}