diff --git a/src/lib.rs b/src/lib.rs index 798cf25..c2b6708 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,7 +63,7 @@ impl convert::From> for Error { pub type Result = std::result::Result; -pub fn handle_conn(mut stream: TcpStream, f: Arc) -> Result<()> +pub fn handle_conn(mut stream: TcpStream, f: F) -> Result<()> where Request: fmt::Debug + serde::de::Deserialize + serde::ser::Serialize, Reply: fmt::Debug + serde::ser::Serialize, F: Serve @@ -71,11 +71,9 @@ pub fn handle_conn(mut stream: TcpStream, f: Arc) -> Resul let read_stream = try!(stream.try_clone()); let mut de = serde_json::Deserializer::new(read_stream.bytes()); loop { - println!("read"); let request_packet: Packet = try!(Packet::deserialize(&mut de)); match request_packet { Packet::Shutdown => { - println!("server shutting down"); try!(serde_json::to_writer(&mut stream, &request_packet)); break; }, @@ -109,10 +107,10 @@ impl Shutdown { } } -pub fn serve_async(addr: &SocketAddr, f: Arc) -> io::Result +pub fn serve_async(addr: &SocketAddr, f: F) -> io::Result where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize, Reply: fmt::Debug + serde::ser::Serialize, - F: 'static + Serve, + F: 'static + Clone + Serve, { let listener = try!(TcpListener::bind(addr)); let (die_tx, die_rx) = channel(); @@ -152,6 +150,14 @@ pub trait Serve: Send + Sync { fn serve(&self, request: &Request) -> io::Result; } +impl Serve for Arc + where S: Serve +{ + fn serve(&self, request: &Request) -> io::Result { + S::serve(self, request) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] enum Packet { Message(u64, T), @@ -172,7 +178,6 @@ fn reader( reply_tx.send(reply).unwrap(); }, Ok(Packet::Shutdown) => { - println!("reader shutting down"); break; } // TODO: This shutdown logic is janky.. What's the right way to do this? @@ -245,7 +250,9 @@ impl Client #[cfg(test)] mod test { + use serde; use super::*; + use std::fmt; use std::io; use std::net::{TcpStream, TcpListener, SocketAddr, ToSocketAddrs}; use std::str::FromStr; @@ -303,35 +310,42 @@ mod test { } } + fn wtf(server: F) -> (SocketAddr, Shutdown) + where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize, + Reply: fmt::Debug + serde::ser::Serialize, + F: 'static + Clone + Serve + { + let mut addr; + let mut shutdown; + while let &Err(_) = {shutdown = serve_async({addr = next_addr(); &addr}, server.clone()); &shutdown} { } + (addr, shutdown.unwrap()) + } + #[test] fn test_handle() { - let addr = next_addr(); let server = Arc::new(Server::new()); - let srv_shutdown = serve_async(&addr, server).unwrap(); + let (addr, shutdown) = wtf(server.clone()); let client_stream = TcpStream::connect(&addr).unwrap(); let client: Client = Client::new(client_stream).expect(&line!().to_string()); client.disconnect::(); - srv_shutdown.shutdown(); + shutdown.shutdown(); } - /* #[test] fn test() { - let (client_stream, server_streams) = pair(); let server = Arc::new(Server::new()); - let thread_server = server.clone(); - let guard = thread::spawn(move || serve(server_streams, thread_server)); + let (addr, shutdown) = wtf(server.clone()); + let client_stream = TcpStream::connect(&addr).unwrap(); let client = Client::new(client_stream).unwrap(); assert_eq!(Reply::Increment(0), client.rpc(&Request::Increment).unwrap()); assert_eq!(1, server.count()); assert_eq!(Reply::Increment(1), client.rpc(&Request::Increment).unwrap()); assert_eq!(2, server.count()); - println!("joining client"); - client.join::().unwrap(); - println!("joining server"); - guard.join(); + client.disconnect::().unwrap(); + shutdown.shutdown(); } + /* struct BarrierServer { barrier: Barrier, inner: Server,