diff --git a/README.md b/README.md index 3c7c5af..6d02014 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,7 @@ extern crate futures; #[macro_use] extern crate tarpc; +use tarpc::{client, server}; use tarpc::client::sync::Connect; use tarpc::util::Never; @@ -69,8 +70,8 @@ impl SyncService for HelloServer { fn main() { let addr = "localhost:10000"; - HelloServer.listen(addr).unwrap(); - let client = SyncClient::connect(addr).unwrap(); + HelloServer.listen(addr, server::Options::default()).unwrap(); + let client = SyncClient::connect(addr, client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } ``` @@ -99,7 +100,8 @@ extern crate tarpc; extern crate tokio_core; use futures::Future; -use tarpc::client::future::{Connect, Options}; +use tarpc::{client, server}; +use tarpc::client::future::Connect; use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; @@ -121,8 +123,8 @@ impl FutureService for HelloServer { fn main() { let addr = "localhost:10000".first_socket_addr(); let mut core = reactor::Core::new().unwrap(); - HelloServer.listen_with(addr, core.handle()).unwrap(); - let options = Options::default().handle(core.handle()); + HelloServer.listen(addr, server::Options::default().handle(core.handle())).wait().unwrap(); + let options = client::Options::default().handle(core.handle()); core.run(FutureClient::connect(addr, options) .map_err(tarpc::Error::from) .and_then(|client| client.hello("Mom".to_string())) diff --git a/examples/concurrency.rs b/examples/concurrency.rs index e4a37bf..c721fe8 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -24,7 +24,8 @@ use std::{cmp, thread}; use std::sync::{Arc, mpsc}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant}; -use tarpc::client::future::{Connect, Options}; +use tarpc::{client, server}; +use tarpc::client::future::Connect; use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; @@ -166,7 +167,11 @@ fn main() { .map(Result::unwrap) .unwrap_or(4); - let addr = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap(); + let addr = Server::new() + .listen("localhost:0".first_socket_addr(), + server::Options::default()) + .wait() + .unwrap(); info!("Server listening on {}.", addr); let clients = (0..num_clients) @@ -174,7 +179,7 @@ fn main() { .map(|i| (i, spawn_core())) .map(|(i, remote)| { info!("Client {} connecting...", i); - FutureClient::connect(addr, Options::default().remote(remote)) + FutureClient::connect(addr, client::Options::default().remote(remote)) .map_err(|e| panic!(e)) }) // Need an intermediate collection to connect the clients in parallel, diff --git a/examples/pubsub.rs b/examples/pubsub.rs index 34a0a09..394793b 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -20,7 +20,8 @@ use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; use subscriber::FutureServiceExt as SubscriberExt; -use tarpc::client::future::{Connect as Fc, Options}; +use tarpc::{client, server}; +use tarpc::client::future::Connect as Fc; use tarpc::client::sync::Connect as Sc; use tarpc::util::{FirstSocketAddr, Message, Never}; @@ -58,7 +59,8 @@ impl subscriber::FutureService for Subscriber { impl Subscriber { fn new(id: u32) -> SocketAddr { Subscriber { id: id } - .listen("localhost:0".first_socket_addr()) + .listen("localhost:0".first_socket_addr(), + server::Options::default()) .wait() .unwrap() } @@ -94,7 +96,7 @@ impl publisher::FutureService for Publisher { fn subscribe(&self, id: u32, address: SocketAddr) -> Self::SubscribeFut { let clients = self.clients.clone(); - Box::new(subscriber::FutureClient::connect(address, Options::default()) + Box::new(subscriber::FutureClient::connect(address, client::Options::default()) .map(move |subscriber| { println!("Subscribing {}.", id); clients.lock().unwrap().insert(id, subscriber); @@ -115,11 +117,13 @@ impl publisher::FutureService for Publisher { fn main() { let _ = env_logger::init(); let publisher_addr = Publisher::new() - .listen("localhost:0".first_socket_addr()) + .listen("localhost:0".first_socket_addr(), + server::Options::default()) .wait() .unwrap(); - let publisher_client = publisher::SyncClient::connect(publisher_addr).unwrap(); + let publisher_client = + publisher::SyncClient::connect(publisher_addr, client::Options::default()).unwrap(); let subscriber1 = Subscriber::new(0); publisher_client.subscribe(0, subscriber1).unwrap(); diff --git a/examples/readme_errors.rs b/examples/readme_errors.rs index 67d4958..c3f0a9c 100644 --- a/examples/readme_errors.rs +++ b/examples/readme_errors.rs @@ -14,6 +14,7 @@ extern crate serde_derive; use std::error::Error; use std::fmt; +use tarpc::{client, server}; use tarpc::client::sync::Connect; service! { @@ -49,8 +50,8 @@ impl SyncService for HelloServer { } fn main() { - let addr = HelloServer.listen("localhost:10000").unwrap(); - let client = SyncClient::connect(addr).unwrap(); + let addr = HelloServer.listen("localhost:10000", server::Options::default()).unwrap(); + let client = SyncClient::connect(addr, client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); println!("{}", client.hello("".to_string()).unwrap_err()); } diff --git a/examples/readme_futures.rs b/examples/readme_futures.rs index c538416..7bf66d2 100644 --- a/examples/readme_futures.rs +++ b/examples/readme_futures.rs @@ -12,7 +12,8 @@ extern crate tarpc; extern crate tokio_core; use futures::Future; -use tarpc::client::future::{Connect, Options}; +use tarpc::{client, server}; +use tarpc::client::future::Connect; use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; @@ -34,8 +35,8 @@ impl FutureService for HelloServer { fn main() { let addr = "localhost:10000".first_socket_addr(); let mut core = reactor::Core::new().unwrap(); - HelloServer.listen_with(addr, core.handle()).unwrap(); - let options = Options::default().handle(core.handle()); + HelloServer.listen(addr, server::Options::default().handle(core.handle())).wait().unwrap(); + let options = client::Options::default().handle(core.handle()); core.run(FutureClient::connect(addr, options) .map_err(tarpc::Error::from) .and_then(|client| client.hello("Mom".to_string())) diff --git a/examples/readme_sync.rs b/examples/readme_sync.rs index 4d42d29..80055d4 100644 --- a/examples/readme_sync.rs +++ b/examples/readme_sync.rs @@ -11,6 +11,7 @@ extern crate futures; #[macro_use] extern crate tarpc; +use tarpc::{client, server}; use tarpc::client::sync::Connect; use tarpc::util::Never; @@ -29,7 +30,7 @@ impl SyncService for HelloServer { fn main() { let addr = "localhost:10000"; - HelloServer.listen(addr).unwrap(); - let client = SyncClient::connect(addr).unwrap(); + HelloServer.listen(addr, server::Options::default()).unwrap(); + let client = SyncClient::connect(addr, client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index 180793b..a1ac41c 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -15,7 +15,8 @@ use add::{FutureService as AddFutureService, FutureServiceExt as AddExt}; use double::{FutureService as DoubleFutureService, FutureServiceExt as DoubleExt}; use futures::{BoxFuture, Future}; use std::sync::{Arc, Mutex}; -use tarpc::client::future::{Connect as Fc, Options}; +use tarpc::{client, server}; +use tarpc::client::future::Connect as Fc; use tarpc::client::sync::Connect as Sc; use tarpc::util::{FirstSocketAddr, Message, Never}; @@ -72,13 +73,21 @@ impl DoubleFutureService for DoubleServer { fn main() { let _ = env_logger::init(); - let add_addr = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let add_client = add::FutureClient::connect(add_addr, Options::default()).wait().unwrap(); + let add_addr = AddServer.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .wait() + .unwrap(); + let add_client = + add::FutureClient::connect(add_addr, client::Options::default()).wait().unwrap(); let double = DoubleServer::new(add_client); - let double_addr = double.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let double_addr = double.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .wait() + .unwrap(); - let double_client = double::SyncClient::connect(&double_addr).unwrap(); + let double_client = double::SyncClient::connect(double_addr, client::Options::default()) + .unwrap(); for i in 0..5 { println!("{:?}", double_client.double(i).unwrap()); } diff --git a/examples/throughput.rs b/examples/throughput.rs index 6afff01..f24b8d7 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -19,6 +19,7 @@ use std::net; use std::sync::Arc; use std::thread; use std::time; +use tarpc::{client, server}; use tarpc::client::sync::Connect; use tarpc::util::{FirstSocketAddr, Never}; @@ -52,8 +53,11 @@ impl FutureService for Server { const CHUNK_SIZE: u32 = 1 << 19; fn bench_tarpc(target: u64) { - let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = SyncClient::connect(&addr).unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .wait() + .unwrap(); + let client = SyncClient::connect(addr, client::Options::default()).unwrap(); let start = time::Instant::now(); let mut nread = 0; while nread < target { diff --git a/examples/two_clients.rs b/examples/two_clients.rs index be405b1..0e56ce4 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -17,6 +17,7 @@ extern crate futures; use bar::FutureServiceExt as BarExt; use baz::FutureServiceExt as BazExt; use futures::Future; +use tarpc::{client, server}; use tarpc::client::sync::Connect; use tarpc::util::{FirstSocketAddr, Never}; @@ -58,11 +59,17 @@ macro_rules! pos { fn main() { let _ = env_logger::init(); - let bar_addr = Bar.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let baz_addr = Baz.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let bar_addr = Bar.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .wait() + .unwrap(); + let baz_addr = Baz.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .wait() + .unwrap(); - let bar_client = bar::SyncClient::connect(&bar_addr).unwrap(); - let baz_client = baz::SyncClient::connect(&baz_addr).unwrap(); + let bar_client = bar::SyncClient::connect(bar_addr, client::Options::default()).unwrap(); + let baz_client = baz::SyncClient::connect(baz_addr, client::Options::default()).unwrap(); info!("Result: {:?}", bar_client.bar(17)); diff --git a/src/client.rs b/src/client.rs index 1c80360..0187c72 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,7 +3,7 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use WireError; +use {Reactor, WireError}; use bincode::serde::DeserializeError; use futures::{self, Future}; use protocol::Proto; @@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize}; use std::fmt; use std::io; use tokio_core::net::TcpStream; +use tokio_core::reactor; use tokio_proto::BindClient as ProtoBindClient; use tokio_proto::multiplex::Multiplex; use tokio_service::Service; @@ -89,46 +90,40 @@ impl fmt::Debug for Client } } +/// Additional options to configure how the client connects. +#[derive(Clone, Default)] +pub struct Options { + reactor: Option, +} + +impl Options { + /// Connect using the given reactor handle. + pub fn handle(mut self, handle: reactor::Handle) -> Self { + self.reactor = Some(Reactor::Handle(handle)); + self + } + + /// Connect using the given reactor remote. + pub fn remote(mut self, remote: reactor::Remote) -> Self { + self.reactor = Some(Reactor::Remote(remote)); + self + } +} + /// Exposes a trait for connecting asynchronously to servers. pub mod future { - use REMOTE; + use {REMOTE, Reactor}; use futures::{self, Async, Future, future}; use protocol::Proto; use serde::{Deserialize, Serialize}; use std::io; use std::marker::PhantomData; use std::net::SocketAddr; - use super::Client; + use super::{Client, Options}; use tokio_core::{self, reactor}; use tokio_core::net::TcpStream; use tokio_proto::BindClient; - /// Additional options to configure how the client connects. - #[derive(Clone, Default)] - pub struct Options { - reactor: Option, - } - - impl Options { - /// Connect using the given reactor handle. - pub fn handle(mut self, handle: reactor::Handle) -> Self { - self.reactor = Some(Reactor::Handle(handle)); - self - } - - /// Connect using the given reactor remote. - pub fn remote(mut self, remote: reactor::Remote) -> Self { - self.reactor = Some(Reactor::Remote(remote)); - self - } - } - - #[derive(Clone)] - enum Reactor { - Handle(reactor::Handle), - Remote(reactor::Remote), - } - /// Types that can connect to a server asynchronously. pub trait Connect: Sized { /// The type of the future returned when calling `connect`. @@ -240,12 +235,12 @@ pub mod sync { use serde::{Deserialize, Serialize}; use std::io; use std::net::ToSocketAddrs; - use super::Client; + use super::{Client, Options}; /// Types that can connect to a server synchronously. pub trait Connect: Sized { /// Connects to a server located at the given address. - fn connect(addr: A) -> Result where A: ToSocketAddrs; + fn connect(addr: A, options: Options) -> Result where A: ToSocketAddrs; } impl Connect for Client @@ -253,7 +248,7 @@ pub mod sync { Resp: Deserialize + Sync + Send + 'static, E: Deserialize + Sync + Send + 'static { - fn connect(addr: A) -> Result + fn connect(addr: A, options: Options) -> Result where A: ToSocketAddrs { let addr = if let Some(a) = addr.to_socket_addrs()?.next() { @@ -263,8 +258,7 @@ pub mod sync { "`ToSocketAddrs::to_socket_addrs` returned an empty \ iterator.")); }; - ::connect(addr, super::future::Options::default()) - .wait() + ::connect(addr, options).wait() } } } diff --git a/src/lib.rs b/src/lib.rs index 0b4678b..13f2176 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,6 +34,7 @@ //! #[macro_use] //! extern crate tarpc; //! +//! use tarpc::{client, server}; //! use tarpc::client::sync::Connect; //! use tarpc::util::Never; //! @@ -52,8 +53,8 @@ //! //! fn main() { //! let addr = "localhost:10000"; -//! let _server = HelloServer.listen(addr); -//! let client = SyncClient::connect(addr).unwrap(); +//! let _server = HelloServer.listen(addr, server::Options::default()); +//! let client = SyncClient::connect(addr, client::Options::default()).unwrap(); //! println!("{}", client.hello("Mom".to_string()).unwrap()); //! } //! ``` @@ -94,8 +95,6 @@ pub use client::future::ConnectFuture; pub use errors::Error; #[doc(hidden)] pub use errors::WireError; -#[doc(hidden)] -pub use server::{ListenFuture, Response, listen, listen_with}; /// Provides some utility error types, as well as a trait for spawning futures on the default event /// loop. @@ -107,7 +106,7 @@ mod macros; /// Provides the base client stubs used by the service macro. pub mod client; /// Provides the base server boilerplate used by service implementations. -mod server; +pub mod server; /// Provides implementations of `ClientProto` and `ServerProto` that implement the tarpc protocol. /// The tarpc protocol is a length-delimited, bincode-serialized payload. mod protocol; @@ -138,3 +137,9 @@ fn spawn_core() -> reactor::Remote { }); rx.recv().unwrap() } + +#[derive(Clone)] +enum Reactor { + Handle(reactor::Handle), + Remote(reactor::Remote), +} diff --git a/src/macros.rs b/src/macros.rs index ac3af6d..fff5c44 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -327,26 +327,17 @@ macro_rules! service { /// Provides a function for starting the service. This is a separate trait from /// `FutureService` to prevent collisions with the names of RPCs. pub trait FutureServiceExt: FutureService { - fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture - { - let (tx, rx) = $crate::futures::oneshot(); - $crate::REMOTE.spawn(move |handle| - Ok(tx.complete(Self::listen_with(self, - addr, - handle.clone())))); - $crate::ListenFuture::from_oneshot(rx) - } - /// Spawns the service, binding to the given address and running on /// the default tokio `Loop`. - fn listen_with(self, - addr: ::std::net::SocketAddr, - handle: $crate::tokio_core::reactor::Handle) - -> ::std::io::Result<::std::net::SocketAddr> + fn listen(self, + addr: ::std::net::SocketAddr, + options: $crate::server::Options) + -> $crate::server::ListenFuture { - return $crate::listen_with(addr, + return $crate::server::listen( move || Ok(__tarpc_service_AsyncServer(self.clone())), - handle); + addr, + options); #[allow(non_camel_case_types)] #[derive(Clone)] @@ -360,7 +351,7 @@ macro_rules! service { #[allow(non_camel_case_types)] type __tarpc_service_Future = - $crate::futures::Finished<$crate::Response<__tarpc_service_Response, + $crate::futures::Finished<$crate::server::Response<__tarpc_service_Response, __tarpc_service_Error>, ::std::io::Error>; @@ -375,7 +366,8 @@ macro_rules! service { } impl $crate::futures::Future for __tarpc_service_FutureReply { - type Item = $crate::Response<__tarpc_service_Response, __tarpc_service_Error>; + type Item = $crate::server::Response<__tarpc_service_Response, + __tarpc_service_Error>; type Error = ::std::io::Error; @@ -405,7 +397,7 @@ macro_rules! service { { type Request = ::std::result::Result<__tarpc_service_Request, $crate::bincode::serde::DeserializeError>; - type Response = $crate::Response<__tarpc_service_Response, + type Response = $crate::server::Response<__tarpc_service_Response, __tarpc_service_Error>; type Error = ::std::io::Error; type Future = __tarpc_service_FutureReply<__tarpc_service_S>; @@ -470,31 +462,19 @@ macro_rules! service { /// Provides a function for starting the service. This is a separate trait from /// `SyncService` to prevent collisions with the names of RPCs. pub trait SyncServiceExt: SyncService { - fn listen(self, addr: L) - -> ::std::io::Result<::std::net::SocketAddr> - where L: ::std::net::ToSocketAddrs - { - let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?; - let (tx, rx) = $crate::futures::oneshot(); - $crate::REMOTE.spawn(move |handle| { - Ok(tx.complete(Self::listen_with(self, addr, handle.clone()))) - }); - $crate::futures::Future::wait($crate::ListenFuture::from_oneshot(rx)) - } - /// Spawns the service, binding to the given address and running on /// the default tokio `Loop`. - fn listen_with(self, addr: L, handle: $crate::tokio_core::reactor::Handle) + fn listen(self, addr: L, options: $crate::server::Options) -> ::std::io::Result<::std::net::SocketAddr> where L: ::std::net::ToSocketAddrs { let __tarpc_service_service = __SyncServer { service: self, }; - return FutureServiceExt::listen_with( + return $crate::futures::Future::wait(FutureServiceExt::listen( __tarpc_service_service, $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?, - handle); + options)); #[derive(Clone)] struct __SyncServer { @@ -550,12 +530,13 @@ macro_rules! service { pub struct SyncClient(FutureClient); impl $crate::client::sync::Connect for SyncClient { - fn connect(addr: A) -> ::std::result::Result + fn connect(addr: A, options: $crate::client::Options) + -> ::std::result::Result where A: ::std::net::ToSocketAddrs, { let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?; let client = ::connect( - addr, $crate::client::future::Options::default()); + addr, options); let client = $crate::futures::Future::wait(client)?; let client = SyncClient(client); ::std::result::Result::Ok(client) @@ -608,7 +589,7 @@ macro_rules! service { type ConnectFut = __tarpc_service_ConnectFuture; fn connect(__tarpc_service_addr: ::std::net::SocketAddr, - __tarpc_service_options: $crate::client::future::Options) + __tarpc_service_options: $crate::client::Options) -> Self::ConnectFut { let client = <__tarpc_service_Client as $crate::client::future::Connect>::connect( @@ -717,6 +698,7 @@ mod functional_test { } mod sync { + use {client, server}; use client::sync::Connect; use super::{SyncClient, SyncService, SyncServiceExt}; use super::env_logger; @@ -738,8 +720,10 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let addr = Server.listen("localhost:0".first_socket_addr()).unwrap(); - let client = SyncClient::connect(addr).unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .unwrap(); + let client = SyncClient::connect(addr, client::Options::default()).unwrap(); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); } @@ -747,8 +731,11 @@ mod functional_test { #[test] fn other_service() { let _ = env_logger::init(); - let addr = Server.listen("localhost:0".first_socket_addr()).unwrap(); - let client = super::other_service::SyncClient::connect(addr) + let addr = Server.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .unwrap(); + let client = super::other_service::SyncClient::connect(addr, + client::Options::default()) .expect("Could not connect!"); match client.foo().err().unwrap() { ::Error::ServerDeserialize(_) => {} // good @@ -758,7 +745,8 @@ mod functional_test { } mod future { - use client::future::{Connect, Options}; + use {client, server}; + use client::future::Connect; use futures::{Finished, Future, finished}; use super::{FutureClient, FutureService, FutureServiceExt}; use super::env_logger; @@ -785,8 +773,11 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = FutureClient::connect(addr, Options::default()).wait().unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .wait() + .unwrap(); + let client = FutureClient::connect(addr, client::Options::default()).wait().unwrap(); assert_eq!(3, client.add(1, 2).wait().unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap()); } @@ -794,8 +785,11 @@ mod functional_test { #[test] fn concurrent() { let _ = env_logger::init(); - let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = FutureClient::connect(addr, Options::default()).wait().unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .wait() + .unwrap(); + let client = FutureClient::connect(addr, client::Options::default()).wait().unwrap(); let req1 = client.add(1, 2); let req2 = client.add(3, 4); let req3 = client.hey("Tim".to_string()); @@ -807,8 +801,12 @@ mod functional_test { #[test] fn other_service() { let _ = env_logger::init(); - let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = super::other_service::FutureClient::connect(addr, Options::default()) + let addr = Server.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .wait() + .unwrap(); + let client = super::other_service::FutureClient::connect(addr, + client::Options::default()) .wait() .unwrap(); match client.foo().wait().err().unwrap() { @@ -838,14 +836,18 @@ mod functional_test { #[test] fn error() { - use client::future::{Connect as Fc, Options}; + use {client, server}; + use client::future::Connect as Fc; use client::sync::Connect as Sc; use std::error::Error as E; use self::error_service::*; let _ = env_logger::init(); - let addr = ErrorServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client = FutureClient::connect(addr, Options::default()).wait().unwrap(); + let addr = ErrorServer.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .wait() + .unwrap(); + let client = FutureClient::connect(addr, client::Options::default()).wait().unwrap(); client.bar() .then(move |result| { match result.err().unwrap() { @@ -859,7 +861,7 @@ mod functional_test { .wait() .unwrap(); - let client = SyncClient::connect(&addr).unwrap(); + let client = SyncClient::connect(&addr, client::Options::default()).unwrap(); match client.bar().err().unwrap() { ::Error::App(e) => { assert_eq!(e.description(), "lol jk"); diff --git a/src/server.rs b/src/server.rs index 10b3c83..d0a7975 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,25 +3,46 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use REMOTE; +use {REMOTE, Reactor}; use bincode::serde::DeserializeError; use errors::WireError; -use futures::{self, Async, Future, Stream}; +use futures::{self, Async, Future, Stream, future}; use net2; use protocol::Proto; use serde::{Deserialize, Serialize}; use std::io; use std::net::SocketAddr; use tokio_core::net::TcpListener; -use tokio_core::reactor::Handle; +use tokio_core::reactor::{self, Handle}; use tokio_proto::BindServer; use tokio_service::NewService; +/// Additional options to configure how the server starts up. +#[derive(Clone, Default)] +pub struct Options { + reactor: Option, +} + +impl Options { + /// Listen using the given reactor handle. + pub fn handle(mut self, handle: reactor::Handle) -> Self { + self.reactor = Some(Reactor::Handle(handle)); + self + } + + /// Listen using the given reactor remote. + pub fn remote(mut self, remote: reactor::Remote) -> Self { + self.reactor = Some(Reactor::Remote(remote)); + self + } +} + /// A message from server to client. +#[doc(hidden)] pub type Response = Result>; -/// Spawns a service that binds to the given address and runs on the default reactor core. -pub fn listen(addr: SocketAddr, new_service: S) -> ListenFuture +#[doc(hidden)] +pub fn listen(new_service: S, addr: SocketAddr, options: Options) -> ListenFuture where S: NewService, Response = Response, Error = io::Error> + Send + 'static, @@ -29,14 +50,32 @@ pub fn listen(addr: SocketAddr, new_service: S) -> ListenFuture Resp: Serialize + 'static, E: Serialize + 'static { - let (tx, rx) = futures::oneshot(); - REMOTE.spawn(move |handle| Ok(tx.complete(listen_with(addr, new_service, handle.clone())))); - ListenFuture { inner: rx } + match options.reactor { + None => { + let (tx, rx) = futures::oneshot(); + REMOTE.spawn(move |handle| { + Ok(tx.complete(listen_with(new_service, addr, handle.clone()))) + }); + ListenFuture { inner: future::Either::A(rx) } + } + Some(Reactor::Remote(remote)) => { + let (tx, rx) = futures::oneshot(); + remote.spawn(move |handle| { + Ok(tx.complete(listen_with(new_service, addr, handle.clone()))) + }); + ListenFuture { inner: future::Either::A(rx) } + } + Some(Reactor::Handle(handle)) => { + ListenFuture { + inner: future::Either::B(future::ok(listen_with(new_service, addr, handle))), + } + } + } } - /// Spawns a service that binds to the given address using the given handle. -pub fn listen_with(addr: SocketAddr, - new_service: S, +#[doc(hidden)] +pub fn listen_with(new_service: S, + addr: SocketAddr, handle: Handle) -> io::Result where S: NewService, @@ -76,14 +115,16 @@ fn listener(addr: &SocketAddr, handle: &Handle) -> io::Result { } /// A future that resolves to a `ServerHandle`. +#[doc(hidden)] pub struct ListenFuture { - inner: futures::Oneshot>, + inner: future::Either>, + future::FutureResult, futures::Canceled>>, } impl ListenFuture { #[doc(hidden)] pub fn from_oneshot(rx: futures::Oneshot>) -> Self { - ListenFuture { inner: rx } + ListenFuture { inner: future::Either::A(rx) } } }