Make 'f' a field of ConnectionHandler and rename ==> server

This commit is contained in:
Tim Kuehn
2016-01-25 21:49:31 -08:00
parent 5d50ccfb6f
commit bb53b5733f

View File

@@ -97,22 +97,23 @@ impl InflightRpcs {
}
struct ConnectionHandler<'a> {
struct ConnectionHandler<'a, Server> where Server: Serve {
read_stream: TcpStream,
write_stream: Mutex<TcpStream>,
shutdown: &'a AtomicBool,
inflight_rpcs: &'a InflightRpcs,
timeout: Option<Duration>,
server: Server,
}
impl<'a> Drop for ConnectionHandler<'a> {
impl<'a, Server> Drop for ConnectionHandler<'a, Server> where Server: Serve {
fn drop(&mut self) {
trace!("ConnectionHandler: finished serving client.");
self.inflight_rpcs.decrement_and_notify();
}
}
impl<'a> ConnectionHandler<'a> {
impl<'a, Server> ConnectionHandler<'a, Server> where Server: Serve {
fn read<Request>(read_stream: &mut TcpStream,
timeout: Option<Duration>)
-> bincode::serde::DeserializeResult<Packet<Request>>
@@ -122,17 +123,16 @@ impl<'a> ConnectionHandler<'a> {
bincode::serde::deserialize_from(read_stream, bincode::SizeLimit::Infinite)
}
fn handle_conn<F>(&mut self, f: F) -> Result<()>
where F: Serve
fn handle_conn(&mut self) -> Result<()>
{
let ConnectionHandler {
ref mut read_stream,
ref write_stream,
shutdown,
inflight_rpcs,
timeout
timeout,
ref server,
} = *self;
let f = &f;
trace!("ConnectionHandler: serving client...");
crossbeam::scope(|scope| {
loop {
@@ -140,7 +140,7 @@ impl<'a> ConnectionHandler<'a> {
Ok(Packet { rpc_id, message, }) => {
inflight_rpcs.increment();
scope.spawn(move || {
let reply = f.serve(message);
let reply = server.serve(message);
let reply_packet = Packet {
rpc_id: rpc_id,
message: reply
@@ -225,9 +225,9 @@ impl ServeHandle {
}
/// Start
pub fn serve_async<A, F>(addr: A, f: F, read_timeout: Option<Duration>) -> io::Result<ServeHandle>
pub fn serve_async<A, Server>(addr: A, server: Server, read_timeout: Option<Duration>) -> io::Result<ServeHandle>
where A: ToSocketAddrs,
F: 'static + Serve
Server: 'static + Serve
{
let listener = try!(TcpListener::bind(&addr));
let addr = try!(listener.local_addr());
@@ -267,8 +267,9 @@ pub fn serve_async<A, F>(addr: A, f: F, read_timeout: Option<Duration>) -> io::R
shutdown: &shutdown,
inflight_rpcs: &inflight_rpcs,
timeout: read_timeout,
server: &server,
};
if let Err(err) = handler.handle_conn(&f) {
if let Err(err) = handler.handle_conn() {
info!("ConnectionHandler: err in connection handling: {:?}", err);
}
});