From 44b3765d7027ba1226a0e08a483c861a7ddd1e05 Mon Sep 17 00:00:00 2001 From: Adam Wright Date: Fri, 8 Jan 2016 04:15:10 -0800 Subject: [PATCH] Removed a bunch of over-engineered code --- Cargo.toml | 1 - src/lib.rs | 84 +++++++++++++++--------- src/multi_tcp/Cargo.toml | 6 -- src/multi_tcp/src/lib.rs | 136 --------------------------------------- 4 files changed, 52 insertions(+), 175 deletions(-) delete mode 100644 src/multi_tcp/Cargo.toml delete mode 100644 src/multi_tcp/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 2ff0216..31a32ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,4 +7,3 @@ authors = ["Adam Wright "] serde = "*" serde_json = "*" serde_macros = "*" -multi_tcp = { path = "src/multi_tcp" } diff --git a/src/lib.rs b/src/lib.rs index 36c14ea..f3e9af0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,6 @@ #![feature(custom_derive, plugin)] #![plugin(serde_macros)] -extern crate multi_tcp; extern crate serde; extern crate serde_json; @@ -134,57 +133,78 @@ fn receiver(messages: Receiver>) -> Result<()> { Ok(()) } -pub struct Client { - next_id: Mutex, - writer: multi_tcp::MultiStream, Error>, - handles_tx: SyncSender>, +fn reader(mut stream: TcpStream, decode: F, tx: SyncSender) + where F: Send + 'static + Fn(&mut TcpStream) -> Result, + T: Send + 'static +{ + loop { + let t = decode(&mut stream).expect("I couldn't do the thing"); + if let Err(_) = tx.send(t) { + break; + } + } } -impl Client - where Request: serde::ser::Serialize + Clone + Send + 'static, - Reply: serde::de::Deserialize + Send + 'static +fn increment(cur_id: &mut u64) -> u64 { + let id = *cur_id; + *cur_id += 1; + id +} + +struct SyncedClientState{ + next_id: u64, + stream: TcpStream, +} + +pub struct Client { + synced_state: Mutex, + handles_tx: SyncSender>, + reader_guard: thread::JoinHandle<()>, +} + +impl Client + where Reply: serde::de::Deserialize + Send + 'static { pub fn new(stream: TcpStream) -> Result { let (handles_tx, receiver_rx) = sync_channel(0); - let writer: multi_tcp::MultiStream, Error> - = multi_tcp::MultiStream::with_sync_sender( - stream, - |stream: &mut TcpStream, packet: &Packet| { - try!(serde_json::to_writer(stream, packet)); - Ok(()) - }, - |mut stream| { - let packet = try!(serde_json::from_reader(&mut stream)); - Ok(ReceiverMessage::Packet(packet)) - }, - handles_tx.clone()); + let decode = |mut stream: &mut TcpStream| { + let packet = try!(serde_json::from_reader(&mut stream)); + Ok(ReceiverMessage::Packet(packet)) + }; + let read_stream = try!(stream.try_clone()); + let reader_handles_tx = handles_tx.clone(); + let guard = thread::spawn(move || reader(read_stream, decode, reader_handles_tx)); thread::spawn(move || receiver(receiver_rx)); Ok(Client{ - next_id: Mutex::new(0), - writer: writer, + synced_state: Mutex::new(SyncedClientState{ + next_id: 0, + stream: stream, + }), + reader_guard: guard, handles_tx: handles_tx, }) } - fn get_next_id(&self) -> u64 { - let mut id = self.next_id.lock().unwrap(); - *id += 1; - *id - } - - pub fn rpc(&self, request: &Request) -> Result { + pub fn rpc(&self, request: &Request) -> Result + where Request: serde::ser::Serialize + Clone + Send + 'static + { let (tx, rx) = channel(); - let id = self.get_next_id(); + let mut state = self.synced_state.lock().unwrap(); + let id = increment(&mut state.next_id); try!(self.handles_tx.send(ReceiverMessage::Handle(Handle{ id: id, sender: tx, }))); - try!(self.writer.write(Packet{ + try!(serde_json::to_writer(&mut state.stream, &Packet{ id: id, message: request.clone(), })); Ok(rx.recv().unwrap()) } + + pub fn join(self) { + self.reader_guard.join().unwrap(); + } } #[cfg(test)] @@ -224,7 +244,7 @@ mod test { fn test() { let (client_stream, server_streams) = pair(); thread::spawn(|| serve(server_streams, Server)); - let client: Client = Client::new(client_stream).unwrap(); + let client = Client::new(client_stream).unwrap(); assert_eq!(Reply::Increment, client.rpc(&Request::Increment).unwrap()); } } diff --git a/src/multi_tcp/Cargo.toml b/src/multi_tcp/Cargo.toml deleted file mode 100644 index 2fac3ad..0000000 --- a/src/multi_tcp/Cargo.toml +++ /dev/null @@ -1,6 +0,0 @@ -[package] -name = "multi_tcp" -version = "0.1.0" -authors = ["Adam Wright "] - -[dependencies] diff --git a/src/multi_tcp/src/lib.rs b/src/multi_tcp/src/lib.rs deleted file mode 100644 index 7b8183d..0000000 --- a/src/multi_tcp/src/lib.rs +++ /dev/null @@ -1,136 +0,0 @@ -use std::fmt; -use std::net::TcpStream; -use std::thread; -use std::sync::mpsc::{ - channel, - sync_channel, - Sender, - SyncSender, - Receiver, -}; - -fn read(mut stream: TcpStream, decode: F, tx: SyncSender) - where F: Send + 'static + Fn(&mut TcpStream) -> Result, - T: Send + 'static, - E: fmt::Debug + Send + 'static -{ - loop { - let t = decode(&mut stream).expect("I couldn't do the thing"); - if let Err(_) = tx.send(t) { - break; - } - } -} - -struct SendHelper { - value: T, - result: Sender>, -} - -fn write(mut stream: TcpStream, encode: F) -> Sender> - where F: Send + 'static + Fn(&mut TcpStream, &T) -> Result<(), E>, - T: Send + 'static, - E: Send + 'static -{ - let (tx, rx) = channel(); - thread::spawn(move || { - loop { - let helper: SendHelper = match rx.recv() { - Ok(h) => h, - Err(_) => { - break; - } - }; - helper.result.send(encode(&mut stream, &helper.value)) - .expect("died trying to send the result to the helper"); - } - }); - tx -} - -pub struct MultiStream { - tx: Sender>, -} - -impl MultiStream - where Request: Send + 'static, - E: fmt::Debug + Send + 'static -{ - pub fn new( - stream: TcpStream, - encode: F, - decode: G) -> (Self, Receiver) - where Reply: Send + 'static, - F: Send + 'static + Fn(&mut TcpStream, &Request) -> Result<(), E>, - G: Send + 'static + Fn(&mut TcpStream) -> Result - { - let read_stream = stream.try_clone().unwrap(); - let ms = MultiStream{tx: write(stream, encode)}; - let (reply_tx, reply_rx) = sync_channel(0); - thread::spawn(move || read(read_stream, decode, reply_tx)); - (ms, reply_rx) - } - - pub fn with_sync_sender( - stream: TcpStream, - encode: F, - decode: G, - reply_tx: SyncSender) -> Self - where Reply: Send + 'static, - F: Send + 'static + Fn(&mut TcpStream, &Request) -> Result<(), E>, - G: Send + 'static + Fn(&mut TcpStream) -> Result - { - let read_stream = stream.try_clone().unwrap(); - thread::spawn(move || read(read_stream, decode, reply_tx)); - MultiStream{tx: write(stream, encode)} - } - - - pub fn write(&self, value: Request) -> Result<(), E> { - let my_tx = self.tx.clone(); - let (reply_tx, reply_rx) = channel(); - let helper = SendHelper{ - value: value, - result: reply_tx, - }; - my_tx.send(helper).unwrap(); - reply_rx.recv().unwrap() - } -} - -#[cfg(test)] -mod test { - use super::MultiStream; - use std::net::{TcpStream, TcpListener}; - use std::sync::mpsc::Receiver; - use std::io::{Write, Read}; - - fn pair() -> (TcpStream, Receiver) { - let addr = "127.0.0.1:9000"; - let recv_stream = listen(TcpListener::bind(addr).unwrap()); - (TcpStream::connect(addr).unwrap(), recv_stream) - } - - fn write_byte(stream: &mut TcpStream, v: u8) -> Result<(), ()> { - stream.write(&[v]).unwrap(); - Ok(()) - } - - fn read_byte(stream: &mut TcpStream) -> Result { - let mut buf = [0u8]; - stream.read_exact(&mut buf[..]).unwrap(); - Ok(buf[0]) - } - - #[test] - fn test_thing() { - let (stream, listener) = pair(); - let (ms, reader) : (MultiStream, Receiver) = - MultiStream::new(stream, |s, v| write_byte(s, *v), |s| read_byte(s)); - ms.write(5).expect("writing 5"); - let mut srv_stream = listener.accept().unwrap().0; - assert_eq!(5, read_byte(&mut srv_stream).expect("read 5")); - write_byte(&mut srv_stream, 10).expect("write 10"); - assert_eq!(10, reader.recv().expect("reading 10")); - } -}