mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Compare commits
34 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
92f157206d | ||
|
|
b093db63a3 | ||
|
|
8c3e3df47f | ||
|
|
6907c6e0a3 | ||
|
|
4b5273127d | ||
|
|
4b763e9f52 | ||
|
|
848eb00bea | ||
|
|
44ec68c002 | ||
|
|
b2282f9d7a | ||
|
|
326f0270b9 | ||
|
|
fd47a6c038 | ||
|
|
77cfffaaed | ||
|
|
118893678b | ||
|
|
ae3985de46 | ||
|
|
49f36e0b2b | ||
|
|
4a7082b27c | ||
|
|
3aa53a06fb | ||
|
|
a0afbefef4 | ||
|
|
5b554f7062 | ||
|
|
0411a90be9 | ||
|
|
9ce7938fdc | ||
|
|
650dc88da5 | ||
|
|
3601763442 | ||
|
|
4aaaea1e04 | ||
|
|
2e214c85d3 | ||
|
|
0676ab67df | ||
|
|
0b843512dd | ||
|
|
85d9416750 | ||
|
|
5e3cf3c807 | ||
|
|
4dfb3a48c3 | ||
|
|
21e8883877 | ||
|
|
7dbfe07c97 | ||
|
|
8bc01a993b | ||
|
|
e2728d84f3 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -3,3 +3,5 @@ Cargo.lock
|
||||
.cargo
|
||||
*.swp
|
||||
*.bk
|
||||
tarpc.iml
|
||||
.idea
|
||||
|
||||
24
Cargo.toml
24
Cargo.toml
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tarpc"
|
||||
version = "0.7.2"
|
||||
version = "0.12.0"
|
||||
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
||||
license = "MIT"
|
||||
documentation = "https://docs.rs/tarpc"
|
||||
@@ -15,36 +15,38 @@ description = "An RPC framework for Rust with a focus on ease of use."
|
||||
travis-ci = { repository = "google/tarpc" }
|
||||
|
||||
[dependencies]
|
||||
bincode = "1.0.0-alpha6"
|
||||
bincode = "1.0"
|
||||
byteorder = "1.0"
|
||||
bytes = "0.4"
|
||||
cfg-if = "0.1.0"
|
||||
futures = "0.1.11"
|
||||
lazy_static = "0.2"
|
||||
log = "0.3"
|
||||
lazy_static = "1.0"
|
||||
log = "0.4"
|
||||
net2 = "0.2"
|
||||
num_cpus = "1.0"
|
||||
serde = "0.9"
|
||||
serde_derive = "0.9"
|
||||
tarpc-plugins = { path = "src/plugins", version = "0.1.1" }
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
tarpc-plugins = { path = "src/plugins", version = "0.4.0" }
|
||||
thread-pool = "0.1.1"
|
||||
tokio-codec = "0.1"
|
||||
tokio-core = "0.1.6"
|
||||
tokio-io = "0.1"
|
||||
tokio-proto = "0.1.1"
|
||||
tokio-service = "0.1"
|
||||
|
||||
# Optional dependencies
|
||||
native-tls = { version = "0.1.1", optional = true }
|
||||
native-tls = { version = "0.1", optional = true }
|
||||
tokio-tls = { version = "0.1", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
chrono = "0.3"
|
||||
env_logger = "0.3"
|
||||
chrono = "0.4"
|
||||
env_logger = "0.5"
|
||||
futures-cpupool = "0.1"
|
||||
clap = "2.0"
|
||||
serde_bytes = "0.10"
|
||||
|
||||
[target.'cfg(target_os = "macos")'.dev-dependencies]
|
||||
security-framework = "0.1"
|
||||
security-framework = "0.2"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
|
||||
12
README.md
12
README.md
@@ -11,7 +11,7 @@ tarpc is an RPC framework for rust with a focus on ease of use. Defining a
|
||||
service can be done in just a few lines of code, and most of the boilerplate of
|
||||
writing a server is taken care of for you.
|
||||
|
||||
[Documentation](https://docs.rs/tarpc)
|
||||
[Documentation](https://docs.rs/crate/tarpc/)
|
||||
|
||||
## What is an RPC framework?
|
||||
"RPC" stands for "Remote Procedure Call," a function call where the work of
|
||||
@@ -37,8 +37,8 @@ arguments to tarpc fns.
|
||||
Add to your `Cargo.toml` dependencies:
|
||||
|
||||
```toml
|
||||
tarpc = "0.7.2"
|
||||
tarpc-plugins = "0.1.1"
|
||||
tarpc = "0.12.0"
|
||||
tarpc-plugins = "0.4.0"
|
||||
```
|
||||
|
||||
## Example: Sync
|
||||
@@ -47,7 +47,7 @@ tarpc has two APIs: `sync` for blocking code and `future` for asynchronous
|
||||
code. Here's how to use the sync api.
|
||||
|
||||
```rust
|
||||
#![feature(plugin)]
|
||||
#![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
#[macro_use]
|
||||
@@ -100,7 +100,7 @@ races! See the `tarpc_examples` package for more examples.
|
||||
Here's the same service, implemented using futures.
|
||||
|
||||
```rust
|
||||
#![feature(plugin)]
|
||||
#![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
@@ -171,7 +171,7 @@ However, if you are working with both stream types, ensure that you use the TLS
|
||||
servers and TCP clients with TCP servers.
|
||||
|
||||
```rust,no_run
|
||||
#![feature(plugin)]
|
||||
#![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
|
||||
39
RELEASES.md
39
RELEASES.md
@@ -1,3 +1,42 @@
|
||||
## 0.10.0 (2018-04-08)
|
||||
|
||||
## Breaking Changes
|
||||
Fixed rustc breakage in tarpc-plugins. These changes require a recent version of rustc.
|
||||
|
||||
## 0.10.0 (2018-03-26)
|
||||
|
||||
## Breaking Changes
|
||||
Updates bincode to version 1.0.
|
||||
|
||||
## 0.9.0 (2017-09-17)
|
||||
|
||||
## Breaking Changes
|
||||
Updates tarpc to use tarpc-plugins 0.2.
|
||||
|
||||
## 0.8.0 (2017-05-05)
|
||||
|
||||
## Breaking Changes
|
||||
This release updates tarpc to use serde 1.0.
|
||||
As such, users must also update to use serde 1.0.
|
||||
The serde 1.0 [release notes](https://github.com/serde-rs/serde/releases/tag/v1.0.0)
|
||||
detail migration paths.
|
||||
|
||||
## 0.7.3 (2017-04-26)
|
||||
|
||||
This release removes the `Sync` bound on RPC args for both sync and future
|
||||
clients. No breaking changes.
|
||||
|
||||
## 0.7.2 (2017-04-22)
|
||||
|
||||
## Breaking Changes
|
||||
This release updates tarpc-plugins to work with rustc master. Thus, older
|
||||
versions of rustc are no longer supported. We chose a minor version bump
|
||||
because it is still source-compatible with existing code using tarpc.
|
||||
|
||||
## 0.7.1 (2017-03-31)
|
||||
|
||||
This release was purely doc fixes. No breaking changes.
|
||||
|
||||
## 0.7 (2017-03-31)
|
||||
|
||||
## Breaking Changes
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(plugin, test)]
|
||||
#![feature(plugin, test, use_extern_macros, proc_macro_path_invoc)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
#[macro_use]
|
||||
@@ -38,15 +38,20 @@ impl FutureService for Server {
|
||||
#[cfg(test)]
|
||||
#[bench]
|
||||
fn latency(bencher: &mut Bencher) {
|
||||
let _ = env_logger::init();
|
||||
let _ = env_logger::try_init();
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let (handle, server) = Server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
let (handle, server) = Server
|
||||
.listen(
|
||||
"localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default(),
|
||||
)
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
let client = FutureClient::connect(handle.addr(),
|
||||
client::Options::default().handle(reactor.handle()));
|
||||
let client = FutureClient::connect(
|
||||
handle.addr(),
|
||||
client::Options::default().handle(reactor.handle()),
|
||||
);
|
||||
let client = reactor.run(client).unwrap();
|
||||
|
||||
bencher.iter(|| reactor.run(client.ack()).unwrap());
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(inclusive_range_syntax, conservative_impl_trait, plugin, never_type)]
|
||||
#![feature(plugin, never_type, use_extern_macros, proc_macro_path_invoc)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate chrono;
|
||||
@@ -12,7 +12,7 @@ extern crate env_logger;
|
||||
extern crate futures;
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
extern crate serde;
|
||||
extern crate serde_bytes;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
extern crate tokio_core;
|
||||
@@ -31,7 +31,7 @@ use tarpc::util::{FirstSocketAddr, Never};
|
||||
use tokio_core::reactor;
|
||||
|
||||
service! {
|
||||
rpc read(size: u32) -> serde::bytes::ByteBuf;
|
||||
rpc read(size: u32) -> serde_bytes::ByteBuf;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -50,20 +50,19 @@ impl Server {
|
||||
}
|
||||
|
||||
impl FutureService for Server {
|
||||
type ReadFut = CpuFuture<serde::bytes::ByteBuf, Never>;
|
||||
type ReadFut = CpuFuture<serde_bytes::ByteBuf, Never>;
|
||||
|
||||
fn read(&self, size: u32) -> Self::ReadFut {
|
||||
let request_number = self.request_count.fetch_add(1, Ordering::SeqCst);
|
||||
debug!("Server received read({}) no. {}", size, request_number);
|
||||
self.pool
|
||||
.spawn(futures::lazy(move || {
|
||||
let mut vec = Vec::with_capacity(size as usize);
|
||||
for i in 0..size {
|
||||
vec.push(((i % 2) << 8) as u8);
|
||||
}
|
||||
debug!("Server sending response no. {}", request_number);
|
||||
Ok(vec.into())
|
||||
}))
|
||||
self.pool.spawn(futures::lazy(move || {
|
||||
let mut vec = Vec::with_capacity(size as usize);
|
||||
for i in 0..size {
|
||||
vec.push(((i % 2) << 8) as u8);
|
||||
}
|
||||
debug!("Server sending response no. {}", request_number);
|
||||
Ok(vec.into())
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,25 +102,30 @@ fn spawn_core() -> reactor::Remote {
|
||||
rx.recv().unwrap()
|
||||
}
|
||||
|
||||
fn run_once(clients: Vec<FutureClient>,
|
||||
concurrency: u32)
|
||||
-> impl Future<Item = (), Error = ()> + 'static {
|
||||
fn run_once(
|
||||
clients: Vec<FutureClient>,
|
||||
concurrency: u32,
|
||||
) -> impl Future<Item = (), Error = ()> + 'static {
|
||||
let start = Instant::now();
|
||||
futures::stream::futures_unordered((0..concurrency as usize)
|
||||
futures::stream::futures_unordered(
|
||||
(0..concurrency as usize)
|
||||
.zip(clients.iter().enumerate().cycle())
|
||||
.map(|(iteration, (client_idx, client))| {
|
||||
let start = Instant::now();
|
||||
debug!("Client {} reading (iteration {})...", client_idx, iteration);
|
||||
client.read(CHUNK_SIZE)
|
||||
client
|
||||
.read(CHUNK_SIZE)
|
||||
.map(move |_| (client_idx, iteration, start))
|
||||
}))
|
||||
.map(|(client_idx, iteration, start)| {
|
||||
let elapsed = start.elapsed();
|
||||
debug!("Client {} received reply (iteration {}).",
|
||||
client_idx,
|
||||
iteration);
|
||||
elapsed
|
||||
})
|
||||
}),
|
||||
).map(|(client_idx, iteration, start)| {
|
||||
let elapsed = start.elapsed();
|
||||
debug!(
|
||||
"Client {} received reply (iteration {}).",
|
||||
client_idx,
|
||||
iteration
|
||||
);
|
||||
elapsed
|
||||
})
|
||||
.map_err(|e| panic!(e))
|
||||
.fold(Stats::default(), move |mut stats, elapsed| {
|
||||
stats.sum += elapsed;
|
||||
@@ -131,46 +135,58 @@ fn run_once(clients: Vec<FutureClient>,
|
||||
Ok(stats)
|
||||
})
|
||||
.map(move |stats| {
|
||||
info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
|
||||
stats.count,
|
||||
stats.sum.microseconds() as f64 / stats.count as f64,
|
||||
stats.min.unwrap().microseconds(),
|
||||
stats.max.unwrap().microseconds(),
|
||||
start.elapsed().microseconds());
|
||||
info!(
|
||||
"{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
|
||||
stats.count,
|
||||
stats.sum.microseconds() as f64 / stats.count as f64,
|
||||
stats.min.unwrap().microseconds(),
|
||||
stats.max.unwrap().microseconds(),
|
||||
start.elapsed().microseconds()
|
||||
);
|
||||
})
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
env_logger::init();
|
||||
let matches = App::new("Tarpc Concurrency")
|
||||
.about("Demonstrates making concurrent requests to a tarpc service.")
|
||||
.arg(Arg::with_name("concurrency")
|
||||
.short("c")
|
||||
.long("concurrency")
|
||||
.value_name("LEVEL")
|
||||
.help("Sets a custom concurrency level")
|
||||
.takes_value(true))
|
||||
.arg(Arg::with_name("clients")
|
||||
.short("n")
|
||||
.long("num_clients")
|
||||
.value_name("AMOUNT")
|
||||
.help("How many clients to distribute requests between")
|
||||
.takes_value(true))
|
||||
.about(
|
||||
"Demonstrates making concurrent requests to a tarpc service.",
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("concurrency")
|
||||
.short("c")
|
||||
.long("concurrency")
|
||||
.value_name("LEVEL")
|
||||
.help("Sets a custom concurrency level")
|
||||
.takes_value(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("clients")
|
||||
.short("n")
|
||||
.long("num_clients")
|
||||
.value_name("AMOUNT")
|
||||
.help("How many clients to distribute requests between")
|
||||
.takes_value(true),
|
||||
)
|
||||
.get_matches();
|
||||
let concurrency = matches.value_of("concurrency")
|
||||
let concurrency = matches
|
||||
.value_of("concurrency")
|
||||
.map(&str::parse)
|
||||
.map(Result::unwrap)
|
||||
.unwrap_or(10);
|
||||
let num_clients = matches.value_of("clients")
|
||||
let num_clients = matches
|
||||
.value_of("clients")
|
||||
.map(&str::parse)
|
||||
.map(Result::unwrap)
|
||||
.unwrap_or(4);
|
||||
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let (handle, server) = Server::new()
|
||||
.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.listen(
|
||||
"localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default(),
|
||||
)
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
info!("Server listening on {}.", handle.addr());
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(plugin)]
|
||||
#![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate env_logger;
|
||||
@@ -58,10 +58,7 @@ impl subscriber::FutureService for Subscriber {
|
||||
}
|
||||
|
||||
impl Subscriber {
|
||||
fn listen(id: u32,
|
||||
handle: &reactor::Handle,
|
||||
options: server::Options)
|
||||
-> server::Handle {
|
||||
fn listen(id: u32, handle: &reactor::Handle, options: server::Options) -> server::Handle {
|
||||
let (server_handle, server) = Subscriber { id: id }
|
||||
.listen("localhost:0".first_socket_addr(), handle, options)
|
||||
.unwrap();
|
||||
@@ -77,7 +74,9 @@ struct Publisher {
|
||||
|
||||
impl Publisher {
|
||||
fn new() -> Publisher {
|
||||
Publisher { clients: Rc::new(RefCell::new(HashMap::new())) }
|
||||
Publisher {
|
||||
clients: Rc::new(RefCell::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,14 +100,16 @@ impl publisher::FutureService for Publisher {
|
||||
type SubscribeFut = Box<Future<Item = (), Error = Message>>;
|
||||
|
||||
fn subscribe(&self, id: u32, address: SocketAddr) -> Self::SubscribeFut {
|
||||
let clients = self.clients.clone();
|
||||
Box::new(subscriber::FutureClient::connect(address, client::Options::default())
|
||||
.map(move |subscriber| {
|
||||
println!("Subscribing {}.", id);
|
||||
clients.borrow_mut().insert(id, subscriber);
|
||||
()
|
||||
})
|
||||
.map_err(|e| e.to_string().into()))
|
||||
let clients = Rc::clone(&self.clients);
|
||||
Box::new(
|
||||
subscriber::FutureClient::connect(address, client::Options::default())
|
||||
.map(move |subscriber| {
|
||||
println!("Subscribing {}.", id);
|
||||
clients.borrow_mut().insert(id, subscriber);
|
||||
()
|
||||
})
|
||||
.map_err(|e| e.to_string().into()),
|
||||
)
|
||||
}
|
||||
|
||||
type UnsubscribeFut = Box<Future<Item = (), Error = Never>>;
|
||||
@@ -116,36 +117,44 @@ impl publisher::FutureService for Publisher {
|
||||
fn unsubscribe(&self, id: u32) -> Self::UnsubscribeFut {
|
||||
println!("Unsubscribing {}", id);
|
||||
self.clients.borrow_mut().remove(&id).unwrap();
|
||||
futures::finished(()).boxed()
|
||||
Box::new(futures::finished(()))
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
env_logger::init();
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let (publisher_handle, server) = Publisher::new()
|
||||
.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.listen(
|
||||
"localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default(),
|
||||
)
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
|
||||
let subscriber1 = Subscriber::listen(0, &reactor.handle(), server::Options::default());
|
||||
let subscriber2 = Subscriber::listen(1, &reactor.handle(), server::Options::default());
|
||||
|
||||
let publisher =
|
||||
reactor.run(publisher::FutureClient::connect(publisher_handle.addr(),
|
||||
client::Options::default()))
|
||||
.unwrap();
|
||||
reactor.run(publisher.subscribe(0, subscriber1.addr())
|
||||
.and_then(|_| publisher.subscribe(1, subscriber2.addr()))
|
||||
.map_err(|e| panic!(e))
|
||||
.and_then(|_| {
|
||||
println!("Broadcasting...");
|
||||
publisher.broadcast("hello to all".to_string())
|
||||
})
|
||||
.and_then(|_| publisher.unsubscribe(1))
|
||||
.and_then(|_| publisher.broadcast("hi again".to_string())))
|
||||
let publisher = reactor
|
||||
.run(publisher::FutureClient::connect(
|
||||
publisher_handle.addr(),
|
||||
client::Options::default(),
|
||||
))
|
||||
.unwrap();
|
||||
reactor
|
||||
.run(
|
||||
publisher
|
||||
.subscribe(0, subscriber1.addr())
|
||||
.and_then(|_| publisher.subscribe(1, subscriber2.addr()))
|
||||
.map_err(|e| panic!(e))
|
||||
.and_then(|_| {
|
||||
println!("Broadcasting...");
|
||||
publisher.broadcast("hello to all".to_string())
|
||||
})
|
||||
.and_then(|_| publisher.unsubscribe(1))
|
||||
.and_then(|_| publisher.broadcast("hi again".to_string())),
|
||||
)
|
||||
.unwrap();
|
||||
thread::sleep(Duration::from_millis(300));
|
||||
}
|
||||
|
||||
@@ -3,15 +3,13 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(plugin)]
|
||||
#![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
extern crate tokio_core;
|
||||
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
@@ -55,7 +53,9 @@ impl SyncService for HelloServer {
|
||||
fn main() {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let handle = HelloServer.listen("localhost:10000", server::Options::default()).unwrap();
|
||||
let handle = HelloServer
|
||||
.listen("localhost:10000", server::Options::default())
|
||||
.unwrap();
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
});
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(plugin)]
|
||||
#![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
@@ -34,16 +34,22 @@ impl FutureService for HelloServer {
|
||||
|
||||
fn main() {
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let (handle, server) = HelloServer.listen("localhost:10000".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
let (handle, server) = HelloServer
|
||||
.listen(
|
||||
"localhost:10000".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default(),
|
||||
)
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
|
||||
let options = client::Options::default().handle(reactor.handle());
|
||||
reactor.run(FutureClient::connect(handle.addr(), options)
|
||||
.map_err(tarpc::Error::from)
|
||||
.and_then(|client| client.hello("Mom".to_string()))
|
||||
.map(|resp| println!("{}", resp)))
|
||||
reactor
|
||||
.run(
|
||||
FutureClient::connect(handle.addr(), options)
|
||||
.map_err(tarpc::Error::from)
|
||||
.and_then(|client| client.hello("Mom".to_string()))
|
||||
.map(|resp| println!("{}", resp)),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
@@ -4,13 +4,11 @@
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
// required by `FutureClient` (not used directly in this example)
|
||||
#![feature(plugin)]
|
||||
#![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
extern crate tokio_core;
|
||||
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
@@ -27,14 +25,20 @@ struct HelloServer;
|
||||
|
||||
impl SyncService for HelloServer {
|
||||
fn hello(&self, name: String) -> Result<String, Never> {
|
||||
Ok(format!("Hello from thread {}, {}!", thread::current().name().unwrap(), name))
|
||||
Ok(format!(
|
||||
"Hello from thread {}, {}!",
|
||||
thread::current().name().unwrap(),
|
||||
name
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let handle = HelloServer.listen("localhost:0", server::Options::default()).unwrap();
|
||||
let handle = HelloServer
|
||||
.listen("localhost:0", server::Options::default())
|
||||
.unwrap();
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
});
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(plugin)]
|
||||
#![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate env_logger;
|
||||
@@ -14,7 +14,7 @@ extern crate tokio_core;
|
||||
|
||||
use add::{FutureService as AddFutureService, FutureServiceExt as AddExt};
|
||||
use double::{FutureService as DoubleFutureService, FutureServiceExt as DoubleExt};
|
||||
use futures::{BoxFuture, Future, Stream};
|
||||
use futures::{Future, Stream};
|
||||
use tarpc::future::{client, server};
|
||||
use tarpc::future::client::ClientExt as Fc;
|
||||
use tarpc::util::{FirstSocketAddr, Message, Never};
|
||||
@@ -59,43 +59,55 @@ impl DoubleServer {
|
||||
}
|
||||
|
||||
impl DoubleFutureService for DoubleServer {
|
||||
type DoubleFut = BoxFuture<i32, Message>;
|
||||
type DoubleFut = Box<Future<Item=i32, Error=Message>>;
|
||||
|
||||
fn double(&self, x: i32) -> Self::DoubleFut {
|
||||
self.client
|
||||
Box::new(self.client
|
||||
.add(x, x)
|
||||
.map_err(|e| e.to_string().into())
|
||||
.boxed()
|
||||
.map_err(|e| e.to_string().into()))
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
env_logger::init();
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let (add, server) = AddServer.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
let (add, server) = AddServer
|
||||
.listen(
|
||||
"localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default(),
|
||||
)
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
|
||||
let options = client::Options::default().handle(reactor.handle());
|
||||
let add_client = reactor.run(add::FutureClient::connect(add.addr(), options)).unwrap();
|
||||
let add_client = reactor
|
||||
.run(add::FutureClient::connect(add.addr(), options))
|
||||
.unwrap();
|
||||
|
||||
let (double, server) = DoubleServer::new(add_client)
|
||||
.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.listen(
|
||||
"localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default(),
|
||||
)
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
|
||||
let double_client =
|
||||
reactor.run(double::FutureClient::connect(double.addr(), client::Options::default()))
|
||||
.unwrap();
|
||||
reactor.run(futures::stream::futures_unordered((0..5).map(|i| double_client.double(i)))
|
||||
.map_err(|e| println!("{}", e))
|
||||
.for_each(|i| {
|
||||
println!("{:?}", i);
|
||||
Ok(())
|
||||
}))
|
||||
let double_client = reactor
|
||||
.run(double::FutureClient::connect(
|
||||
double.addr(),
|
||||
client::Options::default(),
|
||||
))
|
||||
.unwrap();
|
||||
reactor
|
||||
.run(
|
||||
futures::stream::futures_unordered((0..5).map(|i| double_client.double(i)))
|
||||
.map_err(|e| println!("{}", e))
|
||||
.for_each(|i| {
|
||||
println!("{:?}", i);
|
||||
Ok(())
|
||||
}),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
@@ -3,14 +3,12 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(plugin)]
|
||||
#![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate env_logger;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
extern crate futures;
|
||||
extern crate tokio_core;
|
||||
|
||||
use add::{SyncService as AddSyncService, SyncServiceExt as AddExt};
|
||||
use double::{SyncService as DoubleSyncService, SyncServiceExt as DoubleExt};
|
||||
@@ -58,18 +56,19 @@ impl DoubleServer {
|
||||
|
||||
impl DoubleSyncService for DoubleServer {
|
||||
fn double(&self, x: i32) -> Result<i32, Message> {
|
||||
self.client
|
||||
.add(x, x)
|
||||
.map_err(|e| e.to_string().into())
|
||||
self.client.add(x, x).map_err(|e| e.to_string().into())
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
env_logger::init();
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let handle = AddServer.listen("localhost:0".first_socket_addr(),
|
||||
server::Options::default())
|
||||
let handle = AddServer
|
||||
.listen(
|
||||
"localhost:0".first_socket_addr(),
|
||||
server::Options::default(),
|
||||
)
|
||||
.unwrap();
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
@@ -81,8 +80,10 @@ fn main() {
|
||||
thread::spawn(move || {
|
||||
let add_client = add::SyncClient::connect(add, client::Options::default()).unwrap();
|
||||
let handle = DoubleServer::new(add_client)
|
||||
.listen("localhost:0".first_socket_addr(),
|
||||
server::Options::default())
|
||||
.listen(
|
||||
"localhost:0".first_socket_addr(),
|
||||
server::Options::default(),
|
||||
)
|
||||
.unwrap();
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(plugin)]
|
||||
#![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
#[macro_use]
|
||||
@@ -11,13 +11,11 @@ extern crate lazy_static;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
extern crate env_logger;
|
||||
extern crate futures;
|
||||
extern crate serde;
|
||||
extern crate serde_bytes;
|
||||
extern crate tokio_core;
|
||||
|
||||
use std::io::{Read, Write, stdout};
|
||||
use std::net;
|
||||
use std::sync::Arc;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use std::time;
|
||||
@@ -27,7 +25,7 @@ use tarpc::util::{FirstSocketAddr, Never};
|
||||
use tokio_core::reactor;
|
||||
|
||||
lazy_static! {
|
||||
static ref BUF: Arc<serde::bytes::ByteBuf> = Arc::new(gen_vec(CHUNK_SIZE as usize).into());
|
||||
static ref BUF: serde_bytes::ByteBuf = gen_vec(CHUNK_SIZE as usize).into();
|
||||
}
|
||||
|
||||
fn gen_vec(size: usize) -> Vec<u8> {
|
||||
@@ -39,14 +37,14 @@ fn gen_vec(size: usize) -> Vec<u8> {
|
||||
}
|
||||
|
||||
service! {
|
||||
rpc read() -> Arc<serde::bytes::ByteBuf>;
|
||||
rpc read() -> serde_bytes::ByteBuf;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Server;
|
||||
|
||||
impl FutureService for Server {
|
||||
type ReadFut = Result<Arc<serde::bytes::ByteBuf>, Never>;
|
||||
type ReadFut = Result<serde_bytes::ByteBuf, Never>;
|
||||
|
||||
fn read(&self) -> Self::ReadFut {
|
||||
Ok(BUF.clone())
|
||||
@@ -59,15 +57,18 @@ fn bench_tarpc(target: u64) {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let (addr, server) = Server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
let (addr, server) = Server
|
||||
.listen(
|
||||
"localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default(),
|
||||
)
|
||||
.unwrap();
|
||||
tx.send(addr).unwrap();
|
||||
reactor.run(server).unwrap();
|
||||
});
|
||||
let client = SyncClient::connect(rx.recv().unwrap().addr(), client::Options::default())
|
||||
.unwrap();
|
||||
let client =
|
||||
SyncClient::connect(rx.recv().unwrap().addr(), client::Options::default()).unwrap();
|
||||
let start = time::Instant::now();
|
||||
let mut nread = 0;
|
||||
while nread < target {
|
||||
@@ -77,9 +78,11 @@ fn bench_tarpc(target: u64) {
|
||||
}
|
||||
println!("done");
|
||||
let duration = time::Instant::now() - start;
|
||||
println!("TARPC: {}MB/s",
|
||||
(target as f64 / (1024f64 * 1024f64)) /
|
||||
(duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9));
|
||||
println!(
|
||||
"TARPC: {}MB/s",
|
||||
(target as f64 / (1024f64 * 1024f64)) /
|
||||
(duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9)
|
||||
);
|
||||
}
|
||||
|
||||
fn bench_tcp(target: u64) {
|
||||
@@ -101,13 +104,15 @@ fn bench_tcp(target: u64) {
|
||||
}
|
||||
println!("done");
|
||||
let duration = time::Instant::now() - start;
|
||||
println!("TCP: {}MB/s",
|
||||
(target as f64 / (1024f64 * 1024f64)) /
|
||||
(duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9));
|
||||
println!(
|
||||
"TCP: {}MB/s",
|
||||
(target as f64 / (1024f64 * 1024f64)) /
|
||||
(duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9)
|
||||
);
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
env_logger::init();
|
||||
let _ = *BUF; // To non-lazily initialize it.
|
||||
bench_tcp(256 << 20);
|
||||
bench_tarpc(256 << 20);
|
||||
|
||||
@@ -3,16 +3,14 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(plugin)]
|
||||
#![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
extern crate bincode;
|
||||
extern crate env_logger;
|
||||
extern crate futures;
|
||||
extern crate tokio_core;
|
||||
|
||||
use bar::FutureServiceExt as BarExt;
|
||||
@@ -57,20 +55,17 @@ impl baz::FutureService for Baz {
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! pos {
|
||||
() => (concat!(file!(), ":", line!()))
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
env_logger::init();
|
||||
let bar_client = {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let (handle, server) = Bar.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
let (handle, server) = Bar.listen(
|
||||
"localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default(),
|
||||
).unwrap();
|
||||
tx.send(handle).unwrap();
|
||||
reactor.run(server).unwrap();
|
||||
});
|
||||
@@ -82,10 +77,11 @@ fn main() {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let (handle, server) = Baz.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
let (handle, server) = Baz.listen(
|
||||
"localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default(),
|
||||
).unwrap();
|
||||
tx.send(handle).unwrap();
|
||||
reactor.run(server).unwrap();
|
||||
});
|
||||
|
||||
@@ -67,7 +67,7 @@ else
|
||||
fi
|
||||
|
||||
printf "${PREFIX} Checking for rustfmt ... "
|
||||
command -v rustfmt &>/dev/null
|
||||
command -v cargo fmt &>/dev/null
|
||||
if [ $? == 0 ]; then
|
||||
printf "${SUCCESS}\n"
|
||||
else
|
||||
@@ -93,7 +93,7 @@ diff=""
|
||||
for file in $(git diff --name-only --cached);
|
||||
do
|
||||
if [ ${file: -3} == ".rs" ]; then
|
||||
diff="$diff$(rustfmt --skip-children --write-mode=diff $file)"
|
||||
diff="$diff$(cargo fmt -- --skip-children --write-mode=diff $file)"
|
||||
fi
|
||||
done
|
||||
if grep --quiet "^Diff at line" <<< "$diff"; then
|
||||
|
||||
@@ -1,2 +1 @@
|
||||
ideal_width = 100
|
||||
reorder_imports = true
|
||||
|
||||
@@ -29,7 +29,7 @@ pub enum Error<E> {
|
||||
App(E),
|
||||
}
|
||||
|
||||
impl<E: StdError + Deserialize + Serialize + Send + 'static> fmt::Display for Error<E> {
|
||||
impl<'a, E: StdError + Deserialize<'a> + Serialize + Send + 'static> fmt::Display for Error<E> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self {
|
||||
Error::ResponseDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e),
|
||||
@@ -40,7 +40,7 @@ impl<E: StdError + Deserialize + Serialize + Send + 'static> fmt::Display for Er
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: StdError + Deserialize + Serialize + Send + 'static> StdError for Error<E> {
|
||||
impl<'a, E: StdError + Deserialize<'a> + Serialize + Send + 'static> StdError for Error<E> {
|
||||
fn description(&self) -> &str {
|
||||
match *self {
|
||||
Error::ResponseDeserialize(_) => "The client failed to deserialize the response.",
|
||||
@@ -53,8 +53,7 @@ impl<E: StdError + Deserialize + Serialize + Send + 'static> StdError for Error<
|
||||
fn cause(&self) -> Option<&StdError> {
|
||||
match *self {
|
||||
Error::ResponseDeserialize(ref e) => e.cause(),
|
||||
Error::RequestDeserialize(_) |
|
||||
Error::App(_) => None,
|
||||
Error::RequestDeserialize(_) | Error::App(_) => None,
|
||||
Error::Io(ref e) => e.cause(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,8 @@ use {REMOTE, bincode};
|
||||
use future::server::Response;
|
||||
use futures::{self, Future, future};
|
||||
use protocol::Proto;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
@@ -89,40 +90,46 @@ enum Reactor {
|
||||
|
||||
impl fmt::Debug for Reactor {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
const HANDLE: &'static &'static str = &"Reactor::Handle";
|
||||
const HANDLE_INNER: &'static &'static str = &"Handle { .. }";
|
||||
const REMOTE: &'static &'static str = &"Reactor::Remote";
|
||||
const REMOTE_INNER: &'static &'static str = &"Remote { .. }";
|
||||
const HANDLE: &str = "Reactor::Handle";
|
||||
const HANDLE_INNER: &str = "Handle { .. }";
|
||||
const REMOTE: &str = "Reactor::Remote";
|
||||
const REMOTE_INNER: &str = "Remote { .. }";
|
||||
|
||||
match *self {
|
||||
Reactor::Handle(_) => f.debug_tuple(HANDLE).field(HANDLE_INNER).finish(),
|
||||
Reactor::Remote(_) => f.debug_tuple(REMOTE).field(REMOTE_INNER).finish(),
|
||||
Reactor::Handle(_) => f.debug_tuple(HANDLE).field(&HANDLE_INNER).finish(),
|
||||
Reactor::Remote(_) => f.debug_tuple(REMOTE).field(&REMOTE_INNER).finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub struct Client<Req, Resp, E>
|
||||
where Req: Serialize + 'static,
|
||||
Resp: Deserialize + 'static,
|
||||
E: Deserialize + 'static
|
||||
where
|
||||
Req: Serialize + 'static,
|
||||
Resp: DeserializeOwned + 'static,
|
||||
E: DeserializeOwned + 'static,
|
||||
{
|
||||
inner: ClientService<StreamType, Proto<Req, Response<Resp, E>>>,
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> Clone for Client<Req, Resp, E>
|
||||
where Req: Serialize + 'static,
|
||||
Resp: Deserialize + 'static,
|
||||
E: Deserialize + 'static
|
||||
where
|
||||
Req: Serialize + 'static,
|
||||
Resp: DeserializeOwned + 'static,
|
||||
E: DeserializeOwned + 'static,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Client { inner: self.inner.clone() }
|
||||
Client {
|
||||
inner: self.inner.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> Service for Client<Req, Resp, E>
|
||||
where Req: Serialize + Sync + Send + 'static,
|
||||
Resp: Deserialize + Sync + Send + 'static,
|
||||
E: Deserialize + Sync + Send + 'static
|
||||
where
|
||||
Req: Serialize + Send + 'static,
|
||||
Resp: DeserializeOwned + Send + 'static,
|
||||
E: DeserializeOwned + Send + 'static,
|
||||
{
|
||||
type Request = Req;
|
||||
type Response = Resp;
|
||||
@@ -142,16 +149,18 @@ impl<Req, Resp, E> Service for Client<Req, Resp, E>
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> Client<Req, Resp, E>
|
||||
where Req: Serialize + 'static,
|
||||
Resp: Deserialize + 'static,
|
||||
E: Deserialize + 'static
|
||||
where
|
||||
Req: Serialize + 'static,
|
||||
Resp: DeserializeOwned + 'static,
|
||||
E: DeserializeOwned + 'static,
|
||||
{
|
||||
fn bind(handle: &reactor::Handle, tcp: StreamType, max_payload_size: u64) -> Self
|
||||
where Req: Serialize + Sync + Send + 'static,
|
||||
Resp: Deserialize + Sync + Send + 'static,
|
||||
E: Deserialize + Sync + Send + 'static
|
||||
where
|
||||
Req: Serialize + Send + 'static,
|
||||
Resp: DeserializeOwned + Send + 'static,
|
||||
E: DeserializeOwned + Send + 'static,
|
||||
{
|
||||
let inner = Proto::new(max_payload_size).bind_client(&handle, tcp);
|
||||
let inner = Proto::new(max_payload_size).bind_client(handle, tcp);
|
||||
Client { inner }
|
||||
}
|
||||
|
||||
@@ -163,9 +172,10 @@ impl<Req, Resp, E> Client<Req, Resp, E>
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
|
||||
where Req: Serialize + 'static,
|
||||
Resp: Deserialize + 'static,
|
||||
E: Deserialize + 'static
|
||||
where
|
||||
Req: Serialize + 'static,
|
||||
Resp: DeserializeOwned + 'static,
|
||||
E: DeserializeOwned + 'static,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
write!(f, "Client {{ .. }}")
|
||||
@@ -182,14 +192,18 @@ pub trait ClientExt: Sized {
|
||||
}
|
||||
|
||||
/// A future that resolves to a `Client` or an `io::Error`.
|
||||
pub type ConnectFuture<Req, Resp, E> =
|
||||
futures::Flatten<futures::MapErr<futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
|
||||
fn(futures::Canceled) -> io::Error>>;
|
||||
pub type ConnectFuture<Req, Resp, E> = futures::Flatten<
|
||||
futures::MapErr<
|
||||
futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
|
||||
fn(futures::Canceled) -> io::Error,
|
||||
>,
|
||||
>;
|
||||
|
||||
impl<Req, Resp, E> ClientExt for Client<Req, Resp, E>
|
||||
where Req: Serialize + Sync + Send + 'static,
|
||||
Resp: Deserialize + Sync + Send + 'static,
|
||||
E: Deserialize + Sync + Send + 'static
|
||||
where
|
||||
Req: Serialize + Send + 'static,
|
||||
Resp: DeserializeOwned + Send + 'static,
|
||||
E: DeserializeOwned + Send + 'static,
|
||||
{
|
||||
type ConnectFut = ConnectFuture<Req, Resp, E>;
|
||||
|
||||
@@ -212,15 +226,17 @@ impl<Req, Resp, E> ClientExt for Client<Req, Resp, E>
|
||||
#[cfg(feature = "tls")]
|
||||
match tls_ctx {
|
||||
Some(tls_ctx) => {
|
||||
future::Either::A(tls_ctx.tls_connector
|
||||
.connect_async(&tls_ctx.domain, socket)
|
||||
.map(StreamType::Tls)
|
||||
.map_err(native_to_io))
|
||||
future::Either::A(
|
||||
tls_ctx
|
||||
.tls_connector
|
||||
.connect_async(&tls_ctx.domain, socket)
|
||||
.map(StreamType::Tls)
|
||||
.map_err(native_to_io),
|
||||
)
|
||||
}
|
||||
None => future::Either::B(future::ok(StreamType::Tcp(socket))),
|
||||
}
|
||||
#[cfg(not(feature = "tls"))]
|
||||
future::ok(StreamType::Tcp(socket))
|
||||
#[cfg(not(feature = "tls"))] future::ok(StreamType::Tcp(socket))
|
||||
})
|
||||
.map(move |tcp| Client::bind(&handle2, tcp, max_payload_size))
|
||||
};
|
||||
|
||||
@@ -20,12 +20,12 @@ impl Tracker {
|
||||
}
|
||||
|
||||
pub fn increment(&self) {
|
||||
let _ = self.tx.send(Action::Increment);
|
||||
let _ = self.tx.unbounded_send(Action::Increment);
|
||||
}
|
||||
|
||||
pub fn decrement(&self) {
|
||||
debug!("Closing connection");
|
||||
let _ = self.tx.send(Action::Decrement);
|
||||
let _ = self.tx.unbounded_send(Action::Decrement);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,14 +7,15 @@ use {bincode, net2};
|
||||
use errors::WireError;
|
||||
use futures::{Async, Future, Poll, Stream, future as futures};
|
||||
use protocol::Proto;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use stream_type::StreamType;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_core::net::{Incoming, TcpListener, TcpStream};
|
||||
use tokio_core::reactor;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_proto::BindServer;
|
||||
use tokio_service::NewService;
|
||||
|
||||
@@ -58,10 +59,13 @@ enum Acceptor {
|
||||
|
||||
struct Accept {
|
||||
#[cfg(feature = "tls")]
|
||||
inner: futures::Either<futures::MapErr<futures::Map<AcceptAsync<TcpStream>,
|
||||
fn(TlsStream<TcpStream>) -> StreamType>,
|
||||
fn(native_tls::Error) -> io::Error>,
|
||||
futures::FutureResult<StreamType, io::Error>>,
|
||||
inner: futures::Either<
|
||||
futures::MapErr<
|
||||
futures::Map<AcceptAsync<TcpStream>, fn(TlsStream<TcpStream>) -> StreamType>,
|
||||
fn(native_tls::Error) -> io::Error,
|
||||
>,
|
||||
futures::FutureResult<StreamType, io::Error>,
|
||||
>,
|
||||
#[cfg(not(feature = "tls"))]
|
||||
inner: futures::FutureResult<StreamType, io::Error>,
|
||||
}
|
||||
@@ -82,19 +86,22 @@ impl Acceptor {
|
||||
Accept {
|
||||
inner: match *self {
|
||||
Acceptor::Tls(ref tls_acceptor) => {
|
||||
futures::Either::A(tls_acceptor.accept_async(socket)
|
||||
.map(StreamType::Tls as _)
|
||||
.map_err(native_to_io))
|
||||
futures::Either::A(
|
||||
tls_acceptor
|
||||
.accept_async(socket)
|
||||
.map(StreamType::Tls as _)
|
||||
.map_err(native_to_io),
|
||||
)
|
||||
}
|
||||
Acceptor::Tcp => futures::Either::B(futures::ok(StreamType::Tcp(socket))),
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "tls"))]
|
||||
fn accept(&self, socket: TcpStream) -> Accept {
|
||||
Accept {
|
||||
inner: futures::ok(StreamType::Tcp(socket))
|
||||
inner: futures::ok(StreamType::Tcp(socket)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -120,12 +127,12 @@ impl fmt::Debug for Acceptor {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
use self::Acceptor::*;
|
||||
#[cfg(feature = "tls")]
|
||||
const TLS: &'static &'static str = &"TlsAcceptor { .. }";
|
||||
const TLS: &str = "TlsAcceptor { .. }";
|
||||
|
||||
match *self {
|
||||
Tcp => fmt.debug_tuple("Acceptor::Tcp").finish(),
|
||||
#[cfg(feature = "tls")]
|
||||
Tls(_) => fmt.debug_tuple("Acceptlr::Tls").field(TLS).finish(),
|
||||
Tls(_) => fmt.debug_tuple("Acceptor::Tls").field(&TLS).finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -144,7 +151,8 @@ struct AcceptStream<S> {
|
||||
}
|
||||
|
||||
impl<S> Stream for AcceptStream<S>
|
||||
where S: Stream<Item=(TcpStream, SocketAddr), Error = io::Error>,
|
||||
where
|
||||
S: Stream<Item = (TcpStream, SocketAddr), Error = io::Error>,
|
||||
{
|
||||
type Item = <Accept as Future>::Item;
|
||||
type Error = io::Error;
|
||||
@@ -167,7 +175,7 @@ impl<S> Stream for AcceptStream<S>
|
||||
self.future = None;
|
||||
Err(e)
|
||||
}
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady)
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -215,13 +223,20 @@ impl Options {
|
||||
impl fmt::Debug for Options {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
#[cfg(feature = "tls")]
|
||||
const SOME: &'static &'static str = &"Some(_)";
|
||||
const SOME: &str = "Some(_)";
|
||||
#[cfg(feature = "tls")]
|
||||
const NONE: &'static &'static str = &"None";
|
||||
const NONE: &str = "None";
|
||||
|
||||
let mut debug_struct = fmt.debug_struct("Options");
|
||||
#[cfg(feature = "tls")]
|
||||
debug_struct.field("tls_acceptor", if self.tls_acceptor.is_some() { SOME } else { NONE });
|
||||
debug_struct.field(
|
||||
"tls_acceptor",
|
||||
if self.tls_acceptor.is_some() {
|
||||
&SOME
|
||||
} else {
|
||||
&NONE
|
||||
},
|
||||
);
|
||||
debug_struct.finish()
|
||||
}
|
||||
}
|
||||
@@ -239,17 +254,24 @@ pub fn listen<S, Req, Resp, E>(new_service: S,
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
Req: Deserialize + 'static,
|
||||
Req: DeserializeOwned + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
let (addr, shutdown, server) = listen_with(
|
||||
new_service, addr, handle, options.max_payload_size, Acceptor::from(options))?;
|
||||
Ok((Handle {
|
||||
new_service,
|
||||
addr,
|
||||
handle,
|
||||
options.max_payload_size,
|
||||
Acceptor::from(options),
|
||||
)?;
|
||||
Ok((
|
||||
Handle {
|
||||
addr: addr,
|
||||
shutdown: shutdown,
|
||||
},
|
||||
server))
|
||||
server,
|
||||
))
|
||||
}
|
||||
|
||||
/// Spawns a service that binds to the given address using the given handle.
|
||||
@@ -262,7 +284,7 @@ fn listen_with<S, Req, Resp, E>(new_service: S,
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
Req: Deserialize + 'static,
|
||||
Req: DeserializeOwned + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
@@ -299,7 +321,8 @@ fn listener(addr: &SocketAddr, handle: &reactor::Handle) -> io::Result<TcpListen
|
||||
}?;
|
||||
configure_tcp(&builder)?;
|
||||
builder.reuse_address(true)?;
|
||||
builder.bind(addr)?
|
||||
builder
|
||||
.bind(addr)?
|
||||
.listen(PENDING_CONNECTION_BACKLOG)
|
||||
.and_then(|l| TcpListener::from_listener(l, addr, handle))
|
||||
}
|
||||
@@ -324,16 +347,16 @@ struct BindStream<S, St> {
|
||||
}
|
||||
|
||||
impl<S, St> fmt::Debug for BindStream<S, St>
|
||||
where S: fmt::Debug,
|
||||
St: fmt::Debug,
|
||||
where
|
||||
S: fmt::Debug,
|
||||
St: fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
const HANDLE: &'static &'static str = &"Handle { .. }";
|
||||
f.debug_struct("BindStream")
|
||||
.field("handle", HANDLE)
|
||||
.field("new_service", &self.new_service)
|
||||
.field("stream", &self.stream)
|
||||
.finish()
|
||||
.field("handle", &self.handle)
|
||||
.field("new_service", &self.new_service)
|
||||
.field("stream", &self.stream)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -341,18 +364,19 @@ impl<S, Req, Resp, E, I, St> BindStream<S, St>
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
Req: Deserialize + 'static,
|
||||
Req: DeserializeOwned + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static,
|
||||
I: AsyncRead + AsyncWrite + 'static,
|
||||
St: Stream<Item=I, Error=io::Error>,
|
||||
St: Stream<Item = I, Error = io::Error>
|
||||
{
|
||||
fn bind_each(&mut self) -> Poll<(), io::Error> {
|
||||
loop {
|
||||
match try!(self.stream.poll()) {
|
||||
Async::Ready(Some(socket)) => {
|
||||
Proto::new(self.max_payload_size)
|
||||
.bind_server(&self.handle, socket, self.new_service.new_service()?);
|
||||
Proto::new(self.max_payload_size).bind_server(&self.handle,
|
||||
socket,
|
||||
self.new_service.new_service()?);
|
||||
}
|
||||
Async::Ready(None) => return Ok(Async::Ready(())),
|
||||
Async::NotReady => return Ok(Async::NotReady),
|
||||
@@ -365,11 +389,11 @@ impl<S, Req, Resp, E, I, St> Future for BindStream<S, St>
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
Req: Deserialize + 'static,
|
||||
Req: DeserializeOwned + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static,
|
||||
I: AsyncRead + AsyncWrite + 'static,
|
||||
St: Stream<Item=I, Error=io::Error>,
|
||||
St: Stream<Item = I, Error = io::Error>
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
@@ -392,19 +416,18 @@ pub struct Listen<S, Req, Resp, E>
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
Req: Deserialize + 'static,
|
||||
Req: DeserializeOwned + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
inner: AlwaysOkUnit<futures::Select<BindStream<S, AcceptStream<Incoming>>,
|
||||
shutdown::Watcher>>,
|
||||
inner: AlwaysOkUnit<futures::Select<BindStream<S, AcceptStream<Incoming>>, shutdown::Watcher>>,
|
||||
}
|
||||
|
||||
impl<S, Req, Resp, E> Future for Listen<S, Req, Resp, E>
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
Req: Deserialize + 'static,
|
||||
Req: DeserializeOwned + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
@@ -420,9 +443,9 @@ impl<S, Req, Resp, E> fmt::Debug for Listen<S, Req, Resp, E>
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
Req: Deserialize + 'static,
|
||||
Req: DeserializeOwned + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
f.debug_struct("Listen").finish()
|
||||
@@ -433,7 +456,8 @@ impl<S, Req, Resp, E> fmt::Debug for Listen<S, Req, Resp, E>
|
||||
struct AlwaysOkUnit<F>(F);
|
||||
|
||||
impl<F> Future for AlwaysOkUnit<F>
|
||||
where F: Future,
|
||||
where
|
||||
F: Future,
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
@@ -445,4 +469,3 @@ impl<F> Future for AlwaysOkUnit<F>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
|
||||
|
||||
use super::{AlwaysOkUnit, connection};
|
||||
use futures::{Async, Future, Poll, Stream, future as futures, stream};
|
||||
use futures::sync::{mpsc, oneshot};
|
||||
use futures::unsync;
|
||||
|
||||
use super::{AlwaysOkUnit, connection};
|
||||
|
||||
/// A hook to shut down a running server.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Shutdown {
|
||||
@@ -13,8 +14,7 @@ pub struct Shutdown {
|
||||
/// A future that resolves when server shutdown completes.
|
||||
#[derive(Debug)]
|
||||
pub struct ShutdownFuture {
|
||||
inner: futures::Either<futures::FutureResult<(), ()>,
|
||||
AlwaysOkUnit<oneshot::Receiver<()>>>,
|
||||
inner: futures::Either<futures::FutureResult<(), ()>, AlwaysOkUnit<oneshot::Receiver<()>>>,
|
||||
}
|
||||
|
||||
impl Future for ShutdownFuture {
|
||||
@@ -36,7 +36,7 @@ impl Shutdown {
|
||||
/// The returned future resolves when the server is completely shut down.
|
||||
pub fn shutdown(&self) -> ShutdownFuture {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let inner = if let Err(_) = self.tx.send(tx) {
|
||||
let inner = if self.tx.unbounded_send(tx).is_err() {
|
||||
trace!("Server already initiated shutdown.");
|
||||
futures::Either::A(futures::ok(()))
|
||||
} else {
|
||||
@@ -60,16 +60,18 @@ impl Watcher {
|
||||
pub fn triple() -> (connection::Tracker, Shutdown, Self) {
|
||||
let (connection_tx, connections) = connection::Tracker::pair();
|
||||
let (shutdown_tx, shutdown_rx) = mpsc::unbounded();
|
||||
(connection_tx,
|
||||
Shutdown { tx: shutdown_tx },
|
||||
Watcher {
|
||||
shutdown_rx: shutdown_rx.take(1),
|
||||
connections: connections,
|
||||
queued_error: None,
|
||||
shutdown: None,
|
||||
done: false,
|
||||
num_connections: 0,
|
||||
})
|
||||
(
|
||||
connection_tx,
|
||||
Shutdown { tx: shutdown_tx },
|
||||
Watcher {
|
||||
shutdown_rx: shutdown_rx.take(1),
|
||||
connections: connections,
|
||||
queued_error: None,
|
||||
shutdown: None,
|
||||
done: false,
|
||||
num_connections: 0,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn process_connection(&mut self, action: connection::Action) {
|
||||
@@ -102,7 +104,7 @@ impl Watcher {
|
||||
|
||||
fn poll_shutdown_requests_and_connections(&mut self) -> Poll<Option<()>, ()> {
|
||||
if let Some(e) = self.queued_error.take() {
|
||||
return Err(e)
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
match try!(self.poll_shutdown_requests()) {
|
||||
@@ -178,4 +180,3 @@ impl Future for Watcher {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
12
src/lib.rs
12
src/lib.rs
@@ -27,7 +27,7 @@
|
||||
//! Example usage:
|
||||
//!
|
||||
//! ```
|
||||
//! #![feature(plugin)]
|
||||
//! #![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
|
||||
//! #![plugin(tarpc_plugins)]
|
||||
//!
|
||||
//! #[macro_use]
|
||||
@@ -71,7 +71,7 @@
|
||||
//! Example usage with TLS:
|
||||
//!
|
||||
//! ```no-run
|
||||
//! #![feature(plugin)]
|
||||
//! #![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
|
||||
//! #![plugin(tarpc_plugins)]
|
||||
//!
|
||||
//! #[macro_use]
|
||||
@@ -116,7 +116,7 @@
|
||||
|
||||
#![deny(missing_docs, missing_debug_implementations)]
|
||||
#![feature(never_type)]
|
||||
#![cfg_attr(test, feature(plugin))]
|
||||
#![cfg_attr(test, feature(plugin, use_extern_macros, proc_macro_path_invoc))]
|
||||
#![cfg_attr(test, plugin(tarpc_plugins))]
|
||||
|
||||
extern crate byteorder;
|
||||
@@ -129,9 +129,8 @@ extern crate lazy_static;
|
||||
extern crate log;
|
||||
extern crate net2;
|
||||
extern crate num_cpus;
|
||||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
extern crate thread_pool;
|
||||
extern crate tokio_codec;
|
||||
extern crate tokio_io;
|
||||
|
||||
#[doc(hidden)]
|
||||
@@ -142,6 +141,9 @@ pub extern crate futures;
|
||||
#[doc(hidden)]
|
||||
pub extern crate serde;
|
||||
#[doc(hidden)]
|
||||
#[macro_use]
|
||||
pub extern crate serde_derive;
|
||||
#[doc(hidden)]
|
||||
pub extern crate tokio_core;
|
||||
#[doc(hidden)]
|
||||
pub extern crate tokio_proto;
|
||||
|
||||
364
src/macros.rs
364
src/macros.rs
@@ -9,158 +9,12 @@ macro_rules! as_item {
|
||||
($i:item) => {$i};
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[macro_export]
|
||||
macro_rules! impl_serialize {
|
||||
($impler:ident, { $($lifetime:tt)* }, $(@($name:ident $n:expr))* -- #($n_:expr) ) => {
|
||||
as_item! {
|
||||
impl$($lifetime)* $crate::serde::Serialize for $impler$($lifetime)* {
|
||||
fn serialize<S>(&self, impl_serialize_serializer__: S)
|
||||
-> ::std::result::Result<S::Ok, S::Error>
|
||||
where S: $crate::serde::Serializer
|
||||
{
|
||||
match *self {
|
||||
$(
|
||||
$impler::$name(ref impl_serialize_field__) =>
|
||||
$crate::serde::Serializer::serialize_newtype_variant(
|
||||
impl_serialize_serializer__,
|
||||
stringify!($impler),
|
||||
$n,
|
||||
stringify!($name),
|
||||
impl_serialize_field__,
|
||||
)
|
||||
),*
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
// All args are wrapped in a tuple so we can use the newtype variant for each one.
|
||||
($impler:ident,
|
||||
{ $($lifetime:tt)* },
|
||||
$(@$finished:tt)*
|
||||
-- #($n:expr) $name:ident($field:ty) $($req:tt)*) =>
|
||||
(
|
||||
impl_serialize!($impler,
|
||||
{ $($lifetime)* },
|
||||
$(@$finished)* @($name $n)
|
||||
-- #($n + 1) $($req)*);
|
||||
);
|
||||
// Entry
|
||||
($impler:ident,
|
||||
{ $($lifetime:tt)* },
|
||||
$($started:tt)*) => (impl_serialize!($impler, { $($lifetime)* }, -- #(0) $($started)*););
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[macro_export]
|
||||
macro_rules! impl_deserialize {
|
||||
($impler:ident, $(@($name:ident $n:expr))* -- #($n_:expr) ) => (
|
||||
impl $crate::serde::Deserialize for $impler {
|
||||
#[allow(non_camel_case_types)]
|
||||
fn deserialize<impl_deserialize_D__>(
|
||||
impl_deserialize_deserializer__: impl_deserialize_D__)
|
||||
-> ::std::result::Result<$impler, impl_deserialize_D__::Error>
|
||||
where impl_deserialize_D__: $crate::serde::Deserializer
|
||||
{
|
||||
#[allow(non_camel_case_types, unused)]
|
||||
enum impl_deserialize_Field__ {
|
||||
$($name),*
|
||||
}
|
||||
|
||||
impl $crate::serde::Deserialize for impl_deserialize_Field__ {
|
||||
fn deserialize<D>(impl_deserialize_deserializer__: D)
|
||||
-> ::std::result::Result<impl_deserialize_Field__, D::Error>
|
||||
where D: $crate::serde::Deserializer
|
||||
{
|
||||
struct impl_deserialize_FieldVisitor__;
|
||||
impl $crate::serde::de::Visitor for impl_deserialize_FieldVisitor__ {
|
||||
type Value = impl_deserialize_Field__;
|
||||
|
||||
fn expecting(&self, formatter: &mut ::std::fmt::Formatter)
|
||||
-> ::std::fmt::Result
|
||||
{
|
||||
formatter.write_str("an unsigned integer")
|
||||
}
|
||||
|
||||
fn visit_u64<E>(self, impl_deserialize_value__: u64)
|
||||
-> ::std::result::Result<impl_deserialize_Field__, E>
|
||||
where E: $crate::serde::de::Error,
|
||||
{
|
||||
if impl_deserialize_value__ == 0 {
|
||||
return ::std::result::Result::Err(
|
||||
$crate::serde::de::Error::custom(
|
||||
"Variant 0 is a sentinel value and should not \
|
||||
be serialized!"));
|
||||
}
|
||||
|
||||
$(
|
||||
if impl_deserialize_value__ == $n {
|
||||
return ::std::result::Result::Ok(
|
||||
impl_deserialize_Field__::$name);
|
||||
}
|
||||
)*
|
||||
::std::result::Result::Err(
|
||||
$crate::serde::de::Error::custom(
|
||||
format!("No variants have a value of {}!",
|
||||
impl_deserialize_value__))
|
||||
)
|
||||
}
|
||||
}
|
||||
impl_deserialize_deserializer__.deserialize_struct_field(
|
||||
impl_deserialize_FieldVisitor__)
|
||||
}
|
||||
}
|
||||
|
||||
struct Visitor;
|
||||
impl $crate::serde::de::Visitor for Visitor {
|
||||
type Value = $impler;
|
||||
|
||||
fn expecting(&self, formatter: &mut ::std::fmt::Formatter)
|
||||
-> ::std::fmt::Result
|
||||
{
|
||||
formatter.write_str("an enum variant")
|
||||
}
|
||||
|
||||
fn visit_enum<V>(self, visitor__: V)
|
||||
-> ::std::result::Result<Self::Value, V::Error>
|
||||
where V: $crate::serde::de::EnumVisitor
|
||||
{
|
||||
use $crate::serde::de::VariantVisitor;
|
||||
match visitor__.visit_variant()? {
|
||||
$(
|
||||
(impl_deserialize_Field__::$name, variant) => {
|
||||
::std::result::Result::Ok(
|
||||
$impler::$name(variant.visit_newtype()?))
|
||||
}
|
||||
),*
|
||||
}
|
||||
}
|
||||
}
|
||||
const TARPC_VARIANTS__: &'static [&'static str] = &[
|
||||
$(
|
||||
stringify!($name)
|
||||
),*
|
||||
];
|
||||
impl_deserialize_deserializer__.deserialize_enum(
|
||||
stringify!($impler), TARPC_VARIANTS__, Visitor)
|
||||
}
|
||||
}
|
||||
);
|
||||
// All args are wrapped in a tuple so we can use the newtype variant for each one.
|
||||
($impler:ident, $(@$finished:tt)* -- #($n:expr) $name:ident($field:ty) $($req:tt)*) => (
|
||||
impl_deserialize!($impler, $(@$finished)* @($name $n) -- #($n + 1) $($req)*);
|
||||
);
|
||||
// Entry
|
||||
($impler:ident, $($started:tt)*) => (impl_deserialize!($impler, -- #(0) $($started)*););
|
||||
}
|
||||
|
||||
/// The main macro that creates RPC services.
|
||||
///
|
||||
/// Rpc methods are specified, mirroring trait syntax:
|
||||
///
|
||||
/// ```
|
||||
/// # #![feature(plugin)]
|
||||
/// # #![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
|
||||
/// # #![plugin(tarpc_plugins)]
|
||||
/// # #[macro_use] extern crate tarpc;
|
||||
/// # fn main() {}
|
||||
@@ -290,41 +144,31 @@ macro_rules! service {
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(non_camel_case_types, unused)]
|
||||
#[derive($crate::serde_derive::Serialize, $crate::serde_derive::Deserialize)]
|
||||
pub enum Request__ {
|
||||
NotIrrefutable(()),
|
||||
$(
|
||||
$fn_name(( $($in_,)* ))
|
||||
$fn_name{ $($arg: $in_,)* }
|
||||
),*
|
||||
}
|
||||
|
||||
impl_deserialize!(Request__, NotIrrefutable(()) $($fn_name(($($in_),*)))*);
|
||||
impl_serialize!(Request__, {}, NotIrrefutable(()) $($fn_name(($($in_),*)))*);
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(non_camel_case_types, unused)]
|
||||
#[derive($crate::serde_derive::Serialize, $crate::serde_derive::Deserialize)]
|
||||
pub enum Response__ {
|
||||
NotIrrefutable(()),
|
||||
$(
|
||||
$fn_name($out)
|
||||
),*
|
||||
}
|
||||
|
||||
impl_deserialize!(Response__, NotIrrefutable(()) $($fn_name($out))*);
|
||||
impl_serialize!(Response__, {}, NotIrrefutable(()) $($fn_name($out))*);
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(non_camel_case_types, unused)]
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, $crate::serde_derive::Deserialize, $crate::serde_derive::Serialize)]
|
||||
pub enum Error__ {
|
||||
NotIrrefutable(()),
|
||||
$(
|
||||
$fn_name($error)
|
||||
),*
|
||||
}
|
||||
|
||||
impl_deserialize!(Error__, NotIrrefutable(()) $($fn_name($error))*);
|
||||
impl_serialize!(Error__, {}, NotIrrefutable(()) $($fn_name($error))*);
|
||||
|
||||
/// Defines the `Future` RPC service. Implementors must be `Clone` and `'static`,
|
||||
/// as required by `tokio_proto::NewService`. This is required so that the service can be used
|
||||
/// to respond to multiple requests concurrently.
|
||||
@@ -408,10 +252,10 @@ macro_rules! service {
|
||||
::std::string::ToString::to_string(&err__)))));
|
||||
}
|
||||
};
|
||||
#[allow(unreachable_patterns)]
|
||||
match request__ {
|
||||
Request__::NotIrrefutable(()) => unreachable!(),
|
||||
$(
|
||||
Request__::$fn_name(( $($arg,)* )) => {
|
||||
Request__::$fn_name{ $($arg,)* } => {
|
||||
fn wrap__(response__: ::std::result::Result<$out, $error>)
|
||||
-> ResponseFuture__
|
||||
{
|
||||
@@ -430,6 +274,7 @@ macro_rules! service {
|
||||
wrap__));
|
||||
}
|
||||
)*
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -581,7 +426,7 @@ macro_rules! service {
|
||||
-> ::std::result::Result<$out, $crate::Error<$error>>
|
||||
{
|
||||
tarpc_service_then__!($out, $error, $fn_name);
|
||||
let resp__ = self.inner.call(Request__::$fn_name(($($arg,)*)));
|
||||
let resp__ = self.inner.call(Request__::$fn_name { $($arg,)* });
|
||||
tarpc_service_then__(resp__)
|
||||
}
|
||||
)*
|
||||
@@ -646,7 +491,7 @@ macro_rules! service {
|
||||
-> ::std::result::Result<$out, $crate::Error<$error>>> {
|
||||
tarpc_service_then__!($out, $error, $fn_name);
|
||||
|
||||
let request__ = Request__::$fn_name(($($arg,)*));
|
||||
let request__ = Request__::$fn_name { $($arg,)* };
|
||||
let future__ = $crate::tokio_service::Service::call(&self.0, request__);
|
||||
return $crate::futures::Future::then(future__, tarpc_service_then__);
|
||||
}
|
||||
@@ -665,23 +510,21 @@ macro_rules! tarpc_service_then__ {
|
||||
-> ::std::result::Result<$out, $crate::Error<$error>> {
|
||||
match msg__ {
|
||||
::std::result::Result::Ok(msg__) => {
|
||||
if let Response__::$fn_name(msg__) =
|
||||
msg__
|
||||
{
|
||||
::std::result::Result::Ok(msg__)
|
||||
} else {
|
||||
unreachable!()
|
||||
#[allow(unreachable_patterns)]
|
||||
match msg__ {
|
||||
Response__::$fn_name(msg__) =>
|
||||
::std::result::Result::Ok(msg__),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
::std::result::Result::Err(err__) => {
|
||||
::std::result::Result::Err(match err__ {
|
||||
$crate::Error::App(err__) => {
|
||||
if let Error__::$fn_name(
|
||||
err__) = err__
|
||||
{
|
||||
$crate::Error::App(err__)
|
||||
} else {
|
||||
unreachable!()
|
||||
#[allow(unreachable_patterns)]
|
||||
match err__ {
|
||||
Error__::$fn_name(err__) =>
|
||||
$crate::Error::App(err__),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
$crate::Error::RequestDeserialize(err__) => {
|
||||
@@ -750,7 +593,7 @@ mod functional_test {
|
||||
|
||||
cfg_if! {
|
||||
if #[cfg(feature = "tls")] {
|
||||
const DOMAIN: &'static str = "foobar.com";
|
||||
const DOMAIN: &str = "foobar.com";
|
||||
|
||||
use tls::client::Context;
|
||||
use native_tls::{Pkcs12, TlsAcceptor, TlsConnector};
|
||||
@@ -777,8 +620,7 @@ mod functional_test {
|
||||
if #[cfg(target_os = "macos")] {
|
||||
extern crate security_framework;
|
||||
|
||||
use self::security_framework::certificate::SecCertificate;
|
||||
use native_tls_inner::backend::security_framework::TlsConnectorBuilderExt;
|
||||
use native_tls_inner::Certificate;
|
||||
|
||||
fn get_future_tls_client_options() -> future::client::Options {
|
||||
future::client::Options::default().tls(get_tls_client_context())
|
||||
@@ -790,9 +632,9 @@ mod functional_test {
|
||||
|
||||
fn get_tls_client_context() -> Context {
|
||||
let buf = include_bytes!("../test/root-ca.der");
|
||||
let cert = unwrap!(SecCertificate::from_der(buf));
|
||||
let cert = unwrap!(Certificate::from_der(buf));
|
||||
let mut connector = unwrap!(TlsConnector::builder());
|
||||
connector.anchor_certificates(&[cert]);
|
||||
connector.add_root_certificate(cert).unwrap();
|
||||
|
||||
Context {
|
||||
domain: DOMAIN.into(),
|
||||
@@ -815,7 +657,6 @@ mod functional_test {
|
||||
fn get_tls_client_context() -> Context {
|
||||
let mut connector = unwrap!(TlsConnector::builder());
|
||||
unwrap!(connector.builder_mut()
|
||||
.builder_mut()
|
||||
.set_ca_file("test/root-ca.pem"));
|
||||
Context {
|
||||
domain: DOMAIN.into(),
|
||||
@@ -1008,9 +849,9 @@ mod functional_test {
|
||||
|
||||
#[test]
|
||||
fn simple() {
|
||||
let _ = env_logger::init();
|
||||
let (_, client, _) = unwrap!(start_server_with_sync_client::<SyncClient,
|
||||
Server>(Server));
|
||||
let _ = env_logger::try_init();
|
||||
let (_, client, _) =
|
||||
unwrap!(start_server_with_sync_client::<SyncClient, Server>(Server));
|
||||
assert_eq!(3, client.add(1, 2).unwrap());
|
||||
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
|
||||
}
|
||||
@@ -1019,9 +860,9 @@ mod functional_test {
|
||||
fn shutdown() {
|
||||
use futures::{Async, Future};
|
||||
|
||||
let _ = env_logger::init();
|
||||
let (addr, client, shutdown) = unwrap!(start_server_with_sync_client::<SyncClient,
|
||||
Server>(Server));
|
||||
let _ = env_logger::try_init();
|
||||
let (addr, client, shutdown) =
|
||||
unwrap!(start_server_with_sync_client::<SyncClient, Server>(Server));
|
||||
assert_eq!(3, unwrap!(client.add(1, 2)));
|
||||
assert_eq!("Hey, Tim.", unwrap!(client.hey("Tim".to_string())));
|
||||
|
||||
@@ -1055,9 +896,9 @@ mod functional_test {
|
||||
|
||||
#[test]
|
||||
fn no_shutdown() {
|
||||
let _ = env_logger::init();
|
||||
let (addr, client, shutdown) = unwrap!(start_server_with_sync_client::<SyncClient,
|
||||
Server>(Server));
|
||||
let _ = env_logger::try_init();
|
||||
let (addr, client, shutdown) =
|
||||
unwrap!(start_server_with_sync_client::<SyncClient, Server>(Server));
|
||||
assert_eq!(3, client.add(1, 2).unwrap());
|
||||
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
|
||||
|
||||
@@ -1071,10 +912,11 @@ mod functional_test {
|
||||
|
||||
#[test]
|
||||
fn other_service() {
|
||||
let _ = env_logger::init();
|
||||
let (_, client, _) =
|
||||
unwrap!(start_server_with_sync_client::<super::other_service::SyncClient,
|
||||
Server>(Server));
|
||||
let _ = env_logger::try_init();
|
||||
let (_, client, _) = unwrap!(start_server_with_sync_client::<
|
||||
super::other_service::SyncClient,
|
||||
Server,
|
||||
>(Server));
|
||||
match client.foo().err().expect("failed unwrap") {
|
||||
::Error::RequestDeserialize(_) => {} // good
|
||||
bad => panic!("Expected Error::RequestDeserialize but got {}", bad),
|
||||
@@ -1093,7 +935,8 @@ mod functional_test {
|
||||
|
||||
impl Serialize for Bad {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where S: Serializer
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
serializer.serialize_seq(None)?.end()
|
||||
}
|
||||
@@ -1111,7 +954,9 @@ mod functional_test {
|
||||
|
||||
#[test]
|
||||
fn bad_serialize() {
|
||||
let handle = ().listen("localhost:0", server::Options::default()).unwrap();
|
||||
let handle = ()
|
||||
.listen("localhost:0", server::Options::default())
|
||||
.unwrap();
|
||||
let client = SyncClient::connect(handle.addr(), client::Options::default()).unwrap();
|
||||
client.bad(Bad).err().unwrap();
|
||||
}
|
||||
@@ -1143,12 +988,15 @@ mod functional_test {
|
||||
|
||||
#[test]
|
||||
fn simple() {
|
||||
let _ = env_logger::init();
|
||||
let (_, mut reactor, client) =
|
||||
unwrap!(start_server_with_async_client::<FutureClient, Server>(Server));
|
||||
let _ = env_logger::try_init();
|
||||
let (_, mut reactor, client) = unwrap!(
|
||||
start_server_with_async_client::<FutureClient, Server>(Server)
|
||||
);
|
||||
assert_eq!(3, reactor.run(client.add(1, 2)).unwrap());
|
||||
assert_eq!("Hey, Tim.",
|
||||
reactor.run(client.hey("Tim".to_string())).unwrap());
|
||||
assert_eq!(
|
||||
"Hey, Tim.",
|
||||
reactor.run(client.hey("Tim".to_string())).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1156,7 +1004,7 @@ mod functional_test {
|
||||
use futures::Future;
|
||||
use tokio_core::reactor;
|
||||
|
||||
let _ = env_logger::init();
|
||||
let _ = env_logger::try_init();
|
||||
let (handle, mut reactor, server) = unwrap!(return_server::<Server>(Server));
|
||||
|
||||
let (tx, rx) = ::std::sync::mpsc::channel();
|
||||
@@ -1178,9 +1026,10 @@ mod functional_test {
|
||||
|
||||
#[test]
|
||||
fn concurrent() {
|
||||
let _ = env_logger::init();
|
||||
let (_, mut reactor, client) =
|
||||
unwrap!(start_server_with_async_client::<FutureClient, Server>(Server));
|
||||
let _ = env_logger::try_init();
|
||||
let (_, mut reactor, client) = unwrap!(
|
||||
start_server_with_async_client::<FutureClient, Server>(Server)
|
||||
);
|
||||
let req1 = client.add(1, 2);
|
||||
let req2 = client.add(3, 4);
|
||||
let req3 = client.hey("Tim".to_string());
|
||||
@@ -1191,10 +1040,11 @@ mod functional_test {
|
||||
|
||||
#[test]
|
||||
fn other_service() {
|
||||
let _ = env_logger::init();
|
||||
let (_, mut reactor, client) =
|
||||
unwrap!(start_server_with_async_client::<super::other_service::FutureClient,
|
||||
Server>(Server));
|
||||
let _ = env_logger::try_init();
|
||||
let (_, mut reactor, client) = unwrap!(start_server_with_async_client::<
|
||||
super::other_service::FutureClient,
|
||||
Server,
|
||||
>(Server));
|
||||
match reactor.run(client.foo()).err().unwrap() {
|
||||
::Error::RequestDeserialize(_) => {} // good
|
||||
bad => panic!(r#"Expected Error::RequestDeserialize but got "{}""#, bad),
|
||||
@@ -1207,14 +1057,19 @@ mod functional_test {
|
||||
use future::server;
|
||||
use super::FutureServiceExt;
|
||||
|
||||
let _ = env_logger::init();
|
||||
let _ = env_logger::try_init();
|
||||
let reactor = reactor::Core::new().unwrap();
|
||||
let handle = Server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
let handle = Server
|
||||
.listen(
|
||||
"localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default(),
|
||||
)
|
||||
.unwrap()
|
||||
.0;
|
||||
Server.listen(handle.addr(), &reactor.handle(), server::Options::default()).unwrap();
|
||||
Server
|
||||
.listen(handle.addr(), &reactor.handle(), server::Options::default())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1224,22 +1079,29 @@ mod functional_test {
|
||||
use util::FirstSocketAddr;
|
||||
use super::{FutureClient, FutureServiceExt};
|
||||
|
||||
let _ = env_logger::init();
|
||||
let _ = env_logger::try_init();
|
||||
let mut reactor = reactor::Core::new().unwrap();
|
||||
let (handle, server) = Server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
let (handle, server) = Server
|
||||
.listen(
|
||||
"localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default(),
|
||||
)
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
|
||||
let client = FutureClient::connect(handle.addr(),
|
||||
client::Options::default().handle(reactor.handle()));
|
||||
let client = FutureClient::connect(
|
||||
handle.addr(),
|
||||
client::Options::default().handle(reactor.handle()),
|
||||
);
|
||||
let client = unwrap!(reactor.run(client));
|
||||
assert_eq!(reactor.run(client.add(1, 2)).unwrap(), 3);
|
||||
drop(client);
|
||||
|
||||
let client = FutureClient::connect(handle.addr(),
|
||||
client::Options::default().handle(reactor.handle()));
|
||||
let client = FutureClient::connect(
|
||||
handle.addr(),
|
||||
client::Options::default().handle(reactor.handle()),
|
||||
);
|
||||
let client = unwrap!(reactor.run(client));
|
||||
assert_eq!(reactor.run(client.add(1, 2)).unwrap(), 3);
|
||||
}
|
||||
@@ -1252,23 +1114,33 @@ mod functional_test {
|
||||
use future::client::ClientExt;
|
||||
use super::FutureServiceExt;
|
||||
|
||||
let _ = env_logger::init();
|
||||
let (_, mut reactor, client) =
|
||||
unwrap!(start_server_with_async_client::<FutureClient, Server>(Server));
|
||||
let _ = env_logger::try_init();
|
||||
let (_, mut reactor, client) = unwrap!(
|
||||
start_server_with_async_client::<FutureClient, Server>(Server)
|
||||
);
|
||||
assert_eq!(3, reactor.run(client.add(1, 2)).unwrap());
|
||||
assert_eq!("Hey, Tim.",
|
||||
reactor.run(client.hey("Tim".to_string())).unwrap());
|
||||
assert_eq!(
|
||||
"Hey, Tim.",
|
||||
reactor.run(client.hey("Tim".to_string())).unwrap()
|
||||
);
|
||||
|
||||
let (handle, server) = Server.listen("localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default())
|
||||
let (handle, server) = Server
|
||||
.listen(
|
||||
"localhost:0".first_socket_addr(),
|
||||
&reactor.handle(),
|
||||
server::Options::default(),
|
||||
)
|
||||
.unwrap();
|
||||
reactor.handle().spawn(server);
|
||||
let options = client::Options::default().handle(reactor.handle());
|
||||
let client = reactor.run(FutureClient::connect(handle.addr(), options)).unwrap();
|
||||
let client = reactor
|
||||
.run(FutureClient::connect(handle.addr(), options))
|
||||
.unwrap();
|
||||
assert_eq!(3, reactor.run(client.add(1, 2)).unwrap());
|
||||
assert_eq!("Hey, Tim.",
|
||||
reactor.run(client.hey("Tim".to_string())).unwrap());
|
||||
assert_eq!(
|
||||
"Hey, Tim.",
|
||||
reactor.run(client.hey("Tim".to_string())).unwrap()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1294,20 +1166,20 @@ mod functional_test {
|
||||
fn error() {
|
||||
use std::error::Error as E;
|
||||
use self::error_service::*;
|
||||
let _ = env_logger::init();
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
let (_, mut reactor, client) =
|
||||
start_err_server_with_async_client::<FutureClient, ErrorServer>(ErrorServer).unwrap();
|
||||
reactor.run(client.bar()
|
||||
.then(move |result| {
|
||||
match result.err().unwrap() {
|
||||
::Error::App(e) => {
|
||||
assert_eq!(e.description(), "lol jk");
|
||||
Ok::<_, ()>(())
|
||||
} // good
|
||||
bad => panic!("Expected Error::App but got {:?}", bad),
|
||||
}
|
||||
}))
|
||||
reactor
|
||||
.run(client.bar().then(move |result| {
|
||||
match result.err().unwrap() {
|
||||
::Error::App(e) => {
|
||||
assert_eq!(e.description(), "lol jk");
|
||||
Ok::<_, ()>(())
|
||||
} // good
|
||||
bad => panic!("Expected Error::App but got {:?}", bad),
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tarpc-plugins"
|
||||
version = "0.1.1"
|
||||
version = "0.4.0"
|
||||
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
||||
license = "MIT"
|
||||
documentation = "https://docs.rs/tarpc"
|
||||
@@ -15,7 +15,7 @@ description = "Plugins for tarpc, an RPC framework for Rust with a focus on ease
|
||||
travis-ci = { repository = "google/tarpc" }
|
||||
|
||||
[dependencies]
|
||||
itertools = "0.5"
|
||||
itertools = "0.7"
|
||||
|
||||
[lib]
|
||||
plugin = true
|
||||
|
||||
@@ -8,7 +8,7 @@ use itertools::Itertools;
|
||||
use rustc_plugin::Registry;
|
||||
use syntax::ast::{self, Ident, TraitRef, Ty, TyKind};
|
||||
use syntax::ext::base::{ExtCtxt, MacResult, DummyResult, MacEager};
|
||||
use syntax::ext::quote::rt::Span;
|
||||
use syntax::codemap::Span;
|
||||
use syntax::parse::{self, token, str_lit, PResult};
|
||||
use syntax::parse::parser::{Parser, PathStyle};
|
||||
use syntax::symbol::Symbol;
|
||||
@@ -65,7 +65,7 @@ fn snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box<MacResul
|
||||
if let Some(tt @ TokenTree::Token(_, token::Eq)) = tokens.next() {
|
||||
let mut docstr = tokens.next().expect("Docstrings must have literal docstring");
|
||||
if let TokenTree::Token(_, token::Literal(token::Str_(ref mut doc), _)) = docstr {
|
||||
*doc = Symbol::intern(&str_lit(&doc.as_str()).replace("{}", &old_ident));
|
||||
*doc = Symbol::intern(&str_lit(&doc.as_str(), None).replace("{}", &old_ident));
|
||||
} else {
|
||||
unreachable!();
|
||||
}
|
||||
@@ -126,7 +126,7 @@ fn ty_snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box<MacRe
|
||||
convert(&mut path.segments
|
||||
.last_mut()
|
||||
.unwrap()
|
||||
.identifier);
|
||||
.ident);
|
||||
MacEager::ty(P(Ty {
|
||||
id: ast::DUMMY_NODE_ID,
|
||||
node: TyKind::Path(None, path),
|
||||
@@ -160,10 +160,8 @@ fn convert(ident: &mut Ident) -> String {
|
||||
while let Some(c) = chars.next() {
|
||||
if c != '_' {
|
||||
camel_ty.push(c);
|
||||
} else {
|
||||
if let Some(c) = chars.next() {
|
||||
camel_ty.extend(c.to_uppercase());
|
||||
}
|
||||
} else if let Some(c) = chars.next() {
|
||||
camel_ty.extend(c.to_uppercase());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
129
src/protocol.rs
129
src/protocol.rs
@@ -3,16 +3,16 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
use serde;
|
||||
use bincode::{self, Infinite};
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use bincode;
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
use bytes::BytesMut;
|
||||
use bytes::buf::BufMut;
|
||||
use std::io::{self, Cursor};
|
||||
use serde;
|
||||
use std::io;
|
||||
use std::marker::PhantomData;
|
||||
use std::mem;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_io::codec::{Encoder, Decoder, Framed};
|
||||
use tokio_codec::{Encoder, Decoder, Framed};
|
||||
use tokio_proto::multiplex::{ClientProto, ServerProto};
|
||||
use tokio_proto::streaming::multiplex::RequestId;
|
||||
|
||||
@@ -34,7 +34,7 @@ enum CodecState {
|
||||
impl<Encode, Decode> Codec<Encode, Decode> {
|
||||
fn new(max_payload_size: u64) -> Self {
|
||||
Codec {
|
||||
max_payload_size: max_payload_size,
|
||||
max_payload_size,
|
||||
state: CodecState::Id,
|
||||
_phantom_data: PhantomData,
|
||||
}
|
||||
@@ -42,41 +42,53 @@ impl<Encode, Decode> Codec<Encode, Decode> {
|
||||
}
|
||||
|
||||
fn too_big(payload_size: u64, max_payload_size: u64) -> io::Error {
|
||||
warn!("Not sending too-big packet of size {} (max is {})",
|
||||
payload_size, max_payload_size);
|
||||
io::Error::new(io::ErrorKind::InvalidData,
|
||||
format!("Maximum payload size is {} bytes but got a payload of {}",
|
||||
max_payload_size, payload_size))
|
||||
warn!(
|
||||
"Not sending too-big packet of size {} (max is {})",
|
||||
payload_size,
|
||||
max_payload_size
|
||||
);
|
||||
io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!(
|
||||
"Maximum payload size is {} bytes but got a payload of {}",
|
||||
max_payload_size,
|
||||
payload_size
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
impl<Encode, Decode> Encoder for Codec<Encode, Decode>
|
||||
where Encode: serde::Serialize,
|
||||
Decode: serde::Deserialize
|
||||
where
|
||||
Encode: serde::Serialize,
|
||||
Decode: serde::de::DeserializeOwned,
|
||||
{
|
||||
type Item = (RequestId, Encode);
|
||||
type Error = io::Error;
|
||||
|
||||
fn encode(&mut self, (id, message): Self::Item, buf: &mut BytesMut) -> io::Result<()> {
|
||||
let payload_size = bincode::serialized_size(&message);
|
||||
let payload_size = bincode::serialized_size(&message).map_err(|serialize_err| {
|
||||
io::Error::new(io::ErrorKind::Other, serialize_err)
|
||||
})?;
|
||||
if payload_size > self.max_payload_size {
|
||||
return Err(too_big(payload_size, self.max_payload_size));
|
||||
}
|
||||
let message_size = 2 * mem::size_of::<u64>() + payload_size as usize;
|
||||
buf.reserve(message_size);
|
||||
buf.put_u64::<BigEndian>(id);
|
||||
buf.put_u64_be(id);
|
||||
trace!("Encoded request id = {} as {:?}", id, buf);
|
||||
buf.put_u64::<BigEndian>(payload_size);
|
||||
bincode::serialize_into(&mut buf.writer(),
|
||||
&message,
|
||||
Infinite)
|
||||
.map_err(|serialize_err| io::Error::new(io::ErrorKind::Other, serialize_err))?;
|
||||
buf.put_u64_be(payload_size);
|
||||
bincode::serialize_into(&mut buf.writer(), &message)
|
||||
.map_err(|serialize_err| {
|
||||
io::Error::new(io::ErrorKind::Other, serialize_err)
|
||||
})?;
|
||||
trace!("Encoded buffer: {:?}", buf);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Encode, Decode> Decoder for Codec<Encode, Decode>
|
||||
where Decode: serde::Deserialize
|
||||
where
|
||||
Decode: serde::de::DeserializeOwned,
|
||||
{
|
||||
type Item = (RequestId, Result<Decode, bincode::Error>);
|
||||
type Error = io::Error;
|
||||
@@ -93,36 +105,41 @@ impl<Encode, Decode> Decoder for Codec<Encode, Decode>
|
||||
}
|
||||
Id => {
|
||||
let mut id_buf = buf.split_to(mem::size_of::<u64>());
|
||||
let id = Cursor::new(&mut id_buf).read_u64::<BigEndian>()?;
|
||||
let id = BigEndian::read_u64(&*id_buf);
|
||||
trace!("--> Parsed id = {} from {:?}", id, id_buf);
|
||||
self.state = Len { id: id };
|
||||
self.state = Len { id };
|
||||
}
|
||||
Len { .. } if buf.len() < mem::size_of::<u64>() => {
|
||||
trace!("--> Buf len is {}; waiting for 8 to parse packet length.",
|
||||
buf.len());
|
||||
trace!(
|
||||
"--> Buf len is {}; waiting for 8 to parse packet length.",
|
||||
buf.len()
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
Len { id } => {
|
||||
let len_buf = buf.split_to(mem::size_of::<u64>());
|
||||
let len = Cursor::new(len_buf).read_u64::<BigEndian>()?;
|
||||
trace!("--> Parsed payload length = {}, remaining buffer length = {}",
|
||||
len,
|
||||
buf.len());
|
||||
let len = BigEndian::read_u64(&*len_buf);
|
||||
trace!(
|
||||
"--> Parsed payload length = {}, remaining buffer length = {}",
|
||||
len,
|
||||
buf.len()
|
||||
);
|
||||
if len > self.max_payload_size {
|
||||
return Err(too_big(len, self.max_payload_size));
|
||||
}
|
||||
self.state = Payload { id: id, len: len };
|
||||
self.state = Payload { id, len };
|
||||
}
|
||||
Payload { len, .. } if buf.len() < len as usize => {
|
||||
trace!("--> Buf len is {}; waiting for {} to parse payload.",
|
||||
buf.len(),
|
||||
len);
|
||||
trace!(
|
||||
"--> Buf len is {}; waiting for {} to parse payload.",
|
||||
buf.len(),
|
||||
len
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
Payload { id, len } => {
|
||||
let payload = buf.split_to(len as usize);
|
||||
let result = bincode::deserialize_from(&mut Cursor::new(payload),
|
||||
Infinite);
|
||||
let result = bincode::deserialize(&payload);
|
||||
// Reset the state machine because, either way, we're done processing this
|
||||
// message.
|
||||
self.state = Id;
|
||||
@@ -146,15 +163,16 @@ impl<Encode, Decode> Proto<Encode, Decode> {
|
||||
pub fn new(max_payload_size: u64) -> Self {
|
||||
Proto {
|
||||
max_payload_size: max_payload_size,
|
||||
_phantom_data: PhantomData
|
||||
_phantom_data: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Encode, Decode> ServerProto<T> for Proto<Encode, Decode>
|
||||
where T: AsyncRead + AsyncWrite + 'static,
|
||||
Encode: serde::Serialize + 'static,
|
||||
Decode: serde::Deserialize + 'static
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + 'static,
|
||||
Encode: serde::Serialize + 'static,
|
||||
Decode: serde::de::DeserializeOwned + 'static,
|
||||
{
|
||||
type Response = Encode;
|
||||
type Request = Result<Decode, bincode::Error>;
|
||||
@@ -162,14 +180,15 @@ impl<T, Encode, Decode> ServerProto<T> for Proto<Encode, Decode>
|
||||
type BindTransport = Result<Self::Transport, io::Error>;
|
||||
|
||||
fn bind_transport(&self, io: T) -> Self::BindTransport {
|
||||
Ok(io.framed(Codec::new(self.max_payload_size)))
|
||||
Ok(Framed::new(io, Codec::new(self.max_payload_size)))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Encode, Decode> ClientProto<T> for Proto<Encode, Decode>
|
||||
where T: AsyncRead + AsyncWrite + 'static,
|
||||
Encode: serde::Serialize + 'static,
|
||||
Decode: serde::Deserialize + 'static
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + 'static,
|
||||
Encode: serde::Serialize + 'static,
|
||||
Decode: serde::de::DeserializeOwned + 'static,
|
||||
{
|
||||
type Response = Result<Decode, bincode::Error>;
|
||||
type Request = Encode;
|
||||
@@ -177,7 +196,7 @@ impl<T, Encode, Decode> ClientProto<T> for Proto<Encode, Decode>
|
||||
type BindTransport = Result<Self::Transport, io::Error>;
|
||||
|
||||
fn bind_transport(&self, io: T) -> Self::BindTransport {
|
||||
Ok(io.framed(Codec::new(self.max_payload_size)))
|
||||
Ok(Framed::new(io, Codec::new(self.max_payload_size)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -190,8 +209,10 @@ fn serialize() {
|
||||
for _ in 0..2 {
|
||||
let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new(2_000_000);
|
||||
codec.encode(MSG, &mut buf).unwrap();
|
||||
let actual: Result<Option<(u64, Result<(char, char, char), bincode::Error>)>, io::Error> =
|
||||
codec.decode(&mut buf);
|
||||
let actual: Result<
|
||||
Option<(u64, Result<(char, char, char), bincode::Error>)>,
|
||||
io::Error,
|
||||
> = codec.decode(&mut buf);
|
||||
|
||||
match actual {
|
||||
Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {}
|
||||
@@ -207,13 +228,21 @@ fn deserialize_big() {
|
||||
let mut codec: Codec<Vec<u8>, Vec<u8>> = Codec::new(24);
|
||||
|
||||
let mut buf = BytesMut::with_capacity(40);
|
||||
assert_eq!(codec.encode((0, vec![0; 24]), &mut buf).err().unwrap().kind(),
|
||||
io::ErrorKind::InvalidData);
|
||||
assert_eq!(
|
||||
codec
|
||||
.encode((0, vec![0; 24]), &mut buf)
|
||||
.err()
|
||||
.unwrap()
|
||||
.kind(),
|
||||
io::ErrorKind::InvalidData
|
||||
);
|
||||
|
||||
// Header
|
||||
buf.put_slice(&mut [0u8; 8]);
|
||||
// Len
|
||||
buf.put_slice(&mut [0u8, 0, 0, 0, 0, 0, 0, 25]);
|
||||
assert_eq!(codec.decode(&mut buf).err().unwrap().kind(),
|
||||
io::ErrorKind::InvalidData);
|
||||
assert_eq!(
|
||||
codec.decode(&mut buf).err().unwrap().kind(),
|
||||
io::ErrorKind::InvalidData
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1,19 +1,19 @@
|
||||
use future::client::{Client as FutureClient, ClientExt as FutureClientExt,
|
||||
Options as FutureOptions};
|
||||
/// Exposes a trait for connecting synchronously to servers.
|
||||
use futures::{Future, Stream};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::net::{SocketAddr, ToSocketAddrs};
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
#[cfg(feature = "tls")]
|
||||
use tls::client::Context;
|
||||
use tokio_core::reactor;
|
||||
use tokio_proto::util::client_proxy::{ClientProxy, Receiver, pair};
|
||||
use tokio_service::Service;
|
||||
use util::FirstSocketAddr;
|
||||
#[cfg(feature = "tls")]
|
||||
use tls::client::Context;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub struct Client<Req, Resp, E> {
|
||||
@@ -22,23 +22,24 @@ pub struct Client<Req, Resp, E> {
|
||||
|
||||
impl<Req, Resp, E> Clone for Client<Req, Resp, E> {
|
||||
fn clone(&self) -> Self {
|
||||
Client { proxy: self.proxy.clone() }
|
||||
Client {
|
||||
proxy: self.proxy.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
const PROXY: &'static &'static str = &"ClientProxy { .. }";
|
||||
f.debug_struct("Client")
|
||||
.field("proxy", PROXY)
|
||||
.finish()
|
||||
const PROXY: &str = "ClientProxy { .. }";
|
||||
f.debug_struct("Client").field("proxy", &PROXY).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> Client<Req, Resp, E>
|
||||
where Req: Serialize + Sync + Send + 'static,
|
||||
Resp: Deserialize + Sync + Send + 'static,
|
||||
E: Deserialize + Sync + Send + 'static
|
||||
where
|
||||
Req: Serialize + Send + 'static,
|
||||
Resp: DeserializeOwned + Send + 'static,
|
||||
E: DeserializeOwned + Send + 'static,
|
||||
{
|
||||
/// Drives an RPC call for the given request.
|
||||
pub fn call(&self, request: Req) -> Result<Resp, ::Error<E>> {
|
||||
@@ -47,7 +48,6 @@ impl<Req, Resp, E> Client<Req, Resp, E>
|
||||
// oneshot send.
|
||||
self.proxy.call(request).wait()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// Additional options to configure how the client connects and operates.
|
||||
@@ -93,12 +93,11 @@ impl Options {
|
||||
impl fmt::Debug for Options {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
#[cfg(feature = "tls")]
|
||||
const SOME: &'static &'static str = &"Some(_)";
|
||||
const SOME: &str = "Some(_)";
|
||||
#[cfg(feature = "tls")]
|
||||
const NONE: &'static &'static str = &"None";
|
||||
const NONE: &str = "None";
|
||||
let mut f = f.debug_struct("Options");
|
||||
#[cfg(feature = "tls")]
|
||||
f.field("tls_ctx", if self.tls_ctx.is_some() { SOME } else { NONE });
|
||||
#[cfg(feature = "tls")] f.field("tls_ctx", if self.tls_ctx.is_some() { &SOME } else { &NONE });
|
||||
f.finish()
|
||||
}
|
||||
}
|
||||
@@ -124,27 +123,29 @@ impl Into<FutureOptions> for (reactor::Handle, Options) {
|
||||
/// Extension methods for Clients.
|
||||
pub trait ClientExt: Sized {
|
||||
/// Connects to a server located at the given address.
|
||||
fn connect<A>(addr: A, options: Options) -> io::Result<Self> where A: ToSocketAddrs;
|
||||
fn connect<A>(addr: A, options: Options) -> io::Result<Self>
|
||||
where
|
||||
A: ToSocketAddrs;
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> ClientExt for Client<Req, Resp, E>
|
||||
where Req: Serialize + Sync + Send + 'static,
|
||||
Resp: Deserialize + Sync + Send + 'static,
|
||||
E: Deserialize + Sync + Send + 'static
|
||||
where
|
||||
Req: Serialize + Send + 'static,
|
||||
Resp: DeserializeOwned + Send + 'static,
|
||||
E: DeserializeOwned + Send + 'static,
|
||||
{
|
||||
fn connect<A>(addr: A, options: Options) -> io::Result<Self>
|
||||
where A: ToSocketAddrs
|
||||
where
|
||||
A: ToSocketAddrs,
|
||||
{
|
||||
let addr = addr.try_first_socket_addr()?;
|
||||
let (connect_tx, connect_rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
match RequestHandler::connect(addr, options) {
|
||||
Ok((proxy, mut handler)) => {
|
||||
connect_tx.send(Ok(proxy)).unwrap();
|
||||
handler.handle_requests();
|
||||
}
|
||||
Err(e) => connect_tx.send(Err(e)).unwrap(),
|
||||
thread::spawn(move || match RequestHandler::connect(addr, options) {
|
||||
Ok((proxy, mut handler)) => {
|
||||
connect_tx.send(Ok(proxy)).unwrap();
|
||||
handler.handle_requests();
|
||||
}
|
||||
Err(e) => connect_tx.send(Err(e)).unwrap(),
|
||||
});
|
||||
Ok(connect_rx.recv().unwrap()?)
|
||||
}
|
||||
@@ -160,32 +161,43 @@ struct RequestHandler<Req, Resp, E, S> {
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> RequestHandler<Req, Resp, E, FutureClient<Req, Resp, E>>
|
||||
where Req: Serialize + Sync + Send + 'static,
|
||||
Resp: Deserialize + Sync + Send + 'static,
|
||||
E: Deserialize + Sync + Send + 'static
|
||||
where
|
||||
Req: Serialize + Send + 'static,
|
||||
Resp: DeserializeOwned + Send + 'static,
|
||||
E: DeserializeOwned + Send + 'static,
|
||||
{
|
||||
/// Creates a new `RequestHandler` by connecting a `FutureClient` to the given address
|
||||
/// using the given options.
|
||||
fn connect(addr: SocketAddr, options: Options)
|
||||
-> io::Result<(Client<Req, Resp, E>, Self)>
|
||||
{
|
||||
fn connect(addr: SocketAddr, options: Options) -> io::Result<(Client<Req, Resp, E>, Self)> {
|
||||
let mut reactor = reactor::Core::new()?;
|
||||
let options = (reactor.handle(), options).into();
|
||||
let client = reactor.run(FutureClient::connect(addr, options))?;
|
||||
let (proxy, requests) = pair();
|
||||
Ok((Client { proxy }, RequestHandler { reactor, client, requests }))
|
||||
Ok((
|
||||
Client { proxy },
|
||||
RequestHandler {
|
||||
reactor,
|
||||
client,
|
||||
requests,
|
||||
},
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp, E, S> RequestHandler<Req, Resp, E, S>
|
||||
where Req: Serialize + 'static,
|
||||
Resp: Deserialize + 'static,
|
||||
E: Deserialize + 'static,
|
||||
S: Service<Request = Req, Response = Resp, Error = ::Error<E>>,
|
||||
S::Future: 'static,
|
||||
where
|
||||
Req: Serialize + 'static,
|
||||
Resp: DeserializeOwned + 'static,
|
||||
E: DeserializeOwned + 'static,
|
||||
S: Service<Request = Req, Response = Resp, Error = ::Error<E>>,
|
||||
S::Future: 'static,
|
||||
{
|
||||
fn handle_requests(&mut self) {
|
||||
let RequestHandler { ref mut reactor, ref mut requests, ref mut client } = *self;
|
||||
let RequestHandler {
|
||||
ref mut reactor,
|
||||
ref mut requests,
|
||||
ref mut client,
|
||||
} = *self;
|
||||
let handle = reactor.handle();
|
||||
let requests = requests
|
||||
.map(|result| {
|
||||
@@ -196,14 +208,14 @@ impl<Req, Resp, E, S> RequestHandler<Req, Resp, E, S>
|
||||
}
|
||||
})
|
||||
.for_each(|(request, response_tx)| {
|
||||
let request = client.call(request)
|
||||
.then(move |response| {
|
||||
// Safe to unwrap because clients always block on the response future.
|
||||
response_tx.send(response)
|
||||
.map_err(|_| ())
|
||||
.expect("Client should block on response");
|
||||
Ok(())
|
||||
});
|
||||
let request = client.call(request).then(move |response| {
|
||||
// Safe to unwrap because clients always block on the response future.
|
||||
response_tx
|
||||
.send(response)
|
||||
.map_err(|_| ())
|
||||
.expect("Client should block on response");
|
||||
Ok(())
|
||||
});
|
||||
handle.spawn(request);
|
||||
Ok(())
|
||||
});
|
||||
@@ -230,7 +242,11 @@ fn handle_requests() {
|
||||
let (request, requests) = ::futures::sync::mpsc::unbounded();
|
||||
let reactor = reactor::Core::new().unwrap();
|
||||
let client = Client;
|
||||
let mut request_handler = RequestHandler { reactor, client, requests };
|
||||
let mut request_handler = RequestHandler {
|
||||
reactor,
|
||||
client,
|
||||
requests,
|
||||
};
|
||||
// Test that `handle_requests` returns when all request senders are dropped.
|
||||
drop(request);
|
||||
request_handler.handle_requests();
|
||||
|
||||
@@ -2,17 +2,18 @@ use {bincode, future, num_cpus};
|
||||
use future::server::{Response, Shutdown};
|
||||
use futures::{Future, future as futures};
|
||||
use futures::sync::oneshot;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io;
|
||||
#[cfg(feature = "tls")]
|
||||
use native_tls_inner::TlsAcceptor;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
use std::usize;
|
||||
use thread_pool::{self, Sender, Task, ThreadPool};
|
||||
use tokio_core::reactor;
|
||||
use tokio_service::{NewService, Service};
|
||||
#[cfg(feature = "tls")]
|
||||
use native_tls_inner::TlsAcceptor;
|
||||
|
||||
/// Additional options to configure how the server operates.
|
||||
#[derive(Debug)]
|
||||
@@ -88,26 +89,28 @@ impl Handle {
|
||||
|
||||
impl fmt::Debug for Handle {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
const CORE: &'static &'static str = &"Core { .. }";
|
||||
const SERVER: &'static &'static str = &"Box<Future<Item = (), Error = ()>>";
|
||||
const SERVER: &str = "Box<Future<Item = (), Error = ()>>";
|
||||
|
||||
f.debug_struct("Handle").field("reactor", CORE)
|
||||
.field("handle", &self.handle)
|
||||
.field("server", SERVER)
|
||||
.finish()
|
||||
f.debug_struct("Handle")
|
||||
.field("reactor", &self.reactor)
|
||||
.field("handle", &self.handle)
|
||||
.field("server", &SERVER)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub fn listen<S, Req, Resp, E>(new_service: S, addr: SocketAddr, options: Options)
|
||||
-> io::Result<Handle>
|
||||
pub fn listen<S, Req, Resp, E>(new_service: S,
|
||||
addr: SocketAddr,
|
||||
options: Options)
|
||||
-> io::Result<Handle>
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
<S::Instance as Service>::Future: Send + 'static,
|
||||
S::Response: Send,
|
||||
S::Error: Send,
|
||||
Req: Deserialize + 'static,
|
||||
Req: DeserializeOwned + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
@@ -124,42 +127,57 @@ pub fn listen<S, Req, Resp, E>(new_service: S, addr: SocketAddr, options: Option
|
||||
}
|
||||
|
||||
/// A service that uses a thread pool.
|
||||
struct NewThreadService<S> where S: NewService {
|
||||
struct NewThreadService<S>
|
||||
where
|
||||
S: NewService,
|
||||
{
|
||||
new_service: S,
|
||||
sender: Sender<ServiceTask<<S::Instance as Service>::Future>>,
|
||||
_pool: ThreadPool<ServiceTask<<S::Instance as Service>::Future>>,
|
||||
}
|
||||
|
||||
/// A service that runs by executing request handlers in a thread pool.
|
||||
struct ThreadService<S> where S: Service {
|
||||
struct ThreadService<S>
|
||||
where
|
||||
S: Service,
|
||||
{
|
||||
service: S,
|
||||
sender: Sender<ServiceTask<S::Future>>,
|
||||
}
|
||||
|
||||
/// A task that handles a single request.
|
||||
struct ServiceTask<F> where F: Future {
|
||||
struct ServiceTask<F>
|
||||
where
|
||||
F: Future,
|
||||
{
|
||||
future: F,
|
||||
tx: oneshot::Sender<Result<F::Item, F::Error>>,
|
||||
}
|
||||
|
||||
impl<S> NewThreadService<S>
|
||||
where S: NewService,
|
||||
<S::Instance as Service>::Future: Send + 'static,
|
||||
S::Response: Send,
|
||||
S::Error: Send,
|
||||
where
|
||||
S: NewService,
|
||||
<S::Instance as Service>::Future: Send + 'static,
|
||||
S::Response: Send,
|
||||
S::Error: Send,
|
||||
{
|
||||
/// Create a NewThreadService by wrapping another service.
|
||||
fn new(new_service: S, pool: thread_pool::Builder) -> Self {
|
||||
let (sender, _pool) = pool.build();
|
||||
NewThreadService { new_service, sender, _pool }
|
||||
NewThreadService {
|
||||
new_service,
|
||||
sender,
|
||||
_pool,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> NewService for NewThreadService<S>
|
||||
where S: NewService,
|
||||
<S::Instance as Service>::Future: Send + 'static,
|
||||
S::Response: Send,
|
||||
S::Error: Send,
|
||||
where
|
||||
S: NewService,
|
||||
<S::Instance as Service>::Future: Send + 'static,
|
||||
S::Response: Send,
|
||||
S::Error: Send,
|
||||
{
|
||||
type Request = S::Request;
|
||||
type Response = S::Response;
|
||||
@@ -175,9 +193,10 @@ impl<S> NewService for NewThreadService<S>
|
||||
}
|
||||
|
||||
impl<F> Task for ServiceTask<F>
|
||||
where F: Future + Send + 'static,
|
||||
F::Item: Send,
|
||||
F::Error: Send,
|
||||
where
|
||||
F: Future + Send + 'static,
|
||||
F::Item: Send,
|
||||
F::Error: Send,
|
||||
{
|
||||
fn run(self) {
|
||||
// Don't care if sending fails. It just means the request is no longer
|
||||
@@ -187,34 +206,40 @@ impl<F> Task for ServiceTask<F>
|
||||
}
|
||||
|
||||
impl<S> Service for ThreadService<S>
|
||||
where S: Service,
|
||||
S::Future: Send + 'static,
|
||||
S::Response: Send,
|
||||
S::Error: Send,
|
||||
where
|
||||
S: Service,
|
||||
S::Future: Send + 'static,
|
||||
S::Response: Send,
|
||||
S::Error: Send,
|
||||
{
|
||||
type Request = S::Request;
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future =
|
||||
futures::AndThen<
|
||||
futures::MapErr<
|
||||
oneshot::Receiver<Result<Self::Response, Self::Error>>,
|
||||
fn(oneshot::Canceled) -> Self::Error>,
|
||||
Result<Self::Response, Self::Error>,
|
||||
fn(Result<Self::Response, Self::Error>) -> Result<Self::Response, Self::Error>>;
|
||||
type Future = futures::AndThen<
|
||||
futures::MapErr<
|
||||
oneshot::Receiver<Result<Self::Response, Self::Error>>,
|
||||
fn(oneshot::Canceled) -> Self::Error,
|
||||
>,
|
||||
Result<Self::Response, Self::Error>,
|
||||
fn(Result<Self::Response, Self::Error>)
|
||||
-> Result<Self::Response, Self::Error>,
|
||||
>;
|
||||
|
||||
fn call(&self, request: Self::Request) -> Self::Future {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.sender.send(ServiceTask {
|
||||
future: self.service.call(request),
|
||||
tx: tx,
|
||||
}).unwrap();
|
||||
self.sender
|
||||
.send(ServiceTask {
|
||||
future: self.service.call(request),
|
||||
tx: tx,
|
||||
})
|
||||
.unwrap();
|
||||
rx.map_err(unreachable as _).and_then(ident)
|
||||
}
|
||||
}
|
||||
|
||||
fn unreachable<T, U>(t: T) -> U
|
||||
where T: fmt::Display
|
||||
where
|
||||
T: fmt::Display,
|
||||
{
|
||||
unreachable!(t)
|
||||
}
|
||||
@@ -222,4 +247,3 @@ fn unreachable<T, U>(t: T) -> U
|
||||
fn ident<T>(t: T) -> T {
|
||||
t
|
||||
}
|
||||
|
||||
|
||||
@@ -39,13 +39,12 @@ pub mod client {
|
||||
|
||||
impl fmt::Debug for Context {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
const TLS_CONNECTOR: &'static &'static str = &"TlsConnector { .. }";
|
||||
const TLS_CONNECTOR: &str = "TlsConnector { .. }";
|
||||
f.debug_struct("Context")
|
||||
.field("domain", &self.domain)
|
||||
.field("tls_connector", TLS_CONNECTOR)
|
||||
.finish()
|
||||
.field("domain", &self.domain)
|
||||
.field("tls_connector", &TLS_CONNECTOR)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
29
src/util.rs
29
src/util.rs
@@ -53,16 +53,18 @@ impl Stream for Never {
|
||||
|
||||
impl Serialize for Never {
|
||||
fn serialize<S>(&self, _: S) -> Result<S::Ok, S::Error>
|
||||
where S: Serializer
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
// Please don't try to deserialize this. :(
|
||||
impl Deserialize for Never {
|
||||
impl<'a> Deserialize<'a> for Never {
|
||||
fn deserialize<D>(_: D) -> Result<Self, D::Error>
|
||||
where D: Deserializer
|
||||
where
|
||||
D: Deserializer<'a>,
|
||||
{
|
||||
panic!("Never cannot be instantiated!");
|
||||
}
|
||||
@@ -99,8 +101,10 @@ pub trait FirstSocketAddr: ToSocketAddrs {
|
||||
if let Some(a) = self.to_socket_addrs()?.next() {
|
||||
Ok(a)
|
||||
} else {
|
||||
Err(io::Error::new(io::ErrorKind::AddrNotAvailable,
|
||||
"`ToSocketAddrs::to_socket_addrs` returned an empty iterator."))
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::AddrNotAvailable,
|
||||
"`ToSocketAddrs::to_socket_addrs` returned an empty iterator.",
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -119,8 +123,9 @@ impl<A: ToSocketAddrs> FirstSocketAddr for A {}
|
||||
/// on it, otherwise the callback never runs. Once run, however, this future is
|
||||
/// the same as the one the closure creates.
|
||||
pub fn lazy<F, A, R>(f: F, args: A) -> Lazy<F, A, R>
|
||||
where F: FnOnce(A) -> R,
|
||||
R: IntoFuture
|
||||
where
|
||||
F: FnOnce(A) -> R,
|
||||
R: IntoFuture,
|
||||
{
|
||||
Lazy {
|
||||
inner: _Lazy::First(f, args),
|
||||
@@ -145,8 +150,9 @@ enum _Lazy<F, A, R> {
|
||||
}
|
||||
|
||||
impl<F, A, R> Lazy<F, A, R>
|
||||
where F: FnOnce(A) -> R,
|
||||
R: IntoFuture,
|
||||
where
|
||||
F: FnOnce(A) -> R,
|
||||
R: IntoFuture,
|
||||
{
|
||||
fn get(&mut self) -> &mut R::Future {
|
||||
match self.inner {
|
||||
@@ -166,8 +172,9 @@ impl<F, A, R> Lazy<F, A, R>
|
||||
}
|
||||
|
||||
impl<F, A, R> Future for Lazy<F, A, R>
|
||||
where F: FnOnce(A) -> R,
|
||||
R: IntoFuture,
|
||||
where
|
||||
F: FnOnce(A) -> R,
|
||||
R: IntoFuture,
|
||||
{
|
||||
type Item = R::Item;
|
||||
type Error = R::Error;
|
||||
|
||||
Reference in New Issue
Block a user