Add Options to all connect and listen fns

This commit is contained in:
Tim Kuehn
2017-01-12 00:06:17 -08:00
parent d34ca2acda
commit 95c57a4b2d
13 changed files with 211 additions and 135 deletions

View File

@@ -51,6 +51,7 @@ extern crate futures;
#[macro_use]
extern crate tarpc;
use tarpc::{client, server};
use tarpc::client::sync::Connect;
use tarpc::util::Never;
@@ -69,8 +70,8 @@ impl SyncService for HelloServer {
fn main() {
let addr = "localhost:10000";
HelloServer.listen(addr).unwrap();
let client = SyncClient::connect(addr).unwrap();
HelloServer.listen(addr, server::Options::default()).unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
}
```
@@ -99,7 +100,8 @@ extern crate tarpc;
extern crate tokio_core;
use futures::Future;
use tarpc::client::future::{Connect, Options};
use tarpc::{client, server};
use tarpc::client::future::Connect;
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
@@ -121,8 +123,8 @@ impl FutureService for HelloServer {
fn main() {
let addr = "localhost:10000".first_socket_addr();
let mut core = reactor::Core::new().unwrap();
HelloServer.listen_with(addr, core.handle()).unwrap();
let options = Options::default().handle(core.handle());
HelloServer.listen(addr, server::Options::default().handle(core.handle())).wait().unwrap();
let options = client::Options::default().handle(core.handle());
core.run(FutureClient::connect(addr, options)
.map_err(tarpc::Error::from)
.and_then(|client| client.hello("Mom".to_string()))

View File

@@ -24,7 +24,8 @@ use std::{cmp, thread};
use std::sync::{Arc, mpsc};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tarpc::client::future::{Connect, Options};
use tarpc::{client, server};
use tarpc::client::future::Connect;
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
@@ -166,7 +167,11 @@ fn main() {
.map(Result::unwrap)
.unwrap_or(4);
let addr = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap();
let addr = Server::new()
.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
info!("Server listening on {}.", addr);
let clients = (0..num_clients)
@@ -174,7 +179,7 @@ fn main() {
.map(|i| (i, spawn_core()))
.map(|(i, remote)| {
info!("Client {} connecting...", i);
FutureClient::connect(addr, Options::default().remote(remote))
FutureClient::connect(addr, client::Options::default().remote(remote))
.map_err(|e| panic!(e))
})
// Need an intermediate collection to connect the clients in parallel,

View File

@@ -20,7 +20,8 @@ use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use subscriber::FutureServiceExt as SubscriberExt;
use tarpc::client::future::{Connect as Fc, Options};
use tarpc::{client, server};
use tarpc::client::future::Connect as Fc;
use tarpc::client::sync::Connect as Sc;
use tarpc::util::{FirstSocketAddr, Message, Never};
@@ -58,7 +59,8 @@ impl subscriber::FutureService for Subscriber {
impl Subscriber {
fn new(id: u32) -> SocketAddr {
Subscriber { id: id }
.listen("localhost:0".first_socket_addr())
.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap()
}
@@ -94,7 +96,7 @@ impl publisher::FutureService for Publisher {
fn subscribe(&self, id: u32, address: SocketAddr) -> Self::SubscribeFut {
let clients = self.clients.clone();
Box::new(subscriber::FutureClient::connect(address, Options::default())
Box::new(subscriber::FutureClient::connect(address, client::Options::default())
.map(move |subscriber| {
println!("Subscribing {}.", id);
clients.lock().unwrap().insert(id, subscriber);
@@ -115,11 +117,13 @@ impl publisher::FutureService for Publisher {
fn main() {
let _ = env_logger::init();
let publisher_addr = Publisher::new()
.listen("localhost:0".first_socket_addr())
.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let publisher_client = publisher::SyncClient::connect(publisher_addr).unwrap();
let publisher_client =
publisher::SyncClient::connect(publisher_addr, client::Options::default()).unwrap();
let subscriber1 = Subscriber::new(0);
publisher_client.subscribe(0, subscriber1).unwrap();

View File

@@ -14,6 +14,7 @@ extern crate serde_derive;
use std::error::Error;
use std::fmt;
use tarpc::{client, server};
use tarpc::client::sync::Connect;
service! {
@@ -49,8 +50,8 @@ impl SyncService for HelloServer {
}
fn main() {
let addr = HelloServer.listen("localhost:10000").unwrap();
let client = SyncClient::connect(addr).unwrap();
let addr = HelloServer.listen("localhost:10000", server::Options::default()).unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
println!("{}", client.hello("".to_string()).unwrap_err());
}

View File

@@ -12,7 +12,8 @@ extern crate tarpc;
extern crate tokio_core;
use futures::Future;
use tarpc::client::future::{Connect, Options};
use tarpc::{client, server};
use tarpc::client::future::Connect;
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
@@ -34,8 +35,8 @@ impl FutureService for HelloServer {
fn main() {
let addr = "localhost:10000".first_socket_addr();
let mut core = reactor::Core::new().unwrap();
HelloServer.listen_with(addr, core.handle()).unwrap();
let options = Options::default().handle(core.handle());
HelloServer.listen(addr, server::Options::default().handle(core.handle())).wait().unwrap();
let options = client::Options::default().handle(core.handle());
core.run(FutureClient::connect(addr, options)
.map_err(tarpc::Error::from)
.and_then(|client| client.hello("Mom".to_string()))

View File

@@ -11,6 +11,7 @@ extern crate futures;
#[macro_use]
extern crate tarpc;
use tarpc::{client, server};
use tarpc::client::sync::Connect;
use tarpc::util::Never;
@@ -29,7 +30,7 @@ impl SyncService for HelloServer {
fn main() {
let addr = "localhost:10000";
HelloServer.listen(addr).unwrap();
let client = SyncClient::connect(addr).unwrap();
HelloServer.listen(addr, server::Options::default()).unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
}

View File

@@ -15,7 +15,8 @@ use add::{FutureService as AddFutureService, FutureServiceExt as AddExt};
use double::{FutureService as DoubleFutureService, FutureServiceExt as DoubleExt};
use futures::{BoxFuture, Future};
use std::sync::{Arc, Mutex};
use tarpc::client::future::{Connect as Fc, Options};
use tarpc::{client, server};
use tarpc::client::future::Connect as Fc;
use tarpc::client::sync::Connect as Sc;
use tarpc::util::{FirstSocketAddr, Message, Never};
@@ -72,13 +73,21 @@ impl DoubleFutureService for DoubleServer {
fn main() {
let _ = env_logger::init();
let add_addr = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap();
let add_client = add::FutureClient::connect(add_addr, Options::default()).wait().unwrap();
let add_addr = AddServer.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let add_client =
add::FutureClient::connect(add_addr, client::Options::default()).wait().unwrap();
let double = DoubleServer::new(add_client);
let double_addr = double.listen("localhost:0".first_socket_addr()).wait().unwrap();
let double_addr = double.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let double_client = double::SyncClient::connect(&double_addr).unwrap();
let double_client = double::SyncClient::connect(double_addr, client::Options::default())
.unwrap();
for i in 0..5 {
println!("{:?}", double_client.double(i).unwrap());
}

View File

@@ -19,6 +19,7 @@ use std::net;
use std::sync::Arc;
use std::thread;
use std::time;
use tarpc::{client, server};
use tarpc::client::sync::Connect;
use tarpc::util::{FirstSocketAddr, Never};
@@ -52,8 +53,11 @@ impl FutureService for Server {
const CHUNK_SIZE: u32 = 1 << 19;
fn bench_tarpc(target: u64) {
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client = SyncClient::connect(&addr).unwrap();
let addr = Server.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
let start = time::Instant::now();
let mut nread = 0;
while nread < target {

View File

@@ -17,6 +17,7 @@ extern crate futures;
use bar::FutureServiceExt as BarExt;
use baz::FutureServiceExt as BazExt;
use futures::Future;
use tarpc::{client, server};
use tarpc::client::sync::Connect;
use tarpc::util::{FirstSocketAddr, Never};
@@ -58,11 +59,17 @@ macro_rules! pos {
fn main() {
let _ = env_logger::init();
let bar_addr = Bar.listen("localhost:0".first_socket_addr()).wait().unwrap();
let baz_addr = Baz.listen("localhost:0".first_socket_addr()).wait().unwrap();
let bar_addr = Bar.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let baz_addr = Baz.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let bar_client = bar::SyncClient::connect(&bar_addr).unwrap();
let baz_client = baz::SyncClient::connect(&baz_addr).unwrap();
let bar_client = bar::SyncClient::connect(bar_addr, client::Options::default()).unwrap();
let baz_client = baz::SyncClient::connect(baz_addr, client::Options::default()).unwrap();
info!("Result: {:?}", bar_client.bar(17));

View File

@@ -3,7 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
use WireError;
use {Reactor, WireError};
use bincode::serde::DeserializeError;
use futures::{self, Future};
use protocol::Proto;
@@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize};
use std::fmt;
use std::io;
use tokio_core::net::TcpStream;
use tokio_core::reactor;
use tokio_proto::BindClient as ProtoBindClient;
use tokio_proto::multiplex::Multiplex;
use tokio_service::Service;
@@ -89,46 +90,40 @@ impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
}
}
/// Additional options to configure how the client connects.
#[derive(Clone, Default)]
pub struct Options {
reactor: Option<Reactor>,
}
impl Options {
/// Connect using the given reactor handle.
pub fn handle(mut self, handle: reactor::Handle) -> Self {
self.reactor = Some(Reactor::Handle(handle));
self
}
/// Connect using the given reactor remote.
pub fn remote(mut self, remote: reactor::Remote) -> Self {
self.reactor = Some(Reactor::Remote(remote));
self
}
}
/// Exposes a trait for connecting asynchronously to servers.
pub mod future {
use REMOTE;
use {REMOTE, Reactor};
use futures::{self, Async, Future, future};
use protocol::Proto;
use serde::{Deserialize, Serialize};
use std::io;
use std::marker::PhantomData;
use std::net::SocketAddr;
use super::Client;
use super::{Client, Options};
use tokio_core::{self, reactor};
use tokio_core::net::TcpStream;
use tokio_proto::BindClient;
/// Additional options to configure how the client connects.
#[derive(Clone, Default)]
pub struct Options {
reactor: Option<Reactor>,
}
impl Options {
/// Connect using the given reactor handle.
pub fn handle(mut self, handle: reactor::Handle) -> Self {
self.reactor = Some(Reactor::Handle(handle));
self
}
/// Connect using the given reactor remote.
pub fn remote(mut self, remote: reactor::Remote) -> Self {
self.reactor = Some(Reactor::Remote(remote));
self
}
}
#[derive(Clone)]
enum Reactor {
Handle(reactor::Handle),
Remote(reactor::Remote),
}
/// Types that can connect to a server asynchronously.
pub trait Connect: Sized {
/// The type of the future returned when calling `connect`.
@@ -240,12 +235,12 @@ pub mod sync {
use serde::{Deserialize, Serialize};
use std::io;
use std::net::ToSocketAddrs;
use super::Client;
use super::{Client, Options};
/// Types that can connect to a server synchronously.
pub trait Connect: Sized {
/// Connects to a server located at the given address.
fn connect<A>(addr: A) -> Result<Self, io::Error> where A: ToSocketAddrs;
fn connect<A>(addr: A, options: Options) -> Result<Self, io::Error> where A: ToSocketAddrs;
}
impl<Req, Resp, E> Connect for Client<Req, Resp, E>
@@ -253,7 +248,7 @@ pub mod sync {
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
fn connect<A>(addr: A) -> Result<Self, io::Error>
fn connect<A>(addr: A, options: Options) -> Result<Self, io::Error>
where A: ToSocketAddrs
{
let addr = if let Some(a) = addr.to_socket_addrs()?.next() {
@@ -263,8 +258,7 @@ pub mod sync {
"`ToSocketAddrs::to_socket_addrs` returned an empty \
iterator."));
};
<Self as super::future::Connect>::connect(addr, super::future::Options::default())
.wait()
<Self as super::future::Connect>::connect(addr, options).wait()
}
}
}

View File

@@ -34,6 +34,7 @@
//! #[macro_use]
//! extern crate tarpc;
//!
//! use tarpc::{client, server};
//! use tarpc::client::sync::Connect;
//! use tarpc::util::Never;
//!
@@ -52,8 +53,8 @@
//!
//! fn main() {
//! let addr = "localhost:10000";
//! let _server = HelloServer.listen(addr);
//! let client = SyncClient::connect(addr).unwrap();
//! let _server = HelloServer.listen(addr, server::Options::default());
//! let client = SyncClient::connect(addr, client::Options::default()).unwrap();
//! println!("{}", client.hello("Mom".to_string()).unwrap());
//! }
//! ```
@@ -94,8 +95,6 @@ pub use client::future::ConnectFuture;
pub use errors::Error;
#[doc(hidden)]
pub use errors::WireError;
#[doc(hidden)]
pub use server::{ListenFuture, Response, listen, listen_with};
/// Provides some utility error types, as well as a trait for spawning futures on the default event
/// loop.
@@ -107,7 +106,7 @@ mod macros;
/// Provides the base client stubs used by the service macro.
pub mod client;
/// Provides the base server boilerplate used by service implementations.
mod server;
pub mod server;
/// Provides implementations of `ClientProto` and `ServerProto` that implement the tarpc protocol.
/// The tarpc protocol is a length-delimited, bincode-serialized payload.
mod protocol;
@@ -138,3 +137,9 @@ fn spawn_core() -> reactor::Remote {
});
rx.recv().unwrap()
}
#[derive(Clone)]
enum Reactor {
Handle(reactor::Handle),
Remote(reactor::Remote),
}

View File

@@ -327,26 +327,17 @@ macro_rules! service {
/// Provides a function for starting the service. This is a separate trait from
/// `FutureService` to prevent collisions with the names of RPCs.
pub trait FutureServiceExt: FutureService {
fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture
{
let (tx, rx) = $crate::futures::oneshot();
$crate::REMOTE.spawn(move |handle|
Ok(tx.complete(Self::listen_with(self,
addr,
handle.clone()))));
$crate::ListenFuture::from_oneshot(rx)
}
/// Spawns the service, binding to the given address and running on
/// the default tokio `Loop`.
fn listen_with(self,
addr: ::std::net::SocketAddr,
handle: $crate::tokio_core::reactor::Handle)
-> ::std::io::Result<::std::net::SocketAddr>
fn listen(self,
addr: ::std::net::SocketAddr,
options: $crate::server::Options)
-> $crate::server::ListenFuture
{
return $crate::listen_with(addr,
return $crate::server::listen(
move || Ok(__tarpc_service_AsyncServer(self.clone())),
handle);
addr,
options);
#[allow(non_camel_case_types)]
#[derive(Clone)]
@@ -360,7 +351,7 @@ macro_rules! service {
#[allow(non_camel_case_types)]
type __tarpc_service_Future =
$crate::futures::Finished<$crate::Response<__tarpc_service_Response,
$crate::futures::Finished<$crate::server::Response<__tarpc_service_Response,
__tarpc_service_Error>,
::std::io::Error>;
@@ -375,7 +366,8 @@ macro_rules! service {
}
impl<S: FutureService> $crate::futures::Future for __tarpc_service_FutureReply<S> {
type Item = $crate::Response<__tarpc_service_Response, __tarpc_service_Error>;
type Item = $crate::server::Response<__tarpc_service_Response,
__tarpc_service_Error>;
type Error = ::std::io::Error;
@@ -405,7 +397,7 @@ macro_rules! service {
{
type Request = ::std::result::Result<__tarpc_service_Request,
$crate::bincode::serde::DeserializeError>;
type Response = $crate::Response<__tarpc_service_Response,
type Response = $crate::server::Response<__tarpc_service_Response,
__tarpc_service_Error>;
type Error = ::std::io::Error;
type Future = __tarpc_service_FutureReply<__tarpc_service_S>;
@@ -470,31 +462,19 @@ macro_rules! service {
/// Provides a function for starting the service. This is a separate trait from
/// `SyncService` to prevent collisions with the names of RPCs.
pub trait SyncServiceExt: SyncService {
fn listen<L>(self, addr: L)
-> ::std::io::Result<::std::net::SocketAddr>
where L: ::std::net::ToSocketAddrs
{
let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
let (tx, rx) = $crate::futures::oneshot();
$crate::REMOTE.spawn(move |handle| {
Ok(tx.complete(Self::listen_with(self, addr, handle.clone())))
});
$crate::futures::Future::wait($crate::ListenFuture::from_oneshot(rx))
}
/// Spawns the service, binding to the given address and running on
/// the default tokio `Loop`.
fn listen_with<L>(self, addr: L, handle: $crate::tokio_core::reactor::Handle)
fn listen<L>(self, addr: L, options: $crate::server::Options)
-> ::std::io::Result<::std::net::SocketAddr>
where L: ::std::net::ToSocketAddrs
{
let __tarpc_service_service = __SyncServer {
service: self,
};
return FutureServiceExt::listen_with(
return $crate::futures::Future::wait(FutureServiceExt::listen(
__tarpc_service_service,
$crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?,
handle);
options));
#[derive(Clone)]
struct __SyncServer<S> {
@@ -550,12 +530,13 @@ macro_rules! service {
pub struct SyncClient(FutureClient);
impl $crate::client::sync::Connect for SyncClient {
fn connect<A>(addr: A) -> ::std::result::Result<Self, ::std::io::Error>
fn connect<A>(addr: A, options: $crate::client::Options)
-> ::std::result::Result<Self, ::std::io::Error>
where A: ::std::net::ToSocketAddrs,
{
let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
let client = <FutureClient as $crate::client::future::Connect>::connect(
addr, $crate::client::future::Options::default());
addr, options);
let client = $crate::futures::Future::wait(client)?;
let client = SyncClient(client);
::std::result::Result::Ok(client)
@@ -608,7 +589,7 @@ macro_rules! service {
type ConnectFut = __tarpc_service_ConnectFuture<Self>;
fn connect(__tarpc_service_addr: ::std::net::SocketAddr,
__tarpc_service_options: $crate::client::future::Options)
__tarpc_service_options: $crate::client::Options)
-> Self::ConnectFut
{
let client = <__tarpc_service_Client as $crate::client::future::Connect>::connect(
@@ -717,6 +698,7 @@ mod functional_test {
}
mod sync {
use {client, server};
use client::sync::Connect;
use super::{SyncClient, SyncService, SyncServiceExt};
use super::env_logger;
@@ -738,8 +720,10 @@ mod functional_test {
#[test]
fn simple() {
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr()).unwrap();
let client = SyncClient::connect(addr).unwrap();
let addr = Server.listen("localhost:0".first_socket_addr(),
server::Options::default())
.unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
}
@@ -747,8 +731,11 @@ mod functional_test {
#[test]
fn other_service() {
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr()).unwrap();
let client = super::other_service::SyncClient::connect(addr)
let addr = Server.listen("localhost:0".first_socket_addr(),
server::Options::default())
.unwrap();
let client = super::other_service::SyncClient::connect(addr,
client::Options::default())
.expect("Could not connect!");
match client.foo().err().unwrap() {
::Error::ServerDeserialize(_) => {} // good
@@ -758,7 +745,8 @@ mod functional_test {
}
mod future {
use client::future::{Connect, Options};
use {client, server};
use client::future::Connect;
use futures::{Finished, Future, finished};
use super::{FutureClient, FutureService, FutureServiceExt};
use super::env_logger;
@@ -785,8 +773,11 @@ mod functional_test {
#[test]
fn simple() {
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client = FutureClient::connect(addr, Options::default()).wait().unwrap();
let addr = Server.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let client = FutureClient::connect(addr, client::Options::default()).wait().unwrap();
assert_eq!(3, client.add(1, 2).wait().unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap());
}
@@ -794,8 +785,11 @@ mod functional_test {
#[test]
fn concurrent() {
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client = FutureClient::connect(addr, Options::default()).wait().unwrap();
let addr = Server.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let client = FutureClient::connect(addr, client::Options::default()).wait().unwrap();
let req1 = client.add(1, 2);
let req2 = client.add(3, 4);
let req3 = client.hey("Tim".to_string());
@@ -807,8 +801,12 @@ mod functional_test {
#[test]
fn other_service() {
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client = super::other_service::FutureClient::connect(addr, Options::default())
let addr = Server.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let client = super::other_service::FutureClient::connect(addr,
client::Options::default())
.wait()
.unwrap();
match client.foo().wait().err().unwrap() {
@@ -838,14 +836,18 @@ mod functional_test {
#[test]
fn error() {
use client::future::{Connect as Fc, Options};
use {client, server};
use client::future::Connect as Fc;
use client::sync::Connect as Sc;
use std::error::Error as E;
use self::error_service::*;
let _ = env_logger::init();
let addr = ErrorServer.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client = FutureClient::connect(addr, Options::default()).wait().unwrap();
let addr = ErrorServer.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let client = FutureClient::connect(addr, client::Options::default()).wait().unwrap();
client.bar()
.then(move |result| {
match result.err().unwrap() {
@@ -859,7 +861,7 @@ mod functional_test {
.wait()
.unwrap();
let client = SyncClient::connect(&addr).unwrap();
let client = SyncClient::connect(&addr, client::Options::default()).unwrap();
match client.bar().err().unwrap() {
::Error::App(e) => {
assert_eq!(e.description(), "lol jk");

View File

@@ -3,25 +3,46 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
use REMOTE;
use {REMOTE, Reactor};
use bincode::serde::DeserializeError;
use errors::WireError;
use futures::{self, Async, Future, Stream};
use futures::{self, Async, Future, Stream, future};
use net2;
use protocol::Proto;
use serde::{Deserialize, Serialize};
use std::io;
use std::net::SocketAddr;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Handle;
use tokio_core::reactor::{self, Handle};
use tokio_proto::BindServer;
use tokio_service::NewService;
/// Additional options to configure how the server starts up.
#[derive(Clone, Default)]
pub struct Options {
reactor: Option<Reactor>,
}
impl Options {
/// Listen using the given reactor handle.
pub fn handle(mut self, handle: reactor::Handle) -> Self {
self.reactor = Some(Reactor::Handle(handle));
self
}
/// Listen using the given reactor remote.
pub fn remote(mut self, remote: reactor::Remote) -> Self {
self.reactor = Some(Reactor::Remote(remote));
self
}
}
/// A message from server to client.
#[doc(hidden)]
pub type Response<T, E> = Result<T, WireError<E>>;
/// Spawns a service that binds to the given address and runs on the default reactor core.
pub fn listen<S, Req, Resp, E>(addr: SocketAddr, new_service: S) -> ListenFuture
#[doc(hidden)]
pub fn listen<S, Req, Resp, E>(new_service: S, addr: SocketAddr, options: Options) -> ListenFuture
where S: NewService<Request = Result<Req, DeserializeError>,
Response = Response<Resp, E>,
Error = io::Error> + Send + 'static,
@@ -29,14 +50,32 @@ pub fn listen<S, Req, Resp, E>(addr: SocketAddr, new_service: S) -> ListenFuture
Resp: Serialize + 'static,
E: Serialize + 'static
{
let (tx, rx) = futures::oneshot();
REMOTE.spawn(move |handle| Ok(tx.complete(listen_with(addr, new_service, handle.clone()))));
ListenFuture { inner: rx }
match options.reactor {
None => {
let (tx, rx) = futures::oneshot();
REMOTE.spawn(move |handle| {
Ok(tx.complete(listen_with(new_service, addr, handle.clone())))
});
ListenFuture { inner: future::Either::A(rx) }
}
Some(Reactor::Remote(remote)) => {
let (tx, rx) = futures::oneshot();
remote.spawn(move |handle| {
Ok(tx.complete(listen_with(new_service, addr, handle.clone())))
});
ListenFuture { inner: future::Either::A(rx) }
}
Some(Reactor::Handle(handle)) => {
ListenFuture {
inner: future::Either::B(future::ok(listen_with(new_service, addr, handle))),
}
}
}
}
/// Spawns a service that binds to the given address using the given handle.
pub fn listen_with<S, Req, Resp, E>(addr: SocketAddr,
new_service: S,
#[doc(hidden)]
pub fn listen_with<S, Req, Resp, E>(new_service: S,
addr: SocketAddr,
handle: Handle)
-> io::Result<SocketAddr>
where S: NewService<Request = Result<Req, DeserializeError>,
@@ -76,14 +115,16 @@ fn listener(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
}
/// A future that resolves to a `ServerHandle`.
#[doc(hidden)]
pub struct ListenFuture {
inner: futures::Oneshot<io::Result<SocketAddr>>,
inner: future::Either<futures::Oneshot<io::Result<SocketAddr>>,
future::FutureResult<io::Result<SocketAddr>, futures::Canceled>>,
}
impl ListenFuture {
#[doc(hidden)]
pub fn from_oneshot(rx: futures::Oneshot<io::Result<SocketAddr>>) -> Self {
ListenFuture { inner: rx }
ListenFuture { inner: future::Either::A(rx) }
}
}