14 Commits

Author SHA1 Message Date
Tim
9ce7938fdc Merge pull request #167 from tikue/master
* Fix breakage from nightly compiler changes.

* Remove unused imports

* Update TLS code to not use deprecated method

* Bump versions

Fixes #166
2017-09-17 17:58:50 -07:00
Tim Kuehn
650dc88da5 Bump versions 2017-09-17 16:02:28 -07:00
Tim Kuehn
3601763442 Update TLS code to not use deprecated method 2017-09-17 15:56:07 -07:00
Tim Kuehn
4aaaea1e04 Remove unused imports 2017-09-17 15:56:07 -07:00
Tim Kuehn
2e214c85d3 Fix breakage from nightly compiler changes.
Fixes #166
2017-09-17 15:56:07 -07:00
Cyril Plisko
0676ab67df Refresh dependencies (#164) 2017-08-14 14:31:47 -07:00
Tim
0b843512dd Use #[allow(unreachable_patterns)] to simplify some macro code. (#163)
By allowing unreachable patterns, we don't have to have 'NotIrrefutable' variants in the Request, Response, and Error enums.

This commit also removes an unused macro in an example.
2017-08-02 13:54:06 -07:00
Jesper Håkansson
85d9416750 doc: Update tarpc version in readme (#162) 2017-07-21 08:37:07 -07:00
Tim
5e3cf3c807 Run the new nightly cargo fmt (#156) 2017-07-18 14:54:46 -07:00
Tim
4dfb3a48c3 Derive Deserialize, Serialize in macros. Requires feature(use_extern_macros). (#151) 2017-07-18 12:04:56 -07:00
Tim
21e8883877 Update to serde 1.0 and bincode 0.8 (#149) 2017-05-05 22:16:44 -07:00
Tim Kuehn
7dbfe07c97 Bump to version 0.7.3 and update release notes. 2017-04-26 12:33:58 -07:00
Jon Gjengset
8bc01a993b format with rustfmt 0.8.3 (#148) 2017-04-26 12:25:49 -07:00
Jon Gjengset
e2728d84f3 Remove unnecessary Sync bound on clients (#147) 2017-04-26 12:11:30 -07:00
27 changed files with 722 additions and 650 deletions

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tarpc" name = "tarpc"
version = "0.7.2" version = "0.9.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"] authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
license = "MIT" license = "MIT"
documentation = "https://docs.rs/tarpc" documentation = "https://docs.rs/tarpc"
@@ -15,7 +15,7 @@ description = "An RPC framework for Rust with a focus on ease of use."
travis-ci = { repository = "google/tarpc" } travis-ci = { repository = "google/tarpc" }
[dependencies] [dependencies]
bincode = "1.0.0-alpha6" bincode = "0.8"
byteorder = "1.0" byteorder = "1.0"
bytes = "0.4" bytes = "0.4"
cfg-if = "0.1.0" cfg-if = "0.1.0"
@@ -24,9 +24,9 @@ lazy_static = "0.2"
log = "0.3" log = "0.3"
net2 = "0.2" net2 = "0.2"
num_cpus = "1.0" num_cpus = "1.0"
serde = "0.9" serde = "1.0"
serde_derive = "0.9" serde_derive = "1.0"
tarpc-plugins = { path = "src/plugins", version = "0.1.1" } tarpc-plugins = { path = "src/plugins", version = "0.2.0" }
thread-pool = "0.1.1" thread-pool = "0.1.1"
tokio-core = "0.1.6" tokio-core = "0.1.6"
tokio-io = "0.1" tokio-io = "0.1"
@@ -38,10 +38,11 @@ native-tls = { version = "0.1.1", optional = true }
tokio-tls = { version = "0.1", optional = true } tokio-tls = { version = "0.1", optional = true }
[dev-dependencies] [dev-dependencies]
chrono = "0.3" chrono = "0.4"
env_logger = "0.3" env_logger = "0.4"
futures-cpupool = "0.1" futures-cpupool = "0.1"
clap = "2.0" clap = "2.0"
serde_bytes = "0.10"
[target.'cfg(target_os = "macos")'.dev-dependencies] [target.'cfg(target_os = "macos")'.dev-dependencies]
security-framework = "0.1" security-framework = "0.1"

View File

@@ -37,8 +37,8 @@ arguments to tarpc fns.
Add to your `Cargo.toml` dependencies: Add to your `Cargo.toml` dependencies:
```toml ```toml
tarpc = "0.7.2" tarpc = "0.9.0"
tarpc-plugins = "0.1.1" tarpc-plugins = "0.2.0"
``` ```
## Example: Sync ## Example: Sync
@@ -47,7 +47,7 @@ tarpc has two APIs: `sync` for blocking code and `future` for asynchronous
code. Here's how to use the sync api. code. Here's how to use the sync api.
```rust ```rust
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
#[macro_use] #[macro_use]
@@ -100,7 +100,7 @@ races! See the `tarpc_examples` package for more examples.
Here's the same service, implemented using futures. Here's the same service, implemented using futures.
```rust ```rust
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate futures; extern crate futures;
@@ -171,7 +171,7 @@ However, if you are working with both stream types, ensure that you use the TLS
servers and TCP clients with TCP servers. servers and TCP clients with TCP servers.
```rust,no_run ```rust,no_run
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate futures; extern crate futures;

View File

@@ -1,3 +1,32 @@
## 0.9.0 (2017-09-17)
## Breaking Changes
Updates tarpc to use tarpc-plugins 0.2.
## 0.8.0 (2017-05-05)
## Breaking Changes
This release updates tarpc to use serde 1.0.
As such, users must also update to use serde 1.0.
The serde 1.0 [release notes](https://github.com/serde-rs/serde/releases/tag/v1.0.0)
detail migration paths.
## 0.7.3 (2017-04-26)
This release removes the `Sync` bound on RPC args for both sync and future
clients. No breaking changes.
## 0.7.2 (2017-04-22)
## Breaking Changes
This release updates tarpc-plugins to work with rustc master. Thus, older
versions of rustc are no longer supported. We chose a minor version bump
because it is still source-compatible with existing code using tarpc.
## 0.7.1 (2017-03-31)
This release was purely doc fixes. No breaking changes.
## 0.7 (2017-03-31) ## 0.7 (2017-03-31)
## Breaking Changes ## Breaking Changes

View File

@@ -3,7 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin, test)] #![feature(plugin, test, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
#[macro_use] #[macro_use]
@@ -40,13 +40,18 @@ impl FutureService for Server {
fn latency(bencher: &mut Bencher) { fn latency(bencher: &mut Bencher) {
let _ = env_logger::init(); let _ = env_logger::init();
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = Server.listen("localhost:0".first_socket_addr(), let (handle, server) = Server
&reactor.handle(), .listen(
server::Options::default()) "localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
let client = FutureClient::connect(handle.addr(), let client = FutureClient::connect(
client::Options::default().handle(reactor.handle())); handle.addr(),
client::Options::default().handle(reactor.handle()),
);
let client = reactor.run(client).unwrap(); let client = reactor.run(client).unwrap();
bencher.iter(|| reactor.run(client.ack()).unwrap()); bencher.iter(|| reactor.run(client.ack()).unwrap());

View File

@@ -3,7 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(inclusive_range_syntax, conservative_impl_trait, plugin, never_type)] #![feature(inclusive_range_syntax, conservative_impl_trait, plugin, never_type, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate chrono; extern crate chrono;
@@ -12,7 +12,7 @@ extern crate env_logger;
extern crate futures; extern crate futures;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate serde; extern crate serde_bytes;
#[macro_use] #[macro_use]
extern crate tarpc; extern crate tarpc;
extern crate tokio_core; extern crate tokio_core;
@@ -31,7 +31,7 @@ use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor; use tokio_core::reactor;
service! { service! {
rpc read(size: u32) -> serde::bytes::ByteBuf; rpc read(size: u32) -> serde_bytes::ByteBuf;
} }
#[derive(Clone)] #[derive(Clone)]
@@ -50,20 +50,19 @@ impl Server {
} }
impl FutureService for Server { impl FutureService for Server {
type ReadFut = CpuFuture<serde::bytes::ByteBuf, Never>; type ReadFut = CpuFuture<serde_bytes::ByteBuf, Never>;
fn read(&self, size: u32) -> Self::ReadFut { fn read(&self, size: u32) -> Self::ReadFut {
let request_number = self.request_count.fetch_add(1, Ordering::SeqCst); let request_number = self.request_count.fetch_add(1, Ordering::SeqCst);
debug!("Server received read({}) no. {}", size, request_number); debug!("Server received read({}) no. {}", size, request_number);
self.pool self.pool.spawn(futures::lazy(move || {
.spawn(futures::lazy(move || { let mut vec = Vec::with_capacity(size as usize);
let mut vec = Vec::with_capacity(size as usize); for i in 0..size {
for i in 0..size { vec.push(((i % 2) << 8) as u8);
vec.push(((i % 2) << 8) as u8); }
} debug!("Server sending response no. {}", request_number);
debug!("Server sending response no. {}", request_number); Ok(vec.into())
Ok(vec.into()) }))
}))
} }
} }
@@ -103,25 +102,30 @@ fn spawn_core() -> reactor::Remote {
rx.recv().unwrap() rx.recv().unwrap()
} }
fn run_once(clients: Vec<FutureClient>, fn run_once(
concurrency: u32) clients: Vec<FutureClient>,
-> impl Future<Item = (), Error = ()> + 'static { concurrency: u32,
) -> impl Future<Item = (), Error = ()> + 'static {
let start = Instant::now(); let start = Instant::now();
futures::stream::futures_unordered((0..concurrency as usize) futures::stream::futures_unordered(
(0..concurrency as usize)
.zip(clients.iter().enumerate().cycle()) .zip(clients.iter().enumerate().cycle())
.map(|(iteration, (client_idx, client))| { .map(|(iteration, (client_idx, client))| {
let start = Instant::now(); let start = Instant::now();
debug!("Client {} reading (iteration {})...", client_idx, iteration); debug!("Client {} reading (iteration {})...", client_idx, iteration);
client.read(CHUNK_SIZE) client
.read(CHUNK_SIZE)
.map(move |_| (client_idx, iteration, start)) .map(move |_| (client_idx, iteration, start))
})) }),
.map(|(client_idx, iteration, start)| { ).map(|(client_idx, iteration, start)| {
let elapsed = start.elapsed(); let elapsed = start.elapsed();
debug!("Client {} received reply (iteration {}).", debug!(
client_idx, "Client {} received reply (iteration {}).",
iteration); client_idx,
elapsed iteration
}) );
elapsed
})
.map_err(|e| panic!(e)) .map_err(|e| panic!(e))
.fold(Stats::default(), move |mut stats, elapsed| { .fold(Stats::default(), move |mut stats, elapsed| {
stats.sum += elapsed; stats.sum += elapsed;
@@ -131,46 +135,58 @@ fn run_once(clients: Vec<FutureClient>,
Ok(stats) Ok(stats)
}) })
.map(move |stats| { .map(move |stats| {
info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", info!(
stats.count, "{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
stats.sum.microseconds() as f64 / stats.count as f64, stats.count,
stats.min.unwrap().microseconds(), stats.sum.microseconds() as f64 / stats.count as f64,
stats.max.unwrap().microseconds(), stats.min.unwrap().microseconds(),
start.elapsed().microseconds()); stats.max.unwrap().microseconds(),
start.elapsed().microseconds()
);
}) })
} }
fn main() { fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let matches = App::new("Tarpc Concurrency") let matches = App::new("Tarpc Concurrency")
.about("Demonstrates making concurrent requests to a tarpc service.") .about(
.arg(Arg::with_name("concurrency") "Demonstrates making concurrent requests to a tarpc service.",
.short("c") )
.long("concurrency") .arg(
.value_name("LEVEL") Arg::with_name("concurrency")
.help("Sets a custom concurrency level") .short("c")
.takes_value(true)) .long("concurrency")
.arg(Arg::with_name("clients") .value_name("LEVEL")
.short("n") .help("Sets a custom concurrency level")
.long("num_clients") .takes_value(true),
.value_name("AMOUNT") )
.help("How many clients to distribute requests between") .arg(
.takes_value(true)) Arg::with_name("clients")
.short("n")
.long("num_clients")
.value_name("AMOUNT")
.help("How many clients to distribute requests between")
.takes_value(true),
)
.get_matches(); .get_matches();
let concurrency = matches.value_of("concurrency") let concurrency = matches
.value_of("concurrency")
.map(&str::parse) .map(&str::parse)
.map(Result::unwrap) .map(Result::unwrap)
.unwrap_or(10); .unwrap_or(10);
let num_clients = matches.value_of("clients") let num_clients = matches
.value_of("clients")
.map(&str::parse) .map(&str::parse)
.map(Result::unwrap) .map(Result::unwrap)
.unwrap_or(4); .unwrap_or(4);
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = Server::new() let (handle, server) = Server::new()
.listen("localhost:0".first_socket_addr(), .listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
info!("Server listening on {}.", handle.addr()); info!("Server listening on {}.", handle.addr());

View File

@@ -3,7 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate env_logger; extern crate env_logger;
@@ -58,10 +58,7 @@ impl subscriber::FutureService for Subscriber {
} }
impl Subscriber { impl Subscriber {
fn listen(id: u32, fn listen(id: u32, handle: &reactor::Handle, options: server::Options) -> server::Handle {
handle: &reactor::Handle,
options: server::Options)
-> server::Handle {
let (server_handle, server) = Subscriber { id: id } let (server_handle, server) = Subscriber { id: id }
.listen("localhost:0".first_socket_addr(), handle, options) .listen("localhost:0".first_socket_addr(), handle, options)
.unwrap(); .unwrap();
@@ -77,7 +74,9 @@ struct Publisher {
impl Publisher { impl Publisher {
fn new() -> Publisher { fn new() -> Publisher {
Publisher { clients: Rc::new(RefCell::new(HashMap::new())) } Publisher {
clients: Rc::new(RefCell::new(HashMap::new())),
}
} }
} }
@@ -102,13 +101,15 @@ impl publisher::FutureService for Publisher {
fn subscribe(&self, id: u32, address: SocketAddr) -> Self::SubscribeFut { fn subscribe(&self, id: u32, address: SocketAddr) -> Self::SubscribeFut {
let clients = self.clients.clone(); let clients = self.clients.clone();
Box::new(subscriber::FutureClient::connect(address, client::Options::default()) Box::new(
.map(move |subscriber| { subscriber::FutureClient::connect(address, client::Options::default())
println!("Subscribing {}.", id); .map(move |subscriber| {
clients.borrow_mut().insert(id, subscriber); println!("Subscribing {}.", id);
() clients.borrow_mut().insert(id, subscriber);
}) ()
.map_err(|e| e.to_string().into())) })
.map_err(|e| e.to_string().into()),
)
} }
type UnsubscribeFut = Box<Future<Item = (), Error = Never>>; type UnsubscribeFut = Box<Future<Item = (), Error = Never>>;
@@ -124,28 +125,36 @@ fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (publisher_handle, server) = Publisher::new() let (publisher_handle, server) = Publisher::new()
.listen("localhost:0".first_socket_addr(), .listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
let subscriber1 = Subscriber::listen(0, &reactor.handle(), server::Options::default()); let subscriber1 = Subscriber::listen(0, &reactor.handle(), server::Options::default());
let subscriber2 = Subscriber::listen(1, &reactor.handle(), server::Options::default()); let subscriber2 = Subscriber::listen(1, &reactor.handle(), server::Options::default());
let publisher = let publisher = reactor
reactor.run(publisher::FutureClient::connect(publisher_handle.addr(), .run(publisher::FutureClient::connect(
client::Options::default())) publisher_handle.addr(),
.unwrap(); client::Options::default(),
reactor.run(publisher.subscribe(0, subscriber1.addr()) ))
.and_then(|_| publisher.subscribe(1, subscriber2.addr())) .unwrap();
.map_err(|e| panic!(e)) reactor
.and_then(|_| { .run(
println!("Broadcasting..."); publisher
publisher.broadcast("hello to all".to_string()) .subscribe(0, subscriber1.addr())
}) .and_then(|_| publisher.subscribe(1, subscriber2.addr()))
.and_then(|_| publisher.unsubscribe(1)) .map_err(|e| panic!(e))
.and_then(|_| publisher.broadcast("hi again".to_string()))) .and_then(|_| {
println!("Broadcasting...");
publisher.broadcast("hello to all".to_string())
})
.and_then(|_| publisher.unsubscribe(1))
.and_then(|_| publisher.broadcast("hi again".to_string())),
)
.unwrap(); .unwrap();
thread::sleep(Duration::from_millis(300)); thread::sleep(Duration::from_millis(300));
} }

View File

@@ -3,15 +3,13 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate futures;
#[macro_use] #[macro_use]
extern crate tarpc; extern crate tarpc;
#[macro_use] #[macro_use]
extern crate serde_derive; extern crate serde_derive;
extern crate tokio_core;
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
@@ -55,7 +53,9 @@ impl SyncService for HelloServer {
fn main() { fn main() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let handle = HelloServer.listen("localhost:10000", server::Options::default()).unwrap(); let handle = HelloServer
.listen("localhost:10000", server::Options::default())
.unwrap();
tx.send(handle.addr()).unwrap(); tx.send(handle.addr()).unwrap();
handle.run(); handle.run();
}); });

View File

@@ -3,7 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate futures; extern crate futures;
@@ -34,16 +34,22 @@ impl FutureService for HelloServer {
fn main() { fn main() {
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = HelloServer.listen("localhost:10000".first_socket_addr(), let (handle, server) = HelloServer
&reactor.handle(), .listen(
server::Options::default()) "localhost:10000".first_socket_addr(),
&reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
let options = client::Options::default().handle(reactor.handle()); let options = client::Options::default().handle(reactor.handle());
reactor.run(FutureClient::connect(handle.addr(), options) reactor
.map_err(tarpc::Error::from) .run(
.and_then(|client| client.hello("Mom".to_string())) FutureClient::connect(handle.addr(), options)
.map(|resp| println!("{}", resp))) .map_err(tarpc::Error::from)
.and_then(|client| client.hello("Mom".to_string()))
.map(|resp| println!("{}", resp)),
)
.unwrap(); .unwrap();
} }

View File

@@ -4,13 +4,11 @@
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
// required by `FutureClient` (not used directly in this example) // required by `FutureClient` (not used directly in this example)
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate futures;
#[macro_use] #[macro_use]
extern crate tarpc; extern crate tarpc;
extern crate tokio_core;
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
@@ -27,14 +25,20 @@ struct HelloServer;
impl SyncService for HelloServer { impl SyncService for HelloServer {
fn hello(&self, name: String) -> Result<String, Never> { fn hello(&self, name: String) -> Result<String, Never> {
Ok(format!("Hello from thread {}, {}!", thread::current().name().unwrap(), name)) Ok(format!(
"Hello from thread {}, {}!",
thread::current().name().unwrap(),
name
))
} }
} }
fn main() { fn main() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let handle = HelloServer.listen("localhost:0", server::Options::default()).unwrap(); let handle = HelloServer
.listen("localhost:0", server::Options::default())
.unwrap();
tx.send(handle.addr()).unwrap(); tx.send(handle.addr()).unwrap();
handle.run(); handle.run();
}); });

View File

@@ -3,7 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate env_logger; extern crate env_logger;
@@ -72,30 +72,43 @@ impl DoubleFutureService for DoubleServer {
fn main() { fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (add, server) = AddServer.listen("localhost:0".first_socket_addr(), let (add, server) = AddServer
&reactor.handle(), .listen(
server::Options::default()) "localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
let options = client::Options::default().handle(reactor.handle()); let options = client::Options::default().handle(reactor.handle());
let add_client = reactor.run(add::FutureClient::connect(add.addr(), options)).unwrap(); let add_client = reactor
.run(add::FutureClient::connect(add.addr(), options))
.unwrap();
let (double, server) = DoubleServer::new(add_client) let (double, server) = DoubleServer::new(add_client)
.listen("localhost:0".first_socket_addr(), .listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
let double_client = let double_client = reactor
reactor.run(double::FutureClient::connect(double.addr(), client::Options::default())) .run(double::FutureClient::connect(
.unwrap(); double.addr(),
reactor.run(futures::stream::futures_unordered((0..5).map(|i| double_client.double(i))) client::Options::default(),
.map_err(|e| println!("{}", e)) ))
.for_each(|i| { .unwrap();
println!("{:?}", i); reactor
Ok(()) .run(
})) futures::stream::futures_unordered((0..5).map(|i| double_client.double(i)))
.map_err(|e| println!("{}", e))
.for_each(|i| {
println!("{:?}", i);
Ok(())
}),
)
.unwrap(); .unwrap();
} }

View File

@@ -3,14 +3,12 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate env_logger; extern crate env_logger;
#[macro_use] #[macro_use]
extern crate tarpc; extern crate tarpc;
extern crate futures;
extern crate tokio_core;
use add::{SyncService as AddSyncService, SyncServiceExt as AddExt}; use add::{SyncService as AddSyncService, SyncServiceExt as AddExt};
use double::{SyncService as DoubleSyncService, SyncServiceExt as DoubleExt}; use double::{SyncService as DoubleSyncService, SyncServiceExt as DoubleExt};
@@ -58,9 +56,7 @@ impl DoubleServer {
impl DoubleSyncService for DoubleServer { impl DoubleSyncService for DoubleServer {
fn double(&self, x: i32) -> Result<i32, Message> { fn double(&self, x: i32) -> Result<i32, Message> {
self.client self.client.add(x, x).map_err(|e| e.to_string().into())
.add(x, x)
.map_err(|e| e.to_string().into())
} }
} }
@@ -68,8 +64,11 @@ fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let handle = AddServer.listen("localhost:0".first_socket_addr(), let handle = AddServer
server::Options::default()) .listen(
"localhost:0".first_socket_addr(),
server::Options::default(),
)
.unwrap(); .unwrap();
tx.send(handle.addr()).unwrap(); tx.send(handle.addr()).unwrap();
handle.run(); handle.run();
@@ -81,8 +80,10 @@ fn main() {
thread::spawn(move || { thread::spawn(move || {
let add_client = add::SyncClient::connect(add, client::Options::default()).unwrap(); let add_client = add::SyncClient::connect(add, client::Options::default()).unwrap();
let handle = DoubleServer::new(add_client) let handle = DoubleServer::new(add_client)
.listen("localhost:0".first_socket_addr(), .listen(
server::Options::default()) "localhost:0".first_socket_addr(),
server::Options::default(),
)
.unwrap(); .unwrap();
tx.send(handle.addr()).unwrap(); tx.send(handle.addr()).unwrap();
handle.run(); handle.run();

View File

@@ -3,7 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
#[macro_use] #[macro_use]
@@ -11,13 +11,11 @@ extern crate lazy_static;
#[macro_use] #[macro_use]
extern crate tarpc; extern crate tarpc;
extern crate env_logger; extern crate env_logger;
extern crate futures; extern crate serde_bytes;
extern crate serde;
extern crate tokio_core; extern crate tokio_core;
use std::io::{Read, Write, stdout}; use std::io::{Read, Write, stdout};
use std::net; use std::net;
use std::sync::Arc;
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
use std::time; use std::time;
@@ -27,7 +25,7 @@ use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor; use tokio_core::reactor;
lazy_static! { lazy_static! {
static ref BUF: Arc<serde::bytes::ByteBuf> = Arc::new(gen_vec(CHUNK_SIZE as usize).into()); static ref BUF: serde_bytes::ByteBuf = gen_vec(CHUNK_SIZE as usize).into();
} }
fn gen_vec(size: usize) -> Vec<u8> { fn gen_vec(size: usize) -> Vec<u8> {
@@ -39,14 +37,14 @@ fn gen_vec(size: usize) -> Vec<u8> {
} }
service! { service! {
rpc read() -> Arc<serde::bytes::ByteBuf>; rpc read() -> serde_bytes::ByteBuf;
} }
#[derive(Clone)] #[derive(Clone)]
struct Server; struct Server;
impl FutureService for Server { impl FutureService for Server {
type ReadFut = Result<Arc<serde::bytes::ByteBuf>, Never>; type ReadFut = Result<serde_bytes::ByteBuf, Never>;
fn read(&self) -> Self::ReadFut { fn read(&self) -> Self::ReadFut {
Ok(BUF.clone()) Ok(BUF.clone())
@@ -59,15 +57,18 @@ fn bench_tarpc(target: u64) {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (addr, server) = Server.listen("localhost:0".first_socket_addr(), let (addr, server) = Server
&reactor.handle(), .listen(
server::Options::default()) "localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
tx.send(addr).unwrap(); tx.send(addr).unwrap();
reactor.run(server).unwrap(); reactor.run(server).unwrap();
}); });
let client = SyncClient::connect(rx.recv().unwrap().addr(), client::Options::default()) let client =
.unwrap(); SyncClient::connect(rx.recv().unwrap().addr(), client::Options::default()).unwrap();
let start = time::Instant::now(); let start = time::Instant::now();
let mut nread = 0; let mut nread = 0;
while nread < target { while nread < target {
@@ -77,9 +78,11 @@ fn bench_tarpc(target: u64) {
} }
println!("done"); println!("done");
let duration = time::Instant::now() - start; let duration = time::Instant::now() - start;
println!("TARPC: {}MB/s", println!(
(target as f64 / (1024f64 * 1024f64)) / "TARPC: {}MB/s",
(duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9)); (target as f64 / (1024f64 * 1024f64)) /
(duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9)
);
} }
fn bench_tcp(target: u64) { fn bench_tcp(target: u64) {
@@ -101,9 +104,11 @@ fn bench_tcp(target: u64) {
} }
println!("done"); println!("done");
let duration = time::Instant::now() - start; let duration = time::Instant::now() - start;
println!("TCP: {}MB/s", println!(
(target as f64 / (1024f64 * 1024f64)) / "TCP: {}MB/s",
(duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9)); (target as f64 / (1024f64 * 1024f64)) /
(duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9)
);
} }
fn main() { fn main() {

View File

@@ -3,16 +3,14 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use] #[macro_use]
extern crate tarpc; extern crate tarpc;
extern crate bincode;
extern crate env_logger; extern crate env_logger;
extern crate futures;
extern crate tokio_core; extern crate tokio_core;
use bar::FutureServiceExt as BarExt; use bar::FutureServiceExt as BarExt;
@@ -57,20 +55,17 @@ impl baz::FutureService for Baz {
} }
} }
macro_rules! pos {
() => (concat!(file!(), ":", line!()))
}
fn main() { fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let bar_client = { let bar_client = {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = Bar.listen("localhost:0".first_socket_addr(), let (handle, server) = Bar.listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
.unwrap(); server::Options::default(),
).unwrap();
tx.send(handle).unwrap(); tx.send(handle).unwrap();
reactor.run(server).unwrap(); reactor.run(server).unwrap();
}); });
@@ -82,10 +77,11 @@ fn main() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = Baz.listen("localhost:0".first_socket_addr(), let (handle, server) = Baz.listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
.unwrap(); server::Options::default(),
).unwrap();
tx.send(handle).unwrap(); tx.send(handle).unwrap();
reactor.run(server).unwrap(); reactor.run(server).unwrap();
}); });

View File

@@ -67,7 +67,7 @@ else
fi fi
printf "${PREFIX} Checking for rustfmt ... " printf "${PREFIX} Checking for rustfmt ... "
command -v rustfmt &>/dev/null command -v cargo fmt &>/dev/null
if [ $? == 0 ]; then if [ $? == 0 ]; then
printf "${SUCCESS}\n" printf "${SUCCESS}\n"
else else
@@ -93,7 +93,7 @@ diff=""
for file in $(git diff --name-only --cached); for file in $(git diff --name-only --cached);
do do
if [ ${file: -3} == ".rs" ]; then if [ ${file: -3} == ".rs" ]; then
diff="$diff$(rustfmt --skip-children --write-mode=diff $file)" diff="$diff$(cargo fmt -- --skip-children --write-mode=diff $file)"
fi fi
done done
if grep --quiet "^Diff at line" <<< "$diff"; then if grep --quiet "^Diff at line" <<< "$diff"; then

View File

@@ -29,7 +29,7 @@ pub enum Error<E> {
App(E), App(E),
} }
impl<E: StdError + Deserialize + Serialize + Send + 'static> fmt::Display for Error<E> { impl<'a, E: StdError + Deserialize<'a> + Serialize + Send + 'static> fmt::Display for Error<E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self { match *self {
Error::ResponseDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), Error::ResponseDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e),
@@ -40,7 +40,7 @@ impl<E: StdError + Deserialize + Serialize + Send + 'static> fmt::Display for Er
} }
} }
impl<E: StdError + Deserialize + Serialize + Send + 'static> StdError for Error<E> { impl<'a, E: StdError + Deserialize<'a> + Serialize + Send + 'static> StdError for Error<E> {
fn description(&self) -> &str { fn description(&self) -> &str {
match *self { match *self {
Error::ResponseDeserialize(_) => "The client failed to deserialize the response.", Error::ResponseDeserialize(_) => "The client failed to deserialize the response.",
@@ -53,8 +53,7 @@ impl<E: StdError + Deserialize + Serialize + Send + 'static> StdError for Error<
fn cause(&self) -> Option<&StdError> { fn cause(&self) -> Option<&StdError> {
match *self { match *self {
Error::ResponseDeserialize(ref e) => e.cause(), Error::ResponseDeserialize(ref e) => e.cause(),
Error::RequestDeserialize(_) | Error::RequestDeserialize(_) | Error::App(_) => None,
Error::App(_) => None,
Error::Io(ref e) => e.cause(), Error::Io(ref e) => e.cause(),
} }
} }

View File

@@ -7,7 +7,8 @@ use {REMOTE, bincode};
use future::server::Response; use future::server::Response;
use futures::{self, Future, future}; use futures::{self, Future, future};
use protocol::Proto; use protocol::Proto;
use serde::{Deserialize, Serialize}; use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
@@ -102,27 +103,32 @@ impl fmt::Debug for Reactor {
} }
#[doc(hidden)] #[doc(hidden)]
pub struct Client<Req, Resp, E> pub struct Client<Req, Resp, E>
where Req: Serialize + 'static, where
Resp: Deserialize + 'static, Req: Serialize + 'static,
E: Deserialize + 'static Resp: DeserializeOwned + 'static,
E: DeserializeOwned + 'static,
{ {
inner: ClientService<StreamType, Proto<Req, Response<Resp, E>>>, inner: ClientService<StreamType, Proto<Req, Response<Resp, E>>>,
} }
impl<Req, Resp, E> Clone for Client<Req, Resp, E> impl<Req, Resp, E> Clone for Client<Req, Resp, E>
where Req: Serialize + 'static, where
Resp: Deserialize + 'static, Req: Serialize + 'static,
E: Deserialize + 'static Resp: DeserializeOwned + 'static,
E: DeserializeOwned + 'static,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Client { inner: self.inner.clone() } Client {
inner: self.inner.clone(),
}
} }
} }
impl<Req, Resp, E> Service for Client<Req, Resp, E> impl<Req, Resp, E> Service for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static, where
Resp: Deserialize + Sync + Send + 'static, Req: Serialize + Send + 'static,
E: Deserialize + Sync + Send + 'static Resp: DeserializeOwned + Send + 'static,
E: DeserializeOwned + Send + 'static,
{ {
type Request = Req; type Request = Req;
type Response = Resp; type Response = Resp;
@@ -142,14 +148,16 @@ impl<Req, Resp, E> Service for Client<Req, Resp, E>
} }
impl<Req, Resp, E> Client<Req, Resp, E> impl<Req, Resp, E> Client<Req, Resp, E>
where Req: Serialize + 'static, where
Resp: Deserialize + 'static, Req: Serialize + 'static,
E: Deserialize + 'static Resp: DeserializeOwned + 'static,
E: DeserializeOwned + 'static,
{ {
fn bind(handle: &reactor::Handle, tcp: StreamType, max_payload_size: u64) -> Self fn bind(handle: &reactor::Handle, tcp: StreamType, max_payload_size: u64) -> Self
where Req: Serialize + Sync + Send + 'static, where
Resp: Deserialize + Sync + Send + 'static, Req: Serialize + Send + 'static,
E: Deserialize + Sync + Send + 'static Resp: DeserializeOwned + Send + 'static,
E: DeserializeOwned + Send + 'static,
{ {
let inner = Proto::new(max_payload_size).bind_client(&handle, tcp); let inner = Proto::new(max_payload_size).bind_client(&handle, tcp);
Client { inner } Client { inner }
@@ -163,9 +171,10 @@ impl<Req, Resp, E> Client<Req, Resp, E>
} }
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E> impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
where Req: Serialize + 'static, where
Resp: Deserialize + 'static, Req: Serialize + 'static,
E: Deserialize + 'static Resp: DeserializeOwned + 'static,
E: DeserializeOwned + 'static,
{ {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Client {{ .. }}") write!(f, "Client {{ .. }}")
@@ -182,14 +191,18 @@ pub trait ClientExt: Sized {
} }
/// A future that resolves to a `Client` or an `io::Error`. /// A future that resolves to a `Client` or an `io::Error`.
pub type ConnectFuture<Req, Resp, E> = pub type ConnectFuture<Req, Resp, E> = futures::Flatten<
futures::Flatten<futures::MapErr<futures::Oneshot<io::Result<Client<Req, Resp, E>>>, futures::MapErr<
fn(futures::Canceled) -> io::Error>>; futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
fn(futures::Canceled) -> io::Error,
>,
>;
impl<Req, Resp, E> ClientExt for Client<Req, Resp, E> impl<Req, Resp, E> ClientExt for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static, where
Resp: Deserialize + Sync + Send + 'static, Req: Serialize + Send + 'static,
E: Deserialize + Sync + Send + 'static Resp: DeserializeOwned + Send + 'static,
E: DeserializeOwned + Send + 'static,
{ {
type ConnectFut = ConnectFuture<Req, Resp, E>; type ConnectFut = ConnectFuture<Req, Resp, E>;
@@ -212,15 +225,17 @@ impl<Req, Resp, E> ClientExt for Client<Req, Resp, E>
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
match tls_ctx { match tls_ctx {
Some(tls_ctx) => { Some(tls_ctx) => {
future::Either::A(tls_ctx.tls_connector future::Either::A(
.connect_async(&tls_ctx.domain, socket) tls_ctx
.map(StreamType::Tls) .tls_connector
.map_err(native_to_io)) .connect_async(&tls_ctx.domain, socket)
.map(StreamType::Tls)
.map_err(native_to_io),
)
} }
None => future::Either::B(future::ok(StreamType::Tcp(socket))), None => future::Either::B(future::ok(StreamType::Tcp(socket))),
} }
#[cfg(not(feature = "tls"))] #[cfg(not(feature = "tls"))] future::ok(StreamType::Tcp(socket))
future::ok(StreamType::Tcp(socket))
}) })
.map(move |tcp| Client::bind(&handle2, tcp, max_payload_size)) .map(move |tcp| Client::bind(&handle2, tcp, max_payload_size))
}; };

View File

@@ -7,14 +7,15 @@ use {bincode, net2};
use errors::WireError; use errors::WireError;
use futures::{Async, Future, Poll, Stream, future as futures}; use futures::{Async, Future, Poll, Stream, future as futures};
use protocol::Proto; use protocol::Proto;
use serde::{Deserialize, Serialize}; use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use stream_type::StreamType; use stream_type::StreamType;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::net::{Incoming, TcpListener, TcpStream}; use tokio_core::net::{Incoming, TcpListener, TcpStream};
use tokio_core::reactor; use tokio_core::reactor;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_proto::BindServer; use tokio_proto::BindServer;
use tokio_service::NewService; use tokio_service::NewService;
@@ -58,10 +59,13 @@ enum Acceptor {
struct Accept { struct Accept {
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
inner: futures::Either<futures::MapErr<futures::Map<AcceptAsync<TcpStream>, inner: futures::Either<
fn(TlsStream<TcpStream>) -> StreamType>, futures::MapErr<
fn(native_tls::Error) -> io::Error>, futures::Map<AcceptAsync<TcpStream>, fn(TlsStream<TcpStream>) -> StreamType>,
futures::FutureResult<StreamType, io::Error>>, fn(native_tls::Error) -> io::Error,
>,
futures::FutureResult<StreamType, io::Error>,
>,
#[cfg(not(feature = "tls"))] #[cfg(not(feature = "tls"))]
inner: futures::FutureResult<StreamType, io::Error>, inner: futures::FutureResult<StreamType, io::Error>,
} }
@@ -82,19 +86,22 @@ impl Acceptor {
Accept { Accept {
inner: match *self { inner: match *self {
Acceptor::Tls(ref tls_acceptor) => { Acceptor::Tls(ref tls_acceptor) => {
futures::Either::A(tls_acceptor.accept_async(socket) futures::Either::A(
.map(StreamType::Tls as _) tls_acceptor
.map_err(native_to_io)) .accept_async(socket)
.map(StreamType::Tls as _)
.map_err(native_to_io),
)
} }
Acceptor::Tcp => futures::Either::B(futures::ok(StreamType::Tcp(socket))), Acceptor::Tcp => futures::Either::B(futures::ok(StreamType::Tcp(socket))),
} },
} }
} }
#[cfg(not(feature = "tls"))] #[cfg(not(feature = "tls"))]
fn accept(&self, socket: TcpStream) -> Accept { fn accept(&self, socket: TcpStream) -> Accept {
Accept { Accept {
inner: futures::ok(StreamType::Tcp(socket)) inner: futures::ok(StreamType::Tcp(socket)),
} }
} }
} }
@@ -144,7 +151,8 @@ struct AcceptStream<S> {
} }
impl<S> Stream for AcceptStream<S> impl<S> Stream for AcceptStream<S>
where S: Stream<Item=(TcpStream, SocketAddr), Error = io::Error>, where
S: Stream<Item = (TcpStream, SocketAddr), Error = io::Error>,
{ {
type Item = <Accept as Future>::Item; type Item = <Accept as Future>::Item;
type Error = io::Error; type Error = io::Error;
@@ -167,7 +175,7 @@ impl<S> Stream for AcceptStream<S>
self.future = None; self.future = None;
Err(e) Err(e)
} }
Ok(Async::NotReady) => Ok(Async::NotReady) Ok(Async::NotReady) => Ok(Async::NotReady),
} }
} }
} }
@@ -221,7 +229,14 @@ impl fmt::Debug for Options {
let mut debug_struct = fmt.debug_struct("Options"); let mut debug_struct = fmt.debug_struct("Options");
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
debug_struct.field("tls_acceptor", if self.tls_acceptor.is_some() { SOME } else { NONE }); debug_struct.field(
"tls_acceptor",
if self.tls_acceptor.is_some() {
SOME
} else {
NONE
},
);
debug_struct.finish() debug_struct.finish()
} }
} }
@@ -239,17 +254,24 @@ pub fn listen<S, Req, Resp, E>(new_service: S,
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static E: Serialize + 'static
{ {
let (addr, shutdown, server) = listen_with( let (addr, shutdown, server) = listen_with(
new_service, addr, handle, options.max_payload_size, Acceptor::from(options))?; new_service,
Ok((Handle { addr,
handle,
options.max_payload_size,
Acceptor::from(options),
)?;
Ok((
Handle {
addr: addr, addr: addr,
shutdown: shutdown, shutdown: shutdown,
}, },
server)) server,
))
} }
/// Spawns a service that binds to the given address using the given handle. /// Spawns a service that binds to the given address using the given handle.
@@ -262,7 +284,7 @@ fn listen_with<S, Req, Resp, E>(new_service: S,
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static E: Serialize + 'static
{ {
@@ -299,7 +321,8 @@ fn listener(addr: &SocketAddr, handle: &reactor::Handle) -> io::Result<TcpListen
}?; }?;
configure_tcp(&builder)?; configure_tcp(&builder)?;
builder.reuse_address(true)?; builder.reuse_address(true)?;
builder.bind(addr)? builder
.bind(addr)?
.listen(PENDING_CONNECTION_BACKLOG) .listen(PENDING_CONNECTION_BACKLOG)
.and_then(|l| TcpListener::from_listener(l, addr, handle)) .and_then(|l| TcpListener::from_listener(l, addr, handle))
} }
@@ -324,16 +347,17 @@ struct BindStream<S, St> {
} }
impl<S, St> fmt::Debug for BindStream<S, St> impl<S, St> fmt::Debug for BindStream<S, St>
where S: fmt::Debug, where
St: fmt::Debug, S: fmt::Debug,
St: fmt::Debug,
{ {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
const HANDLE: &'static &'static str = &"Handle { .. }"; const HANDLE: &'static &'static str = &"Handle { .. }";
f.debug_struct("BindStream") f.debug_struct("BindStream")
.field("handle", HANDLE) .field("handle", HANDLE)
.field("new_service", &self.new_service) .field("new_service", &self.new_service)
.field("stream", &self.stream) .field("stream", &self.stream)
.finish() .finish()
} }
} }
@@ -341,18 +365,19 @@ impl<S, Req, Resp, E, I, St> BindStream<S, St>
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static, E: Serialize + 'static,
I: AsyncRead + AsyncWrite + 'static, I: AsyncRead + AsyncWrite + 'static,
St: Stream<Item=I, Error=io::Error>, St: Stream<Item = I, Error = io::Error>
{ {
fn bind_each(&mut self) -> Poll<(), io::Error> { fn bind_each(&mut self) -> Poll<(), io::Error> {
loop { loop {
match try!(self.stream.poll()) { match try!(self.stream.poll()) {
Async::Ready(Some(socket)) => { Async::Ready(Some(socket)) => {
Proto::new(self.max_payload_size) Proto::new(self.max_payload_size).bind_server(&self.handle,
.bind_server(&self.handle, socket, self.new_service.new_service()?); socket,
self.new_service.new_service()?);
} }
Async::Ready(None) => return Ok(Async::Ready(())), Async::Ready(None) => return Ok(Async::Ready(())),
Async::NotReady => return Ok(Async::NotReady), Async::NotReady => return Ok(Async::NotReady),
@@ -365,11 +390,11 @@ impl<S, Req, Resp, E, I, St> Future for BindStream<S, St>
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static, E: Serialize + 'static,
I: AsyncRead + AsyncWrite + 'static, I: AsyncRead + AsyncWrite + 'static,
St: Stream<Item=I, Error=io::Error>, St: Stream<Item = I, Error = io::Error>
{ {
type Item = (); type Item = ();
type Error = (); type Error = ();
@@ -392,19 +417,18 @@ pub struct Listen<S, Req, Resp, E>
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static E: Serialize + 'static
{ {
inner: AlwaysOkUnit<futures::Select<BindStream<S, AcceptStream<Incoming>>, inner: AlwaysOkUnit<futures::Select<BindStream<S, AcceptStream<Incoming>>, shutdown::Watcher>>,
shutdown::Watcher>>,
} }
impl<S, Req, Resp, E> Future for Listen<S, Req, Resp, E> impl<S, Req, Resp, E> Future for Listen<S, Req, Resp, E>
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static E: Serialize + 'static
{ {
@@ -420,9 +444,9 @@ impl<S, Req, Resp, E> fmt::Debug for Listen<S, Req, Resp, E>
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static, E: Serialize + 'static
{ {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("Listen").finish() f.debug_struct("Listen").finish()
@@ -433,7 +457,8 @@ impl<S, Req, Resp, E> fmt::Debug for Listen<S, Req, Resp, E>
struct AlwaysOkUnit<F>(F); struct AlwaysOkUnit<F>(F);
impl<F> Future for AlwaysOkUnit<F> impl<F> Future for AlwaysOkUnit<F>
where F: Future, where
F: Future,
{ {
type Item = (); type Item = ();
type Error = (); type Error = ();
@@ -445,4 +470,3 @@ impl<F> Future for AlwaysOkUnit<F>
} }
} }
} }

View File

@@ -1,9 +1,10 @@
use super::{AlwaysOkUnit, connection};
use futures::{Async, Future, Poll, Stream, future as futures, stream}; use futures::{Async, Future, Poll, Stream, future as futures, stream};
use futures::sync::{mpsc, oneshot}; use futures::sync::{mpsc, oneshot};
use futures::unsync; use futures::unsync;
use super::{AlwaysOkUnit, connection};
/// A hook to shut down a running server. /// A hook to shut down a running server.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Shutdown { pub struct Shutdown {
@@ -13,8 +14,7 @@ pub struct Shutdown {
/// A future that resolves when server shutdown completes. /// A future that resolves when server shutdown completes.
#[derive(Debug)] #[derive(Debug)]
pub struct ShutdownFuture { pub struct ShutdownFuture {
inner: futures::Either<futures::FutureResult<(), ()>, inner: futures::Either<futures::FutureResult<(), ()>, AlwaysOkUnit<oneshot::Receiver<()>>>,
AlwaysOkUnit<oneshot::Receiver<()>>>,
} }
impl Future for ShutdownFuture { impl Future for ShutdownFuture {
@@ -60,16 +60,18 @@ impl Watcher {
pub fn triple() -> (connection::Tracker, Shutdown, Self) { pub fn triple() -> (connection::Tracker, Shutdown, Self) {
let (connection_tx, connections) = connection::Tracker::pair(); let (connection_tx, connections) = connection::Tracker::pair();
let (shutdown_tx, shutdown_rx) = mpsc::unbounded(); let (shutdown_tx, shutdown_rx) = mpsc::unbounded();
(connection_tx, (
Shutdown { tx: shutdown_tx }, connection_tx,
Watcher { Shutdown { tx: shutdown_tx },
shutdown_rx: shutdown_rx.take(1), Watcher {
connections: connections, shutdown_rx: shutdown_rx.take(1),
queued_error: None, connections: connections,
shutdown: None, queued_error: None,
done: false, shutdown: None,
num_connections: 0, done: false,
}) num_connections: 0,
},
)
} }
fn process_connection(&mut self, action: connection::Action) { fn process_connection(&mut self, action: connection::Action) {
@@ -102,7 +104,7 @@ impl Watcher {
fn poll_shutdown_requests_and_connections(&mut self) -> Poll<Option<()>, ()> { fn poll_shutdown_requests_and_connections(&mut self) -> Poll<Option<()>, ()> {
if let Some(e) = self.queued_error.take() { if let Some(e) = self.queued_error.take() {
return Err(e) return Err(e);
} }
match try!(self.poll_shutdown_requests()) { match try!(self.poll_shutdown_requests()) {
@@ -178,4 +180,3 @@ impl Future for Watcher {
} }
} }
} }

View File

@@ -27,7 +27,7 @@
//! Example usage: //! Example usage:
//! //!
//! ``` //! ```
//! #![feature(plugin)] //! #![feature(plugin, use_extern_macros)]
//! #![plugin(tarpc_plugins)] //! #![plugin(tarpc_plugins)]
//! //!
//! #[macro_use] //! #[macro_use]
@@ -71,7 +71,7 @@
//! Example usage with TLS: //! Example usage with TLS:
//! //!
//! ```no-run //! ```no-run
//! #![feature(plugin)] //! #![feature(plugin, use_extern_macros)]
//! #![plugin(tarpc_plugins)] //! #![plugin(tarpc_plugins)]
//! //!
//! #[macro_use] //! #[macro_use]
@@ -116,7 +116,7 @@
#![deny(missing_docs, missing_debug_implementations)] #![deny(missing_docs, missing_debug_implementations)]
#![feature(never_type)] #![feature(never_type)]
#![cfg_attr(test, feature(plugin))] #![cfg_attr(test, feature(plugin, use_extern_macros))]
#![cfg_attr(test, plugin(tarpc_plugins))] #![cfg_attr(test, plugin(tarpc_plugins))]
extern crate byteorder; extern crate byteorder;
@@ -129,8 +129,6 @@ extern crate lazy_static;
extern crate log; extern crate log;
extern crate net2; extern crate net2;
extern crate num_cpus; extern crate num_cpus;
#[macro_use]
extern crate serde_derive;
extern crate thread_pool; extern crate thread_pool;
extern crate tokio_io; extern crate tokio_io;
@@ -142,6 +140,9 @@ pub extern crate futures;
#[doc(hidden)] #[doc(hidden)]
pub extern crate serde; pub extern crate serde;
#[doc(hidden)] #[doc(hidden)]
#[macro_use]
pub extern crate serde_derive;
#[doc(hidden)]
pub extern crate tokio_core; pub extern crate tokio_core;
#[doc(hidden)] #[doc(hidden)]
pub extern crate tokio_proto; pub extern crate tokio_proto;

View File

@@ -9,158 +9,12 @@ macro_rules! as_item {
($i:item) => {$i}; ($i:item) => {$i};
} }
#[doc(hidden)]
#[macro_export]
macro_rules! impl_serialize {
($impler:ident, { $($lifetime:tt)* }, $(@($name:ident $n:expr))* -- #($n_:expr) ) => {
as_item! {
impl$($lifetime)* $crate::serde::Serialize for $impler$($lifetime)* {
fn serialize<S>(&self, impl_serialize_serializer__: S)
-> ::std::result::Result<S::Ok, S::Error>
where S: $crate::serde::Serializer
{
match *self {
$(
$impler::$name(ref impl_serialize_field__) =>
$crate::serde::Serializer::serialize_newtype_variant(
impl_serialize_serializer__,
stringify!($impler),
$n,
stringify!($name),
impl_serialize_field__,
)
),*
}
}
}
}
};
// All args are wrapped in a tuple so we can use the newtype variant for each one.
($impler:ident,
{ $($lifetime:tt)* },
$(@$finished:tt)*
-- #($n:expr) $name:ident($field:ty) $($req:tt)*) =>
(
impl_serialize!($impler,
{ $($lifetime)* },
$(@$finished)* @($name $n)
-- #($n + 1) $($req)*);
);
// Entry
($impler:ident,
{ $($lifetime:tt)* },
$($started:tt)*) => (impl_serialize!($impler, { $($lifetime)* }, -- #(0) $($started)*););
}
#[doc(hidden)]
#[macro_export]
macro_rules! impl_deserialize {
($impler:ident, $(@($name:ident $n:expr))* -- #($n_:expr) ) => (
impl $crate::serde::Deserialize for $impler {
#[allow(non_camel_case_types)]
fn deserialize<impl_deserialize_D__>(
impl_deserialize_deserializer__: impl_deserialize_D__)
-> ::std::result::Result<$impler, impl_deserialize_D__::Error>
where impl_deserialize_D__: $crate::serde::Deserializer
{
#[allow(non_camel_case_types, unused)]
enum impl_deserialize_Field__ {
$($name),*
}
impl $crate::serde::Deserialize for impl_deserialize_Field__ {
fn deserialize<D>(impl_deserialize_deserializer__: D)
-> ::std::result::Result<impl_deserialize_Field__, D::Error>
where D: $crate::serde::Deserializer
{
struct impl_deserialize_FieldVisitor__;
impl $crate::serde::de::Visitor for impl_deserialize_FieldVisitor__ {
type Value = impl_deserialize_Field__;
fn expecting(&self, formatter: &mut ::std::fmt::Formatter)
-> ::std::fmt::Result
{
formatter.write_str("an unsigned integer")
}
fn visit_u64<E>(self, impl_deserialize_value__: u64)
-> ::std::result::Result<impl_deserialize_Field__, E>
where E: $crate::serde::de::Error,
{
if impl_deserialize_value__ == 0 {
return ::std::result::Result::Err(
$crate::serde::de::Error::custom(
"Variant 0 is a sentinel value and should not \
be serialized!"));
}
$(
if impl_deserialize_value__ == $n {
return ::std::result::Result::Ok(
impl_deserialize_Field__::$name);
}
)*
::std::result::Result::Err(
$crate::serde::de::Error::custom(
format!("No variants have a value of {}!",
impl_deserialize_value__))
)
}
}
impl_deserialize_deserializer__.deserialize_struct_field(
impl_deserialize_FieldVisitor__)
}
}
struct Visitor;
impl $crate::serde::de::Visitor for Visitor {
type Value = $impler;
fn expecting(&self, formatter: &mut ::std::fmt::Formatter)
-> ::std::fmt::Result
{
formatter.write_str("an enum variant")
}
fn visit_enum<V>(self, visitor__: V)
-> ::std::result::Result<Self::Value, V::Error>
where V: $crate::serde::de::EnumVisitor
{
use $crate::serde::de::VariantVisitor;
match visitor__.visit_variant()? {
$(
(impl_deserialize_Field__::$name, variant) => {
::std::result::Result::Ok(
$impler::$name(variant.visit_newtype()?))
}
),*
}
}
}
const TARPC_VARIANTS__: &'static [&'static str] = &[
$(
stringify!($name)
),*
];
impl_deserialize_deserializer__.deserialize_enum(
stringify!($impler), TARPC_VARIANTS__, Visitor)
}
}
);
// All args are wrapped in a tuple so we can use the newtype variant for each one.
($impler:ident, $(@$finished:tt)* -- #($n:expr) $name:ident($field:ty) $($req:tt)*) => (
impl_deserialize!($impler, $(@$finished)* @($name $n) -- #($n + 1) $($req)*);
);
// Entry
($impler:ident, $($started:tt)*) => (impl_deserialize!($impler, -- #(0) $($started)*););
}
/// The main macro that creates RPC services. /// The main macro that creates RPC services.
/// ///
/// Rpc methods are specified, mirroring trait syntax: /// Rpc methods are specified, mirroring trait syntax:
/// ///
/// ``` /// ```
/// # #![feature(plugin)] /// # #![feature(plugin, use_extern_macros)]
/// # #![plugin(tarpc_plugins)] /// # #![plugin(tarpc_plugins)]
/// # #[macro_use] extern crate tarpc; /// # #[macro_use] extern crate tarpc;
/// # fn main() {} /// # fn main() {}
@@ -290,41 +144,31 @@ macro_rules! service {
#[doc(hidden)] #[doc(hidden)]
#[allow(non_camel_case_types, unused)] #[allow(non_camel_case_types, unused)]
#[derive($crate::serde_derive::Serialize, $crate::serde_derive::Deserialize)]
pub enum Request__ { pub enum Request__ {
NotIrrefutable(()),
$( $(
$fn_name(( $($in_,)* )) $fn_name{ $($arg: $in_,)* }
),* ),*
} }
impl_deserialize!(Request__, NotIrrefutable(()) $($fn_name(($($in_),*)))*);
impl_serialize!(Request__, {}, NotIrrefutable(()) $($fn_name(($($in_),*)))*);
#[doc(hidden)] #[doc(hidden)]
#[allow(non_camel_case_types, unused)] #[allow(non_camel_case_types, unused)]
#[derive($crate::serde_derive::Serialize, $crate::serde_derive::Deserialize)]
pub enum Response__ { pub enum Response__ {
NotIrrefutable(()),
$( $(
$fn_name($out) $fn_name($out)
),* ),*
} }
impl_deserialize!(Response__, NotIrrefutable(()) $($fn_name($out))*);
impl_serialize!(Response__, {}, NotIrrefutable(()) $($fn_name($out))*);
#[doc(hidden)] #[doc(hidden)]
#[allow(non_camel_case_types, unused)] #[allow(non_camel_case_types, unused)]
#[derive(Debug)] #[derive(Debug, $crate::serde_derive::Deserialize, $crate::serde_derive::Serialize)]
pub enum Error__ { pub enum Error__ {
NotIrrefutable(()),
$( $(
$fn_name($error) $fn_name($error)
),* ),*
} }
impl_deserialize!(Error__, NotIrrefutable(()) $($fn_name($error))*);
impl_serialize!(Error__, {}, NotIrrefutable(()) $($fn_name($error))*);
/// Defines the `Future` RPC service. Implementors must be `Clone` and `'static`, /// Defines the `Future` RPC service. Implementors must be `Clone` and `'static`,
/// as required by `tokio_proto::NewService`. This is required so that the service can be used /// as required by `tokio_proto::NewService`. This is required so that the service can be used
/// to respond to multiple requests concurrently. /// to respond to multiple requests concurrently.
@@ -408,10 +252,10 @@ macro_rules! service {
::std::string::ToString::to_string(&err__))))); ::std::string::ToString::to_string(&err__)))));
} }
}; };
#[allow(unreachable_patterns)]
match request__ { match request__ {
Request__::NotIrrefutable(()) => unreachable!(),
$( $(
Request__::$fn_name(( $($arg,)* )) => { Request__::$fn_name{ $($arg,)* } => {
fn wrap__(response__: ::std::result::Result<$out, $error>) fn wrap__(response__: ::std::result::Result<$out, $error>)
-> ResponseFuture__ -> ResponseFuture__
{ {
@@ -430,6 +274,7 @@ macro_rules! service {
wrap__)); wrap__));
} }
)* )*
_ => unreachable!(),
} }
} }
} }
@@ -581,7 +426,7 @@ macro_rules! service {
-> ::std::result::Result<$out, $crate::Error<$error>> -> ::std::result::Result<$out, $crate::Error<$error>>
{ {
tarpc_service_then__!($out, $error, $fn_name); tarpc_service_then__!($out, $error, $fn_name);
let resp__ = self.inner.call(Request__::$fn_name(($($arg,)*))); let resp__ = self.inner.call(Request__::$fn_name { $($arg,)* });
tarpc_service_then__(resp__) tarpc_service_then__(resp__)
} }
)* )*
@@ -646,7 +491,7 @@ macro_rules! service {
-> ::std::result::Result<$out, $crate::Error<$error>>> { -> ::std::result::Result<$out, $crate::Error<$error>>> {
tarpc_service_then__!($out, $error, $fn_name); tarpc_service_then__!($out, $error, $fn_name);
let request__ = Request__::$fn_name(($($arg,)*)); let request__ = Request__::$fn_name { $($arg,)* };
let future__ = $crate::tokio_service::Service::call(&self.0, request__); let future__ = $crate::tokio_service::Service::call(&self.0, request__);
return $crate::futures::Future::then(future__, tarpc_service_then__); return $crate::futures::Future::then(future__, tarpc_service_then__);
} }
@@ -665,23 +510,21 @@ macro_rules! tarpc_service_then__ {
-> ::std::result::Result<$out, $crate::Error<$error>> { -> ::std::result::Result<$out, $crate::Error<$error>> {
match msg__ { match msg__ {
::std::result::Result::Ok(msg__) => { ::std::result::Result::Ok(msg__) => {
if let Response__::$fn_name(msg__) = #[allow(unreachable_patterns)]
msg__ match msg__ {
{ Response__::$fn_name(msg__) =>
::std::result::Result::Ok(msg__) ::std::result::Result::Ok(msg__),
} else { _ => unreachable!(),
unreachable!()
} }
} }
::std::result::Result::Err(err__) => { ::std::result::Result::Err(err__) => {
::std::result::Result::Err(match err__ { ::std::result::Result::Err(match err__ {
$crate::Error::App(err__) => { $crate::Error::App(err__) => {
if let Error__::$fn_name( #[allow(unreachable_patterns)]
err__) = err__ match err__ {
{ Error__::$fn_name(err__) =>
$crate::Error::App(err__) $crate::Error::App(err__),
} else { _ => unreachable!(),
unreachable!()
} }
} }
$crate::Error::RequestDeserialize(err__) => { $crate::Error::RequestDeserialize(err__) => {
@@ -777,8 +620,7 @@ mod functional_test {
if #[cfg(target_os = "macos")] { if #[cfg(target_os = "macos")] {
extern crate security_framework; extern crate security_framework;
use self::security_framework::certificate::SecCertificate; use native_tls_inner::Certificate;
use native_tls_inner::backend::security_framework::TlsConnectorBuilderExt;
fn get_future_tls_client_options() -> future::client::Options { fn get_future_tls_client_options() -> future::client::Options {
future::client::Options::default().tls(get_tls_client_context()) future::client::Options::default().tls(get_tls_client_context())
@@ -790,9 +632,9 @@ mod functional_test {
fn get_tls_client_context() -> Context { fn get_tls_client_context() -> Context {
let buf = include_bytes!("../test/root-ca.der"); let buf = include_bytes!("../test/root-ca.der");
let cert = unwrap!(SecCertificate::from_der(buf)); let cert = unwrap!(Certificate::from_der(buf));
let mut connector = unwrap!(TlsConnector::builder()); let mut connector = unwrap!(TlsConnector::builder());
connector.anchor_certificates(&[cert]); connector.add_root_certificate(cert);
Context { Context {
domain: DOMAIN.into(), domain: DOMAIN.into(),
@@ -1009,8 +851,8 @@ mod functional_test {
#[test] #[test]
fn simple() { fn simple() {
let _ = env_logger::init(); let _ = env_logger::init();
let (_, client, _) = unwrap!(start_server_with_sync_client::<SyncClient, let (_, client, _) =
Server>(Server)); unwrap!(start_server_with_sync_client::<SyncClient, Server>(Server));
assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
} }
@@ -1020,8 +862,8 @@ mod functional_test {
use futures::{Async, Future}; use futures::{Async, Future};
let _ = env_logger::init(); let _ = env_logger::init();
let (addr, client, shutdown) = unwrap!(start_server_with_sync_client::<SyncClient, let (addr, client, shutdown) =
Server>(Server)); unwrap!(start_server_with_sync_client::<SyncClient, Server>(Server));
assert_eq!(3, unwrap!(client.add(1, 2))); assert_eq!(3, unwrap!(client.add(1, 2)));
assert_eq!("Hey, Tim.", unwrap!(client.hey("Tim".to_string()))); assert_eq!("Hey, Tim.", unwrap!(client.hey("Tim".to_string())));
@@ -1056,8 +898,8 @@ mod functional_test {
#[test] #[test]
fn no_shutdown() { fn no_shutdown() {
let _ = env_logger::init(); let _ = env_logger::init();
let (addr, client, shutdown) = unwrap!(start_server_with_sync_client::<SyncClient, let (addr, client, shutdown) =
Server>(Server)); unwrap!(start_server_with_sync_client::<SyncClient, Server>(Server));
assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
@@ -1072,9 +914,10 @@ mod functional_test {
#[test] #[test]
fn other_service() { fn other_service() {
let _ = env_logger::init(); let _ = env_logger::init();
let (_, client, _) = let (_, client, _) = unwrap!(start_server_with_sync_client::<
unwrap!(start_server_with_sync_client::<super::other_service::SyncClient, super::other_service::SyncClient,
Server>(Server)); Server,
>(Server));
match client.foo().err().expect("failed unwrap") { match client.foo().err().expect("failed unwrap") {
::Error::RequestDeserialize(_) => {} // good ::Error::RequestDeserialize(_) => {} // good
bad => panic!("Expected Error::RequestDeserialize but got {}", bad), bad => panic!("Expected Error::RequestDeserialize but got {}", bad),
@@ -1093,7 +936,8 @@ mod functional_test {
impl Serialize for Bad { impl Serialize for Bad {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer where
S: Serializer,
{ {
serializer.serialize_seq(None)?.end() serializer.serialize_seq(None)?.end()
} }
@@ -1111,7 +955,9 @@ mod functional_test {
#[test] #[test]
fn bad_serialize() { fn bad_serialize() {
let handle = ().listen("localhost:0", server::Options::default()).unwrap(); let handle = ()
.listen("localhost:0", server::Options::default())
.unwrap();
let client = SyncClient::connect(handle.addr(), client::Options::default()).unwrap(); let client = SyncClient::connect(handle.addr(), client::Options::default()).unwrap();
client.bad(Bad).err().unwrap(); client.bad(Bad).err().unwrap();
} }
@@ -1144,11 +990,14 @@ mod functional_test {
#[test] #[test]
fn simple() { fn simple() {
let _ = env_logger::init(); let _ = env_logger::init();
let (_, mut reactor, client) = let (_, mut reactor, client) = unwrap!(
unwrap!(start_server_with_async_client::<FutureClient, Server>(Server)); start_server_with_async_client::<FutureClient, Server>(Server)
);
assert_eq!(3, reactor.run(client.add(1, 2)).unwrap()); assert_eq!(3, reactor.run(client.add(1, 2)).unwrap());
assert_eq!("Hey, Tim.", assert_eq!(
reactor.run(client.hey("Tim".to_string())).unwrap()); "Hey, Tim.",
reactor.run(client.hey("Tim".to_string())).unwrap()
);
} }
#[test] #[test]
@@ -1179,8 +1028,9 @@ mod functional_test {
#[test] #[test]
fn concurrent() { fn concurrent() {
let _ = env_logger::init(); let _ = env_logger::init();
let (_, mut reactor, client) = let (_, mut reactor, client) = unwrap!(
unwrap!(start_server_with_async_client::<FutureClient, Server>(Server)); start_server_with_async_client::<FutureClient, Server>(Server)
);
let req1 = client.add(1, 2); let req1 = client.add(1, 2);
let req2 = client.add(3, 4); let req2 = client.add(3, 4);
let req3 = client.hey("Tim".to_string()); let req3 = client.hey("Tim".to_string());
@@ -1192,9 +1042,10 @@ mod functional_test {
#[test] #[test]
fn other_service() { fn other_service() {
let _ = env_logger::init(); let _ = env_logger::init();
let (_, mut reactor, client) = let (_, mut reactor, client) = unwrap!(start_server_with_async_client::<
unwrap!(start_server_with_async_client::<super::other_service::FutureClient, super::other_service::FutureClient,
Server>(Server)); Server,
>(Server));
match reactor.run(client.foo()).err().unwrap() { match reactor.run(client.foo()).err().unwrap() {
::Error::RequestDeserialize(_) => {} // good ::Error::RequestDeserialize(_) => {} // good
bad => panic!(r#"Expected Error::RequestDeserialize but got "{}""#, bad), bad => panic!(r#"Expected Error::RequestDeserialize but got "{}""#, bad),
@@ -1209,12 +1060,17 @@ mod functional_test {
let _ = env_logger::init(); let _ = env_logger::init();
let reactor = reactor::Core::new().unwrap(); let reactor = reactor::Core::new().unwrap();
let handle = Server.listen("localhost:0".first_socket_addr(), let handle = Server
&reactor.handle(), .listen(
server::Options::default()) "localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default(),
)
.unwrap() .unwrap()
.0; .0;
Server.listen(handle.addr(), &reactor.handle(), server::Options::default()).unwrap(); Server
.listen(handle.addr(), &reactor.handle(), server::Options::default())
.unwrap();
} }
#[test] #[test]
@@ -1226,20 +1082,27 @@ mod functional_test {
let _ = env_logger::init(); let _ = env_logger::init();
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = Server.listen("localhost:0".first_socket_addr(), let (handle, server) = Server
&reactor.handle(), .listen(
server::Options::default()) "localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
let client = FutureClient::connect(handle.addr(), let client = FutureClient::connect(
client::Options::default().handle(reactor.handle())); handle.addr(),
client::Options::default().handle(reactor.handle()),
);
let client = unwrap!(reactor.run(client)); let client = unwrap!(reactor.run(client));
assert_eq!(reactor.run(client.add(1, 2)).unwrap(), 3); assert_eq!(reactor.run(client.add(1, 2)).unwrap(), 3);
drop(client); drop(client);
let client = FutureClient::connect(handle.addr(), let client = FutureClient::connect(
client::Options::default().handle(reactor.handle())); handle.addr(),
client::Options::default().handle(reactor.handle()),
);
let client = unwrap!(reactor.run(client)); let client = unwrap!(reactor.run(client));
assert_eq!(reactor.run(client.add(1, 2)).unwrap(), 3); assert_eq!(reactor.run(client.add(1, 2)).unwrap(), 3);
} }
@@ -1253,22 +1116,32 @@ mod functional_test {
use super::FutureServiceExt; use super::FutureServiceExt;
let _ = env_logger::init(); let _ = env_logger::init();
let (_, mut reactor, client) = let (_, mut reactor, client) = unwrap!(
unwrap!(start_server_with_async_client::<FutureClient, Server>(Server)); start_server_with_async_client::<FutureClient, Server>(Server)
);
assert_eq!(3, reactor.run(client.add(1, 2)).unwrap()); assert_eq!(3, reactor.run(client.add(1, 2)).unwrap());
assert_eq!("Hey, Tim.", assert_eq!(
reactor.run(client.hey("Tim".to_string())).unwrap()); "Hey, Tim.",
reactor.run(client.hey("Tim".to_string())).unwrap()
);
let (handle, server) = Server.listen("localhost:0".first_socket_addr(), let (handle, server) = Server
&reactor.handle(), .listen(
server::Options::default()) "localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
let options = client::Options::default().handle(reactor.handle()); let options = client::Options::default().handle(reactor.handle());
let client = reactor.run(FutureClient::connect(handle.addr(), options)).unwrap(); let client = reactor
.run(FutureClient::connect(handle.addr(), options))
.unwrap();
assert_eq!(3, reactor.run(client.add(1, 2)).unwrap()); assert_eq!(3, reactor.run(client.add(1, 2)).unwrap());
assert_eq!("Hey, Tim.", assert_eq!(
reactor.run(client.hey("Tim".to_string())).unwrap()); "Hey, Tim.",
reactor.run(client.hey("Tim".to_string())).unwrap()
);
} }
} }
@@ -1298,16 +1171,16 @@ mod functional_test {
let (_, mut reactor, client) = let (_, mut reactor, client) =
start_err_server_with_async_client::<FutureClient, ErrorServer>(ErrorServer).unwrap(); start_err_server_with_async_client::<FutureClient, ErrorServer>(ErrorServer).unwrap();
reactor.run(client.bar() reactor
.then(move |result| { .run(client.bar().then(move |result| {
match result.err().unwrap() { match result.err().unwrap() {
::Error::App(e) => { ::Error::App(e) => {
assert_eq!(e.description(), "lol jk"); assert_eq!(e.description(), "lol jk");
Ok::<_, ()>(()) Ok::<_, ()>(())
} // good } // good
bad => panic!("Expected Error::App but got {:?}", bad), bad => panic!("Expected Error::App but got {:?}", bad),
} }
})) }))
.unwrap(); .unwrap();
} }

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tarpc-plugins" name = "tarpc-plugins"
version = "0.1.1" version = "0.2.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"] authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
license = "MIT" license = "MIT"
documentation = "https://docs.rs/tarpc" documentation = "https://docs.rs/tarpc"
@@ -15,7 +15,7 @@ description = "Plugins for tarpc, an RPC framework for Rust with a focus on ease
travis-ci = { repository = "google/tarpc" } travis-ci = { repository = "google/tarpc" }
[dependencies] [dependencies]
itertools = "0.5" itertools = "0.6"
[lib] [lib]
plugin = true plugin = true

View File

@@ -65,7 +65,7 @@ fn snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box<MacResul
if let Some(tt @ TokenTree::Token(_, token::Eq)) = tokens.next() { if let Some(tt @ TokenTree::Token(_, token::Eq)) = tokens.next() {
let mut docstr = tokens.next().expect("Docstrings must have literal docstring"); let mut docstr = tokens.next().expect("Docstrings must have literal docstring");
if let TokenTree::Token(_, token::Literal(token::Str_(ref mut doc), _)) = docstr { if let TokenTree::Token(_, token::Literal(token::Str_(ref mut doc), _)) = docstr {
*doc = Symbol::intern(&str_lit(&doc.as_str()).replace("{}", &old_ident)); *doc = Symbol::intern(&str_lit(&doc.as_str(), None).replace("{}", &old_ident));
} else { } else {
unreachable!(); unreachable!();
} }

View File

@@ -3,11 +3,11 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
use serde;
use bincode::{self, Infinite}; use bincode::{self, Infinite};
use byteorder::{BigEndian, ReadBytesExt}; use byteorder::{BigEndian, ReadBytesExt};
use bytes::BytesMut; use bytes::BytesMut;
use bytes::buf::BufMut; use bytes::buf::BufMut;
use serde;
use std::io::{self, Cursor}; use std::io::{self, Cursor};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::mem; use std::mem;
@@ -42,16 +42,25 @@ impl<Encode, Decode> Codec<Encode, Decode> {
} }
fn too_big(payload_size: u64, max_payload_size: u64) -> io::Error { fn too_big(payload_size: u64, max_payload_size: u64) -> io::Error {
warn!("Not sending too-big packet of size {} (max is {})", warn!(
payload_size, max_payload_size); "Not sending too-big packet of size {} (max is {})",
io::Error::new(io::ErrorKind::InvalidData, payload_size,
format!("Maximum payload size is {} bytes but got a payload of {}", max_payload_size
max_payload_size, payload_size)) );
io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Maximum payload size is {} bytes but got a payload of {}",
max_payload_size,
payload_size
),
)
} }
impl<Encode, Decode> Encoder for Codec<Encode, Decode> impl<Encode, Decode> Encoder for Codec<Encode, Decode>
where Encode: serde::Serialize, where
Decode: serde::Deserialize Encode: serde::Serialize,
Decode: serde::de::DeserializeOwned,
{ {
type Item = (RequestId, Encode); type Item = (RequestId, Encode);
type Error = io::Error; type Error = io::Error;
@@ -66,17 +75,18 @@ impl<Encode, Decode> Encoder for Codec<Encode, Decode>
buf.put_u64::<BigEndian>(id); buf.put_u64::<BigEndian>(id);
trace!("Encoded request id = {} as {:?}", id, buf); trace!("Encoded request id = {} as {:?}", id, buf);
buf.put_u64::<BigEndian>(payload_size); buf.put_u64::<BigEndian>(payload_size);
bincode::serialize_into(&mut buf.writer(), bincode::serialize_into(&mut buf.writer(), &message, Infinite)
&message, .map_err(|serialize_err| {
Infinite) io::Error::new(io::ErrorKind::Other, serialize_err)
.map_err(|serialize_err| io::Error::new(io::ErrorKind::Other, serialize_err))?; })?;
trace!("Encoded buffer: {:?}", buf); trace!("Encoded buffer: {:?}", buf);
Ok(()) Ok(())
} }
} }
impl<Encode, Decode> Decoder for Codec<Encode, Decode> impl<Encode, Decode> Decoder for Codec<Encode, Decode>
where Decode: serde::Deserialize where
Decode: serde::de::DeserializeOwned,
{ {
type Item = (RequestId, Result<Decode, bincode::Error>); type Item = (RequestId, Result<Decode, bincode::Error>);
type Error = io::Error; type Error = io::Error;
@@ -98,31 +108,36 @@ impl<Encode, Decode> Decoder for Codec<Encode, Decode>
self.state = Len { id: id }; self.state = Len { id: id };
} }
Len { .. } if buf.len() < mem::size_of::<u64>() => { Len { .. } if buf.len() < mem::size_of::<u64>() => {
trace!("--> Buf len is {}; waiting for 8 to parse packet length.", trace!(
buf.len()); "--> Buf len is {}; waiting for 8 to parse packet length.",
buf.len()
);
return Ok(None); return Ok(None);
} }
Len { id } => { Len { id } => {
let len_buf = buf.split_to(mem::size_of::<u64>()); let len_buf = buf.split_to(mem::size_of::<u64>());
let len = Cursor::new(len_buf).read_u64::<BigEndian>()?; let len = Cursor::new(len_buf).read_u64::<BigEndian>()?;
trace!("--> Parsed payload length = {}, remaining buffer length = {}", trace!(
len, "--> Parsed payload length = {}, remaining buffer length = {}",
buf.len()); len,
buf.len()
);
if len > self.max_payload_size { if len > self.max_payload_size {
return Err(too_big(len, self.max_payload_size)); return Err(too_big(len, self.max_payload_size));
} }
self.state = Payload { id: id, len: len }; self.state = Payload { id: id, len: len };
} }
Payload { len, .. } if buf.len() < len as usize => { Payload { len, .. } if buf.len() < len as usize => {
trace!("--> Buf len is {}; waiting for {} to parse payload.", trace!(
buf.len(), "--> Buf len is {}; waiting for {} to parse payload.",
len); buf.len(),
len
);
return Ok(None); return Ok(None);
} }
Payload { id, len } => { Payload { id, len } => {
let payload = buf.split_to(len as usize); let payload = buf.split_to(len as usize);
let result = bincode::deserialize_from(&mut Cursor::new(payload), let result = bincode::deserialize_from(&mut Cursor::new(payload), Infinite);
Infinite);
// Reset the state machine because, either way, we're done processing this // Reset the state machine because, either way, we're done processing this
// message. // message.
self.state = Id; self.state = Id;
@@ -146,15 +161,16 @@ impl<Encode, Decode> Proto<Encode, Decode> {
pub fn new(max_payload_size: u64) -> Self { pub fn new(max_payload_size: u64) -> Self {
Proto { Proto {
max_payload_size: max_payload_size, max_payload_size: max_payload_size,
_phantom_data: PhantomData _phantom_data: PhantomData,
} }
} }
} }
impl<T, Encode, Decode> ServerProto<T> for Proto<Encode, Decode> impl<T, Encode, Decode> ServerProto<T> for Proto<Encode, Decode>
where T: AsyncRead + AsyncWrite + 'static, where
Encode: serde::Serialize + 'static, T: AsyncRead + AsyncWrite + 'static,
Decode: serde::Deserialize + 'static Encode: serde::Serialize + 'static,
Decode: serde::de::DeserializeOwned + 'static,
{ {
type Response = Encode; type Response = Encode;
type Request = Result<Decode, bincode::Error>; type Request = Result<Decode, bincode::Error>;
@@ -167,9 +183,10 @@ impl<T, Encode, Decode> ServerProto<T> for Proto<Encode, Decode>
} }
impl<T, Encode, Decode> ClientProto<T> for Proto<Encode, Decode> impl<T, Encode, Decode> ClientProto<T> for Proto<Encode, Decode>
where T: AsyncRead + AsyncWrite + 'static, where
Encode: serde::Serialize + 'static, T: AsyncRead + AsyncWrite + 'static,
Decode: serde::Deserialize + 'static Encode: serde::Serialize + 'static,
Decode: serde::de::DeserializeOwned + 'static,
{ {
type Response = Result<Decode, bincode::Error>; type Response = Result<Decode, bincode::Error>;
type Request = Encode; type Request = Encode;
@@ -190,8 +207,10 @@ fn serialize() {
for _ in 0..2 { for _ in 0..2 {
let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new(2_000_000); let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new(2_000_000);
codec.encode(MSG, &mut buf).unwrap(); codec.encode(MSG, &mut buf).unwrap();
let actual: Result<Option<(u64, Result<(char, char, char), bincode::Error>)>, io::Error> = let actual: Result<
codec.decode(&mut buf); Option<(u64, Result<(char, char, char), bincode::Error>)>,
io::Error,
> = codec.decode(&mut buf);
match actual { match actual {
Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {} Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {}
@@ -207,13 +226,21 @@ fn deserialize_big() {
let mut codec: Codec<Vec<u8>, Vec<u8>> = Codec::new(24); let mut codec: Codec<Vec<u8>, Vec<u8>> = Codec::new(24);
let mut buf = BytesMut::with_capacity(40); let mut buf = BytesMut::with_capacity(40);
assert_eq!(codec.encode((0, vec![0; 24]), &mut buf).err().unwrap().kind(), assert_eq!(
io::ErrorKind::InvalidData); codec
.encode((0, vec![0; 24]), &mut buf)
.err()
.unwrap()
.kind(),
io::ErrorKind::InvalidData
);
// Header // Header
buf.put_slice(&mut [0u8; 8]); buf.put_slice(&mut [0u8; 8]);
// Len // Len
buf.put_slice(&mut [0u8, 0, 0, 0, 0, 0, 0, 25]); buf.put_slice(&mut [0u8, 0, 0, 0, 0, 0, 0, 25]);
assert_eq!(codec.decode(&mut buf).err().unwrap().kind(), assert_eq!(
io::ErrorKind::InvalidData); codec.decode(&mut buf).err().unwrap().kind(),
io::ErrorKind::InvalidData
);
} }

View File

@@ -1,19 +1,19 @@
use future::client::{Client as FutureClient, ClientExt as FutureClientExt, use future::client::{Client as FutureClient, ClientExt as FutureClientExt,
Options as FutureOptions}; Options as FutureOptions};
/// Exposes a trait for connecting synchronously to servers.
use futures::{Future, Stream}; use futures::{Future, Stream};
use serde::{Deserialize, Serialize}; use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::net::{SocketAddr, ToSocketAddrs}; use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
#[cfg(feature = "tls")]
use tls::client::Context;
use tokio_core::reactor; use tokio_core::reactor;
use tokio_proto::util::client_proxy::{ClientProxy, Receiver, pair}; use tokio_proto::util::client_proxy::{ClientProxy, Receiver, pair};
use tokio_service::Service; use tokio_service::Service;
use util::FirstSocketAddr; use util::FirstSocketAddr;
#[cfg(feature = "tls")]
use tls::client::Context;
#[doc(hidden)] #[doc(hidden)]
pub struct Client<Req, Resp, E> { pub struct Client<Req, Resp, E> {
@@ -22,23 +22,24 @@ pub struct Client<Req, Resp, E> {
impl<Req, Resp, E> Clone for Client<Req, Resp, E> { impl<Req, Resp, E> Clone for Client<Req, Resp, E> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Client { proxy: self.proxy.clone() } Client {
proxy: self.proxy.clone(),
}
} }
} }
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E> { impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
const PROXY: &'static &'static str = &"ClientProxy { .. }"; const PROXY: &'static &'static str = &"ClientProxy { .. }";
f.debug_struct("Client") f.debug_struct("Client").field("proxy", PROXY).finish()
.field("proxy", PROXY)
.finish()
} }
} }
impl<Req, Resp, E> Client<Req, Resp, E> impl<Req, Resp, E> Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static, where
Resp: Deserialize + Sync + Send + 'static, Req: Serialize + Send + 'static,
E: Deserialize + Sync + Send + 'static Resp: DeserializeOwned + Send + 'static,
E: DeserializeOwned + Send + 'static,
{ {
/// Drives an RPC call for the given request. /// Drives an RPC call for the given request.
pub fn call(&self, request: Req) -> Result<Resp, ::Error<E>> { pub fn call(&self, request: Req) -> Result<Resp, ::Error<E>> {
@@ -47,7 +48,6 @@ impl<Req, Resp, E> Client<Req, Resp, E>
// oneshot send. // oneshot send.
self.proxy.call(request).wait() self.proxy.call(request).wait()
} }
} }
/// Additional options to configure how the client connects and operates. /// Additional options to configure how the client connects and operates.
@@ -97,8 +97,7 @@ impl fmt::Debug for Options {
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
const NONE: &'static &'static str = &"None"; const NONE: &'static &'static str = &"None";
let mut f = f.debug_struct("Options"); let mut f = f.debug_struct("Options");
#[cfg(feature = "tls")] #[cfg(feature = "tls")] f.field("tls_ctx", if self.tls_ctx.is_some() { SOME } else { NONE });
f.field("tls_ctx", if self.tls_ctx.is_some() { SOME } else { NONE });
f.finish() f.finish()
} }
} }
@@ -124,27 +123,29 @@ impl Into<FutureOptions> for (reactor::Handle, Options) {
/// Extension methods for Clients. /// Extension methods for Clients.
pub trait ClientExt: Sized { pub trait ClientExt: Sized {
/// Connects to a server located at the given address. /// Connects to a server located at the given address.
fn connect<A>(addr: A, options: Options) -> io::Result<Self> where A: ToSocketAddrs; fn connect<A>(addr: A, options: Options) -> io::Result<Self>
where
A: ToSocketAddrs;
} }
impl<Req, Resp, E> ClientExt for Client<Req, Resp, E> impl<Req, Resp, E> ClientExt for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static, where
Resp: Deserialize + Sync + Send + 'static, Req: Serialize + Send + 'static,
E: Deserialize + Sync + Send + 'static Resp: DeserializeOwned + Send + 'static,
E: DeserializeOwned + Send + 'static,
{ {
fn connect<A>(addr: A, options: Options) -> io::Result<Self> fn connect<A>(addr: A, options: Options) -> io::Result<Self>
where A: ToSocketAddrs where
A: ToSocketAddrs,
{ {
let addr = addr.try_first_socket_addr()?; let addr = addr.try_first_socket_addr()?;
let (connect_tx, connect_rx) = mpsc::channel(); let (connect_tx, connect_rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || match RequestHandler::connect(addr, options) {
match RequestHandler::connect(addr, options) { Ok((proxy, mut handler)) => {
Ok((proxy, mut handler)) => { connect_tx.send(Ok(proxy)).unwrap();
connect_tx.send(Ok(proxy)).unwrap(); handler.handle_requests();
handler.handle_requests();
}
Err(e) => connect_tx.send(Err(e)).unwrap(),
} }
Err(e) => connect_tx.send(Err(e)).unwrap(),
}); });
Ok(connect_rx.recv().unwrap()?) Ok(connect_rx.recv().unwrap()?)
} }
@@ -160,32 +161,43 @@ struct RequestHandler<Req, Resp, E, S> {
} }
impl<Req, Resp, E> RequestHandler<Req, Resp, E, FutureClient<Req, Resp, E>> impl<Req, Resp, E> RequestHandler<Req, Resp, E, FutureClient<Req, Resp, E>>
where Req: Serialize + Sync + Send + 'static, where
Resp: Deserialize + Sync + Send + 'static, Req: Serialize + Send + 'static,
E: Deserialize + Sync + Send + 'static Resp: DeserializeOwned + Send + 'static,
E: DeserializeOwned + Send + 'static,
{ {
/// Creates a new `RequestHandler` by connecting a `FutureClient` to the given address /// Creates a new `RequestHandler` by connecting a `FutureClient` to the given address
/// using the given options. /// using the given options.
fn connect(addr: SocketAddr, options: Options) fn connect(addr: SocketAddr, options: Options) -> io::Result<(Client<Req, Resp, E>, Self)> {
-> io::Result<(Client<Req, Resp, E>, Self)>
{
let mut reactor = reactor::Core::new()?; let mut reactor = reactor::Core::new()?;
let options = (reactor.handle(), options).into(); let options = (reactor.handle(), options).into();
let client = reactor.run(FutureClient::connect(addr, options))?; let client = reactor.run(FutureClient::connect(addr, options))?;
let (proxy, requests) = pair(); let (proxy, requests) = pair();
Ok((Client { proxy }, RequestHandler { reactor, client, requests })) Ok((
Client { proxy },
RequestHandler {
reactor,
client,
requests,
},
))
} }
} }
impl<Req, Resp, E, S> RequestHandler<Req, Resp, E, S> impl<Req, Resp, E, S> RequestHandler<Req, Resp, E, S>
where Req: Serialize + 'static, where
Resp: Deserialize + 'static, Req: Serialize + 'static,
E: Deserialize + 'static, Resp: DeserializeOwned + 'static,
S: Service<Request = Req, Response = Resp, Error = ::Error<E>>, E: DeserializeOwned + 'static,
S::Future: 'static, S: Service<Request = Req, Response = Resp, Error = ::Error<E>>,
S::Future: 'static,
{ {
fn handle_requests(&mut self) { fn handle_requests(&mut self) {
let RequestHandler { ref mut reactor, ref mut requests, ref mut client } = *self; let RequestHandler {
ref mut reactor,
ref mut requests,
ref mut client,
} = *self;
let handle = reactor.handle(); let handle = reactor.handle();
let requests = requests let requests = requests
.map(|result| { .map(|result| {
@@ -196,14 +208,14 @@ impl<Req, Resp, E, S> RequestHandler<Req, Resp, E, S>
} }
}) })
.for_each(|(request, response_tx)| { .for_each(|(request, response_tx)| {
let request = client.call(request) let request = client.call(request).then(move |response| {
.then(move |response| { // Safe to unwrap because clients always block on the response future.
// Safe to unwrap because clients always block on the response future. response_tx
response_tx.send(response) .send(response)
.map_err(|_| ()) .map_err(|_| ())
.expect("Client should block on response"); .expect("Client should block on response");
Ok(()) Ok(())
}); });
handle.spawn(request); handle.spawn(request);
Ok(()) Ok(())
}); });
@@ -230,7 +242,11 @@ fn handle_requests() {
let (request, requests) = ::futures::sync::mpsc::unbounded(); let (request, requests) = ::futures::sync::mpsc::unbounded();
let reactor = reactor::Core::new().unwrap(); let reactor = reactor::Core::new().unwrap();
let client = Client; let client = Client;
let mut request_handler = RequestHandler { reactor, client, requests }; let mut request_handler = RequestHandler {
reactor,
client,
requests,
};
// Test that `handle_requests` returns when all request senders are dropped. // Test that `handle_requests` returns when all request senders are dropped.
drop(request); drop(request);
request_handler.handle_requests(); request_handler.handle_requests();

View File

@@ -2,17 +2,18 @@ use {bincode, future, num_cpus};
use future::server::{Response, Shutdown}; use future::server::{Response, Shutdown};
use futures::{Future, future as futures}; use futures::{Future, future as futures};
use futures::sync::oneshot; use futures::sync::oneshot;
use serde::{Deserialize, Serialize}; #[cfg(feature = "tls")]
use std::io; use native_tls_inner::TlsAcceptor;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fmt; use std::fmt;
use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Duration; use std::time::Duration;
use std::usize; use std::usize;
use thread_pool::{self, Sender, Task, ThreadPool}; use thread_pool::{self, Sender, Task, ThreadPool};
use tokio_core::reactor; use tokio_core::reactor;
use tokio_service::{NewService, Service}; use tokio_service::{NewService, Service};
#[cfg(feature = "tls")]
use native_tls_inner::TlsAcceptor;
/// Additional options to configure how the server operates. /// Additional options to configure how the server operates.
#[derive(Debug)] #[derive(Debug)]
@@ -91,23 +92,26 @@ impl fmt::Debug for Handle {
const CORE: &'static &'static str = &"Core { .. }"; const CORE: &'static &'static str = &"Core { .. }";
const SERVER: &'static &'static str = &"Box<Future<Item = (), Error = ()>>"; const SERVER: &'static &'static str = &"Box<Future<Item = (), Error = ()>>";
f.debug_struct("Handle").field("reactor", CORE) f.debug_struct("Handle")
.field("handle", &self.handle) .field("reactor", CORE)
.field("server", SERVER) .field("handle", &self.handle)
.finish() .field("server", SERVER)
.finish()
} }
} }
#[doc(hidden)] #[doc(hidden)]
pub fn listen<S, Req, Resp, E>(new_service: S, addr: SocketAddr, options: Options) pub fn listen<S, Req, Resp, E>(new_service: S,
-> io::Result<Handle> addr: SocketAddr,
options: Options)
-> io::Result<Handle>
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
<S::Instance as Service>::Future: Send + 'static, <S::Instance as Service>::Future: Send + 'static,
S::Response: Send, S::Response: Send,
S::Error: Send, S::Error: Send,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static E: Serialize + 'static
{ {
@@ -124,42 +128,57 @@ pub fn listen<S, Req, Resp, E>(new_service: S, addr: SocketAddr, options: Option
} }
/// A service that uses a thread pool. /// A service that uses a thread pool.
struct NewThreadService<S> where S: NewService { struct NewThreadService<S>
where
S: NewService,
{
new_service: S, new_service: S,
sender: Sender<ServiceTask<<S::Instance as Service>::Future>>, sender: Sender<ServiceTask<<S::Instance as Service>::Future>>,
_pool: ThreadPool<ServiceTask<<S::Instance as Service>::Future>>, _pool: ThreadPool<ServiceTask<<S::Instance as Service>::Future>>,
} }
/// A service that runs by executing request handlers in a thread pool. /// A service that runs by executing request handlers in a thread pool.
struct ThreadService<S> where S: Service { struct ThreadService<S>
where
S: Service,
{
service: S, service: S,
sender: Sender<ServiceTask<S::Future>>, sender: Sender<ServiceTask<S::Future>>,
} }
/// A task that handles a single request. /// A task that handles a single request.
struct ServiceTask<F> where F: Future { struct ServiceTask<F>
where
F: Future,
{
future: F, future: F,
tx: oneshot::Sender<Result<F::Item, F::Error>>, tx: oneshot::Sender<Result<F::Item, F::Error>>,
} }
impl<S> NewThreadService<S> impl<S> NewThreadService<S>
where S: NewService, where
<S::Instance as Service>::Future: Send + 'static, S: NewService,
S::Response: Send, <S::Instance as Service>::Future: Send + 'static,
S::Error: Send, S::Response: Send,
S::Error: Send,
{ {
/// Create a NewThreadService by wrapping another service. /// Create a NewThreadService by wrapping another service.
fn new(new_service: S, pool: thread_pool::Builder) -> Self { fn new(new_service: S, pool: thread_pool::Builder) -> Self {
let (sender, _pool) = pool.build(); let (sender, _pool) = pool.build();
NewThreadService { new_service, sender, _pool } NewThreadService {
new_service,
sender,
_pool,
}
} }
} }
impl<S> NewService for NewThreadService<S> impl<S> NewService for NewThreadService<S>
where S: NewService, where
<S::Instance as Service>::Future: Send + 'static, S: NewService,
S::Response: Send, <S::Instance as Service>::Future: Send + 'static,
S::Error: Send, S::Response: Send,
S::Error: Send,
{ {
type Request = S::Request; type Request = S::Request;
type Response = S::Response; type Response = S::Response;
@@ -175,9 +194,10 @@ impl<S> NewService for NewThreadService<S>
} }
impl<F> Task for ServiceTask<F> impl<F> Task for ServiceTask<F>
where F: Future + Send + 'static, where
F::Item: Send, F: Future + Send + 'static,
F::Error: Send, F::Item: Send,
F::Error: Send,
{ {
fn run(self) { fn run(self) {
// Don't care if sending fails. It just means the request is no longer // Don't care if sending fails. It just means the request is no longer
@@ -187,34 +207,40 @@ impl<F> Task for ServiceTask<F>
} }
impl<S> Service for ThreadService<S> impl<S> Service for ThreadService<S>
where S: Service, where
S::Future: Send + 'static, S: Service,
S::Response: Send, S::Future: Send + 'static,
S::Error: Send, S::Response: Send,
S::Error: Send,
{ {
type Request = S::Request; type Request = S::Request;
type Response = S::Response; type Response = S::Response;
type Error = S::Error; type Error = S::Error;
type Future = type Future = futures::AndThen<
futures::AndThen< futures::MapErr<
futures::MapErr< oneshot::Receiver<Result<Self::Response, Self::Error>>,
oneshot::Receiver<Result<Self::Response, Self::Error>>, fn(oneshot::Canceled) -> Self::Error,
fn(oneshot::Canceled) -> Self::Error>, >,
Result<Self::Response, Self::Error>, Result<Self::Response, Self::Error>,
fn(Result<Self::Response, Self::Error>) -> Result<Self::Response, Self::Error>>; fn(Result<Self::Response, Self::Error>)
-> Result<Self::Response, Self::Error>,
>;
fn call(&self, request: Self::Request) -> Self::Future { fn call(&self, request: Self::Request) -> Self::Future {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.sender.send(ServiceTask { self.sender
future: self.service.call(request), .send(ServiceTask {
tx: tx, future: self.service.call(request),
}).unwrap(); tx: tx,
})
.unwrap();
rx.map_err(unreachable as _).and_then(ident) rx.map_err(unreachable as _).and_then(ident)
} }
} }
fn unreachable<T, U>(t: T) -> U fn unreachable<T, U>(t: T) -> U
where T: fmt::Display where
T: fmt::Display,
{ {
unreachable!(t) unreachable!(t)
} }
@@ -222,4 +248,3 @@ fn unreachable<T, U>(t: T) -> U
fn ident<T>(t: T) -> T { fn ident<T>(t: T) -> T {
t t
} }

View File

@@ -41,11 +41,10 @@ pub mod client {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
const TLS_CONNECTOR: &'static &'static str = &"TlsConnector { .. }"; const TLS_CONNECTOR: &'static &'static str = &"TlsConnector { .. }";
f.debug_struct("Context") f.debug_struct("Context")
.field("domain", &self.domain) .field("domain", &self.domain)
.field("tls_connector", TLS_CONNECTOR) .field("tls_connector", TLS_CONNECTOR)
.finish() .finish()
} }
} }
} }

View File

@@ -53,16 +53,18 @@ impl Stream for Never {
impl Serialize for Never { impl Serialize for Never {
fn serialize<S>(&self, _: S) -> Result<S::Ok, S::Error> fn serialize<S>(&self, _: S) -> Result<S::Ok, S::Error>
where S: Serializer where
S: Serializer,
{ {
self.0 self.0
} }
} }
// Please don't try to deserialize this. :( // Please don't try to deserialize this. :(
impl Deserialize for Never { impl<'a> Deserialize<'a> for Never {
fn deserialize<D>(_: D) -> Result<Self, D::Error> fn deserialize<D>(_: D) -> Result<Self, D::Error>
where D: Deserializer where
D: Deserializer<'a>,
{ {
panic!("Never cannot be instantiated!"); panic!("Never cannot be instantiated!");
} }
@@ -99,8 +101,10 @@ pub trait FirstSocketAddr: ToSocketAddrs {
if let Some(a) = self.to_socket_addrs()?.next() { if let Some(a) = self.to_socket_addrs()?.next() {
Ok(a) Ok(a)
} else { } else {
Err(io::Error::new(io::ErrorKind::AddrNotAvailable, Err(io::Error::new(
"`ToSocketAddrs::to_socket_addrs` returned an empty iterator.")) io::ErrorKind::AddrNotAvailable,
"`ToSocketAddrs::to_socket_addrs` returned an empty iterator.",
))
} }
} }
@@ -119,8 +123,9 @@ impl<A: ToSocketAddrs> FirstSocketAddr for A {}
/// on it, otherwise the callback never runs. Once run, however, this future is /// on it, otherwise the callback never runs. Once run, however, this future is
/// the same as the one the closure creates. /// the same as the one the closure creates.
pub fn lazy<F, A, R>(f: F, args: A) -> Lazy<F, A, R> pub fn lazy<F, A, R>(f: F, args: A) -> Lazy<F, A, R>
where F: FnOnce(A) -> R, where
R: IntoFuture F: FnOnce(A) -> R,
R: IntoFuture,
{ {
Lazy { Lazy {
inner: _Lazy::First(f, args), inner: _Lazy::First(f, args),
@@ -145,8 +150,9 @@ enum _Lazy<F, A, R> {
} }
impl<F, A, R> Lazy<F, A, R> impl<F, A, R> Lazy<F, A, R>
where F: FnOnce(A) -> R, where
R: IntoFuture, F: FnOnce(A) -> R,
R: IntoFuture,
{ {
fn get(&mut self) -> &mut R::Future { fn get(&mut self) -> &mut R::Future {
match self.inner { match self.inner {
@@ -166,8 +172,9 @@ impl<F, A, R> Lazy<F, A, R>
} }
impl<F, A, R> Future for Lazy<F, A, R> impl<F, A, R> Future for Lazy<F, A, R>
where F: FnOnce(A) -> R, where
R: IntoFuture, F: FnOnce(A) -> R,
R: IntoFuture,
{ {
type Item = R::Item; type Item = R::Item;
type Error = R::Error; type Error = R::Error;