diff --git a/Cargo.toml b/Cargo.toml index 477290c..5c6e698 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,24 +11,23 @@ readme = "README.md" description = "An RPC framework for Rust with a focus on ease of use." [dependencies] -bincode = "1.0.0-alpha" +bincode = "1.0.0-alpha2" byteorder = "1.0" cfg-if = "0.1.0" -bytes = "0.3" futures = "0.1.7" lazy_static = "0.2" log = "0.3" -native-tls = { version = "0.1.1", optional = true } -scoped-pool = "1.0" +net2 = "0.2" serde = "0.9" serde_derive = "0.9" tarpc-plugins = { path = "src/plugins" } -take = "0.1" -tokio-service = "0.1" -tokio-proto = "0.1" tokio-core = "0.1" +tokio-proto = "0.1" +tokio-service = "0.1" + +# Optional dependencies +native-tls = { version = "0.1.1", optional = true } tokio-tls = { version = "0.1", optional = true } -net2 = "0.2" [dev-dependencies] chrono = "0.2" diff --git a/README.md b/README.md index da7b06c..0604e41 100644 --- a/README.md +++ b/README.md @@ -50,10 +50,12 @@ tarpc-plugins = { git = "https://github.com/google/tarpc" } extern crate futures; #[macro_use] extern crate tarpc; +extern crate tokio_core; use tarpc::{client, server}; use tarpc::client::sync::Connect; -use tarpc::util::Never; +use tarpc::util::{FirstSocketAddr, Never}; +use tokio_core::reactor; service! { rpc hello(name: String) -> String; @@ -69,9 +71,11 @@ impl SyncService for HelloServer { } fn main() { - let addr = "localhost:10000"; - HelloServer.listen(addr, server::Options::default()).unwrap(); - let client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let reactor = reactor::Core::new().unwrap(); + let addr = HelloServer.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .unwrap(); + let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } ``` @@ -101,7 +105,7 @@ extern crate tokio_core; use futures::Future; use tarpc::{client, server}; -use tarpc::client::future::Connect; +use tarpc::client::future::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; @@ -121,9 +125,10 @@ impl FutureService for HelloServer { } fn main() { - let addr = "localhost:10000".first_socket_addr(); let mut core = reactor::Core::new().unwrap(); - HelloServer.listen(addr, server::Options::default().handle(core.handle())).wait().unwrap(); + let addr = HelloServer.listen("localhost:10000".first_socket_addr(), + server::Options::from(core.handle())) + .unwrap(); let options = client::Options::default().handle(core.handle()); core.run(FutureClient::connect(addr, options) .map_err(tarpc::Error::from) @@ -169,7 +174,7 @@ extern crate tokio_core; use futures::Future; use tarpc::{client, server}; -use tarpc::client::future::Connect; +use tarpc::client::future::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; use tarpc::native_tls::{Pkcs12, TlsAcceptor}; @@ -196,14 +201,15 @@ fn get_acceptor() -> TlsAcceptor { } fn main() { - let addr = "localhost:10000".first_socket_addr(); let mut core = reactor::Core::new().unwrap(); let acceptor = get_acceptor(); - HelloServer.listen(addr, server::Options::default() - .handle(core.handle()) - .tls(acceptor)).wait().unwrap(); - let options = client::Options::default().handle(core.handle() - .tls(client::tls::Context::new("foobar.com").unwrap())); + let addr = HelloServer.listen("localhost:10000".first_socket_addr(), + server::Options::from(core.handle()) + .tls(acceptor)) + .unwrap(); + let options = client::Options::default() + .handle(core.handle()) + .tls(client::tls::Context::new("foobar.com").unwrap())); core.run(FutureClient::connect(addr, options) .map_err(tarpc::Error::from) .and_then(|client| client.hello("Mom".to_string())) diff --git a/benches/latency.rs b/benches/latency.rs index 3c4e49b..768bd3d 100644 --- a/benches/latency.rs +++ b/benches/latency.rs @@ -12,13 +12,14 @@ extern crate tarpc; extern crate test; extern crate env_logger; extern crate futures; +extern crate tokio_core; -use futures::Future; use tarpc::{client, server}; -use tarpc::client::sync::Connect; +use tarpc::client::future::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; #[cfg(test)] use test::Bencher; +use tokio_core::reactor; service! { rpc ack(); @@ -38,8 +39,12 @@ impl FutureService for Server { #[bench] fn latency(bencher: &mut Bencher) { let _ = env_logger::init(); - let addr = Server.listen("localhost:0".first_socket_addr(), server::Options::default()).wait().unwrap(); - let client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let mut reactor = reactor::Core::new().unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server::Options::default()) + .unwrap(); + let client = reactor.run(FutureClient::connect(addr, client::Options::default())).unwrap(); - bencher.iter(|| { client.ack().unwrap(); }); + bencher.iter(|| reactor.run(client.ack()).unwrap()); } diff --git a/examples/concurrency.rs b/examples/concurrency.rs index f6c420f..6bdba37 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -26,7 +26,7 @@ use std::sync::{Arc, mpsc}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant}; use tarpc::{client, server}; -use tarpc::client::future::Connect; +use tarpc::client::future::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; @@ -166,10 +166,11 @@ fn main() { .map(Result::unwrap) .unwrap_or(4); + let mut reactor = reactor::Core::new().unwrap(); let addr = Server::new() .listen("localhost:0".first_socket_addr(), + &reactor.handle(), server::Options::default()) - .wait() .unwrap(); info!("Server listening on {}.", addr); @@ -186,8 +187,5 @@ fn main() { info!("Starting..."); - // The driver of the main future. - let mut core = reactor::Core::new().unwrap(); - - core.run(run).unwrap(); + reactor.run(run).unwrap(); } diff --git a/examples/pubsub.rs b/examples/pubsub.rs index e6d5ca1..dab209d 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -10,20 +10,21 @@ extern crate env_logger; extern crate futures; #[macro_use] extern crate tarpc; -extern crate tokio_proto as tokio; +extern crate tokio_core; -use futures::{BoxFuture, Future}; +use futures::{Future, future}; use publisher::FutureServiceExt as PublisherExt; +use std::cell::RefCell; use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; +use std::rc::Rc; use std::thread; use std::time::Duration; use subscriber::FutureServiceExt as SubscriberExt; use tarpc::{client, server}; -use tarpc::client::future::Connect as Fc; -use tarpc::client::sync::Connect as Sc; +use tarpc::client::future::ClientExt; use tarpc::util::{FirstSocketAddr, Message, Never}; +use tokio_core::reactor; pub mod subscriber { service! { @@ -57,39 +58,39 @@ impl subscriber::FutureService for Subscriber { } impl Subscriber { - fn listen(id: u32) -> SocketAddr { + fn listen(id: u32, handle: &reactor::Handle, options: server::Options) -> SocketAddr { Subscriber { id: id } - .listen("localhost:0".first_socket_addr(), - server::Options::default()) - .wait() + .listen("localhost:0".first_socket_addr(), handle, options) .unwrap() } } #[derive(Clone, Debug)] struct Publisher { - clients: Arc>>, + clients: Rc>>, } impl Publisher { fn new() -> Publisher { - Publisher { clients: Arc::new(Mutex::new(HashMap::new())) } + Publisher { clients: Rc::new(RefCell::new(HashMap::new())) } } } impl publisher::FutureService for Publisher { - type BroadcastFut = BoxFuture<(), Never>; + type BroadcastFut = Box>; fn broadcast(&self, message: String) -> Self::BroadcastFut { - futures::collect(self.clients - .lock() - .unwrap() - .values_mut() - // Ignore failing subscribers. - .map(move |client| client.receive(message.clone()).then(|_| Ok(()))) - .collect::>()) - .map(|_| ()) - .boxed() + let acks = self.clients + .borrow() + .values() + .map(move |client| client.receive(message.clone()) + // Ignore failing subscribers. In a real pubsub, + // you'd want to continually retry until subscribers + // ack. + .then(|_| Ok(()))) + // Collect to a vec to end the borrow on `self.clients`. + .collect::>(); + Box::new(future::join_all(acks).map(|_| ())) } type SubscribeFut = Box>; @@ -99,42 +100,45 @@ impl publisher::FutureService for Publisher { Box::new(subscriber::FutureClient::connect(address, client::Options::default()) .map(move |subscriber| { println!("Subscribing {}.", id); - clients.lock().unwrap().insert(id, subscriber); + clients.borrow_mut().insert(id, subscriber); () }) .map_err(|e| e.to_string().into())) } - type UnsubscribeFut = BoxFuture<(), Never>; + type UnsubscribeFut = Box>; fn unsubscribe(&self, id: u32) -> Self::UnsubscribeFut { println!("Unsubscribing {}", id); - self.clients.lock().unwrap().remove(&id).unwrap(); + self.clients.borrow_mut().remove(&id).unwrap(); futures::finished(()).boxed() } } fn main() { let _ = env_logger::init(); + let mut reactor = reactor::Core::new().unwrap(); let publisher_addr = Publisher::new() .listen("localhost:0".first_socket_addr(), + &reactor.handle(), server::Options::default()) - .wait() .unwrap(); - let publisher_client = - publisher::SyncClient::connect(publisher_addr, client::Options::default()).unwrap(); + let subscriber1 = Subscriber::listen(0, &reactor.handle(), server::Options::default()); + let subscriber2 = Subscriber::listen(1, &reactor.handle(), server::Options::default()); - let subscriber1 = Subscriber::listen(0); - publisher_client.subscribe(0, subscriber1).unwrap(); - - let subscriber2 = Subscriber::listen(1); - publisher_client.subscribe(1, subscriber2).unwrap(); - - - println!("Broadcasting..."); - publisher_client.broadcast("hello to all".to_string()).unwrap(); - publisher_client.unsubscribe(1).unwrap(); - publisher_client.broadcast("hello again".to_string()).unwrap(); + let publisher = + reactor.run(publisher::FutureClient::connect(publisher_addr, client::Options::default())) + .unwrap(); + reactor.run(publisher.subscribe(0, subscriber1) + .and_then(|_| publisher.subscribe(1, subscriber2)) + .map_err(|e| panic!(e)) + .and_then(|_| { + println!("Broadcasting..."); + publisher.broadcast("hello to all".to_string()) + }) + .and_then(|_| publisher.unsubscribe(1)) + .and_then(|_| publisher.broadcast("hi again".to_string()))) + .unwrap(); thread::sleep(Duration::from_millis(300)); } diff --git a/examples/readme_errors.rs b/examples/readme_errors.rs index c3f0a9c..60c340a 100644 --- a/examples/readme_errors.rs +++ b/examples/readme_errors.rs @@ -11,11 +11,13 @@ extern crate futures; extern crate tarpc; #[macro_use] extern crate serde_derive; +extern crate tokio_core; use std::error::Error; use std::fmt; use tarpc::{client, server}; -use tarpc::client::sync::Connect; +use tarpc::client::sync::ClientExt; +use tarpc::util::FirstSocketAddr; service! { rpc hello(name: String) -> String | NoNameGiven; @@ -50,8 +52,10 @@ impl SyncService for HelloServer { } fn main() { - let addr = HelloServer.listen("localhost:10000", server::Options::default()).unwrap(); - let client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let addr = HelloServer.listen("localhost:10000".first_socket_addr(), + server::Options::default()) + .unwrap(); + let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); println!("{}", client.hello("".to_string()).unwrap_err()); } diff --git a/examples/readme_futures.rs b/examples/readme_futures.rs index 5553aed..375b5ba 100644 --- a/examples/readme_futures.rs +++ b/examples/readme_futures.rs @@ -13,7 +13,7 @@ extern crate tokio_core; use futures::Future; use tarpc::{client, server}; -use tarpc::client::future::Connect; +use tarpc::client::future::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; @@ -33,11 +33,13 @@ impl FutureService for HelloServer { } fn main() { - let addr = "localhost:10000".first_socket_addr(); - let mut core = reactor::Core::new().unwrap(); - HelloServer.listen(addr, server::Options::default().handle(core.handle())).wait().unwrap(); - let options = client::Options::default().handle(core.handle()); - core.run(FutureClient::connect(addr, options) + let mut reactor = reactor::Core::new().unwrap(); + let addr = HelloServer.listen("localhost:10000".first_socket_addr(), + &reactor.handle(), + server::Options::default()) + .unwrap(); + let options = client::Options::default().handle(reactor.handle()); + reactor.run(FutureClient::connect(addr, options) .map_err(tarpc::Error::from) .and_then(|client| client.hello("Mom".to_string())) .map(|resp| println!("{}", resp))) diff --git a/examples/readme_sync.rs b/examples/readme_sync.rs index 80055d4..e2097f6 100644 --- a/examples/readme_sync.rs +++ b/examples/readme_sync.rs @@ -10,10 +10,11 @@ extern crate futures; #[macro_use] extern crate tarpc; +extern crate tokio_core; use tarpc::{client, server}; -use tarpc::client::sync::Connect; -use tarpc::util::Never; +use tarpc::client::sync::ClientExt; +use tarpc::util::{FirstSocketAddr, Never}; service! { rpc hello(name: String) -> String; @@ -29,8 +30,9 @@ impl SyncService for HelloServer { } fn main() { - let addr = "localhost:10000"; - HelloServer.listen(addr, server::Options::default()).unwrap(); - let client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let addr = HelloServer.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .unwrap(); + let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index dcf5c9f..df6629d 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -10,15 +10,15 @@ extern crate env_logger; #[macro_use] extern crate tarpc; extern crate futures; +extern crate tokio_core; use add::{FutureService as AddFutureService, FutureServiceExt as AddExt}; use double::{FutureService as DoubleFutureService, FutureServiceExt as DoubleExt}; -use futures::{BoxFuture, Future}; -use std::sync::{Arc, Mutex}; +use futures::{BoxFuture, Future, Stream}; use tarpc::{client, server}; -use tarpc::client::future::Connect as Fc; -use tarpc::client::sync::Connect as Sc; +use tarpc::client::future::ClientExt as Fc; use tarpc::util::{FirstSocketAddr, Message, Never}; +use tokio_core::reactor; pub mod add { service! { @@ -49,12 +49,12 @@ impl AddFutureService for AddServer { #[derive(Clone)] struct DoubleServer { - client: Arc>, + client: add::FutureClient, } impl DoubleServer { fn new(client: add::FutureClient) -> Self { - DoubleServer { client: Arc::new(Mutex::new(client)) } + DoubleServer { client: client } } } @@ -63,8 +63,6 @@ impl DoubleFutureService for DoubleServer { fn double(&self, x: i32) -> Self::DoubleFut { self.client - .lock() - .unwrap() .add(x, x) .map_err(|e| e.to_string().into()) .boxed() @@ -73,22 +71,29 @@ impl DoubleFutureService for DoubleServer { fn main() { let _ = env_logger::init(); + let mut reactor = reactor::Core::new().unwrap(); let add_addr = AddServer.listen("localhost:0".first_socket_addr(), + &reactor.handle(), server::Options::default()) - .wait() - .unwrap(); - let add_client = - add::FutureClient::connect(add_addr, client::Options::default()).wait().unwrap(); - - let double = DoubleServer::new(add_client); - let double_addr = double.listen("localhost:0".first_socket_addr(), - server::Options::default()) - .wait() .unwrap(); - let double_client = double::SyncClient::connect(double_addr, client::Options::default()) + let options = client::Options::default().handle(reactor.handle()); + let add_client = reactor.run(add::FutureClient::connect(add_addr, options)).unwrap(); + + let double_addr = DoubleServer::new(add_client) + .listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server::Options::default()) + .unwrap(); + + let double_client = + reactor.run(double::FutureClient::connect(double_addr, client::Options::default())) + .unwrap(); + reactor.run(futures::stream::futures_unordered((0..5).map(|i| double_client.double(i))) + .map_err(|e| println!("{}", e)) + .for_each(|i| { + println!("{:?}", i); + Ok(()) + })) .unwrap(); - for i in 0..5 { - println!("{:?}", double_client.double(i).unwrap()); - } } diff --git a/examples/throughput.rs b/examples/throughput.rs index d260b98..dce50ca 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -13,16 +13,17 @@ extern crate tarpc; extern crate env_logger; extern crate futures; extern crate serde; +extern crate tokio_core; -use futures::Future; use std::io::{Read, Write, stdout}; use std::net; use std::sync::Arc; use std::thread; use std::time; use tarpc::{client, server}; -use tarpc::client::sync::Connect; +use tarpc::client::sync::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; +use tokio_core::reactor; lazy_static! { static ref BUF: Arc = Arc::new(gen_vec(CHUNK_SIZE as usize).into()); @@ -54,11 +55,12 @@ impl FutureService for Server { const CHUNK_SIZE: u32 = 1 << 19; fn bench_tarpc(target: u64) { + let reactor = reactor::Core::new().unwrap(); let addr = Server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), server::Options::default()) - .wait() .unwrap(); - let client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); let start = time::Instant::now(); let mut nread = 0; while nread < target { diff --git a/examples/two_clients.rs b/examples/two_clients.rs index 3ad0366..163e765 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -13,13 +13,14 @@ extern crate tarpc; extern crate bincode; extern crate env_logger; extern crate futures; +extern crate tokio_core; use bar::FutureServiceExt as BarExt; use baz::FutureServiceExt as BazExt; -use futures::Future; use tarpc::{client, server}; -use tarpc::client::sync::Connect; +use tarpc::client::sync::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; +use tokio_core::reactor; mod bar { service! { @@ -59,17 +60,27 @@ macro_rules! pos { fn main() { let _ = env_logger::init(); - let bar_addr = Bar.listen("localhost:0".first_socket_addr(), - server::Options::default()) - .wait() - .unwrap(); - let baz_addr = Baz.listen("localhost:0".first_socket_addr(), - server::Options::default()) - .wait() - .unwrap(); + let mut bar_client = { + let reactor = reactor::Core::new().unwrap(); + let addr = Bar.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server::Options::default()) + .unwrap(); + // TODO: Need to set up each client with its own reactor. Should it be shareable across + // multiple clients? e.g. Rc> or something similar? + bar::SyncClient::connect(addr, client::Options::default()).unwrap() + }; + + let mut baz_client = { + // Need to set up each client with its own reactor. + let reactor = reactor::Core::new().unwrap(); + let addr = Baz.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server::Options::default()) + .unwrap(); + baz::SyncClient::connect(addr, client::Options::default()).unwrap() + }; - let bar_client = bar::SyncClient::connect(bar_addr, client::Options::default()).unwrap(); - let baz_client = baz::SyncClient::connect(baz_addr, client::Options::default()).unwrap(); info!("Result: {:?}", bar_client.bar(17)); diff --git a/src/client.rs b/src/client.rs index a1a2b19..00543c0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,26 +3,12 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use {Reactor, WireError}; -use bincode::serde::DeserializeError; -use futures::{self, Future}; -use protocol::Proto; #[cfg(feature = "tls")] use self::tls::*; -use serde::{Deserialize, Serialize}; -use std::fmt; -use std::io; -use stream_type::StreamType; +use {WireError, bincode}; use tokio_core::reactor; -use tokio_proto::BindClient as ProtoBindClient; -use tokio_proto::multiplex::Multiplex; -use tokio_service::Service; -type WireResponse = Result>, DeserializeError>; -type ResponseFuture = futures::Map< as Service>::Future, - fn(WireResponse) -> Result>>; -type BindClient = >> as - ProtoBindClient>::BindClient; +type WireResponse = Result>, bincode::Error>; /// TLS-specific functionality #[cfg(feature = "tls")] @@ -64,74 +50,6 @@ pub mod tls { } } -/// A client that impls `tokio_service::Service` that writes and reads bytes. -/// -/// Typically, this would be combined with a serialization pre-processing step -/// and a deserialization post-processing step. -#[doc(hidden)] -pub struct Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static -{ - inner: BindClient, -} - -impl Clone for Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static -{ - fn clone(&self) -> Self { - Client { inner: self.inner.clone() } - } -} - -impl Service for Client - where Req: Serialize + Sync + Send + 'static, - Resp: Deserialize + Sync + Send + 'static, - E: Deserialize + Sync + Send + 'static -{ - type Request = Req; - type Response = Result>; - type Error = io::Error; - type Future = ResponseFuture; - - fn call(&self, request: Self::Request) -> Self::Future { - self.inner.call(request).map(Self::map_err) - } -} - -impl Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static -{ - fn new(inner: BindClient) -> Self - where Req: Serialize + Sync + Send + 'static, - Resp: Deserialize + Sync + Send + 'static, - E: Deserialize + Sync + Send + 'static - { - Client { inner: inner } - } - - fn map_err(resp: WireResponse) -> Result> { - resp.map(|r| r.map_err(::Error::from)) - .map_err(::Error::ClientDeserialize) - .and_then(|r| r) - } -} - -impl fmt::Debug for Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static -{ - fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "Client {{ .. }}") - } -} - /// Additional options to configure how the client connects and operates. #[derive(Default)] pub struct Options { @@ -141,13 +59,13 @@ pub struct Options { } impl Options { - /// Connect using the given reactor handle. + /// Drive using the given reactor handle. Only used by `FutureClient`s. pub fn handle(mut self, handle: reactor::Handle) -> Self { self.reactor = Some(Reactor::Handle(handle)); self } - /// Connect using the given reactor remote. + /// Drive using the given reactor remote. Only used by `FutureClient`s. pub fn remote(mut self, remote: reactor::Remote) -> Self { self.reactor = Some(Reactor::Remote(remote)); self @@ -161,30 +79,105 @@ impl Options { } } +enum Reactor { + Handle(reactor::Handle), + Remote(reactor::Remote), +} + /// Exposes a trait for connecting asynchronously to servers. pub mod future { - use {REMOTE, Reactor}; - use futures::{self, Async, Future, future}; + use super::{Options, Reactor, WireResponse}; + use {REMOTE, WireError}; + #[cfg(feature = "tls")] + use errors::native_to_io; + use futures::{self, Future, future}; use protocol::Proto; use serde::{Deserialize, Serialize}; + use std::fmt; use std::io; - use std::marker::PhantomData; use std::net::SocketAddr; use stream_type::StreamType; - use super::{Client, Options}; - use tokio_core::net::{TcpStream, TcpStreamNew}; + use tokio_core::net::TcpStream; use tokio_core::reactor; - use tokio_proto::BindClient; - cfg_if! { - if #[cfg(feature = "tls")] { - use tokio_tls::{ConnectAsync, TlsStream, TlsConnectorExt}; - use super::tls::Context; - use errors::native_to_io; - } else {} + use tokio_proto::BindClient as ProtoBindClient; + use tokio_proto::multiplex::Multiplex; + use tokio_service::Service; + #[cfg(feature = "tls")] + use tokio_tls::TlsConnectorExt; + + #[doc(hidden)] + pub struct Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + inner: BindClient, } - /// Types that can connect to a server asynchronously. - pub trait Connect: Sized { + impl Clone for Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + fn clone(&self) -> Self { + Client { inner: self.inner.clone() } + } + } + + impl Service for Client + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static + { + type Request = Req; + type Response = Resp; + type Error = ::Error; + type Future = ResponseFuture; + + fn call(&self, request: Self::Request) -> Self::Future { + fn identity(t: T) -> T { + t + } + self.inner + .call(request) + .map(Self::map_err as _) + .map_err(::Error::from as _) + .and_then(identity as _) + } + } + + impl Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + fn new(inner: BindClient) -> Self + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static + { + Client { inner: inner } + } + + fn map_err(resp: WireResponse) -> Result> { + resp.map(|r| r.map_err(::Error::from)) + .map_err(::Error::ClientSerialize) + .and_then(|r| r) + } + } + + impl fmt::Debug for Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "Client {{ .. }}") + } + } + + /// Extension methods for clients. + pub trait ClientExt: Sized { /// The type of the future returned when calling `connect`. type ConnectFut: Future; @@ -192,100 +185,12 @@ pub mod future { fn connect(addr: SocketAddr, options: Options) -> Self::ConnectFut; } - type ConnectFutureInner = future::Either, MultiplexConnect>, futures::Flatten< - futures::MapErr>>, - fn(futures::Canceled) -> io::Error>>>; - /// A future that resolves to a `Client` or an `io::Error`. - #[doc(hidden)] - pub struct ConnectFuture - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static - { - #[cfg(not(feature = "tls"))] - #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] - inner: ConnectFutureInner>, - #[cfg(feature = "tls")] - #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] - inner: ConnectFutureInner, futures::Map, - fn(::native_tls::Error) -> io::Error>, fn(TlsStream) -> StreamType>>>, - } + pub type ConnectFuture = + futures::Flatten>>, + fn(futures::Canceled) -> io::Error>>; - impl Future for ConnectFuture - where Req: Serialize + Sync + Send + 'static, - Resp: Deserialize + Sync + Send + 'static, - E: Deserialize + Sync + Send + 'static - { - type Item = Client; - type Error = io::Error; - - fn poll(&mut self) -> futures::Poll { - // Ok to unwrap because we ensure the oneshot is always completed. - match Future::poll(&mut self.inner)? { - Async::Ready(client) => Ok(Async::Ready(client)), - Async::NotReady => Ok(Async::NotReady), - } - } - } - - struct MultiplexConnect(reactor::Handle, PhantomData<(Req, Resp, E)>); - - impl MultiplexConnect { - fn new(handle: reactor::Handle) -> Self { - MultiplexConnect(handle, PhantomData) - } - } - - impl FnOnce<(I,)> for MultiplexConnect - where Req: Serialize + Sync + Send + 'static, - Resp: Deserialize + Sync + Send + 'static, - E: Deserialize + Sync + Send + 'static, - I: Into - { - type Output = Client; - - extern "rust-call" fn call_once(self, (stream,): (I,)) -> Self::Output { - Client::new(Proto::new().bind_client(&self.0, stream.into())) - } - } - - /// Provides the connection Fn impl for Tls - struct ConnectFn { - #[cfg(feature = "tls")] - tls_ctx: Option, - } - - impl FnOnce<(TcpStream,)> for ConnectFn { - #[cfg(feature = "tls")] - #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] - type Output = future::Either, - futures::Map, - fn(::native_tls::Error) - -> io::Error>, - fn(TlsStream) -> StreamType>>; - #[cfg(not(feature = "tls"))] - type Output = future::FutureResult; - - extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Self::Output { - #[cfg(feature = "tls")] - match self.tls_ctx { - None => future::Either::A(future::ok(StreamType::from(tcp))), - Some(tls_ctx) => { - future::Either::B(tls_ctx.tls_connector - .connect_async(&tls_ctx.domain, tcp) - .map_err(native_to_io as fn(_) -> _) - .map(StreamType::from as fn(_) -> _)) - } - } - #[cfg(not(feature = "tls"))] - future::ok(StreamType::from(tcp)) - } - } - - impl Connect for Client + impl ClientExt for Client where Req: Serialize + Sync + Send + 'static, Resp: Deserialize + Sync + Send + 'static, E: Deserialize + Sync + Send + 'static @@ -300,90 +205,128 @@ pub mod future { #[cfg(feature = "tls")] let tls_ctx = options.tls_ctx.take(); - let setup = move |tx: futures::sync::oneshot::Sender<_>| { - move |handle: &reactor::Handle| { - let handle2 = handle.clone(); - TcpStream::connect(&addr, handle) - .and_then(move |socket| { - #[cfg(feature = "tls")] - match tls_ctx { - Some(tls_ctx) => { - future::Either::A(tls_ctx.tls_connector - .connect_async(&tls_ctx.domain, socket) - .map(StreamType::Tls) - .map_err(native_to_io)) - } - None => future::Either::B(future::ok(StreamType::Tcp(socket))), + let connect = move |handle: &reactor::Handle| { + let handle2 = handle.clone(); + TcpStream::connect(&addr, handle) + .and_then(move |socket| { + #[cfg(feature = "tls")] + match tls_ctx { + Some(tls_ctx) => { + future::Either::A(tls_ctx.tls_connector + .connect_async(&tls_ctx.domain, socket) + .map(StreamType::Tls) + .map_err(native_to_io)) } - #[cfg(not(feature = "tls"))] - future::ok(StreamType::Tcp(socket)) - }) - .map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp))) - .then(move |result| { - tx.complete(result); - Ok(()) - }) - } + None => future::Either::B(future::ok(StreamType::Tcp(socket))), + } + #[cfg(not(feature = "tls"))] + future::ok(StreamType::Tcp(socket)) + }) + .map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp))) + }; + let (tx, rx) = futures::oneshot(); + let setup = move |handle: &reactor::Handle| { + connect(handle).then(move |result| { + tx.complete(result); + Ok(()) + }) }; - let rx = match options.reactor { + match options.reactor { Some(Reactor::Handle(handle)) => { - #[cfg(feature = "tls")] - let connect_fn = ConnectFn { tls_ctx: options.tls_ctx }; - #[cfg(not(feature = "tls"))] - let connect_fn = ConnectFn {}; - let tcp = TcpStream::connect(&addr, &handle) - .and_then(connect_fn) - .map(MultiplexConnect::new(handle)); - return ConnectFuture { inner: future::Either::A(tcp) }; + handle.spawn(setup(&handle)); } Some(Reactor::Remote(remote)) => { - let (tx, rx) = futures::oneshot(); - remote.spawn(setup(tx)); - rx + remote.spawn(setup); } None => { - let (tx, rx) = futures::oneshot(); - REMOTE.spawn(setup(tx)); - rx + REMOTE.spawn(setup); } - }; + } fn panic(canceled: futures::Canceled) -> io::Error { unreachable!(canceled) } - ConnectFuture { inner: future::Either::B(rx.map_err(panic as fn(_) -> _).flatten()) } + rx.map_err(panic as _).flatten() } } + + type ResponseFuture = + futures::AndThen as Service>::Future, + fn(WireResponse) -> Result>>, + fn(io::Error) -> ::Error>, + Result>, + fn(Result>) -> Result>>; + type BindClient = + >> + as ProtoBindClient>::BindClient; } /// Exposes a trait for connecting synchronously to servers. pub mod sync { - use client::future::Connect as FutureConnect; - use futures::{Future, future}; + use super::Options; + use super::Reactor; + use super::future::{Client as FutureClient, ClientExt as FutureClientExt}; use serde::{Deserialize, Serialize}; + use std::fmt; use std::io; use std::net::ToSocketAddrs; - use super::{Client, Options}; + use tokio_core::reactor; + use tokio_service::Service; use util::FirstSocketAddr; - /// Types that can connect to a server synchronously. - pub trait Connect: Sized { - /// Connects to a server located at the given address. - fn connect(addr: A, options: Options) -> Result where A: ToSocketAddrs; + #[doc(hidden)] + pub struct Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + inner: FutureClient, + reactor: reactor::Core, } - impl Connect for Client + impl fmt::Debug for Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "Client {{ .. }}") + } + } + + impl Client where Req: Serialize + Sync + Send + 'static, Resp: Deserialize + Sync + Send + 'static, E: Deserialize + Sync + Send + 'static { - fn connect(addr: A, options: Options) -> Result + /// Drives an RPC call for the given request. + pub fn call(&mut self, request: Req) -> Result> { + self.reactor.run(self.inner.call(request)) + } + } + + /// Extension methods for Clients. + pub trait ClientExt: Sized { + /// Connects to a server located at the given address. + fn connect(addr: A, options: Options) -> io::Result where A: ToSocketAddrs; + } + + impl ClientExt for Client + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static + { + fn connect(addr: A, mut options: Options) -> io::Result where A: ToSocketAddrs { + let mut reactor = reactor::Core::new()?; let addr = addr.try_first_socket_addr()?; - - // Wrapped in a lazy future to ensure execution occurs when a task is present. - future::lazy(move || ::connect(addr, options)).wait() + options.reactor = Some(Reactor::Handle(reactor.handle())); + Ok(Client { + inner: reactor.run(FutureClient::connect(addr, options))?, + reactor: reactor, + }) } } } diff --git a/src/errors.rs b/src/errors.rs index 3bb4189..1916f74 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -3,7 +3,6 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use bincode; use serde::{Deserialize, Serialize}; use std::{fmt, io}; use std::error::Error as StdError; @@ -13,23 +12,15 @@ use std::error::Error as StdError; pub enum Error { /// Any IO error. Io(io::Error), - /// Error in deserializing a server response. + /// Error serializing the client request or deserializing the server response. /// /// Typically this indicates a faulty implementation of `serde::Serialize` or /// `serde::Deserialize`. - ClientDeserialize(bincode::serde::DeserializeError), - /// Error in serializing a client request. - /// - /// Typically this indicates a faulty implementation of `serde::Serialize`. - ClientSerialize(bincode::serde::SerializeError), - /// Error in deserializing a client request. + ClientSerialize(::bincode::Error), + /// Error serializing the server response or deserializing the client request. /// /// Typically this indicates a faulty implementation of `serde::Serialize` or /// `serde::Deserialize`. - ServerDeserialize(String), - /// Error in serializing a server response. - /// - /// Typically this indicates a faulty implementation of `serde::Serialize`. ServerSerialize(String), /// The server was unable to reply to the rpc for some reason. /// @@ -41,9 +32,7 @@ pub enum Error { impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { - Error::ClientDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), Error::ClientSerialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), - Error::ServerDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), Error::ServerSerialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), Error::App(ref e) => fmt::Display::fmt(e, f), Error::Io(ref e) => fmt::Display::fmt(e, f), @@ -54,10 +43,12 @@ impl fmt::Display for Er impl StdError for Error { fn description(&self) -> &str { match *self { - Error::ClientDeserialize(_) => "The client failed to deserialize the server response.", - Error::ClientSerialize(_) => "The client failed to serialize the request.", - Error::ServerDeserialize(_) => "The server failed to deserialize the request.", - Error::ServerSerialize(_) => "The server failed to serialize the response.", + Error::ClientSerialize(_) => { + "The client failed to serialize the request or deserialize the response." + } + Error::ServerSerialize(_) => { + "The server failed to serialize the response or deserialize the request." + } Error::App(ref e) => e.description(), Error::Io(ref e) => e.description(), } @@ -65,9 +56,7 @@ impl StdError for Error< fn cause(&self) -> Option<&StdError> { match *self { - Error::ClientDeserialize(ref e) => e.cause(), Error::ClientSerialize(ref e) => e.cause(), - Error::ServerDeserialize(_) | Error::ServerSerialize(_) | Error::App(_) => None, Error::Io(ref e) => e.cause(), @@ -84,7 +73,6 @@ impl From for Error { impl From> for Error { fn from(err: WireError) -> Self { match err { - WireError::ServerDeserialize(s) => Error::ServerDeserialize(s), WireError::ServerSerialize(s) => Error::ServerSerialize(s), WireError::App(e) => Error::App(e), } @@ -95,9 +83,7 @@ impl From> for Error { #[doc(hidden)] #[derive(Deserialize, Serialize, Clone, Debug)] pub enum WireError { - /// Error in deserializing a client request. - ServerDeserialize(String), - /// Error in serializing server response. + /// Error in serializing the server response or deserializing the client request. ServerSerialize(String), /// The server was unable to reply to the rpc for some reason. App(E), diff --git a/src/lib.rs b/src/lib.rs index da44cec..4c8b479 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,10 +33,12 @@ //! //! #[macro_use] //! extern crate tarpc; +//! extern crate tokio_core; //! //! use tarpc::{client, server}; -//! use tarpc::client::sync::Connect; +//! use tarpc::client::sync::ClientExt; //! use tarpc::util::Never; +//! use tokio_core::reactor; //! //! service! { //! rpc hello(name: String) -> String; @@ -53,8 +55,9 @@ //! //! fn main() { //! let addr = "localhost:10000"; +//! let reactor = reactor::Core::new().unwrap(); //! let _server = HelloServer.listen(addr, server::Options::default()); -//! let client = SyncClient::connect(addr, client::Options::default()).unwrap(); +//! let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); //! println!("{}", client.hello("Mom".to_string()).unwrap()); //! } //! ``` @@ -70,7 +73,7 @@ //! extern crate tarpc; //! //! use tarpc::{client, server}; -//! use tarpc::client::sync::Connect; +//! use tarpc::client::sync::ClientExt; //! use tarpc::util::Never; //! use tarpc::native_tls::{TlsAcceptor, Pkcs12}; //! @@ -97,7 +100,7 @@ //! let addr = "localhost:10000"; //! let acceptor = get_acceptor(); //! let _server = HelloServer.listen(addr, server::Options::default().tls(acceptor)); -//! let client = SyncClient::connect(addr, +//! let mut client = SyncClient::connect(addr, //! client::Options::default() //! .tls(client::tls::Context::new("foobar.com").unwrap())) //! .unwrap(); @@ -106,12 +109,10 @@ //! ``` //! #![deny(missing_docs)] -#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, fn_traits, - specialization)] +#![feature(plugin, never_type, struct_field_attributes)] #![plugin(tarpc_plugins)] extern crate byteorder; -extern crate bytes; #[macro_use] extern crate lazy_static; #[macro_use] @@ -119,7 +120,6 @@ extern crate log; extern crate net2; #[macro_use] extern crate serde_derive; -extern crate take; #[macro_use] extern crate cfg_if; @@ -183,12 +183,6 @@ fn spawn_core() -> reactor::Remote { rx.recv().unwrap() } -#[derive(Clone)] -enum Reactor { - Handle(reactor::Handle), - Remote(reactor::Remote), -} - cfg_if! { if #[cfg(feature = "tls")] { extern crate tokio_tls; diff --git a/src/macros.rs b/src/macros.rs index c6e27d3..9c07e8e 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -77,7 +77,9 @@ macro_rules! impl_deserialize { impl $crate::serde::de::Visitor for impl_deserialize_FieldVisitor__ { type Value = impl_deserialize_Field__; - fn expecting(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + fn expecting(&self, formatter: &mut ::std::fmt::Formatter) + -> ::std::fmt::Result + { formatter.write_str("an unsigned integer") } @@ -318,11 +320,10 @@ macro_rules! service { impl_deserialize!(tarpc_service_Error__, NotIrrefutable(()) $($fn_name($error))*); impl_serialize!(tarpc_service_Error__, {}, NotIrrefutable(()) $($fn_name($error))*); -/// Defines the `Future` RPC service. Implementors must be `Clone`, `Send`, and `'static`, +/// Defines the `Future` RPC service. Implementors must be `Clone` and `'static`, /// as required by `tokio_proto::NewService`. This is required so that the service can be used /// to respond to multiple requests concurrently. pub trait FutureService: - ::std::marker::Send + ::std::clone::Clone + 'static { @@ -345,13 +346,14 @@ macro_rules! service { /// the default tokio `Loop`. fn listen(self, addr: ::std::net::SocketAddr, + handle: &$crate::tokio_core::reactor::Handle, options: $crate::server::Options) - -> $crate::server::ListenFuture + -> ::std::io::Result<::std::net::SocketAddr> { - return $crate::server::listen( - move || Ok(tarpc_service_AsyncServer__(self.clone())), - addr, - options); + return $crate::server::listen(move || Ok(tarpc_service_AsyncServer__(self.clone())), + addr, + handle, + options); #[allow(non_camel_case_types)] #[derive(Clone)] @@ -412,7 +414,7 @@ macro_rules! service { where tarpc_service_S__: FutureService { type Request = ::std::result::Result; + $crate::bincode::Error>; type Response = $crate::server::Response; type Error = ::std::io::Error; @@ -425,7 +427,7 @@ macro_rules! service { return tarpc_service_FutureReply__::DeserializeError( $crate::futures::finished( ::std::result::Result::Err( - $crate::WireError::ServerDeserialize( + $crate::WireError::ServerSerialize( ::std::string::ToString::to_string( &tarpc_service_deserialize_err__))))); } @@ -481,9 +483,9 @@ macro_rules! service { pub trait SyncServiceExt: SyncService { /// Spawns the service, binding to the given address and running on /// the default tokio `Loop`. - fn listen(self, addr: L, options: $crate::server::Options) + fn listen(self, addr: A, options: $crate::server::Options) -> ::std::io::Result<::std::net::SocketAddr> - where L: ::std::net::ToSocketAddrs + where A: ::std::net::ToSocketAddrs { let tarpc_service__ = SyncServer__ { service: self, @@ -492,10 +494,26 @@ macro_rules! service { let tarpc_service_addr__ = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?; - // Wrapped in a lazy future to ensure execution occurs when a task is present. - return $crate::futures::Future::wait($crate::futures::future::lazy(move || { - FutureServiceExt::listen(tarpc_service__, tarpc_service_addr__, options) - })); + let (tx_, rx_) = ::std::sync::mpsc::channel(); + + ::std::thread::spawn(move || { + match $crate::tokio_core::reactor::Core::new() { + ::std::result::Result::Ok(mut reactor_) => { + let addr_ = FutureServiceExt::listen(tarpc_service__, + tarpc_service_addr__, + &reactor_.handle(), + options); + tx_.send(addr_).unwrap(); + loop { + reactor_.turn(::std::option::Option::None); + } + } + ::std::result::Result::Err(error_) => { + tx_.send(Err(error_)).unwrap(); + } + } + }); + return rx_.recv().unwrap(); #[derive(Clone)] struct SyncServer__ { @@ -546,22 +564,26 @@ macro_rules! service { impl SyncServiceExt for S where S: SyncService {} #[allow(unused)] - #[derive(Clone, Debug)] /// The client stub that makes RPC calls to the server. Exposes a blocking interface. - pub struct SyncClient(FutureClient); + pub struct SyncClient { + inner: tarpc_service_SyncClient__, + } - impl $crate::client::sync::Connect for SyncClient { - fn connect(addr_: A, opts_: $crate::client::Options) - -> ::std::result::Result + impl ::std::fmt::Debug for SyncClient { + fn fmt(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(formatter, "SyncClient {{ inner: {:?}, .. }}", self.inner) + } + } + + impl $crate::client::sync::ClientExt for SyncClient { + fn connect(addr_: A, options_: $crate::client::Options) -> ::std::io::Result where A: ::std::net::ToSocketAddrs, { - let addr_ = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr_)?; - // Wrapped in a lazy future to ensure execution occurs when a task is present. - let client_ = $crate::futures::Future::wait($crate::futures::future::lazy(move || { - ::connect(addr_, opts_) - }))?; - let client_ = SyncClient(client_); - ::std::result::Result::Ok(client_) + let client_ = ::connect(addr_, options_)?; + ::std::result::Result::Ok(SyncClient { + inner: client_, + }) } } @@ -569,89 +591,17 @@ macro_rules! service { $( #[allow(unused)] $(#[$attr])* - pub fn $fn_name(&self, $($arg: $in_),*) + pub fn $fn_name(&mut self, $($arg: $in_),*) -> ::std::result::Result<$out, $crate::Error<$error>> { - // Wrapped in a lazy future to ensure execution occurs when a task is present. - $crate::futures::Future::wait($crate::futures::future::lazy(move || { - (self.0).$fn_name($($arg),*) - })) - } - )* - } - - #[allow(non_camel_case_types)] - type tarpc_service_Client__ = - $crate::client::Client; - - #[allow(non_camel_case_types)] - /// Implementation detail: Pending connection. - pub struct tarpc_service_ConnectFuture__ { - inner: $crate::futures::Map<$crate::client::future::ConnectFuture< - tarpc_service_Request__, - tarpc_service_Response__, - tarpc_service_Error__>, - fn(tarpc_service_Client__) -> T>, - } - - impl $crate::futures::Future for tarpc_service_ConnectFuture__ { - type Item = T; - type Error = ::std::io::Error; - - fn poll(&mut self) -> $crate::futures::Poll { - $crate::futures::Future::poll(&mut self.inner) - } - } - - #[allow(unused)] - #[derive(Clone, Debug)] - /// The client stub that makes RPC calls to the server. Exposes a Future interface. - pub struct FutureClient(tarpc_service_Client__); - - impl<'a> $crate::client::future::Connect for FutureClient { - type ConnectFut = tarpc_service_ConnectFuture__; - - fn connect(tarpc_service_addr__: ::std::net::SocketAddr, - tarpc_service_options__: $crate::client::Options) - -> Self::ConnectFut - { - let client = ::connect( - tarpc_service_addr__, tarpc_service_options__); - - tarpc_service_ConnectFuture__ { - inner: $crate::futures::Future::map(client, FutureClient) - } - } - } - - impl FutureClient { - $( - #[allow(unused)] - $(#[$attr])* - pub fn $fn_name(&self, $($arg: $in_),*) - -> $crate::futures::future::Then< - ::Future, - ::std::result::Result<$out, $crate::Error<$error>>, - fn(::std::result::Result< - ::std::result::Result>, - ::std::io::Error>) - -> ::std::result::Result<$out, $crate::Error<$error>>> { - - let tarpc_service_req__ = tarpc_service_Request__::$fn_name(($($arg,)*)); - let tarpc_service_fut__ = - $crate::tokio_service::Service::call(&self.0, tarpc_service_req__); - return $crate::futures::Future::then(tarpc_service_fut__, then__); + return then__(self.inner.call(tarpc_service_Request__::$fn_name(($($arg,)*)))); + // TODO: this code is duplicated in both FutureClient and SyncClient. fn then__(tarpc_service_msg__: - ::std::result::Result< ::std::result::Result>, - ::std::io::Error>) + $crate::Error>) -> ::std::result::Result<$out, $crate::Error<$error>> { - match tarpc_service_msg__? { + match tarpc_service_msg__ { ::std::result::Result::Ok(tarpc_service_msg__) => { if let tarpc_service_Response__::$fn_name(tarpc_service_msg__) = tarpc_service_msg__ @@ -672,14 +622,120 @@ macro_rules! service { unreachable!() } } - $crate::Error::ServerDeserialize(tarpc_service_err__) => { - $crate::Error::ServerDeserialize(tarpc_service_err__) - } $crate::Error::ServerSerialize(tarpc_service_err__) => { $crate::Error::ServerSerialize(tarpc_service_err__) } - $crate::Error::ClientDeserialize(tarpc_service_err__) => { - $crate::Error::ClientDeserialize(tarpc_service_err__) + $crate::Error::ClientSerialize(tarpc_service_err__) => { + $crate::Error::ClientSerialize(tarpc_service_err__) + } + $crate::Error::Io(tarpc_service_error__) => { + $crate::Error::Io(tarpc_service_error__) + } + }) + } + } + } + } + )* + } + + #[allow(non_camel_case_types)] + type tarpc_service_FutureClient__ = + $crate::client::future::Client; + + #[allow(non_camel_case_types)] + type tarpc_service_SyncClient__ = + $crate::client::sync::Client; + + #[allow(non_camel_case_types)] + /// Implementation detail: Pending connection. + pub struct tarpc_service_ConnectFuture__ { + inner: $crate::futures::Map<$crate::client::future::ConnectFuture< + tarpc_service_Request__, + tarpc_service_Response__, + tarpc_service_Error__>, + fn(tarpc_service_FutureClient__) -> T>, + } + + impl $crate::futures::Future for tarpc_service_ConnectFuture__ { + type Item = T; + type Error = ::std::io::Error; + + fn poll(&mut self) -> $crate::futures::Poll { + $crate::futures::Future::poll(&mut self.inner) + } + } + + #[allow(unused)] + #[derive(Clone, Debug)] + /// The client stub that makes RPC calls to the server. Exposes a Future interface. + pub struct FutureClient(tarpc_service_FutureClient__); + + impl<'a> $crate::client::future::ClientExt for FutureClient { + type ConnectFut = tarpc_service_ConnectFuture__; + + fn connect(tarpc_service_addr__: ::std::net::SocketAddr, + tarpc_service_options__: $crate::client::Options) + -> Self::ConnectFut + { + let client = ::connect(tarpc_service_addr__, + tarpc_service_options__); + + tarpc_service_ConnectFuture__ { + inner: $crate::futures::Future::map(client, FutureClient) + } + } + } + + impl FutureClient { + $( + #[allow(unused)] + $(#[$attr])* + pub fn $fn_name(&self, $($arg: $in_),*) + -> $crate::futures::future::Then< + ::Future, + ::std::result::Result<$out, $crate::Error<$error>>, + fn(::std::result::Result>) + -> ::std::result::Result<$out, $crate::Error<$error>>> { + + let tarpc_service_req__ = tarpc_service_Request__::$fn_name(($($arg,)*)); + let tarpc_service_fut__ = + $crate::tokio_service::Service::call(&self.0, tarpc_service_req__); + return $crate::futures::Future::then(tarpc_service_fut__, then__); + + fn then__(tarpc_service_msg__: + ::std::result::Result>) + -> ::std::result::Result<$out, $crate::Error<$error>> { + match tarpc_service_msg__ { + ::std::result::Result::Ok(tarpc_service_msg__) => { + if let tarpc_service_Response__::$fn_name(tarpc_service_msg__) = + tarpc_service_msg__ + { + ::std::result::Result::Ok(tarpc_service_msg__) + } else { + unreachable!() + } + } + ::std::result::Result::Err(tarpc_service_err__) => { + ::std::result::Result::Err(match tarpc_service_err__ { + $crate::Error::App(tarpc_service_err__) => { + if let tarpc_service_Error__::$fn_name( + tarpc_service_err__) = tarpc_service_err__ + { + $crate::Error::App(tarpc_service_err__) + } else { + unreachable!() + } + } + $crate::Error::ServerSerialize(tarpc_service_err__) => { + $crate::Error::ServerSerialize(tarpc_service_err__) } $crate::Error::ClientSerialize(tarpc_service_err__) => { $crate::Error::ClientSerialize(tarpc_service_err__) @@ -729,6 +785,7 @@ mod functional_test { use futures::{Future, failed}; use std::io; use std::net::SocketAddr; + use tokio_core::reactor; use util::FirstSocketAddr; extern crate env_logger; @@ -751,14 +808,11 @@ mod functional_test { use client::tls::Context; use native_tls::{Pkcs12, TlsAcceptor, TlsConnector}; - fn tls_context() -> (server::Options, client::Options) { + fn get_tls_server_options() -> server::Options { let buf = include_bytes!("../test/identity.p12"); let pkcs12 = unwrap!(Pkcs12::from_der(buf, "mypass")); let acceptor = unwrap!(unwrap!(TlsAcceptor::builder(pkcs12)).build()); - let server_options = server::Options::default().tls(acceptor); - let client_options = get_tls_client_options(); - - (server_options, client_options) + server::Options::default().tls(acceptor) } // Making the TlsConnector for testing needs to be OS-dependent just like native-tls. @@ -778,10 +832,11 @@ mod functional_test { let mut connector = unwrap!(TlsConnector::builder()); connector.anchor_certificates(&[cert]); - client::Options::default().tls(Context { - domain: DOMAIN.into(), - tls_connector: unwrap!(connector.build()), - }) + client::Options::default() + .tls(Context { + domain: DOMAIN.into(), + tls_connector: unwrap!(connector.build()), + }) } } else if #[cfg(all(not(target_os = "macos"), not(windows)))] { use native_tls_inner::backend::openssl::TlsConnectorBuilderExt; @@ -792,10 +847,11 @@ mod functional_test { .builder_mut() .set_ca_file("test/root-ca.pem")); - client::Options::default().tls(Context { - domain: DOMAIN.into(), - tls_connector: unwrap!(connector.build()), - }) + client::Options::default() + .tls(Context { + domain: DOMAIN.into(), + tls_connector: unwrap!(connector.build()), + }) } // not implemented for windows or other platforms } else { @@ -805,41 +861,42 @@ mod functional_test { } } - fn get_sync_client(addr: SocketAddr) -> io::Result - where C: client::sync::Connect + fn start_server_with_sync_client(server: S) -> io::Result<(SocketAddr, C)> + where C: client::sync::ClientExt, S: SyncServiceExt { - let client_options = get_tls_client_options(); - C::connect(addr, client_options) - } - - fn start_server_with_sync_client(server: S) -> (SocketAddr, io::Result) - where C: client::sync::Connect, S: SyncServiceExt - { - let (server_options, client_options) = tls_context(); + let server_options = get_tls_server_options(); let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), server_options)); - let client = C::connect(addr, client_options); - (addr, client) + let client = unwrap!(C::connect(addr, get_tls_client_options())); + Ok((addr, client)) } - fn start_server_with_async_client(server: S) -> (SocketAddr, C) - where C: client::future::Connect, S: FutureServiceExt + fn start_server_with_async_client(server: S) + -> io::Result<(SocketAddr, reactor::Core, C)> + where C: client::future::ClientExt, S: FutureServiceExt { - let (server_options, client_options) = tls_context(); - let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), - server_options).wait()); - let client = unwrap!(C::connect(addr, client_options).wait()); - (addr, client) + let mut reactor = reactor::Core::new()?; + let server_options = get_tls_server_options(); + let addr = server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server_options)?; + let client_options = get_tls_client_options().handle(reactor.handle()); + let client = unwrap!(reactor.run(C::connect(addr, client_options))); + Ok((addr, reactor, client)) } - fn start_err_server_with_async_client(server: S) -> (SocketAddr, C) - where C: client::future::Connect, S: error_service::FutureServiceExt + fn start_err_server_with_async_client(server: S) + -> io::Result<(SocketAddr, reactor::Core, C)> + where C: client::future::ClientExt, S: error_service::FutureServiceExt { - let (server_options, client_options) = tls_context(); - let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), - server_options).wait()); - let client = unwrap!(C::connect(addr, client_options).wait()); - (addr, client) + let mut reactor = reactor::Core::new()?; + let server_options = get_tls_server_options(); + let addr = server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server_options)?; + let client_options = get_tls_client_options().handle(reactor.handle()); + let client = unwrap!(reactor.run(C::connect(addr, client_options))); + Ok((addr, reactor, client)) } } else { fn get_server_options() -> server::Options { @@ -851,36 +908,45 @@ mod functional_test { } fn get_sync_client(addr: SocketAddr) -> io::Result - where C: client::sync::Connect + where C: client::sync::ClientExt { C::connect(addr, get_client_options()) } - fn start_server_with_sync_client(server: S) -> (SocketAddr, io::Result) - where C: client::sync::Connect, S: SyncServiceExt + fn start_server_with_sync_client(server: S) -> io::Result<(SocketAddr, C)> + where C: client::sync::ClientExt, S: SyncServiceExt { - let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), - get_server_options())); + let options = get_server_options(); + let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), options)); + let client = unwrap!(get_sync_client(addr)); + Ok((addr, client)) + } + + fn start_server_with_async_client(server: S) + -> io::Result<(SocketAddr, reactor::Core, C)> + where C: client::future::ClientExt, S: FutureServiceExt + { + let mut reactor = reactor::Core::new()?; + let options = get_server_options(); + let addr = server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + options)?; + let client = unwrap!(reactor.run(C::connect(addr, get_client_options()))); + Ok((addr, reactor, client)) + } + + fn start_err_server_with_async_client(server: S) + -> io::Result<(SocketAddr, reactor::Core, C)> + where C: client::future::ClientExt, S: error_service::FutureServiceExt + { + let mut reactor = reactor::Core::new()?; + let options = get_server_options(); + let addr = server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + options)?; let client = C::connect(addr, get_client_options()); - (addr, client) - } - - fn start_server_with_async_client(server: S) -> (SocketAddr, C) - where C: client::future::Connect, S: FutureServiceExt - { - let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), - get_server_options()).wait()); - let client = unwrap!(C::connect(addr, get_client_options()).wait()); - (addr, client) - } - - fn start_err_server_with_async_client(server: S) -> (SocketAddr, C) - where C: client::future::Connect, S: error_service::FutureServiceExt - { - let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), - get_server_options()).wait()); - let client = unwrap!(C::connect(addr, get_client_options()).wait()); - (addr, client) + let client = unwrap!(reactor.run(client)); + Ok((addr, reactor, client)) } } } @@ -905,8 +971,8 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let (_, client) = start_server_with_sync_client::(Server); - let client = unwrap!(client); + let (_, mut client) = unwrap!(start_server_with_sync_client::(Server)); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); } @@ -914,19 +980,20 @@ mod functional_test { #[test] fn other_service() { let _ = env_logger::init(); - let (_, client) = start_server_with_sync_client::(Server); - let client = client.expect("Could not connect!"); + let (_, mut client) = + unwrap!(start_server_with_sync_client::(Server)); match client.foo().err().expect("failed unwrap") { - ::Error::ServerDeserialize(_) => {} // good - bad => panic!("Expected Error::ServerDeserialize but got {}", bad), + ::Error::ServerSerialize(_) => {} // good + bad => panic!("Expected Error::ServerSerialize but got {}", bad), } } } mod future { - use futures::{Finished, Future, finished}; use super::{FutureClient, FutureService, env_logger, start_server_with_async_client}; + use futures::{Finished, finished}; + use tokio_core::reactor; use util::Never; #[derive(Clone)] @@ -949,32 +1016,35 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let (_, client) = start_server_with_async_client::(Server); - assert_eq!(3, client.add(1, 2).wait().unwrap()); - assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap()); + let (_, mut reactor, client) = + unwrap!(start_server_with_async_client::(Server)); + assert_eq!(3, reactor.run(client.add(1, 2)).unwrap()); + assert_eq!("Hey, Tim.", + reactor.run(client.hey("Tim".to_string())).unwrap()); } #[test] fn concurrent() { let _ = env_logger::init(); - let (_, client) = start_server_with_async_client::(Server); + let (_, mut reactor, client) = + unwrap!(start_server_with_async_client::(Server)); let req1 = client.add(1, 2); let req2 = client.add(3, 4); let req3 = client.hey("Tim".to_string()); - assert_eq!(3, req1.wait().unwrap()); - assert_eq!(7, req2.wait().unwrap()); - assert_eq!("Hey, Tim.", req3.wait().unwrap()); + assert_eq!(3, reactor.run(req1).unwrap()); + assert_eq!(7, reactor.run(req2).unwrap()); + assert_eq!("Hey, Tim.", reactor.run(req3).unwrap()); } #[test] fn other_service() { let _ = env_logger::init(); - let (_, client) = - start_server_with_async_client::(Server); - match client.foo().wait().err().unwrap() { - ::Error::ServerDeserialize(_) => {} // good - bad => panic!(r#"Expected Error::ServerDeserialize but got "{}""#, bad), + let (_, mut reactor, client) = + unwrap!(start_server_with_async_client::(Server)); + match reactor.run(client.foo()).err().unwrap() { + ::Error::ServerSerialize(_) => {} // good + bad => panic!(r#"Expected Error::ServerSerialize but got "{}""#, bad), } } @@ -983,36 +1053,40 @@ mod functional_test { use util::FirstSocketAddr; use server; use super::FutureServiceExt; - + let _ = env_logger::init(); - let addr = Server.listen("localhost:0".first_socket_addr(), server::Options::default()) - .wait() - .unwrap(); - Server.listen(addr, server::Options::default()) - .wait() + let reactor = reactor::Core::new().unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server::Options::default()) .unwrap(); + Server.listen(addr, &reactor.handle(), server::Options::default()).unwrap(); } - + #[cfg(feature = "tls")] #[test] fn tcp_and_tls() { use {client, server}; use util::FirstSocketAddr; - use client::future::Connect; + use client::future::ClientExt; use super::FutureServiceExt; let _ = env_logger::init(); - let (_, client) = start_server_with_async_client::(Server); - assert_eq!(3, client.add(1, 2).wait().unwrap()); - assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap()); + let (_, mut reactor, client) = + unwrap!(start_server_with_async_client::(Server)); + assert_eq!(3, reactor.run(client.add(1, 2)).unwrap()); + assert_eq!("Hey, Tim.", + reactor.run(client.hey("Tim".to_string())).unwrap()); let addr = Server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), server::Options::default()) - .wait() .unwrap(); - let client = FutureClient::connect(addr, client::Options::default()).wait().unwrap(); - assert_eq!(3, client.add(1, 2).wait().unwrap()); - assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap()); + let options = client::Options::default().handle(reactor.handle()); + let client = reactor.run(FutureClient::connect(addr, options)).unwrap(); + assert_eq!(3, reactor.run(client.add(1, 2)).unwrap()); + assert_eq!("Hey, Tim.", + reactor.run(client.hey("Tim".to_string())).unwrap()); } } @@ -1040,28 +1114,19 @@ mod functional_test { use self::error_service::*; let _ = env_logger::init(); - let (addr, client) = start_err_server_with_async_client::(ErrorServer); - client.bar() - .then(move |result| { - match result.err().unwrap() { - ::Error::App(e) => { - assert_eq!(e.description(), "lol jk"); - Ok::<_, ()>(()) - } // good - bad => panic!("Expected Error::App but got {:?}", bad), - } - }) - .wait() + let (_, mut reactor, client) = + start_err_server_with_async_client::(ErrorServer).unwrap(); + reactor.run(client.bar() + .then(move |result| { + match result.err().unwrap() { + ::Error::App(e) => { + assert_eq!(e.description(), "lol jk"); + Ok::<_, ()>(()) + } // good + bad => panic!("Expected Error::App but got {:?}", bad), + } + })) .unwrap(); - - let client = get_sync_client::(addr).unwrap(); - match client.bar().err().unwrap() { - ::Error::App(e) => { - assert_eq!(e.description(), "lol jk"); - } // good - bad => panic!("Expected Error::App but got {:?}", bad), - } } pub mod other_service { diff --git a/src/plugins/src/lib.rs b/src/plugins/src/lib.rs index 95f22be..99a9eb8 100644 --- a/src/plugins/src/lib.rs +++ b/src/plugins/src/lib.rs @@ -1,7 +1,6 @@ #![feature(plugin_registrar, rustc_private)] extern crate itertools; -extern crate rustc; extern crate rustc_plugin; extern crate syntax; diff --git a/src/protocol.rs b/src/protocol.rs index b973b38..ec9ed96 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. use {serde, tokio_core}; -use bincode::{SizeLimit, serde as bincode}; +use bincode::{self, SizeLimit}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::io::{self, Cursor}; use std::marker::PhantomData; @@ -12,7 +12,6 @@ use std::mem; use tokio_core::io::{EasyBuf, Framed, Io}; use tokio_proto::multiplex::{ClientProto, ServerProto}; use tokio_proto::streaming::multiplex::RequestId; -use util::Debugger; // `Encode` is the type that `Codec` encodes. `Decode` is the type it decodes. pub struct Codec { @@ -40,7 +39,7 @@ impl tokio_core::io::Codec for Codec Decode: serde::Deserialize { type Out = (RequestId, Encode); - type In = (RequestId, Result); + type In = (RequestId, Result); fn encode(&mut self, (id, message): Self::Out, buf: &mut Vec) -> io::Result<()> { buf.write_u64::(id).unwrap(); @@ -97,7 +96,6 @@ impl tokio_core::io::Codec for Codec // message. self.state = Id; - trace!("--> Parsed message: {:?}", Debugger(&result)); return Ok(Some((id, result))); } } @@ -121,7 +119,7 @@ impl ServerProto for Proto Decode: serde::Deserialize + 'static { type Response = Encode; - type Request = Result; + type Request = Result; type Transport = Framed>; type BindTransport = Result; @@ -135,7 +133,7 @@ impl ClientProto for Proto Encode: serde::Serialize + 'static, Decode: serde::Deserialize + 'static { - type Response = Result; + type Response = Result; type Request = Encode; type Transport = Framed>; type BindTransport = Result; @@ -158,8 +156,8 @@ fn serialize() { let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new(); codec.encode(MSG, &mut vec).unwrap(); buf.get_mut().append(&mut vec); - let actual: Result)>, - io::Error> = codec.decode(&mut buf); + let actual: Result)>, io::Error> = + codec.decode(&mut buf); match actual { Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {} diff --git a/src/server.rs b/src/server.rs index 2739c46..1e1b4e6 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,8 +3,7 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use {REMOTE, Reactor}; -use bincode::serde::DeserializeError; +use bincode; use errors::WireError; use futures::{self, Async, Future, Stream, future}; use net2; @@ -35,24 +34,11 @@ enum Acceptor { /// Additional options to configure how the server operates. #[derive(Default)] pub struct Options { - reactor: Option, #[cfg(feature = "tls")] tls_acceptor: Option, } impl Options { - /// Listen using the given reactor handle. - pub fn handle(mut self, handle: reactor::Handle) -> Self { - self.reactor = Some(Reactor::Handle(handle)); - self - } - - /// Listen using the given reactor remote. - pub fn remote(mut self, remote: reactor::Remote) -> Self { - self.reactor = Some(Reactor::Remote(remote)); - self - } - /// Set the `TlsAcceptor` #[cfg(feature = "tls")] pub fn tls(mut self, tls_acceptor: TlsAcceptor) -> Self { @@ -66,10 +52,14 @@ impl Options { pub type Response = Result>; #[doc(hidden)] -pub fn listen(new_service: S, addr: SocketAddr, options: Options) -> ListenFuture - where S: NewService, +pub fn listen(new_service: S, + addr: SocketAddr, + handle: &reactor::Handle, + _options: Options) + -> io::Result + where S: NewService, Response = Response, - Error = io::Error> + Send + 'static, + Error = io::Error> + 'static, Req: Deserialize + 'static, Resp: Serialize + 'static, E: Serialize + 'static @@ -77,53 +67,30 @@ pub fn listen(new_service: S, addr: SocketAddr, options: Option // Similar to the client, since `Options` is not `Send`, we take the `TlsAcceptor` when it is // available. #[cfg(feature = "tls")] - let acceptor = match options.tls_acceptor { + let acceptor = match _options.tls_acceptor { Some(tls_acceptor) => Acceptor::Tls(tls_acceptor), None => Acceptor::Tcp, }; #[cfg(not(feature = "tls"))] let acceptor = Acceptor::Tcp; - match options.reactor { - None => { - let (tx, rx) = futures::oneshot(); - REMOTE.spawn(move |handle| { - Ok(tx.complete(listen_with(new_service, addr, handle.clone(), acceptor))) - }); - ListenFuture { inner: future::Either::A(rx) } - } - Some(Reactor::Remote(remote)) => { - let (tx, rx) = futures::oneshot(); - remote.spawn(move |handle| { - Ok(tx.complete(listen_with(new_service, addr, handle.clone(), acceptor))) - }); - ListenFuture { inner: future::Either::A(rx) } - } - Some(Reactor::Handle(handle)) => { - ListenFuture { - inner: future::Either::B(future::ok(listen_with(new_service, - addr, - handle, - acceptor))), - } - } - } + listen_with(new_service, addr, handle, acceptor) } /// Spawns a service that binds to the given address using the given handle. fn listen_with(new_service: S, addr: SocketAddr, - handle: Handle, + handle: &Handle, _acceptor: Acceptor) -> io::Result - where S: NewService, + where S: NewService, Response = Response, - Error = io::Error> + Send + 'static, + Error = io::Error> + 'static, Req: Deserialize + 'static, Resp: Serialize + 'static, E: Serialize + 'static { - let listener = listener(&addr, &handle)?; + let listener = listener(&addr, handle)?; let addr = listener.local_addr()?; let handle2 = handle.clone(); @@ -175,8 +142,7 @@ fn listener(addr: &SocketAddr, handle: &Handle) -> io::Result { /// A future that resolves to a `ServerHandle`. #[doc(hidden)] pub struct ListenFuture { - inner: future::Either>, - future::FutureResult, futures::Canceled>>, + inner: future::FutureResult, futures::Canceled>, } impl Future for ListenFuture { diff --git a/src/util.rs b/src/util.rs index 0eb1f75..44f36d2 100644 --- a/src/util.rs +++ b/src/util.rs @@ -111,18 +111,3 @@ pub trait FirstSocketAddr: ToSocketAddrs { } impl FirstSocketAddr for A {} - -/// A struct that will format as the contained type if the type impls Debug. -pub struct Debugger<'a, T: 'a>(pub &'a T); - -impl<'a, T: fmt::Debug> fmt::Debug for Debugger<'a, T> { - fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "{:?}", self.0) - } -} - -impl<'a, T> fmt::Debug for Debugger<'a, T> { - default fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "{{not debuggable}}") - } -}