Removed a bunch of over-engineered code

This commit is contained in:
Adam Wright
2016-01-08 04:15:10 -08:00
parent 579d3909e5
commit 44b3765d70
4 changed files with 52 additions and 175 deletions

View File

@@ -1,7 +1,6 @@
#![feature(custom_derive, plugin)]
#![plugin(serde_macros)]
extern crate multi_tcp;
extern crate serde;
extern crate serde_json;
@@ -134,57 +133,78 @@ fn receiver<Reply>(messages: Receiver<ReceiverMessage<Reply>>) -> Result<()> {
Ok(())
}
pub struct Client<Request, Reply> {
next_id: Mutex<u64>,
writer: multi_tcp::MultiStream<Packet<Request>, Error>,
handles_tx: SyncSender<ReceiverMessage<Reply>>,
fn reader<T, F>(mut stream: TcpStream, decode: F, tx: SyncSender<T>)
where F: Send + 'static + Fn(&mut TcpStream) -> Result<T>,
T: Send + 'static
{
loop {
let t = decode(&mut stream).expect("I couldn't do the thing");
if let Err(_) = tx.send(t) {
break;
}
}
}
impl<Request, Reply> Client<Request, Reply>
where Request: serde::ser::Serialize + Clone + Send + 'static,
Reply: serde::de::Deserialize + Send + 'static
fn increment(cur_id: &mut u64) -> u64 {
let id = *cur_id;
*cur_id += 1;
id
}
struct SyncedClientState{
next_id: u64,
stream: TcpStream,
}
pub struct Client<Reply> {
synced_state: Mutex<SyncedClientState>,
handles_tx: SyncSender<ReceiverMessage<Reply>>,
reader_guard: thread::JoinHandle<()>,
}
impl<Reply> Client<Reply>
where Reply: serde::de::Deserialize + Send + 'static
{
pub fn new(stream: TcpStream) -> Result<Self> {
let (handles_tx, receiver_rx) = sync_channel(0);
let writer: multi_tcp::MultiStream<Packet<Request>, Error>
= multi_tcp::MultiStream::with_sync_sender(
stream,
|stream: &mut TcpStream, packet: &Packet<Request>| {
try!(serde_json::to_writer(stream, packet));
Ok(())
},
|mut stream| {
let packet = try!(serde_json::from_reader(&mut stream));
Ok(ReceiverMessage::Packet(packet))
},
handles_tx.clone());
let decode = |mut stream: &mut TcpStream| {
let packet = try!(serde_json::from_reader(&mut stream));
Ok(ReceiverMessage::Packet(packet))
};
let read_stream = try!(stream.try_clone());
let reader_handles_tx = handles_tx.clone();
let guard = thread::spawn(move || reader(read_stream, decode, reader_handles_tx));
thread::spawn(move || receiver(receiver_rx));
Ok(Client{
next_id: Mutex::new(0),
writer: writer,
synced_state: Mutex::new(SyncedClientState{
next_id: 0,
stream: stream,
}),
reader_guard: guard,
handles_tx: handles_tx,
})
}
fn get_next_id(&self) -> u64 {
let mut id = self.next_id.lock().unwrap();
*id += 1;
*id
}
pub fn rpc(&self, request: &Request) -> Result<Reply> {
pub fn rpc<Request>(&self, request: &Request) -> Result<Reply>
where Request: serde::ser::Serialize + Clone + Send + 'static
{
let (tx, rx) = channel();
let id = self.get_next_id();
let mut state = self.synced_state.lock().unwrap();
let id = increment(&mut state.next_id);
try!(self.handles_tx.send(ReceiverMessage::Handle(Handle{
id: id,
sender: tx,
})));
try!(self.writer.write(Packet{
try!(serde_json::to_writer(&mut state.stream, &Packet{
id: id,
message: request.clone(),
}));
Ok(rx.recv().unwrap())
}
pub fn join(self) {
self.reader_guard.join().unwrap();
}
}
#[cfg(test)]
@@ -224,7 +244,7 @@ mod test {
fn test() {
let (client_stream, server_streams) = pair();
thread::spawn(|| serve(server_streams, Server));
let client: Client<Request, Reply> = Client::new(client_stream).unwrap();
let client = Client::new(client_stream).unwrap();
assert_eq!(Reply::Increment, client.rpc(&Request::Increment).unwrap());
}
}

View File

@@ -1,6 +0,0 @@
[package]
name = "multi_tcp"
version = "0.1.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>"]
[dependencies]

View File

@@ -1,136 +0,0 @@
use std::fmt;
use std::net::TcpStream;
use std::thread;
use std::sync::mpsc::{
channel,
sync_channel,
Sender,
SyncSender,
Receiver,
};
fn read<T, F, E>(mut stream: TcpStream, decode: F, tx: SyncSender<T>)
where F: Send + 'static + Fn(&mut TcpStream) -> Result<T, E>,
T: Send + 'static,
E: fmt::Debug + Send + 'static
{
loop {
let t = decode(&mut stream).expect("I couldn't do the thing");
if let Err(_) = tx.send(t) {
break;
}
}
}
struct SendHelper<T, E> {
value: T,
result: Sender<Result<(), E>>,
}
fn write<T, F, E>(mut stream: TcpStream, encode: F) -> Sender<SendHelper<T, E>>
where F: Send + 'static + Fn(&mut TcpStream, &T) -> Result<(), E>,
T: Send + 'static,
E: Send + 'static
{
let (tx, rx) = channel();
thread::spawn(move || {
loop {
let helper: SendHelper<T, E> = match rx.recv() {
Ok(h) => h,
Err(_) => {
break;
}
};
helper.result.send(encode(&mut stream, &helper.value))
.expect("died trying to send the result to the helper");
}
});
tx
}
pub struct MultiStream<Request, E> {
tx: Sender<SendHelper<Request, E>>,
}
impl<Request, E> MultiStream<Request, E>
where Request: Send + 'static,
E: fmt::Debug + Send + 'static
{
pub fn new<Reply, F, G>(
stream: TcpStream,
encode: F,
decode: G) -> (Self, Receiver<Reply>)
where Reply: Send + 'static,
F: Send + 'static + Fn(&mut TcpStream, &Request) -> Result<(), E>,
G: Send + 'static + Fn(&mut TcpStream) -> Result<Reply, E>
{
let read_stream = stream.try_clone().unwrap();
let ms = MultiStream{tx: write(stream, encode)};
let (reply_tx, reply_rx) = sync_channel(0);
thread::spawn(move || read(read_stream, decode, reply_tx));
(ms, reply_rx)
}
pub fn with_sync_sender<Reply, F, G>(
stream: TcpStream,
encode: F,
decode: G,
reply_tx: SyncSender<Reply>) -> Self
where Reply: Send + 'static,
F: Send + 'static + Fn(&mut TcpStream, &Request) -> Result<(), E>,
G: Send + 'static + Fn(&mut TcpStream) -> Result<Reply, E>
{
let read_stream = stream.try_clone().unwrap();
thread::spawn(move || read(read_stream, decode, reply_tx));
MultiStream{tx: write(stream, encode)}
}
pub fn write(&self, value: Request) -> Result<(), E> {
let my_tx = self.tx.clone();
let (reply_tx, reply_rx) = channel();
let helper = SendHelper{
value: value,
result: reply_tx,
};
my_tx.send(helper).unwrap();
reply_rx.recv().unwrap()
}
}
#[cfg(test)]
mod test {
use super::MultiStream;
use std::net::{TcpStream, TcpListener};
use std::sync::mpsc::Receiver;
use std::io::{Write, Read};
fn pair() -> (TcpStream, Receiver<TcpStream>) {
let addr = "127.0.0.1:9000";
let recv_stream = listen(TcpListener::bind(addr).unwrap());
(TcpStream::connect(addr).unwrap(), recv_stream)
}
fn write_byte(stream: &mut TcpStream, v: u8) -> Result<(), ()> {
stream.write(&[v]).unwrap();
Ok(())
}
fn read_byte(stream: &mut TcpStream) -> Result<u8, ()> {
let mut buf = [0u8];
stream.read_exact(&mut buf[..]).unwrap();
Ok(buf[0])
}
#[test]
fn test_thing() {
let (stream, listener) = pair();
let (ms, reader) : (MultiStream<u8, u8, ()>, Receiver<u8>) =
MultiStream::new(stream, |s, v| write_byte(s, *v), |s| read_byte(s));
ms.write(5).expect("writing 5");
let mut srv_stream = listener.accept().unwrap().0;
assert_eq!(5, read_byte(&mut srv_stream).expect("read 5"));
write_byte(&mut srv_stream, 10).expect("write 10");
assert_eq!(10, reader.recv().expect("reading 10"));
}
}