From 55f1686e529af861c5cf4027d02fcfd738b054c0 Mon Sep 17 00:00:00 2001 From: Adam Wright Date: Sat, 9 Jan 2016 01:15:29 -0800 Subject: [PATCH] Concurrency actually works --- src/lib.rs | 50 ++++++++++++++++++++++++-------------------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c2b6708..99b69b6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -64,23 +64,31 @@ impl convert::From> for Error { pub type Result = std::result::Result; pub fn handle_conn(mut stream: TcpStream, f: F) -> Result<()> - where Request: fmt::Debug + serde::de::Deserialize + serde::ser::Serialize, - Reply: fmt::Debug + serde::ser::Serialize, - F: Serve + where Request: 'static + fmt::Debug + Send + serde::de::Deserialize + serde::ser::Serialize, + Reply: 'static + fmt::Debug + serde::ser::Serialize, + F: 'static + Clone + Serve { let read_stream = try!(stream.try_clone()); let mut de = serde_json::Deserializer::new(read_stream.bytes()); + let stream = Arc::new(Mutex::new(stream)); loop { let request_packet: Packet = try!(Packet::deserialize(&mut de)); match request_packet { Packet::Shutdown => { - try!(serde_json::to_writer(&mut stream, &request_packet)); + let stream = stream.clone(); + let mut my_stream = stream.lock().unwrap(); + try!(serde_json::to_writer(&mut *my_stream, &request_packet)); break; }, Packet::Message(id, message) => { - let reply = try!(f.serve(&message)); - let reply_packet = Packet::Message(id, reply); - try!(serde_json::to_writer(&mut stream, &reply_packet)); + let f = f.clone(); + let arc_stream = stream.clone(); + thread::spawn(move || { + let reply = f.serve(&message).unwrap(); + let reply_packet = Packet::Message(id, reply); + let mut my_stream = arc_stream.lock().unwrap(); + serde_json::to_writer(&mut *my_stream, &reply_packet).unwrap(); + }); }, } } @@ -108,8 +116,8 @@ impl Shutdown { } pub fn serve_async(addr: &SocketAddr, f: F) -> io::Result - where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize, - Reply: fmt::Debug + serde::ser::Serialize, + where Request: 'static + fmt::Debug + Send + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize, + Reply: 'static + fmt::Debug + serde::ser::Serialize, F: 'static + Clone + Serve, { let listener = try!(TcpListener::bind(addr)); @@ -234,6 +242,7 @@ impl Client } let packet = Packet::Message(id, request.clone()); try!(serde_json::to_writer(&mut state.stream, &packet)); + drop(state); Ok(rx.recv().unwrap()) } @@ -269,14 +278,6 @@ mod test { //ToSocketAddrs::to_socket_addrs(addr.as_ref()).unwrap().next().unwrap() } - fn pair() -> (TcpStream, TcpListener) { - let addr = format!("127.0.0.1:{}", port.fetch_add(1, Ordering::SeqCst)); - println!("what the fuck {}", &addr); - // Do this one first so that we don't get connection refused :) - let listener = TcpListener::bind(&*addr).unwrap(); - (TcpStream::connect(&*addr).unwrap(), listener) - } - #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] enum Request { Increment @@ -311,8 +312,8 @@ mod test { } fn wtf(server: F) -> (SocketAddr, Shutdown) - where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize, - Reply: fmt::Debug + serde::ser::Serialize, + where Request: 'static + fmt::Debug + Send + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize, + Reply: 'static + fmt::Debug + Send + serde::ser::Serialize, F: 'static + Clone + Serve { let mut addr; @@ -345,7 +346,6 @@ mod test { shutdown.shutdown(); } - /* struct BarrierServer { barrier: Barrier, inner: Server, @@ -371,10 +371,9 @@ mod test { #[test] fn test_concurrent() { - let (client_stream, server_streams) = pair(); let server = Arc::new(BarrierServer::new(10)); - let thread_server = server.clone(); - let guard = thread::spawn(move || serve(server_streams, thread_server)); + let (addr, shutdown) = wtf(server.clone()); + let client_stream = TcpStream::connect(&addr).unwrap(); let client: Arc> = Arc::new(Client::new(client_stream).unwrap()); let mut join_handles = vec![]; for _ in 0..10 { @@ -389,8 +388,7 @@ mod test { Err(_) => panic!("couldn't unwrap arc"), Ok(c) => c, }; - client.join::().unwrap(); - guard.join(); + client.disconnect::().unwrap(); + shutdown.shutdown(); } - */ }