mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-01-01 17:14:32 +01:00
Server: use a writer thread for each open connection
This commit is contained in:
@@ -5,7 +5,7 @@ use std::fmt;
|
||||
use std::io::{self, BufReader, BufWriter, Write};
|
||||
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
|
||||
use std::sync::{Condvar, Mutex};
|
||||
use std::sync::mpsc::{Sender, TryRecvError, channel};
|
||||
use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::Duration;
|
||||
use std::thread::{self, JoinHandle};
|
||||
@@ -15,7 +15,7 @@ struct ConnectionHandler<'a, S>
|
||||
where S: Serve
|
||||
{
|
||||
read_stream: BufReader<TcpStream>,
|
||||
write_stream: Mutex<BufWriter<TcpStream>>,
|
||||
write_stream: BufWriter<TcpStream>,
|
||||
shutdown: &'a AtomicBool,
|
||||
inflight_rpcs: &'a InflightRpcs,
|
||||
server: S,
|
||||
@@ -33,37 +33,28 @@ impl<'a, S> ConnectionHandler<'a, S> where S: Serve {
|
||||
fn handle_conn(&mut self) -> Result<()> {
|
||||
let ConnectionHandler {
|
||||
ref mut read_stream,
|
||||
ref write_stream,
|
||||
ref mut write_stream,
|
||||
shutdown,
|
||||
inflight_rpcs,
|
||||
ref inflight_rpcs,
|
||||
ref server,
|
||||
pool,
|
||||
} = *self;
|
||||
trace!("ConnectionHandler: serving client...");
|
||||
pool.scoped(|scope| {
|
||||
let (tx, rx) = channel();
|
||||
scope.execute(|| Self::write(rx, write_stream, inflight_rpcs));
|
||||
loop {
|
||||
match bincode::serde::deserialize_from(read_stream, bincode::SizeLimit::Infinite) {
|
||||
Ok(Packet { rpc_id, message, }) => {
|
||||
inflight_rpcs.increment();
|
||||
let tx = tx.clone();
|
||||
scope.execute(move || {
|
||||
let reply = server.serve(message);
|
||||
let reply_packet = Packet {
|
||||
rpc_id: rpc_id,
|
||||
message: reply
|
||||
};
|
||||
let mut write_stream = write_stream.lock().unwrap();
|
||||
if let Err(e) =
|
||||
bincode::serde::serialize_into(&mut *write_stream,
|
||||
&reply_packet,
|
||||
bincode::SizeLimit::Infinite) {
|
||||
warn!("ConnectionHandler: failed to write reply to Client: {:?}",
|
||||
e);
|
||||
}
|
||||
if let Err(e) = write_stream.flush() {
|
||||
warn!("ConnectionHandler: failed to flush reply to Client: {:?}",
|
||||
e);
|
||||
}
|
||||
inflight_rpcs.decrement();
|
||||
tx.send(reply_packet).unwrap();
|
||||
});
|
||||
if shutdown.load(Ordering::SeqCst) {
|
||||
info!("ConnectionHandler: server shutdown, so closing connection.");
|
||||
@@ -101,6 +92,33 @@ impl<'a, S> ConnectionHandler<'a, S> where S: Serve {
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn write(rx: Receiver<Packet<<S as Serve>::Reply>>,
|
||||
stream: &mut BufWriter<TcpStream>,
|
||||
inflight_rpcs: &InflightRpcs) {
|
||||
loop {
|
||||
match rx.recv() {
|
||||
Err(e) => {
|
||||
debug!("Write thread: returning due to {:?}", e);
|
||||
return;
|
||||
}
|
||||
Ok(reply_packet) => {
|
||||
if let Err(e) =
|
||||
bincode::serde::serialize_into(stream,
|
||||
&reply_packet,
|
||||
bincode::SizeLimit::Infinite) {
|
||||
warn!("Writer: failed to write reply to Client: {:?}",
|
||||
e);
|
||||
}
|
||||
if let Err(e) = stream.flush() {
|
||||
warn!("Writer: failed to flush reply to Client: {:?}",
|
||||
e);
|
||||
}
|
||||
inflight_rpcs.decrement();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct InflightRpcs {
|
||||
@@ -219,7 +237,7 @@ pub fn serve_async<A, S>(addr: A,
|
||||
scope.execute(|| {
|
||||
let mut handler = ConnectionHandler {
|
||||
read_stream: BufReader::new(conn.try_clone().unwrap()),
|
||||
write_stream: Mutex::new(BufWriter::new(conn)),
|
||||
write_stream: BufWriter::new(conn),
|
||||
shutdown: &shutdown,
|
||||
inflight_rpcs: &inflight_rpcs,
|
||||
server: &server,
|
||||
@@ -244,7 +262,7 @@ pub trait Serve: Send + Sync {
|
||||
/// The type of request received by the server
|
||||
type Request: 'static + fmt::Debug + serde::ser::Serialize + serde::de::Deserialize + Send;
|
||||
/// The type of reply sent by the server
|
||||
type Reply: 'static + fmt::Debug + serde::ser::Serialize + serde::de::Deserialize;
|
||||
type Reply : 'static + fmt::Debug + serde::ser::Serialize + serde::de::Deserialize + Send;
|
||||
|
||||
/// Return a reply for a given request
|
||||
fn serve(&self, request: Self::Request) -> Self::Reply;
|
||||
|
||||
Reference in New Issue
Block a user