11 Commits

Author SHA1 Message Date
Tim
9ce7938fdc Merge pull request #167 from tikue/master
* Fix breakage from nightly compiler changes.

* Remove unused imports

* Update TLS code to not use deprecated method

* Bump versions

Fixes #166
2017-09-17 17:58:50 -07:00
Tim Kuehn
650dc88da5 Bump versions 2017-09-17 16:02:28 -07:00
Tim Kuehn
3601763442 Update TLS code to not use deprecated method 2017-09-17 15:56:07 -07:00
Tim Kuehn
4aaaea1e04 Remove unused imports 2017-09-17 15:56:07 -07:00
Tim Kuehn
2e214c85d3 Fix breakage from nightly compiler changes.
Fixes #166
2017-09-17 15:56:07 -07:00
Cyril Plisko
0676ab67df Refresh dependencies (#164) 2017-08-14 14:31:47 -07:00
Tim
0b843512dd Use #[allow(unreachable_patterns)] to simplify some macro code. (#163)
By allowing unreachable patterns, we don't have to have 'NotIrrefutable' variants in the Request, Response, and Error enums.

This commit also removes an unused macro in an example.
2017-08-02 13:54:06 -07:00
Jesper Håkansson
85d9416750 doc: Update tarpc version in readme (#162) 2017-07-21 08:37:07 -07:00
Tim
5e3cf3c807 Run the new nightly cargo fmt (#156) 2017-07-18 14:54:46 -07:00
Tim
4dfb3a48c3 Derive Deserialize, Serialize in macros. Requires feature(use_extern_macros). (#151) 2017-07-18 12:04:56 -07:00
Tim
21e8883877 Update to serde 1.0 and bincode 0.8 (#149) 2017-05-05 22:16:44 -07:00
28 changed files with 681 additions and 676 deletions

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tarpc" name = "tarpc"
version = "0.7.3" version = "0.9.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"] authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
license = "MIT" license = "MIT"
documentation = "https://docs.rs/tarpc" documentation = "https://docs.rs/tarpc"
@@ -15,7 +15,7 @@ description = "An RPC framework for Rust with a focus on ease of use."
travis-ci = { repository = "google/tarpc" } travis-ci = { repository = "google/tarpc" }
[dependencies] [dependencies]
bincode = "1.0.0-alpha6" bincode = "0.8"
byteorder = "1.0" byteorder = "1.0"
bytes = "0.4" bytes = "0.4"
cfg-if = "0.1.0" cfg-if = "0.1.0"
@@ -24,9 +24,9 @@ lazy_static = "0.2"
log = "0.3" log = "0.3"
net2 = "0.2" net2 = "0.2"
num_cpus = "1.0" num_cpus = "1.0"
serde = "0.9" serde = "1.0"
serde_derive = "0.9" serde_derive = "1.0"
tarpc-plugins = { path = "src/plugins", version = "0.1.1" } tarpc-plugins = { path = "src/plugins", version = "0.2.0" }
thread-pool = "0.1.1" thread-pool = "0.1.1"
tokio-core = "0.1.6" tokio-core = "0.1.6"
tokio-io = "0.1" tokio-io = "0.1"
@@ -38,10 +38,11 @@ native-tls = { version = "0.1.1", optional = true }
tokio-tls = { version = "0.1", optional = true } tokio-tls = { version = "0.1", optional = true }
[dev-dependencies] [dev-dependencies]
chrono = "0.3" chrono = "0.4"
env_logger = "0.3" env_logger = "0.4"
futures-cpupool = "0.1" futures-cpupool = "0.1"
clap = "2.0" clap = "2.0"
serde_bytes = "0.10"
[target.'cfg(target_os = "macos")'.dev-dependencies] [target.'cfg(target_os = "macos")'.dev-dependencies]
security-framework = "0.1" security-framework = "0.1"

View File

@@ -37,8 +37,8 @@ arguments to tarpc fns.
Add to your `Cargo.toml` dependencies: Add to your `Cargo.toml` dependencies:
```toml ```toml
tarpc = "0.7.2" tarpc = "0.9.0"
tarpc-plugins = "0.1.1" tarpc-plugins = "0.2.0"
``` ```
## Example: Sync ## Example: Sync
@@ -47,7 +47,7 @@ tarpc has two APIs: `sync` for blocking code and `future` for asynchronous
code. Here's how to use the sync api. code. Here's how to use the sync api.
```rust ```rust
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
#[macro_use] #[macro_use]
@@ -100,7 +100,7 @@ races! See the `tarpc_examples` package for more examples.
Here's the same service, implemented using futures. Here's the same service, implemented using futures.
```rust ```rust
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate futures; extern crate futures;
@@ -171,7 +171,7 @@ However, if you are working with both stream types, ensure that you use the TLS
servers and TCP clients with TCP servers. servers and TCP clients with TCP servers.
```rust,no_run ```rust,no_run
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate futures; extern crate futures;

View File

@@ -1,3 +1,16 @@
## 0.9.0 (2017-09-17)
## Breaking Changes
Updates tarpc to use tarpc-plugins 0.2.
## 0.8.0 (2017-05-05)
## Breaking Changes
This release updates tarpc to use serde 1.0.
As such, users must also update to use serde 1.0.
The serde 1.0 [release notes](https://github.com/serde-rs/serde/releases/tag/v1.0.0)
detail migration paths.
## 0.7.3 (2017-04-26) ## 0.7.3 (2017-04-26)
This release removes the `Sync` bound on RPC args for both sync and future This release removes the `Sync` bound on RPC args for both sync and future

View File

@@ -3,7 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin, test)] #![feature(plugin, test, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
#[macro_use] #[macro_use]
@@ -41,13 +41,17 @@ fn latency(bencher: &mut Bencher) {
let _ = env_logger::init(); let _ = env_logger::init();
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = Server let (handle, server) = Server
.listen("localhost:0".first_socket_addr(), .listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
let client = FutureClient::connect(handle.addr(), let client = FutureClient::connect(
client::Options::default().handle(reactor.handle())); handle.addr(),
client::Options::default().handle(reactor.handle()),
);
let client = reactor.run(client).unwrap(); let client = reactor.run(client).unwrap();
bencher.iter(|| reactor.run(client.ack()).unwrap()); bencher.iter(|| reactor.run(client.ack()).unwrap());

View File

@@ -3,7 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(inclusive_range_syntax, conservative_impl_trait, plugin, never_type)] #![feature(inclusive_range_syntax, conservative_impl_trait, plugin, never_type, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate chrono; extern crate chrono;
@@ -12,7 +12,7 @@ extern crate env_logger;
extern crate futures; extern crate futures;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate serde; extern crate serde_bytes;
#[macro_use] #[macro_use]
extern crate tarpc; extern crate tarpc;
extern crate tokio_core; extern crate tokio_core;
@@ -31,7 +31,7 @@ use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor; use tokio_core::reactor;
service! { service! {
rpc read(size: u32) -> serde::bytes::ByteBuf; rpc read(size: u32) -> serde_bytes::ByteBuf;
} }
#[derive(Clone)] #[derive(Clone)]
@@ -50,20 +50,19 @@ impl Server {
} }
impl FutureService for Server { impl FutureService for Server {
type ReadFut = CpuFuture<serde::bytes::ByteBuf, Never>; type ReadFut = CpuFuture<serde_bytes::ByteBuf, Never>;
fn read(&self, size: u32) -> Self::ReadFut { fn read(&self, size: u32) -> Self::ReadFut {
let request_number = self.request_count.fetch_add(1, Ordering::SeqCst); let request_number = self.request_count.fetch_add(1, Ordering::SeqCst);
debug!("Server received read({}) no. {}", size, request_number); debug!("Server received read({}) no. {}", size, request_number);
self.pool self.pool.spawn(futures::lazy(move || {
.spawn(futures::lazy(move || { let mut vec = Vec::with_capacity(size as usize);
let mut vec = Vec::with_capacity(size as usize); for i in 0..size {
for i in 0..size { vec.push(((i % 2) << 8) as u8);
vec.push(((i % 2) << 8) as u8); }
} debug!("Server sending response no. {}", request_number);
debug!("Server sending response no. {}", request_number); Ok(vec.into())
Ok(vec.into()) }))
}))
} }
} }
@@ -94,69 +93,81 @@ struct Stats {
fn spawn_core() -> reactor::Remote { fn spawn_core() -> reactor::Remote {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let mut core = reactor::Core::new().unwrap(); let mut core = reactor::Core::new().unwrap();
tx.send(core.handle().remote().clone()).unwrap(); tx.send(core.handle().remote().clone()).unwrap();
// Run forever // Run forever
core.run(futures::empty::<(), !>()).unwrap(); core.run(futures::empty::<(), !>()).unwrap();
}); });
rx.recv().unwrap() rx.recv().unwrap()
} }
fn run_once(clients: Vec<FutureClient>, fn run_once(
concurrency: u32) clients: Vec<FutureClient>,
-> impl Future<Item = (), Error = ()> + 'static { concurrency: u32,
) -> impl Future<Item = (), Error = ()> + 'static {
let start = Instant::now(); let start = Instant::now();
futures::stream::futures_unordered((0..concurrency as usize) futures::stream::futures_unordered(
.zip(clients.iter().enumerate().cycle()) (0..concurrency as usize)
.map(|(iteration, (client_idx, client))| { .zip(clients.iter().enumerate().cycle())
let start = Instant::now(); .map(|(iteration, (client_idx, client))| {
debug!("Client {} reading (iteration {})...", client_idx, iteration); let start = Instant::now();
client debug!("Client {} reading (iteration {})...", client_idx, iteration);
.read(CHUNK_SIZE) client
.map(move |_| (client_idx, iteration, start)) .read(CHUNK_SIZE)
})) .map(move |_| (client_idx, iteration, start))
.map(|(client_idx, iteration, start)| { }),
let elapsed = start.elapsed(); ).map(|(client_idx, iteration, start)| {
debug!("Client {} received reply (iteration {}).", let elapsed = start.elapsed();
client_idx, debug!(
iteration); "Client {} received reply (iteration {}).",
elapsed client_idx,
}) iteration
.map_err(|e| panic!(e)) );
.fold(Stats::default(), move |mut stats, elapsed| { elapsed
stats.sum += elapsed; })
stats.count += 1; .map_err(|e| panic!(e))
stats.min = Some(cmp::min(stats.min.unwrap_or(elapsed), elapsed)); .fold(Stats::default(), move |mut stats, elapsed| {
stats.max = Some(cmp::max(stats.max.unwrap_or(elapsed), elapsed)); stats.sum += elapsed;
Ok(stats) stats.count += 1;
}) stats.min = Some(cmp::min(stats.min.unwrap_or(elapsed), elapsed));
.map(move |stats| { stats.max = Some(cmp::max(stats.max.unwrap_or(elapsed), elapsed));
info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", Ok(stats)
stats.count, })
stats.sum.microseconds() as f64 / stats.count as f64, .map(move |stats| {
stats.min.unwrap().microseconds(), info!(
stats.max.unwrap().microseconds(), "{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
start.elapsed().microseconds()); stats.count,
}) stats.sum.microseconds() as f64 / stats.count as f64,
stats.min.unwrap().microseconds(),
stats.max.unwrap().microseconds(),
start.elapsed().microseconds()
);
})
} }
fn main() { fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let matches = App::new("Tarpc Concurrency") let matches = App::new("Tarpc Concurrency")
.about("Demonstrates making concurrent requests to a tarpc service.") .about(
.arg(Arg::with_name("concurrency") "Demonstrates making concurrent requests to a tarpc service.",
.short("c") )
.long("concurrency") .arg(
.value_name("LEVEL") Arg::with_name("concurrency")
.help("Sets a custom concurrency level") .short("c")
.takes_value(true)) .long("concurrency")
.arg(Arg::with_name("clients") .value_name("LEVEL")
.short("n") .help("Sets a custom concurrency level")
.long("num_clients") .takes_value(true),
.value_name("AMOUNT") )
.help("How many clients to distribute requests between") .arg(
.takes_value(true)) Arg::with_name("clients")
.short("n")
.long("num_clients")
.value_name("AMOUNT")
.help("How many clients to distribute requests between")
.takes_value(true),
)
.get_matches(); .get_matches();
let concurrency = matches let concurrency = matches
.value_of("concurrency") .value_of("concurrency")
@@ -171,9 +182,11 @@ fn main() {
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = Server::new() let (handle, server) = Server::new()
.listen("localhost:0".first_socket_addr(), .listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
info!("Server listening on {}.", handle.addr()); info!("Server listening on {}.", handle.addr());

View File

@@ -3,7 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate env_logger; extern crate env_logger;
@@ -74,7 +74,9 @@ struct Publisher {
impl Publisher { impl Publisher {
fn new() -> Publisher { fn new() -> Publisher {
Publisher { clients: Rc::new(RefCell::new(HashMap::new())) } Publisher {
clients: Rc::new(RefCell::new(HashMap::new())),
}
} }
} }
@@ -99,13 +101,15 @@ impl publisher::FutureService for Publisher {
fn subscribe(&self, id: u32, address: SocketAddr) -> Self::SubscribeFut { fn subscribe(&self, id: u32, address: SocketAddr) -> Self::SubscribeFut {
let clients = self.clients.clone(); let clients = self.clients.clone();
Box::new(subscriber::FutureClient::connect(address, client::Options::default()) Box::new(
.map(move |subscriber| { subscriber::FutureClient::connect(address, client::Options::default())
println!("Subscribing {}.", id); .map(move |subscriber| {
clients.borrow_mut().insert(id, subscriber); println!("Subscribing {}.", id);
() clients.borrow_mut().insert(id, subscriber);
}) ()
.map_err(|e| e.to_string().into())) })
.map_err(|e| e.to_string().into()),
)
} }
type UnsubscribeFut = Box<Future<Item = (), Error = Never>>; type UnsubscribeFut = Box<Future<Item = (), Error = Never>>;
@@ -121,9 +125,11 @@ fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (publisher_handle, server) = Publisher::new() let (publisher_handle, server) = Publisher::new()
.listen("localhost:0".first_socket_addr(), .listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
@@ -131,19 +137,24 @@ fn main() {
let subscriber2 = Subscriber::listen(1, &reactor.handle(), server::Options::default()); let subscriber2 = Subscriber::listen(1, &reactor.handle(), server::Options::default());
let publisher = reactor let publisher = reactor
.run(publisher::FutureClient::connect(publisher_handle.addr(), client::Options::default())) .run(publisher::FutureClient::connect(
publisher_handle.addr(),
client::Options::default(),
))
.unwrap(); .unwrap();
reactor reactor
.run(publisher .run(
.subscribe(0, subscriber1.addr()) publisher
.and_then(|_| publisher.subscribe(1, subscriber2.addr())) .subscribe(0, subscriber1.addr())
.map_err(|e| panic!(e)) .and_then(|_| publisher.subscribe(1, subscriber2.addr()))
.and_then(|_| { .map_err(|e| panic!(e))
println!("Broadcasting..."); .and_then(|_| {
publisher.broadcast("hello to all".to_string()) println!("Broadcasting...");
}) publisher.broadcast("hello to all".to_string())
.and_then(|_| publisher.unsubscribe(1)) })
.and_then(|_| publisher.broadcast("hi again".to_string()))) .and_then(|_| publisher.unsubscribe(1))
.and_then(|_| publisher.broadcast("hi again".to_string())),
)
.unwrap(); .unwrap();
thread::sleep(Duration::from_millis(300)); thread::sleep(Duration::from_millis(300));
} }

View File

@@ -3,15 +3,13 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate futures;
#[macro_use] #[macro_use]
extern crate tarpc; extern crate tarpc;
#[macro_use] #[macro_use]
extern crate serde_derive; extern crate serde_derive;
extern crate tokio_core;
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
@@ -55,12 +53,12 @@ impl SyncService for HelloServer {
fn main() { fn main() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let handle = HelloServer let handle = HelloServer
.listen("localhost:10000", server::Options::default()) .listen("localhost:10000", server::Options::default())
.unwrap(); .unwrap();
tx.send(handle.addr()).unwrap(); tx.send(handle.addr()).unwrap();
handle.run(); handle.run();
}); });
let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap()); println!("{}", client.hello("Mom".to_string()).unwrap());
println!("{}", client.hello("".to_string()).unwrap_err()); println!("{}", client.hello("".to_string()).unwrap_err());

View File

@@ -3,7 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate futures; extern crate futures;
@@ -35,17 +35,21 @@ impl FutureService for HelloServer {
fn main() { fn main() {
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = HelloServer let (handle, server) = HelloServer
.listen("localhost:10000".first_socket_addr(), .listen(
&reactor.handle(), "localhost:10000".first_socket_addr(),
server::Options::default()) &reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
let options = client::Options::default().handle(reactor.handle()); let options = client::Options::default().handle(reactor.handle());
reactor reactor
.run(FutureClient::connect(handle.addr(), options) .run(
.map_err(tarpc::Error::from) FutureClient::connect(handle.addr(), options)
.and_then(|client| client.hello("Mom".to_string())) .map_err(tarpc::Error::from)
.map(|resp| println!("{}", resp))) .and_then(|client| client.hello("Mom".to_string()))
.map(|resp| println!("{}", resp)),
)
.unwrap(); .unwrap();
} }

View File

@@ -4,13 +4,11 @@
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
// required by `FutureClient` (not used directly in this example) // required by `FutureClient` (not used directly in this example)
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate futures;
#[macro_use] #[macro_use]
extern crate tarpc; extern crate tarpc;
extern crate tokio_core;
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
@@ -27,21 +25,23 @@ struct HelloServer;
impl SyncService for HelloServer { impl SyncService for HelloServer {
fn hello(&self, name: String) -> Result<String, Never> { fn hello(&self, name: String) -> Result<String, Never> {
Ok(format!("Hello from thread {}, {}!", Ok(format!(
thread::current().name().unwrap(), "Hello from thread {}, {}!",
name)) thread::current().name().unwrap(),
name
))
} }
} }
fn main() { fn main() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let handle = HelloServer let handle = HelloServer
.listen("localhost:0", server::Options::default()) .listen("localhost:0", server::Options::default())
.unwrap(); .unwrap();
tx.send(handle.addr()).unwrap(); tx.send(handle.addr()).unwrap();
handle.run(); handle.run();
}); });
let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap()); println!("{}", client.hello("Mom".to_string()).unwrap());
} }

View File

@@ -3,7 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate env_logger; extern crate env_logger;
@@ -73,9 +73,11 @@ fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (add, server) = AddServer let (add, server) = AddServer
.listen("localhost:0".first_socket_addr(), .listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
@@ -85,21 +87,28 @@ fn main() {
.unwrap(); .unwrap();
let (double, server) = DoubleServer::new(add_client) let (double, server) = DoubleServer::new(add_client)
.listen("localhost:0".first_socket_addr(), .listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
let double_client = reactor let double_client = reactor
.run(double::FutureClient::connect(double.addr(), client::Options::default())) .run(double::FutureClient::connect(
double.addr(),
client::Options::default(),
))
.unwrap(); .unwrap();
reactor reactor
.run(futures::stream::futures_unordered((0..5).map(|i| double_client.double(i))) .run(
.map_err(|e| println!("{}", e)) futures::stream::futures_unordered((0..5).map(|i| double_client.double(i)))
.for_each(|i| { .map_err(|e| println!("{}", e))
println!("{:?}", i); .for_each(|i| {
Ok(()) println!("{:?}", i);
})) Ok(())
}),
)
.unwrap(); .unwrap();
} }

View File

@@ -3,14 +3,12 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
extern crate env_logger; extern crate env_logger;
#[macro_use] #[macro_use]
extern crate tarpc; extern crate tarpc;
extern crate futures;
extern crate tokio_core;
use add::{SyncService as AddSyncService, SyncServiceExt as AddExt}; use add::{SyncService as AddSyncService, SyncServiceExt as AddExt};
use double::{SyncService as DoubleSyncService, SyncServiceExt as DoubleExt}; use double::{SyncService as DoubleSyncService, SyncServiceExt as DoubleExt};
@@ -66,13 +64,15 @@ fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let handle = AddServer let handle = AddServer
.listen("localhost:0".first_socket_addr(), .listen(
server::Options::default()) "localhost:0".first_socket_addr(),
.unwrap(); server::Options::default(),
tx.send(handle.addr()).unwrap(); )
handle.run(); .unwrap();
}); tx.send(handle.addr()).unwrap();
handle.run();
});
let add = rx.recv().unwrap(); let add = rx.recv().unwrap();
@@ -80,8 +80,10 @@ fn main() {
thread::spawn(move || { thread::spawn(move || {
let add_client = add::SyncClient::connect(add, client::Options::default()).unwrap(); let add_client = add::SyncClient::connect(add, client::Options::default()).unwrap();
let handle = DoubleServer::new(add_client) let handle = DoubleServer::new(add_client)
.listen("localhost:0".first_socket_addr(), .listen(
server::Options::default()) "localhost:0".first_socket_addr(),
server::Options::default(),
)
.unwrap(); .unwrap();
tx.send(handle.addr()).unwrap(); tx.send(handle.addr()).unwrap();
handle.run(); handle.run();

View File

@@ -3,7 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
#[macro_use] #[macro_use]
@@ -11,13 +11,11 @@ extern crate lazy_static;
#[macro_use] #[macro_use]
extern crate tarpc; extern crate tarpc;
extern crate env_logger; extern crate env_logger;
extern crate futures; extern crate serde_bytes;
extern crate serde;
extern crate tokio_core; extern crate tokio_core;
use std::io::{Read, Write, stdout}; use std::io::{Read, Write, stdout};
use std::net; use std::net;
use std::sync::Arc;
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
use std::time; use std::time;
@@ -27,7 +25,7 @@ use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor; use tokio_core::reactor;
lazy_static! { lazy_static! {
static ref BUF: Arc<serde::bytes::ByteBuf> = Arc::new(gen_vec(CHUNK_SIZE as usize).into()); static ref BUF: serde_bytes::ByteBuf = gen_vec(CHUNK_SIZE as usize).into();
} }
fn gen_vec(size: usize) -> Vec<u8> { fn gen_vec(size: usize) -> Vec<u8> {
@@ -39,14 +37,14 @@ fn gen_vec(size: usize) -> Vec<u8> {
} }
service! { service! {
rpc read() -> Arc<serde::bytes::ByteBuf>; rpc read() -> serde_bytes::ByteBuf;
} }
#[derive(Clone)] #[derive(Clone)]
struct Server; struct Server;
impl FutureService for Server { impl FutureService for Server {
type ReadFut = Result<Arc<serde::bytes::ByteBuf>, Never>; type ReadFut = Result<serde_bytes::ByteBuf, Never>;
fn read(&self) -> Self::ReadFut { fn read(&self) -> Self::ReadFut {
Ok(BUF.clone()) Ok(BUF.clone())
@@ -60,15 +58,17 @@ fn bench_tarpc(target: u64) {
thread::spawn(move || { thread::spawn(move || {
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (addr, server) = Server let (addr, server) = Server
.listen("localhost:0".first_socket_addr(), .listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
tx.send(addr).unwrap(); tx.send(addr).unwrap();
reactor.run(server).unwrap(); reactor.run(server).unwrap();
}); });
let client = SyncClient::connect(rx.recv().unwrap().addr(), client::Options::default()) let client =
.unwrap(); SyncClient::connect(rx.recv().unwrap().addr(), client::Options::default()).unwrap();
let start = time::Instant::now(); let start = time::Instant::now();
let mut nread = 0; let mut nread = 0;
while nread < target { while nread < target {
@@ -78,18 +78,20 @@ fn bench_tarpc(target: u64) {
} }
println!("done"); println!("done");
let duration = time::Instant::now() - start; let duration = time::Instant::now() - start;
println!("TARPC: {}MB/s", println!(
(target as f64 / (1024f64 * 1024f64)) / "TARPC: {}MB/s",
(duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9)); (target as f64 / (1024f64 * 1024f64)) /
(duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9)
);
} }
fn bench_tcp(target: u64) { fn bench_tcp(target: u64) {
let l = net::TcpListener::bind("localhost:0").unwrap(); let l = net::TcpListener::bind("localhost:0").unwrap();
let addr = l.local_addr().unwrap(); let addr = l.local_addr().unwrap();
thread::spawn(move || { thread::spawn(move || {
let (mut stream, _) = l.accept().unwrap(); let (mut stream, _) = l.accept().unwrap();
while let Ok(_) = stream.write_all(&*BUF) {} while let Ok(_) = stream.write_all(&*BUF) {}
}); });
let mut stream = net::TcpStream::connect(&addr).unwrap(); let mut stream = net::TcpStream::connect(&addr).unwrap();
let mut buf = vec![0; CHUNK_SIZE as usize]; let mut buf = vec![0; CHUNK_SIZE as usize];
let start = time::Instant::now(); let start = time::Instant::now();
@@ -102,9 +104,11 @@ fn bench_tcp(target: u64) {
} }
println!("done"); println!("done");
let duration = time::Instant::now() - start; let duration = time::Instant::now() - start;
println!("TCP: {}MB/s", println!(
(target as f64 / (1024f64 * 1024f64)) / "TCP: {}MB/s",
(duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9)); (target as f64 / (1024f64 * 1024f64)) /
(duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9)
);
} }
fn main() { fn main() {

View File

@@ -3,16 +3,14 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>. // Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms. // This file may not be copied, modified, or distributed except according to those terms.
#![feature(plugin)] #![feature(plugin, use_extern_macros)]
#![plugin(tarpc_plugins)] #![plugin(tarpc_plugins)]
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use] #[macro_use]
extern crate tarpc; extern crate tarpc;
extern crate bincode;
extern crate env_logger; extern crate env_logger;
extern crate futures;
extern crate tokio_core; extern crate tokio_core;
use bar::FutureServiceExt as BarExt; use bar::FutureServiceExt as BarExt;
@@ -57,20 +55,17 @@ impl baz::FutureService for Baz {
} }
} }
macro_rules! pos {
() => (concat!(file!(), ":", line!()))
}
fn main() { fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let bar_client = { let bar_client = {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = Bar.listen("localhost:0".first_socket_addr(), let (handle, server) = Bar.listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
.unwrap(); server::Options::default(),
).unwrap();
tx.send(handle).unwrap(); tx.send(handle).unwrap();
reactor.run(server).unwrap(); reactor.run(server).unwrap();
}); });
@@ -82,10 +77,11 @@ fn main() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = Baz.listen("localhost:0".first_socket_addr(), let (handle, server) = Baz.listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
.unwrap(); server::Options::default(),
).unwrap();
tx.send(handle).unwrap(); tx.send(handle).unwrap();
reactor.run(server).unwrap(); reactor.run(server).unwrap();
}); });

View File

@@ -67,7 +67,7 @@ else
fi fi
printf "${PREFIX} Checking for rustfmt ... " printf "${PREFIX} Checking for rustfmt ... "
command -v rustfmt &>/dev/null command -v cargo fmt &>/dev/null
if [ $? == 0 ]; then if [ $? == 0 ]; then
printf "${SUCCESS}\n" printf "${SUCCESS}\n"
else else
@@ -93,7 +93,7 @@ diff=""
for file in $(git diff --name-only --cached); for file in $(git diff --name-only --cached);
do do
if [ ${file: -3} == ".rs" ]; then if [ ${file: -3} == ".rs" ]; then
diff="$diff$(rustfmt --skip-children --write-mode=diff $file)" diff="$diff$(cargo fmt -- --skip-children --write-mode=diff $file)"
fi fi
done done
if grep --quiet "^Diff at line" <<< "$diff"; then if grep --quiet "^Diff at line" <<< "$diff"; then

View File

@@ -29,7 +29,7 @@ pub enum Error<E> {
App(E), App(E),
} }
impl<E: StdError + Deserialize + Serialize + Send + 'static> fmt::Display for Error<E> { impl<'a, E: StdError + Deserialize<'a> + Serialize + Send + 'static> fmt::Display for Error<E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self { match *self {
Error::ResponseDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), Error::ResponseDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e),
@@ -40,7 +40,7 @@ impl<E: StdError + Deserialize + Serialize + Send + 'static> fmt::Display for Er
} }
} }
impl<E: StdError + Deserialize + Serialize + Send + 'static> StdError for Error<E> { impl<'a, E: StdError + Deserialize<'a> + Serialize + Send + 'static> StdError for Error<E> {
fn description(&self) -> &str { fn description(&self) -> &str {
match *self { match *self {
Error::ResponseDeserialize(_) => "The client failed to deserialize the response.", Error::ResponseDeserialize(_) => "The client failed to deserialize the response.",
@@ -53,8 +53,7 @@ impl<E: StdError + Deserialize + Serialize + Send + 'static> StdError for Error<
fn cause(&self) -> Option<&StdError> { fn cause(&self) -> Option<&StdError> {
match *self { match *self {
Error::ResponseDeserialize(ref e) => e.cause(), Error::ResponseDeserialize(ref e) => e.cause(),
Error::RequestDeserialize(_) | Error::RequestDeserialize(_) | Error::App(_) => None,
Error::App(_) => None,
Error::Io(ref e) => e.cause(), Error::Io(ref e) => e.cause(),
} }
} }

View File

@@ -7,7 +7,8 @@ use {REMOTE, bincode};
use future::server::Response; use future::server::Response;
use futures::{self, Future, future}; use futures::{self, Future, future};
use protocol::Proto; use protocol::Proto;
use serde::{Deserialize, Serialize}; use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
@@ -102,27 +103,32 @@ impl fmt::Debug for Reactor {
} }
#[doc(hidden)] #[doc(hidden)]
pub struct Client<Req, Resp, E> pub struct Client<Req, Resp, E>
where Req: Serialize + 'static, where
Resp: Deserialize + 'static, Req: Serialize + 'static,
E: Deserialize + 'static Resp: DeserializeOwned + 'static,
E: DeserializeOwned + 'static,
{ {
inner: ClientService<StreamType, Proto<Req, Response<Resp, E>>>, inner: ClientService<StreamType, Proto<Req, Response<Resp, E>>>,
} }
impl<Req, Resp, E> Clone for Client<Req, Resp, E> impl<Req, Resp, E> Clone for Client<Req, Resp, E>
where Req: Serialize + 'static, where
Resp: Deserialize + 'static, Req: Serialize + 'static,
E: Deserialize + 'static Resp: DeserializeOwned + 'static,
E: DeserializeOwned + 'static,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Client { inner: self.inner.clone() } Client {
inner: self.inner.clone(),
}
} }
} }
impl<Req, Resp, E> Service for Client<Req, Resp, E> impl<Req, Resp, E> Service for Client<Req, Resp, E>
where Req: Serialize + Send + 'static, where
Resp: Deserialize + Send + 'static, Req: Serialize + Send + 'static,
E: Deserialize + Send + 'static Resp: DeserializeOwned + Send + 'static,
E: DeserializeOwned + Send + 'static,
{ {
type Request = Req; type Request = Req;
type Response = Resp; type Response = Resp;
@@ -142,14 +148,16 @@ impl<Req, Resp, E> Service for Client<Req, Resp, E>
} }
impl<Req, Resp, E> Client<Req, Resp, E> impl<Req, Resp, E> Client<Req, Resp, E>
where Req: Serialize + 'static, where
Resp: Deserialize + 'static, Req: Serialize + 'static,
E: Deserialize + 'static Resp: DeserializeOwned + 'static,
E: DeserializeOwned + 'static,
{ {
fn bind(handle: &reactor::Handle, tcp: StreamType, max_payload_size: u64) -> Self fn bind(handle: &reactor::Handle, tcp: StreamType, max_payload_size: u64) -> Self
where Req: Serialize + Send + 'static, where
Resp: Deserialize + Send + 'static, Req: Serialize + Send + 'static,
E: Deserialize + Send + 'static Resp: DeserializeOwned + Send + 'static,
E: DeserializeOwned + Send + 'static,
{ {
let inner = Proto::new(max_payload_size).bind_client(&handle, tcp); let inner = Proto::new(max_payload_size).bind_client(&handle, tcp);
Client { inner } Client { inner }
@@ -163,9 +171,10 @@ impl<Req, Resp, E> Client<Req, Resp, E>
} }
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E> impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
where Req: Serialize + 'static, where
Resp: Deserialize + 'static, Req: Serialize + 'static,
E: Deserialize + 'static Resp: DeserializeOwned + 'static,
E: DeserializeOwned + 'static,
{ {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Client {{ .. }}") write!(f, "Client {{ .. }}")
@@ -182,14 +191,18 @@ pub trait ClientExt: Sized {
} }
/// A future that resolves to a `Client` or an `io::Error`. /// A future that resolves to a `Client` or an `io::Error`.
pub type ConnectFuture<Req, Resp, E> = pub type ConnectFuture<Req, Resp, E> = futures::Flatten<
futures::Flatten<futures::MapErr<futures::Oneshot<io::Result<Client<Req, Resp, E>>>, futures::MapErr<
fn(futures::Canceled) -> io::Error>>; futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
fn(futures::Canceled) -> io::Error,
>,
>;
impl<Req, Resp, E> ClientExt for Client<Req, Resp, E> impl<Req, Resp, E> ClientExt for Client<Req, Resp, E>
where Req: Serialize + Send + 'static, where
Resp: Deserialize + Send + 'static, Req: Serialize + Send + 'static,
E: Deserialize + Send + 'static Resp: DeserializeOwned + Send + 'static,
E: DeserializeOwned + Send + 'static,
{ {
type ConnectFut = ConnectFuture<Req, Resp, E>; type ConnectFut = ConnectFuture<Req, Resp, E>;
@@ -212,16 +225,17 @@ impl<Req, Resp, E> ClientExt for Client<Req, Resp, E>
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
match tls_ctx { match tls_ctx {
Some(tls_ctx) => { Some(tls_ctx) => {
future::Either::A(tls_ctx future::Either::A(
.tls_connector tls_ctx
.connect_async(&tls_ctx.domain, socket) .tls_connector
.map(StreamType::Tls) .connect_async(&tls_ctx.domain, socket)
.map_err(native_to_io)) .map(StreamType::Tls)
.map_err(native_to_io),
)
} }
None => future::Either::B(future::ok(StreamType::Tcp(socket))), None => future::Either::B(future::ok(StreamType::Tcp(socket))),
} }
#[cfg(not(feature = "tls"))] #[cfg(not(feature = "tls"))] future::ok(StreamType::Tcp(socket))
future::ok(StreamType::Tcp(socket))
}) })
.map(move |tcp| Client::bind(&handle2, tcp, max_payload_size)) .map(move |tcp| Client::bind(&handle2, tcp, max_payload_size))
}; };

View File

@@ -69,8 +69,8 @@ impl<S: NewService> NewService for TrackingNewService<S> {
fn new_service(&self) -> io::Result<Self::Instance> { fn new_service(&self) -> io::Result<Self::Instance> {
self.connection_tracker.increment(); self.connection_tracker.increment();
Ok(TrackingService { Ok(TrackingService {
service: self.new_service.new_service()?, service: self.new_service.new_service()?,
tracker: self.connection_tracker.clone(), tracker: self.connection_tracker.clone(),
}) })
} }
} }

View File

@@ -7,7 +7,8 @@ use {bincode, net2};
use errors::WireError; use errors::WireError;
use futures::{Async, Future, Poll, Stream, future as futures}; use futures::{Async, Future, Poll, Stream, future as futures};
use protocol::Proto; use protocol::Proto;
use serde::{Deserialize, Serialize}; use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
@@ -58,10 +59,13 @@ enum Acceptor {
struct Accept { struct Accept {
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
inner: futures::Either<futures::MapErr<futures::Map<AcceptAsync<TcpStream>, inner: futures::Either<
fn(TlsStream<TcpStream>) -> StreamType>, futures::MapErr<
fn(native_tls::Error) -> io::Error>, futures::Map<AcceptAsync<TcpStream>, fn(TlsStream<TcpStream>) -> StreamType>,
futures::FutureResult<StreamType, io::Error>>, fn(native_tls::Error) -> io::Error,
>,
futures::FutureResult<StreamType, io::Error>,
>,
#[cfg(not(feature = "tls"))] #[cfg(not(feature = "tls"))]
inner: futures::FutureResult<StreamType, io::Error>, inner: futures::FutureResult<StreamType, io::Error>,
} }
@@ -82,10 +86,12 @@ impl Acceptor {
Accept { Accept {
inner: match *self { inner: match *self {
Acceptor::Tls(ref tls_acceptor) => { Acceptor::Tls(ref tls_acceptor) => {
futures::Either::A(tls_acceptor futures::Either::A(
.accept_async(socket) tls_acceptor
.map(StreamType::Tls as _) .accept_async(socket)
.map_err(native_to_io)) .map(StreamType::Tls as _)
.map_err(native_to_io),
)
} }
Acceptor::Tcp => futures::Either::B(futures::ok(StreamType::Tcp(socket))), Acceptor::Tcp => futures::Either::B(futures::ok(StreamType::Tcp(socket))),
}, },
@@ -94,7 +100,9 @@ impl Acceptor {
#[cfg(not(feature = "tls"))] #[cfg(not(feature = "tls"))]
fn accept(&self, socket: TcpStream) -> Accept { fn accept(&self, socket: TcpStream) -> Accept {
Accept { inner: futures::ok(StreamType::Tcp(socket)) } Accept {
inner: futures::ok(StreamType::Tcp(socket)),
}
} }
} }
@@ -143,7 +151,8 @@ struct AcceptStream<S> {
} }
impl<S> Stream for AcceptStream<S> impl<S> Stream for AcceptStream<S>
where S: Stream<Item = (TcpStream, SocketAddr), Error = io::Error> where
S: Stream<Item = (TcpStream, SocketAddr), Error = io::Error>,
{ {
type Item = <Accept as Future>::Item; type Item = <Accept as Future>::Item;
type Error = io::Error; type Error = io::Error;
@@ -182,7 +191,9 @@ pub struct Options {
impl Default for Options { impl Default for Options {
#[cfg(not(feature = "tls"))] #[cfg(not(feature = "tls"))]
fn default() -> Self { fn default() -> Self {
Options { max_payload_size: 2 << 20 } Options {
max_payload_size: 2 << 20,
}
} }
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
@@ -218,12 +229,14 @@ impl fmt::Debug for Options {
let mut debug_struct = fmt.debug_struct("Options"); let mut debug_struct = fmt.debug_struct("Options");
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
debug_struct.field("tls_acceptor", debug_struct.field(
if self.tls_acceptor.is_some() { "tls_acceptor",
SOME if self.tls_acceptor.is_some() {
} else { SOME
NONE } else {
}); NONE
},
);
debug_struct.finish() debug_struct.finish()
} }
} }
@@ -241,20 +254,24 @@ pub fn listen<S, Req, Resp, E>(new_service: S,
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static E: Serialize + 'static
{ {
let (addr, shutdown, server) = listen_with(new_service, let (addr, shutdown, server) = listen_with(
addr, new_service,
handle, addr,
options.max_payload_size, handle,
Acceptor::from(options))?; options.max_payload_size,
Ok((Handle { Acceptor::from(options),
)?;
Ok((
Handle {
addr: addr, addr: addr,
shutdown: shutdown, shutdown: shutdown,
}, },
server)) server,
))
} }
/// Spawns a service that binds to the given address using the given handle. /// Spawns a service that binds to the given address using the given handle.
@@ -267,7 +284,7 @@ fn listen_with<S, Req, Resp, E>(new_service: S,
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static E: Serialize + 'static
{ {
@@ -330,8 +347,9 @@ struct BindStream<S, St> {
} }
impl<S, St> fmt::Debug for BindStream<S, St> impl<S, St> fmt::Debug for BindStream<S, St>
where S: fmt::Debug, where
St: fmt::Debug S: fmt::Debug,
St: fmt::Debug,
{ {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
const HANDLE: &'static &'static str = &"Handle { .. }"; const HANDLE: &'static &'static str = &"Handle { .. }";
@@ -347,7 +365,7 @@ impl<S, Req, Resp, E, I, St> BindStream<S, St>
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static, E: Serialize + 'static,
I: AsyncRead + AsyncWrite + 'static, I: AsyncRead + AsyncWrite + 'static,
@@ -372,7 +390,7 @@ impl<S, Req, Resp, E, I, St> Future for BindStream<S, St>
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static, E: Serialize + 'static,
I: AsyncRead + AsyncWrite + 'static, I: AsyncRead + AsyncWrite + 'static,
@@ -399,7 +417,7 @@ pub struct Listen<S, Req, Resp, E>
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static E: Serialize + 'static
{ {
@@ -410,7 +428,7 @@ impl<S, Req, Resp, E> Future for Listen<S, Req, Resp, E>
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static E: Serialize + 'static
{ {
@@ -426,7 +444,7 @@ impl<S, Req, Resp, E> fmt::Debug for Listen<S, Req, Resp, E>
where S: NewService<Request = Result<Req, bincode::Error>, where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>, Response = Response<Resp, E>,
Error = io::Error> + 'static, Error = io::Error> + 'static,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static E: Serialize + 'static
{ {
@@ -439,15 +457,15 @@ impl<S, Req, Resp, E> fmt::Debug for Listen<S, Req, Resp, E>
struct AlwaysOkUnit<F>(F); struct AlwaysOkUnit<F>(F);
impl<F> Future for AlwaysOkUnit<F> impl<F> Future for AlwaysOkUnit<F>
where F: Future where
F: Future,
{ {
type Item = (); type Item = ();
type Error = (); type Error = ();
fn poll(&mut self) -> Poll<(), ()> { fn poll(&mut self) -> Poll<(), ()> {
match self.0.poll() { match self.0.poll() {
Ok(Async::Ready(_)) | Ok(Async::Ready(_)) | Err(_) => Ok(Async::Ready(())),
Err(_) => Ok(Async::Ready(())),
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
} }
} }

View File

@@ -60,16 +60,18 @@ impl Watcher {
pub fn triple() -> (connection::Tracker, Shutdown, Self) { pub fn triple() -> (connection::Tracker, Shutdown, Self) {
let (connection_tx, connections) = connection::Tracker::pair(); let (connection_tx, connections) = connection::Tracker::pair();
let (shutdown_tx, shutdown_rx) = mpsc::unbounded(); let (shutdown_tx, shutdown_rx) = mpsc::unbounded();
(connection_tx, (
Shutdown { tx: shutdown_tx }, connection_tx,
Watcher { Shutdown { tx: shutdown_tx },
shutdown_rx: shutdown_rx.take(1), Watcher {
connections: connections, shutdown_rx: shutdown_rx.take(1),
queued_error: None, connections: connections,
shutdown: None, queued_error: None,
done: false, shutdown: None,
num_connections: 0, done: false,
}) num_connections: 0,
},
)
} }
fn process_connection(&mut self, action: connection::Action) { fn process_connection(&mut self, action: connection::Action) {
@@ -81,23 +83,23 @@ impl Watcher {
fn poll_shutdown_requests(&mut self) -> Poll<Option<()>, ()> { fn poll_shutdown_requests(&mut self) -> Poll<Option<()>, ()> {
Ok(Async::Ready(match try_ready!(self.shutdown_rx.poll()) { Ok(Async::Ready(match try_ready!(self.shutdown_rx.poll()) {
Some(tx) => { Some(tx) => {
debug!("Received shutdown request."); debug!("Received shutdown request.");
self.shutdown = Some(tx); self.shutdown = Some(tx);
Some(()) Some(())
} }
None => None, None => None,
})) }))
} }
fn poll_connections(&mut self) -> Poll<Option<()>, ()> { fn poll_connections(&mut self) -> Poll<Option<()>, ()> {
Ok(Async::Ready(match try_ready!(self.connections.poll()) { Ok(Async::Ready(match try_ready!(self.connections.poll()) {
Some(action) => { Some(action) => {
self.process_connection(action); self.process_connection(action);
Some(()) Some(())
} }
None => None, None => None,
})) }))
} }
fn poll_shutdown_requests_and_connections(&mut self) -> Poll<Option<()>, ()> { fn poll_shutdown_requests_and_connections(&mut self) -> Poll<Option<()>, ()> {

View File

@@ -27,7 +27,7 @@
//! Example usage: //! Example usage:
//! //!
//! ``` //! ```
//! #![feature(plugin)] //! #![feature(plugin, use_extern_macros)]
//! #![plugin(tarpc_plugins)] //! #![plugin(tarpc_plugins)]
//! //!
//! #[macro_use] //! #[macro_use]
@@ -71,7 +71,7 @@
//! Example usage with TLS: //! Example usage with TLS:
//! //!
//! ```no-run //! ```no-run
//! #![feature(plugin)] //! #![feature(plugin, use_extern_macros)]
//! #![plugin(tarpc_plugins)] //! #![plugin(tarpc_plugins)]
//! //!
//! #[macro_use] //! #[macro_use]
@@ -116,7 +116,7 @@
#![deny(missing_docs, missing_debug_implementations)] #![deny(missing_docs, missing_debug_implementations)]
#![feature(never_type)] #![feature(never_type)]
#![cfg_attr(test, feature(plugin))] #![cfg_attr(test, feature(plugin, use_extern_macros))]
#![cfg_attr(test, plugin(tarpc_plugins))] #![cfg_attr(test, plugin(tarpc_plugins))]
extern crate byteorder; extern crate byteorder;
@@ -129,8 +129,6 @@ extern crate lazy_static;
extern crate log; extern crate log;
extern crate net2; extern crate net2;
extern crate num_cpus; extern crate num_cpus;
#[macro_use]
extern crate serde_derive;
extern crate thread_pool; extern crate thread_pool;
extern crate tokio_io; extern crate tokio_io;
@@ -142,6 +140,9 @@ pub extern crate futures;
#[doc(hidden)] #[doc(hidden)]
pub extern crate serde; pub extern crate serde;
#[doc(hidden)] #[doc(hidden)]
#[macro_use]
pub extern crate serde_derive;
#[doc(hidden)]
pub extern crate tokio_core; pub extern crate tokio_core;
#[doc(hidden)] #[doc(hidden)]
pub extern crate tokio_proto; pub extern crate tokio_proto;
@@ -189,12 +190,12 @@ lazy_static! {
fn spawn_core() -> reactor::Remote { fn spawn_core() -> reactor::Remote {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let mut core = reactor::Core::new().unwrap(); let mut core = reactor::Core::new().unwrap();
tx.send(core.handle().remote().clone()).unwrap(); tx.send(core.handle().remote().clone()).unwrap();
// Run forever // Run forever
core.run(futures::empty::<(), !>()).unwrap(); core.run(futures::empty::<(), !>()).unwrap();
}); });
rx.recv().unwrap() rx.recv().unwrap()
} }

View File

@@ -9,158 +9,12 @@ macro_rules! as_item {
($i:item) => {$i}; ($i:item) => {$i};
} }
#[doc(hidden)]
#[macro_export]
macro_rules! impl_serialize {
($impler:ident, { $($lifetime:tt)* }, $(@($name:ident $n:expr))* -- #($n_:expr) ) => {
as_item! {
impl$($lifetime)* $crate::serde::Serialize for $impler$($lifetime)* {
fn serialize<S>(&self, impl_serialize_serializer__: S)
-> ::std::result::Result<S::Ok, S::Error>
where S: $crate::serde::Serializer
{
match *self {
$(
$impler::$name(ref impl_serialize_field__) =>
$crate::serde::Serializer::serialize_newtype_variant(
impl_serialize_serializer__,
stringify!($impler),
$n,
stringify!($name),
impl_serialize_field__,
)
),*
}
}
}
}
};
// All args are wrapped in a tuple so we can use the newtype variant for each one.
($impler:ident,
{ $($lifetime:tt)* },
$(@$finished:tt)*
-- #($n:expr) $name:ident($field:ty) $($req:tt)*) =>
(
impl_serialize!($impler,
{ $($lifetime)* },
$(@$finished)* @($name $n)
-- #($n + 1) $($req)*);
);
// Entry
($impler:ident,
{ $($lifetime:tt)* },
$($started:tt)*) => (impl_serialize!($impler, { $($lifetime)* }, -- #(0) $($started)*););
}
#[doc(hidden)]
#[macro_export]
macro_rules! impl_deserialize {
($impler:ident, $(@($name:ident $n:expr))* -- #($n_:expr) ) => (
impl $crate::serde::Deserialize for $impler {
#[allow(non_camel_case_types)]
fn deserialize<impl_deserialize_D__>(
impl_deserialize_deserializer__: impl_deserialize_D__)
-> ::std::result::Result<$impler, impl_deserialize_D__::Error>
where impl_deserialize_D__: $crate::serde::Deserializer
{
#[allow(non_camel_case_types, unused)]
enum impl_deserialize_Field__ {
$($name),*
}
impl $crate::serde::Deserialize for impl_deserialize_Field__ {
fn deserialize<D>(impl_deserialize_deserializer__: D)
-> ::std::result::Result<impl_deserialize_Field__, D::Error>
where D: $crate::serde::Deserializer
{
struct impl_deserialize_FieldVisitor__;
impl $crate::serde::de::Visitor for impl_deserialize_FieldVisitor__ {
type Value = impl_deserialize_Field__;
fn expecting(&self, formatter: &mut ::std::fmt::Formatter)
-> ::std::fmt::Result
{
formatter.write_str("an unsigned integer")
}
fn visit_u64<E>(self, impl_deserialize_value__: u64)
-> ::std::result::Result<impl_deserialize_Field__, E>
where E: $crate::serde::de::Error,
{
if impl_deserialize_value__ == 0 {
return ::std::result::Result::Err(
$crate::serde::de::Error::custom(
"Variant 0 is a sentinel value and should not \
be serialized!"));
}
$(
if impl_deserialize_value__ == $n {
return ::std::result::Result::Ok(
impl_deserialize_Field__::$name);
}
)*
::std::result::Result::Err(
$crate::serde::de::Error::custom(
format!("No variants have a value of {}!",
impl_deserialize_value__))
)
}
}
impl_deserialize_deserializer__.deserialize_struct_field(
impl_deserialize_FieldVisitor__)
}
}
struct Visitor;
impl $crate::serde::de::Visitor for Visitor {
type Value = $impler;
fn expecting(&self, formatter: &mut ::std::fmt::Formatter)
-> ::std::fmt::Result
{
formatter.write_str("an enum variant")
}
fn visit_enum<V>(self, visitor__: V)
-> ::std::result::Result<Self::Value, V::Error>
where V: $crate::serde::de::EnumVisitor
{
use $crate::serde::de::VariantVisitor;
match visitor__.visit_variant()? {
$(
(impl_deserialize_Field__::$name, variant) => {
::std::result::Result::Ok(
$impler::$name(variant.visit_newtype()?))
}
),*
}
}
}
const TARPC_VARIANTS__: &'static [&'static str] = &[
$(
stringify!($name)
),*
];
impl_deserialize_deserializer__.deserialize_enum(
stringify!($impler), TARPC_VARIANTS__, Visitor)
}
}
);
// All args are wrapped in a tuple so we can use the newtype variant for each one.
($impler:ident, $(@$finished:tt)* -- #($n:expr) $name:ident($field:ty) $($req:tt)*) => (
impl_deserialize!($impler, $(@$finished)* @($name $n) -- #($n + 1) $($req)*);
);
// Entry
($impler:ident, $($started:tt)*) => (impl_deserialize!($impler, -- #(0) $($started)*););
}
/// The main macro that creates RPC services. /// The main macro that creates RPC services.
/// ///
/// Rpc methods are specified, mirroring trait syntax: /// Rpc methods are specified, mirroring trait syntax:
/// ///
/// ``` /// ```
/// # #![feature(plugin)] /// # #![feature(plugin, use_extern_macros)]
/// # #![plugin(tarpc_plugins)] /// # #![plugin(tarpc_plugins)]
/// # #[macro_use] extern crate tarpc; /// # #[macro_use] extern crate tarpc;
/// # fn main() {} /// # fn main() {}
@@ -290,41 +144,31 @@ macro_rules! service {
#[doc(hidden)] #[doc(hidden)]
#[allow(non_camel_case_types, unused)] #[allow(non_camel_case_types, unused)]
#[derive($crate::serde_derive::Serialize, $crate::serde_derive::Deserialize)]
pub enum Request__ { pub enum Request__ {
NotIrrefutable(()),
$( $(
$fn_name(( $($in_,)* )) $fn_name{ $($arg: $in_,)* }
),* ),*
} }
impl_deserialize!(Request__, NotIrrefutable(()) $($fn_name(($($in_),*)))*);
impl_serialize!(Request__, {}, NotIrrefutable(()) $($fn_name(($($in_),*)))*);
#[doc(hidden)] #[doc(hidden)]
#[allow(non_camel_case_types, unused)] #[allow(non_camel_case_types, unused)]
#[derive($crate::serde_derive::Serialize, $crate::serde_derive::Deserialize)]
pub enum Response__ { pub enum Response__ {
NotIrrefutable(()),
$( $(
$fn_name($out) $fn_name($out)
),* ),*
} }
impl_deserialize!(Response__, NotIrrefutable(()) $($fn_name($out))*);
impl_serialize!(Response__, {}, NotIrrefutable(()) $($fn_name($out))*);
#[doc(hidden)] #[doc(hidden)]
#[allow(non_camel_case_types, unused)] #[allow(non_camel_case_types, unused)]
#[derive(Debug)] #[derive(Debug, $crate::serde_derive::Deserialize, $crate::serde_derive::Serialize)]
pub enum Error__ { pub enum Error__ {
NotIrrefutable(()),
$( $(
$fn_name($error) $fn_name($error)
),* ),*
} }
impl_deserialize!(Error__, NotIrrefutable(()) $($fn_name($error))*);
impl_serialize!(Error__, {}, NotIrrefutable(()) $($fn_name($error))*);
/// Defines the `Future` RPC service. Implementors must be `Clone` and `'static`, /// Defines the `Future` RPC service. Implementors must be `Clone` and `'static`,
/// as required by `tokio_proto::NewService`. This is required so that the service can be used /// as required by `tokio_proto::NewService`. This is required so that the service can be used
/// to respond to multiple requests concurrently. /// to respond to multiple requests concurrently.
@@ -408,10 +252,10 @@ macro_rules! service {
::std::string::ToString::to_string(&err__))))); ::std::string::ToString::to_string(&err__)))));
} }
}; };
#[allow(unreachable_patterns)]
match request__ { match request__ {
Request__::NotIrrefutable(()) => unreachable!(),
$( $(
Request__::$fn_name(( $($arg,)* )) => { Request__::$fn_name{ $($arg,)* } => {
fn wrap__(response__: ::std::result::Result<$out, $error>) fn wrap__(response__: ::std::result::Result<$out, $error>)
-> ResponseFuture__ -> ResponseFuture__
{ {
@@ -430,6 +274,7 @@ macro_rules! service {
wrap__)); wrap__));
} }
)* )*
_ => unreachable!(),
} }
} }
} }
@@ -581,7 +426,7 @@ macro_rules! service {
-> ::std::result::Result<$out, $crate::Error<$error>> -> ::std::result::Result<$out, $crate::Error<$error>>
{ {
tarpc_service_then__!($out, $error, $fn_name); tarpc_service_then__!($out, $error, $fn_name);
let resp__ = self.inner.call(Request__::$fn_name(($($arg,)*))); let resp__ = self.inner.call(Request__::$fn_name { $($arg,)* });
tarpc_service_then__(resp__) tarpc_service_then__(resp__)
} }
)* )*
@@ -646,7 +491,7 @@ macro_rules! service {
-> ::std::result::Result<$out, $crate::Error<$error>>> { -> ::std::result::Result<$out, $crate::Error<$error>>> {
tarpc_service_then__!($out, $error, $fn_name); tarpc_service_then__!($out, $error, $fn_name);
let request__ = Request__::$fn_name(($($arg,)*)); let request__ = Request__::$fn_name { $($arg,)* };
let future__ = $crate::tokio_service::Service::call(&self.0, request__); let future__ = $crate::tokio_service::Service::call(&self.0, request__);
return $crate::futures::Future::then(future__, tarpc_service_then__); return $crate::futures::Future::then(future__, tarpc_service_then__);
} }
@@ -665,23 +510,21 @@ macro_rules! tarpc_service_then__ {
-> ::std::result::Result<$out, $crate::Error<$error>> { -> ::std::result::Result<$out, $crate::Error<$error>> {
match msg__ { match msg__ {
::std::result::Result::Ok(msg__) => { ::std::result::Result::Ok(msg__) => {
if let Response__::$fn_name(msg__) = #[allow(unreachable_patterns)]
msg__ match msg__ {
{ Response__::$fn_name(msg__) =>
::std::result::Result::Ok(msg__) ::std::result::Result::Ok(msg__),
} else { _ => unreachable!(),
unreachable!()
} }
} }
::std::result::Result::Err(err__) => { ::std::result::Result::Err(err__) => {
::std::result::Result::Err(match err__ { ::std::result::Result::Err(match err__ {
$crate::Error::App(err__) => { $crate::Error::App(err__) => {
if let Error__::$fn_name( #[allow(unreachable_patterns)]
err__) = err__ match err__ {
{ Error__::$fn_name(err__) =>
$crate::Error::App(err__) $crate::Error::App(err__),
} else { _ => unreachable!(),
unreachable!()
} }
} }
$crate::Error::RequestDeserialize(err__) => { $crate::Error::RequestDeserialize(err__) => {
@@ -777,8 +620,7 @@ mod functional_test {
if #[cfg(target_os = "macos")] { if #[cfg(target_os = "macos")] {
extern crate security_framework; extern crate security_framework;
use self::security_framework::certificate::SecCertificate; use native_tls_inner::Certificate;
use native_tls_inner::backend::security_framework::TlsConnectorBuilderExt;
fn get_future_tls_client_options() -> future::client::Options { fn get_future_tls_client_options() -> future::client::Options {
future::client::Options::default().tls(get_tls_client_context()) future::client::Options::default().tls(get_tls_client_context())
@@ -790,9 +632,9 @@ mod functional_test {
fn get_tls_client_context() -> Context { fn get_tls_client_context() -> Context {
let buf = include_bytes!("../test/root-ca.der"); let buf = include_bytes!("../test/root-ca.der");
let cert = unwrap!(SecCertificate::from_der(buf)); let cert = unwrap!(Certificate::from_der(buf));
let mut connector = unwrap!(TlsConnector::builder()); let mut connector = unwrap!(TlsConnector::builder());
connector.anchor_certificates(&[cert]); connector.add_root_certificate(cert);
Context { Context {
domain: DOMAIN.into(), domain: DOMAIN.into(),
@@ -1009,8 +851,8 @@ mod functional_test {
#[test] #[test]
fn simple() { fn simple() {
let _ = env_logger::init(); let _ = env_logger::init();
let (_, client, _) = unwrap!(start_server_with_sync_client::<SyncClient, let (_, client, _) =
Server>(Server)); unwrap!(start_server_with_sync_client::<SyncClient, Server>(Server));
assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
} }
@@ -1020,8 +862,8 @@ mod functional_test {
use futures::{Async, Future}; use futures::{Async, Future};
let _ = env_logger::init(); let _ = env_logger::init();
let (addr, client, shutdown) = unwrap!(start_server_with_sync_client::<SyncClient, let (addr, client, shutdown) =
Server>(Server)); unwrap!(start_server_with_sync_client::<SyncClient, Server>(Server));
assert_eq!(3, unwrap!(client.add(1, 2))); assert_eq!(3, unwrap!(client.add(1, 2)));
assert_eq!("Hey, Tim.", unwrap!(client.hey("Tim".to_string()))); assert_eq!("Hey, Tim.", unwrap!(client.hey("Tim".to_string())));
@@ -1056,8 +898,8 @@ mod functional_test {
#[test] #[test]
fn no_shutdown() { fn no_shutdown() {
let _ = env_logger::init(); let _ = env_logger::init();
let (addr, client, shutdown) = unwrap!(start_server_with_sync_client::<SyncClient, let (addr, client, shutdown) =
Server>(Server)); unwrap!(start_server_with_sync_client::<SyncClient, Server>(Server));
assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
@@ -1072,9 +914,10 @@ mod functional_test {
#[test] #[test]
fn other_service() { fn other_service() {
let _ = env_logger::init(); let _ = env_logger::init();
let (_, client, _) = unwrap!( let (_, client, _) = unwrap!(start_server_with_sync_client::<
start_server_with_sync_client::<super::other_service::SyncClient, Server>(Server) super::other_service::SyncClient,
); Server,
>(Server));
match client.foo().err().expect("failed unwrap") { match client.foo().err().expect("failed unwrap") {
::Error::RequestDeserialize(_) => {} // good ::Error::RequestDeserialize(_) => {} // good
bad => panic!("Expected Error::RequestDeserialize but got {}", bad), bad => panic!("Expected Error::RequestDeserialize but got {}", bad),
@@ -1093,7 +936,8 @@ mod functional_test {
impl Serialize for Bad { impl Serialize for Bad {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer where
S: Serializer,
{ {
serializer.serialize_seq(None)?.end() serializer.serialize_seq(None)?.end()
} }
@@ -1146,11 +990,14 @@ mod functional_test {
#[test] #[test]
fn simple() { fn simple() {
let _ = env_logger::init(); let _ = env_logger::init();
let (_, mut reactor, client) = let (_, mut reactor, client) = unwrap!(
unwrap!(start_server_with_async_client::<FutureClient, Server>(Server)); start_server_with_async_client::<FutureClient, Server>(Server)
);
assert_eq!(3, reactor.run(client.add(1, 2)).unwrap()); assert_eq!(3, reactor.run(client.add(1, 2)).unwrap());
assert_eq!("Hey, Tim.", assert_eq!(
reactor.run(client.hey("Tim".to_string())).unwrap()); "Hey, Tim.",
reactor.run(client.hey("Tim".to_string())).unwrap()
);
} }
#[test] #[test]
@@ -1181,8 +1028,9 @@ mod functional_test {
#[test] #[test]
fn concurrent() { fn concurrent() {
let _ = env_logger::init(); let _ = env_logger::init();
let (_, mut reactor, client) = let (_, mut reactor, client) = unwrap!(
unwrap!(start_server_with_async_client::<FutureClient, Server>(Server)); start_server_with_async_client::<FutureClient, Server>(Server)
);
let req1 = client.add(1, 2); let req1 = client.add(1, 2);
let req2 = client.add(3, 4); let req2 = client.add(3, 4);
let req3 = client.hey("Tim".to_string()); let req3 = client.hey("Tim".to_string());
@@ -1194,9 +1042,10 @@ mod functional_test {
#[test] #[test]
fn other_service() { fn other_service() {
let _ = env_logger::init(); let _ = env_logger::init();
let (_, mut reactor, client) = let (_, mut reactor, client) = unwrap!(start_server_with_async_client::<
unwrap!(start_server_with_async_client::<super::other_service::FutureClient, super::other_service::FutureClient,
Server>(Server)); Server,
>(Server));
match reactor.run(client.foo()).err().unwrap() { match reactor.run(client.foo()).err().unwrap() {
::Error::RequestDeserialize(_) => {} // good ::Error::RequestDeserialize(_) => {} // good
bad => panic!(r#"Expected Error::RequestDeserialize but got "{}""#, bad), bad => panic!(r#"Expected Error::RequestDeserialize but got "{}""#, bad),
@@ -1212,9 +1061,11 @@ mod functional_test {
let _ = env_logger::init(); let _ = env_logger::init();
let reactor = reactor::Core::new().unwrap(); let reactor = reactor::Core::new().unwrap();
let handle = Server let handle = Server
.listen("localhost:0".first_socket_addr(), .listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
server::Options::default(),
)
.unwrap() .unwrap()
.0; .0;
Server Server
@@ -1232,22 +1083,26 @@ mod functional_test {
let _ = env_logger::init(); let _ = env_logger::init();
let mut reactor = reactor::Core::new().unwrap(); let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = Server let (handle, server) = Server
.listen("localhost:0".first_socket_addr(), .listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
let client = FutureClient::connect(handle.addr(), let client = FutureClient::connect(
client::Options::default() handle.addr(),
.handle(reactor.handle())); client::Options::default().handle(reactor.handle()),
);
let client = unwrap!(reactor.run(client)); let client = unwrap!(reactor.run(client));
assert_eq!(reactor.run(client.add(1, 2)).unwrap(), 3); assert_eq!(reactor.run(client.add(1, 2)).unwrap(), 3);
drop(client); drop(client);
let client = FutureClient::connect(handle.addr(), let client = FutureClient::connect(
client::Options::default() handle.addr(),
.handle(reactor.handle())); client::Options::default().handle(reactor.handle()),
);
let client = unwrap!(reactor.run(client)); let client = unwrap!(reactor.run(client));
assert_eq!(reactor.run(client.add(1, 2)).unwrap(), 3); assert_eq!(reactor.run(client.add(1, 2)).unwrap(), 3);
} }
@@ -1261,16 +1116,21 @@ mod functional_test {
use super::FutureServiceExt; use super::FutureServiceExt;
let _ = env_logger::init(); let _ = env_logger::init();
let (_, mut reactor, client) = let (_, mut reactor, client) = unwrap!(
unwrap!(start_server_with_async_client::<FutureClient, Server>(Server)); start_server_with_async_client::<FutureClient, Server>(Server)
);
assert_eq!(3, reactor.run(client.add(1, 2)).unwrap()); assert_eq!(3, reactor.run(client.add(1, 2)).unwrap());
assert_eq!("Hey, Tim.", assert_eq!(
reactor.run(client.hey("Tim".to_string())).unwrap()); "Hey, Tim.",
reactor.run(client.hey("Tim".to_string())).unwrap()
);
let (handle, server) = Server let (handle, server) = Server
.listen("localhost:0".first_socket_addr(), .listen(
&reactor.handle(), "localhost:0".first_socket_addr(),
server::Options::default()) &reactor.handle(),
server::Options::default(),
)
.unwrap(); .unwrap();
reactor.handle().spawn(server); reactor.handle().spawn(server);
let options = client::Options::default().handle(reactor.handle()); let options = client::Options::default().handle(reactor.handle());
@@ -1278,8 +1138,10 @@ mod functional_test {
.run(FutureClient::connect(handle.addr(), options)) .run(FutureClient::connect(handle.addr(), options))
.unwrap(); .unwrap();
assert_eq!(3, reactor.run(client.add(1, 2)).unwrap()); assert_eq!(3, reactor.run(client.add(1, 2)).unwrap());
assert_eq!("Hey, Tim.", assert_eq!(
reactor.run(client.hey("Tim".to_string())).unwrap()); "Hey, Tim.",
reactor.run(client.hey("Tim".to_string())).unwrap()
);
} }
} }
@@ -1310,9 +1172,7 @@ mod functional_test {
let (_, mut reactor, client) = let (_, mut reactor, client) =
start_err_server_with_async_client::<FutureClient, ErrorServer>(ErrorServer).unwrap(); start_err_server_with_async_client::<FutureClient, ErrorServer>(ErrorServer).unwrap();
reactor reactor
.run(client .run(client.bar().then(move |result| {
.bar()
.then(move |result| {
match result.err().unwrap() { match result.err().unwrap() {
::Error::App(e) => { ::Error::App(e) => {
assert_eq!(e.description(), "lol jk"); assert_eq!(e.description(), "lol jk");

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tarpc-plugins" name = "tarpc-plugins"
version = "0.1.1" version = "0.2.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"] authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
license = "MIT" license = "MIT"
documentation = "https://docs.rs/tarpc" documentation = "https://docs.rs/tarpc"
@@ -15,7 +15,7 @@ description = "Plugins for tarpc, an RPC framework for Rust with a focus on ease
travis-ci = { repository = "google/tarpc" } travis-ci = { repository = "google/tarpc" }
[dependencies] [dependencies]
itertools = "0.5" itertools = "0.6"
[lib] [lib]
plugin = true plugin = true

View File

@@ -65,7 +65,7 @@ fn snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box<MacResul
if let Some(tt @ TokenTree::Token(_, token::Eq)) = tokens.next() { if let Some(tt @ TokenTree::Token(_, token::Eq)) = tokens.next() {
let mut docstr = tokens.next().expect("Docstrings must have literal docstring"); let mut docstr = tokens.next().expect("Docstrings must have literal docstring");
if let TokenTree::Token(_, token::Literal(token::Str_(ref mut doc), _)) = docstr { if let TokenTree::Token(_, token::Literal(token::Str_(ref mut doc), _)) = docstr {
*doc = Symbol::intern(&str_lit(&doc.as_str()).replace("{}", &old_ident)); *doc = Symbol::intern(&str_lit(&doc.as_str(), None).replace("{}", &old_ident));
} else { } else {
unreachable!(); unreachable!();
} }

View File

@@ -42,18 +42,25 @@ impl<Encode, Decode> Codec<Encode, Decode> {
} }
fn too_big(payload_size: u64, max_payload_size: u64) -> io::Error { fn too_big(payload_size: u64, max_payload_size: u64) -> io::Error {
warn!("Not sending too-big packet of size {} (max is {})", warn!(
payload_size, "Not sending too-big packet of size {} (max is {})",
max_payload_size); payload_size,
io::Error::new(io::ErrorKind::InvalidData, max_payload_size
format!("Maximum payload size is {} bytes but got a payload of {}", );
max_payload_size, io::Error::new(
payload_size)) io::ErrorKind::InvalidData,
format!(
"Maximum payload size is {} bytes but got a payload of {}",
max_payload_size,
payload_size
),
)
} }
impl<Encode, Decode> Encoder for Codec<Encode, Decode> impl<Encode, Decode> Encoder for Codec<Encode, Decode>
where Encode: serde::Serialize, where
Decode: serde::Deserialize Encode: serde::Serialize,
Decode: serde::de::DeserializeOwned,
{ {
type Item = (RequestId, Encode); type Item = (RequestId, Encode);
type Error = io::Error; type Error = io::Error;
@@ -69,14 +76,17 @@ impl<Encode, Decode> Encoder for Codec<Encode, Decode>
trace!("Encoded request id = {} as {:?}", id, buf); trace!("Encoded request id = {} as {:?}", id, buf);
buf.put_u64::<BigEndian>(payload_size); buf.put_u64::<BigEndian>(payload_size);
bincode::serialize_into(&mut buf.writer(), &message, Infinite) bincode::serialize_into(&mut buf.writer(), &message, Infinite)
.map_err(|serialize_err| io::Error::new(io::ErrorKind::Other, serialize_err))?; .map_err(|serialize_err| {
io::Error::new(io::ErrorKind::Other, serialize_err)
})?;
trace!("Encoded buffer: {:?}", buf); trace!("Encoded buffer: {:?}", buf);
Ok(()) Ok(())
} }
} }
impl<Encode, Decode> Decoder for Codec<Encode, Decode> impl<Encode, Decode> Decoder for Codec<Encode, Decode>
where Decode: serde::Deserialize where
Decode: serde::de::DeserializeOwned,
{ {
type Item = (RequestId, Result<Decode, bincode::Error>); type Item = (RequestId, Result<Decode, bincode::Error>);
type Error = io::Error; type Error = io::Error;
@@ -98,25 +108,31 @@ impl<Encode, Decode> Decoder for Codec<Encode, Decode>
self.state = Len { id: id }; self.state = Len { id: id };
} }
Len { .. } if buf.len() < mem::size_of::<u64>() => { Len { .. } if buf.len() < mem::size_of::<u64>() => {
trace!("--> Buf len is {}; waiting for 8 to parse packet length.", trace!(
buf.len()); "--> Buf len is {}; waiting for 8 to parse packet length.",
buf.len()
);
return Ok(None); return Ok(None);
} }
Len { id } => { Len { id } => {
let len_buf = buf.split_to(mem::size_of::<u64>()); let len_buf = buf.split_to(mem::size_of::<u64>());
let len = Cursor::new(len_buf).read_u64::<BigEndian>()?; let len = Cursor::new(len_buf).read_u64::<BigEndian>()?;
trace!("--> Parsed payload length = {}, remaining buffer length = {}", trace!(
len, "--> Parsed payload length = {}, remaining buffer length = {}",
buf.len()); len,
buf.len()
);
if len > self.max_payload_size { if len > self.max_payload_size {
return Err(too_big(len, self.max_payload_size)); return Err(too_big(len, self.max_payload_size));
} }
self.state = Payload { id: id, len: len }; self.state = Payload { id: id, len: len };
} }
Payload { len, .. } if buf.len() < len as usize => { Payload { len, .. } if buf.len() < len as usize => {
trace!("--> Buf len is {}; waiting for {} to parse payload.", trace!(
buf.len(), "--> Buf len is {}; waiting for {} to parse payload.",
len); buf.len(),
len
);
return Ok(None); return Ok(None);
} }
Payload { id, len } => { Payload { id, len } => {
@@ -151,9 +167,10 @@ impl<Encode, Decode> Proto<Encode, Decode> {
} }
impl<T, Encode, Decode> ServerProto<T> for Proto<Encode, Decode> impl<T, Encode, Decode> ServerProto<T> for Proto<Encode, Decode>
where T: AsyncRead + AsyncWrite + 'static, where
Encode: serde::Serialize + 'static, T: AsyncRead + AsyncWrite + 'static,
Decode: serde::Deserialize + 'static Encode: serde::Serialize + 'static,
Decode: serde::de::DeserializeOwned + 'static,
{ {
type Response = Encode; type Response = Encode;
type Request = Result<Decode, bincode::Error>; type Request = Result<Decode, bincode::Error>;
@@ -166,9 +183,10 @@ impl<T, Encode, Decode> ServerProto<T> for Proto<Encode, Decode>
} }
impl<T, Encode, Decode> ClientProto<T> for Proto<Encode, Decode> impl<T, Encode, Decode> ClientProto<T> for Proto<Encode, Decode>
where T: AsyncRead + AsyncWrite + 'static, where
Encode: serde::Serialize + 'static, T: AsyncRead + AsyncWrite + 'static,
Decode: serde::Deserialize + 'static Encode: serde::Serialize + 'static,
Decode: serde::de::DeserializeOwned + 'static,
{ {
type Response = Result<Decode, bincode::Error>; type Response = Result<Decode, bincode::Error>;
type Request = Encode; type Request = Encode;
@@ -189,8 +207,10 @@ fn serialize() {
for _ in 0..2 { for _ in 0..2 {
let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new(2_000_000); let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new(2_000_000);
codec.encode(MSG, &mut buf).unwrap(); codec.encode(MSG, &mut buf).unwrap();
let actual: Result<Option<(u64, Result<(char, char, char), bincode::Error>)>, let actual: Result<
io::Error> = codec.decode(&mut buf); Option<(u64, Result<(char, char, char), bincode::Error>)>,
io::Error,
> = codec.decode(&mut buf);
match actual { match actual {
Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {} Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {}
@@ -206,17 +226,21 @@ fn deserialize_big() {
let mut codec: Codec<Vec<u8>, Vec<u8>> = Codec::new(24); let mut codec: Codec<Vec<u8>, Vec<u8>> = Codec::new(24);
let mut buf = BytesMut::with_capacity(40); let mut buf = BytesMut::with_capacity(40);
assert_eq!(codec assert_eq!(
.encode((0, vec![0; 24]), &mut buf) codec
.err() .encode((0, vec![0; 24]), &mut buf)
.unwrap() .err()
.kind(), .unwrap()
io::ErrorKind::InvalidData); .kind(),
io::ErrorKind::InvalidData
);
// Header // Header
buf.put_slice(&mut [0u8; 8]); buf.put_slice(&mut [0u8; 8]);
// Len // Len
buf.put_slice(&mut [0u8, 0, 0, 0, 0, 0, 0, 25]); buf.put_slice(&mut [0u8, 0, 0, 0, 0, 0, 0, 25]);
assert_eq!(codec.decode(&mut buf).err().unwrap().kind(), assert_eq!(
io::ErrorKind::InvalidData); codec.decode(&mut buf).err().unwrap().kind(),
io::ErrorKind::InvalidData
);
} }

View File

@@ -1,8 +1,8 @@
use future::client::{Client as FutureClient, ClientExt as FutureClientExt, use future::client::{Client as FutureClient, ClientExt as FutureClientExt,
Options as FutureOptions}; Options as FutureOptions};
/// Exposes a trait for connecting synchronously to servers.
use futures::{Future, Stream}; use futures::{Future, Stream};
use serde::{Deserialize, Serialize}; use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::net::{SocketAddr, ToSocketAddrs}; use std::net::{SocketAddr, ToSocketAddrs};
@@ -22,7 +22,9 @@ pub struct Client<Req, Resp, E> {
impl<Req, Resp, E> Clone for Client<Req, Resp, E> { impl<Req, Resp, E> Clone for Client<Req, Resp, E> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Client { proxy: self.proxy.clone() } Client {
proxy: self.proxy.clone(),
}
} }
} }
@@ -34,9 +36,10 @@ impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E> {
} }
impl<Req, Resp, E> Client<Req, Resp, E> impl<Req, Resp, E> Client<Req, Resp, E>
where Req: Serialize + Send + 'static, where
Resp: Deserialize + Send + 'static, Req: Serialize + Send + 'static,
E: Deserialize + Send + 'static Resp: DeserializeOwned + Send + 'static,
E: DeserializeOwned + Send + 'static,
{ {
/// Drives an RPC call for the given request. /// Drives an RPC call for the given request.
pub fn call(&self, request: Req) -> Result<Resp, ::Error<E>> { pub fn call(&self, request: Req) -> Result<Resp, ::Error<E>> {
@@ -58,7 +61,9 @@ pub struct Options {
impl Default for Options { impl Default for Options {
#[cfg(not(feature = "tls"))] #[cfg(not(feature = "tls"))]
fn default() -> Self { fn default() -> Self {
Options { max_payload_size: 2_000_000 } Options {
max_payload_size: 2_000_000,
}
} }
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
@@ -92,8 +97,7 @@ impl fmt::Debug for Options {
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
const NONE: &'static &'static str = &"None"; const NONE: &'static &'static str = &"None";
let mut f = f.debug_struct("Options"); let mut f = f.debug_struct("Options");
#[cfg(feature = "tls")] #[cfg(feature = "tls")] f.field("tls_ctx", if self.tls_ctx.is_some() { SOME } else { NONE });
f.field("tls_ctx", if self.tls_ctx.is_some() { SOME } else { NONE });
f.finish() f.finish()
} }
} }
@@ -119,26 +123,30 @@ impl Into<FutureOptions> for (reactor::Handle, Options) {
/// Extension methods for Clients. /// Extension methods for Clients.
pub trait ClientExt: Sized { pub trait ClientExt: Sized {
/// Connects to a server located at the given address. /// Connects to a server located at the given address.
fn connect<A>(addr: A, options: Options) -> io::Result<Self> where A: ToSocketAddrs; fn connect<A>(addr: A, options: Options) -> io::Result<Self>
where
A: ToSocketAddrs;
} }
impl<Req, Resp, E> ClientExt for Client<Req, Resp, E> impl<Req, Resp, E> ClientExt for Client<Req, Resp, E>
where Req: Serialize + Send + 'static, where
Resp: Deserialize + Send + 'static, Req: Serialize + Send + 'static,
E: Deserialize + Send + 'static Resp: DeserializeOwned + Send + 'static,
E: DeserializeOwned + Send + 'static,
{ {
fn connect<A>(addr: A, options: Options) -> io::Result<Self> fn connect<A>(addr: A, options: Options) -> io::Result<Self>
where A: ToSocketAddrs where
A: ToSocketAddrs,
{ {
let addr = addr.try_first_socket_addr()?; let addr = addr.try_first_socket_addr()?;
let (connect_tx, connect_rx) = mpsc::channel(); let (connect_tx, connect_rx) = mpsc::channel();
thread::spawn(move || match RequestHandler::connect(addr, options) { thread::spawn(move || match RequestHandler::connect(addr, options) {
Ok((proxy, mut handler)) => { Ok((proxy, mut handler)) => {
connect_tx.send(Ok(proxy)).unwrap(); connect_tx.send(Ok(proxy)).unwrap();
handler.handle_requests(); handler.handle_requests();
} }
Err(e) => connect_tx.send(Err(e)).unwrap(), Err(e) => connect_tx.send(Err(e)).unwrap(),
}); });
Ok(connect_rx.recv().unwrap()?) Ok(connect_rx.recv().unwrap()?)
} }
} }
@@ -153,9 +161,10 @@ struct RequestHandler<Req, Resp, E, S> {
} }
impl<Req, Resp, E> RequestHandler<Req, Resp, E, FutureClient<Req, Resp, E>> impl<Req, Resp, E> RequestHandler<Req, Resp, E, FutureClient<Req, Resp, E>>
where Req: Serialize + Send + 'static, where
Resp: Deserialize + Send + 'static, Req: Serialize + Send + 'static,
E: Deserialize + Send + 'static Resp: DeserializeOwned + Send + 'static,
E: DeserializeOwned + Send + 'static,
{ {
/// Creates a new `RequestHandler` by connecting a `FutureClient` to the given address /// Creates a new `RequestHandler` by connecting a `FutureClient` to the given address
/// using the given options. /// using the given options.
@@ -164,21 +173,24 @@ impl<Req, Resp, E> RequestHandler<Req, Resp, E, FutureClient<Req, Resp, E>>
let options = (reactor.handle(), options).into(); let options = (reactor.handle(), options).into();
let client = reactor.run(FutureClient::connect(addr, options))?; let client = reactor.run(FutureClient::connect(addr, options))?;
let (proxy, requests) = pair(); let (proxy, requests) = pair();
Ok((Client { proxy }, Ok((
Client { proxy },
RequestHandler { RequestHandler {
reactor, reactor,
client, client,
requests, requests,
})) },
))
} }
} }
impl<Req, Resp, E, S> RequestHandler<Req, Resp, E, S> impl<Req, Resp, E, S> RequestHandler<Req, Resp, E, S>
where Req: Serialize + 'static, where
Resp: Deserialize + 'static, Req: Serialize + 'static,
E: Deserialize + 'static, Resp: DeserializeOwned + 'static,
S: Service<Request = Req, Response = Resp, Error = ::Error<E>>, E: DeserializeOwned + 'static,
S::Future: 'static S: Service<Request = Req, Response = Resp, Error = ::Error<E>>,
S::Future: 'static,
{ {
fn handle_requests(&mut self) { fn handle_requests(&mut self) {
let RequestHandler { let RequestHandler {
@@ -187,27 +199,26 @@ impl<Req, Resp, E, S> RequestHandler<Req, Resp, E, S>
ref mut client, ref mut client,
} = *self; } = *self;
let handle = reactor.handle(); let handle = reactor.handle();
let requests = let requests = requests
requests .map(|result| {
.map(|result| { match result {
match result { Ok(req) => req,
Ok(req) => req, // The ClientProxy never sends Err currently
// The ClientProxy never sends Err currently Err(e) => panic!("Unimplemented error handling in RequestHandler: {}", e),
Err(e) => panic!("Unimplemented error handling in RequestHandler: {}", e), }
} })
}) .for_each(|(request, response_tx)| {
.for_each(|(request, response_tx)| { let request = client.call(request).then(move |response| {
let request = client.call(request) // Safe to unwrap because clients always block on the response future.
.then(move |response| { response_tx
// Safe to unwrap because clients always block on the response future. .send(response)
response_tx.send(response) .map_err(|_| ())
.map_err(|_| ()) .expect("Client should block on response");
.expect("Client should block on response");
Ok(())
});
handle.spawn(request);
Ok(()) Ok(())
}); });
handle.spawn(request);
Ok(())
});
reactor.run(requests).unwrap(); reactor.run(requests).unwrap();
} }
} }

View File

@@ -4,7 +4,8 @@ use futures::{Future, future as futures};
use futures::sync::oneshot; use futures::sync::oneshot;
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
use native_tls_inner::TlsAcceptor; use native_tls_inner::TlsAcceptor;
use serde::{Deserialize, Serialize}; use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
@@ -110,7 +111,7 @@ pub fn listen<S, Req, Resp, E>(new_service: S,
<S::Instance as Service>::Future: Send + 'static, <S::Instance as Service>::Future: Send + 'static,
S::Response: Send, S::Response: Send,
S::Error: Send, S::Error: Send,
Req: Deserialize + 'static, Req: DeserializeOwned + 'static,
Resp: Serialize + 'static, Resp: Serialize + 'static,
E: Serialize + 'static E: Serialize + 'static
{ {
@@ -120,15 +121,16 @@ pub fn listen<S, Req, Resp, E>(new_service: S,
future::server::listen(new_service, addr, &reactor.handle(), options.opts)?; future::server::listen(new_service, addr, &reactor.handle(), options.opts)?;
let server = Box::new(server); let server = Box::new(server);
Ok(Handle { Ok(Handle {
reactor: reactor, reactor: reactor,
handle: handle, handle: handle,
server: server, server: server,
}) })
} }
/// A service that uses a thread pool. /// A service that uses a thread pool.
struct NewThreadService<S> struct NewThreadService<S>
where S: NewService where
S: NewService,
{ {
new_service: S, new_service: S,
sender: Sender<ServiceTask<<S::Instance as Service>::Future>>, sender: Sender<ServiceTask<<S::Instance as Service>::Future>>,
@@ -137,7 +139,8 @@ struct NewThreadService<S>
/// A service that runs by executing request handlers in a thread pool. /// A service that runs by executing request handlers in a thread pool.
struct ThreadService<S> struct ThreadService<S>
where S: Service where
S: Service,
{ {
service: S, service: S,
sender: Sender<ServiceTask<S::Future>>, sender: Sender<ServiceTask<S::Future>>,
@@ -145,17 +148,19 @@ struct ThreadService<S>
/// A task that handles a single request. /// A task that handles a single request.
struct ServiceTask<F> struct ServiceTask<F>
where F: Future where
F: Future,
{ {
future: F, future: F,
tx: oneshot::Sender<Result<F::Item, F::Error>>, tx: oneshot::Sender<Result<F::Item, F::Error>>,
} }
impl<S> NewThreadService<S> impl<S> NewThreadService<S>
where S: NewService, where
<S::Instance as Service>::Future: Send + 'static, S: NewService,
S::Response: Send, <S::Instance as Service>::Future: Send + 'static,
S::Error: Send S::Response: Send,
S::Error: Send,
{ {
/// Create a NewThreadService by wrapping another service. /// Create a NewThreadService by wrapping another service.
fn new(new_service: S, pool: thread_pool::Builder) -> Self { fn new(new_service: S, pool: thread_pool::Builder) -> Self {
@@ -169,10 +174,11 @@ impl<S> NewThreadService<S>
} }
impl<S> NewService for NewThreadService<S> impl<S> NewService for NewThreadService<S>
where S: NewService, where
<S::Instance as Service>::Future: Send + 'static, S: NewService,
S::Response: Send, <S::Instance as Service>::Future: Send + 'static,
S::Error: Send S::Response: Send,
S::Error: Send,
{ {
type Request = S::Request; type Request = S::Request;
type Response = S::Response; type Response = S::Response;
@@ -181,16 +187,17 @@ impl<S> NewService for NewThreadService<S>
fn new_service(&self) -> io::Result<Self::Instance> { fn new_service(&self) -> io::Result<Self::Instance> {
Ok(ThreadService { Ok(ThreadService {
service: self.new_service.new_service()?, service: self.new_service.new_service()?,
sender: self.sender.clone(), sender: self.sender.clone(),
}) })
} }
} }
impl<F> Task for ServiceTask<F> impl<F> Task for ServiceTask<F>
where F: Future + Send + 'static, where
F::Item: Send, F: Future + Send + 'static,
F::Error: Send F::Item: Send,
F::Error: Send,
{ {
fn run(self) { fn run(self) {
// Don't care if sending fails. It just means the request is no longer // Don't care if sending fails. It just means the request is no longer
@@ -200,35 +207,40 @@ impl<F> Task for ServiceTask<F>
} }
impl<S> Service for ThreadService<S> impl<S> Service for ThreadService<S>
where S: Service, where
S::Future: Send + 'static, S: Service,
S::Response: Send, S::Future: Send + 'static,
S::Error: Send S::Response: Send,
S::Error: Send,
{ {
type Request = S::Request; type Request = S::Request;
type Response = S::Response; type Response = S::Response;
type Error = S::Error; type Error = S::Error;
type Future = futures::AndThen<futures::MapErr<oneshot::Receiver<Result<Self::Response, type Future = futures::AndThen<
Self::Error>>, futures::MapErr<
fn(oneshot::Canceled) -> Self::Error>, oneshot::Receiver<Result<Self::Response, Self::Error>>,
Result<Self::Response, Self::Error>, fn(oneshot::Canceled) -> Self::Error,
fn(Result<Self::Response, Self::Error>) >,
-> Result<Self::Response, Self::Error>>; Result<Self::Response, Self::Error>,
fn(Result<Self::Response, Self::Error>)
-> Result<Self::Response, Self::Error>,
>;
fn call(&self, request: Self::Request) -> Self::Future { fn call(&self, request: Self::Request) -> Self::Future {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.sender self.sender
.send(ServiceTask { .send(ServiceTask {
future: self.service.call(request), future: self.service.call(request),
tx: tx, tx: tx,
}) })
.unwrap(); .unwrap();
rx.map_err(unreachable as _).and_then(ident) rx.map_err(unreachable as _).and_then(ident)
} }
} }
fn unreachable<T, U>(t: T) -> U fn unreachable<T, U>(t: T) -> U
where T: fmt::Display where
T: fmt::Display,
{ {
unreachable!(t) unreachable!(t)
} }

View File

@@ -19,9 +19,9 @@ pub mod client {
/// validation. /// validation.
pub fn new<S: Into<String>>(domain: S) -> Result<Self, Error> { pub fn new<S: Into<String>>(domain: S) -> Result<Self, Error> {
Ok(Context { Ok(Context {
domain: domain.into(), domain: domain.into(),
tls_connector: TlsConnector::builder()?.build()?, tls_connector: TlsConnector::builder()?.build()?,
}) })
} }
/// Construct a new `Context` using the provided domain and `TlsConnector` /// Construct a new `Context` using the provided domain and `TlsConnector`

View File

@@ -53,16 +53,18 @@ impl Stream for Never {
impl Serialize for Never { impl Serialize for Never {
fn serialize<S>(&self, _: S) -> Result<S::Ok, S::Error> fn serialize<S>(&self, _: S) -> Result<S::Ok, S::Error>
where S: Serializer where
S: Serializer,
{ {
self.0 self.0
} }
} }
// Please don't try to deserialize this. :( // Please don't try to deserialize this. :(
impl Deserialize for Never { impl<'a> Deserialize<'a> for Never {
fn deserialize<D>(_: D) -> Result<Self, D::Error> fn deserialize<D>(_: D) -> Result<Self, D::Error>
where D: Deserializer where
D: Deserializer<'a>,
{ {
panic!("Never cannot be instantiated!"); panic!("Never cannot be instantiated!");
} }
@@ -99,8 +101,10 @@ pub trait FirstSocketAddr: ToSocketAddrs {
if let Some(a) = self.to_socket_addrs()?.next() { if let Some(a) = self.to_socket_addrs()?.next() {
Ok(a) Ok(a)
} else { } else {
Err(io::Error::new(io::ErrorKind::AddrNotAvailable, Err(io::Error::new(
"`ToSocketAddrs::to_socket_addrs` returned an empty iterator.")) io::ErrorKind::AddrNotAvailable,
"`ToSocketAddrs::to_socket_addrs` returned an empty iterator.",
))
} }
} }
@@ -119,10 +123,13 @@ impl<A: ToSocketAddrs> FirstSocketAddr for A {}
/// on it, otherwise the callback never runs. Once run, however, this future is /// on it, otherwise the callback never runs. Once run, however, this future is
/// the same as the one the closure creates. /// the same as the one the closure creates.
pub fn lazy<F, A, R>(f: F, args: A) -> Lazy<F, A, R> pub fn lazy<F, A, R>(f: F, args: A) -> Lazy<F, A, R>
where F: FnOnce(A) -> R, where
R: IntoFuture F: FnOnce(A) -> R,
R: IntoFuture,
{ {
Lazy { inner: _Lazy::First(f, args) } Lazy {
inner: _Lazy::First(f, args),
}
} }
/// A future which defers creation of the actual future until a callback is /// A future which defers creation of the actual future until a callback is
@@ -143,8 +150,9 @@ enum _Lazy<F, A, R> {
} }
impl<F, A, R> Lazy<F, A, R> impl<F, A, R> Lazy<F, A, R>
where F: FnOnce(A) -> R, where
R: IntoFuture F: FnOnce(A) -> R,
R: IntoFuture,
{ {
fn get(&mut self) -> &mut R::Future { fn get(&mut self) -> &mut R::Future {
match self.inner { match self.inner {
@@ -164,8 +172,9 @@ impl<F, A, R> Lazy<F, A, R>
} }
impl<F, A, R> Future for Lazy<F, A, R> impl<F, A, R> Future for Lazy<F, A, R>
where F: FnOnce(A) -> R, where
R: IntoFuture F: FnOnce(A) -> R,
R: IntoFuture,
{ {
type Item = R::Item; type Item = R::Item;
type Error = R::Error; type Error = R::Error;