diff --git a/benches/latency.rs b/benches/latency.rs index 75ecbd7..79bd220 100644 --- a/benches/latency.rs +++ b/benches/latency.rs @@ -40,7 +40,8 @@ impl FutureService for Server { fn latency(bencher: &mut Bencher) { let _ = env_logger::init(); let mut reactor = reactor::Core::new().unwrap(); - let (handle, server) = Server.listen("localhost:0".first_socket_addr(), + let (handle, server) = Server + .listen("localhost:0".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); diff --git a/examples/concurrency.rs b/examples/concurrency.rs index e49fb8d..807f4cf 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -57,13 +57,13 @@ impl FutureService for Server { debug!("Server received read({}) no. {}", size, request_number); self.pool .spawn(futures::lazy(move || { - let mut vec = Vec::with_capacity(size as usize); - for i in 0..size { - vec.push(((i % 2) << 8) as u8); - } - debug!("Server sending response no. {}", request_number); - Ok(vec.into()) - })) + let mut vec = Vec::with_capacity(size as usize); + for i in 0..size { + vec.push(((i % 2) << 8) as u8); + } + debug!("Server sending response no. {}", request_number); + Ok(vec.into()) + })) } } @@ -94,12 +94,12 @@ struct Stats { fn spawn_core() -> reactor::Remote { let (tx, rx) = mpsc::channel(); thread::spawn(move || { - let mut core = reactor::Core::new().unwrap(); - tx.send(core.handle().remote().clone()).unwrap(); + let mut core = reactor::Core::new().unwrap(); + tx.send(core.handle().remote().clone()).unwrap(); - // Run forever - core.run(futures::empty::<(), !>()).unwrap(); - }); + // Run forever + core.run(futures::empty::<(), !>()).unwrap(); + }); rx.recv().unwrap() } @@ -108,36 +108,37 @@ fn run_once(clients: Vec, -> impl Future + 'static { let start = Instant::now(); futures::stream::futures_unordered((0..concurrency as usize) - .zip(clients.iter().enumerate().cycle()) - .map(|(iteration, (client_idx, client))| { - let start = Instant::now(); - debug!("Client {} reading (iteration {})...", client_idx, iteration); - client.read(CHUNK_SIZE) - .map(move |_| (client_idx, iteration, start)) - })) - .map(|(client_idx, iteration, start)| { - let elapsed = start.elapsed(); - debug!("Client {} received reply (iteration {}).", - client_idx, - iteration); - elapsed - }) - .map_err(|e| panic!(e)) - .fold(Stats::default(), move |mut stats, elapsed| { - stats.sum += elapsed; - stats.count += 1; - stats.min = Some(cmp::min(stats.min.unwrap_or(elapsed), elapsed)); - stats.max = Some(cmp::max(stats.max.unwrap_or(elapsed), elapsed)); - Ok(stats) - }) - .map(move |stats| { - info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", - stats.count, - stats.sum.microseconds() as f64 / stats.count as f64, - stats.min.unwrap().microseconds(), - stats.max.unwrap().microseconds(), - start.elapsed().microseconds()); - }) + .zip(clients.iter().enumerate().cycle()) + .map(|(iteration, (client_idx, client))| { + let start = Instant::now(); + debug!("Client {} reading (iteration {})...", client_idx, iteration); + client + .read(CHUNK_SIZE) + .map(move |_| (client_idx, iteration, start)) + })) + .map(|(client_idx, iteration, start)| { + let elapsed = start.elapsed(); + debug!("Client {} received reply (iteration {}).", + client_idx, + iteration); + elapsed + }) + .map_err(|e| panic!(e)) + .fold(Stats::default(), move |mut stats, elapsed| { + stats.sum += elapsed; + stats.count += 1; + stats.min = Some(cmp::min(stats.min.unwrap_or(elapsed), elapsed)); + stats.max = Some(cmp::max(stats.max.unwrap_or(elapsed), elapsed)); + Ok(stats) + }) + .map(move |stats| { + info!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", + stats.count, + stats.sum.microseconds() as f64 / stats.count as f64, + stats.min.unwrap().microseconds(), + stats.max.unwrap().microseconds(), + start.elapsed().microseconds()); + }) } fn main() { @@ -145,23 +146,25 @@ fn main() { let matches = App::new("Tarpc Concurrency") .about("Demonstrates making concurrent requests to a tarpc service.") .arg(Arg::with_name("concurrency") - .short("c") - .long("concurrency") - .value_name("LEVEL") - .help("Sets a custom concurrency level") - .takes_value(true)) + .short("c") + .long("concurrency") + .value_name("LEVEL") + .help("Sets a custom concurrency level") + .takes_value(true)) .arg(Arg::with_name("clients") - .short("n") - .long("num_clients") - .value_name("AMOUNT") - .help("How many clients to distribute requests between") - .takes_value(true)) + .short("n") + .long("num_clients") + .value_name("AMOUNT") + .help("How many clients to distribute requests between") + .takes_value(true)) .get_matches(); - let concurrency = matches.value_of("concurrency") + let concurrency = matches + .value_of("concurrency") .map(&str::parse) .map(Result::unwrap) .unwrap_or(10); - let num_clients = matches.value_of("clients") + let num_clients = matches + .value_of("clients") .map(&str::parse) .map(Result::unwrap) .unwrap_or(4); diff --git a/examples/pubsub.rs b/examples/pubsub.rs index e198fa8..a30ebe1 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -58,10 +58,7 @@ impl subscriber::FutureService for Subscriber { } impl Subscriber { - fn listen(id: u32, - handle: &reactor::Handle, - options: server::Options) - -> server::Handle { + fn listen(id: u32, handle: &reactor::Handle, options: server::Options) -> server::Handle { let (server_handle, server) = Subscriber { id: id } .listen("localhost:0".first_socket_addr(), handle, options) .unwrap(); @@ -103,12 +100,12 @@ impl publisher::FutureService for Publisher { fn subscribe(&self, id: u32, address: SocketAddr) -> Self::SubscribeFut { let clients = self.clients.clone(); Box::new(subscriber::FutureClient::connect(address, client::Options::default()) - .map(move |subscriber| { - println!("Subscribing {}.", id); - clients.borrow_mut().insert(id, subscriber); - () - }) - .map_err(|e| e.to_string().into())) + .map(move |subscriber| { + println!("Subscribing {}.", id); + clients.borrow_mut().insert(id, subscriber); + () + }) + .map_err(|e| e.to_string().into())) } type UnsubscribeFut = Box>; @@ -133,19 +130,20 @@ fn main() { let subscriber1 = Subscriber::listen(0, &reactor.handle(), server::Options::default()); let subscriber2 = Subscriber::listen(1, &reactor.handle(), server::Options::default()); - let publisher = - reactor.run(publisher::FutureClient::connect(publisher_handle.addr(), - client::Options::default())) - .unwrap(); - reactor.run(publisher.subscribe(0, subscriber1.addr()) - .and_then(|_| publisher.subscribe(1, subscriber2.addr())) - .map_err(|e| panic!(e)) - .and_then(|_| { - println!("Broadcasting..."); - publisher.broadcast("hello to all".to_string()) - }) - .and_then(|_| publisher.unsubscribe(1)) - .and_then(|_| publisher.broadcast("hi again".to_string()))) + let publisher = reactor + .run(publisher::FutureClient::connect(publisher_handle.addr(), client::Options::default())) + .unwrap(); + reactor + .run(publisher + .subscribe(0, subscriber1.addr()) + .and_then(|_| publisher.subscribe(1, subscriber2.addr())) + .map_err(|e| panic!(e)) + .and_then(|_| { + println!("Broadcasting..."); + publisher.broadcast("hello to all".to_string()) + }) + .and_then(|_| publisher.unsubscribe(1)) + .and_then(|_| publisher.broadcast("hi again".to_string()))) .unwrap(); thread::sleep(Duration::from_millis(300)); } diff --git a/examples/readme_errors.rs b/examples/readme_errors.rs index 36e3b44..5d33b68 100644 --- a/examples/readme_errors.rs +++ b/examples/readme_errors.rs @@ -55,10 +55,12 @@ impl SyncService for HelloServer { fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { - let handle = HelloServer.listen("localhost:10000", server::Options::default()).unwrap(); - tx.send(handle.addr()).unwrap(); - handle.run(); - }); + let handle = HelloServer + .listen("localhost:10000", server::Options::default()) + .unwrap(); + tx.send(handle.addr()).unwrap(); + handle.run(); + }); let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); println!("{}", client.hello("".to_string()).unwrap_err()); diff --git a/examples/readme_futures.rs b/examples/readme_futures.rs index 5515c9e..3451e27 100644 --- a/examples/readme_futures.rs +++ b/examples/readme_futures.rs @@ -34,16 +34,18 @@ impl FutureService for HelloServer { fn main() { let mut reactor = reactor::Core::new().unwrap(); - let (handle, server) = HelloServer.listen("localhost:10000".first_socket_addr(), + let (handle, server) = HelloServer + .listen("localhost:10000".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); reactor.handle().spawn(server); let options = client::Options::default().handle(reactor.handle()); - reactor.run(FutureClient::connect(handle.addr(), options) - .map_err(tarpc::Error::from) - .and_then(|client| client.hello("Mom".to_string())) - .map(|resp| println!("{}", resp))) + reactor + .run(FutureClient::connect(handle.addr(), options) + .map_err(tarpc::Error::from) + .and_then(|client| client.hello("Mom".to_string())) + .map(|resp| println!("{}", resp))) .unwrap(); } diff --git a/examples/readme_sync.rs b/examples/readme_sync.rs index f40a5a6..f19758b 100644 --- a/examples/readme_sync.rs +++ b/examples/readme_sync.rs @@ -27,17 +27,21 @@ struct HelloServer; impl SyncService for HelloServer { fn hello(&self, name: String) -> Result { - Ok(format!("Hello from thread {}, {}!", thread::current().name().unwrap(), name)) + Ok(format!("Hello from thread {}, {}!", + thread::current().name().unwrap(), + name)) } } fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { - let handle = HelloServer.listen("localhost:0", server::Options::default()).unwrap(); - tx.send(handle.addr()).unwrap(); - handle.run(); - }); + let handle = HelloServer + .listen("localhost:0", server::Options::default()) + .unwrap(); + tx.send(handle.addr()).unwrap(); + handle.run(); + }); let client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index 1e22786..9c0b353 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -72,14 +72,17 @@ impl DoubleFutureService for DoubleServer { fn main() { let _ = env_logger::init(); let mut reactor = reactor::Core::new().unwrap(); - let (add, server) = AddServer.listen("localhost:0".first_socket_addr(), + let (add, server) = AddServer + .listen("localhost:0".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); reactor.handle().spawn(server); let options = client::Options::default().handle(reactor.handle()); - let add_client = reactor.run(add::FutureClient::connect(add.addr(), options)).unwrap(); + let add_client = reactor + .run(add::FutureClient::connect(add.addr(), options)) + .unwrap(); let (double, server) = DoubleServer::new(add_client) .listen("localhost:0".first_socket_addr(), @@ -88,14 +91,15 @@ fn main() { .unwrap(); reactor.handle().spawn(server); - let double_client = - reactor.run(double::FutureClient::connect(double.addr(), client::Options::default())) - .unwrap(); - reactor.run(futures::stream::futures_unordered((0..5).map(|i| double_client.double(i))) - .map_err(|e| println!("{}", e)) - .for_each(|i| { - println!("{:?}", i); - Ok(()) - })) + let double_client = reactor + .run(double::FutureClient::connect(double.addr(), client::Options::default())) + .unwrap(); + reactor + .run(futures::stream::futures_unordered((0..5).map(|i| double_client.double(i))) + .map_err(|e| println!("{}", e)) + .for_each(|i| { + println!("{:?}", i); + Ok(()) + })) .unwrap(); } diff --git a/examples/sync_server_calling_server.rs b/examples/sync_server_calling_server.rs index 59c13fe..8e39476 100644 --- a/examples/sync_server_calling_server.rs +++ b/examples/sync_server_calling_server.rs @@ -58,9 +58,7 @@ impl DoubleServer { impl DoubleSyncService for DoubleServer { fn double(&self, x: i32) -> Result { - self.client - .add(x, x) - .map_err(|e| e.to_string().into()) + self.client.add(x, x).map_err(|e| e.to_string().into()) } } @@ -68,12 +66,13 @@ fn main() { let _ = env_logger::init(); let (tx, rx) = mpsc::channel(); thread::spawn(move || { - let handle = AddServer.listen("localhost:0".first_socket_addr(), - server::Options::default()) - .unwrap(); - tx.send(handle.addr()).unwrap(); - handle.run(); - }); + let handle = AddServer + .listen("localhost:0".first_socket_addr(), + server::Options::default()) + .unwrap(); + tx.send(handle.addr()).unwrap(); + handle.run(); + }); let add = rx.recv().unwrap(); diff --git a/examples/throughput.rs b/examples/throughput.rs index 0c79b27..c757506 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -59,7 +59,8 @@ fn bench_tarpc(target: u64) { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let mut reactor = reactor::Core::new().unwrap(); - let (addr, server) = Server.listen("localhost:0".first_socket_addr(), + let (addr, server) = Server + .listen("localhost:0".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); @@ -86,9 +87,9 @@ fn bench_tcp(target: u64) { let l = net::TcpListener::bind("localhost:0").unwrap(); let addr = l.local_addr().unwrap(); thread::spawn(move || { - let (mut stream, _) = l.accept().unwrap(); - while let Ok(_) = stream.write_all(&*BUF) {} - }); + let (mut stream, _) = l.accept().unwrap(); + while let Ok(_) = stream.write_all(&*BUF) {} + }); let mut stream = net::TcpStream::connect(&addr).unwrap(); let mut buf = vec![0; CHUNK_SIZE as usize]; let start = time::Instant::now(); diff --git a/examples/two_clients.rs b/examples/two_clients.rs index 1ef1000..3c5b941 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -68,8 +68,8 @@ fn main() { thread::spawn(move || { let mut reactor = reactor::Core::new().unwrap(); let (handle, server) = Bar.listen("localhost:0".first_socket_addr(), - &reactor.handle(), - server::Options::default()) + &reactor.handle(), + server::Options::default()) .unwrap(); tx.send(handle).unwrap(); reactor.run(server).unwrap(); @@ -83,8 +83,8 @@ fn main() { thread::spawn(move || { let mut reactor = reactor::Core::new().unwrap(); let (handle, server) = Baz.listen("localhost:0".first_socket_addr(), - &reactor.handle(), - server::Options::default()) + &reactor.handle(), + server::Options::default()) .unwrap(); tx.send(handle).unwrap(); reactor.run(server).unwrap(); diff --git a/src/future/client.rs b/src/future/client.rs index 85cf880..a6dd674 100644 --- a/src/future/client.rs +++ b/src/future/client.rs @@ -212,10 +212,11 @@ impl ClientExt for Client #[cfg(feature = "tls")] match tls_ctx { Some(tls_ctx) => { - future::Either::A(tls_ctx.tls_connector - .connect_async(&tls_ctx.domain, socket) - .map(StreamType::Tls) - .map_err(native_to_io)) + future::Either::A(tls_ctx + .tls_connector + .connect_async(&tls_ctx.domain, socket) + .map(StreamType::Tls) + .map_err(native_to_io)) } None => future::Either::B(future::ok(StreamType::Tcp(socket))), } diff --git a/src/future/server/connection.rs b/src/future/server/connection.rs index 7883ad3..43c653c 100644 --- a/src/future/server/connection.rs +++ b/src/future/server/connection.rs @@ -69,8 +69,8 @@ impl NewService for TrackingNewService { fn new_service(&self) -> io::Result { self.connection_tracker.increment(); Ok(TrackingService { - service: self.new_service.new_service()?, - tracker: self.connection_tracker.clone(), - }) + service: self.new_service.new_service()?, + tracker: self.connection_tracker.clone(), + }) } } diff --git a/src/future/server/mod.rs b/src/future/server/mod.rs index 4a42659..1b7a40b 100644 --- a/src/future/server/mod.rs +++ b/src/future/server/mod.rs @@ -12,9 +12,9 @@ use std::fmt; use std::io; use std::net::SocketAddr; use stream_type::StreamType; -use tokio_io::{AsyncRead, AsyncWrite}; use tokio_core::net::{Incoming, TcpListener, TcpStream}; use tokio_core::reactor; +use tokio_io::{AsyncRead, AsyncWrite}; use tokio_proto::BindServer; use tokio_service::NewService; @@ -59,9 +59,9 @@ enum Acceptor { struct Accept { #[cfg(feature = "tls")] inner: futures::Either, - fn(TlsStream) -> StreamType>, - fn(native_tls::Error) -> io::Error>, - futures::FutureResult>, + fn(TlsStream) -> StreamType>, + fn(native_tls::Error) -> io::Error>, + futures::FutureResult>, #[cfg(not(feature = "tls"))] inner: futures::FutureResult, } @@ -82,20 +82,19 @@ impl Acceptor { Accept { inner: match *self { Acceptor::Tls(ref tls_acceptor) => { - futures::Either::A(tls_acceptor.accept_async(socket) - .map(StreamType::Tls as _) - .map_err(native_to_io)) + futures::Either::A(tls_acceptor + .accept_async(socket) + .map(StreamType::Tls as _) + .map_err(native_to_io)) } Acceptor::Tcp => futures::Either::B(futures::ok(StreamType::Tcp(socket))), - } + }, } } #[cfg(not(feature = "tls"))] fn accept(&self, socket: TcpStream) -> Accept { - Accept { - inner: futures::ok(StreamType::Tcp(socket)) - } + Accept { inner: futures::ok(StreamType::Tcp(socket)) } } } @@ -144,7 +143,7 @@ struct AcceptStream { } impl Stream for AcceptStream - where S: Stream, + where S: Stream { type Item = ::Item; type Error = io::Error; @@ -167,7 +166,7 @@ impl Stream for AcceptStream self.future = None; Err(e) } - Ok(Async::NotReady) => Ok(Async::NotReady) + Ok(Async::NotReady) => Ok(Async::NotReady), } } } @@ -183,9 +182,7 @@ pub struct Options { impl Default for Options { #[cfg(not(feature = "tls"))] fn default() -> Self { - Options { - max_payload_size: 2 << 20, - } + Options { max_payload_size: 2 << 20 } } #[cfg(feature = "tls")] @@ -221,7 +218,12 @@ impl fmt::Debug for Options { let mut debug_struct = fmt.debug_struct("Options"); #[cfg(feature = "tls")] - debug_struct.field("tls_acceptor", if self.tls_acceptor.is_some() { SOME } else { NONE }); + debug_struct.field("tls_acceptor", + if self.tls_acceptor.is_some() { + SOME + } else { + NONE + }); debug_struct.finish() } } @@ -243,8 +245,11 @@ pub fn listen(new_service: S, Resp: Serialize + 'static, E: Serialize + 'static { - let (addr, shutdown, server) = listen_with( - new_service, addr, handle, options.max_payload_size, Acceptor::from(options))?; + let (addr, shutdown, server) = listen_with(new_service, + addr, + handle, + options.max_payload_size, + Acceptor::from(options))?; Ok((Handle { addr: addr, shutdown: shutdown, @@ -299,7 +304,8 @@ fn listener(addr: &SocketAddr, handle: &reactor::Handle) -> io::Result { impl fmt::Debug for BindStream where S: fmt::Debug, - St: fmt::Debug, + St: fmt::Debug { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { const HANDLE: &'static &'static str = &"Handle { .. }"; f.debug_struct("BindStream") - .field("handle", HANDLE) - .field("new_service", &self.new_service) - .field("stream", &self.stream) - .finish() + .field("handle", HANDLE) + .field("new_service", &self.new_service) + .field("stream", &self.stream) + .finish() } } @@ -345,14 +351,15 @@ impl BindStream Resp: Serialize + 'static, E: Serialize + 'static, I: AsyncRead + AsyncWrite + 'static, - St: Stream, + St: Stream { fn bind_each(&mut self) -> Poll<(), io::Error> { loop { match try!(self.stream.poll()) { Async::Ready(Some(socket)) => { - Proto::new(self.max_payload_size) - .bind_server(&self.handle, socket, self.new_service.new_service()?); + Proto::new(self.max_payload_size).bind_server(&self.handle, + socket, + self.new_service.new_service()?); } Async::Ready(None) => return Ok(Async::Ready(())), Async::NotReady => return Ok(Async::NotReady), @@ -369,7 +376,7 @@ impl Future for BindStream Resp: Serialize + 'static, E: Serialize + 'static, I: AsyncRead + AsyncWrite + 'static, - St: Stream, + St: Stream { type Item = (); type Error = (); @@ -396,8 +403,7 @@ pub struct Listen Resp: Serialize + 'static, E: Serialize + 'static { - inner: AlwaysOkUnit>, - shutdown::Watcher>>, + inner: AlwaysOkUnit>, shutdown::Watcher>>, } impl Future for Listen @@ -422,7 +428,7 @@ impl fmt::Debug for Listen Error = io::Error> + 'static, Req: Deserialize + 'static, Resp: Serialize + 'static, - E: Serialize + 'static, + E: Serialize + 'static { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { f.debug_struct("Listen").finish() @@ -433,16 +439,16 @@ impl fmt::Debug for Listen struct AlwaysOkUnit(F); impl Future for AlwaysOkUnit - where F: Future, + where F: Future { type Item = (); type Error = (); fn poll(&mut self) -> Poll<(), ()> { match self.0.poll() { - Ok(Async::Ready(_)) | Err(_) => Ok(Async::Ready(())), + Ok(Async::Ready(_)) | + Err(_) => Ok(Async::Ready(())), Ok(Async::NotReady) => Ok(Async::NotReady), } } } - diff --git a/src/future/server/shutdown.rs b/src/future/server/shutdown.rs index c41221a..9af05a9 100644 --- a/src/future/server/shutdown.rs +++ b/src/future/server/shutdown.rs @@ -1,9 +1,10 @@ + + +use super::{AlwaysOkUnit, connection}; use futures::{Async, Future, Poll, Stream, future as futures, stream}; use futures::sync::{mpsc, oneshot}; use futures::unsync; -use super::{AlwaysOkUnit, connection}; - /// A hook to shut down a running server. #[derive(Clone, Debug)] pub struct Shutdown { @@ -13,8 +14,7 @@ pub struct Shutdown { /// A future that resolves when server shutdown completes. #[derive(Debug)] pub struct ShutdownFuture { - inner: futures::Either, - AlwaysOkUnit>>, + inner: futures::Either, AlwaysOkUnit>>, } impl Future for ShutdownFuture { @@ -63,13 +63,13 @@ impl Watcher { (connection_tx, Shutdown { tx: shutdown_tx }, Watcher { - shutdown_rx: shutdown_rx.take(1), - connections: connections, - queued_error: None, - shutdown: None, - done: false, - num_connections: 0, - }) + shutdown_rx: shutdown_rx.take(1), + connections: connections, + queued_error: None, + shutdown: None, + done: false, + num_connections: 0, + }) } fn process_connection(&mut self, action: connection::Action) { @@ -81,28 +81,28 @@ impl Watcher { fn poll_shutdown_requests(&mut self) -> Poll, ()> { Ok(Async::Ready(match try_ready!(self.shutdown_rx.poll()) { - Some(tx) => { - debug!("Received shutdown request."); - self.shutdown = Some(tx); - Some(()) - } - None => None, - })) + Some(tx) => { + debug!("Received shutdown request."); + self.shutdown = Some(tx); + Some(()) + } + None => None, + })) } fn poll_connections(&mut self) -> Poll, ()> { Ok(Async::Ready(match try_ready!(self.connections.poll()) { - Some(action) => { - self.process_connection(action); - Some(()) - } - None => None, - })) + Some(action) => { + self.process_connection(action); + Some(()) + } + None => None, + })) } fn poll_shutdown_requests_and_connections(&mut self) -> Poll, ()> { if let Some(e) = self.queued_error.take() { - return Err(e) + return Err(e); } match try!(self.poll_shutdown_requests()) { @@ -178,4 +178,3 @@ impl Future for Watcher { } } } - diff --git a/src/lib.rs b/src/lib.rs index 82903bb..a3d324c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -189,12 +189,12 @@ lazy_static! { fn spawn_core() -> reactor::Remote { let (tx, rx) = mpsc::channel(); thread::spawn(move || { - let mut core = reactor::Core::new().unwrap(); - tx.send(core.handle().remote().clone()).unwrap(); + let mut core = reactor::Core::new().unwrap(); + tx.send(core.handle().remote().clone()).unwrap(); - // Run forever - core.run(futures::empty::<(), !>()).unwrap(); - }); + // Run forever + core.run(futures::empty::<(), !>()).unwrap(); + }); rx.recv().unwrap() } diff --git a/src/macros.rs b/src/macros.rs index 8edd279..24cd3b4 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -1072,9 +1072,9 @@ mod functional_test { #[test] fn other_service() { let _ = env_logger::init(); - let (_, client, _) = - unwrap!(start_server_with_sync_client::(Server)); + let (_, client, _) = unwrap!( + start_server_with_sync_client::(Server) + ); match client.foo().err().expect("failed unwrap") { ::Error::RequestDeserialize(_) => {} // good bad => panic!("Expected Error::RequestDeserialize but got {}", bad), @@ -1111,7 +1111,9 @@ mod functional_test { #[test] fn bad_serialize() { - let handle = ().listen("localhost:0", server::Options::default()).unwrap(); + let handle = () + .listen("localhost:0", server::Options::default()) + .unwrap(); let client = SyncClient::connect(handle.addr(), client::Options::default()).unwrap(); client.bad(Bad).err().unwrap(); } @@ -1209,12 +1211,15 @@ mod functional_test { let _ = env_logger::init(); let reactor = reactor::Core::new().unwrap(); - let handle = Server.listen("localhost:0".first_socket_addr(), + let handle = Server + .listen("localhost:0".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap() .0; - Server.listen(handle.addr(), &reactor.handle(), server::Options::default()).unwrap(); + Server + .listen(handle.addr(), &reactor.handle(), server::Options::default()) + .unwrap(); } #[test] @@ -1226,20 +1231,23 @@ mod functional_test { let _ = env_logger::init(); let mut reactor = reactor::Core::new().unwrap(); - let (handle, server) = Server.listen("localhost:0".first_socket_addr(), + let (handle, server) = Server + .listen("localhost:0".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); reactor.handle().spawn(server); let client = FutureClient::connect(handle.addr(), - client::Options::default().handle(reactor.handle())); + client::Options::default() + .handle(reactor.handle())); let client = unwrap!(reactor.run(client)); assert_eq!(reactor.run(client.add(1, 2)).unwrap(), 3); drop(client); let client = FutureClient::connect(handle.addr(), - client::Options::default().handle(reactor.handle())); + client::Options::default() + .handle(reactor.handle())); let client = unwrap!(reactor.run(client)); assert_eq!(reactor.run(client.add(1, 2)).unwrap(), 3); } @@ -1259,13 +1267,16 @@ mod functional_test { assert_eq!("Hey, Tim.", reactor.run(client.hey("Tim".to_string())).unwrap()); - let (handle, server) = Server.listen("localhost:0".first_socket_addr(), + let (handle, server) = Server + .listen("localhost:0".first_socket_addr(), &reactor.handle(), server::Options::default()) .unwrap(); reactor.handle().spawn(server); let options = client::Options::default().handle(reactor.handle()); - let client = reactor.run(FutureClient::connect(handle.addr(), options)).unwrap(); + let client = reactor + .run(FutureClient::connect(handle.addr(), options)) + .unwrap(); assert_eq!(3, reactor.run(client.add(1, 2)).unwrap()); assert_eq!("Hey, Tim.", reactor.run(client.hey("Tim".to_string())).unwrap()); @@ -1298,16 +1309,18 @@ mod functional_test { let (_, mut reactor, client) = start_err_server_with_async_client::(ErrorServer).unwrap(); - reactor.run(client.bar() - .then(move |result| { - match result.err().unwrap() { - ::Error::App(e) => { - assert_eq!(e.description(), "lol jk"); - Ok::<_, ()>(()) - } // good - bad => panic!("Expected Error::App but got {:?}", bad), - } - })) + reactor + .run(client + .bar() + .then(move |result| { + match result.err().unwrap() { + ::Error::App(e) => { + assert_eq!(e.description(), "lol jk"); + Ok::<_, ()>(()) + } // good + bad => panic!("Expected Error::App but got {:?}", bad), + } + })) .unwrap(); } diff --git a/src/protocol.rs b/src/protocol.rs index b9d9c6d..be17c3a 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -3,11 +3,11 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use serde; use bincode::{self, Infinite}; use byteorder::{BigEndian, ReadBytesExt}; use bytes::BytesMut; use bytes::buf::BufMut; +use serde; use std::io::{self, Cursor}; use std::marker::PhantomData; use std::mem; @@ -43,10 +43,12 @@ impl Codec { fn too_big(payload_size: u64, max_payload_size: u64) -> io::Error { warn!("Not sending too-big packet of size {} (max is {})", - payload_size, max_payload_size); + payload_size, + max_payload_size); io::Error::new(io::ErrorKind::InvalidData, format!("Maximum payload size is {} bytes but got a payload of {}", - max_payload_size, payload_size)) + max_payload_size, + payload_size)) } impl Encoder for Codec @@ -66,9 +68,7 @@ impl Encoder for Codec buf.put_u64::(id); trace!("Encoded request id = {} as {:?}", id, buf); buf.put_u64::(payload_size); - bincode::serialize_into(&mut buf.writer(), - &message, - Infinite) + bincode::serialize_into(&mut buf.writer(), &message, Infinite) .map_err(|serialize_err| io::Error::new(io::ErrorKind::Other, serialize_err))?; trace!("Encoded buffer: {:?}", buf); Ok(()) @@ -121,8 +121,7 @@ impl Decoder for Codec } Payload { id, len } => { let payload = buf.split_to(len as usize); - let result = bincode::deserialize_from(&mut Cursor::new(payload), - Infinite); + let result = bincode::deserialize_from(&mut Cursor::new(payload), Infinite); // Reset the state machine because, either way, we're done processing this // message. self.state = Id; @@ -146,7 +145,7 @@ impl Proto { pub fn new(max_payload_size: u64) -> Self { Proto { max_payload_size: max_payload_size, - _phantom_data: PhantomData + _phantom_data: PhantomData, } } } @@ -190,8 +189,8 @@ fn serialize() { for _ in 0..2 { let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new(2_000_000); codec.encode(MSG, &mut buf).unwrap(); - let actual: Result)>, io::Error> = - codec.decode(&mut buf); + let actual: Result)>, + io::Error> = codec.decode(&mut buf); match actual { Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {} @@ -207,7 +206,11 @@ fn deserialize_big() { let mut codec: Codec, Vec> = Codec::new(24); let mut buf = BytesMut::with_capacity(40); - assert_eq!(codec.encode((0, vec![0; 24]), &mut buf).err().unwrap().kind(), + assert_eq!(codec + .encode((0, vec![0; 24]), &mut buf) + .err() + .unwrap() + .kind(), io::ErrorKind::InvalidData); // Header diff --git a/src/sync/client.rs b/src/sync/client.rs index 4b84488..bed63eb 100644 --- a/src/sync/client.rs +++ b/src/sync/client.rs @@ -8,12 +8,12 @@ use std::io; use std::net::{SocketAddr, ToSocketAddrs}; use std::sync::mpsc; use std::thread; +#[cfg(feature = "tls")] +use tls::client::Context; use tokio_core::reactor; use tokio_proto::util::client_proxy::{ClientProxy, Receiver, pair}; use tokio_service::Service; use util::FirstSocketAddr; -#[cfg(feature = "tls")] -use tls::client::Context; #[doc(hidden)] pub struct Client { @@ -29,9 +29,7 @@ impl Clone for Client { impl fmt::Debug for Client { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { const PROXY: &'static &'static str = &"ClientProxy { .. }"; - f.debug_struct("Client") - .field("proxy", PROXY) - .finish() + f.debug_struct("Client").field("proxy", PROXY).finish() } } @@ -47,7 +45,6 @@ impl Client // oneshot send. self.proxy.call(request).wait() } - } /// Additional options to configure how the client connects and operates. @@ -61,9 +58,7 @@ pub struct Options { impl Default for Options { #[cfg(not(feature = "tls"))] fn default() -> Self { - Options { - max_payload_size: 2_000_000, - } + Options { max_payload_size: 2_000_000 } } #[cfg(feature = "tls")] @@ -137,15 +132,13 @@ impl ClientExt for Client { let addr = addr.try_first_socket_addr()?; let (connect_tx, connect_rx) = mpsc::channel(); - thread::spawn(move || { - match RequestHandler::connect(addr, options) { - Ok((proxy, mut handler)) => { - connect_tx.send(Ok(proxy)).unwrap(); - handler.handle_requests(); - } - Err(e) => connect_tx.send(Err(e)).unwrap(), - } - }); + thread::spawn(move || match RequestHandler::connect(addr, options) { + Ok((proxy, mut handler)) => { + connect_tx.send(Ok(proxy)).unwrap(); + handler.handle_requests(); + } + Err(e) => connect_tx.send(Err(e)).unwrap(), + }); Ok(connect_rx.recv().unwrap()?) } } @@ -166,14 +159,17 @@ impl RequestHandler> { /// Creates a new `RequestHandler` by connecting a `FutureClient` to the given address /// using the given options. - fn connect(addr: SocketAddr, options: Options) - -> io::Result<(Client, Self)> - { + fn connect(addr: SocketAddr, options: Options) -> io::Result<(Client, Self)> { let mut reactor = reactor::Core::new()?; let options = (reactor.handle(), options).into(); let client = reactor.run(FutureClient::connect(addr, options))?; let (proxy, requests) = pair(); - Ok((Client { proxy }, RequestHandler { reactor, client, requests })) + Ok((Client { proxy }, + RequestHandler { + reactor, + client, + requests, + })) } } @@ -182,21 +178,26 @@ impl RequestHandler Resp: Deserialize + 'static, E: Deserialize + 'static, S: Service>, - S::Future: 'static, + S::Future: 'static { fn handle_requests(&mut self) { - let RequestHandler { ref mut reactor, ref mut requests, ref mut client } = *self; + let RequestHandler { + ref mut reactor, + ref mut requests, + ref mut client, + } = *self; let handle = reactor.handle(); - let requests = requests - .map(|result| { - match result { - Ok(req) => req, - // The ClientProxy never sends Err currently - Err(e) => panic!("Unimplemented error handling in RequestHandler: {}", e), - } - }) - .for_each(|(request, response_tx)| { - let request = client.call(request) + let requests = + requests + .map(|result| { + match result { + Ok(req) => req, + // The ClientProxy never sends Err currently + Err(e) => panic!("Unimplemented error handling in RequestHandler: {}", e), + } + }) + .for_each(|(request, response_tx)| { + let request = client.call(request) .then(move |response| { // Safe to unwrap because clients always block on the response future. response_tx.send(response) @@ -204,9 +205,9 @@ impl RequestHandler .expect("Client should block on response"); Ok(()) }); - handle.spawn(request); - Ok(()) - }); + handle.spawn(request); + Ok(()) + }); reactor.run(requests).unwrap(); } } @@ -230,7 +231,11 @@ fn handle_requests() { let (request, requests) = ::futures::sync::mpsc::unbounded(); let reactor = reactor::Core::new().unwrap(); let client = Client; - let mut request_handler = RequestHandler { reactor, client, requests }; + let mut request_handler = RequestHandler { + reactor, + client, + requests, + }; // Test that `handle_requests` returns when all request senders are dropped. drop(request); request_handler.handle_requests(); diff --git a/src/sync/server.rs b/src/sync/server.rs index 7f8e946..8cb67fd 100644 --- a/src/sync/server.rs +++ b/src/sync/server.rs @@ -2,17 +2,17 @@ use {bincode, future, num_cpus}; use future::server::{Response, Shutdown}; use futures::{Future, future as futures}; use futures::sync::oneshot; +#[cfg(feature = "tls")] +use native_tls_inner::TlsAcceptor; use serde::{Deserialize, Serialize}; -use std::io; use std::fmt; +use std::io; use std::net::SocketAddr; use std::time::Duration; use std::usize; use thread_pool::{self, Sender, Task, ThreadPool}; use tokio_core::reactor; use tokio_service::{NewService, Service}; -#[cfg(feature = "tls")] -use native_tls_inner::TlsAcceptor; /// Additional options to configure how the server operates. #[derive(Debug)] @@ -91,16 +91,19 @@ impl fmt::Debug for Handle { const CORE: &'static &'static str = &"Core { .. }"; const SERVER: &'static &'static str = &"Box>"; - f.debug_struct("Handle").field("reactor", CORE) - .field("handle", &self.handle) - .field("server", SERVER) - .finish() + f.debug_struct("Handle") + .field("reactor", CORE) + .field("handle", &self.handle) + .field("server", SERVER) + .finish() } } #[doc(hidden)] -pub fn listen(new_service: S, addr: SocketAddr, options: Options) - -> io::Result +pub fn listen(new_service: S, + addr: SocketAddr, + options: Options) + -> io::Result where S: NewService, Response = Response, Error = io::Error> + 'static, @@ -117,27 +120,33 @@ pub fn listen(new_service: S, addr: SocketAddr, options: Option future::server::listen(new_service, addr, &reactor.handle(), options.opts)?; let server = Box::new(server); Ok(Handle { - reactor: reactor, - handle: handle, - server: server, - }) + reactor: reactor, + handle: handle, + server: server, + }) } /// A service that uses a thread pool. -struct NewThreadService where S: NewService { +struct NewThreadService + where S: NewService +{ new_service: S, sender: Sender::Future>>, _pool: ThreadPool::Future>>, } /// A service that runs by executing request handlers in a thread pool. -struct ThreadService where S: Service { +struct ThreadService + where S: Service +{ service: S, sender: Sender>, } /// A task that handles a single request. -struct ServiceTask where F: Future { +struct ServiceTask + where F: Future +{ future: F, tx: oneshot::Sender>, } @@ -146,12 +155,16 @@ impl NewThreadService where S: NewService, ::Future: Send + 'static, S::Response: Send, - S::Error: Send, + S::Error: Send { /// Create a NewThreadService by wrapping another service. fn new(new_service: S, pool: thread_pool::Builder) -> Self { let (sender, _pool) = pool.build(); - NewThreadService { new_service, sender, _pool } + NewThreadService { + new_service, + sender, + _pool, + } } } @@ -159,7 +172,7 @@ impl NewService for NewThreadService where S: NewService, ::Future: Send + 'static, S::Response: Send, - S::Error: Send, + S::Error: Send { type Request = S::Request; type Response = S::Response; @@ -168,16 +181,16 @@ impl NewService for NewThreadService fn new_service(&self) -> io::Result { Ok(ThreadService { - service: self.new_service.new_service()?, - sender: self.sender.clone(), - }) + service: self.new_service.new_service()?, + sender: self.sender.clone(), + }) } } impl Task for ServiceTask where F: Future + Send + 'static, F::Item: Send, - F::Error: Send, + F::Error: Send { fn run(self) { // Don't care if sending fails. It just means the request is no longer @@ -190,25 +203,26 @@ impl Service for ThreadService where S: Service, S::Future: Send + 'static, S::Response: Send, - S::Error: Send, + S::Error: Send { type Request = S::Request; type Response = S::Response; type Error = S::Error; - type Future = - futures::AndThen< - futures::MapErr< - oneshot::Receiver>, - fn(oneshot::Canceled) -> Self::Error>, - Result, - fn(Result) -> Result>; + type Future = futures::AndThen>, + fn(oneshot::Canceled) -> Self::Error>, + Result, + fn(Result) + -> Result>; fn call(&self, request: Self::Request) -> Self::Future { let (tx, rx) = oneshot::channel(); - self.sender.send(ServiceTask { - future: self.service.call(request), - tx: tx, - }).unwrap(); + self.sender + .send(ServiceTask { + future: self.service.call(request), + tx: tx, + }) + .unwrap(); rx.map_err(unreachable as _).and_then(ident) } } @@ -222,4 +236,3 @@ fn unreachable(t: T) -> U fn ident(t: T) -> T { t } - diff --git a/src/tls.rs b/src/tls.rs index 62327c8..6e9fd2b 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -19,9 +19,9 @@ pub mod client { /// validation. pub fn new>(domain: S) -> Result { Ok(Context { - domain: domain.into(), - tls_connector: TlsConnector::builder()?.build()?, - }) + domain: domain.into(), + tls_connector: TlsConnector::builder()?.build()?, + }) } /// Construct a new `Context` using the provided domain and `TlsConnector` @@ -41,11 +41,10 @@ pub mod client { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { const TLS_CONNECTOR: &'static &'static str = &"TlsConnector { .. }"; f.debug_struct("Context") - .field("domain", &self.domain) - .field("tls_connector", TLS_CONNECTOR) - .finish() + .field("domain", &self.domain) + .field("tls_connector", TLS_CONNECTOR) + .finish() } } } - diff --git a/src/util.rs b/src/util.rs index fd07b91..12bc85f 100644 --- a/src/util.rs +++ b/src/util.rs @@ -122,9 +122,7 @@ pub fn lazy(f: F, args: A) -> Lazy where F: FnOnce(A) -> R, R: IntoFuture { - Lazy { - inner: _Lazy::First(f, args), - } + Lazy { inner: _Lazy::First(f, args) } } /// A future which defers creation of the actual future until a callback is @@ -146,7 +144,7 @@ enum _Lazy { impl Lazy where F: FnOnce(A) -> R, - R: IntoFuture, + R: IntoFuture { fn get(&mut self) -> &mut R::Future { match self.inner { @@ -167,7 +165,7 @@ impl Lazy impl Future for Lazy where F: FnOnce(A) -> R, - R: IntoFuture, + R: IntoFuture { type Item = R::Item; type Error = R::Error;