diff --git a/README.md b/README.md index 515a2a5..bc92a09 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,8 @@ code. Here's how to use the sync api. #[macro_use] extern crate tarpc; +use std::sync::mpsc; +use std::thread; use tarpc::{client, server}; use tarpc::client::sync::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; @@ -72,10 +74,14 @@ impl SyncService for HelloServer { } fn main() { - let addr = HelloServer.listen("localhost:0".first_socket_addr(), - server::Options::default()) - .unwrap(); - let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let mut handle = HelloServer.listen("localhost:0", server::Options::default()) + .unwrap(); + tx.send(handle.addr()).unwrap(); + handle.run(); + }); + let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } ``` diff --git a/examples/readme_errors.rs b/examples/readme_errors.rs index 60c340a..2c961bd 100644 --- a/examples/readme_errors.rs +++ b/examples/readme_errors.rs @@ -15,6 +15,8 @@ extern crate tokio_core; use std::error::Error; use std::fmt; +use std::sync::mpsc; +use std::thread; use tarpc::{client, server}; use tarpc::client::sync::ClientExt; use tarpc::util::FirstSocketAddr; @@ -52,10 +54,14 @@ impl SyncService for HelloServer { } fn main() { - let addr = HelloServer.listen("localhost:10000".first_socket_addr(), - server::Options::default()) - .unwrap(); - let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let mut handle = HelloServer.listen("localhost:10000", server::Options::default()) + .unwrap(); + tx.send(handle.addr()).unwrap(); + handle.run(); + }); + let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); println!("{}", client.hello("".to_string()).unwrap_err()); } diff --git a/examples/readme_sync.rs b/examples/readme_sync.rs index e2097f6..15db3a6 100644 --- a/examples/readme_sync.rs +++ b/examples/readme_sync.rs @@ -12,6 +12,8 @@ extern crate futures; extern crate tarpc; extern crate tokio_core; +use std::sync::mpsc; +use std::thread; use tarpc::{client, server}; use tarpc::client::sync::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; @@ -30,9 +32,13 @@ impl SyncService for HelloServer { } fn main() { - let addr = HelloServer.listen("localhost:0".first_socket_addr(), - server::Options::default()) - .unwrap(); - let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let mut handle = HelloServer.listen("localhost:0", server::Options::default()) + .unwrap(); + tx.send(handle.addr()).unwrap(); + handle.run(); + }); + let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } diff --git a/src/lib.rs b/src/lib.rs index 4c8b479..afd3479 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,8 @@ //! use tarpc::client::sync::ClientExt; //! use tarpc::util::Never; //! use tokio_core::reactor; +//! use std::sync::mpsc; +//! use std::thread; //! //! service! { //! rpc hello(name: String) -> String; @@ -54,9 +56,14 @@ //! } //! //! fn main() { -//! let addr = "localhost:10000"; -//! let reactor = reactor::Core::new().unwrap(); -//! let _server = HelloServer.listen(addr, server::Options::default()); +//! let (tx, rx) = mpsc::channel(); +//! thread::spawn(move || { +//! let mut handle = HelloServer.listen("localhost:10000", +//! server::Options::default()).unwrap(); +//! tx.send(handle.addr()).unwrap(); +//! handle.run(); +//! }); +//! let addr = rx.recv().unwrap(); //! let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); //! println!("{}", client.hello("Mom".to_string()).unwrap()); //! } @@ -109,7 +116,7 @@ //! ``` //! #![deny(missing_docs)] -#![feature(plugin, never_type, struct_field_attributes)] +#![feature(conservative_impl_trait, never_type, plugin, struct_field_attributes)] #![plugin(tarpc_plugins)] extern crate byteorder; diff --git a/src/macros.rs b/src/macros.rs index d4f2d84..e8f8486 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -339,6 +339,113 @@ macro_rules! service { )* } + #[allow(non_camel_case_types)] + #[derive(Clone)] + struct tarpc_service_AsyncServer__(S); + + impl ::std::fmt::Debug for tarpc_service_AsyncServer__ { + fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(fmt, "tarpc_service_AsyncServer__ {{ .. }}") + } + } + + #[allow(non_camel_case_types)] + type tarpc_service_Future__ = + $crate::futures::Finished<$crate::server::Response, + ::std::io::Error>; + + #[allow(non_camel_case_types)] + enum tarpc_service_FutureReply__ { + DeserializeError(tarpc_service_Future__), + $($fn_name( + $crate::futures::Then< + ::Future, + tarpc_service_Future__, + fn(::std::result::Result<$out, $error>) + -> tarpc_service_Future__>)),* + } + + impl $crate::futures::Future for tarpc_service_FutureReply__ { + type Item = $crate::server::Response; + + type Error = ::std::io::Error; + + fn poll(&mut self) -> $crate::futures::Poll { + match *self { + tarpc_service_FutureReply__::DeserializeError( + ref mut tarpc_service_future__) => + { + $crate::futures::Future::poll(tarpc_service_future__) + } + $( + tarpc_service_FutureReply__::$fn_name( + ref mut tarpc_service_future__) => + { + $crate::futures::Future::poll(tarpc_service_future__) + } + ),* + } + } + } + + + #[allow(non_camel_case_types)] + impl $crate::tokio_service::Service + for tarpc_service_AsyncServer__ + where tarpc_service_S__: FutureService + { + type Request = ::std::result::Result; + type Response = $crate::server::Response; + type Error = ::std::io::Error; + type Future = tarpc_service_FutureReply__; + + fn call(&self, tarpc_service_request__: Self::Request) -> Self::Future { + let tarpc_service_request__ = match tarpc_service_request__ { + Ok(tarpc_service_request__) => tarpc_service_request__, + Err(tarpc_service_deserialize_err__) => { + return tarpc_service_FutureReply__::DeserializeError( + $crate::futures::finished( + ::std::result::Result::Err( + $crate::WireError::ServerSerialize( + ::std::string::ToString::to_string( + &tarpc_service_deserialize_err__))))); + } + }; + match tarpc_service_request__ { + tarpc_service_Request__::NotIrrefutable(()) => unreachable!(), + $( + tarpc_service_Request__::$fn_name(( $($arg,)* )) => { + fn tarpc_service_wrap__( + tarpc_service_response__: + ::std::result::Result<$out, $error>) + -> tarpc_service_Future__ + { + $crate::futures::finished( + tarpc_service_response__ + .map(tarpc_service_Response__::$fn_name) + .map_err(|tarpc_service_error__| { + $crate::WireError::App( + tarpc_service_Error__::$fn_name( + tarpc_service_error__)) + }) + ) + } + return tarpc_service_FutureReply__::$fn_name( + $crate::futures::Future::then( + $crate::futures::IntoFuture::into_future( + FutureService::$fn_name(&self.0, $($arg),*)), + tarpc_service_wrap__)); + } + )* + } + } + } + /// Provides a function for starting the service. This is a separate trait from /// `FutureService` to prevent collisions with the names of RPCs. pub trait FutureServiceExt: FutureService { @@ -350,117 +457,14 @@ macro_rules! service { options: $crate::server::Options) -> ::std::io::Result<::std::net::SocketAddr> { - return $crate::server::listen(move || Ok(tarpc_service_AsyncServer__(self.clone())), + $crate::server::listen(move || Ok(tarpc_service_AsyncServer__(self.clone())), addr, handle, - options); - - #[allow(non_camel_case_types)] - #[derive(Clone)] - struct tarpc_service_AsyncServer__(S); - - impl ::std::fmt::Debug for tarpc_service_AsyncServer__ { - fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - write!(fmt, "tarpc_service_AsyncServer__ {{ .. }}") - } - } - - #[allow(non_camel_case_types)] - type tarpc_service_Future__ = - $crate::futures::Finished<$crate::server::Response, - ::std::io::Error>; - - #[allow(non_camel_case_types)] - enum tarpc_service_FutureReply__ { - DeserializeError(tarpc_service_Future__), - $($fn_name( - $crate::futures::Then< - ::Future, - tarpc_service_Future__, - fn(::std::result::Result<$out, $error>) - -> tarpc_service_Future__>)),* - } - - impl $crate::futures::Future for tarpc_service_FutureReply__ { - type Item = $crate::server::Response; - - type Error = ::std::io::Error; - - fn poll(&mut self) -> $crate::futures::Poll { - match *self { - tarpc_service_FutureReply__::DeserializeError( - ref mut tarpc_service_future__) => - { - $crate::futures::Future::poll(tarpc_service_future__) - } - $( - tarpc_service_FutureReply__::$fn_name( - ref mut tarpc_service_future__) => - { - $crate::futures::Future::poll(tarpc_service_future__) - } - ),* - } - } - } - - - #[allow(non_camel_case_types)] - impl $crate::tokio_service::Service - for tarpc_service_AsyncServer__ - where tarpc_service_S__: FutureService - { - type Request = ::std::result::Result; - type Response = $crate::server::Response; - type Error = ::std::io::Error; - type Future = tarpc_service_FutureReply__; - - fn call(&self, tarpc_service_request__: Self::Request) -> Self::Future { - let tarpc_service_request__ = match tarpc_service_request__ { - Ok(tarpc_service_request__) => tarpc_service_request__, - Err(tarpc_service_deserialize_err__) => { - return tarpc_service_FutureReply__::DeserializeError( - $crate::futures::finished( - ::std::result::Result::Err( - $crate::WireError::ServerSerialize( - ::std::string::ToString::to_string( - &tarpc_service_deserialize_err__))))); - } - }; - match tarpc_service_request__ { - tarpc_service_Request__::NotIrrefutable(()) => unreachable!(), - $( - tarpc_service_Request__::$fn_name(( $($arg,)* )) => { - fn tarpc_service_wrap__( - tarpc_service_response__: - ::std::result::Result<$out, $error>) - -> tarpc_service_Future__ - { - $crate::futures::finished( - tarpc_service_response__ - .map(tarpc_service_Response__::$fn_name) - .map_err(|tarpc_service_error__| { - $crate::WireError::App( - tarpc_service_Error__::$fn_name( - tarpc_service_error__)) - }) - ) - } - return tarpc_service_FutureReply__::$fn_name( - $crate::futures::Future::then( - $crate::futures::IntoFuture::into_future( - FutureService::$fn_name(&self.0, $($arg),*)), - tarpc_service_wrap__)); - } - )* - } - } - } + options) + .map(|(addr_, server_)| { + handle.spawn(server_); + addr_ + }) } } @@ -484,36 +488,24 @@ macro_rules! service { /// Spawns the service, binding to the given address and running on /// the default tokio `Loop`. fn listen(self, addr: A, options: $crate::server::Options) - -> ::std::io::Result<::std::net::SocketAddr> + -> ::std::io::Result<$crate::server::Handle> where A: ::std::net::ToSocketAddrs { - let tarpc_service__ = SyncServer__ { + let tarpc_service__ = tarpc_service_AsyncServer__(SyncServer__ { service: self, - }; + }); let tarpc_service_addr__ = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?; - let (tx_, rx_) = ::std::sync::mpsc::channel(); - - ::std::thread::spawn(move || { - match $crate::tokio_core::reactor::Core::new() { - ::std::result::Result::Ok(mut reactor_) => { - let addr_ = FutureServiceExt::listen(tarpc_service__, - tarpc_service_addr__, - &reactor_.handle(), - options); - tx_.send(addr_).unwrap(); - loop { - reactor_.turn(::std::option::Option::None); - } - } - ::std::result::Result::Err(error_) => { - tx_.send(Err(error_)).unwrap(); - } - } - }); - return rx_.recv().unwrap(); + let reactor_ = $crate::tokio_core::reactor::Core::new()?; + let (addr_, server_) = $crate::server::listen( + move || Ok(tarpc_service__.clone()), + tarpc_service_addr__, + &reactor_.handle(), + options)?; + reactor_.handle().spawn(server_); + return Ok($crate::server::Handle::new(reactor_, addr_)); #[derive(Clone)] struct SyncServer__ { @@ -864,9 +856,14 @@ mod functional_test { fn start_server_with_sync_client(server: S) -> io::Result<(SocketAddr, C)> where C: client::sync::ClientExt, S: SyncServiceExt { - let server_options = get_tls_server_options(); - let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), - server_options)); + let options = get_tls_server_options(); + let (tx, rx) = ::std::sync::mpsc::channel(); + ::std::thread::spawn(move || { + let mut handle = unwrap!(server.listen("localhost:0".first_socket_addr(), options)); + tx.send(handle.addr()).unwrap(); + handle.run(); + }); + let addr = rx.recv().unwrap(); let client = unwrap!(C::connect(addr, get_tls_client_options())); Ok((addr, client)) } @@ -917,7 +914,13 @@ mod functional_test { where C: client::sync::ClientExt, S: SyncServiceExt { let options = get_server_options(); - let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), options)); + let (tx, rx) = ::std::sync::mpsc::channel(); + ::std::thread::spawn(move || { + let mut handle = unwrap!(server.listen("localhost:0".first_socket_addr(), options)); + tx.send(handle.addr()).unwrap(); + handle.run(); + }); + let addr = rx.recv().unwrap(); let client = unwrap!(get_sync_client(addr)); Ok((addr, client)) } diff --git a/src/server.rs b/src/server.rs index 5016376..30f579d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; use std::io; use std::net::SocketAddr; use tokio_core::net::TcpListener; -use tokio_core::reactor::{self, Handle}; +use tokio_core::reactor; use tokio_proto::BindServer; use tokio_service::NewService; @@ -56,7 +56,7 @@ pub fn listen(new_service: S, addr: SocketAddr, handle: &reactor::Handle, _options: Options) - -> io::Result + -> io::Result<(SocketAddr, impl Future)> where S: NewService, Response = Response, Error = io::Error> + 'static, @@ -77,12 +77,40 @@ pub fn listen(new_service: S, listen_with(new_service, addr, handle, acceptor) } +/// A handle to a bound server. Must be run to start serving requests. +pub struct Handle { + reactor: reactor::Core, + addr: SocketAddr, +} + +impl Handle { + #[doc(hidden)] + pub fn new(reactor: reactor::Core, addr: SocketAddr) -> Self { + Handle { + reactor: reactor, + addr: addr, + } + } + + /// Runs the server on the current thread, blocking indefinitely. + pub fn run(&mut self) -> ! { + loop { + self.reactor.turn(None) + } + } + + /// The socket address the server is bound to. + pub fn addr(&self) -> SocketAddr { + self.addr + } +} + /// Spawns a service that binds to the given address using the given handle. fn listen_with(new_service: S, addr: SocketAddr, - handle: &Handle, + handle: &reactor::Handle, _acceptor: Acceptor) - -> io::Result + -> io::Result<(SocketAddr, impl Future)> where S: NewService, Response = Response, Error = io::Error> + 'static, @@ -115,11 +143,10 @@ fn listen_with(new_service: S, Ok(()) }) .map_err(|e| error!("While processing incoming connections: {}", e)); - handle.spawn(server); - Ok(addr) + Ok((addr, server)) } -fn listener(addr: &SocketAddr, handle: &Handle) -> io::Result { +fn listener(addr: &SocketAddr, handle: &reactor::Handle) -> io::Result { const PENDING_CONNECTION_BACKLOG: i32 = 1024; #[cfg(unix)] use net2::unix::UnixTcpBuilderExt;