format with rustfmt 0.8.3 (#148)

This commit is contained in:
Jon Gjengset
2017-04-26 15:25:49 -04:00
committed by Tim
parent e2728d84f3
commit 8bc01a993b
21 changed files with 363 additions and 312 deletions

View File

@@ -40,7 +40,8 @@ impl FutureService for Server {
fn latency(bencher: &mut Bencher) {
let _ = env_logger::init();
let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = Server.listen("localhost:0".first_socket_addr(),
let (handle, server) = Server
.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.unwrap();

View File

@@ -57,13 +57,13 @@ impl FutureService for Server {
debug!("Server received read({}) no. {}", size, request_number);
self.pool
.spawn(futures::lazy(move || {
let mut vec = Vec::with_capacity(size as usize);
for i in 0..size {
vec.push(((i % 2) << 8) as u8);
}
debug!("Server sending response no. {}", request_number);
Ok(vec.into())
}))
let mut vec = Vec::with_capacity(size as usize);
for i in 0..size {
vec.push(((i % 2) << 8) as u8);
}
debug!("Server sending response no. {}", request_number);
Ok(vec.into())
}))
}
}
@@ -94,12 +94,12 @@ struct Stats {
fn spawn_core() -> reactor::Remote {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut core = reactor::Core::new().unwrap();
tx.send(core.handle().remote().clone()).unwrap();
let mut core = reactor::Core::new().unwrap();
tx.send(core.handle().remote().clone()).unwrap();
// Run forever
core.run(futures::empty::<(), !>()).unwrap();
});
// Run forever
core.run(futures::empty::<(), !>()).unwrap();
});
rx.recv().unwrap()
}
@@ -108,36 +108,37 @@ fn run_once(clients: Vec<FutureClient>,
-> impl Future<Item = (), Error = ()> + 'static {
let start = Instant::now();
futures::stream::futures_unordered((0..concurrency as usize)
.zip(clients.iter().enumerate().cycle())
.map(|(iteration, (client_idx, client))| {
let start = Instant::now();
debug!("Client {} reading (iteration {})...", client_idx, iteration);
client.read(CHUNK_SIZE)
.map(move |_| (client_idx, iteration, start))
}))
.map(|(client_idx, iteration, start)| {
let elapsed = start.elapsed();
debug!("Client {} received reply (iteration {}).",
client_idx,
iteration);
elapsed
})
.map_err(|e| panic!(e))
.fold(Stats::default(), move |mut stats, elapsed| {
stats.sum += elapsed;
stats.count += 1;
stats.min = Some(cmp::min(stats.min.unwrap_or(elapsed), elapsed));
stats.max = Some(cmp::max(stats.max.unwrap_or(elapsed), elapsed));
Ok(stats)
})
.map(move |stats| {
info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
stats.count,
stats.sum.microseconds() as f64 / stats.count as f64,
stats.min.unwrap().microseconds(),
stats.max.unwrap().microseconds(),
start.elapsed().microseconds());
})
.zip(clients.iter().enumerate().cycle())
.map(|(iteration, (client_idx, client))| {
let start = Instant::now();
debug!("Client {} reading (iteration {})...", client_idx, iteration);
client
.read(CHUNK_SIZE)
.map(move |_| (client_idx, iteration, start))
}))
.map(|(client_idx, iteration, start)| {
let elapsed = start.elapsed();
debug!("Client {} received reply (iteration {}).",
client_idx,
iteration);
elapsed
})
.map_err(|e| panic!(e))
.fold(Stats::default(), move |mut stats, elapsed| {
stats.sum += elapsed;
stats.count += 1;
stats.min = Some(cmp::min(stats.min.unwrap_or(elapsed), elapsed));
stats.max = Some(cmp::max(stats.max.unwrap_or(elapsed), elapsed));
Ok(stats)
})
.map(move |stats| {
info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs",
stats.count,
stats.sum.microseconds() as f64 / stats.count as f64,
stats.min.unwrap().microseconds(),
stats.max.unwrap().microseconds(),
start.elapsed().microseconds());
})
}
fn main() {
@@ -145,23 +146,25 @@ fn main() {
let matches = App::new("Tarpc Concurrency")
.about("Demonstrates making concurrent requests to a tarpc service.")
.arg(Arg::with_name("concurrency")
.short("c")
.long("concurrency")
.value_name("LEVEL")
.help("Sets a custom concurrency level")
.takes_value(true))
.short("c")
.long("concurrency")
.value_name("LEVEL")
.help("Sets a custom concurrency level")
.takes_value(true))
.arg(Arg::with_name("clients")
.short("n")
.long("num_clients")
.value_name("AMOUNT")
.help("How many clients to distribute requests between")
.takes_value(true))
.short("n")
.long("num_clients")
.value_name("AMOUNT")
.help("How many clients to distribute requests between")
.takes_value(true))
.get_matches();
let concurrency = matches.value_of("concurrency")
let concurrency = matches
.value_of("concurrency")
.map(&str::parse)
.map(Result::unwrap)
.unwrap_or(10);
let num_clients = matches.value_of("clients")
let num_clients = matches
.value_of("clients")
.map(&str::parse)
.map(Result::unwrap)
.unwrap_or(4);

View File

@@ -58,10 +58,7 @@ impl subscriber::FutureService for Subscriber {
}
impl Subscriber {
fn listen(id: u32,
handle: &reactor::Handle,
options: server::Options)
-> server::Handle {
fn listen(id: u32, handle: &reactor::Handle, options: server::Options) -> server::Handle {
let (server_handle, server) = Subscriber { id: id }
.listen("localhost:0".first_socket_addr(), handle, options)
.unwrap();
@@ -103,12 +100,12 @@ impl publisher::FutureService for Publisher {
fn subscribe(&self, id: u32, address: SocketAddr) -> Self::SubscribeFut {
let clients = self.clients.clone();
Box::new(subscriber::FutureClient::connect(address, client::Options::default())
.map(move |subscriber| {
println!("Subscribing {}.", id);
clients.borrow_mut().insert(id, subscriber);
()
})
.map_err(|e| e.to_string().into()))
.map(move |subscriber| {
println!("Subscribing {}.", id);
clients.borrow_mut().insert(id, subscriber);
()
})
.map_err(|e| e.to_string().into()))
}
type UnsubscribeFut = Box<Future<Item = (), Error = Never>>;
@@ -133,19 +130,20 @@ fn main() {
let subscriber1 = Subscriber::listen(0, &reactor.handle(), server::Options::default());
let subscriber2 = Subscriber::listen(1, &reactor.handle(), server::Options::default());
let publisher =
reactor.run(publisher::FutureClient::connect(publisher_handle.addr(),
client::Options::default()))
.unwrap();
reactor.run(publisher.subscribe(0, subscriber1.addr())
.and_then(|_| publisher.subscribe(1, subscriber2.addr()))
.map_err(|e| panic!(e))
.and_then(|_| {
println!("Broadcasting...");
publisher.broadcast("hello to all".to_string())
})
.and_then(|_| publisher.unsubscribe(1))
.and_then(|_| publisher.broadcast("hi again".to_string())))
let publisher = reactor
.run(publisher::FutureClient::connect(publisher_handle.addr(), client::Options::default()))
.unwrap();
reactor
.run(publisher
.subscribe(0, subscriber1.addr())
.and_then(|_| publisher.subscribe(1, subscriber2.addr()))
.map_err(|e| panic!(e))
.and_then(|_| {
println!("Broadcasting...");
publisher.broadcast("hello to all".to_string())
})
.and_then(|_| publisher.unsubscribe(1))
.and_then(|_| publisher.broadcast("hi again".to_string())))
.unwrap();
thread::sleep(Duration::from_millis(300));
}

View File

@@ -55,10 +55,12 @@ impl SyncService for HelloServer {
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let handle = HelloServer.listen("localhost:10000", server::Options::default()).unwrap();
tx.send(handle.addr()).unwrap();
handle.run();
});
let handle = HelloServer
.listen("localhost:10000", server::Options::default())
.unwrap();
tx.send(handle.addr()).unwrap();
handle.run();
});
let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
println!("{}", client.hello("".to_string()).unwrap_err());

View File

@@ -34,16 +34,18 @@ impl FutureService for HelloServer {
fn main() {
let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = HelloServer.listen("localhost:10000".first_socket_addr(),
let (handle, server) = HelloServer
.listen("localhost:10000".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.unwrap();
reactor.handle().spawn(server);
let options = client::Options::default().handle(reactor.handle());
reactor.run(FutureClient::connect(handle.addr(), options)
.map_err(tarpc::Error::from)
.and_then(|client| client.hello("Mom".to_string()))
.map(|resp| println!("{}", resp)))
reactor
.run(FutureClient::connect(handle.addr(), options)
.map_err(tarpc::Error::from)
.and_then(|client| client.hello("Mom".to_string()))
.map(|resp| println!("{}", resp)))
.unwrap();
}

View File

@@ -27,17 +27,21 @@ struct HelloServer;
impl SyncService for HelloServer {
fn hello(&self, name: String) -> Result<String, Never> {
Ok(format!("Hello from thread {}, {}!", thread::current().name().unwrap(), name))
Ok(format!("Hello from thread {}, {}!",
thread::current().name().unwrap(),
name))
}
}
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let handle = HelloServer.listen("localhost:0", server::Options::default()).unwrap();
tx.send(handle.addr()).unwrap();
handle.run();
});
let handle = HelloServer
.listen("localhost:0", server::Options::default())
.unwrap();
tx.send(handle.addr()).unwrap();
handle.run();
});
let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
}

View File

@@ -72,14 +72,17 @@ impl DoubleFutureService for DoubleServer {
fn main() {
let _ = env_logger::init();
let mut reactor = reactor::Core::new().unwrap();
let (add, server) = AddServer.listen("localhost:0".first_socket_addr(),
let (add, server) = AddServer
.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.unwrap();
reactor.handle().spawn(server);
let options = client::Options::default().handle(reactor.handle());
let add_client = reactor.run(add::FutureClient::connect(add.addr(), options)).unwrap();
let add_client = reactor
.run(add::FutureClient::connect(add.addr(), options))
.unwrap();
let (double, server) = DoubleServer::new(add_client)
.listen("localhost:0".first_socket_addr(),
@@ -88,14 +91,15 @@ fn main() {
.unwrap();
reactor.handle().spawn(server);
let double_client =
reactor.run(double::FutureClient::connect(double.addr(), client::Options::default()))
.unwrap();
reactor.run(futures::stream::futures_unordered((0..5).map(|i| double_client.double(i)))
.map_err(|e| println!("{}", e))
.for_each(|i| {
println!("{:?}", i);
Ok(())
}))
let double_client = reactor
.run(double::FutureClient::connect(double.addr(), client::Options::default()))
.unwrap();
reactor
.run(futures::stream::futures_unordered((0..5).map(|i| double_client.double(i)))
.map_err(|e| println!("{}", e))
.for_each(|i| {
println!("{:?}", i);
Ok(())
}))
.unwrap();
}

View File

@@ -58,9 +58,7 @@ impl DoubleServer {
impl DoubleSyncService for DoubleServer {
fn double(&self, x: i32) -> Result<i32, Message> {
self.client
.add(x, x)
.map_err(|e| e.to_string().into())
self.client.add(x, x).map_err(|e| e.to_string().into())
}
}
@@ -68,12 +66,13 @@ fn main() {
let _ = env_logger::init();
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let handle = AddServer.listen("localhost:0".first_socket_addr(),
server::Options::default())
.unwrap();
tx.send(handle.addr()).unwrap();
handle.run();
});
let handle = AddServer
.listen("localhost:0".first_socket_addr(),
server::Options::default())
.unwrap();
tx.send(handle.addr()).unwrap();
handle.run();
});
let add = rx.recv().unwrap();

View File

@@ -59,7 +59,8 @@ fn bench_tarpc(target: u64) {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut reactor = reactor::Core::new().unwrap();
let (addr, server) = Server.listen("localhost:0".first_socket_addr(),
let (addr, server) = Server
.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.unwrap();
@@ -86,9 +87,9 @@ fn bench_tcp(target: u64) {
let l = net::TcpListener::bind("localhost:0").unwrap();
let addr = l.local_addr().unwrap();
thread::spawn(move || {
let (mut stream, _) = l.accept().unwrap();
while let Ok(_) = stream.write_all(&*BUF) {}
});
let (mut stream, _) = l.accept().unwrap();
while let Ok(_) = stream.write_all(&*BUF) {}
});
let mut stream = net::TcpStream::connect(&addr).unwrap();
let mut buf = vec![0; CHUNK_SIZE as usize];
let start = time::Instant::now();

View File

@@ -68,8 +68,8 @@ fn main() {
thread::spawn(move || {
let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = Bar.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
&reactor.handle(),
server::Options::default())
.unwrap();
tx.send(handle).unwrap();
reactor.run(server).unwrap();
@@ -83,8 +83,8 @@ fn main() {
thread::spawn(move || {
let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = Baz.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
&reactor.handle(),
server::Options::default())
.unwrap();
tx.send(handle).unwrap();
reactor.run(server).unwrap();

View File

@@ -212,10 +212,11 @@ impl<Req, Resp, E> ClientExt for Client<Req, Resp, E>
#[cfg(feature = "tls")]
match tls_ctx {
Some(tls_ctx) => {
future::Either::A(tls_ctx.tls_connector
.connect_async(&tls_ctx.domain, socket)
.map(StreamType::Tls)
.map_err(native_to_io))
future::Either::A(tls_ctx
.tls_connector
.connect_async(&tls_ctx.domain, socket)
.map(StreamType::Tls)
.map_err(native_to_io))
}
None => future::Either::B(future::ok(StreamType::Tcp(socket))),
}

View File

@@ -69,8 +69,8 @@ impl<S: NewService> NewService for TrackingNewService<S> {
fn new_service(&self) -> io::Result<Self::Instance> {
self.connection_tracker.increment();
Ok(TrackingService {
service: self.new_service.new_service()?,
tracker: self.connection_tracker.clone(),
})
service: self.new_service.new_service()?,
tracker: self.connection_tracker.clone(),
})
}
}

View File

@@ -12,9 +12,9 @@ use std::fmt;
use std::io;
use std::net::SocketAddr;
use stream_type::StreamType;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::net::{Incoming, TcpListener, TcpStream};
use tokio_core::reactor;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_proto::BindServer;
use tokio_service::NewService;
@@ -59,9 +59,9 @@ enum Acceptor {
struct Accept {
#[cfg(feature = "tls")]
inner: futures::Either<futures::MapErr<futures::Map<AcceptAsync<TcpStream>,
fn(TlsStream<TcpStream>) -> StreamType>,
fn(native_tls::Error) -> io::Error>,
futures::FutureResult<StreamType, io::Error>>,
fn(TlsStream<TcpStream>) -> StreamType>,
fn(native_tls::Error) -> io::Error>,
futures::FutureResult<StreamType, io::Error>>,
#[cfg(not(feature = "tls"))]
inner: futures::FutureResult<StreamType, io::Error>,
}
@@ -82,20 +82,19 @@ impl Acceptor {
Accept {
inner: match *self {
Acceptor::Tls(ref tls_acceptor) => {
futures::Either::A(tls_acceptor.accept_async(socket)
.map(StreamType::Tls as _)
.map_err(native_to_io))
futures::Either::A(tls_acceptor
.accept_async(socket)
.map(StreamType::Tls as _)
.map_err(native_to_io))
}
Acceptor::Tcp => futures::Either::B(futures::ok(StreamType::Tcp(socket))),
}
},
}
}
#[cfg(not(feature = "tls"))]
fn accept(&self, socket: TcpStream) -> Accept {
Accept {
inner: futures::ok(StreamType::Tcp(socket))
}
Accept { inner: futures::ok(StreamType::Tcp(socket)) }
}
}
@@ -144,7 +143,7 @@ struct AcceptStream<S> {
}
impl<S> Stream for AcceptStream<S>
where S: Stream<Item=(TcpStream, SocketAddr), Error = io::Error>,
where S: Stream<Item = (TcpStream, SocketAddr), Error = io::Error>
{
type Item = <Accept as Future>::Item;
type Error = io::Error;
@@ -167,7 +166,7 @@ impl<S> Stream for AcceptStream<S>
self.future = None;
Err(e)
}
Ok(Async::NotReady) => Ok(Async::NotReady)
Ok(Async::NotReady) => Ok(Async::NotReady),
}
}
}
@@ -183,9 +182,7 @@ pub struct Options {
impl Default for Options {
#[cfg(not(feature = "tls"))]
fn default() -> Self {
Options {
max_payload_size: 2 << 20,
}
Options { max_payload_size: 2 << 20 }
}
#[cfg(feature = "tls")]
@@ -221,7 +218,12 @@ impl fmt::Debug for Options {
let mut debug_struct = fmt.debug_struct("Options");
#[cfg(feature = "tls")]
debug_struct.field("tls_acceptor", if self.tls_acceptor.is_some() { SOME } else { NONE });
debug_struct.field("tls_acceptor",
if self.tls_acceptor.is_some() {
SOME
} else {
NONE
});
debug_struct.finish()
}
}
@@ -243,8 +245,11 @@ pub fn listen<S, Req, Resp, E>(new_service: S,
Resp: Serialize + 'static,
E: Serialize + 'static
{
let (addr, shutdown, server) = listen_with(
new_service, addr, handle, options.max_payload_size, Acceptor::from(options))?;
let (addr, shutdown, server) = listen_with(new_service,
addr,
handle,
options.max_payload_size,
Acceptor::from(options))?;
Ok((Handle {
addr: addr,
shutdown: shutdown,
@@ -299,7 +304,8 @@ fn listener(addr: &SocketAddr, handle: &reactor::Handle) -> io::Result<TcpListen
}?;
configure_tcp(&builder)?;
builder.reuse_address(true)?;
builder.bind(addr)?
builder
.bind(addr)?
.listen(PENDING_CONNECTION_BACKLOG)
.and_then(|l| TcpListener::from_listener(l, addr, handle))
}
@@ -325,15 +331,15 @@ struct BindStream<S, St> {
impl<S, St> fmt::Debug for BindStream<S, St>
where S: fmt::Debug,
St: fmt::Debug,
St: fmt::Debug
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
const HANDLE: &'static &'static str = &"Handle { .. }";
f.debug_struct("BindStream")
.field("handle", HANDLE)
.field("new_service", &self.new_service)
.field("stream", &self.stream)
.finish()
.field("handle", HANDLE)
.field("new_service", &self.new_service)
.field("stream", &self.stream)
.finish()
}
}
@@ -345,14 +351,15 @@ impl<S, Req, Resp, E, I, St> BindStream<S, St>
Resp: Serialize + 'static,
E: Serialize + 'static,
I: AsyncRead + AsyncWrite + 'static,
St: Stream<Item=I, Error=io::Error>,
St: Stream<Item = I, Error = io::Error>
{
fn bind_each(&mut self) -> Poll<(), io::Error> {
loop {
match try!(self.stream.poll()) {
Async::Ready(Some(socket)) => {
Proto::new(self.max_payload_size)
.bind_server(&self.handle, socket, self.new_service.new_service()?);
Proto::new(self.max_payload_size).bind_server(&self.handle,
socket,
self.new_service.new_service()?);
}
Async::Ready(None) => return Ok(Async::Ready(())),
Async::NotReady => return Ok(Async::NotReady),
@@ -369,7 +376,7 @@ impl<S, Req, Resp, E, I, St> Future for BindStream<S, St>
Resp: Serialize + 'static,
E: Serialize + 'static,
I: AsyncRead + AsyncWrite + 'static,
St: Stream<Item=I, Error=io::Error>,
St: Stream<Item = I, Error = io::Error>
{
type Item = ();
type Error = ();
@@ -396,8 +403,7 @@ pub struct Listen<S, Req, Resp, E>
Resp: Serialize + 'static,
E: Serialize + 'static
{
inner: AlwaysOkUnit<futures::Select<BindStream<S, AcceptStream<Incoming>>,
shutdown::Watcher>>,
inner: AlwaysOkUnit<futures::Select<BindStream<S, AcceptStream<Incoming>>, shutdown::Watcher>>,
}
impl<S, Req, Resp, E> Future for Listen<S, Req, Resp, E>
@@ -422,7 +428,7 @@ impl<S, Req, Resp, E> fmt::Debug for Listen<S, Req, Resp, E>
Error = io::Error> + 'static,
Req: Deserialize + 'static,
Resp: Serialize + 'static,
E: Serialize + 'static,
E: Serialize + 'static
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("Listen").finish()
@@ -433,16 +439,16 @@ impl<S, Req, Resp, E> fmt::Debug for Listen<S, Req, Resp, E>
struct AlwaysOkUnit<F>(F);
impl<F> Future for AlwaysOkUnit<F>
where F: Future,
where F: Future
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
match self.0.poll() {
Ok(Async::Ready(_)) | Err(_) => Ok(Async::Ready(())),
Ok(Async::Ready(_)) |
Err(_) => Ok(Async::Ready(())),
Ok(Async::NotReady) => Ok(Async::NotReady),
}
}
}

View File

@@ -1,9 +1,10 @@
use super::{AlwaysOkUnit, connection};
use futures::{Async, Future, Poll, Stream, future as futures, stream};
use futures::sync::{mpsc, oneshot};
use futures::unsync;
use super::{AlwaysOkUnit, connection};
/// A hook to shut down a running server.
#[derive(Clone, Debug)]
pub struct Shutdown {
@@ -13,8 +14,7 @@ pub struct Shutdown {
/// A future that resolves when server shutdown completes.
#[derive(Debug)]
pub struct ShutdownFuture {
inner: futures::Either<futures::FutureResult<(), ()>,
AlwaysOkUnit<oneshot::Receiver<()>>>,
inner: futures::Either<futures::FutureResult<(), ()>, AlwaysOkUnit<oneshot::Receiver<()>>>,
}
impl Future for ShutdownFuture {
@@ -63,13 +63,13 @@ impl Watcher {
(connection_tx,
Shutdown { tx: shutdown_tx },
Watcher {
shutdown_rx: shutdown_rx.take(1),
connections: connections,
queued_error: None,
shutdown: None,
done: false,
num_connections: 0,
})
shutdown_rx: shutdown_rx.take(1),
connections: connections,
queued_error: None,
shutdown: None,
done: false,
num_connections: 0,
})
}
fn process_connection(&mut self, action: connection::Action) {
@@ -81,28 +81,28 @@ impl Watcher {
fn poll_shutdown_requests(&mut self) -> Poll<Option<()>, ()> {
Ok(Async::Ready(match try_ready!(self.shutdown_rx.poll()) {
Some(tx) => {
debug!("Received shutdown request.");
self.shutdown = Some(tx);
Some(())
}
None => None,
}))
Some(tx) => {
debug!("Received shutdown request.");
self.shutdown = Some(tx);
Some(())
}
None => None,
}))
}
fn poll_connections(&mut self) -> Poll<Option<()>, ()> {
Ok(Async::Ready(match try_ready!(self.connections.poll()) {
Some(action) => {
self.process_connection(action);
Some(())
}
None => None,
}))
Some(action) => {
self.process_connection(action);
Some(())
}
None => None,
}))
}
fn poll_shutdown_requests_and_connections(&mut self) -> Poll<Option<()>, ()> {
if let Some(e) = self.queued_error.take() {
return Err(e)
return Err(e);
}
match try!(self.poll_shutdown_requests()) {
@@ -178,4 +178,3 @@ impl Future for Watcher {
}
}
}

View File

@@ -189,12 +189,12 @@ lazy_static! {
fn spawn_core() -> reactor::Remote {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut core = reactor::Core::new().unwrap();
tx.send(core.handle().remote().clone()).unwrap();
let mut core = reactor::Core::new().unwrap();
tx.send(core.handle().remote().clone()).unwrap();
// Run forever
core.run(futures::empty::<(), !>()).unwrap();
});
// Run forever
core.run(futures::empty::<(), !>()).unwrap();
});
rx.recv().unwrap()
}

View File

@@ -1072,9 +1072,9 @@ mod functional_test {
#[test]
fn other_service() {
let _ = env_logger::init();
let (_, client, _) =
unwrap!(start_server_with_sync_client::<super::other_service::SyncClient,
Server>(Server));
let (_, client, _) = unwrap!(
start_server_with_sync_client::<super::other_service::SyncClient, Server>(Server)
);
match client.foo().err().expect("failed unwrap") {
::Error::RequestDeserialize(_) => {} // good
bad => panic!("Expected Error::RequestDeserialize but got {}", bad),
@@ -1111,7 +1111,9 @@ mod functional_test {
#[test]
fn bad_serialize() {
let handle = ().listen("localhost:0", server::Options::default()).unwrap();
let handle = ()
.listen("localhost:0", server::Options::default())
.unwrap();
let client = SyncClient::connect(handle.addr(), client::Options::default()).unwrap();
client.bad(Bad).err().unwrap();
}
@@ -1209,12 +1211,15 @@ mod functional_test {
let _ = env_logger::init();
let reactor = reactor::Core::new().unwrap();
let handle = Server.listen("localhost:0".first_socket_addr(),
let handle = Server
.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.unwrap()
.0;
Server.listen(handle.addr(), &reactor.handle(), server::Options::default()).unwrap();
Server
.listen(handle.addr(), &reactor.handle(), server::Options::default())
.unwrap();
}
#[test]
@@ -1226,20 +1231,23 @@ mod functional_test {
let _ = env_logger::init();
let mut reactor = reactor::Core::new().unwrap();
let (handle, server) = Server.listen("localhost:0".first_socket_addr(),
let (handle, server) = Server
.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.unwrap();
reactor.handle().spawn(server);
let client = FutureClient::connect(handle.addr(),
client::Options::default().handle(reactor.handle()));
client::Options::default()
.handle(reactor.handle()));
let client = unwrap!(reactor.run(client));
assert_eq!(reactor.run(client.add(1, 2)).unwrap(), 3);
drop(client);
let client = FutureClient::connect(handle.addr(),
client::Options::default().handle(reactor.handle()));
client::Options::default()
.handle(reactor.handle()));
let client = unwrap!(reactor.run(client));
assert_eq!(reactor.run(client.add(1, 2)).unwrap(), 3);
}
@@ -1259,13 +1267,16 @@ mod functional_test {
assert_eq!("Hey, Tim.",
reactor.run(client.hey("Tim".to_string())).unwrap());
let (handle, server) = Server.listen("localhost:0".first_socket_addr(),
let (handle, server) = Server
.listen("localhost:0".first_socket_addr(),
&reactor.handle(),
server::Options::default())
.unwrap();
reactor.handle().spawn(server);
let options = client::Options::default().handle(reactor.handle());
let client = reactor.run(FutureClient::connect(handle.addr(), options)).unwrap();
let client = reactor
.run(FutureClient::connect(handle.addr(), options))
.unwrap();
assert_eq!(3, reactor.run(client.add(1, 2)).unwrap());
assert_eq!("Hey, Tim.",
reactor.run(client.hey("Tim".to_string())).unwrap());
@@ -1298,16 +1309,18 @@ mod functional_test {
let (_, mut reactor, client) =
start_err_server_with_async_client::<FutureClient, ErrorServer>(ErrorServer).unwrap();
reactor.run(client.bar()
.then(move |result| {
match result.err().unwrap() {
::Error::App(e) => {
assert_eq!(e.description(), "lol jk");
Ok::<_, ()>(())
} // good
bad => panic!("Expected Error::App but got {:?}", bad),
}
}))
reactor
.run(client
.bar()
.then(move |result| {
match result.err().unwrap() {
::Error::App(e) => {
assert_eq!(e.description(), "lol jk");
Ok::<_, ()>(())
} // good
bad => panic!("Expected Error::App but got {:?}", bad),
}
}))
.unwrap();
}

View File

@@ -3,11 +3,11 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
use serde;
use bincode::{self, Infinite};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::BytesMut;
use bytes::buf::BufMut;
use serde;
use std::io::{self, Cursor};
use std::marker::PhantomData;
use std::mem;
@@ -43,10 +43,12 @@ impl<Encode, Decode> Codec<Encode, Decode> {
fn too_big(payload_size: u64, max_payload_size: u64) -> io::Error {
warn!("Not sending too-big packet of size {} (max is {})",
payload_size, max_payload_size);
payload_size,
max_payload_size);
io::Error::new(io::ErrorKind::InvalidData,
format!("Maximum payload size is {} bytes but got a payload of {}",
max_payload_size, payload_size))
max_payload_size,
payload_size))
}
impl<Encode, Decode> Encoder for Codec<Encode, Decode>
@@ -66,9 +68,7 @@ impl<Encode, Decode> Encoder for Codec<Encode, Decode>
buf.put_u64::<BigEndian>(id);
trace!("Encoded request id = {} as {:?}", id, buf);
buf.put_u64::<BigEndian>(payload_size);
bincode::serialize_into(&mut buf.writer(),
&message,
Infinite)
bincode::serialize_into(&mut buf.writer(), &message, Infinite)
.map_err(|serialize_err| io::Error::new(io::ErrorKind::Other, serialize_err))?;
trace!("Encoded buffer: {:?}", buf);
Ok(())
@@ -121,8 +121,7 @@ impl<Encode, Decode> Decoder for Codec<Encode, Decode>
}
Payload { id, len } => {
let payload = buf.split_to(len as usize);
let result = bincode::deserialize_from(&mut Cursor::new(payload),
Infinite);
let result = bincode::deserialize_from(&mut Cursor::new(payload), Infinite);
// Reset the state machine because, either way, we're done processing this
// message.
self.state = Id;
@@ -146,7 +145,7 @@ impl<Encode, Decode> Proto<Encode, Decode> {
pub fn new(max_payload_size: u64) -> Self {
Proto {
max_payload_size: max_payload_size,
_phantom_data: PhantomData
_phantom_data: PhantomData,
}
}
}
@@ -190,8 +189,8 @@ fn serialize() {
for _ in 0..2 {
let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new(2_000_000);
codec.encode(MSG, &mut buf).unwrap();
let actual: Result<Option<(u64, Result<(char, char, char), bincode::Error>)>, io::Error> =
codec.decode(&mut buf);
let actual: Result<Option<(u64, Result<(char, char, char), bincode::Error>)>,
io::Error> = codec.decode(&mut buf);
match actual {
Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {}
@@ -207,7 +206,11 @@ fn deserialize_big() {
let mut codec: Codec<Vec<u8>, Vec<u8>> = Codec::new(24);
let mut buf = BytesMut::with_capacity(40);
assert_eq!(codec.encode((0, vec![0; 24]), &mut buf).err().unwrap().kind(),
assert_eq!(codec
.encode((0, vec![0; 24]), &mut buf)
.err()
.unwrap()
.kind(),
io::ErrorKind::InvalidData);
// Header

View File

@@ -8,12 +8,12 @@ use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::mpsc;
use std::thread;
#[cfg(feature = "tls")]
use tls::client::Context;
use tokio_core::reactor;
use tokio_proto::util::client_proxy::{ClientProxy, Receiver, pair};
use tokio_service::Service;
use util::FirstSocketAddr;
#[cfg(feature = "tls")]
use tls::client::Context;
#[doc(hidden)]
pub struct Client<Req, Resp, E> {
@@ -29,9 +29,7 @@ impl<Req, Resp, E> Clone for Client<Req, Resp, E> {
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
const PROXY: &'static &'static str = &"ClientProxy { .. }";
f.debug_struct("Client")
.field("proxy", PROXY)
.finish()
f.debug_struct("Client").field("proxy", PROXY).finish()
}
}
@@ -47,7 +45,6 @@ impl<Req, Resp, E> Client<Req, Resp, E>
// oneshot send.
self.proxy.call(request).wait()
}
}
/// Additional options to configure how the client connects and operates.
@@ -61,9 +58,7 @@ pub struct Options {
impl Default for Options {
#[cfg(not(feature = "tls"))]
fn default() -> Self {
Options {
max_payload_size: 2_000_000,
}
Options { max_payload_size: 2_000_000 }
}
#[cfg(feature = "tls")]
@@ -137,15 +132,13 @@ impl<Req, Resp, E> ClientExt for Client<Req, Resp, E>
{
let addr = addr.try_first_socket_addr()?;
let (connect_tx, connect_rx) = mpsc::channel();
thread::spawn(move || {
match RequestHandler::connect(addr, options) {
Ok((proxy, mut handler)) => {
connect_tx.send(Ok(proxy)).unwrap();
handler.handle_requests();
}
Err(e) => connect_tx.send(Err(e)).unwrap(),
}
});
thread::spawn(move || match RequestHandler::connect(addr, options) {
Ok((proxy, mut handler)) => {
connect_tx.send(Ok(proxy)).unwrap();
handler.handle_requests();
}
Err(e) => connect_tx.send(Err(e)).unwrap(),
});
Ok(connect_rx.recv().unwrap()?)
}
}
@@ -166,14 +159,17 @@ impl<Req, Resp, E> RequestHandler<Req, Resp, E, FutureClient<Req, Resp, E>>
{
/// Creates a new `RequestHandler` by connecting a `FutureClient` to the given address
/// using the given options.
fn connect(addr: SocketAddr, options: Options)
-> io::Result<(Client<Req, Resp, E>, Self)>
{
fn connect(addr: SocketAddr, options: Options) -> io::Result<(Client<Req, Resp, E>, Self)> {
let mut reactor = reactor::Core::new()?;
let options = (reactor.handle(), options).into();
let client = reactor.run(FutureClient::connect(addr, options))?;
let (proxy, requests) = pair();
Ok((Client { proxy }, RequestHandler { reactor, client, requests }))
Ok((Client { proxy },
RequestHandler {
reactor,
client,
requests,
}))
}
}
@@ -182,21 +178,26 @@ impl<Req, Resp, E, S> RequestHandler<Req, Resp, E, S>
Resp: Deserialize + 'static,
E: Deserialize + 'static,
S: Service<Request = Req, Response = Resp, Error = ::Error<E>>,
S::Future: 'static,
S::Future: 'static
{
fn handle_requests(&mut self) {
let RequestHandler { ref mut reactor, ref mut requests, ref mut client } = *self;
let RequestHandler {
ref mut reactor,
ref mut requests,
ref mut client,
} = *self;
let handle = reactor.handle();
let requests = requests
.map(|result| {
match result {
Ok(req) => req,
// The ClientProxy never sends Err currently
Err(e) => panic!("Unimplemented error handling in RequestHandler: {}", e),
}
})
.for_each(|(request, response_tx)| {
let request = client.call(request)
let requests =
requests
.map(|result| {
match result {
Ok(req) => req,
// The ClientProxy never sends Err currently
Err(e) => panic!("Unimplemented error handling in RequestHandler: {}", e),
}
})
.for_each(|(request, response_tx)| {
let request = client.call(request)
.then(move |response| {
// Safe to unwrap because clients always block on the response future.
response_tx.send(response)
@@ -204,9 +205,9 @@ impl<Req, Resp, E, S> RequestHandler<Req, Resp, E, S>
.expect("Client should block on response");
Ok(())
});
handle.spawn(request);
Ok(())
});
handle.spawn(request);
Ok(())
});
reactor.run(requests).unwrap();
}
}
@@ -230,7 +231,11 @@ fn handle_requests() {
let (request, requests) = ::futures::sync::mpsc::unbounded();
let reactor = reactor::Core::new().unwrap();
let client = Client;
let mut request_handler = RequestHandler { reactor, client, requests };
let mut request_handler = RequestHandler {
reactor,
client,
requests,
};
// Test that `handle_requests` returns when all request senders are dropped.
drop(request);
request_handler.handle_requests();

View File

@@ -2,17 +2,17 @@ use {bincode, future, num_cpus};
use future::server::{Response, Shutdown};
use futures::{Future, future as futures};
use futures::sync::oneshot;
#[cfg(feature = "tls")]
use native_tls_inner::TlsAcceptor;
use serde::{Deserialize, Serialize};
use std::io;
use std::fmt;
use std::io;
use std::net::SocketAddr;
use std::time::Duration;
use std::usize;
use thread_pool::{self, Sender, Task, ThreadPool};
use tokio_core::reactor;
use tokio_service::{NewService, Service};
#[cfg(feature = "tls")]
use native_tls_inner::TlsAcceptor;
/// Additional options to configure how the server operates.
#[derive(Debug)]
@@ -91,16 +91,19 @@ impl fmt::Debug for Handle {
const CORE: &'static &'static str = &"Core { .. }";
const SERVER: &'static &'static str = &"Box<Future<Item = (), Error = ()>>";
f.debug_struct("Handle").field("reactor", CORE)
.field("handle", &self.handle)
.field("server", SERVER)
.finish()
f.debug_struct("Handle")
.field("reactor", CORE)
.field("handle", &self.handle)
.field("server", SERVER)
.finish()
}
}
#[doc(hidden)]
pub fn listen<S, Req, Resp, E>(new_service: S, addr: SocketAddr, options: Options)
-> io::Result<Handle>
pub fn listen<S, Req, Resp, E>(new_service: S,
addr: SocketAddr,
options: Options)
-> io::Result<Handle>
where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>,
Error = io::Error> + 'static,
@@ -117,27 +120,33 @@ pub fn listen<S, Req, Resp, E>(new_service: S, addr: SocketAddr, options: Option
future::server::listen(new_service, addr, &reactor.handle(), options.opts)?;
let server = Box::new(server);
Ok(Handle {
reactor: reactor,
handle: handle,
server: server,
})
reactor: reactor,
handle: handle,
server: server,
})
}
/// A service that uses a thread pool.
struct NewThreadService<S> where S: NewService {
struct NewThreadService<S>
where S: NewService
{
new_service: S,
sender: Sender<ServiceTask<<S::Instance as Service>::Future>>,
_pool: ThreadPool<ServiceTask<<S::Instance as Service>::Future>>,
}
/// A service that runs by executing request handlers in a thread pool.
struct ThreadService<S> where S: Service {
struct ThreadService<S>
where S: Service
{
service: S,
sender: Sender<ServiceTask<S::Future>>,
}
/// A task that handles a single request.
struct ServiceTask<F> where F: Future {
struct ServiceTask<F>
where F: Future
{
future: F,
tx: oneshot::Sender<Result<F::Item, F::Error>>,
}
@@ -146,12 +155,16 @@ impl<S> NewThreadService<S>
where S: NewService,
<S::Instance as Service>::Future: Send + 'static,
S::Response: Send,
S::Error: Send,
S::Error: Send
{
/// Create a NewThreadService by wrapping another service.
fn new(new_service: S, pool: thread_pool::Builder) -> Self {
let (sender, _pool) = pool.build();
NewThreadService { new_service, sender, _pool }
NewThreadService {
new_service,
sender,
_pool,
}
}
}
@@ -159,7 +172,7 @@ impl<S> NewService for NewThreadService<S>
where S: NewService,
<S::Instance as Service>::Future: Send + 'static,
S::Response: Send,
S::Error: Send,
S::Error: Send
{
type Request = S::Request;
type Response = S::Response;
@@ -168,16 +181,16 @@ impl<S> NewService for NewThreadService<S>
fn new_service(&self) -> io::Result<Self::Instance> {
Ok(ThreadService {
service: self.new_service.new_service()?,
sender: self.sender.clone(),
})
service: self.new_service.new_service()?,
sender: self.sender.clone(),
})
}
}
impl<F> Task for ServiceTask<F>
where F: Future + Send + 'static,
F::Item: Send,
F::Error: Send,
F::Error: Send
{
fn run(self) {
// Don't care if sending fails. It just means the request is no longer
@@ -190,25 +203,26 @@ impl<S> Service for ThreadService<S>
where S: Service,
S::Future: Send + 'static,
S::Response: Send,
S::Error: Send,
S::Error: Send
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Future =
futures::AndThen<
futures::MapErr<
oneshot::Receiver<Result<Self::Response, Self::Error>>,
fn(oneshot::Canceled) -> Self::Error>,
Result<Self::Response, Self::Error>,
fn(Result<Self::Response, Self::Error>) -> Result<Self::Response, Self::Error>>;
type Future = futures::AndThen<futures::MapErr<oneshot::Receiver<Result<Self::Response,
Self::Error>>,
fn(oneshot::Canceled) -> Self::Error>,
Result<Self::Response, Self::Error>,
fn(Result<Self::Response, Self::Error>)
-> Result<Self::Response, Self::Error>>;
fn call(&self, request: Self::Request) -> Self::Future {
let (tx, rx) = oneshot::channel();
self.sender.send(ServiceTask {
future: self.service.call(request),
tx: tx,
}).unwrap();
self.sender
.send(ServiceTask {
future: self.service.call(request),
tx: tx,
})
.unwrap();
rx.map_err(unreachable as _).and_then(ident)
}
}
@@ -222,4 +236,3 @@ fn unreachable<T, U>(t: T) -> U
fn ident<T>(t: T) -> T {
t
}

View File

@@ -19,9 +19,9 @@ pub mod client {
/// validation.
pub fn new<S: Into<String>>(domain: S) -> Result<Self, Error> {
Ok(Context {
domain: domain.into(),
tls_connector: TlsConnector::builder()?.build()?,
})
domain: domain.into(),
tls_connector: TlsConnector::builder()?.build()?,
})
}
/// Construct a new `Context` using the provided domain and `TlsConnector`
@@ -41,11 +41,10 @@ pub mod client {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
const TLS_CONNECTOR: &'static &'static str = &"TlsConnector { .. }";
f.debug_struct("Context")
.field("domain", &self.domain)
.field("tls_connector", TLS_CONNECTOR)
.finish()
.field("domain", &self.domain)
.field("tls_connector", TLS_CONNECTOR)
.finish()
}
}
}

View File

@@ -122,9 +122,7 @@ pub fn lazy<F, A, R>(f: F, args: A) -> Lazy<F, A, R>
where F: FnOnce(A) -> R,
R: IntoFuture
{
Lazy {
inner: _Lazy::First(f, args),
}
Lazy { inner: _Lazy::First(f, args) }
}
/// A future which defers creation of the actual future until a callback is
@@ -146,7 +144,7 @@ enum _Lazy<F, A, R> {
impl<F, A, R> Lazy<F, A, R>
where F: FnOnce(A) -> R,
R: IntoFuture,
R: IntoFuture
{
fn get(&mut self) -> &mut R::Future {
match self.inner {
@@ -167,7 +165,7 @@ impl<F, A, R> Lazy<F, A, R>
impl<F, A, R> Future for Lazy<F, A, R>
where F: FnOnce(A) -> R,
R: IntoFuture,
R: IntoFuture
{
type Item = R::Item;
type Error = R::Error;