mirror of
https://github.com/OMGeeky/tarpc.git
synced 2025-12-30 08:08:58 +01:00
Return a concrete type from server::listen (#113)
* Return a concrete type from `server::listen`. * Change `FutureServiceExt::listen` to return `(SocketAddr, Listen)`, where `Listen` is a struct created by the `service!` macro that `impls Future<Item=(), Error=()>` and represents server execution. * Disable `conservative_impl_trait` as it's no longer used. * Update `FutureServiceExt` doc comment. * Update `SyncServiceExt` doc comment. Also annotate `server::Handle` with `#[must_use]`. * `cargo fmt`
This commit is contained in:
22
README.md
22
README.md
@@ -46,9 +46,8 @@ tarpc-plugins = { git = "https://github.com/google/tarpc" }
|
||||
tarpc has two APIs: `sync` for blocking code and `future` for asynchronous
|
||||
code. Here's how to use the sync api.
|
||||
|
||||
```rust,no_run
|
||||
// required by `FutureClient` (not used directly in this example)
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
```rust
|
||||
#![feature(plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
#[macro_use]
|
||||
@@ -100,8 +99,8 @@ races! See the `tarpc_examples` package for more examples.
|
||||
|
||||
Here's the same service, implemented using futures.
|
||||
|
||||
```rust,no_run
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
```rust
|
||||
#![feature(plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
@@ -132,10 +131,11 @@ impl FutureService for HelloServer {
|
||||
|
||||
fn main() {
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let addr = HelloServer.listen("localhost:10000".first_socket_addr(),
|
||||
let (addr, server) = HelloServer.listen("localhost:10000".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
let options = client::Options::default().handle(reactor.handle());
|
||||
reactor.run(FutureClient::connect(addr, options)
|
||||
.map_err(tarpc::Error::from)
|
||||
@@ -171,7 +171,7 @@ However, if you are working with both stream types, ensure that you use the TLS
|
||||
servers and TCP clients with TCP servers.
|
||||
|
||||
```rust,no_run
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![feature(plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
@@ -210,10 +210,10 @@ fn get_acceptor() -> TlsAcceptor {
|
||||
fn main() {
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let acceptor = get_acceptor();
|
||||
let addr = HelloServer.listen("localhost:10000".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default().tls(acceptor))
|
||||
.unwrap();
|
||||
let (addr, server) = HelloServer.listen("localhost:10000".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default().tls(acceptor)).unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
let options = client::Options::default()
|
||||
.handle(reactor.handle())
|
||||
.tls(client::tls::Context::new("foobar.com").unwrap());
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(plugin, conservative_impl_trait, test)]
|
||||
#![feature(plugin, test)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
#[macro_use]
|
||||
@@ -40,10 +40,11 @@ impl FutureService for Server {
|
||||
fn latency(bencher: &mut Bencher) {
|
||||
let _ = env_logger::init();
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let addr = Server.listen("localhost:0".first_socket_addr(),
|
||||
let (addr, server) = Server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
let client = reactor.run(FutureClient::connect(addr, client::Options::default())).unwrap();
|
||||
|
||||
bencher.iter(|| reactor.run(client.ack()).unwrap());
|
||||
|
||||
@@ -167,11 +167,12 @@ fn main() {
|
||||
.unwrap_or(4);
|
||||
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let addr = Server::new()
|
||||
let (addr, server) = Server::new()
|
||||
.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
info!("Server listening on {}.", addr);
|
||||
|
||||
let clients = (0..num_clients)
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![feature(plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate env_logger;
|
||||
@@ -59,9 +59,11 @@ impl subscriber::FutureService for Subscriber {
|
||||
|
||||
impl Subscriber {
|
||||
fn listen(id: u32, handle: &reactor::Handle, options: server::Options) -> SocketAddr {
|
||||
Subscriber { id: id }
|
||||
let (addr, server) = Subscriber { id: id }
|
||||
.listen("localhost:0".first_socket_addr(), handle, options)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
handle.spawn(server);
|
||||
addr
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,11 +120,12 @@ impl publisher::FutureService for Publisher {
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let publisher_addr = Publisher::new()
|
||||
let (publisher_addr, server) = Publisher::new()
|
||||
.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
|
||||
let subscriber1 = Subscriber::listen(0, &reactor.handle(), server::Options::default());
|
||||
let subscriber2 = Subscriber::listen(1, &reactor.handle(), server::Options::default());
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![feature(plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
@@ -19,7 +19,6 @@ use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use tarpc::{client, server};
|
||||
use tarpc::client::sync::ClientExt;
|
||||
use tarpc::util::FirstSocketAddr;
|
||||
|
||||
service! {
|
||||
rpc hello(name: String) -> String | NoNameGiven;
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![feature(plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
@@ -34,10 +34,12 @@ impl FutureService for HelloServer {
|
||||
|
||||
fn main() {
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let addr = HelloServer.listen("localhost:10000".first_socket_addr(),
|
||||
let (addr, server) = HelloServer.listen("localhost:10000".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
|
||||
let options = client::Options::default().handle(reactor.handle());
|
||||
reactor.run(FutureClient::connect(addr, options)
|
||||
.map_err(tarpc::Error::from)
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
// required by `FutureClient` (not used directly in this example)
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![feature(plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
@@ -16,7 +16,7 @@ use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use tarpc::{client, server};
|
||||
use tarpc::client::sync::ClientExt;
|
||||
use tarpc::util::{FirstSocketAddr, Never};
|
||||
use tarpc::util::Never;
|
||||
|
||||
service! {
|
||||
rpc hello(name: String) -> String;
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![feature(plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate env_logger;
|
||||
@@ -72,19 +72,21 @@ impl DoubleFutureService for DoubleServer {
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let add_addr = AddServer.listen("localhost:0".first_socket_addr(),
|
||||
let (add_addr, server) = AddServer.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
|
||||
let options = client::Options::default().handle(reactor.handle());
|
||||
let add_client = reactor.run(add::FutureClient::connect(add_addr, options)).unwrap();
|
||||
|
||||
let double_addr = DoubleServer::new(add_client)
|
||||
let (double_addr, server) = DoubleServer::new(add_client)
|
||||
.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
|
||||
let double_client =
|
||||
reactor.run(double::FutureClient::connect(double_addr, client::Options::default()))
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![feature(plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
#[macro_use]
|
||||
@@ -56,10 +56,11 @@ const CHUNK_SIZE: u32 = 1 << 19;
|
||||
|
||||
fn bench_tarpc(target: u64) {
|
||||
let reactor = reactor::Core::new().unwrap();
|
||||
let addr = Server.listen("localhost:0".first_socket_addr(),
|
||||
let (addr, server) = Server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
|
||||
let start = time::Instant::now();
|
||||
let mut nread = 0;
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![feature(plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
#[macro_use]
|
||||
@@ -17,6 +17,8 @@ extern crate tokio_core;
|
||||
|
||||
use bar::FutureServiceExt as BarExt;
|
||||
use baz::FutureServiceExt as BazExt;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use tarpc::{client, server};
|
||||
use tarpc::client::sync::ClientExt;
|
||||
use tarpc::util::{FirstSocketAddr, Never};
|
||||
@@ -61,23 +63,32 @@ macro_rules! pos {
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
let mut bar_client = {
|
||||
let reactor = reactor::Core::new().unwrap();
|
||||
let addr = Bar.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
// TODO: Need to set up each client with its own reactor. Should it be shareable across
|
||||
// multiple clients? e.g. Rc<RefCell<Core>> or something similar?
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let (addr, server) = Bar.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
tx.send(addr).unwrap();
|
||||
reactor.run(server).unwrap();
|
||||
});
|
||||
let addr = rx.recv().unwrap();
|
||||
bar::SyncClient::connect(addr, client::Options::default()).unwrap()
|
||||
};
|
||||
|
||||
let mut baz_client = {
|
||||
// Need to set up each client with its own reactor.
|
||||
let reactor = reactor::Core::new().unwrap();
|
||||
let addr = Baz.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let (addr, server) = Baz.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
tx.send(addr).unwrap();
|
||||
reactor.run(server).unwrap();
|
||||
});
|
||||
let addr = rx.recv().unwrap();
|
||||
baz::SyncClient::connect(addr, client::Options::default()).unwrap()
|
||||
};
|
||||
|
||||
|
||||
@@ -3,9 +3,9 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
use {WireError, bincode};
|
||||
#[cfg(feature = "tls")]
|
||||
use self::tls::*;
|
||||
use {WireError, bincode};
|
||||
use tokio_core::reactor;
|
||||
|
||||
type WireResponse<Resp, E> = Result<Result<Resp, WireError<E>>, bincode::Error>;
|
||||
@@ -86,6 +86,7 @@ enum Reactor {
|
||||
|
||||
/// Exposes a trait for connecting asynchronously to servers.
|
||||
pub mod future {
|
||||
use super::{Options, Reactor, WireResponse};
|
||||
use {REMOTE, WireError};
|
||||
#[cfg(feature = "tls")]
|
||||
use errors::native_to_io;
|
||||
@@ -96,7 +97,6 @@ pub mod future {
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use stream_type::StreamType;
|
||||
use super::{Options, Reactor, WireResponse};
|
||||
use tokio_core::net::TcpStream;
|
||||
use tokio_core::reactor;
|
||||
use tokio_proto::BindClient as ProtoBindClient;
|
||||
@@ -264,13 +264,13 @@ pub mod future {
|
||||
|
||||
/// Exposes a trait for connecting synchronously to servers.
|
||||
pub mod sync {
|
||||
use super::Options;
|
||||
use super::Reactor;
|
||||
use super::future::{Client as FutureClient, ClientExt as FutureClientExt};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::net::ToSocketAddrs;
|
||||
use super::Options;
|
||||
use super::Reactor;
|
||||
use super::future::{Client as FutureClient, ClientExt as FutureClientExt};
|
||||
use tokio_core::reactor;
|
||||
use tokio_service::Service;
|
||||
use util::FirstSocketAddr;
|
||||
|
||||
@@ -27,8 +27,7 @@
|
||||
//! Example usage:
|
||||
//!
|
||||
//! ```
|
||||
//! // required by `FutureClient` (not used in this example)
|
||||
//! #![feature(conservative_impl_trait, plugin)]
|
||||
//! #![feature(plugin)]
|
||||
//! #![plugin(tarpc_plugins)]
|
||||
//!
|
||||
//! #[macro_use]
|
||||
@@ -72,8 +71,7 @@
|
||||
//! Example usage with TLS:
|
||||
//!
|
||||
//! ```ignore
|
||||
//! // required by `FutureClient` (not used in this example)
|
||||
//! #![feature(conservative_impl_trait, plugin)]
|
||||
//! #![feature(plugin)]
|
||||
//! #![plugin(tarpc_plugins)]
|
||||
//!
|
||||
//! #[macro_use]
|
||||
@@ -116,7 +114,7 @@
|
||||
//! ```
|
||||
//!
|
||||
#![deny(missing_docs)]
|
||||
#![feature(conservative_impl_trait, never_type, plugin, struct_field_attributes)]
|
||||
#![feature(never_type, plugin, struct_field_attributes, fn_traits, unboxed_closures)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate byteorder;
|
||||
|
||||
@@ -153,7 +153,7 @@ macro_rules! impl_deserialize {
|
||||
/// Rpc methods are specified, mirroring trait syntax:
|
||||
///
|
||||
/// ```
|
||||
/// # #![feature(conservative_impl_trait, plugin)]
|
||||
/// # #![feature(plugin)]
|
||||
/// # #![plugin(tarpc_plugins)]
|
||||
/// # #[macro_use] extern crate tarpc;
|
||||
/// # fn main() {}
|
||||
@@ -446,25 +446,62 @@ macro_rules! service {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
impl<tarpc_service_S__> $crate::tokio_service::NewService
|
||||
for tarpc_service_AsyncServer__<tarpc_service_S__>
|
||||
where tarpc_service_S__: FutureService
|
||||
{
|
||||
type Request = <Self as $crate::tokio_service::Service>::Request;
|
||||
type Response = <Self as $crate::tokio_service::Service>::Response;
|
||||
type Error = <Self as $crate::tokio_service::Service>::Error;
|
||||
type Instance = Self;
|
||||
|
||||
fn new_service(&self) -> ::std::io::Result<Self> {
|
||||
Ok(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// The future returned by `FutureServiceExt::listen`.
|
||||
#[allow(unused)]
|
||||
pub struct Listen<S>
|
||||
where S: FutureService,
|
||||
{
|
||||
inner: $crate::server::Listen<tarpc_service_AsyncServer__<S>,
|
||||
tarpc_service_Request__,
|
||||
tarpc_service_Response__,
|
||||
tarpc_service_Error__>,
|
||||
}
|
||||
|
||||
impl<S> $crate::futures::Future for Listen<S>
|
||||
where S: FutureService
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> $crate::futures::Poll<(), ()> {
|
||||
self.inner.poll()
|
||||
}
|
||||
}
|
||||
|
||||
/// Provides a function for starting the service. This is a separate trait from
|
||||
/// `FutureService` to prevent collisions with the names of RPCs.
|
||||
pub trait FutureServiceExt: FutureService {
|
||||
/// Spawns the service, binding to the given address and running on
|
||||
/// the default tokio `Loop`.
|
||||
/// the `reactor::Core` associated with `handle`.
|
||||
///
|
||||
/// Returns the address being listened on as well as the server future. The future
|
||||
/// must be executed for the server to run.
|
||||
fn listen(self,
|
||||
addr: ::std::net::SocketAddr,
|
||||
handle: &$crate::tokio_core::reactor::Handle,
|
||||
options: $crate::server::Options)
|
||||
-> ::std::io::Result<::std::net::SocketAddr>
|
||||
-> ::std::io::Result<(::std::net::SocketAddr, Listen<Self>)>
|
||||
{
|
||||
$crate::server::listen(move || Ok(tarpc_service_AsyncServer__(self.clone())),
|
||||
$crate::server::listen(tarpc_service_AsyncServer__(self),
|
||||
addr,
|
||||
handle,
|
||||
options)
|
||||
.map(|(addr_, server_)| {
|
||||
handle.spawn(server_);
|
||||
addr_
|
||||
})
|
||||
.map(|(addr, inner)| (addr, Listen { inner }))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -485,8 +522,9 @@ macro_rules! service {
|
||||
/// Provides a function for starting the service. This is a separate trait from
|
||||
/// `SyncService` to prevent collisions with the names of RPCs.
|
||||
pub trait SyncServiceExt: SyncService {
|
||||
/// Spawns the service, binding to the given address and running on
|
||||
/// the default tokio `Loop`.
|
||||
/// Spawns the service, binding to the given address and returning the server handle.
|
||||
///
|
||||
/// To actually run the server, call `run` on the returned handle.
|
||||
fn listen<A>(self, addr: A, options: $crate::server::Options)
|
||||
-> ::std::io::Result<$crate::server::Handle>
|
||||
where A: ::std::net::ToSocketAddrs
|
||||
@@ -500,7 +538,7 @@ macro_rules! service {
|
||||
|
||||
let reactor_ = $crate::tokio_core::reactor::Core::new()?;
|
||||
let (addr_, server_) = $crate::server::listen(
|
||||
move || Ok(tarpc_service__.clone()),
|
||||
tarpc_service__,
|
||||
tarpc_service_addr__,
|
||||
&reactor_.handle(),
|
||||
options)?;
|
||||
@@ -859,7 +897,8 @@ mod functional_test {
|
||||
let options = get_tls_server_options();
|
||||
let (tx, rx) = ::std::sync::mpsc::channel();
|
||||
::std::thread::spawn(move || {
|
||||
let mut handle = unwrap!(server.listen("localhost:0".first_socket_addr(), options));
|
||||
let mut handle = unwrap!(server.listen("localhost:0".first_socket_addr(),
|
||||
options));
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
});
|
||||
@@ -874,9 +913,10 @@ mod functional_test {
|
||||
{
|
||||
let mut reactor = reactor::Core::new()?;
|
||||
let server_options = get_tls_server_options();
|
||||
let addr = server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server_options)?;
|
||||
let (addr, server) = server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server_options)?;
|
||||
reactor.handle().spawn(server);
|
||||
let client_options = get_tls_client_options().handle(reactor.handle());
|
||||
let client = unwrap!(reactor.run(C::connect(addr, client_options)));
|
||||
Ok((addr, reactor, client))
|
||||
@@ -888,9 +928,10 @@ mod functional_test {
|
||||
{
|
||||
let mut reactor = reactor::Core::new()?;
|
||||
let server_options = get_tls_server_options();
|
||||
let addr = server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server_options)?;
|
||||
let (addr, server) = server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server_options)?;
|
||||
reactor.handle().spawn(server);
|
||||
let client_options = get_tls_client_options().handle(reactor.handle());
|
||||
let client = unwrap!(reactor.run(C::connect(addr, client_options)));
|
||||
Ok((addr, reactor, client))
|
||||
@@ -916,7 +957,8 @@ mod functional_test {
|
||||
let options = get_server_options();
|
||||
let (tx, rx) = ::std::sync::mpsc::channel();
|
||||
::std::thread::spawn(move || {
|
||||
let mut handle = unwrap!(server.listen("localhost:0".first_socket_addr(), options));
|
||||
let mut handle = unwrap!(server.listen("localhost:0".first_socket_addr(),
|
||||
options));
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
});
|
||||
@@ -931,9 +973,10 @@ mod functional_test {
|
||||
{
|
||||
let mut reactor = reactor::Core::new()?;
|
||||
let options = get_server_options();
|
||||
let addr = server.listen("localhost:0".first_socket_addr(),
|
||||
let (addr, server) = server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
options)?;
|
||||
reactor.handle().spawn(server);
|
||||
let client = unwrap!(reactor.run(C::connect(addr, get_client_options())));
|
||||
Ok((addr, reactor, client))
|
||||
}
|
||||
@@ -944,9 +987,10 @@ mod functional_test {
|
||||
{
|
||||
let mut reactor = reactor::Core::new()?;
|
||||
let options = get_server_options();
|
||||
let addr = server.listen("localhost:0".first_socket_addr(),
|
||||
let (addr, server) = server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
options)?;
|
||||
reactor.handle().spawn(server);
|
||||
let client = C::connect(addr, get_client_options());
|
||||
let client = unwrap!(reactor.run(client));
|
||||
Ok((addr, reactor, client))
|
||||
@@ -994,8 +1038,8 @@ mod functional_test {
|
||||
}
|
||||
|
||||
mod future {
|
||||
use futures::{Finished, finished};
|
||||
use super::{FutureClient, FutureService, env_logger, start_server_with_async_client};
|
||||
use futures::{Finished, finished};
|
||||
use tokio_core::reactor;
|
||||
use util::Never;
|
||||
|
||||
@@ -1059,7 +1103,7 @@ mod functional_test {
|
||||
|
||||
let _ = env_logger::init();
|
||||
let reactor = reactor::Core::new().unwrap();
|
||||
let addr = Server.listen("localhost:0".first_socket_addr(),
|
||||
let (addr, _) = Server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
@@ -1081,10 +1125,11 @@ mod functional_test {
|
||||
assert_eq!("Hey, Tim.",
|
||||
reactor.run(client.hey("Tim".to_string())).unwrap());
|
||||
|
||||
let addr = Server.listen("localhost:0".first_socket_addr(),
|
||||
let (addr, server) = Server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
let options = client::Options::default().handle(reactor.handle());
|
||||
let client = reactor.run(FutureClient::connect(addr, options)).unwrap();
|
||||
assert_eq!(3, reactor.run(client.add(1, 2)).unwrap());
|
||||
|
||||
225
src/server.rs
225
src/server.rs
@@ -5,21 +5,22 @@
|
||||
|
||||
use bincode;
|
||||
use errors::WireError;
|
||||
use futures::{Future, Stream, future};
|
||||
use futures::{Future, Poll, Stream, future, stream};
|
||||
use net2;
|
||||
use protocol::Proto;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use tokio_core::net::TcpListener;
|
||||
use tokio_core::io::Io;
|
||||
use tokio_core::net::{Incoming, TcpListener, TcpStream};
|
||||
use tokio_core::reactor;
|
||||
use tokio_proto::BindServer;
|
||||
use tokio_service::NewService;
|
||||
|
||||
cfg_if! {
|
||||
if #[cfg(feature = "tls")] {
|
||||
use native_tls::TlsAcceptor;
|
||||
use tokio_tls::TlsAcceptorExt;
|
||||
use native_tls::{self, TlsAcceptor};
|
||||
use tokio_tls::{AcceptAsync, TlsAcceptorExt, TlsStream};
|
||||
use errors::native_to_io;
|
||||
use stream_type::StreamType;
|
||||
} else {}
|
||||
@@ -31,6 +32,73 @@ enum Acceptor {
|
||||
Tls(TlsAcceptor),
|
||||
}
|
||||
|
||||
#[cfg(feature = "tls")]
|
||||
type Accept = future::Either<future::MapErr<future::Map<AcceptAsync<TcpStream>,
|
||||
fn(TlsStream<TcpStream>) -> StreamType>,
|
||||
fn(native_tls::Error) -> io::Error>,
|
||||
future::FutureResult<StreamType, io::Error>>;
|
||||
|
||||
#[cfg(not(feature = "tls"))]
|
||||
type Accept = future::FutureResult<TcpStream, io::Error>;
|
||||
|
||||
impl Acceptor {
|
||||
#[cfg(feature = "tls")]
|
||||
fn accept(&self, socket: TcpStream) -> Accept {
|
||||
match *self {
|
||||
Acceptor::Tls(ref tls_acceptor) => {
|
||||
future::Either::A(tls_acceptor.accept_async(socket)
|
||||
.map(StreamType::Tls as _)
|
||||
.map_err(native_to_io))
|
||||
}
|
||||
Acceptor::Tcp => future::Either::B(future::ok(StreamType::Tcp(socket))),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "tls"))]
|
||||
fn accept(&self, socket: TcpStream) -> Accept {
|
||||
future::ok(socket)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "tls")]
|
||||
impl From<Options> for Acceptor {
|
||||
fn from(options: Options) -> Self {
|
||||
match options.tls_acceptor {
|
||||
Some(tls_acceptor) => Acceptor::Tls(tls_acceptor),
|
||||
None => Acceptor::Tcp,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "tls"))]
|
||||
impl From<Options> for Acceptor {
|
||||
fn from(_: Options) -> Self {
|
||||
Acceptor::Tcp
|
||||
}
|
||||
}
|
||||
|
||||
impl FnOnce<((TcpStream, SocketAddr),)> for Acceptor {
|
||||
type Output = Accept;
|
||||
|
||||
extern "rust-call" fn call_once(self, ((socket, _),): ((TcpStream, SocketAddr),)) -> Accept {
|
||||
self.accept(socket)
|
||||
}
|
||||
}
|
||||
|
||||
impl FnMut<((TcpStream, SocketAddr),)> for Acceptor {
|
||||
extern "rust-call" fn call_mut(&mut self,
|
||||
((socket, _),): ((TcpStream, SocketAddr),))
|
||||
-> Accept {
|
||||
self.accept(socket)
|
||||
}
|
||||
}
|
||||
|
||||
impl Fn<((TcpStream, SocketAddr),)> for Acceptor {
|
||||
extern "rust-call" fn call(&self, ((socket, _),): ((TcpStream, SocketAddr),)) -> Accept {
|
||||
self.accept(socket)
|
||||
}
|
||||
}
|
||||
|
||||
/// Additional options to configure how the server operates.
|
||||
#[derive(Default)]
|
||||
pub struct Options {
|
||||
@@ -55,8 +123,8 @@ pub type Response<T, E> = Result<T, WireError<E>>;
|
||||
pub fn listen<S, Req, Resp, E>(new_service: S,
|
||||
addr: SocketAddr,
|
||||
handle: &reactor::Handle,
|
||||
_options: Options)
|
||||
-> io::Result<(SocketAddr, impl Future<Item = (), Error = ()>)>
|
||||
options: Options)
|
||||
-> io::Result<(SocketAddr, Listen<S, Req, Resp, E>)>
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
@@ -64,20 +132,11 @@ pub fn listen<S, Req, Resp, E>(new_service: S,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
// Similar to the client, since `Options` is not `Send`, we take the `TlsAcceptor` when it is
|
||||
// available.
|
||||
#[cfg(feature = "tls")]
|
||||
let acceptor = match _options.tls_acceptor {
|
||||
Some(tls_acceptor) => Acceptor::Tls(tls_acceptor),
|
||||
None => Acceptor::Tcp,
|
||||
};
|
||||
#[cfg(not(feature = "tls"))]
|
||||
let acceptor = Acceptor::Tcp;
|
||||
|
||||
listen_with(new_service, addr, handle, acceptor)
|
||||
listen_with(new_service, addr, handle, Acceptor::from(options))
|
||||
}
|
||||
|
||||
/// A handle to a bound server. Must be run to start serving requests.
|
||||
#[must_use = "A server does nothing until `run` is called."]
|
||||
pub struct Handle {
|
||||
reactor: reactor::Core,
|
||||
addr: SocketAddr,
|
||||
@@ -105,12 +164,44 @@ impl Handle {
|
||||
}
|
||||
}
|
||||
|
||||
/// The future representing a running server.
|
||||
#[doc(hidden)]
|
||||
pub struct Listen<S, Req, Resp, E>
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
Req: Deserialize + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
inner: future::MapErr<stream::ForEach<stream::AndThen<Incoming, Acceptor, Accept>,
|
||||
Bind<S>,
|
||||
io::Result<()>>,
|
||||
fn(io::Error)>,
|
||||
}
|
||||
|
||||
impl<S, Req, Resp, E> Future for Listen<S, Req, Resp, E>
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
Req: Deserialize + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<(), ()> {
|
||||
self.inner.poll()
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns a service that binds to the given address using the given handle.
|
||||
fn listen_with<S, Req, Resp, E>(new_service: S,
|
||||
addr: SocketAddr,
|
||||
handle: &reactor::Handle,
|
||||
_acceptor: Acceptor)
|
||||
-> io::Result<(SocketAddr, impl Future<Item = (), Error = ()>)>
|
||||
acceptor: Acceptor)
|
||||
-> io::Result<(SocketAddr, Listen<S, Req, Resp, E>)>
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
@@ -121,29 +212,85 @@ fn listen_with<S, Req, Resp, E>(new_service: S,
|
||||
let listener = listener(&addr, handle)?;
|
||||
let addr = listener.local_addr()?;
|
||||
|
||||
let handle2 = handle.clone();
|
||||
let handle = handle.clone();
|
||||
|
||||
let server = listener.incoming()
|
||||
.and_then(move |(socket, _)| {
|
||||
#[cfg(feature = "tls")]
|
||||
match _acceptor {
|
||||
Acceptor::Tls(ref tls_acceptor) => {
|
||||
future::Either::A(tls_acceptor.accept_async(socket)
|
||||
.map(StreamType::Tls)
|
||||
.map_err(native_to_io))
|
||||
}
|
||||
Acceptor::Tcp => future::Either::B(future::ok(StreamType::Tcp(socket))),
|
||||
}
|
||||
#[cfg(not(feature = "tls"))]
|
||||
future::ok(socket)
|
||||
let inner = listener.incoming()
|
||||
.and_then(acceptor)
|
||||
.for_each(Bind {
|
||||
handle: handle,
|
||||
new_service: new_service,
|
||||
})
|
||||
.for_each(move |socket| {
|
||||
Proto::new().bind_server(&handle2, socket, new_service.new_service()?);
|
||||
.map_err(log_err as _);
|
||||
Ok((addr, Listen { inner: inner }))
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.map_err(|e| error!("While processing incoming connections: {}", e));
|
||||
Ok((addr, server))
|
||||
fn log_err(e: io::Error) {
|
||||
error!("While processing incoming connections: {}", e);
|
||||
}
|
||||
|
||||
struct Bind<S> {
|
||||
handle: reactor::Handle,
|
||||
new_service: S,
|
||||
}
|
||||
|
||||
impl<S, Req, Resp, E> Bind<S>
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
Req: Deserialize + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
fn bind<I>(&self, socket: I) -> io::Result<()>
|
||||
where I: Io + 'static
|
||||
{
|
||||
Proto::new().bind_server(&self.handle, socket, self.new_service.new_service()?);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, S, Req, Resp, E> FnOnce<(I,)> for Bind<S>
|
||||
where I: Io + 'static,
|
||||
S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
Req: Deserialize + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
type Output = io::Result<()>;
|
||||
|
||||
extern "rust-call" fn call_once(self, (socket,): (I,)) -> io::Result<()> {
|
||||
self.bind(socket)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, S, Req, Resp, E> FnMut<(I,)> for Bind<S>
|
||||
where I: Io + 'static,
|
||||
S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
Req: Deserialize + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
extern "rust-call" fn call_mut(&mut self, (socket,): (I,)) -> io::Result<()> {
|
||||
self.bind(socket)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, S, Req, Resp, E> Fn<(I,)> for Bind<S>
|
||||
where I: Io + 'static,
|
||||
S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
Req: Deserialize + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
extern "rust-call" fn call(&self, (socket,): (I,)) -> io::Result<()> {
|
||||
self.bind(socket)
|
||||
}
|
||||
}
|
||||
|
||||
fn listener(addr: &SocketAddr, handle: &reactor::Handle) -> io::Result<TcpListener> {
|
||||
|
||||
Reference in New Issue
Block a user