Sketch of Client with only one reader thread

This commit is contained in:
Adam Wright
2016-01-08 20:36:31 -08:00
parent 3a3e2d1e4d
commit b011cbe1dc

View File

@@ -22,12 +22,8 @@ use std::sync::{
};
use std::sync::mpsc::{
channel,
sync_channel,
Sender,
SyncSender,
Receiver,
};
use std::time;
use std::thread;
#[derive(Debug)]
@@ -63,7 +59,7 @@ impl<T> convert::From<sync::mpsc::SendError<T>> for Error {
pub type Result<T> = std::result::Result<T, Error>;
pub fn handle_conn<F, Request, Reply>(mut stream: TcpStream, f: Arc<F>) -> Result<()>
where Request: fmt::Debug + serde::de::Deserialize,
where Request: fmt::Debug + serde::de::Deserialize + serde::ser::Serialize,
Reply: fmt::Debug + serde::ser::Serialize,
F: Serve<Request, Reply>
{
@@ -73,7 +69,10 @@ pub fn handle_conn<F, Request, Reply>(mut stream: TcpStream, f: Arc<F>) -> Resul
println!("read");
let request_packet: Packet<Request> = try!(Packet::deserialize(&mut de));
match request_packet {
Packet::Shutdown => break,
Packet::Shutdown => {
try!(serde_json::to_writer(&mut stream, &request_packet));
break;
},
Packet::Message(id, message) => {
let reply = try!(f.serve(&message));
let reply_packet = Packet::Message(id, reply);
@@ -86,7 +85,7 @@ pub fn handle_conn<F, Request, Reply>(mut stream: TcpStream, f: Arc<F>) -> Resul
}
pub fn serve<F, Request, Reply>(listener: TcpListener, f: Arc<F>) -> Error
where Request: fmt::Debug + serde::de::Deserialize,
where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize,
Reply: fmt::Debug + serde::ser::Serialize,
F: 'static + Serve<Request, Reply>,
{
@@ -115,49 +114,21 @@ enum Packet<T> {
Shutdown,
}
struct Handle<T> {
id: u64,
sender: Sender<T>,
}
enum ReceiverMessage<Reply> {
Handle(Handle<Reply>),
Packet(Packet<Reply>),
Shutdown,
}
fn receiver<Reply>(messages: Receiver<ReceiverMessage<Reply>>) -> Result<()> {
let mut ready_handles: HashMap<u64, Handle<Reply>> = HashMap::new();
for message in messages.into_iter() {
match message {
ReceiverMessage::Handle(handle) => {
ready_handles.insert(handle.id, handle);
},
ReceiverMessage::Packet(Packet::Shutdown) => break,
ReceiverMessage::Packet(Packet::Message(id, message)) => {
let handle = ready_handles.remove(&id).unwrap();
try!(handle.sender.send(message));
}
ReceiverMessage::Shutdown => break,
}
}
Ok(())
}
fn reader<Reply>(stream: TcpStream, tx: SyncSender<ReceiverMessage<Reply>>)
fn reader<Reply>(
stream: TcpStream,
requests: Arc<Mutex<HashMap<u64, Sender<Reply>>>>)
where Reply: serde::Deserialize
{
use serde_json::Error::SyntaxError;
use serde_json::ErrorCode::EOFWhileParsingValue;
let mut de = serde_json::Deserializer::new(stream.bytes());
loop {
match Packet::deserialize(&mut de) {
Ok(packet) =>{
println!("send!");
tx.send(ReceiverMessage::Packet(packet)).unwrap();
Ok(Packet::Message(id, reply)) => {
let mut requests = requests.lock().unwrap();
let reply_tx = requests.remove(&id).unwrap();
reply_tx.send(reply).unwrap();
},
Ok(Packet::Shutdown) => break,
// TODO: This shutdown logic is janky.. What's the right way to do this?
Err(SyntaxError(EOFWhileParsingValue, _, _)) => break,
Err(err) => panic!("unexpected error while parsing!: {:?}", err),
}
}
@@ -169,14 +140,14 @@ fn increment(cur_id: &mut u64) -> u64 {
id
}
struct SyncedClientState<Reply> {
struct SyncedClientState {
next_id: u64,
stream: TcpStream,
handles_tx: SyncSender<ReceiverMessage<Reply>>,
}
pub struct Client<Reply> {
synced_state: Mutex<SyncedClientState<Reply>>,
synced_state: Mutex<SyncedClientState>,
requests: Arc<Mutex<HashMap<u64, Sender<Reply>>>>,
reader_guard: thread::JoinHandle<()>,
}
@@ -184,19 +155,18 @@ impl<Reply> Client<Reply>
where Reply: serde::de::Deserialize + Send + 'static
{
pub fn new(stream: TcpStream) -> Result<Self> {
let (handles_tx, receiver_rx) = sync_channel(0);
let read_stream = try!(stream.try_clone());
try!(read_stream.set_read_timeout(Some(time::Duration::from_millis(50))));
let reader_handles_tx = handles_tx.clone();
let guard = thread::spawn(move || reader(read_stream, reader_handles_tx));
thread::spawn(move || receiver(receiver_rx));
let requests = Arc::new(Mutex::new(HashMap::new()));
let reader_stream = try!(stream.try_clone());
let reader_requests = requests.clone();
let reader_guard =
thread::spawn(move || reader(reader_stream, reader_requests));
Ok(Client{
synced_state: Mutex::new(SyncedClientState{
next_id: 0,
stream: stream,
handles_tx: handles_tx,
}),
reader_guard: guard,
requests: requests,
reader_guard: reader_guard,
})
}
@@ -206,10 +176,10 @@ impl<Reply> Client<Reply>
let (tx, rx) = channel();
let mut state = self.synced_state.lock().unwrap();
let id = increment(&mut state.next_id);
try!(state.handles_tx.send(ReceiverMessage::Handle(Handle{
id: id,
sender: tx,
})));
{
let mut requests = self.requests.lock().unwrap();
requests.insert(id, tx);
}
let packet = Packet::Message(id, request.clone());
try!(serde_json::to_writer(&mut state.stream, &packet));
Ok(rx.recv().unwrap())