Merge branch master into tikue/into-future.

This commit is contained in:
Tim Kuehn
2017-02-16 00:37:19 -08:00
19 changed files with 681 additions and 707 deletions

View File

@@ -11,24 +11,23 @@ readme = "README.md"
description = "An RPC framework for Rust with a focus on ease of use."
[dependencies]
bincode = "1.0.0-alpha"
bincode = "1.0.0-alpha2"
byteorder = "1.0"
cfg-if = "0.1.0"
bytes = "0.3"
futures = "0.1.7"
lazy_static = "0.2"
log = "0.3"
native-tls = { version = "0.1.1", optional = true }
scoped-pool = "1.0"
net2 = "0.2"
serde = "0.9"
serde_derive = "0.9"
tarpc-plugins = { path = "src/plugins" }
take = "0.1"
tokio-service = "0.1"
tokio-proto = "0.1"
tokio-core = "0.1"
tokio-proto = "0.1"
tokio-service = "0.1"
# Optional dependencies
native-tls = { version = "0.1.1", optional = true }
tokio-tls = { version = "0.1", optional = true }
net2 = "0.2"
[dev-dependencies]
chrono = "0.2"

View File

@@ -50,10 +50,12 @@ tarpc-plugins = { git = "https://github.com/google/tarpc" }
extern crate futures;
#[macro_use]
extern crate tarpc;
extern crate tokio_core;
use tarpc::{client, server};
use tarpc::client::sync::Connect;
use tarpc::util::Never;
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
service! {
rpc hello(name: String) -> String;
@@ -69,9 +71,11 @@ impl SyncService for HelloServer {
}
fn main() {
let addr = "localhost:10000";
HelloServer.listen(addr, server::Options::default()).unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
let reactor = reactor::Core::new().unwrap();
let addr = HelloServer.listen("localhost:0".first_socket_addr(),
server::Options::default())
.unwrap();
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
}
```
@@ -101,7 +105,7 @@ extern crate tokio_core;
use futures::Future;
use tarpc::{client, server};
use tarpc::client::future::Connect;
use tarpc::client::future::ClientExt;
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
@@ -121,9 +125,10 @@ impl FutureService for HelloServer {
}
fn main() {
let addr = "localhost:10000".first_socket_addr();
let mut core = reactor::Core::new().unwrap();
HelloServer.listen(addr, server::Options::default().handle(core.handle())).wait().unwrap();
let addr = HelloServer.listen("localhost:10000".first_socket_addr(),
server::Options::from(core.handle()))
.unwrap();
let options = client::Options::default().handle(core.handle());
core.run(FutureClient::connect(addr, options)
.map_err(tarpc::Error::from)
@@ -169,7 +174,7 @@ extern crate tokio_core;
use futures::Future;
use tarpc::{client, server};
use tarpc::client::future::Connect;
use tarpc::client::future::ClientExt;
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
use tarpc::native_tls::{Pkcs12, TlsAcceptor};
@@ -196,14 +201,15 @@ fn get_acceptor() -> TlsAcceptor {
}
fn main() {
let addr = "localhost:10000".first_socket_addr();
let mut core = reactor::Core::new().unwrap();
let acceptor = get_acceptor();
HelloServer.listen(addr, server::Options::default()
.handle(core.handle())
.tls(acceptor)).wait().unwrap();
let options = client::Options::default().handle(core.handle()
.tls(client::tls::Context::new("foobar.com").unwrap()));
let addr = HelloServer.listen("localhost:10000".first_socket_addr(),
server::Options::from(core.handle())
.tls(acceptor))
.unwrap();
let options = client::Options::default()
.handle(core.handle())
.tls(client::tls::Context::new("foobar.com").unwrap()));
core.run(FutureClient::connect(addr, options)
.map_err(tarpc::Error::from)
.and_then(|client| client.hello("Mom".to_string()))

View File

@@ -12,13 +12,14 @@ extern crate tarpc;
extern crate test;
extern crate env_logger;
extern crate futures;
extern crate tokio_core;
use futures::Future;
use tarpc::{client, server};
use tarpc::client::sync::Connect;
use tarpc::client::future::ClientExt;
use tarpc::util::{FirstSocketAddr, Never};
#[cfg(test)]
use test::Bencher;
use tokio_core::reactor;
service! {
rpc ack();
@@ -38,8 +39,12 @@ impl FutureService for Server {
#[bench]
fn latency(bencher: &mut Bencher) {
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr(), server::Options::default()).wait().unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
let mut reactor = reactor::Core::new().unwrap();
let addr = Server.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.unwrap();
let client = reactor.run(FutureClient::connect(addr, client::Options::default())).unwrap();
bencher.iter(|| { client.ack().unwrap(); });
bencher.iter(|| reactor.run(client.ack()).unwrap());
}

View File

@@ -26,7 +26,7 @@ use std::sync::{Arc, mpsc};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tarpc::{client, server};
use tarpc::client::future::Connect;
use tarpc::client::future::ClientExt;
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
@@ -166,10 +166,11 @@ fn main() {
.map(Result::unwrap)
.unwrap_or(4);
let mut reactor = reactor::Core::new().unwrap();
let addr = Server::new()
.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.wait()
.unwrap();
info!("Server listening on {}.", addr);
@@ -186,8 +187,5 @@ fn main() {
info!("Starting...");
// The driver of the main future.
let mut core = reactor::Core::new().unwrap();
core.run(run).unwrap();
reactor.run(run).unwrap();
}

View File

@@ -10,20 +10,21 @@ extern crate env_logger;
extern crate futures;
#[macro_use]
extern crate tarpc;
extern crate tokio_proto as tokio;
extern crate tokio_core;
use futures::{BoxFuture, Future};
use futures::{Future, future};
use publisher::FutureServiceExt as PublisherExt;
use std::cell::RefCell;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::rc::Rc;
use std::thread;
use std::time::Duration;
use subscriber::FutureServiceExt as SubscriberExt;
use tarpc::{client, server};
use tarpc::client::future::Connect as Fc;
use tarpc::client::sync::Connect as Sc;
use tarpc::client::future::ClientExt;
use tarpc::util::{FirstSocketAddr, Message, Never};
use tokio_core::reactor;
pub mod subscriber {
service! {
@@ -57,39 +58,39 @@ impl subscriber::FutureService for Subscriber {
}
impl Subscriber {
fn listen(id: u32) -> SocketAddr {
fn listen(id: u32, handle: &reactor::Handle, options: server::Options) -> SocketAddr {
Subscriber { id: id }
.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.listen("localhost:0".first_socket_addr(), handle, options)
.unwrap()
}
}
#[derive(Clone, Debug)]
struct Publisher {
clients: Arc<Mutex<HashMap<u32, subscriber::FutureClient>>>,
clients: Rc<RefCell<HashMap<u32, subscriber::FutureClient>>>,
}
impl Publisher {
fn new() -> Publisher {
Publisher { clients: Arc::new(Mutex::new(HashMap::new())) }
Publisher { clients: Rc::new(RefCell::new(HashMap::new())) }
}
}
impl publisher::FutureService for Publisher {
type BroadcastFut = BoxFuture<(), Never>;
type BroadcastFut = Box<Future<Item = (), Error = Never>>;
fn broadcast(&self, message: String) -> Self::BroadcastFut {
futures::collect(self.clients
.lock()
.unwrap()
.values_mut()
// Ignore failing subscribers.
.map(move |client| client.receive(message.clone()).then(|_| Ok(())))
.collect::<Vec<_>>())
.map(|_| ())
.boxed()
let acks = self.clients
.borrow()
.values()
.map(move |client| client.receive(message.clone())
// Ignore failing subscribers. In a real pubsub,
// you'd want to continually retry until subscribers
// ack.
.then(|_| Ok(())))
// Collect to a vec to end the borrow on `self.clients`.
.collect::<Vec<_>>();
Box::new(future::join_all(acks).map(|_| ()))
}
type SubscribeFut = Box<Future<Item = (), Error = Message>>;
@@ -99,42 +100,45 @@ impl publisher::FutureService for Publisher {
Box::new(subscriber::FutureClient::connect(address, client::Options::default())
.map(move |subscriber| {
println!("Subscribing {}.", id);
clients.lock().unwrap().insert(id, subscriber);
clients.borrow_mut().insert(id, subscriber);
()
})
.map_err(|e| e.to_string().into()))
}
type UnsubscribeFut = BoxFuture<(), Never>;
type UnsubscribeFut = Box<Future<Item = (), Error = Never>>;
fn unsubscribe(&self, id: u32) -> Self::UnsubscribeFut {
println!("Unsubscribing {}", id);
self.clients.lock().unwrap().remove(&id).unwrap();
self.clients.borrow_mut().remove(&id).unwrap();
futures::finished(()).boxed()
}
}
fn main() {
let _ = env_logger::init();
let mut reactor = reactor::Core::new().unwrap();
let publisher_addr = Publisher::new()
.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.wait()
.unwrap();
let publisher_client =
publisher::SyncClient::connect(publisher_addr, client::Options::default()).unwrap();
let subscriber1 = Subscriber::listen(0, &reactor.handle(), server::Options::default());
let subscriber2 = Subscriber::listen(1, &reactor.handle(), server::Options::default());
let subscriber1 = Subscriber::listen(0);
publisher_client.subscribe(0, subscriber1).unwrap();
let subscriber2 = Subscriber::listen(1);
publisher_client.subscribe(1, subscriber2).unwrap();
println!("Broadcasting...");
publisher_client.broadcast("hello to all".to_string()).unwrap();
publisher_client.unsubscribe(1).unwrap();
publisher_client.broadcast("hello again".to_string()).unwrap();
let publisher =
reactor.run(publisher::FutureClient::connect(publisher_addr, client::Options::default()))
.unwrap();
reactor.run(publisher.subscribe(0, subscriber1)
.and_then(|_| publisher.subscribe(1, subscriber2))
.map_err(|e| panic!(e))
.and_then(|_| {
println!("Broadcasting...");
publisher.broadcast("hello to all".to_string())
})
.and_then(|_| publisher.unsubscribe(1))
.and_then(|_| publisher.broadcast("hi again".to_string())))
.unwrap();
thread::sleep(Duration::from_millis(300));
}

View File

@@ -11,11 +11,13 @@ extern crate futures;
extern crate tarpc;
#[macro_use]
extern crate serde_derive;
extern crate tokio_core;
use std::error::Error;
use std::fmt;
use tarpc::{client, server};
use tarpc::client::sync::Connect;
use tarpc::client::sync::ClientExt;
use tarpc::util::FirstSocketAddr;
service! {
rpc hello(name: String) -> String | NoNameGiven;
@@ -50,8 +52,10 @@ impl SyncService for HelloServer {
}
fn main() {
let addr = HelloServer.listen("localhost:10000", server::Options::default()).unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
let addr = HelloServer.listen("localhost:10000".first_socket_addr(),
server::Options::default())
.unwrap();
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
println!("{}", client.hello("".to_string()).unwrap_err());
}

View File

@@ -13,7 +13,7 @@ extern crate tokio_core;
use futures::Future;
use tarpc::{client, server};
use tarpc::client::future::Connect;
use tarpc::client::future::ClientExt;
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
@@ -33,11 +33,13 @@ impl FutureService for HelloServer {
}
fn main() {
let addr = "localhost:10000".first_socket_addr();
let mut core = reactor::Core::new().unwrap();
HelloServer.listen(addr, server::Options::default().handle(core.handle())).wait().unwrap();
let options = client::Options::default().handle(core.handle());
core.run(FutureClient::connect(addr, options)
let mut reactor = reactor::Core::new().unwrap();
let addr = HelloServer.listen("localhost:10000".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.unwrap();
let options = client::Options::default().handle(reactor.handle());
reactor.run(FutureClient::connect(addr, options)
.map_err(tarpc::Error::from)
.and_then(|client| client.hello("Mom".to_string()))
.map(|resp| println!("{}", resp)))

View File

@@ -10,10 +10,11 @@
extern crate futures;
#[macro_use]
extern crate tarpc;
extern crate tokio_core;
use tarpc::{client, server};
use tarpc::client::sync::Connect;
use tarpc::util::Never;
use tarpc::client::sync::ClientExt;
use tarpc::util::{FirstSocketAddr, Never};
service! {
rpc hello(name: String) -> String;
@@ -29,8 +30,9 @@ impl SyncService for HelloServer {
}
fn main() {
let addr = "localhost:10000";
HelloServer.listen(addr, server::Options::default()).unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
let addr = HelloServer.listen("localhost:0".first_socket_addr(),
server::Options::default())
.unwrap();
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
}

View File

@@ -10,15 +10,15 @@ extern crate env_logger;
#[macro_use]
extern crate tarpc;
extern crate futures;
extern crate tokio_core;
use add::{FutureService as AddFutureService, FutureServiceExt as AddExt};
use double::{FutureService as DoubleFutureService, FutureServiceExt as DoubleExt};
use futures::{BoxFuture, Future};
use std::sync::{Arc, Mutex};
use futures::{BoxFuture, Future, Stream};
use tarpc::{client, server};
use tarpc::client::future::Connect as Fc;
use tarpc::client::sync::Connect as Sc;
use tarpc::client::future::ClientExt as Fc;
use tarpc::util::{FirstSocketAddr, Message, Never};
use tokio_core::reactor;
pub mod add {
service! {
@@ -49,12 +49,12 @@ impl AddFutureService for AddServer {
#[derive(Clone)]
struct DoubleServer {
client: Arc<Mutex<add::FutureClient>>,
client: add::FutureClient,
}
impl DoubleServer {
fn new(client: add::FutureClient) -> Self {
DoubleServer { client: Arc::new(Mutex::new(client)) }
DoubleServer { client: client }
}
}
@@ -63,8 +63,6 @@ impl DoubleFutureService for DoubleServer {
fn double(&self, x: i32) -> Self::DoubleFut {
self.client
.lock()
.unwrap()
.add(x, x)
.map_err(|e| e.to_string().into())
.boxed()
@@ -73,22 +71,29 @@ impl DoubleFutureService for DoubleServer {
fn main() {
let _ = env_logger::init();
let mut reactor = reactor::Core::new().unwrap();
let add_addr = AddServer.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.wait()
.unwrap();
let add_client =
add::FutureClient::connect(add_addr, client::Options::default()).wait().unwrap();
let double = DoubleServer::new(add_client);
let double_addr = double.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let double_client = double::SyncClient::connect(double_addr, client::Options::default())
let options = client::Options::default().handle(reactor.handle());
let add_client = reactor.run(add::FutureClient::connect(add_addr, options)).unwrap();
let double_addr = DoubleServer::new(add_client)
.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.unwrap();
let double_client =
reactor.run(double::FutureClient::connect(double_addr, client::Options::default()))
.unwrap();
reactor.run(futures::stream::futures_unordered((0..5).map(|i| double_client.double(i)))
.map_err(|e| println!("{}", e))
.for_each(|i| {
println!("{:?}", i);
Ok(())
}))
.unwrap();
for i in 0..5 {
println!("{:?}", double_client.double(i).unwrap());
}
}

View File

@@ -13,16 +13,17 @@ extern crate tarpc;
extern crate env_logger;
extern crate futures;
extern crate serde;
extern crate tokio_core;
use futures::Future;
use std::io::{Read, Write, stdout};
use std::net;
use std::sync::Arc;
use std::thread;
use std::time;
use tarpc::{client, server};
use tarpc::client::sync::Connect;
use tarpc::client::sync::ClientExt;
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
lazy_static! {
static ref BUF: Arc<serde::bytes::ByteBuf> = Arc::new(gen_vec(CHUNK_SIZE as usize).into());
@@ -54,11 +55,12 @@ impl FutureService for Server {
const CHUNK_SIZE: u32 = 1 << 19;
fn bench_tarpc(target: u64) {
let reactor = reactor::Core::new().unwrap();
let addr = Server.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.wait()
.unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
let start = time::Instant::now();
let mut nread = 0;
while nread < target {

View File

@@ -13,13 +13,14 @@ extern crate tarpc;
extern crate bincode;
extern crate env_logger;
extern crate futures;
extern crate tokio_core;
use bar::FutureServiceExt as BarExt;
use baz::FutureServiceExt as BazExt;
use futures::Future;
use tarpc::{client, server};
use tarpc::client::sync::Connect;
use tarpc::client::sync::ClientExt;
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
mod bar {
service! {
@@ -59,17 +60,27 @@ macro_rules! pos {
fn main() {
let _ = env_logger::init();
let bar_addr = Bar.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let baz_addr = Baz.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let mut bar_client = {
let reactor = reactor::Core::new().unwrap();
let addr = Bar.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.unwrap();
// TODO: Need to set up each client with its own reactor. Should it be shareable across
// multiple clients? e.g. Rc<RefCell<Core>> or something similar?
bar::SyncClient::connect(addr, client::Options::default()).unwrap()
};
let mut baz_client = {
// Need to set up each client with its own reactor.
let reactor = reactor::Core::new().unwrap();
let addr = Baz.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.unwrap();
baz::SyncClient::connect(addr, client::Options::default()).unwrap()
};
let bar_client = bar::SyncClient::connect(bar_addr, client::Options::default()).unwrap();
let baz_client = baz::SyncClient::connect(baz_addr, client::Options::default()).unwrap();
info!("Result: {:?}", bar_client.bar(17));

View File

@@ -3,26 +3,12 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
use {Reactor, WireError};
use bincode::serde::DeserializeError;
use futures::{self, Future};
use protocol::Proto;
#[cfg(feature = "tls")]
use self::tls::*;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::io;
use stream_type::StreamType;
use {WireError, bincode};
use tokio_core::reactor;
use tokio_proto::BindClient as ProtoBindClient;
use tokio_proto::multiplex::Multiplex;
use tokio_service::Service;
type WireResponse<Resp, E> = Result<Result<Resp, WireError<E>>, DeserializeError>;
type ResponseFuture<Req, Resp, E> = futures::Map<<BindClient<Req, Resp, E> as Service>::Future,
fn(WireResponse<Resp, E>) -> Result<Resp, ::Error<E>>>;
type BindClient<Req, Resp, E> = <Proto<Req, Result<Resp, WireError<E>>> as
ProtoBindClient<Multiplex, StreamType>>::BindClient;
type WireResponse<Resp, E> = Result<Result<Resp, WireError<E>>, bincode::Error>;
/// TLS-specific functionality
#[cfg(feature = "tls")]
@@ -64,74 +50,6 @@ pub mod tls {
}
}
/// A client that impls `tokio_service::Service` that writes and reads bytes.
///
/// Typically, this would be combined with a serialization pre-processing step
/// and a deserialization post-processing step.
#[doc(hidden)]
pub struct Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
inner: BindClient<Req, Resp, E>,
}
impl<Req, Resp, E> Clone for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn clone(&self) -> Self {
Client { inner: self.inner.clone() }
}
}
impl<Req, Resp, E> Service for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
type Request = Req;
type Response = Result<Resp, ::Error<E>>;
type Error = io::Error;
type Future = ResponseFuture<Req, Resp, E>;
fn call(&self, request: Self::Request) -> Self::Future {
self.inner.call(request).map(Self::map_err)
}
}
impl<Req, Resp, E> Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn new(inner: BindClient<Req, Resp, E>) -> Self
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
Client { inner: inner }
}
fn map_err(resp: WireResponse<Resp, E>) -> Result<Resp, ::Error<E>> {
resp.map(|r| r.map_err(::Error::from))
.map_err(::Error::ClientDeserialize)
.and_then(|r| r)
}
}
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Client {{ .. }}")
}
}
/// Additional options to configure how the client connects and operates.
#[derive(Default)]
pub struct Options {
@@ -141,13 +59,13 @@ pub struct Options {
}
impl Options {
/// Connect using the given reactor handle.
/// Drive using the given reactor handle. Only used by `FutureClient`s.
pub fn handle(mut self, handle: reactor::Handle) -> Self {
self.reactor = Some(Reactor::Handle(handle));
self
}
/// Connect using the given reactor remote.
/// Drive using the given reactor remote. Only used by `FutureClient`s.
pub fn remote(mut self, remote: reactor::Remote) -> Self {
self.reactor = Some(Reactor::Remote(remote));
self
@@ -161,30 +79,105 @@ impl Options {
}
}
enum Reactor {
Handle(reactor::Handle),
Remote(reactor::Remote),
}
/// Exposes a trait for connecting asynchronously to servers.
pub mod future {
use {REMOTE, Reactor};
use futures::{self, Async, Future, future};
use super::{Options, Reactor, WireResponse};
use {REMOTE, WireError};
#[cfg(feature = "tls")]
use errors::native_to_io;
use futures::{self, Future, future};
use protocol::Proto;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::net::SocketAddr;
use stream_type::StreamType;
use super::{Client, Options};
use tokio_core::net::{TcpStream, TcpStreamNew};
use tokio_core::net::TcpStream;
use tokio_core::reactor;
use tokio_proto::BindClient;
cfg_if! {
if #[cfg(feature = "tls")] {
use tokio_tls::{ConnectAsync, TlsStream, TlsConnectorExt};
use super::tls::Context;
use errors::native_to_io;
} else {}
use tokio_proto::BindClient as ProtoBindClient;
use tokio_proto::multiplex::Multiplex;
use tokio_service::Service;
#[cfg(feature = "tls")]
use tokio_tls::TlsConnectorExt;
#[doc(hidden)]
pub struct Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
inner: BindClient<Req, Resp, E>,
}
/// Types that can connect to a server asynchronously.
pub trait Connect: Sized {
impl<Req, Resp, E> Clone for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn clone(&self) -> Self {
Client { inner: self.inner.clone() }
}
}
impl<Req, Resp, E> Service for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
type Request = Req;
type Response = Resp;
type Error = ::Error<E>;
type Future = ResponseFuture<Req, Resp, E>;
fn call(&self, request: Self::Request) -> Self::Future {
fn identity<T>(t: T) -> T {
t
}
self.inner
.call(request)
.map(Self::map_err as _)
.map_err(::Error::from as _)
.and_then(identity as _)
}
}
impl<Req, Resp, E> Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn new(inner: BindClient<Req, Resp, E>) -> Self
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
Client { inner: inner }
}
fn map_err(resp: WireResponse<Resp, E>) -> Result<Resp, ::Error<E>> {
resp.map(|r| r.map_err(::Error::from))
.map_err(::Error::ClientSerialize)
.and_then(|r| r)
}
}
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Client {{ .. }}")
}
}
/// Extension methods for clients.
pub trait ClientExt: Sized {
/// The type of the future returned when calling `connect`.
type ConnectFut: Future<Item = Self, Error = io::Error>;
@@ -192,100 +185,12 @@ pub mod future {
fn connect(addr: SocketAddr, options: Options) -> Self::ConnectFut;
}
type ConnectFutureInner<Req, Resp, E, T> = future::Either<futures::Map<futures::AndThen<
TcpStreamNew, T, ConnectFn>, MultiplexConnect<Req, Resp, E>>, futures::Flatten<
futures::MapErr<futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
fn(futures::Canceled) -> io::Error>>>;
/// A future that resolves to a `Client` or an `io::Error`.
#[doc(hidden)]
pub struct ConnectFuture<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
#[cfg(not(feature = "tls"))]
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
inner: ConnectFutureInner<Req, Resp, E, future::FutureResult<StreamType, io::Error>>,
#[cfg(feature = "tls")]
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
inner: ConnectFutureInner<Req, Resp, E, future::Either<future::FutureResult<
StreamType, io::Error>, futures::Map<futures::MapErr<ConnectAsync<TcpStream>,
fn(::native_tls::Error) -> io::Error>, fn(TlsStream<TcpStream>) -> StreamType>>>,
}
pub type ConnectFuture<Req, Resp, E> =
futures::Flatten<futures::MapErr<futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
fn(futures::Canceled) -> io::Error>>;
impl<Req, Resp, E> Future for ConnectFuture<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
type Item = Client<Req, Resp, E>;
type Error = io::Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
// Ok to unwrap because we ensure the oneshot is always completed.
match Future::poll(&mut self.inner)? {
Async::Ready(client) => Ok(Async::Ready(client)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
struct MultiplexConnect<Req, Resp, E>(reactor::Handle, PhantomData<(Req, Resp, E)>);
impl<Req, Resp, E> MultiplexConnect<Req, Resp, E> {
fn new(handle: reactor::Handle) -> Self {
MultiplexConnect(handle, PhantomData)
}
}
impl<Req, Resp, E, I> FnOnce<(I,)> for MultiplexConnect<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static,
I: Into<StreamType>
{
type Output = Client<Req, Resp, E>;
extern "rust-call" fn call_once(self, (stream,): (I,)) -> Self::Output {
Client::new(Proto::new().bind_client(&self.0, stream.into()))
}
}
/// Provides the connection Fn impl for Tls
struct ConnectFn {
#[cfg(feature = "tls")]
tls_ctx: Option<Context>,
}
impl FnOnce<(TcpStream,)> for ConnectFn {
#[cfg(feature = "tls")]
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
type Output = future::Either<future::FutureResult<StreamType, io::Error>,
futures::Map<futures::MapErr<ConnectAsync<TcpStream>,
fn(::native_tls::Error)
-> io::Error>,
fn(TlsStream<TcpStream>) -> StreamType>>;
#[cfg(not(feature = "tls"))]
type Output = future::FutureResult<StreamType, io::Error>;
extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Self::Output {
#[cfg(feature = "tls")]
match self.tls_ctx {
None => future::Either::A(future::ok(StreamType::from(tcp))),
Some(tls_ctx) => {
future::Either::B(tls_ctx.tls_connector
.connect_async(&tls_ctx.domain, tcp)
.map_err(native_to_io as fn(_) -> _)
.map(StreamType::from as fn(_) -> _))
}
}
#[cfg(not(feature = "tls"))]
future::ok(StreamType::from(tcp))
}
}
impl<Req, Resp, E> Connect for Client<Req, Resp, E>
impl<Req, Resp, E> ClientExt for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
@@ -300,90 +205,128 @@ pub mod future {
#[cfg(feature = "tls")]
let tls_ctx = options.tls_ctx.take();
let setup = move |tx: futures::sync::oneshot::Sender<_>| {
move |handle: &reactor::Handle| {
let handle2 = handle.clone();
TcpStream::connect(&addr, handle)
.and_then(move |socket| {
#[cfg(feature = "tls")]
match tls_ctx {
Some(tls_ctx) => {
future::Either::A(tls_ctx.tls_connector
.connect_async(&tls_ctx.domain, socket)
.map(StreamType::Tls)
.map_err(native_to_io))
}
None => future::Either::B(future::ok(StreamType::Tcp(socket))),
let connect = move |handle: &reactor::Handle| {
let handle2 = handle.clone();
TcpStream::connect(&addr, handle)
.and_then(move |socket| {
#[cfg(feature = "tls")]
match tls_ctx {
Some(tls_ctx) => {
future::Either::A(tls_ctx.tls_connector
.connect_async(&tls_ctx.domain, socket)
.map(StreamType::Tls)
.map_err(native_to_io))
}
#[cfg(not(feature = "tls"))]
future::ok(StreamType::Tcp(socket))
})
.map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp)))
.then(move |result| {
tx.complete(result);
Ok(())
})
}
None => future::Either::B(future::ok(StreamType::Tcp(socket))),
}
#[cfg(not(feature = "tls"))]
future::ok(StreamType::Tcp(socket))
})
.map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp)))
};
let (tx, rx) = futures::oneshot();
let setup = move |handle: &reactor::Handle| {
connect(handle).then(move |result| {
tx.complete(result);
Ok(())
})
};
let rx = match options.reactor {
match options.reactor {
Some(Reactor::Handle(handle)) => {
#[cfg(feature = "tls")]
let connect_fn = ConnectFn { tls_ctx: options.tls_ctx };
#[cfg(not(feature = "tls"))]
let connect_fn = ConnectFn {};
let tcp = TcpStream::connect(&addr, &handle)
.and_then(connect_fn)
.map(MultiplexConnect::new(handle));
return ConnectFuture { inner: future::Either::A(tcp) };
handle.spawn(setup(&handle));
}
Some(Reactor::Remote(remote)) => {
let (tx, rx) = futures::oneshot();
remote.spawn(setup(tx));
rx
remote.spawn(setup);
}
None => {
let (tx, rx) = futures::oneshot();
REMOTE.spawn(setup(tx));
rx
REMOTE.spawn(setup);
}
};
}
fn panic(canceled: futures::Canceled) -> io::Error {
unreachable!(canceled)
}
ConnectFuture { inner: future::Either::B(rx.map_err(panic as fn(_) -> _).flatten()) }
rx.map_err(panic as _).flatten()
}
}
type ResponseFuture<Req, Resp, E> =
futures::AndThen<futures::MapErr<
futures::Map<<BindClient<Req, Resp, E> as Service>::Future,
fn(WireResponse<Resp, E>) -> Result<Resp, ::Error<E>>>,
fn(io::Error) -> ::Error<E>>,
Result<Resp, ::Error<E>>,
fn(Result<Resp, ::Error<E>>) -> Result<Resp, ::Error<E>>>;
type BindClient<Req, Resp, E> =
<Proto<Req, Result<Resp, WireError<E>>>
as ProtoBindClient<Multiplex, StreamType>>::BindClient;
}
/// Exposes a trait for connecting synchronously to servers.
pub mod sync {
use client::future::Connect as FutureConnect;
use futures::{Future, future};
use super::Options;
use super::Reactor;
use super::future::{Client as FutureClient, ClientExt as FutureClientExt};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::io;
use std::net::ToSocketAddrs;
use super::{Client, Options};
use tokio_core::reactor;
use tokio_service::Service;
use util::FirstSocketAddr;
/// Types that can connect to a server synchronously.
pub trait Connect: Sized {
/// Connects to a server located at the given address.
fn connect<A>(addr: A, options: Options) -> Result<Self, io::Error> where A: ToSocketAddrs;
#[doc(hidden)]
pub struct Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
inner: FutureClient<Req, Resp, E>,
reactor: reactor::Core,
}
impl<Req, Resp, E> Connect for Client<Req, Resp, E>
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Client {{ .. }}")
}
}
impl<Req, Resp, E> Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
fn connect<A>(addr: A, options: Options) -> Result<Self, io::Error>
/// Drives an RPC call for the given request.
pub fn call(&mut self, request: Req) -> Result<Resp, ::Error<E>> {
self.reactor.run(self.inner.call(request))
}
}
/// Extension methods for Clients.
pub trait ClientExt: Sized {
/// Connects to a server located at the given address.
fn connect<A>(addr: A, options: Options) -> io::Result<Self> where A: ToSocketAddrs;
}
impl<Req, Resp, E> ClientExt for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
fn connect<A>(addr: A, mut options: Options) -> io::Result<Self>
where A: ToSocketAddrs
{
let mut reactor = reactor::Core::new()?;
let addr = addr.try_first_socket_addr()?;
// Wrapped in a lazy future to ensure execution occurs when a task is present.
future::lazy(move || <Self as FutureConnect>::connect(addr, options)).wait()
options.reactor = Some(Reactor::Handle(reactor.handle()));
Ok(Client {
inner: reactor.run(FutureClient::connect(addr, options))?,
reactor: reactor,
})
}
}
}

View File

@@ -3,7 +3,6 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
use bincode;
use serde::{Deserialize, Serialize};
use std::{fmt, io};
use std::error::Error as StdError;
@@ -13,23 +12,15 @@ use std::error::Error as StdError;
pub enum Error<E> {
/// Any IO error.
Io(io::Error),
/// Error in deserializing a server response.
/// Error serializing the client request or deserializing the server response.
///
/// Typically this indicates a faulty implementation of `serde::Serialize` or
/// `serde::Deserialize`.
ClientDeserialize(bincode::serde::DeserializeError),
/// Error in serializing a client request.
///
/// Typically this indicates a faulty implementation of `serde::Serialize`.
ClientSerialize(bincode::serde::SerializeError),
/// Error in deserializing a client request.
ClientSerialize(::bincode::Error),
/// Error serializing the server response or deserializing the client request.
///
/// Typically this indicates a faulty implementation of `serde::Serialize` or
/// `serde::Deserialize`.
ServerDeserialize(String),
/// Error in serializing a server response.
///
/// Typically this indicates a faulty implementation of `serde::Serialize`.
ServerSerialize(String),
/// The server was unable to reply to the rpc for some reason.
///
@@ -41,9 +32,7 @@ pub enum Error<E> {
impl<E: StdError + Deserialize + Serialize + Send + 'static> fmt::Display for Error<E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::ClientDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e),
Error::ClientSerialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e),
Error::ServerDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e),
Error::ServerSerialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e),
Error::App(ref e) => fmt::Display::fmt(e, f),
Error::Io(ref e) => fmt::Display::fmt(e, f),
@@ -54,10 +43,12 @@ impl<E: StdError + Deserialize + Serialize + Send + 'static> fmt::Display for Er
impl<E: StdError + Deserialize + Serialize + Send + 'static> StdError for Error<E> {
fn description(&self) -> &str {
match *self {
Error::ClientDeserialize(_) => "The client failed to deserialize the server response.",
Error::ClientSerialize(_) => "The client failed to serialize the request.",
Error::ServerDeserialize(_) => "The server failed to deserialize the request.",
Error::ServerSerialize(_) => "The server failed to serialize the response.",
Error::ClientSerialize(_) => {
"The client failed to serialize the request or deserialize the response."
}
Error::ServerSerialize(_) => {
"The server failed to serialize the response or deserialize the request."
}
Error::App(ref e) => e.description(),
Error::Io(ref e) => e.description(),
}
@@ -65,9 +56,7 @@ impl<E: StdError + Deserialize + Serialize + Send + 'static> StdError for Error<
fn cause(&self) -> Option<&StdError> {
match *self {
Error::ClientDeserialize(ref e) => e.cause(),
Error::ClientSerialize(ref e) => e.cause(),
Error::ServerDeserialize(_) |
Error::ServerSerialize(_) |
Error::App(_) => None,
Error::Io(ref e) => e.cause(),
@@ -84,7 +73,6 @@ impl<E> From<io::Error> for Error<E> {
impl<E> From<WireError<E>> for Error<E> {
fn from(err: WireError<E>) -> Self {
match err {
WireError::ServerDeserialize(s) => Error::ServerDeserialize(s),
WireError::ServerSerialize(s) => Error::ServerSerialize(s),
WireError::App(e) => Error::App(e),
}
@@ -95,9 +83,7 @@ impl<E> From<WireError<E>> for Error<E> {
#[doc(hidden)]
#[derive(Deserialize, Serialize, Clone, Debug)]
pub enum WireError<E> {
/// Error in deserializing a client request.
ServerDeserialize(String),
/// Error in serializing server response.
/// Error in serializing the server response or deserializing the client request.
ServerSerialize(String),
/// The server was unable to reply to the rpc for some reason.
App(E),

View File

@@ -33,10 +33,12 @@
//!
//! #[macro_use]
//! extern crate tarpc;
//! extern crate tokio_core;
//!
//! use tarpc::{client, server};
//! use tarpc::client::sync::Connect;
//! use tarpc::client::sync::ClientExt;
//! use tarpc::util::Never;
//! use tokio_core::reactor;
//!
//! service! {
//! rpc hello(name: String) -> String;
@@ -53,8 +55,9 @@
//!
//! fn main() {
//! let addr = "localhost:10000";
//! let reactor = reactor::Core::new().unwrap();
//! let _server = HelloServer.listen(addr, server::Options::default());
//! let client = SyncClient::connect(addr, client::Options::default()).unwrap();
//! let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
//! println!("{}", client.hello("Mom".to_string()).unwrap());
//! }
//! ```
@@ -70,7 +73,7 @@
//! extern crate tarpc;
//!
//! use tarpc::{client, server};
//! use tarpc::client::sync::Connect;
//! use tarpc::client::sync::ClientExt;
//! use tarpc::util::Never;
//! use tarpc::native_tls::{TlsAcceptor, Pkcs12};
//!
@@ -97,7 +100,7 @@
//! let addr = "localhost:10000";
//! let acceptor = get_acceptor();
//! let _server = HelloServer.listen(addr, server::Options::default().tls(acceptor));
//! let client = SyncClient::connect(addr,
//! let mut client = SyncClient::connect(addr,
//! client::Options::default()
//! .tls(client::tls::Context::new("foobar.com").unwrap()))
//! .unwrap();
@@ -106,12 +109,10 @@
//! ```
//!
#![deny(missing_docs)]
#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, fn_traits,
specialization)]
#![feature(plugin, never_type, struct_field_attributes)]
#![plugin(tarpc_plugins)]
extern crate byteorder;
extern crate bytes;
#[macro_use]
extern crate lazy_static;
#[macro_use]
@@ -119,7 +120,6 @@ extern crate log;
extern crate net2;
#[macro_use]
extern crate serde_derive;
extern crate take;
#[macro_use]
extern crate cfg_if;
@@ -183,12 +183,6 @@ fn spawn_core() -> reactor::Remote {
rx.recv().unwrap()
}
#[derive(Clone)]
enum Reactor {
Handle(reactor::Handle),
Remote(reactor::Remote),
}
cfg_if! {
if #[cfg(feature = "tls")] {
extern crate tokio_tls;

View File

@@ -77,7 +77,9 @@ macro_rules! impl_deserialize {
impl $crate::serde::de::Visitor for impl_deserialize_FieldVisitor__ {
type Value = impl_deserialize_Field__;
fn expecting(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
fn expecting(&self, formatter: &mut ::std::fmt::Formatter)
-> ::std::fmt::Result
{
formatter.write_str("an unsigned integer")
}
@@ -318,11 +320,10 @@ macro_rules! service {
impl_deserialize!(tarpc_service_Error__, NotIrrefutable(()) $($fn_name($error))*);
impl_serialize!(tarpc_service_Error__, {}, NotIrrefutable(()) $($fn_name($error))*);
/// Defines the `Future` RPC service. Implementors must be `Clone`, `Send`, and `'static`,
/// Defines the `Future` RPC service. Implementors must be `Clone` and `'static`,
/// as required by `tokio_proto::NewService`. This is required so that the service can be used
/// to respond to multiple requests concurrently.
pub trait FutureService:
::std::marker::Send +
::std::clone::Clone +
'static
{
@@ -345,13 +346,14 @@ macro_rules! service {
/// the default tokio `Loop`.
fn listen(self,
addr: ::std::net::SocketAddr,
handle: &$crate::tokio_core::reactor::Handle,
options: $crate::server::Options)
-> $crate::server::ListenFuture
-> ::std::io::Result<::std::net::SocketAddr>
{
return $crate::server::listen(
move || Ok(tarpc_service_AsyncServer__(self.clone())),
addr,
options);
return $crate::server::listen(move || Ok(tarpc_service_AsyncServer__(self.clone())),
addr,
handle,
options);
#[allow(non_camel_case_types)]
#[derive(Clone)]
@@ -412,7 +414,7 @@ macro_rules! service {
where tarpc_service_S__: FutureService
{
type Request = ::std::result::Result<tarpc_service_Request__,
$crate::bincode::serde::DeserializeError>;
$crate::bincode::Error>;
type Response = $crate::server::Response<tarpc_service_Response__,
tarpc_service_Error__>;
type Error = ::std::io::Error;
@@ -425,7 +427,7 @@ macro_rules! service {
return tarpc_service_FutureReply__::DeserializeError(
$crate::futures::finished(
::std::result::Result::Err(
$crate::WireError::ServerDeserialize(
$crate::WireError::ServerSerialize(
::std::string::ToString::to_string(
&tarpc_service_deserialize_err__)))));
}
@@ -481,9 +483,9 @@ macro_rules! service {
pub trait SyncServiceExt: SyncService {
/// Spawns the service, binding to the given address and running on
/// the default tokio `Loop`.
fn listen<L>(self, addr: L, options: $crate::server::Options)
fn listen<A>(self, addr: A, options: $crate::server::Options)
-> ::std::io::Result<::std::net::SocketAddr>
where L: ::std::net::ToSocketAddrs
where A: ::std::net::ToSocketAddrs
{
let tarpc_service__ = SyncServer__ {
service: self,
@@ -492,10 +494,26 @@ macro_rules! service {
let tarpc_service_addr__ =
$crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
// Wrapped in a lazy future to ensure execution occurs when a task is present.
return $crate::futures::Future::wait($crate::futures::future::lazy(move || {
FutureServiceExt::listen(tarpc_service__, tarpc_service_addr__, options)
}));
let (tx_, rx_) = ::std::sync::mpsc::channel();
::std::thread::spawn(move || {
match $crate::tokio_core::reactor::Core::new() {
::std::result::Result::Ok(mut reactor_) => {
let addr_ = FutureServiceExt::listen(tarpc_service__,
tarpc_service_addr__,
&reactor_.handle(),
options);
tx_.send(addr_).unwrap();
loop {
reactor_.turn(::std::option::Option::None);
}
}
::std::result::Result::Err(error_) => {
tx_.send(Err(error_)).unwrap();
}
}
});
return rx_.recv().unwrap();
#[derive(Clone)]
struct SyncServer__<S> {
@@ -546,22 +564,26 @@ macro_rules! service {
impl<S> SyncServiceExt for S where S: SyncService {}
#[allow(unused)]
#[derive(Clone, Debug)]
/// The client stub that makes RPC calls to the server. Exposes a blocking interface.
pub struct SyncClient(FutureClient);
pub struct SyncClient {
inner: tarpc_service_SyncClient__,
}
impl $crate::client::sync::Connect for SyncClient {
fn connect<A>(addr_: A, opts_: $crate::client::Options)
-> ::std::result::Result<Self, ::std::io::Error>
impl ::std::fmt::Debug for SyncClient {
fn fmt(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
write!(formatter, "SyncClient {{ inner: {:?}, .. }}", self.inner)
}
}
impl $crate::client::sync::ClientExt for SyncClient {
fn connect<A>(addr_: A, options_: $crate::client::Options) -> ::std::io::Result<Self>
where A: ::std::net::ToSocketAddrs,
{
let addr_ = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr_)?;
// Wrapped in a lazy future to ensure execution occurs when a task is present.
let client_ = $crate::futures::Future::wait($crate::futures::future::lazy(move || {
<FutureClient as $crate::client::future::Connect>::connect(addr_, opts_)
}))?;
let client_ = SyncClient(client_);
::std::result::Result::Ok(client_)
let client_ = <tarpc_service_SyncClient__
as $crate::client::sync::ClientExt>::connect(addr_, options_)?;
::std::result::Result::Ok(SyncClient {
inner: client_,
})
}
}
@@ -569,89 +591,17 @@ macro_rules! service {
$(
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*)
pub fn $fn_name(&mut self, $($arg: $in_),*)
-> ::std::result::Result<$out, $crate::Error<$error>>
{
// Wrapped in a lazy future to ensure execution occurs when a task is present.
$crate::futures::Future::wait($crate::futures::future::lazy(move || {
(self.0).$fn_name($($arg),*)
}))
}
)*
}
#[allow(non_camel_case_types)]
type tarpc_service_Client__ =
$crate::client::Client<tarpc_service_Request__,
tarpc_service_Response__,
tarpc_service_Error__>;
#[allow(non_camel_case_types)]
/// Implementation detail: Pending connection.
pub struct tarpc_service_ConnectFuture__<T> {
inner: $crate::futures::Map<$crate::client::future::ConnectFuture<
tarpc_service_Request__,
tarpc_service_Response__,
tarpc_service_Error__>,
fn(tarpc_service_Client__) -> T>,
}
impl<T> $crate::futures::Future for tarpc_service_ConnectFuture__<T> {
type Item = T;
type Error = ::std::io::Error;
fn poll(&mut self) -> $crate::futures::Poll<Self::Item, Self::Error> {
$crate::futures::Future::poll(&mut self.inner)
}
}
#[allow(unused)]
#[derive(Clone, Debug)]
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
pub struct FutureClient(tarpc_service_Client__);
impl<'a> $crate::client::future::Connect for FutureClient {
type ConnectFut = tarpc_service_ConnectFuture__<Self>;
fn connect(tarpc_service_addr__: ::std::net::SocketAddr,
tarpc_service_options__: $crate::client::Options)
-> Self::ConnectFut
{
let client = <tarpc_service_Client__ as $crate::client::future::Connect>::connect(
tarpc_service_addr__, tarpc_service_options__);
tarpc_service_ConnectFuture__ {
inner: $crate::futures::Future::map(client, FutureClient)
}
}
}
impl FutureClient {
$(
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*)
-> $crate::futures::future::Then<
<tarpc_service_Client__ as $crate::tokio_service::Service>::Future,
::std::result::Result<$out, $crate::Error<$error>>,
fn(::std::result::Result<
::std::result::Result<tarpc_service_Response__,
$crate::Error<tarpc_service_Error__>>,
::std::io::Error>)
-> ::std::result::Result<$out, $crate::Error<$error>>> {
let tarpc_service_req__ = tarpc_service_Request__::$fn_name(($($arg,)*));
let tarpc_service_fut__ =
$crate::tokio_service::Service::call(&self.0, tarpc_service_req__);
return $crate::futures::Future::then(tarpc_service_fut__, then__);
return then__(self.inner.call(tarpc_service_Request__::$fn_name(($($arg,)*))));
// TODO: this code is duplicated in both FutureClient and SyncClient.
fn then__(tarpc_service_msg__:
::std::result::Result<
::std::result::Result<tarpc_service_Response__,
$crate::Error<tarpc_service_Error__>>,
::std::io::Error>)
$crate::Error<tarpc_service_Error__>>)
-> ::std::result::Result<$out, $crate::Error<$error>> {
match tarpc_service_msg__? {
match tarpc_service_msg__ {
::std::result::Result::Ok(tarpc_service_msg__) => {
if let tarpc_service_Response__::$fn_name(tarpc_service_msg__) =
tarpc_service_msg__
@@ -672,14 +622,120 @@ macro_rules! service {
unreachable!()
}
}
$crate::Error::ServerDeserialize(tarpc_service_err__) => {
$crate::Error::ServerDeserialize(tarpc_service_err__)
}
$crate::Error::ServerSerialize(tarpc_service_err__) => {
$crate::Error::ServerSerialize(tarpc_service_err__)
}
$crate::Error::ClientDeserialize(tarpc_service_err__) => {
$crate::Error::ClientDeserialize(tarpc_service_err__)
$crate::Error::ClientSerialize(tarpc_service_err__) => {
$crate::Error::ClientSerialize(tarpc_service_err__)
}
$crate::Error::Io(tarpc_service_error__) => {
$crate::Error::Io(tarpc_service_error__)
}
})
}
}
}
}
)*
}
#[allow(non_camel_case_types)]
type tarpc_service_FutureClient__ =
$crate::client::future::Client<tarpc_service_Request__,
tarpc_service_Response__,
tarpc_service_Error__>;
#[allow(non_camel_case_types)]
type tarpc_service_SyncClient__ =
$crate::client::sync::Client<tarpc_service_Request__,
tarpc_service_Response__,
tarpc_service_Error__>;
#[allow(non_camel_case_types)]
/// Implementation detail: Pending connection.
pub struct tarpc_service_ConnectFuture__<T> {
inner: $crate::futures::Map<$crate::client::future::ConnectFuture<
tarpc_service_Request__,
tarpc_service_Response__,
tarpc_service_Error__>,
fn(tarpc_service_FutureClient__) -> T>,
}
impl<T> $crate::futures::Future for tarpc_service_ConnectFuture__<T> {
type Item = T;
type Error = ::std::io::Error;
fn poll(&mut self) -> $crate::futures::Poll<Self::Item, Self::Error> {
$crate::futures::Future::poll(&mut self.inner)
}
}
#[allow(unused)]
#[derive(Clone, Debug)]
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
pub struct FutureClient(tarpc_service_FutureClient__);
impl<'a> $crate::client::future::ClientExt for FutureClient {
type ConnectFut = tarpc_service_ConnectFuture__<Self>;
fn connect(tarpc_service_addr__: ::std::net::SocketAddr,
tarpc_service_options__: $crate::client::Options)
-> Self::ConnectFut
{
let client = <tarpc_service_FutureClient__
as $crate::client::future::ClientExt>::connect(tarpc_service_addr__,
tarpc_service_options__);
tarpc_service_ConnectFuture__ {
inner: $crate::futures::Future::map(client, FutureClient)
}
}
}
impl FutureClient {
$(
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*)
-> $crate::futures::future::Then<
<tarpc_service_FutureClient__ as $crate::tokio_service::Service>::Future,
::std::result::Result<$out, $crate::Error<$error>>,
fn(::std::result::Result<tarpc_service_Response__,
$crate::Error<tarpc_service_Error__>>)
-> ::std::result::Result<$out, $crate::Error<$error>>> {
let tarpc_service_req__ = tarpc_service_Request__::$fn_name(($($arg,)*));
let tarpc_service_fut__ =
$crate::tokio_service::Service::call(&self.0, tarpc_service_req__);
return $crate::futures::Future::then(tarpc_service_fut__, then__);
fn then__(tarpc_service_msg__:
::std::result::Result<tarpc_service_Response__,
$crate::Error<tarpc_service_Error__>>)
-> ::std::result::Result<$out, $crate::Error<$error>> {
match tarpc_service_msg__ {
::std::result::Result::Ok(tarpc_service_msg__) => {
if let tarpc_service_Response__::$fn_name(tarpc_service_msg__) =
tarpc_service_msg__
{
::std::result::Result::Ok(tarpc_service_msg__)
} else {
unreachable!()
}
}
::std::result::Result::Err(tarpc_service_err__) => {
::std::result::Result::Err(match tarpc_service_err__ {
$crate::Error::App(tarpc_service_err__) => {
if let tarpc_service_Error__::$fn_name(
tarpc_service_err__) = tarpc_service_err__
{
$crate::Error::App(tarpc_service_err__)
} else {
unreachable!()
}
}
$crate::Error::ServerSerialize(tarpc_service_err__) => {
$crate::Error::ServerSerialize(tarpc_service_err__)
}
$crate::Error::ClientSerialize(tarpc_service_err__) => {
$crate::Error::ClientSerialize(tarpc_service_err__)
@@ -729,6 +785,7 @@ mod functional_test {
use futures::{Future, failed};
use std::io;
use std::net::SocketAddr;
use tokio_core::reactor;
use util::FirstSocketAddr;
extern crate env_logger;
@@ -751,14 +808,11 @@ mod functional_test {
use client::tls::Context;
use native_tls::{Pkcs12, TlsAcceptor, TlsConnector};
fn tls_context() -> (server::Options, client::Options) {
fn get_tls_server_options() -> server::Options {
let buf = include_bytes!("../test/identity.p12");
let pkcs12 = unwrap!(Pkcs12::from_der(buf, "mypass"));
let acceptor = unwrap!(unwrap!(TlsAcceptor::builder(pkcs12)).build());
let server_options = server::Options::default().tls(acceptor);
let client_options = get_tls_client_options();
(server_options, client_options)
server::Options::default().tls(acceptor)
}
// Making the TlsConnector for testing needs to be OS-dependent just like native-tls.
@@ -778,10 +832,11 @@ mod functional_test {
let mut connector = unwrap!(TlsConnector::builder());
connector.anchor_certificates(&[cert]);
client::Options::default().tls(Context {
domain: DOMAIN.into(),
tls_connector: unwrap!(connector.build()),
})
client::Options::default()
.tls(Context {
domain: DOMAIN.into(),
tls_connector: unwrap!(connector.build()),
})
}
} else if #[cfg(all(not(target_os = "macos"), not(windows)))] {
use native_tls_inner::backend::openssl::TlsConnectorBuilderExt;
@@ -792,10 +847,11 @@ mod functional_test {
.builder_mut()
.set_ca_file("test/root-ca.pem"));
client::Options::default().tls(Context {
domain: DOMAIN.into(),
tls_connector: unwrap!(connector.build()),
})
client::Options::default()
.tls(Context {
domain: DOMAIN.into(),
tls_connector: unwrap!(connector.build()),
})
}
// not implemented for windows or other platforms
} else {
@@ -805,41 +861,42 @@ mod functional_test {
}
}
fn get_sync_client<C>(addr: SocketAddr) -> io::Result<C>
where C: client::sync::Connect
fn start_server_with_sync_client<C, S>(server: S) -> io::Result<(SocketAddr, C)>
where C: client::sync::ClientExt, S: SyncServiceExt
{
let client_options = get_tls_client_options();
C::connect(addr, client_options)
}
fn start_server_with_sync_client<C, S>(server: S) -> (SocketAddr, io::Result<C>)
where C: client::sync::Connect, S: SyncServiceExt
{
let (server_options, client_options) = tls_context();
let server_options = get_tls_server_options();
let addr = unwrap!(server.listen("localhost:0".first_socket_addr(),
server_options));
let client = C::connect(addr, client_options);
(addr, client)
let client = unwrap!(C::connect(addr, get_tls_client_options()));
Ok((addr, client))
}
fn start_server_with_async_client<C, S>(server: S) -> (SocketAddr, C)
where C: client::future::Connect, S: FutureServiceExt
fn start_server_with_async_client<C, S>(server: S)
-> io::Result<(SocketAddr, reactor::Core, C)>
where C: client::future::ClientExt, S: FutureServiceExt
{
let (server_options, client_options) = tls_context();
let addr = unwrap!(server.listen("localhost:0".first_socket_addr(),
server_options).wait());
let client = unwrap!(C::connect(addr, client_options).wait());
(addr, client)
let mut reactor = reactor::Core::new()?;
let server_options = get_tls_server_options();
let addr = server.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server_options)?;
let client_options = get_tls_client_options().handle(reactor.handle());
let client = unwrap!(reactor.run(C::connect(addr, client_options)));
Ok((addr, reactor, client))
}
fn start_err_server_with_async_client<C, S>(server: S) -> (SocketAddr, C)
where C: client::future::Connect, S: error_service::FutureServiceExt
fn start_err_server_with_async_client<C, S>(server: S)
-> io::Result<(SocketAddr, reactor::Core, C)>
where C: client::future::ClientExt, S: error_service::FutureServiceExt
{
let (server_options, client_options) = tls_context();
let addr = unwrap!(server.listen("localhost:0".first_socket_addr(),
server_options).wait());
let client = unwrap!(C::connect(addr, client_options).wait());
(addr, client)
let mut reactor = reactor::Core::new()?;
let server_options = get_tls_server_options();
let addr = server.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server_options)?;
let client_options = get_tls_client_options().handle(reactor.handle());
let client = unwrap!(reactor.run(C::connect(addr, client_options)));
Ok((addr, reactor, client))
}
} else {
fn get_server_options() -> server::Options {
@@ -851,36 +908,45 @@ mod functional_test {
}
fn get_sync_client<C>(addr: SocketAddr) -> io::Result<C>
where C: client::sync::Connect
where C: client::sync::ClientExt
{
C::connect(addr, get_client_options())
}
fn start_server_with_sync_client<C, S>(server: S) -> (SocketAddr, io::Result<C>)
where C: client::sync::Connect, S: SyncServiceExt
fn start_server_with_sync_client<C, S>(server: S) -> io::Result<(SocketAddr, C)>
where C: client::sync::ClientExt, S: SyncServiceExt
{
let addr = unwrap!(server.listen("localhost:0".first_socket_addr(),
get_server_options()));
let options = get_server_options();
let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), options));
let client = unwrap!(get_sync_client(addr));
Ok((addr, client))
}
fn start_server_with_async_client<C, S>(server: S)
-> io::Result<(SocketAddr, reactor::Core, C)>
where C: client::future::ClientExt, S: FutureServiceExt
{
let mut reactor = reactor::Core::new()?;
let options = get_server_options();
let addr = server.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
options)?;
let client = unwrap!(reactor.run(C::connect(addr, get_client_options())));
Ok((addr, reactor, client))
}
fn start_err_server_with_async_client<C, S>(server: S)
-> io::Result<(SocketAddr, reactor::Core, C)>
where C: client::future::ClientExt, S: error_service::FutureServiceExt
{
let mut reactor = reactor::Core::new()?;
let options = get_server_options();
let addr = server.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
options)?;
let client = C::connect(addr, get_client_options());
(addr, client)
}
fn start_server_with_async_client<C, S>(server: S) -> (SocketAddr, C)
where C: client::future::Connect, S: FutureServiceExt
{
let addr = unwrap!(server.listen("localhost:0".first_socket_addr(),
get_server_options()).wait());
let client = unwrap!(C::connect(addr, get_client_options()).wait());
(addr, client)
}
fn start_err_server_with_async_client<C, S>(server: S) -> (SocketAddr, C)
where C: client::future::Connect, S: error_service::FutureServiceExt
{
let addr = unwrap!(server.listen("localhost:0".first_socket_addr(),
get_server_options()).wait());
let client = unwrap!(C::connect(addr, get_client_options()).wait());
(addr, client)
let client = unwrap!(reactor.run(client));
Ok((addr, reactor, client))
}
}
}
@@ -905,8 +971,8 @@ mod functional_test {
#[test]
fn simple() {
let _ = env_logger::init();
let (_, client) = start_server_with_sync_client::<SyncClient, Server>(Server);
let client = unwrap!(client);
let (_, mut client) = unwrap!(start_server_with_sync_client::<SyncClient,
Server>(Server));
assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
}
@@ -914,19 +980,20 @@ mod functional_test {
#[test]
fn other_service() {
let _ = env_logger::init();
let (_, client) = start_server_with_sync_client::<super::other_service::SyncClient,
Server>(Server);
let client = client.expect("Could not connect!");
let (_, mut client) =
unwrap!(start_server_with_sync_client::<super::other_service::SyncClient,
Server>(Server));
match client.foo().err().expect("failed unwrap") {
::Error::ServerDeserialize(_) => {} // good
bad => panic!("Expected Error::ServerDeserialize but got {}", bad),
::Error::ServerSerialize(_) => {} // good
bad => panic!("Expected Error::ServerSerialize but got {}", bad),
}
}
}
mod future {
use futures::{Finished, Future, finished};
use super::{FutureClient, FutureService, env_logger, start_server_with_async_client};
use futures::{Finished, finished};
use tokio_core::reactor;
use util::Never;
#[derive(Clone)]
@@ -949,32 +1016,35 @@ mod functional_test {
#[test]
fn simple() {
let _ = env_logger::init();
let (_, client) = start_server_with_async_client::<FutureClient, Server>(Server);
assert_eq!(3, client.add(1, 2).wait().unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap());
let (_, mut reactor, client) =
unwrap!(start_server_with_async_client::<FutureClient, Server>(Server));
assert_eq!(3, reactor.run(client.add(1, 2)).unwrap());
assert_eq!("Hey, Tim.",
reactor.run(client.hey("Tim".to_string())).unwrap());
}
#[test]
fn concurrent() {
let _ = env_logger::init();
let (_, client) = start_server_with_async_client::<FutureClient, Server>(Server);
let (_, mut reactor, client) =
unwrap!(start_server_with_async_client::<FutureClient, Server>(Server));
let req1 = client.add(1, 2);
let req2 = client.add(3, 4);
let req3 = client.hey("Tim".to_string());
assert_eq!(3, req1.wait().unwrap());
assert_eq!(7, req2.wait().unwrap());
assert_eq!("Hey, Tim.", req3.wait().unwrap());
assert_eq!(3, reactor.run(req1).unwrap());
assert_eq!(7, reactor.run(req2).unwrap());
assert_eq!("Hey, Tim.", reactor.run(req3).unwrap());
}
#[test]
fn other_service() {
let _ = env_logger::init();
let (_, client) =
start_server_with_async_client::<super::other_service::FutureClient,
Server>(Server);
match client.foo().wait().err().unwrap() {
::Error::ServerDeserialize(_) => {} // good
bad => panic!(r#"Expected Error::ServerDeserialize but got "{}""#, bad),
let (_, mut reactor, client) =
unwrap!(start_server_with_async_client::<super::other_service::FutureClient,
Server>(Server));
match reactor.run(client.foo()).err().unwrap() {
::Error::ServerSerialize(_) => {} // good
bad => panic!(r#"Expected Error::ServerSerialize but got "{}""#, bad),
}
}
@@ -983,36 +1053,40 @@ mod functional_test {
use util::FirstSocketAddr;
use server;
use super::FutureServiceExt;
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr(), server::Options::default())
.wait()
.unwrap();
Server.listen(addr, server::Options::default())
.wait()
let reactor = reactor::Core::new().unwrap();
let addr = Server.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.unwrap();
Server.listen(addr, &reactor.handle(), server::Options::default()).unwrap();
}
#[cfg(feature = "tls")]
#[test]
fn tcp_and_tls() {
use {client, server};
use util::FirstSocketAddr;
use client::future::Connect;
use client::future::ClientExt;
use super::FutureServiceExt;
let _ = env_logger::init();
let (_, client) = start_server_with_async_client::<FutureClient, Server>(Server);
assert_eq!(3, client.add(1, 2).wait().unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap());
let (_, mut reactor, client) =
unwrap!(start_server_with_async_client::<FutureClient, Server>(Server));
assert_eq!(3, reactor.run(client.add(1, 2)).unwrap());
assert_eq!("Hey, Tim.",
reactor.run(client.hey("Tim".to_string())).unwrap());
let addr = Server.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.wait()
.unwrap();
let client = FutureClient::connect(addr, client::Options::default()).wait().unwrap();
assert_eq!(3, client.add(1, 2).wait().unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap());
let options = client::Options::default().handle(reactor.handle());
let client = reactor.run(FutureClient::connect(addr, options)).unwrap();
assert_eq!(3, reactor.run(client.add(1, 2)).unwrap());
assert_eq!("Hey, Tim.",
reactor.run(client.hey("Tim".to_string())).unwrap());
}
}
@@ -1040,28 +1114,19 @@ mod functional_test {
use self::error_service::*;
let _ = env_logger::init();
let (addr, client) = start_err_server_with_async_client::<FutureClient,
ErrorServer>(ErrorServer);
client.bar()
.then(move |result| {
match result.err().unwrap() {
::Error::App(e) => {
assert_eq!(e.description(), "lol jk");
Ok::<_, ()>(())
} // good
bad => panic!("Expected Error::App but got {:?}", bad),
}
})
.wait()
let (_, mut reactor, client) =
start_err_server_with_async_client::<FutureClient, ErrorServer>(ErrorServer).unwrap();
reactor.run(client.bar()
.then(move |result| {
match result.err().unwrap() {
::Error::App(e) => {
assert_eq!(e.description(), "lol jk");
Ok::<_, ()>(())
} // good
bad => panic!("Expected Error::App but got {:?}", bad),
}
}))
.unwrap();
let client = get_sync_client::<SyncClient>(addr).unwrap();
match client.bar().err().unwrap() {
::Error::App(e) => {
assert_eq!(e.description(), "lol jk");
} // good
bad => panic!("Expected Error::App but got {:?}", bad),
}
}
pub mod other_service {

View File

@@ -1,7 +1,6 @@
#![feature(plugin_registrar, rustc_private)]
extern crate itertools;
extern crate rustc;
extern crate rustc_plugin;
extern crate syntax;

View File

@@ -4,7 +4,7 @@
// This file may not be copied, modified, or distributed except according to those terms.
use {serde, tokio_core};
use bincode::{SizeLimit, serde as bincode};
use bincode::{self, SizeLimit};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::io::{self, Cursor};
use std::marker::PhantomData;
@@ -12,7 +12,6 @@ use std::mem;
use tokio_core::io::{EasyBuf, Framed, Io};
use tokio_proto::multiplex::{ClientProto, ServerProto};
use tokio_proto::streaming::multiplex::RequestId;
use util::Debugger;
// `Encode` is the type that `Codec` encodes. `Decode` is the type it decodes.
pub struct Codec<Encode, Decode> {
@@ -40,7 +39,7 @@ impl<Encode, Decode> tokio_core::io::Codec for Codec<Encode, Decode>
Decode: serde::Deserialize
{
type Out = (RequestId, Encode);
type In = (RequestId, Result<Decode, bincode::DeserializeError>);
type In = (RequestId, Result<Decode, bincode::Error>);
fn encode(&mut self, (id, message): Self::Out, buf: &mut Vec<u8>) -> io::Result<()> {
buf.write_u64::<BigEndian>(id).unwrap();
@@ -97,7 +96,6 @@ impl<Encode, Decode> tokio_core::io::Codec for Codec<Encode, Decode>
// message.
self.state = Id;
trace!("--> Parsed message: {:?}", Debugger(&result));
return Ok(Some((id, result)));
}
}
@@ -121,7 +119,7 @@ impl<T, Encode, Decode> ServerProto<T> for Proto<Encode, Decode>
Decode: serde::Deserialize + 'static
{
type Response = Encode;
type Request = Result<Decode, bincode::DeserializeError>;
type Request = Result<Decode, bincode::Error>;
type Transport = Framed<T, Codec<Encode, Decode>>;
type BindTransport = Result<Self::Transport, io::Error>;
@@ -135,7 +133,7 @@ impl<T, Encode, Decode> ClientProto<T> for Proto<Encode, Decode>
Encode: serde::Serialize + 'static,
Decode: serde::Deserialize + 'static
{
type Response = Result<Decode, bincode::DeserializeError>;
type Response = Result<Decode, bincode::Error>;
type Request = Encode;
type Transport = Framed<T, Codec<Encode, Decode>>;
type BindTransport = Result<Self::Transport, io::Error>;
@@ -158,8 +156,8 @@ fn serialize() {
let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new();
codec.encode(MSG, &mut vec).unwrap();
buf.get_mut().append(&mut vec);
let actual: Result<Option<(u64, Result<(char, char, char), bincode::DeserializeError>)>,
io::Error> = codec.decode(&mut buf);
let actual: Result<Option<(u64, Result<(char, char, char), bincode::Error>)>, io::Error> =
codec.decode(&mut buf);
match actual {
Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {}

View File

@@ -3,8 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
use {REMOTE, Reactor};
use bincode::serde::DeserializeError;
use bincode;
use errors::WireError;
use futures::{self, Async, Future, Stream, future};
use net2;
@@ -35,24 +34,11 @@ enum Acceptor {
/// Additional options to configure how the server operates.
#[derive(Default)]
pub struct Options {
reactor: Option<Reactor>,
#[cfg(feature = "tls")]
tls_acceptor: Option<TlsAcceptor>,
}
impl Options {
/// Listen using the given reactor handle.
pub fn handle(mut self, handle: reactor::Handle) -> Self {
self.reactor = Some(Reactor::Handle(handle));
self
}
/// Listen using the given reactor remote.
pub fn remote(mut self, remote: reactor::Remote) -> Self {
self.reactor = Some(Reactor::Remote(remote));
self
}
/// Set the `TlsAcceptor`
#[cfg(feature = "tls")]
pub fn tls(mut self, tls_acceptor: TlsAcceptor) -> Self {
@@ -66,10 +52,14 @@ impl Options {
pub type Response<T, E> = Result<T, WireError<E>>;
#[doc(hidden)]
pub fn listen<S, Req, Resp, E>(new_service: S, addr: SocketAddr, options: Options) -> ListenFuture
where S: NewService<Request = Result<Req, DeserializeError>,
pub fn listen<S, Req, Resp, E>(new_service: S,
addr: SocketAddr,
handle: &reactor::Handle,
_options: Options)
-> io::Result<SocketAddr>
where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>,
Error = io::Error> + Send + 'static,
Error = io::Error> + 'static,
Req: Deserialize + 'static,
Resp: Serialize + 'static,
E: Serialize + 'static
@@ -77,53 +67,30 @@ pub fn listen<S, Req, Resp, E>(new_service: S, addr: SocketAddr, options: Option
// Similar to the client, since `Options` is not `Send`, we take the `TlsAcceptor` when it is
// available.
#[cfg(feature = "tls")]
let acceptor = match options.tls_acceptor {
let acceptor = match _options.tls_acceptor {
Some(tls_acceptor) => Acceptor::Tls(tls_acceptor),
None => Acceptor::Tcp,
};
#[cfg(not(feature = "tls"))]
let acceptor = Acceptor::Tcp;
match options.reactor {
None => {
let (tx, rx) = futures::oneshot();
REMOTE.spawn(move |handle| {
Ok(tx.complete(listen_with(new_service, addr, handle.clone(), acceptor)))
});
ListenFuture { inner: future::Either::A(rx) }
}
Some(Reactor::Remote(remote)) => {
let (tx, rx) = futures::oneshot();
remote.spawn(move |handle| {
Ok(tx.complete(listen_with(new_service, addr, handle.clone(), acceptor)))
});
ListenFuture { inner: future::Either::A(rx) }
}
Some(Reactor::Handle(handle)) => {
ListenFuture {
inner: future::Either::B(future::ok(listen_with(new_service,
addr,
handle,
acceptor))),
}
}
}
listen_with(new_service, addr, handle, acceptor)
}
/// Spawns a service that binds to the given address using the given handle.
fn listen_with<S, Req, Resp, E>(new_service: S,
addr: SocketAddr,
handle: Handle,
handle: &Handle,
_acceptor: Acceptor)
-> io::Result<SocketAddr>
where S: NewService<Request = Result<Req, DeserializeError>,
where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>,
Error = io::Error> + Send + 'static,
Error = io::Error> + 'static,
Req: Deserialize + 'static,
Resp: Serialize + 'static,
E: Serialize + 'static
{
let listener = listener(&addr, &handle)?;
let listener = listener(&addr, handle)?;
let addr = listener.local_addr()?;
let handle2 = handle.clone();
@@ -175,8 +142,7 @@ fn listener(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
/// A future that resolves to a `ServerHandle`.
#[doc(hidden)]
pub struct ListenFuture {
inner: future::Either<futures::Oneshot<io::Result<SocketAddr>>,
future::FutureResult<io::Result<SocketAddr>, futures::Canceled>>,
inner: future::FutureResult<io::Result<SocketAddr>, futures::Canceled>,
}
impl Future for ListenFuture {

View File

@@ -111,18 +111,3 @@ pub trait FirstSocketAddr: ToSocketAddrs {
}
impl<A: ToSocketAddrs> FirstSocketAddr for A {}
/// A struct that will format as the contained type if the type impls Debug.
pub struct Debugger<'a, T: 'a>(pub &'a T);
impl<'a, T: fmt::Debug> fmt::Debug for Debugger<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "{:?}", self.0)
}
}
impl<'a, T> fmt::Debug for Debugger<'a, T> {
default fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "{{not debuggable}}")
}
}