Make read_stream a field of ConnectionHandler

This commit is contained in:
Tim Kuehn
2016-01-25 21:35:47 -08:00
parent 3cff1c0b62
commit 5d50ccfb6f

View File

@@ -98,6 +98,7 @@ impl InflightRpcs {
}
struct ConnectionHandler<'a> {
read_stream: TcpStream,
write_stream: Mutex<TcpStream>,
shutdown: &'a AtomicBool,
inflight_rpcs: &'a InflightRpcs,
@@ -121,27 +122,32 @@ impl<'a> ConnectionHandler<'a> {
bincode::serde::deserialize_from(read_stream, bincode::SizeLimit::Infinite)
}
fn handle_conn<F>(&mut self, mut read_stream: TcpStream, f: F) -> Result<()>
fn handle_conn<F>(&mut self, f: F) -> Result<()>
where F: Serve
{
let ConnectionHandler {
ref mut read_stream,
ref write_stream,
shutdown,
inflight_rpcs,
timeout
} = *self;
let f = &f;
trace!("ConnectionHandler: serving client...");
crossbeam::scope(|scope| {
loop {
match Self::read(&mut read_stream, self.timeout) {
match Self::read(read_stream, timeout) {
Ok(Packet { rpc_id, message, }) => {
let inflight_rpcs = &self.inflight_rpcs;
inflight_rpcs.increment();
let stream = &self.write_stream;
scope.spawn(move || {
let reply = f.serve(message);
let reply_packet = Packet {
rpc_id: rpc_id,
message: reply
};
let mut stream = stream.lock().unwrap();
let mut write_stream = write_stream.lock().unwrap();
if let Err(e) =
bincode::serde::serialize_into(&mut *stream,
bincode::serde::serialize_into(&mut *write_stream,
&reply_packet,
bincode::SizeLimit::Infinite) {
warn!("ConnectionHandler: failed to write reply to Client: {:?}",
@@ -149,14 +155,14 @@ impl<'a> ConnectionHandler<'a> {
}
inflight_rpcs.decrement();
});
if self.shutdown.load(Ordering::SeqCst) {
if shutdown.load(Ordering::SeqCst) {
info!("ConnectionHandler: server shutdown, so closing connection.");
break;
}
}
Err(bincode::serde::DeserializeError::IoError(ref err))
if Self::timed_out(err.kind()) => {
if !self.shutdown.load(Ordering::SeqCst) {
if !shutdown.load(Ordering::SeqCst) {
info!("ConnectionHandler: read timed out ({:?}). Server not \
shutdown, so retrying read.",
err);
@@ -256,12 +262,13 @@ pub fn serve_async<A, F>(addr: A, f: F, read_timeout: Option<Duration>) -> io::R
inflight_rpcs.increment();
scope.spawn(|| {
let mut handler = ConnectionHandler {
write_stream: Mutex::new(conn.try_clone().unwrap()),
read_stream: conn.try_clone().unwrap(),
write_stream: Mutex::new(conn),
shutdown: &shutdown,
inflight_rpcs: &inflight_rpcs,
timeout: read_timeout,
};
if let Err(err) = handler.handle_conn(conn, &f) {
if let Err(err) = handler.handle_conn(&f) {
info!("ConnectionHandler: err in connection handling: {:?}", err);
}
});