Remove some panics, and don't use ToSocketAddrs in async methods.

This commit is contained in:
Tim Kuehn
2016-09-19 00:16:47 -07:00
parent 3eb57d4009
commit 4a63064cbd
9 changed files with 72 additions and 41 deletions

View File

@@ -20,7 +20,7 @@ use futures_cpupool::{CpuFuture, CpuPool};
use std::thread;
use std::time::{Duration, Instant, SystemTime};
use tarpc::future::{Connect};
use tarpc::util::Never;
use tarpc::util::{FirstSocketAddr, Never};
service! {
rpc read(size: u32) -> Vec<u8>;
@@ -105,7 +105,7 @@ const MAX_CONCURRENCY: u32 = 100;
fn main() {
let _ = env_logger::init();
let server = Server::new().listen("localhost:0").wait().unwrap();
let server = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap();
println!("Server listening on {}.", server.local_addr());
let clients: Vec<_> = (1...5)
.map(|i| {

View File

@@ -20,9 +20,9 @@ use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use tarpc::util::{Never, Message};
use tarpc::future::Connect as Fc;
use tarpc::sync::Connect as Sc;
use tarpc::util::{FirstSocketAddr, Message, Never};
pub mod subscriber {
service! {
@@ -62,7 +62,7 @@ impl Subscriber {
id: id,
publisher: publisher.clone(),
}
.listen("localhost:0")
.listen("localhost:0".first_socket_addr())
.wait()
.unwrap();
publisher.subscribe(id, *subscriber.local_addr()).unwrap();
@@ -121,7 +121,7 @@ impl publisher::FutureService for Publisher {
fn main() {
let _ = env_logger::init();
let publisher = Publisher::new().listen("localhost:0").wait().unwrap();
let publisher = Publisher::new().listen("localhost:0".first_socket_addr()).wait().unwrap();
let publisher = publisher::SyncClient::connect(publisher.local_addr()).unwrap();
let _subscriber1 = Subscriber::new(0, publisher.clone());
let _subscriber2 = Subscriber::new(1, publisher.clone());

View File

@@ -13,7 +13,7 @@ extern crate futures;
use futures::{BoxFuture, Future};
use add::{FutureService as AddFutureService, FutureServiceExt as AddExt};
use double::{FutureService as DoubleFutureService, FutureServiceExt as DoubleExt};
use tarpc::util::{Never, Message};
use tarpc::util::{FirstSocketAddr, Message, Never};
use tarpc::future::Connect as Fc;
use tarpc::sync::Connect as Sc;
@@ -61,10 +61,10 @@ impl DoubleFutureService for DoubleServer {
}
fn main() {
let add = AddServer.listen("localhost:0").wait().unwrap();
let add = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap();
let add_client = add::FutureClient::connect(add.local_addr()).wait().unwrap();
let double = DoubleServer { client: add_client };
let double = double.listen("localhost:0").wait().unwrap();
let double = double.listen("localhost:0".first_socket_addr()).wait().unwrap();
let double_client = double::SyncClient::connect(double.local_addr()).unwrap();
for i in 0..5 {

View File

@@ -14,13 +14,13 @@ extern crate env_logger;
extern crate futures;
use std::sync::Arc;
use std::time;
use std::net;
use std::thread;
use std::time;
use std::io::{Read, Write, stdout};
use futures::Future;
use tarpc::util::Never;
use tarpc::sync::Connect;
use tarpc::util::{FirstSocketAddr, Never};
lazy_static! {
static ref BUF: Arc<Vec<u8>> = Arc::new(gen_vec(CHUNK_SIZE as usize));
@@ -52,7 +52,7 @@ impl FutureService for Server {
const CHUNK_SIZE: u32 = 1 << 19;
fn bench_tarpc(target: u64) {
let handle = Server.listen("localhost:0").wait().unwrap();
let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client = SyncClient::connect(handle.local_addr()).unwrap();
let start = time::Instant::now();
let mut nread = 0;

View File

@@ -17,7 +17,7 @@ extern crate futures;
use bar::FutureServiceExt as BarExt;
use baz::FutureServiceExt as BazExt;
use futures::Future;
use tarpc::util::Never;
use tarpc::util::{FirstSocketAddr, Never};
use tarpc::sync::Connect;
mod bar {
@@ -58,8 +58,8 @@ macro_rules! pos {
fn main() {
let _ = env_logger::init();
let bar = Bar.listen("localhost:0").wait().unwrap();
let baz = Baz.listen("localhost:0").wait().unwrap();
let bar = Bar.listen("localhost:0".first_socket_addr()).wait().unwrap();
let baz = Baz.listen("localhost:0".first_socket_addr()).wait().unwrap();
let bar_client = bar::SyncClient::connect(bar.local_addr()).unwrap();
let baz_client = baz::SyncClient::connect(baz.local_addr()).unwrap();

View File

@@ -97,7 +97,7 @@ pub use errors::{WireError};
#[doc(hidden)]
pub use framed::Framed;
#[doc(hidden)]
pub use server::{ListenFuture, Response, listen_pipeline};
pub use server::{ListenFuture, Response, listen, listen_with};
/// Provides some utility error types, as well as a trait for spawning futures on the default event
/// loop.

View File

@@ -399,10 +399,8 @@ macro_rules! service {
pub trait FutureServiceExt: FutureService {
/// Spawns the service, binding to the given address and running on
/// the default tokio `Loop`.
fn listen<L>(self, addr: L) -> $crate::ListenFuture
where L: ::std::net::ToSocketAddrs
{
return $crate::listen_pipeline(addr, __tarpc_service_AsyncServer(self));
fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture {
return $crate::listen(addr, __tarpc_service_AsyncServer(self));
#[allow(non_camel_case_types)]
#[derive(Clone)]
@@ -523,14 +521,20 @@ macro_rules! service {
/// Spawns the service, binding to the given address and running on
/// the default tokio `Loop`.
fn listen<L>(self, addr: L)
-> $crate::tokio_proto::server::ServerHandle
-> ::std::io::Result<$crate::tokio_proto::server::ServerHandle>
where L: ::std::net::ToSocketAddrs
{
let addr = if let ::std::option::Option::Some(a) = ::std::iter::Iterator::next(&mut try!(::std::net::ToSocketAddrs::to_socket_addrs(&addr))) {
a
} else {
return Err(::std::io::Error::new(::std::io::ErrorKind::AddrNotAvailable,
"`ToSocketAddrs::to_socket_addrs` returned an empty iterator."));
};
let __tarpc_service_service = __SyncServer {
service: self,
};
return ::std::result::Result::unwrap($crate::futures::Future::wait(FutureServiceExt::listen(__tarpc_service_service, addr)));
return $crate::futures::Future::wait(FutureServiceExt::listen(__tarpc_service_service, addr));
#[derive(Clone)]
struct __SyncServer<S> {
@@ -733,6 +737,7 @@ mod syntax_test {
#[cfg(test)]
mod functional_test {
use util::FirstSocketAddr;
use futures::{Future, failed};
extern crate env_logger;
@@ -742,6 +747,7 @@ mod functional_test {
}
mod sync {
use util::FirstSocketAddr;
use super::{SyncClient, SyncService, SyncServiceExt};
use super::env_logger;
use sync::Connect;
@@ -762,7 +768,7 @@ mod functional_test {
#[test]
fn simple() {
let _ = env_logger::init();
let handle = Server.listen("localhost:0");
let handle = Server.listen("localhost:0".first_socket_addr()).unwrap();
let client = SyncClient::connect(handle.local_addr()).unwrap();
assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
@@ -770,7 +776,7 @@ mod functional_test {
#[test]
fn clone() {
let handle = Server.listen("localhost:0");
let handle = Server.listen("localhost:0".first_socket_addr()).unwrap();
let client1 = SyncClient::connect(handle.local_addr()).unwrap();
let client2 = client1.clone();
assert_eq!(3, client1.add(1, 2).unwrap());
@@ -780,7 +786,7 @@ mod functional_test {
#[test]
fn other_service() {
let _ = env_logger::init();
let handle = Server.listen("localhost:0");
let handle = Server.listen("localhost:0".first_socket_addr()).unwrap();
let client = super::other_service::SyncClient::connect(handle.local_addr()).unwrap();
match client.foo().err().unwrap() {
::Error::ServerDeserialize(_) => {} // good
@@ -790,6 +796,7 @@ mod functional_test {
}
mod future {
use util::FirstSocketAddr;
use future::Connect;
use futures::{Finished, Future, finished};
use super::{FutureClient, FutureService, FutureServiceExt};
@@ -816,7 +823,7 @@ mod functional_test {
#[test]
fn simple() {
let _ = env_logger::init();
let handle = Server.listen("localhost:0").wait().unwrap();
let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client = FutureClient::connect(handle.local_addr()).wait().unwrap();
assert_eq!(3, client.add(1, 2).wait().unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap());
@@ -825,7 +832,7 @@ mod functional_test {
#[test]
fn clone() {
let _ = env_logger::init();
let handle = Server.listen("localhost:0").wait().unwrap();
let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client1 = FutureClient::connect(handle.local_addr()).wait().unwrap();
let client2 = client1.clone();
assert_eq!(3, client1.add(1, 2).wait().unwrap());
@@ -835,7 +842,7 @@ mod functional_test {
#[test]
fn other_service() {
let _ = env_logger::init();
let handle = Server.listen("localhost:0").wait().unwrap();
let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client =
super::other_service::FutureClient::connect(handle.local_addr()).wait().unwrap();
match client.foo().wait().err().unwrap() {
@@ -871,7 +878,7 @@ mod functional_test {
use self::error_service::*;
let _ = env_logger::init();
let handle = ErrorServer.listen("localhost:0").wait().unwrap();
let handle = ErrorServer.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client = FutureClient::connect(handle.local_addr()).wait().unwrap();
client.bar()
.then(move |result| {

View File

@@ -11,7 +11,8 @@ use futures::stream::Empty;
use framed::Framed;
use serde::{Deserialize, Serialize};
use std::io;
use std::net::ToSocketAddrs;
use std::net::SocketAddr;
use tokio_core::reactor::Handle;
use tokio_proto::pipeline;
use tokio_proto::server::{self, ServerHandle};
use tokio_service::NewService;
@@ -20,40 +21,50 @@ use util::Never;
/// A message from server to client.
pub type Response<T, E> = pipeline::Message<Result<T, WireError<E>>, Empty<Never, io::Error>>;
/// Spawns a service that binds to the given address and runs on the default tokio `Loop`.
pub fn listen_pipeline<A, S, Req, Resp, E>(addr: A, new_service: S) -> ListenFuture
/// Spawns a service that binds to the given address and runs on the default reactor core.
pub fn listen<S, Req, Resp, E>(addr: SocketAddr, new_service: S) -> ListenFuture
where S: NewService<Request = Result<Req, DeserializeError>,
Response = Response<Resp, E>,
Error = io::Error> + Send + 'static,
A: ToSocketAddrs,
Req: Deserialize,
Resp: Serialize,
E: Serialize,
{
// TODO(tikue): don't use ToSocketAddrs, or don't unwrap.
let addr = addr.to_socket_addrs().unwrap().next().unwrap();
let (tx, rx) = futures::oneshot();
REMOTE.spawn(move |handle| {
Ok(tx.complete(server::listen(handle, addr, move |stream| {
pipeline::Server::new(new_service.new_service()?, Framed::new(stream))
}).unwrap()))
Ok(tx.complete(listen_with(addr, new_service, handle)))
});
ListenFuture { inner: rx }
}
/// Spawns a service that binds to the given address using the given handle.
pub fn listen_with<S, Req, Resp, E>(addr: SocketAddr, new_service: S, handle: &Handle)
-> io::Result<ServerHandle>
where S: NewService<Request = Result<Req, DeserializeError>,
Response = Response<Resp, E>,
Error = io::Error> + Send + 'static,
Req: Deserialize,
Resp: Serialize,
E: Serialize,
{
server::listen(handle, addr, move |stream| {
pipeline::Server::new(new_service.new_service()?, Framed::new(stream))
})
}
/// A future that resolves to a `ServerHandle`.
pub struct ListenFuture {
inner: futures::Oneshot<ServerHandle>,
inner: futures::Oneshot<io::Result<ServerHandle>>,
}
impl Future for ListenFuture {
type Item = ServerHandle;
type Error = Never;
type Error = io::Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
// Can't panic the oneshot is always completed.
match self.inner.poll().unwrap() {
Async::Ready(server_handle) => Ok(Async::Ready(server_handle)),
Async::Ready(result) => result.map(Async::Ready),
Async::NotReady => Ok(Async::NotReady),
}
}

View File

@@ -7,6 +7,7 @@ use futures::{Future, Poll};
use futures::stream::Stream;
use std::fmt;
use std::error::Error;
use std::net::{SocketAddr, ToSocketAddrs};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
/// A bottom type that impls `Error`, `Serialize`, and `Deserialize`. It is impossible to
@@ -97,3 +98,15 @@ impl<S: Into<String>> From<S> for Message {
Message(s.into())
}
}
/// Provides a utility method for more ergonomically parsing a `SocketAddr` when panicking is
/// acceptable.
pub trait FirstSocketAddr: ToSocketAddrs {
/// Returns the first resolved `SocketAddr` or panics otherwise.
fn first_socket_addr(&self) -> SocketAddr {
self.to_socket_addrs().unwrap().next().unwrap()
}
}
impl<A: ToSocketAddrs> FirstSocketAddr for A {}