mirror of
https://github.com/OMGeeky/tarpc.git
synced 2025-12-28 07:12:05 +01:00
Tests pass, hooray
This commit is contained in:
48
src/lib.rs
48
src/lib.rs
@@ -63,7 +63,7 @@ impl<T> convert::From<sync::mpsc::SendError<T>> for Error {
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
pub fn handle_conn<F, Request, Reply>(mut stream: TcpStream, f: Arc<F>) -> Result<()>
|
||||
pub fn handle_conn<F, Request, Reply>(mut stream: TcpStream, f: F) -> Result<()>
|
||||
where Request: fmt::Debug + serde::de::Deserialize + serde::ser::Serialize,
|
||||
Reply: fmt::Debug + serde::ser::Serialize,
|
||||
F: Serve<Request, Reply>
|
||||
@@ -71,11 +71,9 @@ pub fn handle_conn<F, Request, Reply>(mut stream: TcpStream, f: Arc<F>) -> Resul
|
||||
let read_stream = try!(stream.try_clone());
|
||||
let mut de = serde_json::Deserializer::new(read_stream.bytes());
|
||||
loop {
|
||||
println!("read");
|
||||
let request_packet: Packet<Request> = try!(Packet::deserialize(&mut de));
|
||||
match request_packet {
|
||||
Packet::Shutdown => {
|
||||
println!("server shutting down");
|
||||
try!(serde_json::to_writer(&mut stream, &request_packet));
|
||||
break;
|
||||
},
|
||||
@@ -109,10 +107,10 @@ impl Shutdown {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serve_async<F, Request, Reply>(addr: &SocketAddr, f: Arc<F>) -> io::Result<Shutdown>
|
||||
pub fn serve_async<F, Request, Reply>(addr: &SocketAddr, f: F) -> io::Result<Shutdown>
|
||||
where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize,
|
||||
Reply: fmt::Debug + serde::ser::Serialize,
|
||||
F: 'static + Serve<Request, Reply>,
|
||||
F: 'static + Clone + Serve<Request, Reply>,
|
||||
{
|
||||
let listener = try!(TcpListener::bind(addr));
|
||||
let (die_tx, die_rx) = channel();
|
||||
@@ -152,6 +150,14 @@ pub trait Serve<Request, Reply>: Send + Sync {
|
||||
fn serve(&self, request: &Request) -> io::Result<Reply>;
|
||||
}
|
||||
|
||||
impl<Request, Reply, S> Serve<Request, Reply> for Arc<S>
|
||||
where S: Serve<Request, Reply>
|
||||
{
|
||||
fn serve(&self, request: &Request) -> io::Result<Reply> {
|
||||
S::serve(self, request)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
enum Packet<T> {
|
||||
Message(u64, T),
|
||||
@@ -172,7 +178,6 @@ fn reader<Reply>(
|
||||
reply_tx.send(reply).unwrap();
|
||||
},
|
||||
Ok(Packet::Shutdown) => {
|
||||
println!("reader shutting down");
|
||||
break;
|
||||
}
|
||||
// TODO: This shutdown logic is janky.. What's the right way to do this?
|
||||
@@ -245,7 +250,9 @@ impl<Reply> Client<Reply>
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use serde;
|
||||
use super::*;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::net::{TcpStream, TcpListener, SocketAddr, ToSocketAddrs};
|
||||
use std::str::FromStr;
|
||||
@@ -303,35 +310,42 @@ mod test {
|
||||
}
|
||||
}
|
||||
|
||||
fn wtf<F, Request, Reply>(server: F) -> (SocketAddr, Shutdown)
|
||||
where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize,
|
||||
Reply: fmt::Debug + serde::ser::Serialize,
|
||||
F: 'static + Clone + Serve<Request, Reply>
|
||||
{
|
||||
let mut addr;
|
||||
let mut shutdown;
|
||||
while let &Err(_) = {shutdown = serve_async({addr = next_addr(); &addr}, server.clone()); &shutdown} { }
|
||||
(addr, shutdown.unwrap())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_handle() {
|
||||
let addr = next_addr();
|
||||
let server = Arc::new(Server::new());
|
||||
let srv_shutdown = serve_async(&addr, server).unwrap();
|
||||
let (addr, shutdown) = wtf(server.clone());
|
||||
let client_stream = TcpStream::connect(&addr).unwrap();
|
||||
let client: Client<Reply> = Client::new(client_stream).expect(&line!().to_string());
|
||||
client.disconnect::<Request>();
|
||||
srv_shutdown.shutdown();
|
||||
shutdown.shutdown();
|
||||
}
|
||||
|
||||
/*
|
||||
#[test]
|
||||
fn test() {
|
||||
let (client_stream, server_streams) = pair();
|
||||
let server = Arc::new(Server::new());
|
||||
let thread_server = server.clone();
|
||||
let guard = thread::spawn(move || serve(server_streams, thread_server));
|
||||
let (addr, shutdown) = wtf(server.clone());
|
||||
let client_stream = TcpStream::connect(&addr).unwrap();
|
||||
let client = Client::new(client_stream).unwrap();
|
||||
assert_eq!(Reply::Increment(0), client.rpc(&Request::Increment).unwrap());
|
||||
assert_eq!(1, server.count());
|
||||
assert_eq!(Reply::Increment(1), client.rpc(&Request::Increment).unwrap());
|
||||
assert_eq!(2, server.count());
|
||||
println!("joining client");
|
||||
client.join::<Request>().unwrap();
|
||||
println!("joining server");
|
||||
guard.join();
|
||||
client.disconnect::<Request>().unwrap();
|
||||
shutdown.shutdown();
|
||||
}
|
||||
|
||||
/*
|
||||
struct BarrierServer {
|
||||
barrier: Barrier,
|
||||
inner: Server,
|
||||
|
||||
Reference in New Issue
Block a user