mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Merge pull request #61 from tikue/proto-changes
Track the changes to tokio-proto/master
This commit is contained in:
@@ -25,15 +25,17 @@ take = "0.1"
|
|||||||
tokio-service = { git = "https://github.com/tokio-rs/tokio-service" }
|
tokio-service = { git = "https://github.com/tokio-rs/tokio-service" }
|
||||||
tokio-proto = { git = "https://github.com/tokio-rs/tokio-proto" }
|
tokio-proto = { git = "https://github.com/tokio-rs/tokio-proto" }
|
||||||
tokio-core = { git = "https://github.com/tokio-rs/tokio-core" }
|
tokio-core = { git = "https://github.com/tokio-rs/tokio-core" }
|
||||||
|
net2 = "0.2"
|
||||||
|
|
||||||
[replace]
|
[replace]
|
||||||
"tokio-core:0.1.0" = { git = "https://github.com/tokio-rs/tokio-core" }
|
"tokio-core:0.1.1" = { git = "https://github.com/tokio-rs/tokio-core" }
|
||||||
"futures:0.1.3" = { git = "https://github.com/alexcrichton/futures-rs" }
|
"futures:0.1.6" = { git = "https://github.com/alexcrichton/futures-rs" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
chrono = "0.2"
|
chrono = "0.2"
|
||||||
env_logger = "0.3"
|
env_logger = "0.3"
|
||||||
futures-cpupool = "0.1"
|
futures-cpupool = "0.1"
|
||||||
|
clap = "2.0"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
unstable = ["serde/unstable"]
|
unstable = ["serde/unstable"]
|
||||||
|
|||||||
@@ -7,6 +7,7 @@
|
|||||||
#![plugin(tarpc_plugins)]
|
#![plugin(tarpc_plugins)]
|
||||||
|
|
||||||
extern crate chrono;
|
extern crate chrono;
|
||||||
|
extern crate clap;
|
||||||
extern crate env_logger;
|
extern crate env_logger;
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
@@ -16,8 +17,11 @@ extern crate tarpc;
|
|||||||
extern crate tokio_core;
|
extern crate tokio_core;
|
||||||
extern crate futures_cpupool;
|
extern crate futures_cpupool;
|
||||||
|
|
||||||
|
use clap::{Arg, App};
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use futures_cpupool::{CpuFuture, CpuPool};
|
use futures_cpupool::{CpuFuture, CpuPool};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
use std::time::{Duration, Instant, SystemTime};
|
use std::time::{Duration, Instant, SystemTime};
|
||||||
use tarpc::future::{Connect};
|
use tarpc::future::{Connect};
|
||||||
use tarpc::util::{FirstSocketAddr, Never, spawn_core};
|
use tarpc::util::{FirstSocketAddr, Never, spawn_core};
|
||||||
@@ -28,11 +32,17 @@ service! {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct Server(CpuPool);
|
struct Server {
|
||||||
|
pool: CpuPool,
|
||||||
|
request_count: Arc<AtomicUsize>,
|
||||||
|
}
|
||||||
|
|
||||||
impl Server {
|
impl Server {
|
||||||
fn new() -> Self {
|
fn new() -> Self {
|
||||||
Server(CpuPool::new_num_cpus())
|
Server {
|
||||||
|
pool: CpuPool::new_num_cpus(),
|
||||||
|
request_count: Arc::new(AtomicUsize::new(1)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -40,52 +50,21 @@ impl FutureService for Server {
|
|||||||
type ReadFut = CpuFuture<Vec<u8>, Never>;
|
type ReadFut = CpuFuture<Vec<u8>, Never>;
|
||||||
|
|
||||||
fn read(&self, size: u32) -> Self::ReadFut {
|
fn read(&self, size: u32) -> Self::ReadFut {
|
||||||
self.0
|
let request_number = self.request_count.fetch_add(1, Ordering::SeqCst);
|
||||||
|
debug!("Server received read({}) no. {}", size, request_number);
|
||||||
|
self.pool
|
||||||
.spawn(futures::lazy(move || {
|
.spawn(futures::lazy(move || {
|
||||||
let mut vec: Vec<u8> = Vec::with_capacity(size as usize);
|
let mut vec: Vec<u8> = Vec::with_capacity(size as usize);
|
||||||
for i in 0..size {
|
for i in 0..size {
|
||||||
vec.push((i % 1 << 8) as u8);
|
vec.push((i % 1 << 8) as u8);
|
||||||
}
|
}
|
||||||
futures::finished::<_, Never>(vec)
|
debug!("Server sending response no. {}", request_number);
|
||||||
|
futures::finished(vec)
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_once<'a>(clients: &'a [FutureClient], concurrency: u32, print: bool)
|
const CHUNK_SIZE: u32 = 1 << 10;
|
||||||
-> Box<Future<Item=(), Error=()> + 'a>
|
|
||||||
{
|
|
||||||
let start = Instant::now();
|
|
||||||
let futs = clients.iter()
|
|
||||||
.cycle()
|
|
||||||
.take(concurrency as usize)
|
|
||||||
.map(|client| {
|
|
||||||
let start = SystemTime::now();
|
|
||||||
let future = client.read(CHUNK_SIZE).map(move |_| start.elapsed().unwrap());
|
|
||||||
future
|
|
||||||
})
|
|
||||||
// Need an intermediate collection to kick off each future,
|
|
||||||
// because futures::collect will iterate sequentially.
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
let futs = futures::collect(futs);
|
|
||||||
|
|
||||||
Box::new(futs.map(move |latencies| {
|
|
||||||
let total_time = start.elapsed();
|
|
||||||
|
|
||||||
let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur);
|
|
||||||
let mean = sum_latencies / latencies.len() as u32;
|
|
||||||
let min_latency = *latencies.iter().min().unwrap();
|
|
||||||
let max_latency = *latencies.iter().max().unwrap();
|
|
||||||
|
|
||||||
if print {
|
|
||||||
println!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
|
|
||||||
latencies.len(),
|
|
||||||
mean.microseconds(),
|
|
||||||
min_latency.microseconds(),
|
|
||||||
max_latency.microseconds(),
|
|
||||||
total_time.microseconds());
|
|
||||||
}
|
|
||||||
}).map_err(|e| panic!(e)))
|
|
||||||
}
|
|
||||||
|
|
||||||
trait Microseconds {
|
trait Microseconds {
|
||||||
fn microseconds(&self) -> i64;
|
fn microseconds(&self) -> i64;
|
||||||
@@ -100,33 +79,93 @@ impl Microseconds for Duration {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const CHUNK_SIZE: u32 = 1 << 10;
|
fn run_once(clients: Vec<FutureClient>, concurrency: u32) -> impl Future<Item=(), Error=()> {
|
||||||
const MAX_CONCURRENCY: u32 = 100;
|
let start = Instant::now();
|
||||||
|
let futs = clients.iter()
|
||||||
|
.enumerate()
|
||||||
|
.cycle()
|
||||||
|
.enumerate()
|
||||||
|
.take(concurrency as usize)
|
||||||
|
.map(|(iteration, (client_id, client))| {
|
||||||
|
let iteration = iteration + 1;
|
||||||
|
let start = SystemTime::now();
|
||||||
|
debug!("Client {} reading (iteration {})...", client_id, iteration);
|
||||||
|
let future = client.read(CHUNK_SIZE).map(move |_| {
|
||||||
|
let elapsed = start.elapsed().unwrap();
|
||||||
|
debug!("Client {} received reply (iteration {}).", client_id, iteration);
|
||||||
|
elapsed
|
||||||
|
});
|
||||||
|
future
|
||||||
|
})
|
||||||
|
// Need an intermediate collection to kick off each future,
|
||||||
|
// because futures::collect will iterate sequentially.
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let futs = futures::collect(futs);
|
||||||
|
|
||||||
|
futs.map(move |latencies| {
|
||||||
|
let total_time = start.elapsed();
|
||||||
|
|
||||||
|
let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur);
|
||||||
|
let mean = sum_latencies / latencies.len() as u32;
|
||||||
|
let min_latency = *latencies.iter().min().unwrap();
|
||||||
|
let max_latency = *latencies.iter().max().unwrap();
|
||||||
|
|
||||||
|
info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
|
||||||
|
latencies.len(),
|
||||||
|
mean.microseconds(),
|
||||||
|
min_latency.microseconds(),
|
||||||
|
max_latency.microseconds(),
|
||||||
|
total_time.microseconds());
|
||||||
|
}).map_err(|e| panic!(e))
|
||||||
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
|
let matches = App::new("Tarpc Concurrency")
|
||||||
|
.about("Demonstrates making concurrent requests to a tarpc service.")
|
||||||
|
.arg(Arg::with_name("concurrency")
|
||||||
|
.short("c")
|
||||||
|
.long("concurrency")
|
||||||
|
.value_name("LEVEL")
|
||||||
|
.help("Sets a custom concurrency level")
|
||||||
|
.takes_value(true))
|
||||||
|
.arg(Arg::with_name("clients")
|
||||||
|
.short("n")
|
||||||
|
.long("num_clients")
|
||||||
|
.value_name("AMOUNT")
|
||||||
|
.help("How many clients to distribute requests between")
|
||||||
|
.takes_value(true))
|
||||||
|
.get_matches();
|
||||||
|
let concurrency = matches.value_of("concurrency")
|
||||||
|
.map(&str::parse)
|
||||||
|
.map(Result::unwrap)
|
||||||
|
.unwrap_or(10);
|
||||||
|
let num_clients = matches.value_of("clients")
|
||||||
|
.map(&str::parse)
|
||||||
|
.map(Result::unwrap)
|
||||||
|
.unwrap_or(4);
|
||||||
|
|
||||||
let server = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap();
|
let addr = Server::new().listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||||
println!("Server listening on {}.", server.local_addr());
|
info!("Server listening on {}.", addr);
|
||||||
|
|
||||||
// The driver of the main future.
|
let clients = (0..num_clients)
|
||||||
let mut core = reactor::Core::new().unwrap();
|
|
||||||
|
|
||||||
let clients = (1...5)
|
|
||||||
// Spin up a couple threads to drive the clients.
|
// Spin up a couple threads to drive the clients.
|
||||||
.map(|i| (i, spawn_core()))
|
.map(|i| (i, spawn_core()))
|
||||||
.map(|(i, remote)| {
|
.map(|(i, remote)| {
|
||||||
println!("Client {} connecting...", i);
|
info!("Client {} connecting...", i);
|
||||||
FutureClient::connect_remotely(server.local_addr(), &remote)
|
FutureClient::connect_remotely(&addr, &remote)
|
||||||
.map_err(|e| panic!(e))
|
.map_err(|e| panic!(e))
|
||||||
})
|
})
|
||||||
// Need an intermediate collection to connect the clients in parallel,
|
// Need an intermediate collection to connect the clients in parallel,
|
||||||
// because `futures::collect` iterates sequentially.
|
// because `futures::collect` iterates sequentially.
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let clients = core.run(futures::collect(clients)).unwrap();
|
let run = futures::collect(clients).and_then(|clients| run_once(clients, concurrency));
|
||||||
println!("Starting...");
|
|
||||||
let runs = ::std::iter::once(run_once(&clients, MAX_CONCURRENCY, false))
|
info!("Starting...");
|
||||||
.chain((1...MAX_CONCURRENCY).map(|concurrency| run_once(&clients, concurrency, true)));
|
|
||||||
core.run(futures::collect(runs)).unwrap();
|
// The driver of the main future.
|
||||||
|
let mut core = reactor::Core::new().unwrap();
|
||||||
|
|
||||||
|
core.run(run).unwrap();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,13 +56,13 @@ impl subscriber::FutureService for Subscriber {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Subscriber {
|
impl Subscriber {
|
||||||
fn new(id: u32) -> tokio::server::ServerHandle {
|
fn new(id: u32) -> SocketAddr {
|
||||||
Subscriber {
|
Subscriber {
|
||||||
id: id,
|
id: id,
|
||||||
}
|
}
|
||||||
.listen("localhost:0".first_socket_addr())
|
.listen("localhost:0".first_socket_addr())
|
||||||
.wait()
|
.wait()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -117,19 +117,18 @@ impl publisher::FutureService for Publisher {
|
|||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let publisher_server = Publisher::new()
|
let publisher_addr = Publisher::new()
|
||||||
.listen("localhost:0".first_socket_addr())
|
.listen("localhost:0".first_socket_addr())
|
||||||
.wait()
|
.wait()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let publisher_addr = publisher_server.local_addr();
|
|
||||||
let publisher_client = publisher::SyncClient::connect(publisher_addr).unwrap();
|
let publisher_client = publisher::SyncClient::connect(publisher_addr).unwrap();
|
||||||
|
|
||||||
let subscriber1 = Subscriber::new(0);
|
let subscriber1 = Subscriber::new(0);
|
||||||
publisher_client.subscribe(0, *subscriber1.local_addr()).unwrap();
|
publisher_client.subscribe(0, subscriber1).unwrap();
|
||||||
|
|
||||||
let subscriber2 = Subscriber::new(1);
|
let subscriber2 = Subscriber::new(1);
|
||||||
publisher_client.subscribe(1, *subscriber2.local_addr()).unwrap();
|
publisher_client.subscribe(1, subscriber2).unwrap();
|
||||||
|
|
||||||
|
|
||||||
println!("Broadcasting...");
|
println!("Broadcasting...");
|
||||||
|
|||||||
@@ -6,12 +6,17 @@
|
|||||||
#![feature(conservative_impl_trait, plugin)]
|
#![feature(conservative_impl_trait, plugin)]
|
||||||
#![plugin(tarpc_plugins)]
|
#![plugin(tarpc_plugins)]
|
||||||
|
|
||||||
|
extern crate env_logger;
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
|
extern crate log;
|
||||||
|
#[macro_use]
|
||||||
extern crate tarpc;
|
extern crate tarpc;
|
||||||
|
extern crate tokio_core;
|
||||||
|
|
||||||
|
use futures::Future;
|
||||||
use tarpc::util::Never;
|
use tarpc::util::Never;
|
||||||
use tarpc::sync::Connect;
|
use tarpc::future::Connect;
|
||||||
|
|
||||||
service! {
|
service! {
|
||||||
rpc hello(name: String) -> String;
|
rpc hello(name: String) -> String;
|
||||||
@@ -22,13 +27,29 @@ struct HelloServer;
|
|||||||
|
|
||||||
impl SyncService for HelloServer {
|
impl SyncService for HelloServer {
|
||||||
fn hello(&self, name: String) -> Result<String, Never> {
|
fn hello(&self, name: String) -> Result<String, Never> {
|
||||||
|
info!("Got request: {}", name);
|
||||||
Ok(format!("Hello, {}!", name))
|
Ok(format!("Hello, {}!", name))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let addr = "localhost:10000";
|
let _ = env_logger::init();
|
||||||
HelloServer.listen(addr).unwrap();
|
let mut core = tokio_core::reactor::Core::new().unwrap();
|
||||||
let client = SyncClient::connect(addr).unwrap();
|
let addr = HelloServer.listen("localhost:10000").unwrap();
|
||||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
let f = FutureClient::connect(&addr)
|
||||||
|
.map_err(tarpc::Error::from)
|
||||||
|
.and_then(|client| {
|
||||||
|
let resp1 = client.hello("Mom".to_string());
|
||||||
|
info!("Sent first request.");
|
||||||
|
/*
|
||||||
|
let resp2 = client.hello("Dad".to_string());
|
||||||
|
info!("Sent second request.");
|
||||||
|
*/
|
||||||
|
futures::collect(vec![resp1, /*resp2*/])
|
||||||
|
}).map(|responses| {
|
||||||
|
for resp in responses {
|
||||||
|
println!("{}", resp);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
core.run(f).unwrap();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -49,8 +49,7 @@ impl SyncService for HelloServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let addr = "localhost:10000";
|
let addr = HelloServer.listen("localhost:10000").unwrap();
|
||||||
HelloServer.listen(addr).unwrap();
|
|
||||||
let client = SyncClient::connect(addr).unwrap();
|
let client = SyncClient::connect(addr).unwrap();
|
||||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||||
println!("{}", client.hello("".to_string()).unwrap_err());
|
println!("{}", client.hello("".to_string()).unwrap_err());
|
||||||
|
|||||||
91
examples/readme_expanded.rs
Normal file
91
examples/readme_expanded.rs
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||||
|
// This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
|
#![feature(conservative_impl_trait, plugin, proc_macro)]
|
||||||
|
#![plugin(tarpc_plugins)]
|
||||||
|
|
||||||
|
extern crate bincode;
|
||||||
|
extern crate env_logger;
|
||||||
|
extern crate futures;
|
||||||
|
#[macro_use]
|
||||||
|
extern crate log;
|
||||||
|
#[macro_use]
|
||||||
|
extern crate serde_derive;
|
||||||
|
#[macro_use]
|
||||||
|
extern crate tarpc;
|
||||||
|
extern crate tokio_core;
|
||||||
|
extern crate tokio_service;
|
||||||
|
|
||||||
|
use bincode::serde::DeserializeError;
|
||||||
|
use futures::{Future, IntoFuture};
|
||||||
|
use std::io;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use tarpc::future::Connect;
|
||||||
|
use tarpc::util::FirstSocketAddr;
|
||||||
|
use tarpc::util::Never;
|
||||||
|
use tokio_service::Service;
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
|
struct HelloServer;
|
||||||
|
|
||||||
|
impl HelloServer {
|
||||||
|
fn listen(addr: SocketAddr) -> impl Future<Item=SocketAddr, Error=io::Error> {
|
||||||
|
let (tx, rx) = futures::oneshot();
|
||||||
|
tarpc::REMOTE.spawn(move |handle| {
|
||||||
|
Ok(tx.complete(tarpc::listen_with(addr, move || Ok(HelloServer), handle.clone())))
|
||||||
|
});
|
||||||
|
rx.map_err(|e| panic!(e)).and_then(|result| result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Service for HelloServer {
|
||||||
|
type Request = Result<String, DeserializeError>;
|
||||||
|
type Response = tarpc::Response<String, Never>;
|
||||||
|
type Error = io::Error;
|
||||||
|
type Future = Box<Future<Item = tarpc::Response<String, Never>, Error = io::Error>>;
|
||||||
|
|
||||||
|
fn call(&self, request: Self::Request) -> Self::Future {
|
||||||
|
Ok(Ok(format!("Hello, {}!", request.unwrap()))).into_future().boxed()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct FutureClient(tarpc::Client<String, String, Never>);
|
||||||
|
|
||||||
|
impl FutureClient {
|
||||||
|
fn connect(addr: &SocketAddr) -> impl Future<Item = FutureClient, Error = io::Error> {
|
||||||
|
tarpc::Client::connect_remotely(addr, &tarpc::REMOTE).map(FutureClient)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn hello(&self, name: String)
|
||||||
|
-> impl Future<Item = String, Error = tarpc::Error<Never>> + 'static
|
||||||
|
{
|
||||||
|
self.0.call(name).then(|msg| msg.unwrap())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let _ = env_logger::init();
|
||||||
|
let mut core = tokio_core::reactor::Core::new().unwrap();
|
||||||
|
let addr = HelloServer::listen("localhost:10000".first_socket_addr()).wait().unwrap();
|
||||||
|
let f = FutureClient::connect(&addr)
|
||||||
|
.map_err(tarpc::Error::from)
|
||||||
|
.and_then(|client| {
|
||||||
|
let resp1 = client.hello("Mom".to_string());
|
||||||
|
info!("Sent first request.");
|
||||||
|
|
||||||
|
let resp2 = client.hello("Dad".to_string());
|
||||||
|
info!("Sent second request.");
|
||||||
|
|
||||||
|
futures::collect(vec![resp1, resp2])
|
||||||
|
})
|
||||||
|
.map(|responses| {
|
||||||
|
for resp in responses {
|
||||||
|
println!("{}", resp);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
core.run(f).unwrap();
|
||||||
|
}
|
||||||
@@ -10,6 +10,7 @@ extern crate futures;
|
|||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate tarpc;
|
extern crate tarpc;
|
||||||
|
|
||||||
|
use futures::Future;
|
||||||
use tarpc::util::{FirstSocketAddr, Never};
|
use tarpc::util::{FirstSocketAddr, Never};
|
||||||
use tarpc::sync::Connect;
|
use tarpc::sync::Connect;
|
||||||
|
|
||||||
@@ -29,8 +30,7 @@ impl FutureService for HelloServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let addr = "localhost:10000";
|
let addr = HelloServer.listen("localhost:10000".first_socket_addr()).wait().unwrap();
|
||||||
let _server = HelloServer.listen(addr.first_socket_addr());
|
|
||||||
let client = SyncClient::connect(addr).unwrap();
|
let client = SyncClient::connect(addr).unwrap();
|
||||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,12 +74,13 @@ impl DoubleFutureService for DoubleServer {
|
|||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let add = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
let add_addr = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||||
let add_client = add::FutureClient::connect(add.local_addr()).wait().unwrap();
|
let add_client = add::FutureClient::connect(&add_addr).wait().unwrap();
|
||||||
let double = DoubleServer::new(add_client);
|
|
||||||
let double = double.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
|
||||||
|
|
||||||
let double_client = double::SyncClient::connect(double.local_addr()).unwrap();
|
let double = DoubleServer::new(add_client);
|
||||||
|
let double_addr = double.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||||
|
|
||||||
|
let double_client = double::SyncClient::connect(&double_addr).unwrap();
|
||||||
for i in 0..5 {
|
for i in 0..5 {
|
||||||
println!("{:?}", double_client.double(i).unwrap());
|
println!("{:?}", double_client.double(i).unwrap());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -52,8 +52,8 @@ impl FutureService for Server {
|
|||||||
const CHUNK_SIZE: u32 = 1 << 19;
|
const CHUNK_SIZE: u32 = 1 << 19;
|
||||||
|
|
||||||
fn bench_tarpc(target: u64) {
|
fn bench_tarpc(target: u64) {
|
||||||
let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||||
let client = SyncClient::connect(handle.local_addr()).unwrap();
|
let client = SyncClient::connect(&addr).unwrap();
|
||||||
let start = time::Instant::now();
|
let start = time::Instant::now();
|
||||||
let mut nread = 0;
|
let mut nread = 0;
|
||||||
while nread < target {
|
while nread < target {
|
||||||
|
|||||||
@@ -58,10 +58,11 @@ macro_rules! pos {
|
|||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let bar = Bar.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
let bar_addr = Bar.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||||
let baz = Baz.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
let baz_addr = Baz.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||||
let bar_client = bar::SyncClient::connect(bar.local_addr()).unwrap();
|
|
||||||
let baz_client = baz::SyncClient::connect(baz.local_addr()).unwrap();
|
let bar_client = bar::SyncClient::connect(&bar_addr).unwrap();
|
||||||
|
let baz_client = baz::SyncClient::connect(&baz_addr).unwrap();
|
||||||
|
|
||||||
info!("Result: {:?}", bar_client.bar(17));
|
info!("Result: {:?}", bar_client.bar(17));
|
||||||
|
|
||||||
|
|||||||
104
src/client.rs
104
src/client.rs
@@ -3,57 +3,62 @@
|
|||||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||||
// This file may not be copied, modified, or distributed except according to those terms.
|
// This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
use WireError;
|
use {WireError, framed};
|
||||||
use bincode::serde::DeserializeError;
|
use bincode::serde::DeserializeError;
|
||||||
use framed::Framed;
|
use futures::{self, Future};
|
||||||
use futures::{self, Async, Future};
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io;
|
use std::io;
|
||||||
use tokio_core::net::TcpStream;
|
use tokio_core::net::TcpStream;
|
||||||
use tokio_core::reactor;
|
use tokio_proto::BindClient as ProtoBindClient;
|
||||||
use tokio_proto::easy::{EasyClient, EasyResponse, multiplex};
|
use tokio_proto::multiplex::Multiplex;
|
||||||
use tokio_service::Service;
|
use tokio_service::Service;
|
||||||
|
|
||||||
|
type WireResponse<Resp, E> = Result<Result<Resp, WireError<E>>, DeserializeError>;
|
||||||
|
type ResponseFuture<Req, Resp, E> = futures::Map<<BindClient<Req, Resp, E> as Service>::Future,
|
||||||
|
fn(WireResponse<Resp, E>) -> Result<Resp, ::Error<E>>>;
|
||||||
|
type BindClient<Req, Resp, E> =
|
||||||
|
<framed::Proto<Req, Result<Resp, WireError<E>>> as ProtoBindClient<Multiplex, TcpStream>>::BindClient;
|
||||||
|
|
||||||
/// A client that impls `tokio_service::Service` that writes and reads bytes.
|
/// A client that impls `tokio_service::Service` that writes and reads bytes.
|
||||||
///
|
///
|
||||||
/// Typically, this would be combined with a serialization pre-processing step
|
/// Typically, this would be combined with a serialization pre-processing step
|
||||||
/// and a deserialization post-processing step.
|
/// and a deserialization post-processing step.
|
||||||
pub struct Client<Req, Resp, E> {
|
pub struct Client<Req, Resp, E>
|
||||||
inner: EasyClient<Req, WireResponse<Resp, E>>,
|
where Req: Serialize + 'static,
|
||||||
|
Resp: Deserialize + 'static,
|
||||||
|
E: Deserialize + 'static,
|
||||||
|
{
|
||||||
|
inner: BindClient<Req, Resp, E>,
|
||||||
}
|
}
|
||||||
|
|
||||||
type WireResponse<Resp, E> = Result<Result<Resp, WireError<E>>, DeserializeError>;
|
|
||||||
type ResponseFuture<Resp, E> = futures::Map<EasyResponse<WireResponse<Resp, E>>,
|
|
||||||
fn(WireResponse<Resp, E>) -> Result<Resp, ::Error<E>>>;
|
|
||||||
|
|
||||||
impl<Req, Resp, E> Service for Client<Req, Resp, E>
|
impl<Req, Resp, E> Service for Client<Req, Resp, E>
|
||||||
where Req: Send + 'static,
|
where Req: Serialize + Sync + Send + 'static,
|
||||||
Resp: Send + 'static,
|
Resp: Deserialize + Sync + Send + 'static,
|
||||||
E: Send + 'static
|
E: Deserialize + Sync + Send + 'static
|
||||||
{
|
{
|
||||||
type Request = Req;
|
type Request = Req;
|
||||||
type Response = Result<Resp, ::Error<E>>;
|
type Response = Result<Resp, ::Error<E>>;
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
type Future = ResponseFuture<Resp, E>;
|
type Future = ResponseFuture<Req, Resp, E>;
|
||||||
|
|
||||||
fn poll_ready(&self) -> Async<()> {
|
|
||||||
self.inner.poll_ready()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn call(&self, request: Self::Request) -> Self::Future {
|
fn call(&self, request: Self::Request) -> Self::Future {
|
||||||
self.inner.call(request).map(Self::map_err)
|
self.inner.call(request).map(Self::map_err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Req, Resp, E> Client<Req, Resp, E> {
|
impl<Req, Resp, E> Client<Req, Resp, E>
|
||||||
fn new(tcp: TcpStream, handle: &reactor::Handle) -> Self
|
where Req: Serialize + 'static,
|
||||||
where Req: Serialize + Send + 'static,
|
Resp: Deserialize + 'static,
|
||||||
Resp: Deserialize + Send + 'static,
|
E: Deserialize + 'static,
|
||||||
E: Deserialize + Send + 'static
|
{
|
||||||
|
fn new(inner: BindClient<Req, Resp, E>) -> Self
|
||||||
|
where Req: Serialize + Sync + Send + 'static,
|
||||||
|
Resp: Deserialize + Sync + Send + 'static,
|
||||||
|
E: Deserialize + Sync + Send + 'static
|
||||||
{
|
{
|
||||||
Client {
|
Client {
|
||||||
inner: multiplex::connect(Framed::new(tcp), handle),
|
inner: inner,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -64,7 +69,11 @@ impl<Req, Resp, E> Client<Req, Resp, E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E> {
|
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
|
||||||
|
where Req: Serialize + 'static,
|
||||||
|
Resp: Deserialize + 'static,
|
||||||
|
E: Deserialize + 'static,
|
||||||
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||||
write!(f, "Client {{ .. }}")
|
write!(f, "Client {{ .. }}")
|
||||||
}
|
}
|
||||||
@@ -72,7 +81,7 @@ impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E> {
|
|||||||
|
|
||||||
/// Exposes a trait for connecting asynchronously to servers.
|
/// Exposes a trait for connecting asynchronously to servers.
|
||||||
pub mod future {
|
pub mod future {
|
||||||
use REMOTE;
|
use {REMOTE, framed};
|
||||||
use futures::{self, Async, Future};
|
use futures::{self, Async, Future};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::io;
|
use std::io;
|
||||||
@@ -81,6 +90,7 @@ pub mod future {
|
|||||||
use super::Client;
|
use super::Client;
|
||||||
use tokio_core::net::TcpStream;
|
use tokio_core::net::TcpStream;
|
||||||
use tokio_core::{self, reactor};
|
use tokio_core::{self, reactor};
|
||||||
|
use tokio_proto::BindClient;
|
||||||
|
|
||||||
/// Types that can connect to a server asynchronously.
|
/// Types that can connect to a server asynchronously.
|
||||||
pub trait Connect<'a>: Sized {
|
pub trait Connect<'a>: Sized {
|
||||||
@@ -104,11 +114,19 @@ pub mod future {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// A future that resolves to a `Client` or an `io::Error`.
|
/// A future that resolves to a `Client` or an `io::Error`.
|
||||||
pub struct ConnectFuture<Req, Resp, E> {
|
pub struct ConnectFuture<Req, Resp, E>
|
||||||
|
where Req: Serialize + 'static,
|
||||||
|
Resp: Deserialize + 'static,
|
||||||
|
E: Deserialize + 'static,
|
||||||
|
{
|
||||||
inner: futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
|
inner: futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Req, Resp, E> Future for ConnectFuture<Req, Resp, E> {
|
impl<Req, Resp, E> Future for ConnectFuture<Req, Resp, E>
|
||||||
|
where Req: Serialize + 'static,
|
||||||
|
Resp: Deserialize + 'static,
|
||||||
|
E: Deserialize + 'static,
|
||||||
|
{
|
||||||
type Item = Client<Req, Resp, E>;
|
type Item = Client<Req, Resp, E>;
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
|
|
||||||
@@ -129,9 +147,9 @@ pub mod future {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, Req, Resp, E> Future for ConnectWithFuture<'a, Req, Resp, E>
|
impl<'a, Req, Resp, E> Future for ConnectWithFuture<'a, Req, Resp, E>
|
||||||
where Req: Serialize + Send + 'static,
|
where Req: Serialize + Sync + Send + 'static,
|
||||||
Resp: Deserialize + Send + 'static,
|
Resp: Deserialize + Sync + Send + 'static,
|
||||||
E: Deserialize + Send + 'static
|
E: Deserialize + Sync + Send + 'static
|
||||||
{
|
{
|
||||||
type Item = Client<Req, Resp, E>;
|
type Item = Client<Req, Resp, E>;
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
@@ -150,21 +168,21 @@ pub mod future {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, Req, Resp, E> FnOnce<(TcpStream,)> for MultiplexConnect<'a, Req, Resp, E>
|
impl<'a, Req, Resp, E> FnOnce<(TcpStream,)> for MultiplexConnect<'a, Req, Resp, E>
|
||||||
where Req: Serialize + Send + 'static,
|
where Req: Serialize + Sync + Send + 'static,
|
||||||
Resp: Deserialize + Send + 'static,
|
Resp: Deserialize + Sync + Send + 'static,
|
||||||
E: Deserialize + Send + 'static
|
E: Deserialize + Sync + Send + 'static
|
||||||
{
|
{
|
||||||
type Output = Client<Req, Resp, E>;
|
type Output = Client<Req, Resp, E>;
|
||||||
|
|
||||||
extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Client<Req, Resp, E> {
|
extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Client<Req, Resp, E> {
|
||||||
Client::new(tcp, self.0)
|
Client::new(framed::Proto::new().bind_client(self.0, tcp))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, Req, Resp, E> Connect<'a> for Client<Req, Resp, E>
|
impl<'a, Req, Resp, E> Connect<'a> for Client<Req, Resp, E>
|
||||||
where Req: Serialize + Send + 'static,
|
where Req: Serialize + Sync + Send + 'static,
|
||||||
Resp: Deserialize + Send + 'static,
|
Resp: Deserialize + Sync + Send + 'static,
|
||||||
E: Deserialize + Send + 'static
|
E: Deserialize + Sync + Send + 'static
|
||||||
{
|
{
|
||||||
type ConnectFut = ConnectFuture<Req, Resp, E>;
|
type ConnectFut = ConnectFuture<Req, Resp, E>;
|
||||||
type ConnectWithFut = ConnectWithFuture<'a, Req, Resp, E>;
|
type ConnectWithFut = ConnectWithFuture<'a, Req, Resp, E>;
|
||||||
@@ -175,7 +193,7 @@ pub mod future {
|
|||||||
remote.spawn(move |handle| {
|
remote.spawn(move |handle| {
|
||||||
let handle2 = handle.clone();
|
let handle2 = handle.clone();
|
||||||
TcpStream::connect(&addr, handle)
|
TcpStream::connect(&addr, handle)
|
||||||
.map(move |tcp| Client::new(tcp, &handle2))
|
.map(move |tcp| Client::new(framed::Proto::new().bind_client(&handle2, tcp)))
|
||||||
.then(move |result| {
|
.then(move |result| {
|
||||||
tx.complete(result);
|
tx.complete(result);
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -207,9 +225,9 @@ pub mod sync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<Req, Resp, E> Connect for Client<Req, Resp, E>
|
impl<Req, Resp, E> Connect for Client<Req, Resp, E>
|
||||||
where Req: Serialize + Send + 'static,
|
where Req: Serialize + Sync + Send + 'static,
|
||||||
Resp: Deserialize + Send + 'static,
|
Resp: Deserialize + Sync + Send + 'static,
|
||||||
E: Deserialize + Send + 'static
|
E: Deserialize + Sync + Send + 'static
|
||||||
{
|
{
|
||||||
fn connect<A>(addr: A) -> Result<Self, io::Error>
|
fn connect<A>(addr: A) -> Result<Self, io::Error>
|
||||||
where A: ToSocketAddrs
|
where A: ToSocketAddrs
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||||
// This file may not be copied, modified, or distributed except according to those terms.
|
// This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
use {bincode, tokio_proto as proto};
|
use bincode;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::{fmt, io};
|
use std::{fmt, io};
|
||||||
use std::error::Error as StdError;
|
use std::error::Error as StdError;
|
||||||
@@ -75,15 +75,6 @@ impl<E: SerializableError> StdError for Error<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E> From<proto::Error<Error<E>>> for Error<E> {
|
|
||||||
fn from(err: proto::Error<Error<E>>) -> Self {
|
|
||||||
match err {
|
|
||||||
proto::Error::Transport(e) => e,
|
|
||||||
proto::Error::Io(e) => e.into(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<E> From<io::Error> for Error<E> {
|
impl<E> From<io::Error> for Error<E> {
|
||||||
fn from(err: io::Error) -> Self {
|
fn from(err: io::Error) -> Self {
|
||||||
Error::Io(err)
|
Error::Io(err)
|
||||||
|
|||||||
236
src/framed.rs
236
src/framed.rs
@@ -3,156 +3,157 @@
|
|||||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||||
// This file may not be copied, modified, or distributed except according to those terms.
|
// This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
|
use {serde, tokio_core};
|
||||||
use bincode::{SizeLimit, serde as bincode};
|
use bincode::{SizeLimit, serde as bincode};
|
||||||
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
||||||
use futures::{Async, Poll};
|
|
||||||
use serde;
|
|
||||||
use std::io::{self, Cursor};
|
use std::io::{self, Cursor};
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use tokio_core::easy::{self, EasyBuf, EasyFramed};
|
use tokio_core::io::{EasyBuf, Framed, Io};
|
||||||
use tokio_core::io::{FramedIo, Io};
|
use tokio_proto::streaming::multiplex::{self, RequestId};
|
||||||
use tokio_proto::multiplex::{self, RequestId};
|
use tokio_proto::multiplex::{ClientProto, ServerProto};
|
||||||
use util::Never;
|
use util::{Debugger, Never};
|
||||||
|
|
||||||
/// Handles the IO of tarpc messages. Similar to `tokio_core::easy::EasyFramed` except that it
|
|
||||||
/// hardcodes a parser and serializer suitable for tarpc messages.
|
|
||||||
pub struct Framed<I, In, Out> {
|
|
||||||
inner: EasyFramed<I, Parser<Out>, Serializer<In>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I, In, Out> Framed<I, In, Out> {
|
|
||||||
/// Constructs a new tarpc FramedIo
|
|
||||||
pub fn new(upstream: I) -> Framed<I, In, Out>
|
|
||||||
where I: Io,
|
|
||||||
In: serde::Serialize,
|
|
||||||
Out: serde::Deserialize
|
|
||||||
{
|
|
||||||
Framed { inner: EasyFramed::new(upstream, Parser::new(), Serializer::new()) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The type of message sent and received by the transport.
|
/// The type of message sent and received by the transport.
|
||||||
pub type Frame<T> = multiplex::Frame<T, Never, io::Error>;
|
pub type Frame<T> = multiplex::Frame<T, Never, io::Error>;
|
||||||
|
|
||||||
impl<I, In, Out> FramedIo for Framed<I, In, Out>
|
|
||||||
where I: Io,
|
|
||||||
In: serde::Serialize,
|
|
||||||
Out: serde::Deserialize
|
|
||||||
{
|
|
||||||
type In = (RequestId, In);
|
|
||||||
type Out = Option<(RequestId, Result<Out, bincode::DeserializeError>)>;
|
|
||||||
|
|
||||||
fn poll_read(&mut self) -> Async<()> {
|
// `Req` is the type that `Codec` parses. `Resp` is the type it serializes.
|
||||||
self.inner.poll_read()
|
pub struct Codec<Req, Resp> {
|
||||||
}
|
state: CodecState,
|
||||||
|
_phantom_data: PhantomData<(Req, Resp)>,
|
||||||
fn poll_write(&mut self) -> Async<()> {
|
|
||||||
self.inner.poll_write()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read(&mut self) -> Poll<Self::Out, io::Error> {
|
|
||||||
self.inner.read()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn write(&mut self, req: Self::In) -> Poll<(), io::Error> {
|
|
||||||
self.inner.write(req)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn flush(&mut self) -> Poll<(), io::Error> {
|
|
||||||
self.inner.flush()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// `T` is the type that `Parser` parses.
|
enum CodecState {
|
||||||
struct Parser<T> {
|
|
||||||
state: ParserState,
|
|
||||||
_phantom_data: PhantomData<T>,
|
|
||||||
}
|
|
||||||
|
|
||||||
enum ParserState {
|
|
||||||
Id,
|
Id,
|
||||||
Len { id: u64 },
|
Len { id: u64 },
|
||||||
Payload { id: u64, len: u64 },
|
Payload { id: u64, len: u64 },
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Parser<T> {
|
impl<Req, Resp> Codec<Req, Resp> {
|
||||||
fn new() -> Self {
|
fn new() -> Self {
|
||||||
Parser {
|
Codec {
|
||||||
state: ParserState::Id,
|
state: CodecState::Id,
|
||||||
_phantom_data: PhantomData,
|
_phantom_data: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> easy::Parse for Parser<T>
|
impl<Req, Resp> tokio_core::io::Codec for Codec<Req, Resp>
|
||||||
where T: serde::Deserialize
|
where Req: serde::Deserialize,
|
||||||
|
Resp: serde::Serialize,
|
||||||
{
|
{
|
||||||
type Out = (RequestId, Result<T, bincode::DeserializeError>);
|
type Out = (RequestId, Resp);
|
||||||
|
type In = (RequestId, Result<Req, bincode::DeserializeError>);
|
||||||
|
|
||||||
fn parse(&mut self, buf: &mut EasyBuf) -> Poll<Self::Out, io::Error> {
|
fn encode(&mut self, (id, message): Self::Out, buf: &mut Vec<u8>) -> io::Result<()> {
|
||||||
use self::ParserState::*;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match self.state {
|
|
||||||
Id if buf.len() < mem::size_of::<u64>() => return Ok(Async::NotReady),
|
|
||||||
Id => {
|
|
||||||
self.state = Len { id: Cursor::new(&*buf.get_mut()).read_u64::<BigEndian>()? };
|
|
||||||
*buf = buf.split_off(mem::size_of::<u64>());
|
|
||||||
}
|
|
||||||
Len { .. } if buf.len() < mem::size_of::<u64>() => return Ok(Async::NotReady),
|
|
||||||
Len { id } => {
|
|
||||||
self.state = Payload {
|
|
||||||
id: id,
|
|
||||||
len: Cursor::new(&*buf.get_mut()).read_u64::<BigEndian>()?,
|
|
||||||
};
|
|
||||||
*buf = buf.split_off(mem::size_of::<u64>());
|
|
||||||
}
|
|
||||||
Payload { len, .. } if buf.len() < len as usize => return Ok(Async::NotReady),
|
|
||||||
Payload { id, .. } => {
|
|
||||||
let mut buf = buf.get_mut();
|
|
||||||
let result = bincode::deserialize_from(&mut Cursor::new(&mut *buf),
|
|
||||||
SizeLimit::Infinite);
|
|
||||||
// Clear any unread bytes so we don't read garbage on next request.
|
|
||||||
buf.clear();
|
|
||||||
// Reset the state machine because, either way, we're done processing this
|
|
||||||
// message.
|
|
||||||
self.state = Id;
|
|
||||||
|
|
||||||
return Ok(Async::Ready((id, result)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Serializer<T>(PhantomData<T>);
|
|
||||||
|
|
||||||
impl<T> Serializer<T> {
|
|
||||||
fn new() -> Self {
|
|
||||||
Serializer(PhantomData)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> easy::Serialize for Serializer<T>
|
|
||||||
where T: serde::Serialize
|
|
||||||
{
|
|
||||||
type In = (RequestId, T);
|
|
||||||
|
|
||||||
fn serialize(&mut self, (id, message): Self::In, buf: &mut Vec<u8>) {
|
|
||||||
buf.write_u64::<BigEndian>(id).unwrap();
|
buf.write_u64::<BigEndian>(id).unwrap();
|
||||||
|
trace!("Encoded request id = {} as {:?}", id, buf);
|
||||||
buf.write_u64::<BigEndian>(bincode::serialized_size(&message)).unwrap();
|
buf.write_u64::<BigEndian>(bincode::serialized_size(&message)).unwrap();
|
||||||
bincode::serialize_into(buf,
|
bincode::serialize_into(buf,
|
||||||
&message,
|
&message,
|
||||||
SizeLimit::Infinite)
|
SizeLimit::Infinite)
|
||||||
// TODO(tikue): handle err
|
// TODO(tikue): handle err
|
||||||
.expect("In bincode::serialize_into");
|
.expect("In bincode::serialize_into");
|
||||||
|
trace!("Encoded buffer: {:?}", buf);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<Self::In>, io::Error> {
|
||||||
|
use self::CodecState::*;
|
||||||
|
trace!("Codec::decode: {:?}", buf.as_slice());
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match self.state {
|
||||||
|
Id if buf.len() < mem::size_of::<u64>() => {
|
||||||
|
trace!("--> Buf len is {}; waiting for 8 to parse id.", buf.len());
|
||||||
|
return Ok(None)
|
||||||
|
}
|
||||||
|
Id => {
|
||||||
|
let mut id_buf = buf.drain_to(mem::size_of::<u64>());
|
||||||
|
let id = Cursor::new(&mut id_buf).read_u64::<BigEndian>()?;
|
||||||
|
trace!("--> Parsed id = {} from {:?}", id, id_buf.as_slice());
|
||||||
|
self.state = Len { id: id };
|
||||||
|
}
|
||||||
|
Len { .. } if buf.len() < mem::size_of::<u64>() => {
|
||||||
|
trace!("--> Buf len is {}; waiting for 8 to parse packet length.", buf.len());
|
||||||
|
return Ok(None)
|
||||||
|
}
|
||||||
|
Len { id } => {
|
||||||
|
let len_buf = buf.drain_to(mem::size_of::<u64>());
|
||||||
|
let len = Cursor::new(len_buf).read_u64::<BigEndian>()?;
|
||||||
|
trace!("--> Parsed payload length = {}, remaining buffer length = {}",
|
||||||
|
len, buf.len());
|
||||||
|
self.state = Payload {
|
||||||
|
id: id,
|
||||||
|
len: len,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Payload { len, .. } if buf.len() < len as usize => {
|
||||||
|
trace!("--> Buf len is {}; waiting for {} to parse payload.", buf.len(), len);
|
||||||
|
return Ok(None)
|
||||||
|
}
|
||||||
|
Payload { id, len } => {
|
||||||
|
let payload = buf.drain_to(len as usize);
|
||||||
|
let result = bincode::deserialize_from(&mut Cursor::new(payload),
|
||||||
|
SizeLimit::Infinite);
|
||||||
|
// Reset the state machine because, either way, we're done processing this
|
||||||
|
// message.
|
||||||
|
self.state = Id;
|
||||||
|
|
||||||
|
trace!("--> Parsed message: {:?}", Debugger(&result));
|
||||||
|
return Ok(Some((id, result)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Implements the `multiplex::ServerProto` trait.
|
||||||
|
pub struct Proto<Req, Resp>(PhantomData<(Req, Resp)>);
|
||||||
|
|
||||||
|
impl<Req, Resp> Proto<Req, Resp> {
|
||||||
|
/// Returns a new `Proto`.
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Proto(PhantomData)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, Req, Resp> ServerProto<T> for Proto<Req, Resp>
|
||||||
|
where T: Io + 'static,
|
||||||
|
Req: serde::Deserialize + 'static,
|
||||||
|
Resp: serde::Serialize + 'static,
|
||||||
|
{
|
||||||
|
type Response = Resp;
|
||||||
|
type Request = Result<Req, bincode::DeserializeError>;
|
||||||
|
type Error = io::Error;
|
||||||
|
type Transport = Framed<T, Codec<Req, Resp>>;
|
||||||
|
type BindTransport = Result<Self::Transport, io::Error>;
|
||||||
|
|
||||||
|
fn bind_transport(&self, io: T) -> Self::BindTransport {
|
||||||
|
Ok(io.framed(Codec::new()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, Req, Resp> ClientProto<T> for Proto<Req, Resp>
|
||||||
|
where T: Io + 'static,
|
||||||
|
Req: serde::Serialize + 'static,
|
||||||
|
Resp: serde::Deserialize + 'static,
|
||||||
|
{
|
||||||
|
type Response = Result<Resp, bincode::DeserializeError>;
|
||||||
|
type Request = Req;
|
||||||
|
type Error = io::Error;
|
||||||
|
type Transport = Framed<T, Codec<Resp, Req>>;
|
||||||
|
type BindTransport = Result<Self::Transport, io::Error>;
|
||||||
|
|
||||||
|
fn bind_transport(&self, io: T) -> Self::BindTransport {
|
||||||
|
Ok(io.framed(Codec::new()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn serialize() {
|
fn serialize() {
|
||||||
use tokio_core::easy::{Parse, Serialize};
|
use tokio_core::io::Codec as TokioCodec;
|
||||||
|
|
||||||
const MSG: (u64, (char, char, char)) = (4, ('a', 'b', 'c'));
|
const MSG: (u64, (char, char, char)) = (4, ('a', 'b', 'c'));
|
||||||
let mut buf = EasyBuf::new();
|
let mut buf = EasyBuf::new();
|
||||||
@@ -160,13 +161,14 @@ fn serialize() {
|
|||||||
|
|
||||||
// Serialize twice to check for idempotence.
|
// Serialize twice to check for idempotence.
|
||||||
for _ in 0..2 {
|
for _ in 0..2 {
|
||||||
Serializer::new().serialize(MSG, &mut vec);
|
let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new();
|
||||||
|
codec.encode(MSG, &mut vec).unwrap();
|
||||||
buf.get_mut().append(&mut vec);
|
buf.get_mut().append(&mut vec);
|
||||||
let actual: Poll<(u64, Result<(char, char, char), bincode::DeserializeError>), io::Error> =
|
let actual: Result<Option<(u64, Result<(char, char, char), bincode::DeserializeError>)>, io::Error> =
|
||||||
Parser::new().parse(&mut buf);
|
codec.decode(&mut buf);
|
||||||
|
|
||||||
match actual {
|
match actual {
|
||||||
Ok(Async::Ready((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {}
|
Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {}
|
||||||
bad => panic!("Expected {:?}, but got {:?}", Some(MSG), bad),
|
bad => panic!("Expected {:?}, but got {:?}", Some(MSG), bad),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -59,7 +59,7 @@
|
|||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
#![deny(missing_docs)]
|
#![deny(missing_docs)]
|
||||||
#![feature(plugin, conservative_impl_trait, never_type, proc_macro, unboxed_closures, fn_traits)]
|
#![feature(plugin, conservative_impl_trait, never_type, proc_macro, unboxed_closures, fn_traits, specialization)]
|
||||||
#![plugin(tarpc_plugins)]
|
#![plugin(tarpc_plugins)]
|
||||||
|
|
||||||
extern crate byteorder;
|
extern crate byteorder;
|
||||||
@@ -68,6 +68,7 @@ extern crate bytes;
|
|||||||
extern crate lazy_static;
|
extern crate lazy_static;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
extern crate net2;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate serde_derive;
|
extern crate serde_derive;
|
||||||
extern crate take;
|
extern crate take;
|
||||||
@@ -95,8 +96,6 @@ pub use errors::{Error, SerializableError};
|
|||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub use errors::WireError;
|
pub use errors::WireError;
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub use framed::Framed;
|
|
||||||
#[doc(hidden)]
|
|
||||||
pub use server::{ListenFuture, Response, listen, listen_with};
|
pub use server::{ListenFuture, Response, listen, listen_with};
|
||||||
|
|
||||||
/// Provides some utility error types, as well as a trait for spawning futures on the default event
|
/// Provides some utility error types, as well as a trait for spawning futures on the default event
|
||||||
|
|||||||
@@ -393,10 +393,26 @@ macro_rules! service {
|
|||||||
/// Provides a function for starting the service. This is a separate trait from
|
/// Provides a function for starting the service. This is a separate trait from
|
||||||
/// `FutureService` to prevent collisions with the names of RPCs.
|
/// `FutureService` to prevent collisions with the names of RPCs.
|
||||||
pub trait FutureServiceExt: FutureService {
|
pub trait FutureServiceExt: FutureService {
|
||||||
|
fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture
|
||||||
|
{
|
||||||
|
let (tx, rx) = $crate::futures::oneshot();
|
||||||
|
$crate::REMOTE.spawn(move |handle|
|
||||||
|
Ok(tx.complete(Self::listen_with(self,
|
||||||
|
addr,
|
||||||
|
handle.clone()))));
|
||||||
|
$crate::ListenFuture::from_oneshot(rx)
|
||||||
|
}
|
||||||
|
|
||||||
/// Spawns the service, binding to the given address and running on
|
/// Spawns the service, binding to the given address and running on
|
||||||
/// the default tokio `Loop`.
|
/// the default tokio `Loop`.
|
||||||
fn listen(self, addr: ::std::net::SocketAddr) -> $crate::ListenFuture {
|
fn listen_with(self,
|
||||||
return $crate::listen(addr, __tarpc_service_AsyncServer(self));
|
addr: ::std::net::SocketAddr,
|
||||||
|
handle: $crate::tokio_core::reactor::Handle)
|
||||||
|
-> ::std::io::Result<::std::net::SocketAddr>
|
||||||
|
{
|
||||||
|
return $crate::listen_with(addr,
|
||||||
|
move || Ok(__tarpc_service_AsyncServer(self.clone())),
|
||||||
|
handle);
|
||||||
|
|
||||||
#[allow(non_camel_case_types)]
|
#[allow(non_camel_case_types)]
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@@ -461,10 +477,6 @@ macro_rules! service {
|
|||||||
type Error = ::std::io::Error;
|
type Error = ::std::io::Error;
|
||||||
type Future = __tarpc_service_FutureReply<__tarpc_service_S>;
|
type Future = __tarpc_service_FutureReply<__tarpc_service_S>;
|
||||||
|
|
||||||
fn poll_ready(&self) -> $crate::futures::Async<()> {
|
|
||||||
$crate::futures::Async::Ready(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn call(&self, __tarpc_service_request: Self::Request) -> Self::Future {
|
fn call(&self, __tarpc_service_request: Self::Request) -> Self::Future {
|
||||||
let __tarpc_service_request = match __tarpc_service_request {
|
let __tarpc_service_request = match __tarpc_service_request {
|
||||||
Ok(__tarpc_service_request) => __tarpc_service_request,
|
Ok(__tarpc_service_request) => __tarpc_service_request,
|
||||||
@@ -525,18 +537,29 @@ macro_rules! service {
|
|||||||
/// Provides a function for starting the service. This is a separate trait from
|
/// Provides a function for starting the service. This is a separate trait from
|
||||||
/// `SyncService` to prevent collisions with the names of RPCs.
|
/// `SyncService` to prevent collisions with the names of RPCs.
|
||||||
pub trait SyncServiceExt: SyncService {
|
pub trait SyncServiceExt: SyncService {
|
||||||
/// Spawns the service, binding to the given address and running on
|
|
||||||
/// the default tokio `Loop`.
|
|
||||||
fn listen<L>(self, addr: L)
|
fn listen<L>(self, addr: L)
|
||||||
-> ::std::io::Result<$crate::tokio_proto::server::ServerHandle>
|
-> ::std::io::Result<::std::net::SocketAddr>
|
||||||
where L: ::std::net::ToSocketAddrs
|
where L: ::std::net::ToSocketAddrs
|
||||||
{
|
{
|
||||||
let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
|
let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
|
||||||
|
let (tx, rx) = $crate::futures::oneshot();
|
||||||
|
$crate::REMOTE.spawn(move |handle| Ok(tx.complete(Self::listen_with(self, addr, handle.clone()))));
|
||||||
|
$crate::futures::Future::wait($crate::ListenFuture::from_oneshot(rx))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawns the service, binding to the given address and running on
|
||||||
|
/// the default tokio `Loop`.
|
||||||
|
fn listen_with<L>(self, addr: L, handle: $crate::tokio_core::reactor::Handle)
|
||||||
|
-> ::std::io::Result<::std::net::SocketAddr>
|
||||||
|
where L: ::std::net::ToSocketAddrs
|
||||||
|
{
|
||||||
let __tarpc_service_service = __SyncServer {
|
let __tarpc_service_service = __SyncServer {
|
||||||
service: self,
|
service: self,
|
||||||
};
|
};
|
||||||
return $crate::futures::Future::wait(
|
return FutureServiceExt::listen_with(
|
||||||
FutureServiceExt::listen(__tarpc_service_service, addr));
|
__tarpc_service_service,
|
||||||
|
$crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?,
|
||||||
|
handle);
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct __SyncServer<S> {
|
struct __SyncServer<S> {
|
||||||
@@ -810,8 +833,8 @@ mod functional_test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn simple() {
|
fn simple() {
|
||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let handle = Server.listen("localhost:0".first_socket_addr()).unwrap();
|
let addr = Server.listen("localhost:0".first_socket_addr()).unwrap();
|
||||||
let client = SyncClient::connect(handle.local_addr()).unwrap();
|
let client = SyncClient::connect(addr).unwrap();
|
||||||
assert_eq!(3, client.add(1, 2).unwrap());
|
assert_eq!(3, client.add(1, 2).unwrap());
|
||||||
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
|
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
|
||||||
}
|
}
|
||||||
@@ -819,8 +842,8 @@ mod functional_test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn other_service() {
|
fn other_service() {
|
||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let handle = Server.listen("localhost:0".first_socket_addr()).unwrap();
|
let addr = Server.listen("localhost:0".first_socket_addr()).unwrap();
|
||||||
let client = super::other_service::SyncClient::connect(handle.local_addr()).unwrap();
|
let client = super::other_service::SyncClient::connect(addr).expect("Could not connect!");
|
||||||
match client.foo().err().unwrap() {
|
match client.foo().err().unwrap() {
|
||||||
::Error::ServerDeserialize(_) => {} // good
|
::Error::ServerDeserialize(_) => {} // good
|
||||||
bad => panic!("Expected Error::ServerDeserialize but got {}", bad),
|
bad => panic!("Expected Error::ServerDeserialize but got {}", bad),
|
||||||
@@ -856,18 +879,31 @@ mod functional_test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn simple() {
|
fn simple() {
|
||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||||
let client = FutureClient::connect(handle.local_addr()).wait().unwrap();
|
let client = FutureClient::connect(&addr).wait().unwrap();
|
||||||
assert_eq!(3, client.add(1, 2).wait().unwrap());
|
assert_eq!(3, client.add(1, 2).wait().unwrap());
|
||||||
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap());
|
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn concurrent() {
|
||||||
|
let _ = env_logger::init();
|
||||||
|
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||||
|
let client = FutureClient::connect(&addr).wait().unwrap();
|
||||||
|
let req1 = client.add(1, 2);
|
||||||
|
let req2 = client.add(3, 4);
|
||||||
|
let req3 = client.hey("Tim".to_string());
|
||||||
|
assert_eq!(3, req1.wait().unwrap());
|
||||||
|
assert_eq!(7, req2.wait().unwrap());
|
||||||
|
assert_eq!("Hey, Tim.", req3.wait().unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn other_service() {
|
fn other_service() {
|
||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||||
let client =
|
let client =
|
||||||
super::other_service::FutureClient::connect(handle.local_addr()).wait().unwrap();
|
super::other_service::FutureClient::connect(&addr).wait().unwrap();
|
||||||
match client.foo().wait().err().unwrap() {
|
match client.foo().wait().err().unwrap() {
|
||||||
::Error::ServerDeserialize(_) => {} // good
|
::Error::ServerDeserialize(_) => {} // good
|
||||||
bad => panic!(r#"Expected Error::ServerDeserialize but got "{}""#, bad),
|
bad => panic!(r#"Expected Error::ServerDeserialize but got "{}""#, bad),
|
||||||
@@ -901,8 +937,8 @@ mod functional_test {
|
|||||||
use self::error_service::*;
|
use self::error_service::*;
|
||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
|
|
||||||
let handle = ErrorServer.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
let addr = ErrorServer.listen("localhost:0".first_socket_addr()).wait().unwrap();
|
||||||
let client = FutureClient::connect(handle.local_addr()).wait().unwrap();
|
let client = FutureClient::connect(&addr).wait().unwrap();
|
||||||
client.bar()
|
client.bar()
|
||||||
.then(move |result| {
|
.then(move |result| {
|
||||||
match result.err().unwrap() {
|
match result.err().unwrap() {
|
||||||
@@ -916,7 +952,7 @@ mod functional_test {
|
|||||||
.wait()
|
.wait()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let client = SyncClient::connect(handle.local_addr()).unwrap();
|
let client = SyncClient::connect(&addr).unwrap();
|
||||||
match client.bar().err().unwrap() {
|
match client.bar().err().unwrap() {
|
||||||
::Error::App(e) => {
|
::Error::App(e) => {
|
||||||
assert_eq!(e.description(), "lol jk");
|
assert_eq!(e.description(), "lol jk");
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ use syntax::ext::base::{ExtCtxt, MacResult, DummyResult, MacEager};
|
|||||||
use syntax::ext::quote::rt::Span;
|
use syntax::ext::quote::rt::Span;
|
||||||
use syntax::parse::{self, token, PResult};
|
use syntax::parse::{self, token, PResult};
|
||||||
use syntax::parse::parser::{Parser, PathStyle};
|
use syntax::parse::parser::{Parser, PathStyle};
|
||||||
use syntax::parse::token::intern_and_get_ident;
|
use syntax::symbol::Symbol;
|
||||||
use syntax::ptr::P;
|
use syntax::ptr::P;
|
||||||
use syntax::tokenstream::TokenTree;
|
use syntax::tokenstream::TokenTree;
|
||||||
use syntax::util::small_vector::SmallVector;
|
use syntax::util::small_vector::SmallVector;
|
||||||
@@ -46,23 +46,12 @@ fn snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box<MacResul
|
|||||||
// This code looks intimidating, but it's just iterating through the trait item's attributes
|
// This code looks intimidating, but it's just iterating through the trait item's attributes
|
||||||
// (NameValues), filtering out non-doc attributes, and replacing any {} in the doc string with
|
// (NameValues), filtering out non-doc attributes, and replacing any {} in the doc string with
|
||||||
// the original, snake_case ident.
|
// the original, snake_case ident.
|
||||||
for meta_item in item.attrs.iter_mut().map(|attr| &mut attr.node.value) {
|
for attr in item.attrs.iter_mut().filter(|attr| attr.is_sugared_doc) {
|
||||||
let updated = match meta_item.node {
|
if let NameValue(Spanned { node: Str(ref mut doc, _), .. }) = attr.value.node {
|
||||||
NameValue(ref name, _) if name == "doc" => {
|
*doc = Symbol::intern(&doc.as_str().replace("{}", &old_ident));
|
||||||
let mut updated = (**meta_item).clone();
|
} else {
|
||||||
if let NameValue(_, Spanned { node: Str(ref mut doc, _), .. }) = updated.node {
|
unreachable!()
|
||||||
let updated_doc = doc.replace("{}", &old_ident);
|
|
||||||
*doc = intern_and_get_ident(&updated_doc);
|
|
||||||
} else {
|
|
||||||
unreachable!()
|
|
||||||
};
|
|
||||||
Some(P(updated))
|
|
||||||
}
|
|
||||||
_ => None,
|
|
||||||
};
|
};
|
||||||
if let Some(updated) = updated {
|
|
||||||
*meta_item = updated;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
MacEager::trait_items(SmallVector::one(item))
|
MacEager::trait_items(SmallVector::one(item))
|
||||||
@@ -158,7 +147,7 @@ fn convert(ident: &mut Ident) -> String {
|
|||||||
// The Fut suffix is hardcoded right now; this macro isn't really meant to be general-purpose.
|
// The Fut suffix is hardcoded right now; this macro isn't really meant to be general-purpose.
|
||||||
camel_ty.push_str("Fut");
|
camel_ty.push_str("Fut");
|
||||||
|
|
||||||
*ident = Ident::with_empty_ctxt(token::intern(&camel_ty));
|
*ident = Ident::with_empty_ctxt(Symbol::intern(&camel_ty));
|
||||||
ident_str
|
ident_str
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,17 +3,17 @@
|
|||||||
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||||
// This file may not be copied, modified, or distributed except according to those terms.
|
// This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
use REMOTE;
|
use {REMOTE, net2};
|
||||||
use bincode::serde::DeserializeError;
|
use bincode::serde::DeserializeError;
|
||||||
use errors::WireError;
|
use errors::WireError;
|
||||||
use framed::Framed;
|
use framed::Proto;
|
||||||
use futures::{self, Async, Future};
|
use futures::{self, Async, Future, Stream};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
use tokio_core::net::TcpListener;
|
||||||
use tokio_core::reactor::Handle;
|
use tokio_core::reactor::Handle;
|
||||||
use tokio_proto::easy::multiplex;
|
use tokio_proto::BindServer;
|
||||||
use tokio_proto::server::{self, ServerHandle};
|
|
||||||
use tokio_service::NewService;
|
use tokio_service::NewService;
|
||||||
|
|
||||||
/// A message from server to client.
|
/// A message from server to client.
|
||||||
@@ -29,15 +29,15 @@ pub fn listen<S, Req, Resp, E>(addr: SocketAddr, new_service: S) -> ListenFuture
|
|||||||
E: Serialize + 'static
|
E: Serialize + 'static
|
||||||
{
|
{
|
||||||
let (tx, rx) = futures::oneshot();
|
let (tx, rx) = futures::oneshot();
|
||||||
REMOTE.spawn(move |handle| Ok(tx.complete(listen_with(addr, new_service, handle))));
|
REMOTE.spawn(move |handle| Ok(tx.complete(listen_with(addr, new_service, handle.clone()))));
|
||||||
ListenFuture { inner: rx }
|
ListenFuture { inner: rx }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawns a service that binds to the given address using the given handle.
|
/// Spawns a service that binds to the given address using the given handle.
|
||||||
pub fn listen_with<S, Req, Resp, E>(addr: SocketAddr,
|
pub fn listen_with<S, Req, Resp, E>(addr: SocketAddr,
|
||||||
new_service: S,
|
new_service: S,
|
||||||
handle: &Handle)
|
handle: Handle)
|
||||||
-> io::Result<ServerHandle>
|
-> io::Result<SocketAddr>
|
||||||
where S: NewService<Request = Result<Req, DeserializeError>,
|
where S: NewService<Request = Result<Req, DeserializeError>,
|
||||||
Response = Response<Resp, E>,
|
Response = Response<Resp, E>,
|
||||||
Error = io::Error> + Send + 'static,
|
Error = io::Error> + Send + 'static,
|
||||||
@@ -45,19 +45,49 @@ pub fn listen_with<S, Req, Resp, E>(addr: SocketAddr,
|
|||||||
Resp: Serialize + 'static,
|
Resp: Serialize + 'static,
|
||||||
E: Serialize + 'static
|
E: Serialize + 'static
|
||||||
{
|
{
|
||||||
server::listen(handle, addr, move |stream| {
|
let listener = listener(&addr, &handle)?;
|
||||||
Ok(multiplex::EasyServer::new(new_service.new_service()?, Framed::new(stream))
|
let addr = listener.local_addr()?;
|
||||||
.map_err(|()| panic!("What do we do here")))
|
|
||||||
|
let handle2 = handle.clone();
|
||||||
|
let server = listener.incoming().for_each(move |(socket, _)| {
|
||||||
|
Proto::new().bind_server(&handle2, socket, new_service.new_service()?);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}).map_err(|e| error!("While processing incoming connections: {}", e));
|
||||||
|
handle.spawn(server);
|
||||||
|
Ok(addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn listener(addr: &SocketAddr,
|
||||||
|
handle: &Handle) -> io::Result<TcpListener> {
|
||||||
|
const PENDING_CONNECTION_BACKLOG: i32 = 1024;
|
||||||
|
|
||||||
|
match *addr {
|
||||||
|
SocketAddr::V4(_) => net2::TcpBuilder::new_v4(),
|
||||||
|
SocketAddr::V6(_) => net2::TcpBuilder::new_v6()
|
||||||
|
}?
|
||||||
|
.reuse_address(true)?
|
||||||
|
.bind(addr)?
|
||||||
|
.listen(PENDING_CONNECTION_BACKLOG)
|
||||||
|
.and_then(|l| {
|
||||||
|
TcpListener::from_listener(l, addr, handle)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A future that resolves to a `ServerHandle`.
|
/// A future that resolves to a `ServerHandle`.
|
||||||
pub struct ListenFuture {
|
pub struct ListenFuture {
|
||||||
inner: futures::Oneshot<io::Result<ServerHandle>>,
|
inner: futures::Oneshot<io::Result<SocketAddr>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ListenFuture {
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub fn from_oneshot(rx: futures::Oneshot<io::Result<SocketAddr>>) -> Self {
|
||||||
|
ListenFuture { inner: rx }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Future for ListenFuture {
|
impl Future for ListenFuture {
|
||||||
type Item = ServerHandle;
|
type Item = SocketAddr;
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
|
|
||||||
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
|
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
|
||||||
|
|||||||
16
src/util.rs
16
src/util.rs
@@ -14,6 +14,7 @@ use tokio_core::reactor;
|
|||||||
|
|
||||||
/// A bottom type that impls `Error`, `Serialize`, and `Deserialize`. It is impossible to
|
/// A bottom type that impls `Error`, `Serialize`, and `Deserialize`. It is impossible to
|
||||||
/// instantiate this type.
|
/// instantiate this type.
|
||||||
|
#[allow(unreachable_code)]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Never(!);
|
pub struct Never(!);
|
||||||
|
|
||||||
@@ -135,3 +136,18 @@ pub fn spawn_core() -> reactor::Remote {
|
|||||||
});
|
});
|
||||||
rx.recv().unwrap()
|
rx.recv().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A struct that will format as the contained type if the type impls Debug.
|
||||||
|
pub struct Debugger<'a, T: 'a>(pub &'a T);
|
||||||
|
|
||||||
|
impl<'a, T: fmt::Debug> fmt::Debug for Debugger<'a, T> {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||||
|
write!(f, "{:?}", self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T> fmt::Debug for Debugger<'a, T> {
|
||||||
|
default fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||||
|
write!(f, "{{not debuggable}}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user