From 2b8f3db1fd02e45b34fbffd508b6207f9c932c50 Mon Sep 17 00:00:00 2001 From: Tim Date: Tue, 21 Feb 2017 22:01:59 -0800 Subject: [PATCH] Return a concrete type from server::listen (#113) * Return a concrete type from `server::listen`. * Change `FutureServiceExt::listen` to return `(SocketAddr, Listen)`, where `Listen` is a struct created by the `service!` macro that `impls Future` and represents server execution. * Disable `conservative_impl_trait` as it's no longer used. * Update `FutureServiceExt` doc comment. * Update `SyncServiceExt` doc comment. Also annotate `server::Handle` with `#[must_use]`. * `cargo fmt` --- README.md | 22 +-- benches/latency.rs | 5 +- examples/concurrency.rs | 3 +- examples/pubsub.rs | 11 +- examples/readme_errors.rs | 3 +- examples/readme_futures.rs | 6 +- examples/readme_sync.rs | 4 +- examples/server_calling_server.rs | 8 +- examples/throughput.rs | 5 +- examples/two_clients.rs | 39 ++++-- src/client.rs | 10 +- src/lib.rs | 8 +- src/macros.rs | 93 ++++++++---- src/server.rs | 225 ++++++++++++++++++++++++------ 14 files changed, 326 insertions(+), 116 deletions(-) diff --git a/README.md b/README.md index bc92a09..af3fe31 100644 --- a/README.md +++ b/README.md @@ -46,9 +46,8 @@ tarpc-plugins = { git = "https://github.com/google/tarpc" } tarpc has two APIs: `sync` for blocking code and `future` for asynchronous code. Here's how to use the sync api. -```rust,no_run -// required by `FutureClient` (not used directly in this example) -#![feature(conservative_impl_trait, plugin)] +```rust +#![feature(plugin)] #![plugin(tarpc_plugins)] #[macro_use] @@ -100,8 +99,8 @@ races! See the `tarpc_examples` package for more examples. Here's the same service, implemented using futures. -```rust,no_run -#![feature(conservative_impl_trait, plugin)] +```rust +#![feature(plugin)] #![plugin(tarpc_plugins)] extern crate futures; @@ -132,10 +131,11 @@ impl FutureService for HelloServer { fn main() { let mut reactor = reactor::Core::new().unwrap(); - let addr = HelloServer.listen("localhost:10000".first_socket_addr(), + let (addr, server) = HelloServer.listen("localhost:10000".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); + reactor.handle().spawn(server); let options = client::Options::default().handle(reactor.handle()); reactor.run(FutureClient::connect(addr, options) .map_err(tarpc::Error::from) @@ -171,7 +171,7 @@ However, if you are working with both stream types, ensure that you use the TLS servers and TCP clients with TCP servers. ```rust,no_run -#![feature(conservative_impl_trait, plugin)] +#![feature(plugin)] #![plugin(tarpc_plugins)] extern crate futures; @@ -210,10 +210,10 @@ fn get_acceptor() -> TlsAcceptor { fn main() { let mut reactor = reactor::Core::new().unwrap(); let acceptor = get_acceptor(); - let addr = HelloServer.listen("localhost:10000".first_socket_addr(), - &reactor.handle(), - server::Options::default().tls(acceptor)) - .unwrap(); + let (addr, server) = HelloServer.listen("localhost:10000".first_socket_addr(), + &reactor.handle(), + server::Options::default().tls(acceptor)).unwrap(); + reactor.handle().spawn(server); let options = client::Options::default() .handle(reactor.handle()) .tls(client::tls::Context::new("foobar.com").unwrap()); diff --git a/benches/latency.rs b/benches/latency.rs index 3d6550f..c1791e1 100644 --- a/benches/latency.rs +++ b/benches/latency.rs @@ -3,7 +3,7 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -#![feature(plugin, conservative_impl_trait, test)] +#![feature(plugin, test)] #![plugin(tarpc_plugins)] #[macro_use] @@ -40,10 +40,11 @@ impl FutureService for Server { fn latency(bencher: &mut Bencher) { let _ = env_logger::init(); let mut reactor = reactor::Core::new().unwrap(); - let addr = Server.listen("localhost:0".first_socket_addr(), + let (addr, server) = Server.listen("localhost:0".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); + reactor.handle().spawn(server); let client = reactor.run(FutureClient::connect(addr, client::Options::default())).unwrap(); bencher.iter(|| reactor.run(client.ack()).unwrap()); diff --git a/examples/concurrency.rs b/examples/concurrency.rs index 6bdba37..35c5dfd 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -167,11 +167,12 @@ fn main() { .unwrap_or(4); let mut reactor = reactor::Core::new().unwrap(); - let addr = Server::new() + let (addr, server) = Server::new() .listen("localhost:0".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); + reactor.handle().spawn(server); info!("Server listening on {}.", addr); let clients = (0..num_clients) diff --git a/examples/pubsub.rs b/examples/pubsub.rs index dab209d..189f7d8 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -3,7 +3,7 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -#![feature(conservative_impl_trait, plugin)] +#![feature(plugin)] #![plugin(tarpc_plugins)] extern crate env_logger; @@ -59,9 +59,11 @@ impl subscriber::FutureService for Subscriber { impl Subscriber { fn listen(id: u32, handle: &reactor::Handle, options: server::Options) -> SocketAddr { - Subscriber { id: id } + let (addr, server) = Subscriber { id: id } .listen("localhost:0".first_socket_addr(), handle, options) - .unwrap() + .unwrap(); + handle.spawn(server); + addr } } @@ -118,11 +120,12 @@ impl publisher::FutureService for Publisher { fn main() { let _ = env_logger::init(); let mut reactor = reactor::Core::new().unwrap(); - let publisher_addr = Publisher::new() + let (publisher_addr, server) = Publisher::new() .listen("localhost:0".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); + reactor.handle().spawn(server); let subscriber1 = Subscriber::listen(0, &reactor.handle(), server::Options::default()); let subscriber2 = Subscriber::listen(1, &reactor.handle(), server::Options::default()); diff --git a/examples/readme_errors.rs b/examples/readme_errors.rs index 2c961bd..4833c10 100644 --- a/examples/readme_errors.rs +++ b/examples/readme_errors.rs @@ -3,7 +3,7 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -#![feature(conservative_impl_trait, plugin)] +#![feature(plugin)] #![plugin(tarpc_plugins)] extern crate futures; @@ -19,7 +19,6 @@ use std::sync::mpsc; use std::thread; use tarpc::{client, server}; use tarpc::client::sync::ClientExt; -use tarpc::util::FirstSocketAddr; service! { rpc hello(name: String) -> String | NoNameGiven; diff --git a/examples/readme_futures.rs b/examples/readme_futures.rs index 375b5ba..c4cb4b6 100644 --- a/examples/readme_futures.rs +++ b/examples/readme_futures.rs @@ -3,7 +3,7 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -#![feature(conservative_impl_trait, plugin)] +#![feature(plugin)] #![plugin(tarpc_plugins)] extern crate futures; @@ -34,10 +34,12 @@ impl FutureService for HelloServer { fn main() { let mut reactor = reactor::Core::new().unwrap(); - let addr = HelloServer.listen("localhost:10000".first_socket_addr(), + let (addr, server) = HelloServer.listen("localhost:10000".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); + reactor.handle().spawn(server); + let options = client::Options::default().handle(reactor.handle()); reactor.run(FutureClient::connect(addr, options) .map_err(tarpc::Error::from) diff --git a/examples/readme_sync.rs b/examples/readme_sync.rs index 15db3a6..85d50de 100644 --- a/examples/readme_sync.rs +++ b/examples/readme_sync.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. // required by `FutureClient` (not used directly in this example) -#![feature(conservative_impl_trait, plugin)] +#![feature(plugin)] #![plugin(tarpc_plugins)] extern crate futures; @@ -16,7 +16,7 @@ use std::sync::mpsc; use std::thread; use tarpc::{client, server}; use tarpc::client::sync::ClientExt; -use tarpc::util::{FirstSocketAddr, Never}; +use tarpc::util::Never; service! { rpc hello(name: String) -> String; diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index df6629d..56714e8 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -3,7 +3,7 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -#![feature(conservative_impl_trait, plugin)] +#![feature(plugin)] #![plugin(tarpc_plugins)] extern crate env_logger; @@ -72,19 +72,21 @@ impl DoubleFutureService for DoubleServer { fn main() { let _ = env_logger::init(); let mut reactor = reactor::Core::new().unwrap(); - let add_addr = AddServer.listen("localhost:0".first_socket_addr(), + let (add_addr, server) = AddServer.listen("localhost:0".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); + reactor.handle().spawn(server); let options = client::Options::default().handle(reactor.handle()); let add_client = reactor.run(add::FutureClient::connect(add_addr, options)).unwrap(); - let double_addr = DoubleServer::new(add_client) + let (double_addr, server) = DoubleServer::new(add_client) .listen("localhost:0".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); + reactor.handle().spawn(server); let double_client = reactor.run(double::FutureClient::connect(double_addr, client::Options::default())) diff --git a/examples/throughput.rs b/examples/throughput.rs index dce50ca..660690f 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -3,7 +3,7 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -#![feature(conservative_impl_trait, plugin)] +#![feature(plugin)] #![plugin(tarpc_plugins)] #[macro_use] @@ -56,10 +56,11 @@ const CHUNK_SIZE: u32 = 1 << 19; fn bench_tarpc(target: u64) { let reactor = reactor::Core::new().unwrap(); - let addr = Server.listen("localhost:0".first_socket_addr(), + let (addr, server) = Server.listen("localhost:0".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); + reactor.handle().spawn(server); let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); let start = time::Instant::now(); let mut nread = 0; diff --git a/examples/two_clients.rs b/examples/two_clients.rs index 163e765..5ec2954 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -3,7 +3,7 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -#![feature(conservative_impl_trait, plugin)] +#![feature(plugin)] #![plugin(tarpc_plugins)] #[macro_use] @@ -17,6 +17,8 @@ extern crate tokio_core; use bar::FutureServiceExt as BarExt; use baz::FutureServiceExt as BazExt; +use std::sync::mpsc; +use std::thread; use tarpc::{client, server}; use tarpc::client::sync::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; @@ -61,23 +63,32 @@ macro_rules! pos { fn main() { let _ = env_logger::init(); let mut bar_client = { - let reactor = reactor::Core::new().unwrap(); - let addr = Bar.listen("localhost:0".first_socket_addr(), - &reactor.handle(), - server::Options::default()) - .unwrap(); - // TODO: Need to set up each client with its own reactor. Should it be shareable across - // multiple clients? e.g. Rc> or something similar? + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let mut reactor = reactor::Core::new().unwrap(); + let (addr, server) = Bar.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server::Options::default()) + .unwrap(); + tx.send(addr).unwrap(); + reactor.run(server).unwrap(); + }); + let addr = rx.recv().unwrap(); bar::SyncClient::connect(addr, client::Options::default()).unwrap() }; let mut baz_client = { - // Need to set up each client with its own reactor. - let reactor = reactor::Core::new().unwrap(); - let addr = Baz.listen("localhost:0".first_socket_addr(), - &reactor.handle(), - server::Options::default()) - .unwrap(); + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let mut reactor = reactor::Core::new().unwrap(); + let (addr, server) = Baz.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server::Options::default()) + .unwrap(); + tx.send(addr).unwrap(); + reactor.run(server).unwrap(); + }); + let addr = rx.recv().unwrap(); baz::SyncClient::connect(addr, client::Options::default()).unwrap() }; diff --git a/src/client.rs b/src/client.rs index 450e6ed..00543c0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,9 +3,9 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use {WireError, bincode}; #[cfg(feature = "tls")] use self::tls::*; +use {WireError, bincode}; use tokio_core::reactor; type WireResponse = Result>, bincode::Error>; @@ -86,6 +86,7 @@ enum Reactor { /// Exposes a trait for connecting asynchronously to servers. pub mod future { + use super::{Options, Reactor, WireResponse}; use {REMOTE, WireError}; #[cfg(feature = "tls")] use errors::native_to_io; @@ -96,7 +97,6 @@ pub mod future { use std::io; use std::net::SocketAddr; use stream_type::StreamType; - use super::{Options, Reactor, WireResponse}; use tokio_core::net::TcpStream; use tokio_core::reactor; use tokio_proto::BindClient as ProtoBindClient; @@ -264,13 +264,13 @@ pub mod future { /// Exposes a trait for connecting synchronously to servers. pub mod sync { + use super::Options; + use super::Reactor; + use super::future::{Client as FutureClient, ClientExt as FutureClientExt}; use serde::{Deserialize, Serialize}; use std::fmt; use std::io; use std::net::ToSocketAddrs; - use super::Options; - use super::Reactor; - use super::future::{Client as FutureClient, ClientExt as FutureClientExt}; use tokio_core::reactor; use tokio_service::Service; use util::FirstSocketAddr; diff --git a/src/lib.rs b/src/lib.rs index afd3479..352b1b8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,8 +27,7 @@ //! Example usage: //! //! ``` -//! // required by `FutureClient` (not used in this example) -//! #![feature(conservative_impl_trait, plugin)] +//! #![feature(plugin)] //! #![plugin(tarpc_plugins)] //! //! #[macro_use] @@ -72,8 +71,7 @@ //! Example usage with TLS: //! //! ```ignore -//! // required by `FutureClient` (not used in this example) -//! #![feature(conservative_impl_trait, plugin)] +//! #![feature(plugin)] //! #![plugin(tarpc_plugins)] //! //! #[macro_use] @@ -116,7 +114,7 @@ //! ``` //! #![deny(missing_docs)] -#![feature(conservative_impl_trait, never_type, plugin, struct_field_attributes)] +#![feature(never_type, plugin, struct_field_attributes, fn_traits, unboxed_closures)] #![plugin(tarpc_plugins)] extern crate byteorder; diff --git a/src/macros.rs b/src/macros.rs index e8f8486..5a44745 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -153,7 +153,7 @@ macro_rules! impl_deserialize { /// Rpc methods are specified, mirroring trait syntax: /// /// ``` -/// # #![feature(conservative_impl_trait, plugin)] +/// # #![feature(plugin)] /// # #![plugin(tarpc_plugins)] /// # #[macro_use] extern crate tarpc; /// # fn main() {} @@ -446,25 +446,62 @@ macro_rules! service { } } + #[allow(non_camel_case_types)] + impl $crate::tokio_service::NewService + for tarpc_service_AsyncServer__ + where tarpc_service_S__: FutureService + { + type Request = ::Request; + type Response = ::Response; + type Error = ::Error; + type Instance = Self; + + fn new_service(&self) -> ::std::io::Result { + Ok(self.clone()) + } + } + + /// The future returned by `FutureServiceExt::listen`. + #[allow(unused)] + pub struct Listen + where S: FutureService, + { + inner: $crate::server::Listen, + tarpc_service_Request__, + tarpc_service_Response__, + tarpc_service_Error__>, + } + + impl $crate::futures::Future for Listen + where S: FutureService + { + type Item = (); + type Error = (); + + fn poll(&mut self) -> $crate::futures::Poll<(), ()> { + self.inner.poll() + } + } + /// Provides a function for starting the service. This is a separate trait from /// `FutureService` to prevent collisions with the names of RPCs. pub trait FutureServiceExt: FutureService { /// Spawns the service, binding to the given address and running on - /// the default tokio `Loop`. + /// the `reactor::Core` associated with `handle`. + /// + /// Returns the address being listened on as well as the server future. The future + /// must be executed for the server to run. fn listen(self, addr: ::std::net::SocketAddr, handle: &$crate::tokio_core::reactor::Handle, options: $crate::server::Options) - -> ::std::io::Result<::std::net::SocketAddr> + -> ::std::io::Result<(::std::net::SocketAddr, Listen)> { - $crate::server::listen(move || Ok(tarpc_service_AsyncServer__(self.clone())), + $crate::server::listen(tarpc_service_AsyncServer__(self), addr, handle, options) - .map(|(addr_, server_)| { - handle.spawn(server_); - addr_ - }) + .map(|(addr, inner)| (addr, Listen { inner })) } } @@ -485,8 +522,9 @@ macro_rules! service { /// Provides a function for starting the service. This is a separate trait from /// `SyncService` to prevent collisions with the names of RPCs. pub trait SyncServiceExt: SyncService { - /// Spawns the service, binding to the given address and running on - /// the default tokio `Loop`. + /// Spawns the service, binding to the given address and returning the server handle. + /// + /// To actually run the server, call `run` on the returned handle. fn listen(self, addr: A, options: $crate::server::Options) -> ::std::io::Result<$crate::server::Handle> where A: ::std::net::ToSocketAddrs @@ -500,7 +538,7 @@ macro_rules! service { let reactor_ = $crate::tokio_core::reactor::Core::new()?; let (addr_, server_) = $crate::server::listen( - move || Ok(tarpc_service__.clone()), + tarpc_service__, tarpc_service_addr__, &reactor_.handle(), options)?; @@ -859,7 +897,8 @@ mod functional_test { let options = get_tls_server_options(); let (tx, rx) = ::std::sync::mpsc::channel(); ::std::thread::spawn(move || { - let mut handle = unwrap!(server.listen("localhost:0".first_socket_addr(), options)); + let mut handle = unwrap!(server.listen("localhost:0".first_socket_addr(), + options)); tx.send(handle.addr()).unwrap(); handle.run(); }); @@ -874,9 +913,10 @@ mod functional_test { { let mut reactor = reactor::Core::new()?; let server_options = get_tls_server_options(); - let addr = server.listen("localhost:0".first_socket_addr(), - &reactor.handle(), - server_options)?; + let (addr, server) = server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server_options)?; + reactor.handle().spawn(server); let client_options = get_tls_client_options().handle(reactor.handle()); let client = unwrap!(reactor.run(C::connect(addr, client_options))); Ok((addr, reactor, client)) @@ -888,9 +928,10 @@ mod functional_test { { let mut reactor = reactor::Core::new()?; let server_options = get_tls_server_options(); - let addr = server.listen("localhost:0".first_socket_addr(), - &reactor.handle(), - server_options)?; + let (addr, server) = server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server_options)?; + reactor.handle().spawn(server); let client_options = get_tls_client_options().handle(reactor.handle()); let client = unwrap!(reactor.run(C::connect(addr, client_options))); Ok((addr, reactor, client)) @@ -916,7 +957,8 @@ mod functional_test { let options = get_server_options(); let (tx, rx) = ::std::sync::mpsc::channel(); ::std::thread::spawn(move || { - let mut handle = unwrap!(server.listen("localhost:0".first_socket_addr(), options)); + let mut handle = unwrap!(server.listen("localhost:0".first_socket_addr(), + options)); tx.send(handle.addr()).unwrap(); handle.run(); }); @@ -931,9 +973,10 @@ mod functional_test { { let mut reactor = reactor::Core::new()?; let options = get_server_options(); - let addr = server.listen("localhost:0".first_socket_addr(), + let (addr, server) = server.listen("localhost:0".first_socket_addr(), &reactor.handle(), options)?; + reactor.handle().spawn(server); let client = unwrap!(reactor.run(C::connect(addr, get_client_options()))); Ok((addr, reactor, client)) } @@ -944,9 +987,10 @@ mod functional_test { { let mut reactor = reactor::Core::new()?; let options = get_server_options(); - let addr = server.listen("localhost:0".first_socket_addr(), + let (addr, server) = server.listen("localhost:0".first_socket_addr(), &reactor.handle(), options)?; + reactor.handle().spawn(server); let client = C::connect(addr, get_client_options()); let client = unwrap!(reactor.run(client)); Ok((addr, reactor, client)) @@ -994,8 +1038,8 @@ mod functional_test { } mod future { - use futures::{Finished, finished}; use super::{FutureClient, FutureService, env_logger, start_server_with_async_client}; + use futures::{Finished, finished}; use tokio_core::reactor; use util::Never; @@ -1059,7 +1103,7 @@ mod functional_test { let _ = env_logger::init(); let reactor = reactor::Core::new().unwrap(); - let addr = Server.listen("localhost:0".first_socket_addr(), + let (addr, _) = Server.listen("localhost:0".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); @@ -1081,10 +1125,11 @@ mod functional_test { assert_eq!("Hey, Tim.", reactor.run(client.hey("Tim".to_string())).unwrap()); - let addr = Server.listen("localhost:0".first_socket_addr(), + let (addr, server) = Server.listen("localhost:0".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); + reactor.handle().spawn(server); let options = client::Options::default().handle(reactor.handle()); let client = reactor.run(FutureClient::connect(addr, options)).unwrap(); assert_eq!(3, reactor.run(client.add(1, 2)).unwrap()); diff --git a/src/server.rs b/src/server.rs index 30f579d..ee72c86 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,21 +5,22 @@ use bincode; use errors::WireError; -use futures::{Future, Stream, future}; +use futures::{Future, Poll, Stream, future, stream}; use net2; use protocol::Proto; use serde::{Deserialize, Serialize}; use std::io; use std::net::SocketAddr; -use tokio_core::net::TcpListener; +use tokio_core::io::Io; +use tokio_core::net::{Incoming, TcpListener, TcpStream}; use tokio_core::reactor; use tokio_proto::BindServer; use tokio_service::NewService; cfg_if! { if #[cfg(feature = "tls")] { - use native_tls::TlsAcceptor; - use tokio_tls::TlsAcceptorExt; + use native_tls::{self, TlsAcceptor}; + use tokio_tls::{AcceptAsync, TlsAcceptorExt, TlsStream}; use errors::native_to_io; use stream_type::StreamType; } else {} @@ -31,6 +32,73 @@ enum Acceptor { Tls(TlsAcceptor), } +#[cfg(feature = "tls")] +type Accept = future::Either, + fn(TlsStream) -> StreamType>, + fn(native_tls::Error) -> io::Error>, + future::FutureResult>; + +#[cfg(not(feature = "tls"))] +type Accept = future::FutureResult; + +impl Acceptor { + #[cfg(feature = "tls")] + fn accept(&self, socket: TcpStream) -> Accept { + match *self { + Acceptor::Tls(ref tls_acceptor) => { + future::Either::A(tls_acceptor.accept_async(socket) + .map(StreamType::Tls as _) + .map_err(native_to_io)) + } + Acceptor::Tcp => future::Either::B(future::ok(StreamType::Tcp(socket))), + } + } + + #[cfg(not(feature = "tls"))] + fn accept(&self, socket: TcpStream) -> Accept { + future::ok(socket) + } +} + +#[cfg(feature = "tls")] +impl From for Acceptor { + fn from(options: Options) -> Self { + match options.tls_acceptor { + Some(tls_acceptor) => Acceptor::Tls(tls_acceptor), + None => Acceptor::Tcp, + } + } +} + +#[cfg(not(feature = "tls"))] +impl From for Acceptor { + fn from(_: Options) -> Self { + Acceptor::Tcp + } +} + +impl FnOnce<((TcpStream, SocketAddr),)> for Acceptor { + type Output = Accept; + + extern "rust-call" fn call_once(self, ((socket, _),): ((TcpStream, SocketAddr),)) -> Accept { + self.accept(socket) + } +} + +impl FnMut<((TcpStream, SocketAddr),)> for Acceptor { + extern "rust-call" fn call_mut(&mut self, + ((socket, _),): ((TcpStream, SocketAddr),)) + -> Accept { + self.accept(socket) + } +} + +impl Fn<((TcpStream, SocketAddr),)> for Acceptor { + extern "rust-call" fn call(&self, ((socket, _),): ((TcpStream, SocketAddr),)) -> Accept { + self.accept(socket) + } +} + /// Additional options to configure how the server operates. #[derive(Default)] pub struct Options { @@ -55,8 +123,8 @@ pub type Response = Result>; pub fn listen(new_service: S, addr: SocketAddr, handle: &reactor::Handle, - _options: Options) - -> io::Result<(SocketAddr, impl Future)> + options: Options) + -> io::Result<(SocketAddr, Listen)> where S: NewService, Response = Response, Error = io::Error> + 'static, @@ -64,20 +132,11 @@ pub fn listen(new_service: S, Resp: Serialize + 'static, E: Serialize + 'static { - // Similar to the client, since `Options` is not `Send`, we take the `TlsAcceptor` when it is - // available. - #[cfg(feature = "tls")] - let acceptor = match _options.tls_acceptor { - Some(tls_acceptor) => Acceptor::Tls(tls_acceptor), - None => Acceptor::Tcp, - }; - #[cfg(not(feature = "tls"))] - let acceptor = Acceptor::Tcp; - - listen_with(new_service, addr, handle, acceptor) + listen_with(new_service, addr, handle, Acceptor::from(options)) } /// A handle to a bound server. Must be run to start serving requests. +#[must_use = "A server does nothing until `run` is called."] pub struct Handle { reactor: reactor::Core, addr: SocketAddr, @@ -105,12 +164,44 @@ impl Handle { } } +/// The future representing a running server. +#[doc(hidden)] +pub struct Listen + where S: NewService, + Response = Response, + Error = io::Error> + 'static, + Req: Deserialize + 'static, + Resp: Serialize + 'static, + E: Serialize + 'static +{ + inner: future::MapErr, + Bind, + io::Result<()>>, + fn(io::Error)>, +} + +impl Future for Listen + where S: NewService, + Response = Response, + Error = io::Error> + 'static, + Req: Deserialize + 'static, + Resp: Serialize + 'static, + E: Serialize + 'static +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + self.inner.poll() + } +} + /// Spawns a service that binds to the given address using the given handle. fn listen_with(new_service: S, addr: SocketAddr, handle: &reactor::Handle, - _acceptor: Acceptor) - -> io::Result<(SocketAddr, impl Future)> + acceptor: Acceptor) + -> io::Result<(SocketAddr, Listen)> where S: NewService, Response = Response, Error = io::Error> + 'static, @@ -121,29 +212,85 @@ fn listen_with(new_service: S, let listener = listener(&addr, handle)?; let addr = listener.local_addr()?; - let handle2 = handle.clone(); + let handle = handle.clone(); - let server = listener.incoming() - .and_then(move |(socket, _)| { - #[cfg(feature = "tls")] - match _acceptor { - Acceptor::Tls(ref tls_acceptor) => { - future::Either::A(tls_acceptor.accept_async(socket) - .map(StreamType::Tls) - .map_err(native_to_io)) - } - Acceptor::Tcp => future::Either::B(future::ok(StreamType::Tcp(socket))), - } - #[cfg(not(feature = "tls"))] - future::ok(socket) + let inner = listener.incoming() + .and_then(acceptor) + .for_each(Bind { + handle: handle, + new_service: new_service, }) - .for_each(move |socket| { - Proto::new().bind_server(&handle2, socket, new_service.new_service()?); + .map_err(log_err as _); + Ok((addr, Listen { inner: inner })) +} - Ok(()) - }) - .map_err(|e| error!("While processing incoming connections: {}", e)); - Ok((addr, server)) +fn log_err(e: io::Error) { + error!("While processing incoming connections: {}", e); +} + +struct Bind { + handle: reactor::Handle, + new_service: S, +} + +impl Bind + where S: NewService, + Response = Response, + Error = io::Error> + 'static, + Req: Deserialize + 'static, + Resp: Serialize + 'static, + E: Serialize + 'static +{ + fn bind(&self, socket: I) -> io::Result<()> + where I: Io + 'static + { + Proto::new().bind_server(&self.handle, socket, self.new_service.new_service()?); + Ok(()) + } +} + +impl FnOnce<(I,)> for Bind + where I: Io + 'static, + S: NewService, + Response = Response, + Error = io::Error> + 'static, + Req: Deserialize + 'static, + Resp: Serialize + 'static, + E: Serialize + 'static +{ + type Output = io::Result<()>; + + extern "rust-call" fn call_once(self, (socket,): (I,)) -> io::Result<()> { + self.bind(socket) + } +} + +impl FnMut<(I,)> for Bind + where I: Io + 'static, + S: NewService, + Response = Response, + Error = io::Error> + 'static, + Req: Deserialize + 'static, + Resp: Serialize + 'static, + E: Serialize + 'static +{ + extern "rust-call" fn call_mut(&mut self, (socket,): (I,)) -> io::Result<()> { + self.bind(socket) + } +} + +impl Fn<(I,)> for Bind + where I: Io + 'static, + S: NewService, + Response = Response, + Error = io::Error> + 'static, + Req: Deserialize + 'static, + Resp: Serialize + 'static, + E: Serialize + 'static +{ + extern "rust-call" fn call(&self, (socket,): (I,)) -> io::Result<()> { + self.bind(socket) + } } fn listener(addr: &SocketAddr, handle: &reactor::Handle) -> io::Result {