I made le test pass

This commit is contained in:
Adam Wright
2016-01-08 02:39:40 -08:00
parent dbf7113cf3
commit 579d3909e5
2 changed files with 34 additions and 35 deletions

View File

@@ -5,7 +5,9 @@ extern crate multi_tcp;
extern crate serde;
extern crate serde_json;
use std::io;
use serde::Deserialize;
use std::fmt;
use std::io::{self, Read};
use std::convert;
use std::collections::HashMap;
use std::net::{
@@ -36,7 +38,10 @@ pub enum Error {
impl convert::From<serde_json::Error> for Error {
fn from(err: serde_json::Error) -> Error {
Error::Json(err)
match err {
serde_json::Error::IoError(err) => Error::Io(err),
err => Error::Json(err),
}
}
}
@@ -55,21 +60,27 @@ impl<T> convert::From<sync::mpsc::SendError<T>> for Error {
pub type Result<T> = std::result::Result<T, Error>;
pub fn handle_conn<F, Request, Reply>(
mut conn: TcpStream,
mut stream: TcpStream,
f: F) -> Result<()>
where Request: serde::de::Deserialize,
Reply: serde::ser::Serialize,
where Request: fmt::Debug + serde::de::Deserialize,
Reply: fmt::Debug + serde::ser::Serialize,
F: 'static + Serve<Request, Reply>
{
let request: Request = try!(serde_json::from_reader(&mut conn));
let response = try!(f.serve(&request));
try!(serde_json::to_writer(&mut conn, &response));
let read_stream = try!(stream.try_clone());
let mut de = serde_json::Deserializer::new(read_stream.bytes());
let request_packet: Packet<Request> = try!(Packet::deserialize(&mut de));
let reply = try!(f.serve(&request_packet.message));
let reply_packet = Packet{
id: request_packet.id,
message: reply,
};
try!(serde_json::to_writer(&mut stream, &reply_packet));
Ok(())
}
pub fn serve<F, Request, Reply>(listener: TcpListener, f: F) -> Error
where Request: serde::de::Deserialize,
Reply: serde::ser::Serialize,
where Request: fmt::Debug + serde::de::Deserialize,
Reply: fmt::Debug + serde::ser::Serialize,
F: 'static + Serve<Request, Reply>,
{
for conn in listener.incoming() {
@@ -77,7 +88,6 @@ pub fn serve<F, Request, Reply>(listener: TcpListener, f: F) -> Error
Err(err) => return convert::From::from(err),
Ok(c) => c,
};
println!("received connection");
let f = f.clone();
thread::spawn(move || {
if let Err(err) = handle_conn(conn, f) {
@@ -92,7 +102,7 @@ pub trait Serve<Request, Reply> : Sync + Send + Clone {
fn serve(&self, request: &Request) -> io::Result<Reply>;
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Packet<T> {
id: u64,
message: T,
@@ -126,7 +136,7 @@ fn receiver<Reply>(messages: Receiver<ReceiverMessage<Reply>>) -> Result<()> {
pub struct Client<Request, Reply> {
next_id: Mutex<u64>,
writer: multi_tcp::MultiStream<Packet<Request>, serde_json::Error>,
writer: multi_tcp::MultiStream<Packet<Request>, Error>,
handles_tx: SyncSender<ReceiverMessage<Reply>>,
}
@@ -136,20 +146,16 @@ impl<Request, Reply> Client<Request, Reply>
{
pub fn new(stream: TcpStream) -> Result<Self> {
let (handles_tx, receiver_rx) = sync_channel(0);
let writer = multi_tcp::MultiStream::with_sync_sender(
let writer: multi_tcp::MultiStream<Packet<Request>, Error>
= multi_tcp::MultiStream::with_sync_sender(
stream,
|stream, packet: &Packet<Request>| {
try!(serde_json::to_writer(stream, &packet.id));
try!(serde_json::to_writer(stream, &packet.message));
|stream: &mut TcpStream, packet: &Packet<Request>| {
try!(serde_json::to_writer(stream, packet));
Ok(())
},
|stream| {
let id = try!(serde_json::from_reader(stream));
let reply = try!(serde_json::from_reader(stream));
Ok(ReceiverMessage::Packet(Packet{
id: id,
message: reply,
}))
|mut stream| {
let packet = try!(serde_json::from_reader(&mut stream));
Ok(ReceiverMessage::Packet(packet))
},
handles_tx.clone());
thread::spawn(move || receiver(receiver_rx));
@@ -169,17 +175,14 @@ impl<Request, Reply> Client<Request, Reply>
pub fn rpc(&self, request: &Request) -> Result<Reply> {
let (tx, rx) = channel();
let id = self.get_next_id();
println!("indicate that we're weaiting");
try!(self.handles_tx.send(ReceiverMessage::Handle(Handle{
id: id,
sender: tx,
})));
println!("write the request to the wire");
try!(self.writer.write(Packet{
id: id,
message: request.clone(),
}));
println!("wait for the response");
Ok(rx.recv().unwrap())
}
}
@@ -220,13 +223,8 @@ mod test {
#[test]
fn test() {
let (client_stream, server_streams) = pair();
println!("starting server!");
thread::spawn(|| {
serve(server_streams, Server)
});
println!("making client");
thread::spawn(|| serve(server_streams, Server));
let client: Client<Request, Reply> = Client::new(client_stream).unwrap();
println!("hi there");
assert_eq!(Reply::Increment, client.rpc(&Request::Increment).unwrap());
}
}

View File

@@ -15,7 +15,7 @@ fn read<T, F, E>(mut stream: TcpStream, decode: F, tx: SyncSender<T>)
E: fmt::Debug + Send + 'static
{
loop {
let t = decode(&mut stream).unwrap();
let t = decode(&mut stream).expect("I couldn't do the thing");
if let Err(_) = tx.send(t) {
break;
}
@@ -41,7 +41,8 @@ fn write<T, F, E>(mut stream: TcpStream, encode: F) -> Sender<SendHelper<T, E>>
break;
}
};
helper.result.send(encode(&mut stream, &helper.value)).unwrap();
helper.result.send(encode(&mut stream, &helper.value))
.expect("died trying to send the result to the helper");
}
});
tx