From b011cbe1dc1e3daa6d9a9861b6fa0a4666336a86 Mon Sep 17 00:00:00 2001 From: Adam Wright Date: Fri, 8 Jan 2016 20:36:31 -0800 Subject: [PATCH 1/7] Sketch of Client with only one reader thread --- src/lib.rs | 86 ++++++++++++++++++------------------------------------ 1 file changed, 28 insertions(+), 58 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 0571d95..409e6ad 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,12 +22,8 @@ use std::sync::{ }; use std::sync::mpsc::{ channel, - sync_channel, Sender, - SyncSender, - Receiver, }; -use std::time; use std::thread; #[derive(Debug)] @@ -63,7 +59,7 @@ impl convert::From> for Error { pub type Result = std::result::Result; pub fn handle_conn(mut stream: TcpStream, f: Arc) -> Result<()> - where Request: fmt::Debug + serde::de::Deserialize, + where Request: fmt::Debug + serde::de::Deserialize + serde::ser::Serialize, Reply: fmt::Debug + serde::ser::Serialize, F: Serve { @@ -73,7 +69,10 @@ pub fn handle_conn(mut stream: TcpStream, f: Arc) -> Resul println!("read"); let request_packet: Packet = try!(Packet::deserialize(&mut de)); match request_packet { - Packet::Shutdown => break, + Packet::Shutdown => { + try!(serde_json::to_writer(&mut stream, &request_packet)); + break; + }, Packet::Message(id, message) => { let reply = try!(f.serve(&message)); let reply_packet = Packet::Message(id, reply); @@ -86,7 +85,7 @@ pub fn handle_conn(mut stream: TcpStream, f: Arc) -> Resul } pub fn serve(listener: TcpListener, f: Arc) -> Error - where Request: fmt::Debug + serde::de::Deserialize, + where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize, Reply: fmt::Debug + serde::ser::Serialize, F: 'static + Serve, { @@ -115,49 +114,21 @@ enum Packet { Shutdown, } -struct Handle { - id: u64, - sender: Sender, -} - -enum ReceiverMessage { - Handle(Handle), - Packet(Packet), - Shutdown, -} - -fn receiver(messages: Receiver>) -> Result<()> { - let mut ready_handles: HashMap> = HashMap::new(); - for message in messages.into_iter() { - match message { - ReceiverMessage::Handle(handle) => { - ready_handles.insert(handle.id, handle); - }, - ReceiverMessage::Packet(Packet::Shutdown) => break, - ReceiverMessage::Packet(Packet::Message(id, message)) => { - let handle = ready_handles.remove(&id).unwrap(); - try!(handle.sender.send(message)); - } - ReceiverMessage::Shutdown => break, - } - } - Ok(()) -} - -fn reader(stream: TcpStream, tx: SyncSender>) +fn reader( + stream: TcpStream, + requests: Arc>>>) where Reply: serde::Deserialize { - use serde_json::Error::SyntaxError; - use serde_json::ErrorCode::EOFWhileParsingValue; let mut de = serde_json::Deserializer::new(stream.bytes()); loop { match Packet::deserialize(&mut de) { - Ok(packet) =>{ - println!("send!"); - tx.send(ReceiverMessage::Packet(packet)).unwrap(); + Ok(Packet::Message(id, reply)) => { + let mut requests = requests.lock().unwrap(); + let reply_tx = requests.remove(&id).unwrap(); + reply_tx.send(reply).unwrap(); }, + Ok(Packet::Shutdown) => break, // TODO: This shutdown logic is janky.. What's the right way to do this? - Err(SyntaxError(EOFWhileParsingValue, _, _)) => break, Err(err) => panic!("unexpected error while parsing!: {:?}", err), } } @@ -169,14 +140,14 @@ fn increment(cur_id: &mut u64) -> u64 { id } -struct SyncedClientState { +struct SyncedClientState { next_id: u64, stream: TcpStream, - handles_tx: SyncSender>, } pub struct Client { - synced_state: Mutex>, + synced_state: Mutex, + requests: Arc>>>, reader_guard: thread::JoinHandle<()>, } @@ -184,19 +155,18 @@ impl Client where Reply: serde::de::Deserialize + Send + 'static { pub fn new(stream: TcpStream) -> Result { - let (handles_tx, receiver_rx) = sync_channel(0); - let read_stream = try!(stream.try_clone()); - try!(read_stream.set_read_timeout(Some(time::Duration::from_millis(50)))); - let reader_handles_tx = handles_tx.clone(); - let guard = thread::spawn(move || reader(read_stream, reader_handles_tx)); - thread::spawn(move || receiver(receiver_rx)); + let requests = Arc::new(Mutex::new(HashMap::new())); + let reader_stream = try!(stream.try_clone()); + let reader_requests = requests.clone(); + let reader_guard = + thread::spawn(move || reader(reader_stream, reader_requests)); Ok(Client{ synced_state: Mutex::new(SyncedClientState{ next_id: 0, stream: stream, - handles_tx: handles_tx, }), - reader_guard: guard, + requests: requests, + reader_guard: reader_guard, }) } @@ -206,10 +176,10 @@ impl Client let (tx, rx) = channel(); let mut state = self.synced_state.lock().unwrap(); let id = increment(&mut state.next_id); - try!(state.handles_tx.send(ReceiverMessage::Handle(Handle{ - id: id, - sender: tx, - }))); + { + let mut requests = self.requests.lock().unwrap(); + requests.insert(id, tx); + } let packet = Packet::Message(id, request.clone()); try!(serde_json::to_writer(&mut state.stream, &packet)); Ok(rx.recv().unwrap()) From f10712f16abc86c018998160a113296e8a71de2c Mon Sep 17 00:00:00 2001 From: Adam Wright Date: Fri, 8 Jan 2016 21:12:01 -0800 Subject: [PATCH 2/7] Clean shutdown except for server --- src/lib.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 409e6ad..01b7d69 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,6 @@ use std::io::{self, Read}; use std::convert; use std::collections::HashMap; use std::net::{ - self, TcpListener, TcpStream, }; @@ -70,13 +69,13 @@ pub fn handle_conn(mut stream: TcpStream, f: Arc) -> Resul let request_packet: Packet = try!(Packet::deserialize(&mut de)); match request_packet { Packet::Shutdown => { + println!("server shutting down"); try!(serde_json::to_writer(&mut stream, &request_packet)); break; }, Packet::Message(id, message) => { let reply = try!(f.serve(&message)); let reply_packet = Packet::Message(id, reply); - println!("write"); try!(serde_json::to_writer(&mut stream, &reply_packet)); }, } @@ -127,7 +126,10 @@ fn reader( let reply_tx = requests.remove(&id).unwrap(); reply_tx.send(reply).unwrap(); }, - Ok(Packet::Shutdown) => break, + Ok(Packet::Shutdown) => { + println!("reader shutting down"); + break; + } // TODO: This shutdown logic is janky.. What's the right way to do this? Err(err) => panic!("unexpected error while parsing!: {:?}", err), } @@ -186,10 +188,11 @@ impl Client } pub fn join(self) -> Result<()> { - let mut state = self.synced_state.lock().unwrap(); - let packet: Packet = Packet::Shutdown; - try!(serde_json::to_writer(&mut state.stream, &packet)); - try!(state.stream.shutdown(net::Shutdown::Both)); + { + let mut state = self.synced_state.lock().unwrap(); + let packet: Packet = Packet::Shutdown; + try!(serde_json::to_writer(&mut state.stream, &packet)); + } self.reader_guard.join().unwrap(); Ok(()) } @@ -259,10 +262,13 @@ mod test { assert_eq!(1, server.count()); assert_eq!(Reply::Increment(1), client.rpc(&Request::Increment).unwrap()); assert_eq!(2, server.count()); + println!("joining client"); client.join::().unwrap(); + println!("joining server"); guard.join(); } + /* struct BarrierServer { barrier: Barrier, inner: Server, @@ -309,4 +315,5 @@ mod test { client.join::().unwrap(); guard.join(); } + */ } From e266aa81dc46fe756ba758ca9c13c2ee1f9d7f7a Mon Sep 17 00:00:00 2001 From: Adam Wright Date: Sat, 9 Jan 2016 00:27:07 -0800 Subject: [PATCH 3/7] LOL Shutdown works LOL --- src/lib.rs | 97 ++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 80 insertions(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 01b7d69..798cf25 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ use std::collections::HashMap; use std::net::{ TcpListener, TcpStream, + SocketAddr, }; use std::sync::{ self, @@ -22,8 +23,13 @@ use std::sync::{ use std::sync::mpsc::{ channel, Sender, + Receiver, + TryRecvError, +}; +use std::thread::{ + self, + JoinHandle, }; -use std::thread; #[derive(Debug)] pub enum Error { @@ -83,24 +89,63 @@ pub fn handle_conn(mut stream: TcpStream, f: Arc) -> Resul Ok(()) } -pub fn serve(listener: TcpListener, f: Arc) -> Error + +pub struct Shutdown { + tx: Sender<()>, + join_handle: JoinHandle<()>, + addr: SocketAddr, +} + + +impl Shutdown { + fn wait(self) { + self.join_handle.join().unwrap(); + } + + fn shutdown(self) { + self.tx.send(()).expect(&line!().to_string()); + TcpStream::connect(&self.addr).unwrap(); + self.join_handle.join().expect(&line!().to_string()); + } +} + +pub fn serve_async(addr: &SocketAddr, f: Arc) -> io::Result where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize, Reply: fmt::Debug + serde::ser::Serialize, F: 'static + Serve, { - for conn in listener.incoming() { - let conn = match conn { - Err(err) => return convert::From::from(err), - Ok(c) => c, - }; - let f = f.clone(); - thread::spawn(move || { - if let Err(err) = handle_conn(conn, f) { - println!("error handling connection: {:?}", err); + let listener = try!(TcpListener::bind(addr)); + let (die_tx, die_rx) = channel(); + let join_handle = thread::spawn(move || { + for conn in listener.incoming() { + match die_rx.try_recv() { + Ok(_) => break, + Err(TryRecvError::Disconnected) => { + println!("serve: sender disconnected "); + break; + }, + _ => (), } - }); - } - Error::Impossible + let conn = match conn { + Err(err) => { + println!("I couldn't unwrap the connection :( {:?}", err); + return; + }, + Ok(c) => c, + }; + let f = f.clone(); + thread::spawn(move || { + if let Err(err) = handle_conn(conn, f) { + println!("error handling connection: {:?}", err); + } + }); + } + }); + Ok(Shutdown{ + tx: die_tx, + join_handle: join_handle, + addr: addr.clone(), + }) } pub trait Serve: Send + Sync { @@ -187,7 +232,7 @@ impl Client Ok(rx.recv().unwrap()) } - pub fn join(self) -> Result<()> { + pub fn disconnect(self) -> Result<()> { { let mut state = self.synced_state.lock().unwrap(); let packet: Packet = Packet::Shutdown; @@ -202,14 +247,21 @@ impl Client mod test { use super::*; use std::io; - use std::net::{TcpStream, TcpListener, SocketAddr}; + use std::net::{TcpStream, TcpListener, SocketAddr, ToSocketAddrs}; use std::str::FromStr; use std::sync::{Arc, Mutex, Barrier}; + use std::sync::mpsc::channel; use std::sync::atomic::{AtomicUsize, Ordering}; use std::thread; const port: AtomicUsize = AtomicUsize::new(10000); + fn next_addr() -> SocketAddr { + let addr = format!("127.0.0.1:{}", port.fetch_add(1, Ordering::SeqCst)); + addr.to_socket_addrs().unwrap().next().unwrap() + //ToSocketAddrs::to_socket_addrs(addr.as_ref()).unwrap().next().unwrap() + } + fn pair() -> (TcpStream, TcpListener) { let addr = format!("127.0.0.1:{}", port.fetch_add(1, Ordering::SeqCst)); println!("what the fuck {}", &addr); @@ -251,6 +303,18 @@ mod test { } } + #[test] + fn test_handle() { + let addr = next_addr(); + let server = Arc::new(Server::new()); + let srv_shutdown = serve_async(&addr, server).unwrap(); + let client_stream = TcpStream::connect(&addr).unwrap(); + let client: Client = Client::new(client_stream).expect(&line!().to_string()); + client.disconnect::(); + srv_shutdown.shutdown(); + } + + /* #[test] fn test() { let (client_stream, server_streams) = pair(); @@ -268,7 +332,6 @@ mod test { guard.join(); } - /* struct BarrierServer { barrier: Barrier, inner: Server, From 47828e2c8e63dcefc4a0eeb9bf2108ec0a3211c8 Mon Sep 17 00:00:00 2001 From: Adam Wright Date: Sat, 9 Jan 2016 00:45:30 -0800 Subject: [PATCH 4/7] Tests pass, hooray --- src/lib.rs | 48 +++++++++++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 798cf25..c2b6708 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,7 +63,7 @@ impl convert::From> for Error { pub type Result = std::result::Result; -pub fn handle_conn(mut stream: TcpStream, f: Arc) -> Result<()> +pub fn handle_conn(mut stream: TcpStream, f: F) -> Result<()> where Request: fmt::Debug + serde::de::Deserialize + serde::ser::Serialize, Reply: fmt::Debug + serde::ser::Serialize, F: Serve @@ -71,11 +71,9 @@ pub fn handle_conn(mut stream: TcpStream, f: Arc) -> Resul let read_stream = try!(stream.try_clone()); let mut de = serde_json::Deserializer::new(read_stream.bytes()); loop { - println!("read"); let request_packet: Packet = try!(Packet::deserialize(&mut de)); match request_packet { Packet::Shutdown => { - println!("server shutting down"); try!(serde_json::to_writer(&mut stream, &request_packet)); break; }, @@ -109,10 +107,10 @@ impl Shutdown { } } -pub fn serve_async(addr: &SocketAddr, f: Arc) -> io::Result +pub fn serve_async(addr: &SocketAddr, f: F) -> io::Result where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize, Reply: fmt::Debug + serde::ser::Serialize, - F: 'static + Serve, + F: 'static + Clone + Serve, { let listener = try!(TcpListener::bind(addr)); let (die_tx, die_rx) = channel(); @@ -152,6 +150,14 @@ pub trait Serve: Send + Sync { fn serve(&self, request: &Request) -> io::Result; } +impl Serve for Arc + where S: Serve +{ + fn serve(&self, request: &Request) -> io::Result { + S::serve(self, request) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] enum Packet { Message(u64, T), @@ -172,7 +178,6 @@ fn reader( reply_tx.send(reply).unwrap(); }, Ok(Packet::Shutdown) => { - println!("reader shutting down"); break; } // TODO: This shutdown logic is janky.. What's the right way to do this? @@ -245,7 +250,9 @@ impl Client #[cfg(test)] mod test { + use serde; use super::*; + use std::fmt; use std::io; use std::net::{TcpStream, TcpListener, SocketAddr, ToSocketAddrs}; use std::str::FromStr; @@ -303,35 +310,42 @@ mod test { } } + fn wtf(server: F) -> (SocketAddr, Shutdown) + where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize, + Reply: fmt::Debug + serde::ser::Serialize, + F: 'static + Clone + Serve + { + let mut addr; + let mut shutdown; + while let &Err(_) = {shutdown = serve_async({addr = next_addr(); &addr}, server.clone()); &shutdown} { } + (addr, shutdown.unwrap()) + } + #[test] fn test_handle() { - let addr = next_addr(); let server = Arc::new(Server::new()); - let srv_shutdown = serve_async(&addr, server).unwrap(); + let (addr, shutdown) = wtf(server.clone()); let client_stream = TcpStream::connect(&addr).unwrap(); let client: Client = Client::new(client_stream).expect(&line!().to_string()); client.disconnect::(); - srv_shutdown.shutdown(); + shutdown.shutdown(); } - /* #[test] fn test() { - let (client_stream, server_streams) = pair(); let server = Arc::new(Server::new()); - let thread_server = server.clone(); - let guard = thread::spawn(move || serve(server_streams, thread_server)); + let (addr, shutdown) = wtf(server.clone()); + let client_stream = TcpStream::connect(&addr).unwrap(); let client = Client::new(client_stream).unwrap(); assert_eq!(Reply::Increment(0), client.rpc(&Request::Increment).unwrap()); assert_eq!(1, server.count()); assert_eq!(Reply::Increment(1), client.rpc(&Request::Increment).unwrap()); assert_eq!(2, server.count()); - println!("joining client"); - client.join::().unwrap(); - println!("joining server"); - guard.join(); + client.disconnect::().unwrap(); + shutdown.shutdown(); } + /* struct BarrierServer { barrier: Barrier, inner: Server, From 55f1686e529af861c5cf4027d02fcfd738b054c0 Mon Sep 17 00:00:00 2001 From: Adam Wright Date: Sat, 9 Jan 2016 01:15:29 -0800 Subject: [PATCH 5/7] Concurrency actually works --- src/lib.rs | 50 ++++++++++++++++++++++++-------------------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c2b6708..99b69b6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -64,23 +64,31 @@ impl convert::From> for Error { pub type Result = std::result::Result; pub fn handle_conn(mut stream: TcpStream, f: F) -> Result<()> - where Request: fmt::Debug + serde::de::Deserialize + serde::ser::Serialize, - Reply: fmt::Debug + serde::ser::Serialize, - F: Serve + where Request: 'static + fmt::Debug + Send + serde::de::Deserialize + serde::ser::Serialize, + Reply: 'static + fmt::Debug + serde::ser::Serialize, + F: 'static + Clone + Serve { let read_stream = try!(stream.try_clone()); let mut de = serde_json::Deserializer::new(read_stream.bytes()); + let stream = Arc::new(Mutex::new(stream)); loop { let request_packet: Packet = try!(Packet::deserialize(&mut de)); match request_packet { Packet::Shutdown => { - try!(serde_json::to_writer(&mut stream, &request_packet)); + let stream = stream.clone(); + let mut my_stream = stream.lock().unwrap(); + try!(serde_json::to_writer(&mut *my_stream, &request_packet)); break; }, Packet::Message(id, message) => { - let reply = try!(f.serve(&message)); - let reply_packet = Packet::Message(id, reply); - try!(serde_json::to_writer(&mut stream, &reply_packet)); + let f = f.clone(); + let arc_stream = stream.clone(); + thread::spawn(move || { + let reply = f.serve(&message).unwrap(); + let reply_packet = Packet::Message(id, reply); + let mut my_stream = arc_stream.lock().unwrap(); + serde_json::to_writer(&mut *my_stream, &reply_packet).unwrap(); + }); }, } } @@ -108,8 +116,8 @@ impl Shutdown { } pub fn serve_async(addr: &SocketAddr, f: F) -> io::Result - where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize, - Reply: fmt::Debug + serde::ser::Serialize, + where Request: 'static + fmt::Debug + Send + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize, + Reply: 'static + fmt::Debug + serde::ser::Serialize, F: 'static + Clone + Serve, { let listener = try!(TcpListener::bind(addr)); @@ -234,6 +242,7 @@ impl Client } let packet = Packet::Message(id, request.clone()); try!(serde_json::to_writer(&mut state.stream, &packet)); + drop(state); Ok(rx.recv().unwrap()) } @@ -269,14 +278,6 @@ mod test { //ToSocketAddrs::to_socket_addrs(addr.as_ref()).unwrap().next().unwrap() } - fn pair() -> (TcpStream, TcpListener) { - let addr = format!("127.0.0.1:{}", port.fetch_add(1, Ordering::SeqCst)); - println!("what the fuck {}", &addr); - // Do this one first so that we don't get connection refused :) - let listener = TcpListener::bind(&*addr).unwrap(); - (TcpStream::connect(&*addr).unwrap(), listener) - } - #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] enum Request { Increment @@ -311,8 +312,8 @@ mod test { } fn wtf(server: F) -> (SocketAddr, Shutdown) - where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize, - Reply: fmt::Debug + serde::ser::Serialize, + where Request: 'static + fmt::Debug + Send + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize, + Reply: 'static + fmt::Debug + Send + serde::ser::Serialize, F: 'static + Clone + Serve { let mut addr; @@ -345,7 +346,6 @@ mod test { shutdown.shutdown(); } - /* struct BarrierServer { barrier: Barrier, inner: Server, @@ -371,10 +371,9 @@ mod test { #[test] fn test_concurrent() { - let (client_stream, server_streams) = pair(); let server = Arc::new(BarrierServer::new(10)); - let thread_server = server.clone(); - let guard = thread::spawn(move || serve(server_streams, thread_server)); + let (addr, shutdown) = wtf(server.clone()); + let client_stream = TcpStream::connect(&addr).unwrap(); let client: Arc> = Arc::new(Client::new(client_stream).unwrap()); let mut join_handles = vec![]; for _ in 0..10 { @@ -389,8 +388,7 @@ mod test { Err(_) => panic!("couldn't unwrap arc"), Ok(c) => c, }; - client.join::().unwrap(); - guard.join(); + client.disconnect::().unwrap(); + shutdown.shutdown(); } - */ } From 213963ddda6778f8ed27cf431b0d8bd9a2071fbf Mon Sep 17 00:00:00 2001 From: Adam Wright Date: Sat, 9 Jan 2016 01:16:59 -0800 Subject: [PATCH 6/7] Clean up those unused thingies --- src/lib.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 99b69b6..496ab40 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,7 +23,6 @@ use std::sync::{ use std::sync::mpsc::{ channel, Sender, - Receiver, TryRecvError, }; use std::thread::{ @@ -63,7 +62,7 @@ impl convert::From> for Error { pub type Result = std::result::Result; -pub fn handle_conn(mut stream: TcpStream, f: F) -> Result<()> +pub fn handle_conn(stream: TcpStream, f: F) -> Result<()> where Request: 'static + fmt::Debug + Send + serde::de::Deserialize + serde::ser::Serialize, Reply: 'static + fmt::Debug + serde::ser::Serialize, F: 'static + Clone + Serve @@ -104,11 +103,11 @@ pub struct Shutdown { impl Shutdown { - fn wait(self) { + pub fn wait(self) { self.join_handle.join().unwrap(); } - fn shutdown(self) { + pub fn shutdown(self) { self.tx.send(()).expect(&line!().to_string()); TcpStream::connect(&self.addr).unwrap(); self.join_handle.join().expect(&line!().to_string()); From df8d0f23033580c605d0513809f6ad81e13db12a Mon Sep 17 00:00:00 2001 From: Adam Wright Date: Sat, 9 Jan 2016 01:16:59 -0800 Subject: [PATCH 7/7] Clean up those unused thingies Closes #2. --- src/lib.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 99b69b6..496ab40 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,7 +23,6 @@ use std::sync::{ use std::sync::mpsc::{ channel, Sender, - Receiver, TryRecvError, }; use std::thread::{ @@ -63,7 +62,7 @@ impl convert::From> for Error { pub type Result = std::result::Result; -pub fn handle_conn(mut stream: TcpStream, f: F) -> Result<()> +pub fn handle_conn(stream: TcpStream, f: F) -> Result<()> where Request: 'static + fmt::Debug + Send + serde::de::Deserialize + serde::ser::Serialize, Reply: 'static + fmt::Debug + serde::ser::Serialize, F: 'static + Clone + Serve @@ -104,11 +103,11 @@ pub struct Shutdown { impl Shutdown { - fn wait(self) { + pub fn wait(self) { self.join_handle.join().unwrap(); } - fn shutdown(self) { + pub fn shutdown(self) { self.tx.send(()).expect(&line!().to_string()); TcpStream::connect(&self.addr).unwrap(); self.join_handle.join().expect(&line!().to_string());