mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Merge pull request #63 from tikue/tokio-tracking
Track more changes to tokio
This commit is contained in:
@@ -28,8 +28,8 @@ tokio-core = { git = "https://github.com/tokio-rs/tokio-core" }
|
||||
net2 = "0.2"
|
||||
|
||||
[replace]
|
||||
"tokio-core:0.1.1" = { git = "https://github.com/tokio-rs/tokio-core" }
|
||||
"futures:0.1.6" = { git = "https://github.com/alexcrichton/futures-rs" }
|
||||
"tokio-core:0.1.3" = { git = "https://github.com/tokio-rs/tokio-core" }
|
||||
"futures:0.1.7" = { git = "https://github.com/alexcrichton/futures-rs" }
|
||||
|
||||
[dev-dependencies]
|
||||
chrono = "0.2"
|
||||
|
||||
24
README.md
24
README.md
@@ -62,7 +62,7 @@ service! {
|
||||
struct HelloServer;
|
||||
|
||||
impl SyncService for HelloServer {
|
||||
fn hello(&self, name: String) -> Result<String, Never> {
|
||||
fn hello(&mut self, name: String) -> Result<String, Never> {
|
||||
Ok(format!("Hello, {}!", name))
|
||||
}
|
||||
}
|
||||
@@ -87,7 +87,7 @@ races! See the tarpc_examples package for more examples.
|
||||
|
||||
## Example: Futures
|
||||
|
||||
Here's the same server, implemented using `FutureService`.
|
||||
Here's the same service, implemented using futures.
|
||||
|
||||
```rust
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
@@ -96,9 +96,12 @@ Here's the same server, implemented using `FutureService`.
|
||||
extern crate futures;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
extern crate tokio_core;
|
||||
|
||||
use futures::Future;
|
||||
use tarpc::future::Connect;
|
||||
use tarpc::util::{FirstSocketAddr, Never};
|
||||
use tarpc::sync::Connect;
|
||||
use tokio_core::reactor;
|
||||
|
||||
service! {
|
||||
rpc hello(name: String) -> String;
|
||||
@@ -110,16 +113,21 @@ struct HelloServer;
|
||||
impl FutureService for HelloServer {
|
||||
type HelloFut = futures::Finished<String, Never>;
|
||||
|
||||
fn hello(&self, name: String) -> Self::HelloFut {
|
||||
fn hello(&mut self, name: String) -> Self::HelloFut {
|
||||
futures::finished(format!("Hello, {}!", name))
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let addr = "localhost:10000";
|
||||
let _server = HelloServer.listen(addr.first_socket_addr());
|
||||
let client = SyncClient::connect(addr).unwrap();
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
let addr = "localhost:10000".first_socket_addr();
|
||||
let mut core = reactor::Core::new().unwrap();
|
||||
HelloServer.listen_with(addr, core.handle()).unwrap();
|
||||
core.run(
|
||||
FutureClient::connect(&addr)
|
||||
.map_err(tarpc::Error::from)
|
||||
.and_then(|mut client| client.hello("Mom".to_string()))
|
||||
.map(|resp| println!("{}", resp))
|
||||
).unwrap();
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ struct Server;
|
||||
|
||||
impl FutureService for Server {
|
||||
type AckFut = futures::Finished<(), Never>;
|
||||
fn ack(&self) -> Self::AckFut {
|
||||
fn ack(&mut self) -> Self::AckFut {
|
||||
futures::finished(())
|
||||
}
|
||||
}
|
||||
@@ -37,8 +37,8 @@ impl FutureService for Server {
|
||||
#[bench]
|
||||
fn latency(bencher: &mut Bencher) {
|
||||
let _ = env_logger::init();
|
||||
let server = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let client = SyncClient::connect(server.local_addr()).unwrap();
|
||||
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let mut client = SyncClient::connect(addr).unwrap();
|
||||
|
||||
bencher.iter(|| {
|
||||
client.ack().unwrap();
|
||||
|
||||
@@ -18,11 +18,12 @@ extern crate tokio_core;
|
||||
extern crate futures_cpupool;
|
||||
|
||||
use clap::{Arg, App};
|
||||
use futures::Future;
|
||||
use futures::{Future, Stream};
|
||||
use futures_cpupool::{CpuFuture, CpuPool};
|
||||
use std::cmp;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::time::{Duration, Instant};
|
||||
use tarpc::future::{Connect};
|
||||
use tarpc::util::{FirstSocketAddr, Never, spawn_core};
|
||||
use tokio_core::reactor;
|
||||
@@ -49,7 +50,7 @@ impl Server {
|
||||
impl FutureService for Server {
|
||||
type ReadFut = CpuFuture<Vec<u8>, Never>;
|
||||
|
||||
fn read(&self, size: u32) -> Self::ReadFut {
|
||||
fn read(&mut self, size: u32) -> Self::ReadFut {
|
||||
let request_number = self.request_count.fetch_add(1, Ordering::SeqCst);
|
||||
debug!("Server received read({}) no. {}", size, request_number);
|
||||
self.pool
|
||||
@@ -79,44 +80,47 @@ impl Microseconds for Duration {
|
||||
}
|
||||
}
|
||||
|
||||
fn run_once(clients: Vec<FutureClient>, concurrency: u32) -> impl Future<Item=(), Error=()> {
|
||||
#[derive(Default)]
|
||||
struct Stats {
|
||||
sum: Duration,
|
||||
count: u64,
|
||||
min: Option<Duration>,
|
||||
max: Option<Duration>,
|
||||
}
|
||||
|
||||
fn run_once(mut clients: Vec<FutureClient>, concurrency: u32) -> impl Future<Item=(), Error=()> + 'static {
|
||||
let start = Instant::now();
|
||||
let futs = clients.iter()
|
||||
.enumerate()
|
||||
.cycle()
|
||||
.enumerate()
|
||||
.take(concurrency as usize)
|
||||
.map(|(iteration, (client_id, client))| {
|
||||
let iteration = iteration + 1;
|
||||
let start = SystemTime::now();
|
||||
debug!("Client {} reading (iteration {})...", client_id, iteration);
|
||||
let future = client.read(CHUNK_SIZE).map(move |_| {
|
||||
let elapsed = start.elapsed().unwrap();
|
||||
debug!("Client {} received reply (iteration {}).", client_id, iteration);
|
||||
elapsed
|
||||
});
|
||||
future
|
||||
let num_clients = clients.len();
|
||||
futures::stream::futures_unordered((0..concurrency as usize)
|
||||
.map(|iteration| (iteration + 1, iteration % num_clients))
|
||||
.map(|(iteration, client_idx)| {
|
||||
let mut client = &mut clients[client_idx];
|
||||
let start = Instant::now();
|
||||
debug!("Client {} reading (iteration {})...", client_idx, iteration);
|
||||
client.read(CHUNK_SIZE)
|
||||
.map(move |_| (client_idx, iteration, start))
|
||||
}))
|
||||
.map(|(client_idx, iteration, start)| {
|
||||
let elapsed = start.elapsed();
|
||||
debug!("Client {} received reply (iteration {}).", client_idx, iteration);
|
||||
elapsed
|
||||
})
|
||||
.map_err(|e| panic!(e))
|
||||
.fold(Stats::default(), move |mut stats, elapsed| {
|
||||
stats.sum += elapsed;
|
||||
stats.count += 1;
|
||||
stats.min = Some(cmp::min(stats.min.unwrap_or(elapsed), elapsed));
|
||||
stats.max = Some(cmp::max(stats.max.unwrap_or(elapsed), elapsed));
|
||||
Ok(stats)
|
||||
})
|
||||
.map(move |stats| {
|
||||
info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
|
||||
stats.count,
|
||||
stats.sum.microseconds() as f64 / stats.count as f64,
|
||||
stats.min.unwrap().microseconds(),
|
||||
stats.max.unwrap().microseconds(),
|
||||
start.elapsed().microseconds());
|
||||
})
|
||||
// Need an intermediate collection to kick off each future,
|
||||
// because futures::collect will iterate sequentially.
|
||||
.collect::<Vec<_>>();
|
||||
let futs = futures::collect(futs);
|
||||
|
||||
futs.map(move |latencies| {
|
||||
let total_time = start.elapsed();
|
||||
|
||||
let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur);
|
||||
let mean = sum_latencies / latencies.len() as u32;
|
||||
let min_latency = *latencies.iter().min().unwrap();
|
||||
let max_latency = *latencies.iter().max().unwrap();
|
||||
|
||||
info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
|
||||
latencies.len(),
|
||||
mean.microseconds(),
|
||||
min_latency.microseconds(),
|
||||
max_latency.microseconds(),
|
||||
total_time.microseconds());
|
||||
}).map_err(|e| panic!(e))
|
||||
}
|
||||
|
||||
fn main() {
|
||||
|
||||
@@ -49,7 +49,7 @@ struct Subscriber {
|
||||
impl subscriber::FutureService for Subscriber {
|
||||
type ReceiveFut = futures::Finished<(), Never>;
|
||||
|
||||
fn receive(&self, message: String) -> Self::ReceiveFut {
|
||||
fn receive(&mut self, message: String) -> Self::ReceiveFut {
|
||||
println!("{} received message: {}", self.id, message);
|
||||
futures::finished(())
|
||||
}
|
||||
@@ -80,11 +80,11 @@ impl Publisher {
|
||||
impl publisher::FutureService for Publisher {
|
||||
type BroadcastFut = BoxFuture<(), Never>;
|
||||
|
||||
fn broadcast(&self, message: String) -> Self::BroadcastFut {
|
||||
fn broadcast(&mut self, message: String) -> Self::BroadcastFut {
|
||||
futures::collect(self.clients
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.values_mut()
|
||||
// Ignore failing subscribers.
|
||||
.map(move |client| client.receive(message.clone()).then(|_| Ok(())))
|
||||
.collect::<Vec<_>>())
|
||||
@@ -94,7 +94,7 @@ impl publisher::FutureService for Publisher {
|
||||
|
||||
type SubscribeFut = BoxFuture<(), Message>;
|
||||
|
||||
fn subscribe(&self, id: u32, address: SocketAddr) -> Self::SubscribeFut {
|
||||
fn subscribe(&mut self, id: u32, address: SocketAddr) -> Self::SubscribeFut {
|
||||
let clients = self.clients.clone();
|
||||
subscriber::FutureClient::connect(&address)
|
||||
.map(move |subscriber| {
|
||||
@@ -108,7 +108,7 @@ impl publisher::FutureService for Publisher {
|
||||
|
||||
type UnsubscribeFut = BoxFuture<(), Never>;
|
||||
|
||||
fn unsubscribe(&self, id: u32) -> Self::UnsubscribeFut {
|
||||
fn unsubscribe(&mut self, id: u32) -> Self::UnsubscribeFut {
|
||||
println!("Unsubscribing {}", id);
|
||||
self.clients.lock().unwrap().remove(&id).unwrap();
|
||||
futures::finished(()).boxed()
|
||||
@@ -122,7 +122,7 @@ fn main() {
|
||||
.wait()
|
||||
.unwrap();
|
||||
|
||||
let publisher_client = publisher::SyncClient::connect(publisher_addr).unwrap();
|
||||
let mut publisher_client = publisher::SyncClient::connect(publisher_addr).unwrap();
|
||||
|
||||
let subscriber1 = Subscriber::new(0);
|
||||
publisher_client.subscribe(0, subscriber1).unwrap();
|
||||
|
||||
@@ -1,55 +0,0 @@
|
||||
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate env_logger;
|
||||
extern crate futures;
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
extern crate tokio_core;
|
||||
|
||||
use futures::Future;
|
||||
use tarpc::util::Never;
|
||||
use tarpc::future::Connect;
|
||||
|
||||
service! {
|
||||
rpc hello(name: String) -> String;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct HelloServer;
|
||||
|
||||
impl SyncService for HelloServer {
|
||||
fn hello(&self, name: String) -> Result<String, Never> {
|
||||
info!("Got request: {}", name);
|
||||
Ok(format!("Hello, {}!", name))
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let _ = env_logger::init();
|
||||
let mut core = tokio_core::reactor::Core::new().unwrap();
|
||||
let addr = HelloServer.listen("localhost:10000").unwrap();
|
||||
let f = FutureClient::connect(&addr)
|
||||
.map_err(tarpc::Error::from)
|
||||
.and_then(|client| {
|
||||
let resp1 = client.hello("Mom".to_string());
|
||||
info!("Sent first request.");
|
||||
/*
|
||||
let resp2 = client.hello("Dad".to_string());
|
||||
info!("Sent second request.");
|
||||
*/
|
||||
futures::collect(vec![resp1, /*resp2*/])
|
||||
}).map(|responses| {
|
||||
for resp in responses {
|
||||
println!("{}", resp);
|
||||
}
|
||||
});
|
||||
core.run(f).unwrap();
|
||||
}
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(conservative_impl_trait, plugin, proc_macro)]
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
@@ -39,7 +39,7 @@ impl Error for NoNameGiven {
|
||||
struct HelloServer;
|
||||
|
||||
impl SyncService for HelloServer {
|
||||
fn hello(&self, name: String) -> Result<String, NoNameGiven> {
|
||||
fn hello(&mut self, name: String) -> Result<String, NoNameGiven> {
|
||||
if name == "" {
|
||||
Err(NoNameGiven)
|
||||
} else {
|
||||
@@ -3,7 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(conservative_impl_trait, plugin, proc_macro)]
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate bincode;
|
||||
@@ -33,7 +33,7 @@ struct HelloServer;
|
||||
impl HelloServer {
|
||||
fn listen(addr: SocketAddr) -> impl Future<Item=SocketAddr, Error=io::Error> {
|
||||
let (tx, rx) = futures::oneshot();
|
||||
tarpc::REMOTE.spawn(move |handle| {
|
||||
tarpc::future::REMOTE.spawn(move |handle| {
|
||||
Ok(tx.complete(tarpc::listen_with(addr, move || Ok(HelloServer), handle.clone())))
|
||||
});
|
||||
rx.map_err(|e| panic!(e)).and_then(|result| result)
|
||||
@@ -46,7 +46,7 @@ impl Service for HelloServer {
|
||||
type Error = io::Error;
|
||||
type Future = Box<Future<Item = tarpc::Response<String, Never>, Error = io::Error>>;
|
||||
|
||||
fn call(&self, request: Self::Request) -> Self::Future {
|
||||
fn call(&mut self, request: Self::Request) -> Self::Future {
|
||||
Ok(Ok(format!("Hello, {}!", request.unwrap()))).into_future().boxed()
|
||||
}
|
||||
}
|
||||
@@ -57,10 +57,10 @@ pub struct FutureClient(tarpc::Client<String, String, Never>);
|
||||
|
||||
impl FutureClient {
|
||||
fn connect(addr: &SocketAddr) -> impl Future<Item = FutureClient, Error = io::Error> {
|
||||
tarpc::Client::connect_remotely(addr, &tarpc::REMOTE).map(FutureClient)
|
||||
tarpc::Client::connect_remotely(addr, &tarpc::future::REMOTE).map(FutureClient)
|
||||
}
|
||||
|
||||
pub fn hello(&self, name: String)
|
||||
pub fn hello(&mut self, name: String)
|
||||
-> impl Future<Item = String, Error = tarpc::Error<Never>> + 'static
|
||||
{
|
||||
self.0.call(name).then(|msg| msg.unwrap())
|
||||
@@ -73,7 +73,7 @@ fn main() {
|
||||
let addr = HelloServer::listen("localhost:10000".first_socket_addr()).wait().unwrap();
|
||||
let f = FutureClient::connect(&addr)
|
||||
.map_err(tarpc::Error::from)
|
||||
.and_then(|client| {
|
||||
.and_then(|mut client| {
|
||||
let resp1 = client.hello("Mom".to_string());
|
||||
info!("Sent first request.");
|
||||
|
||||
|
||||
44
examples/readme_futures.rs
Normal file
44
examples/readme_futures.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate futures;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
extern crate tokio_core;
|
||||
|
||||
use futures::Future;
|
||||
use tarpc::future::Connect;
|
||||
use tarpc::util::{FirstSocketAddr, Never};
|
||||
use tokio_core::reactor;
|
||||
|
||||
service! {
|
||||
rpc hello(name: String) -> String;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct HelloServer;
|
||||
|
||||
impl FutureService for HelloServer {
|
||||
type HelloFut = futures::Finished<String, Never>;
|
||||
|
||||
fn hello(&mut self, name: String) -> Self::HelloFut {
|
||||
futures::finished(format!("Hello, {}!", name))
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let addr = "localhost:10000".first_socket_addr();
|
||||
let mut core = reactor::Core::new().unwrap();
|
||||
HelloServer.listen_with(addr, core.handle()).unwrap();
|
||||
core.run(
|
||||
FutureClient::connect(&addr)
|
||||
.map_err(tarpc::Error::from)
|
||||
.and_then(|mut client| client.hello("Mom".to_string()))
|
||||
.map(|resp| println!("{}", resp))
|
||||
).unwrap();
|
||||
}
|
||||
@@ -3,6 +3,7 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
// required by `FutureClient` (not used directly in this example)
|
||||
#![feature(conservative_impl_trait, plugin)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
@@ -10,8 +11,7 @@ extern crate futures;
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
|
||||
use futures::Future;
|
||||
use tarpc::util::{FirstSocketAddr, Never};
|
||||
use tarpc::util::Never;
|
||||
use tarpc::sync::Connect;
|
||||
|
||||
service! {
|
||||
@@ -21,16 +21,15 @@ service! {
|
||||
#[derive(Clone)]
|
||||
struct HelloServer;
|
||||
|
||||
impl FutureService for HelloServer {
|
||||
type HelloFut = futures::Finished<String, Never>;
|
||||
|
||||
fn hello(&self, name: String) -> Self::HelloFut {
|
||||
futures::finished(format!("Hello, {}!", name))
|
||||
impl SyncService for HelloServer {
|
||||
fn hello(&mut self, name: String) -> Result<String, Never> {
|
||||
Ok(format!("Hello, {}!", name))
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let addr = HelloServer.listen("localhost:10000".first_socket_addr()).wait().unwrap();
|
||||
let client = SyncClient::connect(addr).unwrap();
|
||||
let addr = "localhost:10000";
|
||||
HelloServer.listen(addr).unwrap();
|
||||
let mut client = SyncClient::connect(addr).unwrap();
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
}
|
||||
@@ -41,7 +41,7 @@ struct AddServer;
|
||||
impl AddFutureService for AddServer {
|
||||
type AddFut = futures::Finished<i32, Never>;
|
||||
|
||||
fn add(&self, x: i32, y: i32) -> Self::AddFut {
|
||||
fn add(&mut self, x: i32, y: i32) -> Self::AddFut {
|
||||
futures::finished(x + y)
|
||||
}
|
||||
}
|
||||
@@ -54,7 +54,7 @@ struct DoubleServer {
|
||||
impl DoubleServer {
|
||||
fn new(client: add::FutureClient) -> Self {
|
||||
DoubleServer {
|
||||
client: Arc::new(Mutex::new(client))
|
||||
client: Arc::new(Mutex::new(client)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -62,7 +62,7 @@ impl DoubleServer {
|
||||
impl DoubleFutureService for DoubleServer {
|
||||
type DoubleFut = BoxFuture<i32, Message>;
|
||||
|
||||
fn double(&self, x: i32) -> Self::DoubleFut {
|
||||
fn double(&mut self, x: i32) -> Self::DoubleFut {
|
||||
self.client
|
||||
.lock()
|
||||
.unwrap()
|
||||
|
||||
@@ -44,7 +44,7 @@ struct Server;
|
||||
impl FutureService for Server {
|
||||
type ReadFut = futures::Finished<Arc<Vec<u8>>, Never>;
|
||||
|
||||
fn read(&self) -> Self::ReadFut {
|
||||
fn read(&mut self) -> Self::ReadFut {
|
||||
futures::finished(BUF.clone())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ struct Bar;
|
||||
impl bar::FutureService for Bar {
|
||||
type BarFut = futures::Finished<i32, Never>;
|
||||
|
||||
fn bar(&self, i: i32) -> Self::BarFut {
|
||||
fn bar(&mut self, i: i32) -> Self::BarFut {
|
||||
futures::finished(i)
|
||||
}
|
||||
}
|
||||
@@ -47,7 +47,7 @@ struct Baz;
|
||||
impl baz::FutureService for Baz {
|
||||
type BazFut = futures::Finished<String, Never>;
|
||||
|
||||
fn baz(&self, s: String) -> Self::BazFut {
|
||||
fn baz(&mut self, s: String) -> Self::BazFut {
|
||||
futures::finished(format!("Hello, {}!", s))
|
||||
}
|
||||
}
|
||||
@@ -61,8 +61,8 @@ fn main() {
|
||||
let bar_addr = Bar.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let baz_addr = Baz.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
|
||||
let bar_client = bar::SyncClient::connect(&bar_addr).unwrap();
|
||||
let baz_client = baz::SyncClient::connect(&baz_addr).unwrap();
|
||||
let mut bar_client = bar::SyncClient::connect(&bar_addr).unwrap();
|
||||
let mut baz_client = baz::SyncClient::connect(&baz_addr).unwrap();
|
||||
|
||||
info!("Result: {:?}", bar_client.bar(17));
|
||||
|
||||
|
||||
@@ -3,9 +3,10 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
use {WireError, framed};
|
||||
use WireError;
|
||||
use bincode::serde::DeserializeError;
|
||||
use futures::{self, Future};
|
||||
use protocol::Proto;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
@@ -18,7 +19,7 @@ type WireResponse<Resp, E> = Result<Result<Resp, WireError<E>>, DeserializeError
|
||||
type ResponseFuture<Req, Resp, E> = futures::Map<<BindClient<Req, Resp, E> as Service>::Future,
|
||||
fn(WireResponse<Resp, E>) -> Result<Resp, ::Error<E>>>;
|
||||
type BindClient<Req, Resp, E> =
|
||||
<framed::Proto<Req, Result<Resp, WireError<E>>> as ProtoBindClient<Multiplex, TcpStream>>::BindClient;
|
||||
<Proto<Req, Result<Resp, WireError<E>>> as ProtoBindClient<Multiplex, TcpStream>>::BindClient;
|
||||
|
||||
/// A client that impls `tokio_service::Service` that writes and reads bytes.
|
||||
///
|
||||
@@ -32,6 +33,18 @@ pub struct Client<Req, Resp, E>
|
||||
inner: BindClient<Req, Resp, E>,
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> Clone for Client<Req, Resp, E>
|
||||
where Req: Serialize + 'static,
|
||||
Resp: Deserialize + 'static,
|
||||
E: Deserialize + 'static,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Client {
|
||||
inner: self.inner.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> Service for Client<Req, Resp, E>
|
||||
where Req: Serialize + Sync + Send + 'static,
|
||||
Resp: Deserialize + Sync + Send + 'static,
|
||||
@@ -42,7 +55,7 @@ impl<Req, Resp, E> Service for Client<Req, Resp, E>
|
||||
type Error = io::Error;
|
||||
type Future = ResponseFuture<Req, Resp, E>;
|
||||
|
||||
fn call(&self, request: Self::Request) -> Self::Future {
|
||||
fn call(&mut self, request: Self::Request) -> Self::Future {
|
||||
self.inner.call(request).map(Self::map_err)
|
||||
}
|
||||
}
|
||||
@@ -81,8 +94,9 @@ impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
|
||||
|
||||
/// Exposes a trait for connecting asynchronously to servers.
|
||||
pub mod future {
|
||||
use {REMOTE, framed};
|
||||
use future::REMOTE;
|
||||
use futures::{self, Async, Future};
|
||||
use protocol::Proto;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io;
|
||||
use std::marker::PhantomData;
|
||||
@@ -175,7 +189,7 @@ pub mod future {
|
||||
type Output = Client<Req, Resp, E>;
|
||||
|
||||
extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Client<Req, Resp, E> {
|
||||
Client::new(framed::Proto::new().bind_client(self.0, tcp))
|
||||
Client::new(Proto::new().bind_client(self.0, tcp))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -193,7 +207,7 @@ pub mod future {
|
||||
remote.spawn(move |handle| {
|
||||
let handle2 = handle.clone();
|
||||
TcpStream::connect(&addr, handle)
|
||||
.map(move |tcp| Client::new(framed::Proto::new().bind_client(&handle2, tcp)))
|
||||
.map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp)))
|
||||
.then(move |result| {
|
||||
tx.complete(result);
|
||||
Ok(())
|
||||
|
||||
38
src/lib.rs
38
src/lib.rs
@@ -45,7 +45,7 @@
|
||||
//! struct HelloServer;
|
||||
//!
|
||||
//! impl SyncService for HelloServer {
|
||||
//! fn hello(&self, name: String) -> Result<String, Never> {
|
||||
//! fn hello(&mut self, name: String) -> Result<String, Never> {
|
||||
//! Ok(format!("Hello, {}!", name))
|
||||
//! }
|
||||
//! }
|
||||
@@ -53,13 +53,13 @@
|
||||
//! fn main() {
|
||||
//! let addr = "localhost:10000";
|
||||
//! let _server = HelloServer.listen(addr);
|
||||
//! let client = SyncClient::connect(addr).unwrap();
|
||||
//! let mut client = SyncClient::connect(addr).unwrap();
|
||||
//! println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
#![deny(missing_docs)]
|
||||
#![feature(plugin, conservative_impl_trait, never_type, proc_macro, unboxed_closures, fn_traits, specialization)]
|
||||
#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, fn_traits, specialization)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate byteorder;
|
||||
@@ -86,8 +86,6 @@ pub extern crate tokio_proto;
|
||||
#[doc(hidden)]
|
||||
pub extern crate tokio_service;
|
||||
|
||||
pub use client::{sync, future};
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use client::Client;
|
||||
#[doc(hidden)]
|
||||
@@ -109,17 +107,27 @@ mod macros;
|
||||
mod client;
|
||||
/// Provides the base server boilerplate used by service implementations.
|
||||
mod server;
|
||||
/// Provides an implementation of `FramedIo` that implements the tarpc protocol.
|
||||
/// The tarpc protocol is defined by the `FramedIo` implementation.
|
||||
mod framed;
|
||||
/// Provides implementations of `ClientProto` and `ServerProto` that implement the tarpc protocol.
|
||||
/// The tarpc protocol is a length-delimited, bincode-serialized payload.
|
||||
mod protocol;
|
||||
/// Provides a few different error types.
|
||||
mod errors;
|
||||
|
||||
use tokio_core::reactor::Remote;
|
||||
|
||||
lazy_static! {
|
||||
/// The `Remote` for the default reactor core.
|
||||
pub static ref REMOTE: Remote = {
|
||||
util::spawn_core()
|
||||
};
|
||||
/// Utility specific to synchronous implementation.
|
||||
pub mod sync {
|
||||
pub use client::sync::*;
|
||||
}
|
||||
|
||||
/// Utility specific to futures implementation.
|
||||
pub mod future {
|
||||
pub use client::future::*;
|
||||
use tokio_core::reactor::Remote;
|
||||
use util;
|
||||
|
||||
lazy_static! {
|
||||
/// The `Remote` for the default reactor core.
|
||||
pub static ref REMOTE: Remote = {
|
||||
util::spawn_core()
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -386,7 +386,7 @@ macro_rules! service {
|
||||
}
|
||||
|
||||
$(#[$attr])*
|
||||
fn $fn_name(&self, $($arg:$in_),*) -> ty_snake_to_camel!(Self::$fn_name);
|
||||
fn $fn_name(&mut self, $($arg:$in_),*) -> ty_snake_to_camel!(Self::$fn_name);
|
||||
)*
|
||||
}
|
||||
|
||||
@@ -396,7 +396,7 @@ macro_rules! service {
|
||||
fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture
|
||||
{
|
||||
let (tx, rx) = $crate::futures::oneshot();
|
||||
$crate::REMOTE.spawn(move |handle|
|
||||
$crate::future::REMOTE.spawn(move |handle|
|
||||
Ok(tx.complete(Self::listen_with(self,
|
||||
addr,
|
||||
handle.clone()))));
|
||||
@@ -424,7 +424,6 @@ macro_rules! service {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
type __tarpc_service_Future =
|
||||
$crate::futures::Finished<$crate::Response<__tarpc_service_Response,
|
||||
@@ -477,7 +476,7 @@ macro_rules! service {
|
||||
type Error = ::std::io::Error;
|
||||
type Future = __tarpc_service_FutureReply<__tarpc_service_S>;
|
||||
|
||||
fn call(&self, __tarpc_service_request: Self::Request) -> Self::Future {
|
||||
fn call(&mut self, __tarpc_service_request: Self::Request) -> Self::Future {
|
||||
let __tarpc_service_request = match __tarpc_service_request {
|
||||
Ok(__tarpc_service_request) => __tarpc_service_request,
|
||||
Err(__tarpc_service_deserialize_err) => {
|
||||
@@ -510,7 +509,7 @@ macro_rules! service {
|
||||
}
|
||||
return __tarpc_service_FutureReply::$fn_name(
|
||||
$crate::futures::Future::then(
|
||||
FutureService::$fn_name(&self.0, $($arg),*),
|
||||
FutureService::$fn_name(&mut self.0, $($arg),*),
|
||||
__tarpc_service_wrap));
|
||||
}
|
||||
)*
|
||||
@@ -530,7 +529,7 @@ macro_rules! service {
|
||||
{
|
||||
$(
|
||||
$(#[$attr])*
|
||||
fn $fn_name(&self, $($arg:$in_),*) -> ::std::result::Result<$out, $error>;
|
||||
fn $fn_name(&mut self, $($arg:$in_),*) -> ::std::result::Result<$out, $error>;
|
||||
)*
|
||||
}
|
||||
|
||||
@@ -543,7 +542,7 @@ macro_rules! service {
|
||||
{
|
||||
let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
|
||||
let (tx, rx) = $crate::futures::oneshot();
|
||||
$crate::REMOTE.spawn(move |handle| Ok(tx.complete(Self::listen_with(self, addr, handle.clone()))));
|
||||
$crate::future::REMOTE.spawn(move |handle| Ok(tx.complete(Self::listen_with(self, addr, handle.clone()))));
|
||||
$crate::futures::Future::wait($crate::ListenFuture::from_oneshot(rx))
|
||||
}
|
||||
|
||||
@@ -579,19 +578,19 @@ macro_rules! service {
|
||||
$crate::futures::Done<$out, $error>>,
|
||||
fn($crate::futures::Canceled) -> $error>>;
|
||||
}
|
||||
fn $fn_name(&self, $($arg:$in_),*) -> ty_snake_to_camel!(Self::$fn_name) {
|
||||
fn $fn_name(&mut self, $($arg:$in_),*) -> ty_snake_to_camel!(Self::$fn_name) {
|
||||
fn unimplemented(_: $crate::futures::Canceled) -> $error {
|
||||
// TODO(tikue): what do do if SyncService panics?
|
||||
unimplemented!()
|
||||
}
|
||||
let (__tarpc_service_complete, __tarpc_service_promise) =
|
||||
$crate::futures::oneshot();
|
||||
let __tarpc_service_service = self.clone();
|
||||
let mut __tarpc_service_service = self.clone();
|
||||
const UNIMPLEMENTED: fn($crate::futures::Canceled) -> $error =
|
||||
unimplemented;
|
||||
::std::thread::spawn(move || {
|
||||
let __tarpc_service_reply = SyncService::$fn_name(
|
||||
&__tarpc_service_service.service, $($arg),*);
|
||||
&mut __tarpc_service_service.service, $($arg),*);
|
||||
__tarpc_service_complete.complete(
|
||||
$crate::futures::IntoFuture::into_future(
|
||||
__tarpc_service_reply));
|
||||
@@ -610,9 +609,9 @@ macro_rules! service {
|
||||
impl<S> SyncServiceExt for S where S: SyncService {}
|
||||
|
||||
#[allow(unused)]
|
||||
#[derive(Debug)]
|
||||
#[derive(Clone, Debug)]
|
||||
/// The client stub that makes RPC calls to the server. Exposes a blocking interface.
|
||||
pub struct SyncClient(FutureClient);
|
||||
pub struct SyncClient(::std::sync::Arc<::std::sync::Mutex<FutureClient>>);
|
||||
|
||||
impl $crate::sync::Connect for SyncClient {
|
||||
fn connect<A>(addr: A) -> ::std::result::Result<Self, ::std::io::Error>
|
||||
@@ -620,7 +619,8 @@ macro_rules! service {
|
||||
{
|
||||
let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
|
||||
let client = <FutureClient as $crate::future::Connect>::connect(&addr);
|
||||
let client = SyncClient($crate::futures::Future::wait(client)?);
|
||||
let client = $crate::futures::Future::wait(client)?;
|
||||
let client = SyncClient(::std::sync::Arc::new(::std::sync::Mutex::new(client)));
|
||||
::std::result::Result::Ok(client)
|
||||
}
|
||||
}
|
||||
@@ -632,7 +632,7 @@ macro_rules! service {
|
||||
pub fn $fn_name(&self, $($arg: $in_),*)
|
||||
-> ::std::result::Result<$out, $crate::Error<$error>>
|
||||
{
|
||||
let rpc = (self.0).$fn_name($($arg),*);
|
||||
let rpc = self.0.lock().unwrap().$fn_name($($arg),*);
|
||||
$crate::futures::Future::wait(rpc)
|
||||
}
|
||||
)*
|
||||
@@ -682,7 +682,7 @@ macro_rules! service {
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
#[derive(Debug)]
|
||||
#[derive(Clone, Debug)]
|
||||
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
|
||||
pub struct FutureClient(__tarpc_service_Client);
|
||||
|
||||
@@ -719,13 +719,13 @@ macro_rules! service {
|
||||
$(
|
||||
#[allow(unused)]
|
||||
$(#[$attr])*
|
||||
pub fn $fn_name(&self, $($arg: $in_),*)
|
||||
pub fn $fn_name(&mut self, $($arg: $in_),*)
|
||||
-> impl $crate::futures::Future<Item=$out, Error=$crate::Error<$error>>
|
||||
+ 'static
|
||||
{
|
||||
let __tarpc_service_req = __tarpc_service_Request::$fn_name(($($arg,)*));
|
||||
let __tarpc_service_fut =
|
||||
$crate::tokio_service::Service::call(&self.0, __tarpc_service_req);
|
||||
$crate::tokio_service::Service::call(&mut self.0, __tarpc_service_req);
|
||||
$crate::futures::Future::then(__tarpc_service_fut,
|
||||
move |__tarpc_service_msg| {
|
||||
match __tarpc_service_msg? {
|
||||
@@ -822,10 +822,10 @@ mod functional_test {
|
||||
struct Server;
|
||||
|
||||
impl SyncService for Server {
|
||||
fn add(&self, x: i32, y: i32) -> Result<i32, Never> {
|
||||
fn add(&mut self, x: i32, y: i32) -> Result<i32, Never> {
|
||||
Ok(x + y)
|
||||
}
|
||||
fn hey(&self, name: String) -> Result<String, Never> {
|
||||
fn hey(&mut self, name: String) -> Result<String, Never> {
|
||||
Ok(format!("Hey, {}.", name))
|
||||
}
|
||||
}
|
||||
@@ -865,13 +865,13 @@ mod functional_test {
|
||||
impl FutureService for Server {
|
||||
type AddFut = Finished<i32, Never>;
|
||||
|
||||
fn add(&self, x: i32, y: i32) -> Self::AddFut {
|
||||
fn add(&mut self, x: i32, y: i32) -> Self::AddFut {
|
||||
finished(x + y)
|
||||
}
|
||||
|
||||
type HeyFut = Finished<String, Never>;
|
||||
|
||||
fn hey(&self, name: String) -> Self::HeyFut {
|
||||
fn hey(&mut self, name: String) -> Self::HeyFut {
|
||||
finished(format!("Hey, {}.", name))
|
||||
}
|
||||
}
|
||||
@@ -880,7 +880,7 @@ mod functional_test {
|
||||
fn simple() {
|
||||
let _ = env_logger::init();
|
||||
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let client = FutureClient::connect(&addr).wait().unwrap();
|
||||
let mut client = FutureClient::connect(&addr).wait().unwrap();
|
||||
assert_eq!(3, client.add(1, 2).wait().unwrap());
|
||||
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap());
|
||||
}
|
||||
@@ -889,7 +889,7 @@ mod functional_test {
|
||||
fn concurrent() {
|
||||
let _ = env_logger::init();
|
||||
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let client = FutureClient::connect(&addr).wait().unwrap();
|
||||
let mut client = FutureClient::connect(&addr).wait().unwrap();
|
||||
let req1 = client.add(1, 2);
|
||||
let req2 = client.add(3, 4);
|
||||
let req3 = client.hey("Tim".to_string());
|
||||
@@ -902,7 +902,7 @@ mod functional_test {
|
||||
fn other_service() {
|
||||
let _ = env_logger::init();
|
||||
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let client =
|
||||
let mut client =
|
||||
super::other_service::FutureClient::connect(&addr).wait().unwrap();
|
||||
match client.foo().wait().err().unwrap() {
|
||||
::Error::ServerDeserialize(_) => {} // good
|
||||
@@ -923,7 +923,7 @@ mod functional_test {
|
||||
impl error_service::FutureService for ErrorServer {
|
||||
type BarFut = ::futures::Failed<u32, ::util::Message>;
|
||||
|
||||
fn bar(&self) -> Self::BarFut {
|
||||
fn bar(&mut self) -> Self::BarFut {
|
||||
info!("Called bar");
|
||||
failed("lol jk".into())
|
||||
}
|
||||
@@ -938,7 +938,7 @@ mod functional_test {
|
||||
let _ = env_logger::init();
|
||||
|
||||
let addr = ErrorServer.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||
let client = FutureClient::connect(&addr).wait().unwrap();
|
||||
let mut client = FutureClient::connect(&addr).wait().unwrap();
|
||||
client.bar()
|
||||
.then(move |result| {
|
||||
match result.err().unwrap() {
|
||||
@@ -952,7 +952,7 @@ mod functional_test {
|
||||
.wait()
|
||||
.unwrap();
|
||||
|
||||
let client = SyncClient::connect(&addr).unwrap();
|
||||
let mut client = SyncClient::connect(&addr).unwrap();
|
||||
match client.bar().err().unwrap() {
|
||||
::Error::App(e) => {
|
||||
assert_eq!(e.description(), "lol jk");
|
||||
|
||||
@@ -10,18 +10,14 @@ use std::io::{self, Cursor};
|
||||
use std::marker::PhantomData;
|
||||
use std::mem;
|
||||
use tokio_core::io::{EasyBuf, Framed, Io};
|
||||
use tokio_proto::streaming::multiplex::{self, RequestId};
|
||||
use tokio_proto::streaming::multiplex::RequestId;
|
||||
use tokio_proto::multiplex::{ClientProto, ServerProto};
|
||||
use util::{Debugger, Never};
|
||||
use util::Debugger;
|
||||
|
||||
/// The type of message sent and received by the transport.
|
||||
pub type Frame<T> = multiplex::Frame<T, Never, io::Error>;
|
||||
|
||||
|
||||
// `Req` is the type that `Codec` parses. `Resp` is the type it serializes.
|
||||
pub struct Codec<Req, Resp> {
|
||||
// `Encode` is the type that `Codec` encodes. `Decode` is the type it decodes.
|
||||
pub struct Codec<Encode, Decode> {
|
||||
state: CodecState,
|
||||
_phantom_data: PhantomData<(Req, Resp)>,
|
||||
_phantom_data: PhantomData<(Encode, Decode)>,
|
||||
}
|
||||
|
||||
enum CodecState {
|
||||
@@ -30,7 +26,7 @@ enum CodecState {
|
||||
Payload { id: u64, len: u64 },
|
||||
}
|
||||
|
||||
impl<Req, Resp> Codec<Req, Resp> {
|
||||
impl<Encode, Decode> Codec<Encode, Decode> {
|
||||
fn new() -> Self {
|
||||
Codec {
|
||||
state: CodecState::Id,
|
||||
@@ -39,12 +35,12 @@ impl<Req, Resp> Codec<Req, Resp> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp> tokio_core::io::Codec for Codec<Req, Resp>
|
||||
where Req: serde::Deserialize,
|
||||
Resp: serde::Serialize,
|
||||
impl<Encode, Decode> tokio_core::io::Codec for Codec<Encode, Decode>
|
||||
where Encode: serde::Serialize,
|
||||
Decode: serde::Deserialize,
|
||||
{
|
||||
type Out = (RequestId, Resp);
|
||||
type In = (RequestId, Result<Req, bincode::DeserializeError>);
|
||||
type Out = (RequestId, Encode);
|
||||
type In = (RequestId, Result<Decode, bincode::DeserializeError>);
|
||||
|
||||
fn encode(&mut self, (id, message): Self::Out, buf: &mut Vec<u8>) -> io::Result<()> {
|
||||
buf.write_u64::<BigEndian>(id).unwrap();
|
||||
@@ -53,8 +49,7 @@ impl<Req, Resp> tokio_core::io::Codec for Codec<Req, Resp>
|
||||
bincode::serialize_into(buf,
|
||||
&message,
|
||||
SizeLimit::Infinite)
|
||||
// TODO(tikue): handle err
|
||||
.expect("In bincode::serialize_into");
|
||||
.map_err(|serialize_err| io::Error::new(io::ErrorKind::Other, serialize_err))?;
|
||||
trace!("Encoded buffer: {:?}", buf);
|
||||
Ok(())
|
||||
}
|
||||
@@ -110,24 +105,24 @@ impl<Req, Resp> tokio_core::io::Codec for Codec<Req, Resp>
|
||||
}
|
||||
|
||||
/// Implements the `multiplex::ServerProto` trait.
|
||||
pub struct Proto<Req, Resp>(PhantomData<(Req, Resp)>);
|
||||
pub struct Proto<Encode, Decode>(PhantomData<(Encode, Decode)>);
|
||||
|
||||
impl<Req, Resp> Proto<Req, Resp> {
|
||||
impl<Encode, Decode> Proto<Encode, Decode> {
|
||||
/// Returns a new `Proto`.
|
||||
pub fn new() -> Self {
|
||||
Proto(PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Req, Resp> ServerProto<T> for Proto<Req, Resp>
|
||||
impl<T, Encode, Decode> ServerProto<T> for Proto<Encode, Decode>
|
||||
where T: Io + 'static,
|
||||
Req: serde::Deserialize + 'static,
|
||||
Resp: serde::Serialize + 'static,
|
||||
Encode: serde::Serialize + 'static,
|
||||
Decode: serde::Deserialize + 'static,
|
||||
{
|
||||
type Response = Resp;
|
||||
type Request = Result<Req, bincode::DeserializeError>;
|
||||
type Response = Encode;
|
||||
type Request = Result<Decode, bincode::DeserializeError>;
|
||||
type Error = io::Error;
|
||||
type Transport = Framed<T, Codec<Req, Resp>>;
|
||||
type Transport = Framed<T, Codec<Encode, Decode>>;
|
||||
type BindTransport = Result<Self::Transport, io::Error>;
|
||||
|
||||
fn bind_transport(&self, io: T) -> Self::BindTransport {
|
||||
@@ -135,15 +130,15 @@ impl<T, Req, Resp> ServerProto<T> for Proto<Req, Resp>
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Req, Resp> ClientProto<T> for Proto<Req, Resp>
|
||||
impl<T, Encode, Decode> ClientProto<T> for Proto<Encode, Decode>
|
||||
where T: Io + 'static,
|
||||
Req: serde::Serialize + 'static,
|
||||
Resp: serde::Deserialize + 'static,
|
||||
Encode: serde::Serialize + 'static,
|
||||
Decode: serde::Deserialize + 'static,
|
||||
{
|
||||
type Response = Result<Resp, bincode::DeserializeError>;
|
||||
type Request = Req;
|
||||
type Response = Result<Decode, bincode::DeserializeError>;
|
||||
type Request = Encode;
|
||||
type Error = io::Error;
|
||||
type Transport = Framed<T, Codec<Resp, Req>>;
|
||||
type Transport = Framed<T, Codec<Encode, Decode>>;
|
||||
type BindTransport = Result<Self::Transport, io::Error>;
|
||||
|
||||
fn bind_transport(&self, io: T) -> Self::BindTransport {
|
||||
@@ -3,10 +3,11 @@
|
||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
// This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
use {REMOTE, net2};
|
||||
use net2;
|
||||
use bincode::serde::DeserializeError;
|
||||
use errors::WireError;
|
||||
use framed::Proto;
|
||||
use future::REMOTE;
|
||||
use protocol::Proto;
|
||||
use futures::{self, Async, Future, Stream};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io;
|
||||
|
||||
17
src/util.rs
17
src/util.rs
@@ -15,14 +15,17 @@ use tokio_core::reactor;
|
||||
/// A bottom type that impls `Error`, `Serialize`, and `Deserialize`. It is impossible to
|
||||
/// instantiate this type.
|
||||
#[allow(unreachable_code)]
|
||||
#[derive(Debug)]
|
||||
pub struct Never(!);
|
||||
|
||||
impl fmt::Debug for Never {
|
||||
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
|
||||
unreachable!("Never cannot be instantiated");
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for Never {
|
||||
fn description(&self) -> &str {
|
||||
match self.0 {
|
||||
// TODO(tikue): remove when https://github.com/rust-lang/rust/issues/12609 lands
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -30,8 +33,6 @@ impl Error for Never {
|
||||
impl fmt::Display for Never {
|
||||
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self.0 {
|
||||
// TODO(tikue): remove when https://github.com/rust-lang/rust/issues/12609 lands
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -42,8 +43,6 @@ impl Future for Never {
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self.0 {
|
||||
// TODO(tikue): remove when https://github.com/rust-lang/rust/issues/12609 lands
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -54,8 +53,6 @@ impl Stream for Never {
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
match self.0 {
|
||||
// TODO(tikue): remove when https://github.com/rust-lang/rust/issues/12609 lands
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -65,8 +62,6 @@ impl Serialize for Never {
|
||||
where S: Serializer
|
||||
{
|
||||
match self.0 {
|
||||
// TODO(tikue): remove when https://github.com/rust-lang/rust/issues/12609 lands
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user