mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Merge pull request #55 from tikue/framed
Replace handrolled transport with higher-level `Framed` construct
This commit is contained in:
@@ -14,7 +14,7 @@ description = "An RPC framework for Rust with a focus on ease of use."
|
||||
bincode = "0.6"
|
||||
byteorder = "0.5"
|
||||
bytes = { git = "https://github.com/carllerche/bytes" }
|
||||
futures = "0.1"
|
||||
futures = { git = "https://github.com/alexcrichton/futures-rs" }
|
||||
lazy_static = "0.2"
|
||||
log = "0.3"
|
||||
scoped-pool = "1.0"
|
||||
@@ -24,7 +24,11 @@ tarpc-plugins = { path = "src/plugins" }
|
||||
take = "0.1"
|
||||
tokio-service = { git = "https://github.com/tokio-rs/tokio-service" }
|
||||
tokio-proto = { git = "https://github.com/tokio-rs/tokio-proto" }
|
||||
tokio-core = "0.1"
|
||||
tokio-core = { git = "https://github.com/tokio-rs/tokio-core" }
|
||||
|
||||
[replace]
|
||||
"tokio-core:0.1.0" = { git = "https://github.com/tokio-rs/tokio-core" }
|
||||
"futures:0.1.3" = { git = "https://github.com/alexcrichton/futures-rs" }
|
||||
|
||||
[dev-dependencies]
|
||||
chrono = "0.2"
|
||||
|
||||
38
README.md
38
README.md
@@ -85,6 +85,44 @@ ergonomic to write servers without dealing with sockets or serialization
|
||||
directly. Simply implement one of the generated traits, and you're off to the
|
||||
races! See the tarpc_examples package for more examples.
|
||||
|
||||
## Example: Futures
|
||||
|
||||
Here's the same server, implemented using `FutureService`.
|
||||
|
||||
```rust
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
|
||||
use tarpc::util::{FirstSocketAddr, Never};
|
||||
use tarpc::sync::Connect;
|
||||
|
||||
service! {
|
||||
rpc hello(name: String) -> String;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct HelloServer;
|
||||
|
||||
impl FutureService for HelloServer {
|
||||
type HelloFut = futures::Finished<String, Never>;
|
||||
|
||||
fn hello(&self, name: String) -> Self::HelloFut {
|
||||
futures::finished(format!("Hello, {}!", name))
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let addr = "localhost:10000";
|
||||
let _server = HelloServer.listen(addr.first_socket_addr());
|
||||
let client = SyncClient::connect(addr).unwrap();
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
}
|
||||
```
|
||||
|
||||
## Documentation
|
||||
Use `cargo doc` as you normally would to see the documentation created for all
|
||||
items expanded by a `service!` invocation.
|
||||
|
||||
@@ -17,7 +17,7 @@ use futures::Future;
|
||||
#[cfg(test)]
|
||||
use test::Bencher;
|
||||
use tarpc::sync::Connect;
|
||||
use tarpc::util::Never;
|
||||
use tarpc::util::{FirstSocketAddr, Never};
|
||||
|
||||
service! {
|
||||
rpc ack();
|
||||
@@ -37,7 +37,7 @@ impl FutureService for Server {
|
||||
#[bench]
|
||||
fn latency(bencher: &mut Bencher) {
|
||||
let _ = env_logger::init();
|
||||
let server = Server.listen("localhost:0").wait().unwrap();
|
||||
let server = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let client = SyncClient::connect(server.local_addr()).unwrap();
|
||||
|
||||
bencher.iter(|| {
|
||||
|
||||
@@ -13,14 +13,15 @@ extern crate futures;
|
||||
extern crate log;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
extern crate tokio_core;
|
||||
extern crate futures_cpupool;
|
||||
|
||||
use futures::Future;
|
||||
use futures_cpupool::{CpuFuture, CpuPool};
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use tarpc::future::{Connect};
|
||||
use tarpc::util::Never;
|
||||
use tarpc::util::{FirstSocketAddr, Never, spawn_core};
|
||||
use tokio_core::reactor;
|
||||
|
||||
service! {
|
||||
rpc read(size: u32) -> Vec<u8>;
|
||||
@@ -50,41 +51,40 @@ impl FutureService for Server {
|
||||
}
|
||||
}
|
||||
|
||||
fn run_once(clients: &[FutureClient], concurrency: u32, print: bool) {
|
||||
let _ = env_logger::init();
|
||||
|
||||
fn run_once<'a>(clients: &'a [FutureClient], concurrency: u32, print: bool)
|
||||
-> Box<Future<Item=(), Error=()> + 'a>
|
||||
{
|
||||
let start = Instant::now();
|
||||
let futures: Vec<_> = clients.iter()
|
||||
let futs = clients.iter()
|
||||
.cycle()
|
||||
.take(concurrency as usize)
|
||||
.map(|client| {
|
||||
let start = SystemTime::now();
|
||||
let future = client.read(&CHUNK_SIZE).map(move |_| start.elapsed().unwrap());
|
||||
thread::yield_now();
|
||||
let future = client.read(CHUNK_SIZE).map(move |_| start.elapsed().unwrap());
|
||||
future
|
||||
})
|
||||
.collect();
|
||||
// Need an intermediate collection to kick off each future,
|
||||
// because futures::collect will iterate sequentially.
|
||||
.collect::<Vec<_>>();
|
||||
let futs = futures::collect(futs);
|
||||
|
||||
let latencies: Vec<_> = futures.into_iter()
|
||||
.map(|future| {
|
||||
future.wait().unwrap()
|
||||
})
|
||||
.collect();
|
||||
let total_time = start.elapsed();
|
||||
Box::new(futs.map(move |latencies| {
|
||||
let total_time = start.elapsed();
|
||||
|
||||
let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur);
|
||||
let mean = sum_latencies / latencies.len() as u32;
|
||||
let min_latency = *latencies.iter().min().unwrap();
|
||||
let max_latency = *latencies.iter().max().unwrap();
|
||||
let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur);
|
||||
let mean = sum_latencies / latencies.len() as u32;
|
||||
let min_latency = *latencies.iter().min().unwrap();
|
||||
let max_latency = *latencies.iter().max().unwrap();
|
||||
|
||||
if print {
|
||||
println!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
|
||||
latencies.len(),
|
||||
mean.microseconds(),
|
||||
min_latency.microseconds(),
|
||||
max_latency.microseconds(),
|
||||
total_time.microseconds());
|
||||
}
|
||||
if print {
|
||||
println!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
|
||||
latencies.len(),
|
||||
mean.microseconds(),
|
||||
min_latency.microseconds(),
|
||||
max_latency.microseconds(),
|
||||
total_time.microseconds());
|
||||
}
|
||||
}).map_err(|e| panic!(e)))
|
||||
}
|
||||
|
||||
trait Microseconds {
|
||||
@@ -105,18 +105,28 @@ const MAX_CONCURRENCY: u32 = 100;
|
||||
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
let server = Server::new().listen("localhost:0").wait().unwrap();
|
||||
println!("Server listening on {}.", server.local_addr());
|
||||
let clients: Vec<_> = (1...5)
|
||||
.map(|i| {
|
||||
println!("Client {} connecting...", i);
|
||||
FutureClient::connect(server.local_addr()).wait().unwrap()
|
||||
})
|
||||
.collect();
|
||||
println!("Starting...");
|
||||
|
||||
run_once(&clients, MAX_CONCURRENCY, false);
|
||||
for concurrency in 1...MAX_CONCURRENCY {
|
||||
run_once(&clients, concurrency, true);
|
||||
}
|
||||
let server = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
println!("Server listening on {}.", server.local_addr());
|
||||
|
||||
// The driver of the main future.
|
||||
let mut core = reactor::Core::new().unwrap();
|
||||
|
||||
let clients = (1...5)
|
||||
// Spin up a couple threads to drive the clients.
|
||||
.map(|i| (i, spawn_core()))
|
||||
.map(|(i, remote)| {
|
||||
println!("Client {} connecting...", i);
|
||||
FutureClient::connect_remotely(server.local_addr(), &remote)
|
||||
.map_err(|e| panic!(e))
|
||||
})
|
||||
// Need an intermediate collection to connect the clients in parallel,
|
||||
// because `futures::collect` iterates sequentially.
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let clients = core.run(futures::collect(clients)).unwrap();
|
||||
println!("Starting...");
|
||||
let runs = ::std::iter::once(run_once(&clients, MAX_CONCURRENCY, false))
|
||||
.chain((1...MAX_CONCURRENCY).map(|concurrency| run_once(&clients, concurrency, true)));
|
||||
core.run(futures::collect(runs)).unwrap();
|
||||
}
|
||||
|
||||
@@ -20,9 +20,9 @@ use std::net::SocketAddr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use tarpc::util::{Never, Message};
|
||||
use tarpc::future::Connect as Fc;
|
||||
use tarpc::sync::Connect as Sc;
|
||||
use tarpc::util::{FirstSocketAddr, Message, Never};
|
||||
|
||||
pub mod subscriber {
|
||||
service! {
|
||||
@@ -44,7 +44,6 @@ pub mod publisher {
|
||||
#[derive(Clone, Debug)]
|
||||
struct Subscriber {
|
||||
id: u32,
|
||||
publisher: publisher::SyncClient,
|
||||
}
|
||||
|
||||
impl subscriber::FutureService for Subscriber {
|
||||
@@ -57,16 +56,13 @@ impl subscriber::FutureService for Subscriber {
|
||||
}
|
||||
|
||||
impl Subscriber {
|
||||
fn new(id: u32, publisher: publisher::SyncClient) -> tokio::server::ServerHandle {
|
||||
let subscriber = Subscriber {
|
||||
fn new(id: u32) -> tokio::server::ServerHandle {
|
||||
Subscriber {
|
||||
id: id,
|
||||
publisher: publisher.clone(),
|
||||
}
|
||||
.listen("localhost:0")
|
||||
.listen("localhost:0".first_socket_addr())
|
||||
.wait()
|
||||
.unwrap();
|
||||
publisher.subscribe(&id, &subscriber.local_addr()).unwrap();
|
||||
subscriber
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -90,7 +86,7 @@ impl publisher::FutureService for Publisher {
|
||||
.unwrap()
|
||||
.values()
|
||||
// Ignore failing subscribers.
|
||||
.map(move |client| client.receive(&message).then(|_| Ok(())))
|
||||
.map(move |client| client.receive(message.clone()).then(|_| Ok(())))
|
||||
.collect::<Vec<_>>())
|
||||
.map(|_| ())
|
||||
.boxed()
|
||||
@@ -121,14 +117,24 @@ impl publisher::FutureService for Publisher {
|
||||
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
let publisher = Publisher::new().listen("localhost:0").wait().unwrap();
|
||||
let publisher = publisher::SyncClient::connect(publisher.local_addr()).unwrap();
|
||||
let _subscriber1 = Subscriber::new(0, publisher.clone());
|
||||
let _subscriber2 = Subscriber::new(1, publisher.clone());
|
||||
let publisher_server = Publisher::new()
|
||||
.listen("localhost:0".first_socket_addr())
|
||||
.wait()
|
||||
.unwrap();
|
||||
|
||||
let publisher_addr = publisher_server.local_addr();
|
||||
let publisher_client = publisher::SyncClient::connect(publisher_addr).unwrap();
|
||||
|
||||
let subscriber1 = Subscriber::new(0);
|
||||
publisher_client.subscribe(0, *subscriber1.local_addr()).unwrap();
|
||||
|
||||
let subscriber2 = Subscriber::new(1);
|
||||
publisher_client.subscribe(1, *subscriber2.local_addr()).unwrap();
|
||||
|
||||
|
||||
println!("Broadcasting...");
|
||||
publisher.broadcast(&"hello to all".to_string()).unwrap();
|
||||
publisher.unsubscribe(&1).unwrap();
|
||||
publisher.broadcast(&"hello again".to_string()).unwrap();
|
||||
publisher_client.broadcast("hello to all".to_string()).unwrap();
|
||||
publisher_client.unsubscribe(1).unwrap();
|
||||
publisher_client.broadcast("hello again".to_string()).unwrap();
|
||||
thread::sleep(Duration::from_millis(300));
|
||||
}
|
||||
|
||||
@@ -30,5 +30,5 @@ fn main() {
|
||||
let addr = "localhost:10000";
|
||||
let _server = HelloServer.listen(addr);
|
||||
let client = SyncClient::connect(addr).unwrap();
|
||||
println!("{}", client.hello(&"Mom".to_string()).unwrap());
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(conservative_impl_trait, plugin, rustc_macro)]
|
||||
#![feature(conservative_impl_trait, plugin, proc_macro)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
@@ -52,6 +52,6 @@ fn main() {
|
||||
let addr = "localhost:10000";
|
||||
let _server = HelloServer.listen(addr);
|
||||
let client = SyncClient::connect(addr).unwrap();
|
||||
println!("{}", client.hello(&"Mom".to_string()).unwrap());
|
||||
println!("{}", client.hello(&"".to_string()).unwrap_err());
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
println!("{}", client.hello("".to_string()).unwrap_err());
|
||||
}
|
||||
|
||||
36
examples/readme_future.rs
Normal file
36
examples/readme_future.rs
Normal file
@@ -0,0 +1,36 @@
|
||||
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
|
||||
use tarpc::util::{FirstSocketAddr, Never};
|
||||
use tarpc::sync::Connect;
|
||||
|
||||
service! {
|
||||
rpc hello(name: String) -> String;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct HelloServer;
|
||||
|
||||
impl FutureService for HelloServer {
|
||||
type HelloFut = futures::Finished<String, Never>;
|
||||
|
||||
fn hello(&self, name: String) -> Self::HelloFut {
|
||||
futures::finished(format!("Hello, {}!", name))
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let addr = "localhost:10000";
|
||||
let _server = HelloServer.listen(addr.first_socket_addr());
|
||||
let client = SyncClient::connect(addr).unwrap();
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
}
|
||||
@@ -6,14 +6,16 @@
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate env_logger;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
extern crate futures;
|
||||
|
||||
use futures::{BoxFuture, Future};
|
||||
use add::{FutureService as AddFutureService, FutureServiceExt as AddExt};
|
||||
use double::{FutureService as DoubleFutureService, FutureServiceExt as DoubleExt};
|
||||
use tarpc::util::{Never, Message};
|
||||
use futures::{BoxFuture, Future};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tarpc::util::{FirstSocketAddr, Message, Never};
|
||||
use tarpc::future::Connect as Fc;
|
||||
use tarpc::sync::Connect as Sc;
|
||||
|
||||
@@ -46,7 +48,15 @@ impl AddFutureService for AddServer {
|
||||
|
||||
#[derive(Clone)]
|
||||
struct DoubleServer {
|
||||
client: add::FutureClient,
|
||||
client: Arc<Mutex<add::FutureClient>>,
|
||||
}
|
||||
|
||||
impl DoubleServer {
|
||||
fn new(client: add::FutureClient) -> Self {
|
||||
DoubleServer {
|
||||
client: Arc::new(Mutex::new(client))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DoubleFutureService for DoubleServer {
|
||||
@@ -54,20 +64,23 @@ impl DoubleFutureService for DoubleServer {
|
||||
|
||||
fn double(&self, x: i32) -> Self::DoubleFut {
|
||||
self.client
|
||||
.add(&x, &x)
|
||||
.lock()
|
||||
.unwrap()
|
||||
.add(x, x)
|
||||
.map_err(|e| e.to_string().into())
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let add = AddServer.listen("localhost:0").wait().unwrap();
|
||||
let _ = env_logger::init();
|
||||
let add = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let add_client = add::FutureClient::connect(add.local_addr()).wait().unwrap();
|
||||
let double = DoubleServer { client: add_client };
|
||||
let double = double.listen("localhost:0").wait().unwrap();
|
||||
let double = DoubleServer::new(add_client);
|
||||
let double = double.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
|
||||
let double_client = double::SyncClient::connect(double.local_addr()).unwrap();
|
||||
for i in 0..5 {
|
||||
println!("{:?}", double_client.double(&i).unwrap());
|
||||
println!("{:?}", double_client.double(i).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,13 +14,13 @@ extern crate env_logger;
|
||||
extern crate futures;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time;
|
||||
use std::net;
|
||||
use std::thread;
|
||||
use std::time;
|
||||
use std::io::{Read, Write, stdout};
|
||||
use futures::Future;
|
||||
use tarpc::util::Never;
|
||||
use tarpc::sync::Connect;
|
||||
use tarpc::util::{FirstSocketAddr, Never};
|
||||
|
||||
lazy_static! {
|
||||
static ref BUF: Arc<Vec<u8>> = Arc::new(gen_vec(CHUNK_SIZE as usize));
|
||||
@@ -52,7 +52,7 @@ impl FutureService for Server {
|
||||
const CHUNK_SIZE: u32 = 1 << 19;
|
||||
|
||||
fn bench_tarpc(target: u64) {
|
||||
let handle = Server.listen("localhost:0").wait().unwrap();
|
||||
let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let client = SyncClient::connect(handle.local_addr()).unwrap();
|
||||
let start = time::Instant::now();
|
||||
let mut nread = 0;
|
||||
|
||||
@@ -17,7 +17,7 @@ extern crate futures;
|
||||
use bar::FutureServiceExt as BarExt;
|
||||
use baz::FutureServiceExt as BazExt;
|
||||
use futures::Future;
|
||||
use tarpc::util::Never;
|
||||
use tarpc::util::{FirstSocketAddr, Never};
|
||||
use tarpc::sync::Connect;
|
||||
|
||||
mod bar {
|
||||
@@ -58,19 +58,19 @@ macro_rules! pos {
|
||||
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
let bar = Bar.listen("localhost:0").wait().unwrap();
|
||||
let baz = Baz.listen("localhost:0").wait().unwrap();
|
||||
let bar = Bar.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let baz = Baz.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let bar_client = bar::SyncClient::connect(bar.local_addr()).unwrap();
|
||||
let baz_client = baz::SyncClient::connect(baz.local_addr()).unwrap();
|
||||
|
||||
info!("Result: {:?}", bar_client.bar(&17));
|
||||
info!("Result: {:?}", bar_client.bar(17));
|
||||
|
||||
let total = 20;
|
||||
for i in 1..(total + 1) {
|
||||
if i % 2 == 0 {
|
||||
info!("Result 1: {:?}", bar_client.bar(&i));
|
||||
info!("Result 1: {:?}", bar_client.bar(i));
|
||||
} else {
|
||||
info!("Result 2: {:?}", baz_client.baz(&i.to_string()));
|
||||
info!("Result 2: {:?}", baz_client.baz(i.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
183
src/client.rs
183
src/client.rs
@@ -3,40 +3,68 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
use Packet;
|
||||
use futures::{Async, BoxFuture};
|
||||
use futures::stream::Empty;
|
||||
use WireError;
|
||||
use bincode::serde::DeserializeError;
|
||||
use framed::Framed;
|
||||
use futures::{self, Async, Future};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use tokio_proto::pipeline;
|
||||
use tokio_core::net::TcpStream;
|
||||
use tokio_core::reactor;
|
||||
use tokio_proto::easy::{EasyClient, EasyResponse, multiplex};
|
||||
use tokio_service::Service;
|
||||
use util::Never;
|
||||
|
||||
/// A client `Service` that writes and reads bytes.
|
||||
/// A client that impls `tokio_service::Service` that writes and reads bytes.
|
||||
///
|
||||
/// Typically, this would be combined with a serialization pre-processing step
|
||||
/// and a deserialization post-processing step.
|
||||
#[derive(Clone)]
|
||||
pub struct Client {
|
||||
inner: pipeline::Client<Packet, Vec<u8>, Empty<Never, io::Error>, io::Error>,
|
||||
pub struct Client<Req, Resp, E> {
|
||||
inner: EasyClient<Req, WireResponse<Resp, E>>,
|
||||
}
|
||||
|
||||
impl Service for Client {
|
||||
type Request = Packet;
|
||||
type Response = Vec<u8>;
|
||||
type WireResponse<Resp, E> = Result<Result<Resp, WireError<E>>, DeserializeError>;
|
||||
type ResponseFuture<Resp, E> = futures::Map<EasyResponse<WireResponse<Resp, E>>,
|
||||
fn(WireResponse<Resp, E>) -> Result<Resp, ::Error<E>>>;
|
||||
|
||||
impl<Req, Resp, E> Service for Client<Req, Resp, E>
|
||||
where Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
E: Send + 'static
|
||||
{
|
||||
type Request = Req;
|
||||
type Response = Result<Resp, ::Error<E>>;
|
||||
type Error = io::Error;
|
||||
type Future = BoxFuture<Vec<u8>, io::Error>;
|
||||
type Future = ResponseFuture<Resp, E>;
|
||||
|
||||
fn poll_ready(&self) -> Async<()> {
|
||||
Async::Ready(())
|
||||
self.inner.poll_ready()
|
||||
}
|
||||
|
||||
fn call(&self, request: Packet) -> Self::Future {
|
||||
self.inner.call(pipeline::Message::WithoutBody(request))
|
||||
fn call(&self, request: Self::Request) -> Self::Future {
|
||||
self.inner.call(request).map(Self::map_err)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Client {
|
||||
impl<Req, Resp, E> Client<Req, Resp, E> {
|
||||
fn new(tcp: TcpStream, handle: &reactor::Handle) -> Self
|
||||
where Req: Serialize + Send + 'static,
|
||||
Resp: Deserialize + Send + 'static,
|
||||
E: Deserialize + Send + 'static
|
||||
{
|
||||
Client {
|
||||
inner: multiplex::connect(Framed::new(tcp), handle),
|
||||
}
|
||||
}
|
||||
|
||||
fn map_err(resp: WireResponse<Resp, E>) -> Result<Resp, ::Error<E>> {
|
||||
resp.map(|r| r.map_err(::Error::from))
|
||||
.map_err(::Error::ClientDeserialize)
|
||||
.and_then(|r| r)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
write!(f, "Client {{ .. }}")
|
||||
}
|
||||
@@ -44,35 +72,48 @@ impl fmt::Debug for Client {
|
||||
|
||||
/// Exposes a trait for connecting asynchronously to servers.
|
||||
pub mod future {
|
||||
use REMOTE;
|
||||
use futures::{self, Async, Future};
|
||||
use protocol::{LOOP_HANDLE, TarpcTransport};
|
||||
use std::cell::RefCell;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io;
|
||||
use std::marker::PhantomData;
|
||||
use std::net::SocketAddr;
|
||||
use super::Client;
|
||||
use tokio_core::net::TcpStream;
|
||||
use tokio_proto::pipeline;
|
||||
|
||||
use tokio_core::{self, reactor};
|
||||
|
||||
/// Types that can connect to a server asynchronously.
|
||||
pub trait Connect: Sized {
|
||||
/// The type of the future returned when calling connect.
|
||||
type Fut: Future<Item = Self, Error = io::Error>;
|
||||
pub trait Connect<'a>: Sized {
|
||||
/// The type of the future returned when calling `connect`.
|
||||
type ConnectFut: Future<Item = Self, Error = io::Error> + 'static;
|
||||
|
||||
/// Connects to a server located at the given address.
|
||||
fn connect(addr: &SocketAddr) -> Self::Fut;
|
||||
/// The type of the future returned when calling `connect_with`.
|
||||
type ConnectWithFut: Future<Item = Self, Error = io::Error> + 'a;
|
||||
|
||||
/// Connects to a server located at the given address, using a remote to the default
|
||||
/// reactor.
|
||||
fn connect(addr: &SocketAddr) -> Self::ConnectFut {
|
||||
Self::connect_remotely(addr, &REMOTE)
|
||||
}
|
||||
|
||||
/// Connects to a server located at the given address, using the given reactor remote.
|
||||
fn connect_remotely(addr: &SocketAddr, remote: &reactor::Remote) -> Self::ConnectFut;
|
||||
|
||||
/// Connects to a server located at the given address, using the given reactor handle.
|
||||
fn connect_with(addr: &SocketAddr, handle: &'a reactor::Handle) -> Self::ConnectWithFut;
|
||||
}
|
||||
|
||||
/// A future that resolves to a `Client` or an `io::Error`.
|
||||
pub struct ClientFuture {
|
||||
inner: futures::Oneshot<io::Result<Client>>,
|
||||
pub struct ConnectFuture<Req, Resp, E> {
|
||||
inner: futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
|
||||
}
|
||||
|
||||
impl Future for ClientFuture {
|
||||
type Item = Client;
|
||||
impl<Req, Resp, E> Future for ConnectFuture<Req, Resp, E> {
|
||||
type Item = Client<Req, Resp, E>;
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
|
||||
// Ok to unwrap because we ensure the oneshot is always completed.
|
||||
match self.inner.poll().unwrap() {
|
||||
Async::Ready(Ok(client)) => Ok(Async::Ready(client)),
|
||||
Async::Ready(Err(err)) => Err(err),
|
||||
@@ -81,27 +122,72 @@ pub mod future {
|
||||
}
|
||||
}
|
||||
|
||||
impl Connect for Client {
|
||||
type Fut = ClientFuture;
|
||||
/// A future that resolves to a `Client` or an `io::Error`.
|
||||
pub struct ConnectWithFuture<'a, Req, Resp, E> {
|
||||
inner: futures::Map<tokio_core::net::TcpStreamNew,
|
||||
MultiplexConnect<'a, Req, Resp, E>>,
|
||||
}
|
||||
|
||||
/// Starts an event loop on a thread and registers a new client
|
||||
/// connected to the given address.
|
||||
fn connect(addr: &SocketAddr) -> ClientFuture {
|
||||
impl<'a, Req, Resp, E> Future for ConnectWithFuture<'a, Req, Resp, E>
|
||||
where Req: Serialize + Send + 'static,
|
||||
Resp: Deserialize + Send + 'static,
|
||||
E: Deserialize + Send + 'static
|
||||
{
|
||||
type Item = Client<Req, Resp, E>;
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
|
||||
self.inner.poll()
|
||||
}
|
||||
}
|
||||
|
||||
struct MultiplexConnect<'a, Req, Resp, E>(&'a reactor::Handle, PhantomData<(Req, Resp, E)>);
|
||||
|
||||
impl<'a, Req, Resp, E> MultiplexConnect<'a, Req, Resp, E> {
|
||||
fn new(handle: &'a reactor::Handle) -> Self {
|
||||
MultiplexConnect(handle, PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, Req, Resp, E> FnOnce<(TcpStream,)> for MultiplexConnect<'a, Req, Resp, E>
|
||||
where Req: Serialize + Send + 'static,
|
||||
Resp: Deserialize + Send + 'static,
|
||||
E: Deserialize + Send + 'static
|
||||
{
|
||||
type Output = Client<Req, Resp, E>;
|
||||
|
||||
extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Client<Req, Resp, E> {
|
||||
Client::new(tcp, self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, Req, Resp, E> Connect<'a> for Client<Req, Resp, E>
|
||||
where Req: Serialize + Send + 'static,
|
||||
Resp: Deserialize + Send + 'static,
|
||||
E: Deserialize + Send + 'static
|
||||
{
|
||||
type ConnectFut = ConnectFuture<Req, Resp, E>;
|
||||
type ConnectWithFut = ConnectWithFuture<'a, Req, Resp, E>;
|
||||
|
||||
fn connect_remotely(addr: &SocketAddr, remote: &reactor::Remote) -> Self::ConnectFut {
|
||||
let addr = *addr;
|
||||
let (tx, rx) = futures::oneshot();
|
||||
LOOP_HANDLE.spawn(move |handle| {
|
||||
remote.spawn(move |handle| {
|
||||
let handle2 = handle.clone();
|
||||
TcpStream::connect(&addr, handle)
|
||||
.and_then(move |tcp| {
|
||||
let tcp = RefCell::new(Some(tcp));
|
||||
let c = try!(pipeline::connect(&handle2, move || {
|
||||
Ok(TarpcTransport::new(tcp.borrow_mut().take().unwrap()))
|
||||
}));
|
||||
Ok(Client { inner: c })
|
||||
.map(move |tcp| Client::new(tcp, &handle2))
|
||||
.then(move |result| {
|
||||
tx.complete(result);
|
||||
Ok(())
|
||||
})
|
||||
.then(|client| Ok(tx.complete(client)))
|
||||
});
|
||||
ClientFuture { inner: rx }
|
||||
ConnectFuture { inner: rx }
|
||||
}
|
||||
|
||||
fn connect_with(addr: &SocketAddr, handle: &'a reactor::Handle) -> Self::ConnectWithFut {
|
||||
ConnectWithFuture {
|
||||
inner: TcpStream::connect(addr, handle).map(MultiplexConnect::new(handle))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -109,6 +195,7 @@ pub mod future {
|
||||
/// Exposes a trait for connecting synchronously to servers.
|
||||
pub mod sync {
|
||||
use futures::Future;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io;
|
||||
use std::net::ToSocketAddrs;
|
||||
use super::Client;
|
||||
@@ -119,11 +206,15 @@ pub mod sync {
|
||||
fn connect<A>(addr: A) -> Result<Self, io::Error> where A: ToSocketAddrs;
|
||||
}
|
||||
|
||||
impl Connect for Client {
|
||||
impl<Req, Resp, E> Connect for Client<Req, Resp, E>
|
||||
where Req: Serialize + Send + 'static,
|
||||
Resp: Deserialize + Send + 'static,
|
||||
E: Deserialize + Send + 'static
|
||||
{
|
||||
fn connect<A>(addr: A) -> Result<Self, io::Error>
|
||||
where A: ToSocketAddrs
|
||||
{
|
||||
let addr = if let Some(a) = try!(addr.to_socket_addrs()).next() {
|
||||
let addr = if let Some(a) = addr.to_socket_addrs()?.next() {
|
||||
a
|
||||
} else {
|
||||
return Err(io::Error::new(io::ErrorKind::AddrNotAvailable,
|
||||
|
||||
@@ -3,17 +3,14 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
use bincode;
|
||||
use {bincode, tokio_proto as proto};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{fmt, io};
|
||||
use std::error::Error as StdError;
|
||||
use tokio_proto::pipeline;
|
||||
|
||||
/// All errors that can occur during the use of tarpc.
|
||||
#[derive(Debug)]
|
||||
pub enum Error<E>
|
||||
where E: SerializableError
|
||||
{
|
||||
pub enum Error<E> {
|
||||
/// Any IO error.
|
||||
Io(io::Error),
|
||||
/// Error in deserializing a server response.
|
||||
@@ -78,22 +75,22 @@ impl<E: SerializableError> StdError for Error<E> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: SerializableError> From<pipeline::Error<Error<E>>> for Error<E> {
|
||||
fn from(err: pipeline::Error<Error<E>>) -> Self {
|
||||
impl<E> From<proto::Error<Error<E>>> for Error<E> {
|
||||
fn from(err: proto::Error<Error<E>>) -> Self {
|
||||
match err {
|
||||
pipeline::Error::Transport(e) => e,
|
||||
pipeline::Error::Io(e) => e.into(),
|
||||
proto::Error::Transport(e) => e,
|
||||
proto::Error::Io(e) => e.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: SerializableError> From<io::Error> for Error<E> {
|
||||
impl<E> From<io::Error> for Error<E> {
|
||||
fn from(err: io::Error) -> Self {
|
||||
Error::Io(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: SerializableError> From<WireError<E>> for Error<E> {
|
||||
impl<E> From<WireError<E>> for Error<E> {
|
||||
fn from(err: WireError<E>) -> Self {
|
||||
match err {
|
||||
WireError::ServerDeserialize(s) => Error::ServerDeserialize(s),
|
||||
@@ -106,9 +103,7 @@ impl<E: SerializableError> From<WireError<E>> for Error<E> {
|
||||
/// A serializable, server-supplied error.
|
||||
#[doc(hidden)]
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
pub enum WireError<E>
|
||||
where E: SerializableError
|
||||
{
|
||||
pub enum WireError<E> {
|
||||
/// Error in deserializing a client request.
|
||||
ServerDeserialize(String),
|
||||
/// Error in serializing server response.
|
||||
|
||||
177
src/framed.rs
Normal file
177
src/framed.rs
Normal file
@@ -0,0 +1,177 @@
|
||||
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
use bincode::{SizeLimit, serde as bincode};
|
||||
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
||||
use futures::{Async, Poll};
|
||||
use serde;
|
||||
use std::io::{self, Cursor};
|
||||
use std::marker::PhantomData;
|
||||
use std::mem;
|
||||
use tokio_core::easy::{self, EasyBuf, EasyFramed};
|
||||
use tokio_core::io::{FramedIo, Io};
|
||||
use tokio_proto::multiplex::{self, RequestId};
|
||||
use util::Never;
|
||||
|
||||
/// Handles the IO of tarpc messages. Similar to `tokio_core::easy::EasyFramed` except that it
|
||||
/// hardcodes a parser and serializer suitable for tarpc messages.
|
||||
pub struct Framed<I, In, Out> {
|
||||
inner: EasyFramed<I, Parser<Out>, Serializer<In>>,
|
||||
}
|
||||
|
||||
impl<I, In, Out> Framed<I, In, Out> {
|
||||
/// Constructs a new tarpc FramedIo
|
||||
pub fn new(upstream: I) -> Framed<I, In, Out>
|
||||
where I: Io,
|
||||
In: serde::Serialize,
|
||||
Out: serde::Deserialize
|
||||
{
|
||||
Framed { inner: EasyFramed::new(upstream, Parser::new(), Serializer::new()) }
|
||||
}
|
||||
}
|
||||
|
||||
/// The type of message sent and received by the transport.
|
||||
pub type Frame<T> = multiplex::Frame<T, Never, io::Error>;
|
||||
|
||||
impl<I, In, Out> FramedIo for Framed<I, In, Out>
|
||||
where I: Io,
|
||||
In: serde::Serialize,
|
||||
Out: serde::Deserialize
|
||||
{
|
||||
type In = (RequestId, In);
|
||||
type Out = Option<(RequestId, Result<Out, bincode::DeserializeError>)>;
|
||||
|
||||
fn poll_read(&mut self) -> Async<()> {
|
||||
self.inner.poll_read()
|
||||
}
|
||||
|
||||
fn poll_write(&mut self) -> Async<()> {
|
||||
self.inner.poll_write()
|
||||
}
|
||||
|
||||
fn read(&mut self) -> Poll<Self::Out, io::Error> {
|
||||
self.inner.read()
|
||||
}
|
||||
|
||||
fn write(&mut self, req: Self::In) -> Poll<(), io::Error> {
|
||||
self.inner.write(req)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> Poll<(), io::Error> {
|
||||
self.inner.flush()
|
||||
}
|
||||
}
|
||||
|
||||
// `T` is the type that `Parser` parses.
|
||||
struct Parser<T> {
|
||||
state: ParserState,
|
||||
_phantom_data: PhantomData<T>,
|
||||
}
|
||||
|
||||
enum ParserState {
|
||||
Id,
|
||||
Len { id: u64 },
|
||||
Payload { id: u64, len: u64 },
|
||||
}
|
||||
|
||||
impl<T> Parser<T> {
|
||||
fn new() -> Self {
|
||||
Parser {
|
||||
state: ParserState::Id,
|
||||
_phantom_data: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> easy::Parse for Parser<T>
|
||||
where T: serde::Deserialize
|
||||
{
|
||||
type Out = (RequestId, Result<T, bincode::DeserializeError>);
|
||||
|
||||
fn parse(&mut self, buf: &mut EasyBuf) -> Poll<Self::Out, io::Error> {
|
||||
use self::ParserState::*;
|
||||
|
||||
loop {
|
||||
match self.state {
|
||||
Id if buf.len() < mem::size_of::<u64>() => return Ok(Async::NotReady),
|
||||
Id => {
|
||||
self.state = Len { id: Cursor::new(&*buf.get_mut()).read_u64::<BigEndian>()? };
|
||||
*buf = buf.split_off(mem::size_of::<u64>());
|
||||
}
|
||||
Len { .. } if buf.len() < mem::size_of::<u64>() => return Ok(Async::NotReady),
|
||||
Len { id } => {
|
||||
self.state = Payload {
|
||||
id: id,
|
||||
len: Cursor::new(&*buf.get_mut()).read_u64::<BigEndian>()?,
|
||||
};
|
||||
*buf = buf.split_off(mem::size_of::<u64>());
|
||||
}
|
||||
Payload { len, .. } if buf.len() < len as usize => return Ok(Async::NotReady),
|
||||
Payload { id, .. } => {
|
||||
let mut buf = buf.get_mut();
|
||||
let result = bincode::deserialize_from(&mut Cursor::new(&mut *buf),
|
||||
SizeLimit::Infinite);
|
||||
// Clear any unread bytes so we don't read garbage on next request.
|
||||
buf.clear();
|
||||
// Reset the state machine because, either way, we're done processing this
|
||||
// message.
|
||||
self.state = Id;
|
||||
|
||||
return Ok(Async::Ready((id, result)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Serializer<T>(PhantomData<T>);
|
||||
|
||||
impl<T> Serializer<T> {
|
||||
fn new() -> Self {
|
||||
Serializer(PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> easy::Serialize for Serializer<T>
|
||||
where T: serde::Serialize
|
||||
{
|
||||
type In = (RequestId, T);
|
||||
|
||||
fn serialize(&mut self, (id, message): Self::In, buf: &mut Vec<u8>) {
|
||||
buf.write_u64::<BigEndian>(id).unwrap();
|
||||
buf.write_u64::<BigEndian>(bincode::serialized_size(&message)).unwrap();
|
||||
bincode::serialize_into(buf,
|
||||
&message,
|
||||
SizeLimit::Infinite)
|
||||
// TODO(tikue): handle err
|
||||
.expect("In bincode::serialize_into");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize() {
|
||||
use tokio_core::easy::{Parse, Serialize};
|
||||
|
||||
const MSG: (u64, (char, char, char)) = (4, ('a', 'b', 'c'));
|
||||
let mut buf = EasyBuf::new();
|
||||
let mut vec = Vec::new();
|
||||
|
||||
// Serialize twice to check for idempotence.
|
||||
for _ in 0..2 {
|
||||
Serializer::new().serialize(MSG, &mut vec);
|
||||
buf.get_mut().append(&mut vec);
|
||||
let actual: Poll<(u64, Result<(char, char, char), bincode::DeserializeError>), io::Error> =
|
||||
Parser::new().parse(&mut buf);
|
||||
|
||||
match actual {
|
||||
Ok(Async::Ready((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {}
|
||||
bad => panic!("Expected {:?}, but got {:?}", Some(MSG), bad),
|
||||
}
|
||||
|
||||
assert!(buf.get_mut().is_empty(),
|
||||
"Expected empty buf but got {:?}",
|
||||
*buf.get_mut());
|
||||
}
|
||||
}
|
||||
32
src/lib.rs
32
src/lib.rs
@@ -54,15 +54,14 @@
|
||||
//! let addr = "localhost:10000";
|
||||
//! let _server = HelloServer.listen(addr);
|
||||
//! let client = SyncClient::connect(addr).unwrap();
|
||||
//! println!("{}", client.hello(&"Mom".to_string()).unwrap());
|
||||
//! println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
#![deny(missing_docs)]
|
||||
#![feature(plugin, question_mark, conservative_impl_trait, never_type, rustc_macro)]
|
||||
#![feature(plugin, conservative_impl_trait, never_type, proc_macro, unboxed_closures, fn_traits)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate bincode;
|
||||
extern crate byteorder;
|
||||
extern crate bytes;
|
||||
#[macro_use]
|
||||
@@ -73,6 +72,8 @@ extern crate log;
|
||||
extern crate serde_derive;
|
||||
extern crate take;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub extern crate bincode;
|
||||
#[doc(hidden)]
|
||||
pub extern crate futures;
|
||||
#[doc(hidden)]
|
||||
@@ -85,18 +86,18 @@ pub extern crate tokio_proto;
|
||||
pub extern crate tokio_service;
|
||||
|
||||
pub use client::{sync, future};
|
||||
pub use errors::{Error, SerializableError};
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use client::Client;
|
||||
#[doc(hidden)]
|
||||
pub use client::future::ClientFuture;
|
||||
pub use client::future::{ConnectFuture, ConnectWithFuture};
|
||||
pub use errors::{Error, SerializableError};
|
||||
#[doc(hidden)]
|
||||
pub use errors::{WireError};
|
||||
pub use errors::WireError;
|
||||
#[doc(hidden)]
|
||||
pub use protocol::{Packet, deserialize};
|
||||
pub use framed::Framed;
|
||||
#[doc(hidden)]
|
||||
pub use server::{ListenFuture, SerializeFuture, SerializedReply, listen, serialize_reply};
|
||||
pub use server::{ListenFuture, Response, listen, listen_with};
|
||||
|
||||
/// Provides some utility error types, as well as a trait for spawning futures on the default event
|
||||
/// loop.
|
||||
@@ -109,8 +110,17 @@ mod macros;
|
||||
mod client;
|
||||
/// Provides the base server boilerplate used by service implementations.
|
||||
mod server;
|
||||
/// Provides the tarpc client and server, which implements the tarpc protocol.
|
||||
/// The protocol is defined by the implementation.
|
||||
mod protocol;
|
||||
/// Provides an implementation of `FramedIo` that implements the tarpc protocol.
|
||||
/// The tarpc protocol is defined by the `FramedIo` implementation.
|
||||
mod framed;
|
||||
/// Provides a few different error types.
|
||||
mod errors;
|
||||
|
||||
use tokio_core::reactor::Remote;
|
||||
|
||||
lazy_static! {
|
||||
/// The `Remote` for the default reactor core.
|
||||
pub static ref REMOTE: Remote = {
|
||||
util::spawn_core()
|
||||
};
|
||||
}
|
||||
|
||||
460
src/macros.rs
460
src/macros.rs
@@ -81,7 +81,6 @@ macro_rules! impl_serialize {
|
||||
($impler:ident, { $($lifetime:tt)* }, $(@($name:ident $n:expr))* -- #($_n:expr) ) => {
|
||||
as_item! {
|
||||
impl$($lifetime)* $crate::serde::Serialize for $impler$($lifetime)* {
|
||||
#[inline]
|
||||
fn serialize<S>(&self, __impl_serialize_serializer: &mut S)
|
||||
-> ::std::result::Result<(), S::Error>
|
||||
where S: $crate::serde::Serializer
|
||||
@@ -136,7 +135,6 @@ macro_rules! impl_deserialize {
|
||||
}
|
||||
|
||||
impl $crate::serde::Deserialize for __impl_deserialize_Field {
|
||||
#[inline]
|
||||
fn deserialize<D>(__impl_deserialize_deserializer: &mut D)
|
||||
-> ::std::result::Result<__impl_deserialize_Field, D::Error>
|
||||
where D: $crate::serde::Deserializer
|
||||
@@ -145,7 +143,6 @@ macro_rules! impl_deserialize {
|
||||
impl $crate::serde::de::Visitor for __impl_deserialize_FieldVisitor {
|
||||
type Value = __impl_deserialize_Field;
|
||||
|
||||
#[inline]
|
||||
fn visit_usize<E>(&mut self, __impl_deserialize_value: usize)
|
||||
-> ::std::result::Result<__impl_deserialize_Field, E>
|
||||
where E: $crate::serde::de::Error,
|
||||
@@ -172,28 +169,27 @@ macro_rules! impl_deserialize {
|
||||
impl $crate::serde::de::EnumVisitor for Visitor {
|
||||
type Value = $impler;
|
||||
|
||||
#[inline]
|
||||
fn visit<V>(&mut self, mut visitor: V)
|
||||
fn visit<V>(&mut self, mut __tarpc_enum_visitor: V)
|
||||
-> ::std::result::Result<$impler, V::Error>
|
||||
where V: $crate::serde::de::VariantVisitor
|
||||
{
|
||||
match try!(visitor.visit_variant()) {
|
||||
match __tarpc_enum_visitor.visit_variant()? {
|
||||
$(
|
||||
__impl_deserialize_Field::$name => {
|
||||
let val = try!(visitor.visit_newtype());
|
||||
::std::result::Result::Ok($impler::$name(val))
|
||||
::std::result::Result::Ok(
|
||||
$impler::$name(__tarpc_enum_visitor.visit_newtype()?))
|
||||
}
|
||||
),*
|
||||
}
|
||||
}
|
||||
}
|
||||
const VARIANTS: &'static [&'static str] = &[
|
||||
const __TARPC_VARIANTS: &'static [&'static str] = &[
|
||||
$(
|
||||
stringify!($name)
|
||||
),*
|
||||
];
|
||||
__impl_deserialize_deserializer.deserialize_enum(
|
||||
stringify!($impler), VARIANTS, Visitor)
|
||||
stringify!($impler), __TARPC_VARIANTS, Visitor)
|
||||
}
|
||||
}
|
||||
);
|
||||
@@ -239,7 +235,7 @@ macro_rules! impl_deserialize {
|
||||
///
|
||||
#[macro_export]
|
||||
macro_rules! service {
|
||||
// Entry point
|
||||
// Entry point
|
||||
(
|
||||
$(
|
||||
$(#[$attr:meta])*
|
||||
@@ -253,7 +249,7 @@ macro_rules! service {
|
||||
)*
|
||||
}}
|
||||
};
|
||||
// Pattern for when the next rpc has an implicit unit return type and no error type.
|
||||
// Pattern for when the next rpc has an implicit unit return type and no error type.
|
||||
(
|
||||
{
|
||||
$(#[$attr:meta])*
|
||||
@@ -272,7 +268,7 @@ macro_rules! service {
|
||||
rpc $fn_name( $( $arg : $in_ ),* ) -> () | $crate::util::Never;
|
||||
}
|
||||
};
|
||||
// Pattern for when the next rpc has an explicit return type and no error type.
|
||||
// Pattern for when the next rpc has an explicit return type and no error type.
|
||||
(
|
||||
{
|
||||
$(#[$attr:meta])*
|
||||
@@ -291,7 +287,7 @@ macro_rules! service {
|
||||
rpc $fn_name( $( $arg : $in_ ),* ) -> $out | $crate::util::Never;
|
||||
}
|
||||
};
|
||||
// Pattern for when the next rpc has an implicit unit return type and an explicit error type.
|
||||
// Pattern for when the next rpc has an implicit unit return type and an explicit error type.
|
||||
(
|
||||
{
|
||||
$(#[$attr:meta])*
|
||||
@@ -310,7 +306,7 @@ macro_rules! service {
|
||||
rpc $fn_name( $( $arg : $in_ ),* ) -> () | $error;
|
||||
}
|
||||
};
|
||||
// Pattern for when the next rpc has an explicit return type and an explicit error type.
|
||||
// Pattern for when the next rpc has an explicit return type and an explicit error type.
|
||||
(
|
||||
{
|
||||
$(#[$attr:meta])*
|
||||
@@ -329,7 +325,7 @@ macro_rules! service {
|
||||
rpc $fn_name( $( $arg : $in_ ),* ) -> $out | $error;
|
||||
}
|
||||
};
|
||||
// Pattern for when all return types have been expanded
|
||||
// Pattern for when all return types have been expanded
|
||||
(
|
||||
{ } // none left to expand
|
||||
$(
|
||||
@@ -337,43 +333,46 @@ macro_rules! service {
|
||||
rpc $fn_name:ident ( $( $arg:ident : $in_:ty ),* ) -> $out:ty | $error:ty;
|
||||
)*
|
||||
) => {
|
||||
service! {
|
||||
{ }
|
||||
|
||||
#[allow(non_camel_case_types, unused)]
|
||||
#[derive(Debug)]
|
||||
enum __tarpc_service_Request {
|
||||
NotIrrefutable(()),
|
||||
$(
|
||||
$(#[$attr])*
|
||||
rpc $fn_name( $( $arg : $in_ ),* ) -> $out | $error;
|
||||
)*
|
||||
|
||||
{
|
||||
#[allow(non_camel_case_types, unused)]
|
||||
#[derive(Debug)]
|
||||
enum __ClientSideRequest<'a> {
|
||||
$(
|
||||
$fn_name(&'a ( $(&'a $in_,)* ))
|
||||
),*
|
||||
}
|
||||
|
||||
impl_serialize!(__ClientSideRequest, { <'__a> }, $($fn_name(($($in_),*)))*);
|
||||
}
|
||||
$fn_name(( $($in_,)* ))
|
||||
),*
|
||||
}
|
||||
};
|
||||
// Pattern for when all return types and the client request have been expanded
|
||||
(
|
||||
{ } // none left to expand
|
||||
$(
|
||||
$(#[$attr:meta])*
|
||||
rpc $fn_name:ident ( $( $arg:ident : $in_:ty ),* ) -> $out:ty | $error:ty;
|
||||
)*
|
||||
{
|
||||
$client_req:item
|
||||
$client_serialize_impl:item
|
||||
}
|
||||
) => {
|
||||
|
||||
/// Defines the `Future` RPC service. Implementors must be `Clone`, `Send`, and `'static`,
|
||||
/// as required by `tokio_proto::NewService`. This is required so that the service can be used
|
||||
/// to respond to multiple requests concurrently.
|
||||
impl_deserialize!(__tarpc_service_Request, NotIrrefutable(()) $($fn_name(($($in_),*)))*);
|
||||
impl_serialize!(__tarpc_service_Request, {}, NotIrrefutable(()) $($fn_name(($($in_),*)))*);
|
||||
|
||||
#[allow(non_camel_case_types, unused)]
|
||||
#[derive(Debug)]
|
||||
enum __tarpc_service_Response {
|
||||
NotIrrefutable(()),
|
||||
$(
|
||||
$fn_name($out)
|
||||
),*
|
||||
}
|
||||
|
||||
impl_deserialize!(__tarpc_service_Response, NotIrrefutable(()) $($fn_name($out))*);
|
||||
impl_serialize!(__tarpc_service_Response, {}, NotIrrefutable(()) $($fn_name($out))*);
|
||||
|
||||
#[allow(non_camel_case_types, unused)]
|
||||
#[derive(Debug)]
|
||||
enum __tarpc_service_Error {
|
||||
NotIrrefutable(()),
|
||||
$(
|
||||
$fn_name($error)
|
||||
),*
|
||||
}
|
||||
|
||||
impl_deserialize!(__tarpc_service_Error, NotIrrefutable(()) $($fn_name($error))*);
|
||||
impl_serialize!(__tarpc_service_Error, {}, NotIrrefutable(()) $($fn_name($error))*);
|
||||
|
||||
/// Defines the `Future` RPC service. Implementors must be `Clone`, `Send`, and `'static`,
|
||||
/// as required by `tokio_proto::NewService`. This is required so that the service can be used
|
||||
/// to respond to multiple requests concurrently.
|
||||
pub trait FutureService:
|
||||
::std::marker::Send +
|
||||
::std::clone::Clone +
|
||||
@@ -396,9 +395,7 @@ macro_rules! service {
|
||||
pub trait FutureServiceExt: FutureService {
|
||||
/// Spawns the service, binding to the given address and running on
|
||||
/// the default tokio `Loop`.
|
||||
fn listen<L>(self, addr: L) -> $crate::ListenFuture
|
||||
where L: ::std::net::ToSocketAddrs
|
||||
{
|
||||
fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture {
|
||||
return $crate::listen(addr, __tarpc_service_AsyncServer(self));
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
@@ -411,30 +408,40 @@ macro_rules! service {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
enum __tarpc_service_Reply<__tarpc_service_S: FutureService> {
|
||||
DeserializeError($crate::SerializeFuture),
|
||||
$($fn_name($crate::futures::Then<
|
||||
$crate::futures::MapErr<
|
||||
ty_snake_to_camel!(__tarpc_service_S::$fn_name),
|
||||
fn($error) -> $crate::WireError<$error>>,
|
||||
$crate::SerializeFuture,
|
||||
fn(::std::result::Result<$out, $crate::WireError<$error>>)
|
||||
-> $crate::SerializeFuture>)),*
|
||||
type __tarpc_service_Future =
|
||||
$crate::futures::Finished<$crate::Response<__tarpc_service_Response,
|
||||
__tarpc_service_Error>,
|
||||
::std::io::Error>;
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
enum __tarpc_service_FutureReply<__tarpc_service_S: FutureService> {
|
||||
DeserializeError(__tarpc_service_Future),
|
||||
$($fn_name(
|
||||
$crate::futures::Then<ty_snake_to_camel!(__tarpc_service_S::$fn_name),
|
||||
__tarpc_service_Future,
|
||||
fn(::std::result::Result<$out, $error>)
|
||||
-> __tarpc_service_Future>)),*
|
||||
}
|
||||
|
||||
impl<S: FutureService> $crate::futures::Future for __tarpc_service_Reply<S> {
|
||||
type Item = $crate::SerializedReply;
|
||||
impl<S: FutureService> $crate::futures::Future for __tarpc_service_FutureReply<S> {
|
||||
type Item = $crate::Response<__tarpc_service_Response, __tarpc_service_Error>;
|
||||
|
||||
type Error = ::std::io::Error;
|
||||
|
||||
fn poll(&mut self) -> $crate::futures::Poll<Self::Item, Self::Error> {
|
||||
match *self {
|
||||
__tarpc_service_Reply::DeserializeError(ref mut f) => {
|
||||
$crate::futures::Future::poll(f)
|
||||
__tarpc_service_FutureReply::DeserializeError(
|
||||
ref mut __tarpc_service_future) =>
|
||||
{
|
||||
$crate::futures::Future::poll(__tarpc_service_future)
|
||||
}
|
||||
$(
|
||||
__tarpc_service_Reply::$fn_name(ref mut f) => {
|
||||
$crate::futures::Future::poll(f)
|
||||
__tarpc_service_FutureReply::$fn_name(
|
||||
ref mut __tarpc_service_future) =>
|
||||
{
|
||||
$crate::futures::Future::poll(__tarpc_service_future)
|
||||
}
|
||||
),*
|
||||
}
|
||||
@@ -447,66 +454,54 @@ macro_rules! service {
|
||||
for __tarpc_service_AsyncServer<__tarpc_service_S>
|
||||
where __tarpc_service_S: FutureService
|
||||
{
|
||||
type Request = ::std::vec::Vec<u8>;
|
||||
type Response = $crate::SerializedReply;
|
||||
type Request = ::std::result::Result<__tarpc_service_Request,
|
||||
$crate::bincode::serde::DeserializeError>;
|
||||
type Response = $crate::Response<__tarpc_service_Response,
|
||||
__tarpc_service_Error>;
|
||||
type Error = ::std::io::Error;
|
||||
type Future = __tarpc_service_Reply<__tarpc_service_S>;
|
||||
type Future = __tarpc_service_FutureReply<__tarpc_service_S>;
|
||||
|
||||
fn poll_ready(&self) -> $crate::futures::Async<()> {
|
||||
$crate::futures::Async::Ready(())
|
||||
}
|
||||
|
||||
fn call(&self, __tarpc_service_req: Self::Request) -> Self::Future {
|
||||
#[allow(non_camel_case_types, unused)]
|
||||
#[derive(Debug)]
|
||||
enum __tarpc_service_ServerSideRequest {
|
||||
$(
|
||||
$fn_name(( $($in_,)* ))
|
||||
),*
|
||||
}
|
||||
|
||||
impl_deserialize!(__tarpc_service_ServerSideRequest,
|
||||
$($fn_name(($($in_),*)))*);
|
||||
|
||||
let __tarpc_service_request = $crate::deserialize(&__tarpc_service_req);
|
||||
let __tarpc_service_request: __tarpc_service_ServerSideRequest =
|
||||
match __tarpc_service_request {
|
||||
::std::result::Result::Ok(__tarpc_service_request) => {
|
||||
__tarpc_service_request
|
||||
}
|
||||
::std::result::Result::Err(__tarpc_service_e) => {
|
||||
return __tarpc_service_Reply::DeserializeError(
|
||||
deserialize_error(__tarpc_service_e));
|
||||
}
|
||||
};
|
||||
match __tarpc_service_request {$(
|
||||
__tarpc_service_ServerSideRequest::$fn_name(( $($arg,)* )) => {
|
||||
const SERIALIZE:
|
||||
fn(::std::result::Result<$out, $crate::WireError<$error>>)
|
||||
-> $crate::SerializeFuture
|
||||
= $crate::serialize_reply;
|
||||
const TO_APP: fn($error) -> $crate::WireError<$error> =
|
||||
$crate::WireError::App;
|
||||
|
||||
return __tarpc_service_Reply::$fn_name(
|
||||
$crate::futures::Future::then(
|
||||
$crate::futures::Future::map_err(
|
||||
FutureService::$fn_name(&self.0, $($arg),*),
|
||||
TO_APP),
|
||||
SERIALIZE));
|
||||
fn call(&self, __tarpc_service_request: Self::Request) -> Self::Future {
|
||||
let __tarpc_service_request = match __tarpc_service_request {
|
||||
Ok(__tarpc_service_request) => __tarpc_service_request,
|
||||
Err(__tarpc_service_deserialize_err) => {
|
||||
return __tarpc_service_FutureReply::DeserializeError(
|
||||
$crate::futures::finished(
|
||||
::std::result::Result::Err(
|
||||
$crate::WireError::ServerDeserialize(
|
||||
::std::string::ToString::to_string(
|
||||
&__tarpc_service_deserialize_err)))));
|
||||
}
|
||||
)*}
|
||||
|
||||
#[inline]
|
||||
fn deserialize_error<E: ::std::error::Error>(__tarpc_service_e: E)
|
||||
-> $crate::SerializeFuture
|
||||
{
|
||||
$crate::serialize_reply(
|
||||
// The type param is only used in the Error::App variant, so it
|
||||
// doesn't matter what we specify it as here.
|
||||
::std::result::Result::Err::<(), _>(
|
||||
$crate::WireError::ServerDeserialize::<$crate::util::Never>(
|
||||
__tarpc_service_e.to_string())))
|
||||
};
|
||||
match __tarpc_service_request {
|
||||
__tarpc_service_Request::NotIrrefutable(()) => unreachable!(),
|
||||
$(
|
||||
__tarpc_service_Request::$fn_name(( $($arg,)* )) => {
|
||||
fn __tarpc_service_wrap(
|
||||
__tarpc_service_response:
|
||||
::std::result::Result<$out, $error>)
|
||||
-> __tarpc_service_Future
|
||||
{
|
||||
$crate::futures::finished(
|
||||
__tarpc_service_response
|
||||
.map(__tarpc_service_Response::$fn_name)
|
||||
.map_err(|__tarpc_service_error| {
|
||||
$crate::WireError::App(
|
||||
__tarpc_service_Error::$fn_name(
|
||||
__tarpc_service_error))
|
||||
})
|
||||
)
|
||||
}
|
||||
return __tarpc_service_FutureReply::$fn_name(
|
||||
$crate::futures::Future::then(
|
||||
FutureService::$fn_name(&self.0, $($arg),*),
|
||||
__tarpc_service_wrap));
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -514,8 +509,8 @@ macro_rules! service {
|
||||
}
|
||||
|
||||
/// Defines the blocking RPC service. Must be `Clone`, `Send`, and `'static`,
|
||||
/// as required by `tokio_proto::NewService`. This is required so that the service can be used
|
||||
/// to respond to multiple requests concurrently.
|
||||
/// as required by `tokio_proto::NewService`. This is required so that the service can be used
|
||||
/// to respond to multiple requests concurrently.
|
||||
pub trait SyncService:
|
||||
::std::marker::Send +
|
||||
::std::clone::Clone +
|
||||
@@ -533,14 +528,15 @@ macro_rules! service {
|
||||
/// Spawns the service, binding to the given address and running on
|
||||
/// the default tokio `Loop`.
|
||||
fn listen<L>(self, addr: L)
|
||||
-> $crate::tokio_proto::server::ServerHandle
|
||||
-> ::std::io::Result<$crate::tokio_proto::server::ServerHandle>
|
||||
where L: ::std::net::ToSocketAddrs
|
||||
{
|
||||
|
||||
let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
|
||||
let __tarpc_service_service = __SyncServer {
|
||||
service: self,
|
||||
};
|
||||
return ::std::result::Result::unwrap($crate::futures::Future::wait(FutureServiceExt::listen(__tarpc_service_service, addr)));
|
||||
return $crate::futures::Future::wait(
|
||||
FutureServiceExt::listen(__tarpc_service_service, addr));
|
||||
|
||||
#[derive(Clone)]
|
||||
struct __SyncServer<S> {
|
||||
@@ -591,7 +587,7 @@ macro_rules! service {
|
||||
impl<S> SyncServiceExt for S where S: SyncService {}
|
||||
|
||||
#[allow(unused)]
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Debug)]
|
||||
/// The client stub that makes RPC calls to the server. Exposes a blocking interface.
|
||||
pub struct SyncClient(FutureClient);
|
||||
|
||||
@@ -599,20 +595,9 @@ macro_rules! service {
|
||||
fn connect<A>(addr: A) -> ::std::result::Result<Self, ::std::io::Error>
|
||||
where A: ::std::net::ToSocketAddrs,
|
||||
{
|
||||
let mut addrs = try!(::std::net::ToSocketAddrs::to_socket_addrs(&addr));
|
||||
let addr = if let ::std::option::Option::Some(a) =
|
||||
::std::iter::Iterator::next(&mut addrs)
|
||||
{
|
||||
a
|
||||
} else {
|
||||
return ::std::result::Result::Err(
|
||||
::std::io::Error::new(
|
||||
::std::io::ErrorKind::AddrNotAvailable,
|
||||
"`ToSocketAddrs::to_socket_addrs` returned an empty iterator."));
|
||||
};
|
||||
let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
|
||||
let client = <FutureClient as $crate::future::Connect>::connect(&addr);
|
||||
let client = $crate::futures::Future::wait(client);
|
||||
let client = SyncClient(try!(client));
|
||||
let client = SyncClient($crate::futures::Future::wait(client)?);
|
||||
::std::result::Result::Ok(client)
|
||||
}
|
||||
}
|
||||
@@ -621,8 +606,7 @@ macro_rules! service {
|
||||
$(
|
||||
#[allow(unused)]
|
||||
$(#[$attr])*
|
||||
#[inline]
|
||||
pub fn $fn_name(&self, $($arg: &$in_),*)
|
||||
pub fn $fn_name(&self, $($arg: $in_),*)
|
||||
-> ::std::result::Result<$out, $crate::Error<$error>>
|
||||
{
|
||||
let rpc = (self.0).$fn_name($($arg),*);
|
||||
@@ -631,17 +615,80 @@ macro_rules! service {
|
||||
)*
|
||||
}
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
type __tarpc_service_Client =
|
||||
$crate::Client<__tarpc_service_Request,
|
||||
__tarpc_service_Response,
|
||||
__tarpc_service_Error>;
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
/// Implementation detail: Pending connection.
|
||||
pub struct __tarpc_service_ConnectFuture<T> {
|
||||
inner: $crate::futures::Map<$crate::ConnectFuture<__tarpc_service_Request,
|
||||
__tarpc_service_Response,
|
||||
__tarpc_service_Error>,
|
||||
fn(__tarpc_service_Client) -> T>,
|
||||
}
|
||||
|
||||
impl<T> $crate::futures::Future for __tarpc_service_ConnectFuture<T> {
|
||||
type Item = T;
|
||||
type Error = ::std::io::Error;
|
||||
|
||||
fn poll(&mut self) -> $crate::futures::Poll<Self::Item, Self::Error> {
|
||||
$crate::futures::Future::poll(&mut self.inner)
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
/// Implementation detail: Pending connection.
|
||||
pub struct __tarpc_service_ConnectWithFuture<'a, T> {
|
||||
inner: $crate::futures::Map<$crate::ConnectWithFuture<'a,
|
||||
__tarpc_service_Request,
|
||||
__tarpc_service_Response,
|
||||
__tarpc_service_Error>,
|
||||
fn(__tarpc_service_Client) -> T>,
|
||||
}
|
||||
|
||||
impl<'a, T> $crate::futures::Future for __tarpc_service_ConnectWithFuture<'a, T> {
|
||||
type Item = T;
|
||||
type Error = ::std::io::Error;
|
||||
|
||||
fn poll(&mut self) -> $crate::futures::Poll<Self::Item, Self::Error> {
|
||||
$crate::futures::Future::poll(&mut self.inner)
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Debug)]
|
||||
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
|
||||
pub struct FutureClient($crate::Client);
|
||||
pub struct FutureClient(__tarpc_service_Client);
|
||||
|
||||
impl $crate::future::Connect for FutureClient {
|
||||
type Fut = $crate::futures::Map<$crate::ClientFuture, fn($crate::Client) -> Self>;
|
||||
impl<'a> $crate::future::Connect<'a> for FutureClient {
|
||||
type ConnectFut = __tarpc_service_ConnectFuture<Self>;
|
||||
type ConnectWithFut = __tarpc_service_ConnectWithFuture<'a, Self>;
|
||||
|
||||
fn connect(addr: &::std::net::SocketAddr) -> Self::Fut {
|
||||
let client = <$crate::Client as $crate::future::Connect>::connect(addr);
|
||||
$crate::futures::Future::map(client, FutureClient)
|
||||
fn connect_remotely(__tarpc_service_addr: &::std::net::SocketAddr,
|
||||
__tarpc_service_remote: &$crate::tokio_core::reactor::Remote)
|
||||
-> Self::ConnectFut
|
||||
{
|
||||
let client = <__tarpc_service_Client as $crate::future::Connect>::connect_remotely(
|
||||
__tarpc_service_addr, __tarpc_service_remote);
|
||||
|
||||
__tarpc_service_ConnectFuture {
|
||||
inner: $crate::futures::Future::map(client, FutureClient)
|
||||
}
|
||||
}
|
||||
|
||||
fn connect_with(__tarpc_service_addr: &::std::net::SocketAddr,
|
||||
__tarpc_service_handle: &'a $crate::tokio_core::reactor::Handle)
|
||||
-> Self::ConnectWithFut
|
||||
{
|
||||
let client = <__tarpc_service_Client as $crate::future::Connect>::connect_with(
|
||||
__tarpc_service_addr, __tarpc_service_handle);
|
||||
|
||||
__tarpc_service_ConnectWithFuture {
|
||||
inner: $crate::futures::Future::map(client, FutureClient)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -649,52 +696,55 @@ macro_rules! service {
|
||||
$(
|
||||
#[allow(unused)]
|
||||
$(#[$attr])*
|
||||
#[inline]
|
||||
pub fn $fn_name(&self, $($arg: &$in_),*)
|
||||
pub fn $fn_name(&self, $($arg: $in_),*)
|
||||
-> impl $crate::futures::Future<Item=$out, Error=$crate::Error<$error>>
|
||||
+ 'static
|
||||
{
|
||||
$client_req
|
||||
$client_serialize_impl
|
||||
|
||||
future_enum! {
|
||||
enum Fut<C, F> {
|
||||
Called(C),
|
||||
Failed(F)
|
||||
}
|
||||
}
|
||||
|
||||
let __tarpc_service_args = ($($arg,)*);
|
||||
let __tarpc_service_req = &__ClientSideRequest::$fn_name(&__tarpc_service_args);
|
||||
let __tarpc_service_req =
|
||||
match $crate::Packet::serialize(&__tarpc_service_req)
|
||||
{
|
||||
::std::result::Result::Err(__tarpc_service_e) => {
|
||||
return Fut::Failed(
|
||||
$crate::futures::failed(
|
||||
$crate::Error::ClientSerialize(__tarpc_service_e)))
|
||||
}
|
||||
::std::result::Result::Ok(__tarpc_service_req) => __tarpc_service_req,
|
||||
};
|
||||
let __tarpc_service_req = __tarpc_service_Request::$fn_name(($($arg,)*));
|
||||
let __tarpc_service_fut =
|
||||
$crate::tokio_service::Service::call(&self.0, __tarpc_service_req);
|
||||
Fut::Called($crate::futures::Future::then(__tarpc_service_fut,
|
||||
move |__tarpc_service_msg| {
|
||||
let __tarpc_service_msg: Vec<u8> = try!(__tarpc_service_msg);
|
||||
let __tarpc_service_msg:
|
||||
::std::result::Result<
|
||||
::std::result::Result<$out, $crate::WireError<$error>>, _>
|
||||
= $crate::deserialize(&__tarpc_service_msg);
|
||||
match __tarpc_service_msg {
|
||||
$crate::futures::Future::then(__tarpc_service_fut,
|
||||
move |__tarpc_service_msg| {
|
||||
match __tarpc_service_msg? {
|
||||
::std::result::Result::Ok(__tarpc_service_msg) => {
|
||||
::std::result::Result::Ok(try!(__tarpc_service_msg))
|
||||
if let __tarpc_service_Response::$fn_name(__tarpc_service_msg) =
|
||||
__tarpc_service_msg
|
||||
{
|
||||
::std::result::Result::Ok(__tarpc_service_msg)
|
||||
} else {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
::std::result::Result::Err(__tarpc_service_e) => {
|
||||
::std::result::Result::Err(
|
||||
$crate::Error::ClientDeserialize(__tarpc_service_e))
|
||||
::std::result::Result::Err(__tarpc_service_err) => {
|
||||
::std::result::Result::Err(match __tarpc_service_err {
|
||||
$crate::Error::App(__tarpc_service_err) => {
|
||||
if let __tarpc_service_Error::$fn_name(
|
||||
__tarpc_service_err) = __tarpc_service_err
|
||||
{
|
||||
$crate::Error::App(__tarpc_service_err)
|
||||
} else {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
$crate::Error::ServerDeserialize(__tarpc_service_err) => {
|
||||
$crate::Error::ServerDeserialize(__tarpc_service_err)
|
||||
}
|
||||
$crate::Error::ServerSerialize(__tarpc_service_err) => {
|
||||
$crate::Error::ServerSerialize(__tarpc_service_err)
|
||||
}
|
||||
$crate::Error::ClientDeserialize(__tarpc_service_err) => {
|
||||
$crate::Error::ClientDeserialize(__tarpc_service_err)
|
||||
}
|
||||
$crate::Error::ClientSerialize(__tarpc_service_err) => {
|
||||
$crate::Error::ClientSerialize(__tarpc_service_err)
|
||||
}
|
||||
$crate::Error::Io(__tarpc_service_error) => {
|
||||
$crate::Error::Io(__tarpc_service_error)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}))
|
||||
})
|
||||
}
|
||||
)*
|
||||
|
||||
@@ -730,6 +780,7 @@ mod syntax_test {
|
||||
#[cfg(test)]
|
||||
mod functional_test {
|
||||
use futures::{Future, failed};
|
||||
use util::FirstSocketAddr;
|
||||
extern crate env_logger;
|
||||
|
||||
service! {
|
||||
@@ -741,6 +792,7 @@ mod functional_test {
|
||||
use super::{SyncClient, SyncService, SyncServiceExt};
|
||||
use super::env_logger;
|
||||
use sync::Connect;
|
||||
use util::FirstSocketAddr;
|
||||
use util::Never;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
@@ -758,25 +810,16 @@ mod functional_test {
|
||||
#[test]
|
||||
fn simple() {
|
||||
let _ = env_logger::init();
|
||||
let handle = Server.listen("localhost:0");
|
||||
let handle = Server.listen("localhost:0".first_socket_addr()).unwrap();
|
||||
let client = SyncClient::connect(handle.local_addr()).unwrap();
|
||||
assert_eq!(3, client.add(&1, &2).unwrap());
|
||||
assert_eq!("Hey, Tim.", client.hey(&"Tim".to_string()).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clone() {
|
||||
let handle = Server.listen("localhost:0");
|
||||
let client1 = SyncClient::connect(handle.local_addr()).unwrap();
|
||||
let client2 = client1.clone();
|
||||
assert_eq!(3, client1.add(&1, &2).unwrap());
|
||||
assert_eq!(3, client2.add(&1, &2).unwrap());
|
||||
assert_eq!(3, client.add(1, 2).unwrap());
|
||||
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn other_service() {
|
||||
let _ = env_logger::init();
|
||||
let handle = Server.listen("localhost:0");
|
||||
let handle = Server.listen("localhost:0".first_socket_addr()).unwrap();
|
||||
let client = super::other_service::SyncClient::connect(handle.local_addr()).unwrap();
|
||||
match client.foo().err().unwrap() {
|
||||
::Error::ServerDeserialize(_) => {} // good
|
||||
@@ -790,6 +833,7 @@ mod functional_test {
|
||||
use futures::{Finished, Future, finished};
|
||||
use super::{FutureClient, FutureService, FutureServiceExt};
|
||||
use super::env_logger;
|
||||
use util::FirstSocketAddr;
|
||||
use util::Never;
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -812,26 +856,16 @@ mod functional_test {
|
||||
#[test]
|
||||
fn simple() {
|
||||
let _ = env_logger::init();
|
||||
let handle = Server.listen("localhost:0").wait().unwrap();
|
||||
let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let client = FutureClient::connect(handle.local_addr()).wait().unwrap();
|
||||
assert_eq!(3, client.add(&1, &2).wait().unwrap());
|
||||
assert_eq!("Hey, Tim.", client.hey(&"Tim".to_string()).wait().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clone() {
|
||||
let _ = env_logger::init();
|
||||
let handle = Server.listen("localhost:0").wait().unwrap();
|
||||
let client1 = FutureClient::connect(handle.local_addr()).wait().unwrap();
|
||||
let client2 = client1.clone();
|
||||
assert_eq!(3, client1.add(&1, &2).wait().unwrap());
|
||||
assert_eq!(3, client2.add(&1, &2).wait().unwrap());
|
||||
assert_eq!(3, client.add(1, 2).wait().unwrap());
|
||||
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn other_service() {
|
||||
let _ = env_logger::init();
|
||||
let handle = Server.listen("localhost:0").wait().unwrap();
|
||||
let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let client =
|
||||
super::other_service::FutureClient::connect(handle.local_addr()).wait().unwrap();
|
||||
match client.foo().wait().err().unwrap() {
|
||||
@@ -867,7 +901,7 @@ mod functional_test {
|
||||
use self::error_service::*;
|
||||
let _ = env_logger::init();
|
||||
|
||||
let handle = ErrorServer.listen("localhost:0").wait().unwrap();
|
||||
let handle = ErrorServer.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let client = FutureClient::connect(handle.local_addr()).wait().unwrap();
|
||||
client.bar()
|
||||
.then(move |result| {
|
||||
|
||||
@@ -4,7 +4,7 @@ version = "0.1.0"
|
||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||
|
||||
[dependencies]
|
||||
itertools = "0.4"
|
||||
itertools = "0.5"
|
||||
|
||||
[lib]
|
||||
plugin = true
|
||||
|
||||
@@ -21,7 +21,7 @@ use syntax::tokenstream::TokenTree;
|
||||
use syntax::util::small_vector::SmallVector;
|
||||
|
||||
fn snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box<MacResult + 'static> {
|
||||
let mut parser = parse::new_parser_from_tts(cx.parse_sess(), cx.cfg(), tts.into());
|
||||
let mut parser = parse::new_parser_from_tts(cx.parse_sess(), tts.into());
|
||||
// The `expand_expr` method is called so that any macro calls in the
|
||||
// parsed expression are expanded.
|
||||
|
||||
@@ -69,7 +69,7 @@ fn snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box<MacResul
|
||||
}
|
||||
|
||||
fn impl_snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box<MacResult + 'static> {
|
||||
let mut parser = parse::new_parser_from_tts(cx.parse_sess(), cx.cfg(), tts.into());
|
||||
let mut parser = parse::new_parser_from_tts(cx.parse_sess(), tts.into());
|
||||
// The `expand_expr` method is called so that any macro calls in the
|
||||
// parsed expression are expanded.
|
||||
|
||||
@@ -91,7 +91,7 @@ fn impl_snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box<Mac
|
||||
}
|
||||
|
||||
fn ty_snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box<MacResult + 'static> {
|
||||
let mut parser = parse::new_parser_from_tts(cx.parse_sess(), cx.cfg(), tts.into());
|
||||
let mut parser = parse::new_parser_from_tts(cx.parse_sess(), tts.into());
|
||||
// The `expand_expr` method is called so that any macro calls in the
|
||||
// parsed expression are expanded.
|
||||
|
||||
@@ -170,7 +170,7 @@ impl<'a> ParseTraitRef for Parser<'a> {
|
||||
/// Parse a::B<String,i32>
|
||||
fn parse_trait_ref(&mut self) -> PResult<TraitRef> {
|
||||
Ok(TraitRef {
|
||||
path: try!(self.parse_path(PathStyle::Type)),
|
||||
path: self.parse_path(PathStyle::Type)?,
|
||||
ref_id: ast::DUMMY_NODE_ID,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,109 +0,0 @@
|
||||
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
use serde;
|
||||
use futures::{self, Async};
|
||||
use bincode::{SizeLimit, serde as bincode};
|
||||
use std::{io, thread};
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::mpsc;
|
||||
use util::Never;
|
||||
use tokio_core::io::{FramedIo, Io};
|
||||
use tokio_core::reactor::{Core, Remote};
|
||||
use tokio_proto::pipeline::Frame;
|
||||
|
||||
lazy_static! {
|
||||
#[doc(hidden)]
|
||||
pub static ref LOOP_HANDLE: Remote = {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut lupe = Core::new().unwrap();
|
||||
tx.send(lupe.handle().remote().clone()).unwrap();
|
||||
// Run forever
|
||||
lupe.run(futures::empty::<(), !>()).unwrap();
|
||||
});
|
||||
rx.recv().unwrap()
|
||||
};
|
||||
}
|
||||
|
||||
pub use self::writer::Packet;
|
||||
|
||||
pub mod reader;
|
||||
pub mod writer;
|
||||
|
||||
/// A helper trait to provide the `map_non_block` function on Results.
|
||||
trait MapNonBlock<T> {
|
||||
/// Maps a `Result<T>` to a `Result<Option<T>>` by converting
|
||||
/// operation-would-block errors into `Ok(None)`.
|
||||
fn map_non_block(self) -> io::Result<Option<T>>;
|
||||
}
|
||||
|
||||
impl<T> MapNonBlock<T> for io::Result<T> {
|
||||
fn map_non_block(self) -> io::Result<Option<T>> {
|
||||
use std::io::ErrorKind::WouldBlock;
|
||||
|
||||
match self {
|
||||
Ok(value) => Ok(Some(value)),
|
||||
Err(err) => {
|
||||
if let WouldBlock = err.kind() {
|
||||
Ok(None)
|
||||
} else {
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Deserialize a buffer into a `D` and its ID. On error, returns `tarpc::Error`.
|
||||
pub fn deserialize<D: serde::Deserialize>(mut buf: &[u8]) -> Result<D, bincode::DeserializeError> {
|
||||
bincode::deserialize_from(&mut buf, SizeLimit::Infinite)
|
||||
}
|
||||
|
||||
pub struct TarpcTransport<T> {
|
||||
stream: T,
|
||||
read_state: reader::ReadState,
|
||||
outbound: VecDeque<Packet>,
|
||||
head: Option<Packet>,
|
||||
}
|
||||
|
||||
impl<T> TarpcTransport<T> {
|
||||
pub fn new(stream: T) -> Self {
|
||||
TarpcTransport {
|
||||
stream: stream,
|
||||
read_state: reader::ReadState::init(),
|
||||
outbound: VecDeque::new(),
|
||||
head: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> FramedIo for TarpcTransport<T>
|
||||
where T: Io
|
||||
{
|
||||
type In = Frame<Packet, Never, io::Error>;
|
||||
type Out = Frame<Vec<u8>, Never, io::Error>;
|
||||
|
||||
fn poll_read(&mut self) -> Async<()> {
|
||||
self.stream.poll_read()
|
||||
}
|
||||
|
||||
fn poll_write(&mut self) -> Async<()> {
|
||||
self.stream.poll_write()
|
||||
}
|
||||
|
||||
fn read(&mut self) -> io::Result<Async<Frame<Vec<u8>, Never, io::Error>>> {
|
||||
self.read_state.next(&mut self.stream)
|
||||
}
|
||||
|
||||
fn write(&mut self, req: Self::In) -> io::Result<Async<()>> {
|
||||
self.outbound.push_back(req.unwrap_msg());
|
||||
self.flush()
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<Async<()>> {
|
||||
writer::NextWriteState::next(&mut self.head, &mut self.stream, &mut self.outbound)
|
||||
}
|
||||
}
|
||||
@@ -1,154 +0,0 @@
|
||||
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use bytes::{MutBuf, Take};
|
||||
use futures::Async;
|
||||
use std::io;
|
||||
use std::mem;
|
||||
use tokio_proto::TryRead;
|
||||
use tokio_proto::pipeline::Frame;
|
||||
use util::Never;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct U64Reader {
|
||||
read: usize,
|
||||
data: [u8; 8],
|
||||
}
|
||||
|
||||
impl U64Reader {
|
||||
fn new() -> Self {
|
||||
U64Reader {
|
||||
read: 0,
|
||||
data: [0; 8],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MutBuf for U64Reader {
|
||||
fn remaining(&self) -> usize {
|
||||
8 - self.read
|
||||
}
|
||||
|
||||
unsafe fn advance(&mut self, count: usize) {
|
||||
self.read += count;
|
||||
}
|
||||
|
||||
unsafe fn mut_bytes(&mut self) -> &mut [u8] {
|
||||
&mut self.data[self.read..]
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum NextReadAction<R> {
|
||||
Continue,
|
||||
Stop(Result<R, io::Error>),
|
||||
}
|
||||
|
||||
trait MutBufExt: MutBuf + Sized {
|
||||
type Inner;
|
||||
|
||||
fn take(&mut self) -> Self::Inner;
|
||||
|
||||
fn try_read<R: TryRead>(&mut self, stream: &mut R) -> io::Result<NextReadAction<Self::Inner>> {
|
||||
while let Async::Ready(bytes_read) = stream.try_read_buf(self)? {
|
||||
debug!("Reader: read {} bytes, {} remaining.",
|
||||
bytes_read,
|
||||
self.remaining());
|
||||
if bytes_read == 0 {
|
||||
debug!("Reader: connection broken.");
|
||||
let err = io::Error::new(io::ErrorKind::BrokenPipe, "The connection was closed.");
|
||||
return Ok(NextReadAction::Stop(Err(err)));
|
||||
}
|
||||
|
||||
if !self.has_remaining() {
|
||||
trace!("Reader: finished.");
|
||||
return Ok(NextReadAction::Stop(Ok(self.take())));
|
||||
}
|
||||
}
|
||||
Ok(NextReadAction::Continue)
|
||||
}
|
||||
}
|
||||
|
||||
impl MutBufExt for U64Reader {
|
||||
type Inner = u64;
|
||||
|
||||
fn take(&mut self) -> u64 {
|
||||
(&self.data as &[u8]).read_u64::<BigEndian>().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl MutBufExt for Take<Vec<u8>> {
|
||||
type Inner = Vec<u8>;
|
||||
|
||||
fn take(&mut self) -> Vec<u8> {
|
||||
mem::replace(self.get_mut(), vec![])
|
||||
}
|
||||
}
|
||||
|
||||
/// A state machine that reads packets in non-blocking fashion.
|
||||
#[derive(Debug)]
|
||||
pub enum ReadState {
|
||||
/// Tracks how many bytes of the message size have been read.
|
||||
Len(U64Reader),
|
||||
/// Tracks read progress.
|
||||
Data(Take<Vec<u8>>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum NextReadState {
|
||||
Same,
|
||||
Next(ReadState),
|
||||
Reset(Vec<u8>),
|
||||
}
|
||||
|
||||
impl ReadState {
|
||||
pub fn init() -> ReadState {
|
||||
ReadState::Len(U64Reader::new())
|
||||
}
|
||||
|
||||
pub fn next<R: TryRead>(&mut self,
|
||||
socket: &mut R)
|
||||
-> io::Result<Async<Frame<Vec<u8>, Never, io::Error>>> {
|
||||
loop {
|
||||
let next = match *self {
|
||||
ReadState::Len(ref mut len) => {
|
||||
match len.try_read(socket)? {
|
||||
NextReadAction::Continue => NextReadState::Same,
|
||||
NextReadAction::Stop(result) => {
|
||||
match result {
|
||||
Ok(len) => {
|
||||
let buf = Vec::with_capacity(len as usize);
|
||||
NextReadState::Next(ReadState::Data(Take::new(buf,
|
||||
len as usize)))
|
||||
}
|
||||
Err(e) => return Ok(Async::Ready(Frame::Error(e))),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ReadState::Data(ref mut buf) => {
|
||||
match buf.try_read(socket)? {
|
||||
NextReadAction::Continue => NextReadState::Same,
|
||||
NextReadAction::Stop(result) => {
|
||||
match result {
|
||||
Ok(buf) => NextReadState::Reset(buf),
|
||||
Err(e) => return Ok(Async::Ready(Frame::Error(e))),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
match next {
|
||||
NextReadState::Same => return Ok(Async::NotReady),
|
||||
NextReadState::Next(next) => *self = next,
|
||||
NextReadState::Reset(packet) => {
|
||||
*self = ReadState::init();
|
||||
return Ok(Async::Ready(Frame::Message(packet)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,108 +0,0 @@
|
||||
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
use bincode::SizeLimit;
|
||||
use bincode::serde as bincode;
|
||||
use byteorder::{BigEndian, WriteBytesExt};
|
||||
use bytes::Buf;
|
||||
use futures::Async;
|
||||
use serde::Serialize;
|
||||
use std::collections::VecDeque;
|
||||
use std::io::{self, Cursor};
|
||||
use std::mem;
|
||||
use tokio_proto::TryWrite;
|
||||
|
||||
/// The means of communication between client and server.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Packet {
|
||||
/// (payload_len: u64, payload)
|
||||
///
|
||||
/// The payload is typically a serialized message.
|
||||
pub buf: Cursor<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl Packet {
|
||||
/// Creates a new packet, (len, payload)
|
||||
pub fn serialize<S>(message: &S) -> Result<Packet, bincode::SerializeError>
|
||||
where S: Serialize
|
||||
{
|
||||
let payload_len = bincode::serialized_size(message);
|
||||
|
||||
// (len, message)
|
||||
let mut buf = Vec::with_capacity(mem::size_of::<u64>() + payload_len as usize);
|
||||
|
||||
buf.write_u64::<BigEndian>(payload_len).unwrap();
|
||||
bincode::serialize_into(&mut buf, message, SizeLimit::Infinite)?;
|
||||
Ok(Packet { buf: Cursor::new(buf) })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum NextWriteAction {
|
||||
Stop,
|
||||
Continue,
|
||||
}
|
||||
|
||||
trait BufExt: Buf + Sized {
|
||||
/// Writes data to stream. Returns Ok(true) if all data has been written or Ok(false) if
|
||||
/// there's still data to write.
|
||||
fn try_write<W: TryWrite>(&mut self, stream: &mut W) -> io::Result<NextWriteAction> {
|
||||
while let Async::Ready(bytes_written) = stream.try_write_buf(self)? {
|
||||
debug!("Writer: wrote {} bytes; {} remaining.",
|
||||
bytes_written,
|
||||
self.remaining());
|
||||
if bytes_written == 0 {
|
||||
trace!("Writer: would block.");
|
||||
return Ok(NextWriteAction::Continue);
|
||||
}
|
||||
if !self.has_remaining() {
|
||||
return Ok(NextWriteAction::Stop);
|
||||
}
|
||||
}
|
||||
Ok(NextWriteAction::Continue)
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: Buf> BufExt for B {}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum NextWriteState {
|
||||
Nothing,
|
||||
Next(Packet),
|
||||
}
|
||||
|
||||
impl NextWriteState {
|
||||
pub fn next<W: TryWrite>(state: &mut Option<Packet>,
|
||||
socket: &mut W,
|
||||
outbound: &mut VecDeque<Packet>)
|
||||
-> io::Result<Async<()>> {
|
||||
loop {
|
||||
let update = match *state {
|
||||
None => {
|
||||
match outbound.pop_front() {
|
||||
Some(packet) => {
|
||||
let size = packet.buf.remaining() as u64;
|
||||
debug_assert!(size >= mem::size_of::<u64>() as u64);
|
||||
NextWriteState::Next(packet)
|
||||
}
|
||||
None => return Ok(Async::Ready(())),
|
||||
}
|
||||
}
|
||||
Some(ref mut packet) => {
|
||||
match BufExt::try_write(&mut packet.buf, socket)? {
|
||||
NextWriteAction::Stop => NextWriteState::Nothing,
|
||||
NextWriteAction::Continue => return Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
};
|
||||
match update {
|
||||
NextWriteState::Next(next) => *state = Some(next),
|
||||
NextWriteState::Nothing => {
|
||||
*state = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,77 +3,68 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
use errors::{SerializableError, WireError};
|
||||
use REMOTE;
|
||||
use bincode::serde::DeserializeError;
|
||||
use errors::WireError;
|
||||
use framed::Framed;
|
||||
use futures::{self, Async, Future};
|
||||
use futures::stream::Empty;
|
||||
use protocol::{LOOP_HANDLE, TarpcTransport};
|
||||
use protocol::writer::Packet;
|
||||
use serde::Serialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io;
|
||||
use std::net::ToSocketAddrs;
|
||||
use tokio_proto::pipeline;
|
||||
use std::net::SocketAddr;
|
||||
use tokio_core::reactor::Handle;
|
||||
use tokio_proto::easy::multiplex;
|
||||
use tokio_proto::server::{self, ServerHandle};
|
||||
use tokio_service::NewService;
|
||||
use util::Never;
|
||||
|
||||
/// Spawns a service that binds to the given address and runs on the default tokio `Loop`.
|
||||
pub fn listen<A, T>(addr: A, new_service: T) -> ListenFuture
|
||||
where T: NewService<Request = Vec<u8>,
|
||||
Response = pipeline::Message<Packet, Empty<Never, io::Error>>,
|
||||
/// A message from server to client.
|
||||
pub type Response<T, E> = Result<T, WireError<E>>;
|
||||
|
||||
/// Spawns a service that binds to the given address and runs on the default reactor core.
|
||||
pub fn listen<S, Req, Resp, E>(addr: SocketAddr, new_service: S) -> ListenFuture
|
||||
where S: NewService<Request = Result<Req, DeserializeError>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + Send + 'static,
|
||||
A: ToSocketAddrs
|
||||
Req: Deserialize + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
// TODO(tikue): don't use ToSocketAddrs, or don't unwrap.
|
||||
let addr = addr.to_socket_addrs().unwrap().next().unwrap();
|
||||
|
||||
let (tx, rx) = futures::oneshot();
|
||||
LOOP_HANDLE.spawn(move |handle| {
|
||||
Ok(tx.complete(server::listen(handle, addr, move |stream| {
|
||||
pipeline::Server::new(new_service.new_service()?, TarpcTransport::new(stream))
|
||||
}).unwrap()))
|
||||
});
|
||||
REMOTE.spawn(move |handle| Ok(tx.complete(listen_with(addr, new_service, handle))));
|
||||
ListenFuture { inner: rx }
|
||||
}
|
||||
|
||||
/// Spawns a service that binds to the given address using the given handle.
|
||||
pub fn listen_with<S, Req, Resp, E>(addr: SocketAddr,
|
||||
new_service: S,
|
||||
handle: &Handle)
|
||||
-> io::Result<ServerHandle>
|
||||
where S: NewService<Request = Result<Req, DeserializeError>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + Send + 'static,
|
||||
Req: Deserialize + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
E: Serialize + 'static
|
||||
{
|
||||
server::listen(handle, addr, move |stream| {
|
||||
Ok(multiplex::EasyServer::new(new_service.new_service()?, Framed::new(stream))
|
||||
.map_err(|()| panic!("What do we do here")))
|
||||
})
|
||||
}
|
||||
|
||||
/// A future that resolves to a `ServerHandle`.
|
||||
pub struct ListenFuture {
|
||||
inner: futures::Oneshot<ServerHandle>,
|
||||
inner: futures::Oneshot<io::Result<ServerHandle>>,
|
||||
}
|
||||
|
||||
impl Future for ListenFuture {
|
||||
type Item = ServerHandle;
|
||||
type Error = Never;
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
|
||||
// Can't panic the oneshot is always completed.
|
||||
match self.inner.poll().unwrap() {
|
||||
Async::Ready(server_handle) => Ok(Async::Ready(server_handle)),
|
||||
Async::Ready(result) => result.map(Async::Ready),
|
||||
Async::NotReady => Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a future containing the serialized reply.
|
||||
///
|
||||
/// Because serialization can take a non-trivial
|
||||
/// amount of cpu time, it is run on a thread pool.
|
||||
#[doc(hidden)]
|
||||
#[inline]
|
||||
pub fn serialize_reply<T: Serialize + Send + 'static,
|
||||
E: SerializableError>(result: Result<T, WireError<E>>)
|
||||
-> SerializeFuture
|
||||
{
|
||||
let packet = match Packet::serialize(&result) {
|
||||
Ok(packet) => packet,
|
||||
Err(e) => {
|
||||
let err: Result<T, WireError<E>> = Err(WireError::ServerSerialize(e.to_string()));
|
||||
Packet::serialize(&err).unwrap()
|
||||
}
|
||||
};
|
||||
futures::finished(pipeline::Message::WithoutBody(packet))
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub type SerializeFuture = futures::Finished<SerializedReply, io::Error>;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub type SerializedReply = pipeline::Message<Packet, Empty<Never, io::Error>>;
|
||||
|
||||
85
src/util.rs
85
src/util.rs
@@ -3,9 +3,14 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
use std::fmt;
|
||||
use std::error::Error;
|
||||
use futures::{self, Future, Poll};
|
||||
use futures::stream::Stream;
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use std::error::Error;
|
||||
use std::{fmt, io, thread};
|
||||
use std::net::{SocketAddr, ToSocketAddrs};
|
||||
use std::sync::mpsc;
|
||||
use tokio_core::reactor;
|
||||
|
||||
/// A bottom type that impls `Error`, `Serialize`, and `Deserialize`. It is impossible to
|
||||
/// instantiate this type.
|
||||
@@ -14,13 +19,43 @@ pub struct Never(!);
|
||||
|
||||
impl Error for Never {
|
||||
fn description(&self) -> &str {
|
||||
unreachable!()
|
||||
match self.0 {
|
||||
// TODO(tikue): remove when https://github.com/rust-lang/rust/issues/12609 lands
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Never {
|
||||
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
|
||||
unreachable!()
|
||||
match self.0 {
|
||||
// TODO(tikue): remove when https://github.com/rust-lang/rust/issues/12609 lands
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Never {
|
||||
type Item = Never;
|
||||
type Error = Never;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self.0 {
|
||||
// TODO(tikue): remove when https://github.com/rust-lang/rust/issues/12609 lands
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for Never {
|
||||
type Item = Never;
|
||||
type Error = Never;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
match self.0 {
|
||||
// TODO(tikue): remove when https://github.com/rust-lang/rust/issues/12609 lands
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,13 +63,16 @@ impl Serialize for Never {
|
||||
fn serialize<S>(&self, _: &mut S) -> Result<(), S::Error>
|
||||
where S: Serializer
|
||||
{
|
||||
unreachable!()
|
||||
match self.0 {
|
||||
// TODO(tikue): remove when https://github.com/rust-lang/rust/issues/12609 lands
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Please don't try to deserialize this. :(
|
||||
impl Deserialize for Never {
|
||||
fn deserialize<D>(_: &mut D) -> Result<Self, D::Error>
|
||||
fn deserialize<D>(_: &mut D) -> Result<Self, D::Error>
|
||||
where D: Deserializer
|
||||
{
|
||||
panic!("Never cannot be instantiated!");
|
||||
@@ -62,3 +100,38 @@ impl<S: Into<String>> From<S> for Message {
|
||||
Message(s.into())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Provides a utility method for more ergonomically parsing a `SocketAddr` when only one is
|
||||
/// needed.
|
||||
pub trait FirstSocketAddr: ToSocketAddrs {
|
||||
/// Returns the first resolved `SocketAddr`, if one exists.
|
||||
fn try_first_socket_addr(&self) -> io::Result<SocketAddr> {
|
||||
if let Some(a) = self.to_socket_addrs()?.next() {
|
||||
Ok(a)
|
||||
} else {
|
||||
Err(io::Error::new(io::ErrorKind::AddrNotAvailable,
|
||||
"`ToSocketAddrs::to_socket_addrs` returned an empty iterator."))
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the first resolved `SocketAddr` or panics otherwise.
|
||||
fn first_socket_addr(&self) -> SocketAddr {
|
||||
self.try_first_socket_addr().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: ToSocketAddrs> FirstSocketAddr for A {}
|
||||
|
||||
/// Spawns a `reactor::Core` running forever on a new thread.
|
||||
pub fn spawn_core() -> reactor::Remote {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut core = reactor::Core::new().unwrap();
|
||||
tx.send(core.handle().remote().clone()).unwrap();
|
||||
|
||||
// Run forever
|
||||
core.run(futures::empty::<(), !>()).unwrap();
|
||||
});
|
||||
rx.recv().unwrap()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user