diff --git a/examples/concurrency.rs b/examples/concurrency.rs index 0ab4ff9..841a424 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -20,7 +20,7 @@ use futures_cpupool::{CpuFuture, CpuPool}; use std::thread; use std::time::{Duration, Instant, SystemTime}; use tarpc::future::{Connect}; -use tarpc::util::Never; +use tarpc::util::{FirstSocketAddr, Never}; service! { rpc read(size: u32) -> Vec; @@ -105,7 +105,7 @@ const MAX_CONCURRENCY: u32 = 100; fn main() { let _ = env_logger::init(); - let server = Server::new().listen("localhost:0").wait().unwrap(); + let server = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap(); println!("Server listening on {}.", server.local_addr()); let clients: Vec<_> = (1...5) .map(|i| { diff --git a/examples/pubsub.rs b/examples/pubsub.rs index a34e9a9..b7d9380 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -20,9 +20,9 @@ use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; -use tarpc::util::{Never, Message}; use tarpc::future::Connect as Fc; use tarpc::sync::Connect as Sc; +use tarpc::util::{FirstSocketAddr, Message, Never}; pub mod subscriber { service! { @@ -62,7 +62,7 @@ impl Subscriber { id: id, publisher: publisher.clone(), } - .listen("localhost:0") + .listen("localhost:0".first_socket_addr()) .wait() .unwrap(); publisher.subscribe(id, *subscriber.local_addr()).unwrap(); @@ -121,7 +121,7 @@ impl publisher::FutureService for Publisher { fn main() { let _ = env_logger::init(); - let publisher = Publisher::new().listen("localhost:0").wait().unwrap(); + let publisher = Publisher::new().listen("localhost:0".first_socket_addr()).wait().unwrap(); let publisher = publisher::SyncClient::connect(publisher.local_addr()).unwrap(); let _subscriber1 = Subscriber::new(0, publisher.clone()); let _subscriber2 = Subscriber::new(1, publisher.clone()); diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index aebdfe3..c3a1a1b 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -13,7 +13,7 @@ extern crate futures; use futures::{BoxFuture, Future}; use add::{FutureService as AddFutureService, FutureServiceExt as AddExt}; use double::{FutureService as DoubleFutureService, FutureServiceExt as DoubleExt}; -use tarpc::util::{Never, Message}; +use tarpc::util::{FirstSocketAddr, Message, Never}; use tarpc::future::Connect as Fc; use tarpc::sync::Connect as Sc; @@ -61,10 +61,10 @@ impl DoubleFutureService for DoubleServer { } fn main() { - let add = AddServer.listen("localhost:0").wait().unwrap(); + let add = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); let add_client = add::FutureClient::connect(add.local_addr()).wait().unwrap(); let double = DoubleServer { client: add_client }; - let double = double.listen("localhost:0").wait().unwrap(); + let double = double.listen("localhost:0".first_socket_addr()).wait().unwrap(); let double_client = double::SyncClient::connect(double.local_addr()).unwrap(); for i in 0..5 { diff --git a/examples/throughput.rs b/examples/throughput.rs index af53875..f82b5cb 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -14,13 +14,13 @@ extern crate env_logger; extern crate futures; use std::sync::Arc; -use std::time; use std::net; use std::thread; +use std::time; use std::io::{Read, Write, stdout}; use futures::Future; -use tarpc::util::Never; use tarpc::sync::Connect; +use tarpc::util::{FirstSocketAddr, Never}; lazy_static! { static ref BUF: Arc> = Arc::new(gen_vec(CHUNK_SIZE as usize)); @@ -52,7 +52,7 @@ impl FutureService for Server { const CHUNK_SIZE: u32 = 1 << 19; fn bench_tarpc(target: u64) { - let handle = Server.listen("localhost:0").wait().unwrap(); + let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); let client = SyncClient::connect(handle.local_addr()).unwrap(); let start = time::Instant::now(); let mut nread = 0; diff --git a/examples/two_clients.rs b/examples/two_clients.rs index feaad18..b26e0d0 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -17,7 +17,7 @@ extern crate futures; use bar::FutureServiceExt as BarExt; use baz::FutureServiceExt as BazExt; use futures::Future; -use tarpc::util::Never; +use tarpc::util::{FirstSocketAddr, Never}; use tarpc::sync::Connect; mod bar { @@ -58,8 +58,8 @@ macro_rules! pos { fn main() { let _ = env_logger::init(); - let bar = Bar.listen("localhost:0").wait().unwrap(); - let baz = Baz.listen("localhost:0").wait().unwrap(); + let bar = Bar.listen("localhost:0".first_socket_addr()).wait().unwrap(); + let baz = Baz.listen("localhost:0".first_socket_addr()).wait().unwrap(); let bar_client = bar::SyncClient::connect(bar.local_addr()).unwrap(); let baz_client = baz::SyncClient::connect(baz.local_addr()).unwrap(); diff --git a/src/lib.rs b/src/lib.rs index 711300f..e9b779c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -97,7 +97,7 @@ pub use errors::{WireError}; #[doc(hidden)] pub use framed::Framed; #[doc(hidden)] -pub use server::{ListenFuture, Response, listen_pipeline}; +pub use server::{ListenFuture, Response, listen, listen_with}; /// Provides some utility error types, as well as a trait for spawning futures on the default event /// loop. diff --git a/src/macros.rs b/src/macros.rs index 077ac96..05055ad 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -399,10 +399,8 @@ macro_rules! service { pub trait FutureServiceExt: FutureService { /// Spawns the service, binding to the given address and running on /// the default tokio `Loop`. - fn listen(self, addr: L) -> $crate::ListenFuture - where L: ::std::net::ToSocketAddrs - { - return $crate::listen_pipeline(addr, __tarpc_service_AsyncServer(self)); + fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture { + return $crate::listen(addr, __tarpc_service_AsyncServer(self)); #[allow(non_camel_case_types)] #[derive(Clone)] @@ -523,14 +521,20 @@ macro_rules! service { /// Spawns the service, binding to the given address and running on /// the default tokio `Loop`. fn listen(self, addr: L) - -> $crate::tokio_proto::server::ServerHandle + -> ::std::io::Result<$crate::tokio_proto::server::ServerHandle> where L: ::std::net::ToSocketAddrs { + let addr = if let ::std::option::Option::Some(a) = ::std::iter::Iterator::next(&mut try!(::std::net::ToSocketAddrs::to_socket_addrs(&addr))) { + a + } else { + return Err(::std::io::Error::new(::std::io::ErrorKind::AddrNotAvailable, + "`ToSocketAddrs::to_socket_addrs` returned an empty iterator.")); + }; let __tarpc_service_service = __SyncServer { service: self, }; - return ::std::result::Result::unwrap($crate::futures::Future::wait(FutureServiceExt::listen(__tarpc_service_service, addr))); + return $crate::futures::Future::wait(FutureServiceExt::listen(__tarpc_service_service, addr)); #[derive(Clone)] struct __SyncServer { @@ -733,6 +737,7 @@ mod syntax_test { #[cfg(test)] mod functional_test { + use util::FirstSocketAddr; use futures::{Future, failed}; extern crate env_logger; @@ -742,6 +747,7 @@ mod functional_test { } mod sync { + use util::FirstSocketAddr; use super::{SyncClient, SyncService, SyncServiceExt}; use super::env_logger; use sync::Connect; @@ -762,7 +768,7 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0"); + let handle = Server.listen("localhost:0".first_socket_addr()).unwrap(); let client = SyncClient::connect(handle.local_addr()).unwrap(); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); @@ -770,7 +776,7 @@ mod functional_test { #[test] fn clone() { - let handle = Server.listen("localhost:0"); + let handle = Server.listen("localhost:0".first_socket_addr()).unwrap(); let client1 = SyncClient::connect(handle.local_addr()).unwrap(); let client2 = client1.clone(); assert_eq!(3, client1.add(1, 2).unwrap()); @@ -780,7 +786,7 @@ mod functional_test { #[test] fn other_service() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0"); + let handle = Server.listen("localhost:0".first_socket_addr()).unwrap(); let client = super::other_service::SyncClient::connect(handle.local_addr()).unwrap(); match client.foo().err().unwrap() { ::Error::ServerDeserialize(_) => {} // good @@ -790,6 +796,7 @@ mod functional_test { } mod future { + use util::FirstSocketAddr; use future::Connect; use futures::{Finished, Future, finished}; use super::{FutureClient, FutureService, FutureServiceExt}; @@ -816,7 +823,7 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0").wait().unwrap(); + let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); let client = FutureClient::connect(handle.local_addr()).wait().unwrap(); assert_eq!(3, client.add(1, 2).wait().unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap()); @@ -825,7 +832,7 @@ mod functional_test { #[test] fn clone() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0").wait().unwrap(); + let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); let client1 = FutureClient::connect(handle.local_addr()).wait().unwrap(); let client2 = client1.clone(); assert_eq!(3, client1.add(1, 2).wait().unwrap()); @@ -835,7 +842,7 @@ mod functional_test { #[test] fn other_service() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0").wait().unwrap(); + let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); let client = super::other_service::FutureClient::connect(handle.local_addr()).wait().unwrap(); match client.foo().wait().err().unwrap() { @@ -871,7 +878,7 @@ mod functional_test { use self::error_service::*; let _ = env_logger::init(); - let handle = ErrorServer.listen("localhost:0").wait().unwrap(); + let handle = ErrorServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); let client = FutureClient::connect(handle.local_addr()).wait().unwrap(); client.bar() .then(move |result| { diff --git a/src/server.rs b/src/server.rs index ffd0fc7..9dbee23 100644 --- a/src/server.rs +++ b/src/server.rs @@ -11,7 +11,8 @@ use futures::stream::Empty; use framed::Framed; use serde::{Deserialize, Serialize}; use std::io; -use std::net::ToSocketAddrs; +use std::net::SocketAddr; +use tokio_core::reactor::Handle; use tokio_proto::pipeline; use tokio_proto::server::{self, ServerHandle}; use tokio_service::NewService; @@ -20,40 +21,50 @@ use util::Never; /// A message from server to client. pub type Response = pipeline::Message>, Empty>; -/// Spawns a service that binds to the given address and runs on the default tokio `Loop`. -pub fn listen_pipeline(addr: A, new_service: S) -> ListenFuture +/// Spawns a service that binds to the given address and runs on the default reactor core. +pub fn listen(addr: SocketAddr, new_service: S) -> ListenFuture where S: NewService, Response = Response, Error = io::Error> + Send + 'static, - A: ToSocketAddrs, Req: Deserialize, Resp: Serialize, E: Serialize, { - // TODO(tikue): don't use ToSocketAddrs, or don't unwrap. - let addr = addr.to_socket_addrs().unwrap().next().unwrap(); - let (tx, rx) = futures::oneshot(); REMOTE.spawn(move |handle| { - Ok(tx.complete(server::listen(handle, addr, move |stream| { - pipeline::Server::new(new_service.new_service()?, Framed::new(stream)) - }).unwrap())) + Ok(tx.complete(listen_with(addr, new_service, handle))) }); ListenFuture { inner: rx } } +/// Spawns a service that binds to the given address using the given handle. +pub fn listen_with(addr: SocketAddr, new_service: S, handle: &Handle) + -> io::Result + where S: NewService, + Response = Response, + Error = io::Error> + Send + 'static, + Req: Deserialize, + Resp: Serialize, + E: Serialize, +{ + server::listen(handle, addr, move |stream| { + pipeline::Server::new(new_service.new_service()?, Framed::new(stream)) + }) +} + /// A future that resolves to a `ServerHandle`. pub struct ListenFuture { - inner: futures::Oneshot, + inner: futures::Oneshot>, } impl Future for ListenFuture { type Item = ServerHandle; - type Error = Never; + type Error = io::Error; fn poll(&mut self) -> futures::Poll { + // Can't panic the oneshot is always completed. match self.inner.poll().unwrap() { - Async::Ready(server_handle) => Ok(Async::Ready(server_handle)), + Async::Ready(result) => result.map(Async::Ready), Async::NotReady => Ok(Async::NotReady), } } diff --git a/src/util.rs b/src/util.rs index aeb5539..35253f0 100644 --- a/src/util.rs +++ b/src/util.rs @@ -7,6 +7,7 @@ use futures::{Future, Poll}; use futures::stream::Stream; use std::fmt; use std::error::Error; +use std::net::{SocketAddr, ToSocketAddrs}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; /// A bottom type that impls `Error`, `Serialize`, and `Deserialize`. It is impossible to @@ -97,3 +98,15 @@ impl> From for Message { Message(s.into()) } } + + +/// Provides a utility method for more ergonomically parsing a `SocketAddr` when panicking is +/// acceptable. +pub trait FirstSocketAddr: ToSocketAddrs { + /// Returns the first resolved `SocketAddr` or panics otherwise. + fn first_socket_addr(&self) -> SocketAddr { + self.to_socket_addrs().unwrap().next().unwrap() + } +} + +impl FirstSocketAddr for A {}