From 579d3909e54d714fa4a0f0caeb2cc83fe375ea8c Mon Sep 17 00:00:00 2001 From: Adam Wright Date: Fri, 8 Jan 2016 02:39:40 -0800 Subject: [PATCH] I made le test pass --- src/lib.rs | 64 +++++++++++++++++++--------------------- src/multi_tcp/src/lib.rs | 5 ++-- 2 files changed, 34 insertions(+), 35 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ad0343c..36c14ea 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,9 @@ extern crate multi_tcp; extern crate serde; extern crate serde_json; -use std::io; +use serde::Deserialize; +use std::fmt; +use std::io::{self, Read}; use std::convert; use std::collections::HashMap; use std::net::{ @@ -36,7 +38,10 @@ pub enum Error { impl convert::From for Error { fn from(err: serde_json::Error) -> Error { - Error::Json(err) + match err { + serde_json::Error::IoError(err) => Error::Io(err), + err => Error::Json(err), + } } } @@ -55,21 +60,27 @@ impl convert::From> for Error { pub type Result = std::result::Result; pub fn handle_conn( - mut conn: TcpStream, + mut stream: TcpStream, f: F) -> Result<()> - where Request: serde::de::Deserialize, - Reply: serde::ser::Serialize, + where Request: fmt::Debug + serde::de::Deserialize, + Reply: fmt::Debug + serde::ser::Serialize, F: 'static + Serve { - let request: Request = try!(serde_json::from_reader(&mut conn)); - let response = try!(f.serve(&request)); - try!(serde_json::to_writer(&mut conn, &response)); + let read_stream = try!(stream.try_clone()); + let mut de = serde_json::Deserializer::new(read_stream.bytes()); + let request_packet: Packet = try!(Packet::deserialize(&mut de)); + let reply = try!(f.serve(&request_packet.message)); + let reply_packet = Packet{ + id: request_packet.id, + message: reply, + }; + try!(serde_json::to_writer(&mut stream, &reply_packet)); Ok(()) } pub fn serve(listener: TcpListener, f: F) -> Error - where Request: serde::de::Deserialize, - Reply: serde::ser::Serialize, + where Request: fmt::Debug + serde::de::Deserialize, + Reply: fmt::Debug + serde::ser::Serialize, F: 'static + Serve, { for conn in listener.incoming() { @@ -77,7 +88,6 @@ pub fn serve(listener: TcpListener, f: F) -> Error Err(err) => return convert::From::from(err), Ok(c) => c, }; - println!("received connection"); let f = f.clone(); thread::spawn(move || { if let Err(err) = handle_conn(conn, f) { @@ -92,7 +102,7 @@ pub trait Serve : Sync + Send + Clone { fn serve(&self, request: &Request) -> io::Result; } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] struct Packet { id: u64, message: T, @@ -126,7 +136,7 @@ fn receiver(messages: Receiver>) -> Result<()> { pub struct Client { next_id: Mutex, - writer: multi_tcp::MultiStream, serde_json::Error>, + writer: multi_tcp::MultiStream, Error>, handles_tx: SyncSender>, } @@ -136,20 +146,16 @@ impl Client { pub fn new(stream: TcpStream) -> Result { let (handles_tx, receiver_rx) = sync_channel(0); - let writer = multi_tcp::MultiStream::with_sync_sender( + let writer: multi_tcp::MultiStream, Error> + = multi_tcp::MultiStream::with_sync_sender( stream, - |stream, packet: &Packet| { - try!(serde_json::to_writer(stream, &packet.id)); - try!(serde_json::to_writer(stream, &packet.message)); + |stream: &mut TcpStream, packet: &Packet| { + try!(serde_json::to_writer(stream, packet)); Ok(()) }, - |stream| { - let id = try!(serde_json::from_reader(stream)); - let reply = try!(serde_json::from_reader(stream)); - Ok(ReceiverMessage::Packet(Packet{ - id: id, - message: reply, - })) + |mut stream| { + let packet = try!(serde_json::from_reader(&mut stream)); + Ok(ReceiverMessage::Packet(packet)) }, handles_tx.clone()); thread::spawn(move || receiver(receiver_rx)); @@ -169,17 +175,14 @@ impl Client pub fn rpc(&self, request: &Request) -> Result { let (tx, rx) = channel(); let id = self.get_next_id(); - println!("indicate that we're weaiting"); try!(self.handles_tx.send(ReceiverMessage::Handle(Handle{ id: id, sender: tx, }))); - println!("write the request to the wire"); try!(self.writer.write(Packet{ id: id, message: request.clone(), })); - println!("wait for the response"); Ok(rx.recv().unwrap()) } } @@ -220,13 +223,8 @@ mod test { #[test] fn test() { let (client_stream, server_streams) = pair(); - println!("starting server!"); - thread::spawn(|| { - serve(server_streams, Server) - }); - println!("making client"); + thread::spawn(|| serve(server_streams, Server)); let client: Client = Client::new(client_stream).unwrap(); - println!("hi there"); assert_eq!(Reply::Increment, client.rpc(&Request::Increment).unwrap()); } } diff --git a/src/multi_tcp/src/lib.rs b/src/multi_tcp/src/lib.rs index 4bccdde..7b8183d 100644 --- a/src/multi_tcp/src/lib.rs +++ b/src/multi_tcp/src/lib.rs @@ -15,7 +15,7 @@ fn read(mut stream: TcpStream, decode: F, tx: SyncSender) E: fmt::Debug + Send + 'static { loop { - let t = decode(&mut stream).unwrap(); + let t = decode(&mut stream).expect("I couldn't do the thing"); if let Err(_) = tx.send(t) { break; } @@ -41,7 +41,8 @@ fn write(mut stream: TcpStream, encode: F) -> Sender> break; } }; - helper.result.send(encode(&mut stream, &helper.value)).unwrap(); + helper.result.send(encode(&mut stream, &helper.value)) + .expect("died trying to send the result to the helper"); } }); tx