mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-01-19 09:50:47 +01:00
Concurrency actually works
This commit is contained in:
50
src/lib.rs
50
src/lib.rs
@@ -64,23 +64,31 @@ impl<T> convert::From<sync::mpsc::SendError<T>> for Error {
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
pub fn handle_conn<F, Request, Reply>(mut stream: TcpStream, f: F) -> Result<()>
|
||||
where Request: fmt::Debug + serde::de::Deserialize + serde::ser::Serialize,
|
||||
Reply: fmt::Debug + serde::ser::Serialize,
|
||||
F: Serve<Request, Reply>
|
||||
where Request: 'static + fmt::Debug + Send + serde::de::Deserialize + serde::ser::Serialize,
|
||||
Reply: 'static + fmt::Debug + serde::ser::Serialize,
|
||||
F: 'static + Clone + Serve<Request, Reply>
|
||||
{
|
||||
let read_stream = try!(stream.try_clone());
|
||||
let mut de = serde_json::Deserializer::new(read_stream.bytes());
|
||||
let stream = Arc::new(Mutex::new(stream));
|
||||
loop {
|
||||
let request_packet: Packet<Request> = try!(Packet::deserialize(&mut de));
|
||||
match request_packet {
|
||||
Packet::Shutdown => {
|
||||
try!(serde_json::to_writer(&mut stream, &request_packet));
|
||||
let stream = stream.clone();
|
||||
let mut my_stream = stream.lock().unwrap();
|
||||
try!(serde_json::to_writer(&mut *my_stream, &request_packet));
|
||||
break;
|
||||
},
|
||||
Packet::Message(id, message) => {
|
||||
let reply = try!(f.serve(&message));
|
||||
let reply_packet = Packet::Message(id, reply);
|
||||
try!(serde_json::to_writer(&mut stream, &reply_packet));
|
||||
let f = f.clone();
|
||||
let arc_stream = stream.clone();
|
||||
thread::spawn(move || {
|
||||
let reply = f.serve(&message).unwrap();
|
||||
let reply_packet = Packet::Message(id, reply);
|
||||
let mut my_stream = arc_stream.lock().unwrap();
|
||||
serde_json::to_writer(&mut *my_stream, &reply_packet).unwrap();
|
||||
});
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -108,8 +116,8 @@ impl Shutdown {
|
||||
}
|
||||
|
||||
pub fn serve_async<F, Request, Reply>(addr: &SocketAddr, f: F) -> io::Result<Shutdown>
|
||||
where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize,
|
||||
Reply: fmt::Debug + serde::ser::Serialize,
|
||||
where Request: 'static + fmt::Debug + Send + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize,
|
||||
Reply: 'static + fmt::Debug + serde::ser::Serialize,
|
||||
F: 'static + Clone + Serve<Request, Reply>,
|
||||
{
|
||||
let listener = try!(TcpListener::bind(addr));
|
||||
@@ -234,6 +242,7 @@ impl<Reply> Client<Reply>
|
||||
}
|
||||
let packet = Packet::Message(id, request.clone());
|
||||
try!(serde_json::to_writer(&mut state.stream, &packet));
|
||||
drop(state);
|
||||
Ok(rx.recv().unwrap())
|
||||
}
|
||||
|
||||
@@ -269,14 +278,6 @@ mod test {
|
||||
//ToSocketAddrs::to_socket_addrs(addr.as_ref()).unwrap().next().unwrap()
|
||||
}
|
||||
|
||||
fn pair() -> (TcpStream, TcpListener) {
|
||||
let addr = format!("127.0.0.1:{}", port.fetch_add(1, Ordering::SeqCst));
|
||||
println!("what the fuck {}", &addr);
|
||||
// Do this one first so that we don't get connection refused :)
|
||||
let listener = TcpListener::bind(&*addr).unwrap();
|
||||
(TcpStream::connect(&*addr).unwrap(), listener)
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
|
||||
enum Request {
|
||||
Increment
|
||||
@@ -311,8 +312,8 @@ mod test {
|
||||
}
|
||||
|
||||
fn wtf<F, Request, Reply>(server: F) -> (SocketAddr, Shutdown)
|
||||
where Request: fmt::Debug + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize,
|
||||
Reply: fmt::Debug + serde::ser::Serialize,
|
||||
where Request: 'static + fmt::Debug + Send + serde::de::Deserialize + fmt::Debug + serde::ser::Serialize,
|
||||
Reply: 'static + fmt::Debug + Send + serde::ser::Serialize,
|
||||
F: 'static + Clone + Serve<Request, Reply>
|
||||
{
|
||||
let mut addr;
|
||||
@@ -345,7 +346,6 @@ mod test {
|
||||
shutdown.shutdown();
|
||||
}
|
||||
|
||||
/*
|
||||
struct BarrierServer {
|
||||
barrier: Barrier,
|
||||
inner: Server,
|
||||
@@ -371,10 +371,9 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn test_concurrent() {
|
||||
let (client_stream, server_streams) = pair();
|
||||
let server = Arc::new(BarrierServer::new(10));
|
||||
let thread_server = server.clone();
|
||||
let guard = thread::spawn(move || serve(server_streams, thread_server));
|
||||
let (addr, shutdown) = wtf(server.clone());
|
||||
let client_stream = TcpStream::connect(&addr).unwrap();
|
||||
let client: Arc<Client<Reply>> = Arc::new(Client::new(client_stream).unwrap());
|
||||
let mut join_handles = vec![];
|
||||
for _ in 0..10 {
|
||||
@@ -389,8 +388,7 @@ mod test {
|
||||
Err(_) => panic!("couldn't unwrap arc"),
|
||||
Ok(c) => c,
|
||||
};
|
||||
client.join::<Request>().unwrap();
|
||||
guard.join();
|
||||
client.disconnect::<Request>().unwrap();
|
||||
shutdown.shutdown();
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user