diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index ce165d1..1b65faa 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -5,7 +5,7 @@ use std::fmt; use std::io::{self, BufReader, BufWriter, Write}; use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}; use std::sync::{Condvar, Mutex}; -use std::sync::mpsc::{Sender, TryRecvError, channel}; +use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel}; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use std::thread::{self, JoinHandle}; @@ -15,7 +15,7 @@ struct ConnectionHandler<'a, S> where S: Serve { read_stream: BufReader, - write_stream: Mutex>, + write_stream: BufWriter, shutdown: &'a AtomicBool, inflight_rpcs: &'a InflightRpcs, server: S, @@ -33,37 +33,28 @@ impl<'a, S> ConnectionHandler<'a, S> where S: Serve { fn handle_conn(&mut self) -> Result<()> { let ConnectionHandler { ref mut read_stream, - ref write_stream, + ref mut write_stream, shutdown, - inflight_rpcs, + ref inflight_rpcs, ref server, pool, } = *self; trace!("ConnectionHandler: serving client..."); pool.scoped(|scope| { + let (tx, rx) = channel(); + scope.execute(|| Self::write(rx, write_stream, inflight_rpcs)); loop { match bincode::serde::deserialize_from(read_stream, bincode::SizeLimit::Infinite) { Ok(Packet { rpc_id, message, }) => { inflight_rpcs.increment(); + let tx = tx.clone(); scope.execute(move || { let reply = server.serve(message); let reply_packet = Packet { rpc_id: rpc_id, message: reply }; - let mut write_stream = write_stream.lock().unwrap(); - if let Err(e) = - bincode::serde::serialize_into(&mut *write_stream, - &reply_packet, - bincode::SizeLimit::Infinite) { - warn!("ConnectionHandler: failed to write reply to Client: {:?}", - e); - } - if let Err(e) = write_stream.flush() { - warn!("ConnectionHandler: failed to flush reply to Client: {:?}", - e); - } - inflight_rpcs.decrement(); + tx.send(reply_packet).unwrap(); }); if shutdown.load(Ordering::SeqCst) { info!("ConnectionHandler: server shutdown, so closing connection."); @@ -101,6 +92,33 @@ impl<'a, S> ConnectionHandler<'a, S> where S: Serve { _ => false, } } + + fn write(rx: Receiver::Reply>>, + stream: &mut BufWriter, + inflight_rpcs: &InflightRpcs) { + loop { + match rx.recv() { + Err(e) => { + debug!("Write thread: returning due to {:?}", e); + return; + } + Ok(reply_packet) => { + if let Err(e) = + bincode::serde::serialize_into(stream, + &reply_packet, + bincode::SizeLimit::Infinite) { + warn!("Writer: failed to write reply to Client: {:?}", + e); + } + if let Err(e) = stream.flush() { + warn!("Writer: failed to flush reply to Client: {:?}", + e); + } + inflight_rpcs.decrement(); + } + } + } + } } struct InflightRpcs { @@ -219,7 +237,7 @@ pub fn serve_async(addr: A, scope.execute(|| { let mut handler = ConnectionHandler { read_stream: BufReader::new(conn.try_clone().unwrap()), - write_stream: Mutex::new(BufWriter::new(conn)), + write_stream: BufWriter::new(conn), shutdown: &shutdown, inflight_rpcs: &inflight_rpcs, server: &server, @@ -244,7 +262,7 @@ pub trait Serve: Send + Sync { /// The type of request received by the server type Request: 'static + fmt::Debug + serde::ser::Serialize + serde::de::Deserialize + Send; /// The type of reply sent by the server - type Reply: 'static + fmt::Debug + serde::ser::Serialize + serde::de::Deserialize; + type Reply : 'static + fmt::Debug + serde::ser::Serialize + serde::de::Deserialize + Send; /// Return a reply for a given request fn serve(&self, request: Self::Request) -> Self::Reply;