Implement drop for tarpc::Client

This commit is contained in:
Adam Wright
2016-01-09 03:57:46 -08:00
parent a63c935a33
commit 2d9964293e
2 changed files with 21 additions and 12 deletions

View File

@@ -206,16 +206,20 @@ struct SyncedClientState {
stream: TcpStream,
}
pub struct Client<Reply> {
pub struct Client<Request, Reply>
where Request: serde::ser::Serialize
{
synced_state: Mutex<SyncedClientState>,
requests: Arc<Mutex<HashMap<u64, Sender<Reply>>>>,
reader_guard: thread::JoinHandle<()>,
reader_guard: Option<thread::JoinHandle<()>>,
_request: std::marker::PhantomData<Request>,
}
impl<Reply> Client<Reply>
where Reply: serde::de::Deserialize + Send + 'static
impl<Request, Reply> Client<Request, Reply>
where Reply: serde::de::Deserialize + Send + 'static,
Request: serde::ser::Serialize
{
pub fn new(stream: TcpStream) -> Result<Self> {
pub fn new(stream: TcpStream) -> io::Result<Self> {
let requests = Arc::new(Mutex::new(HashMap::new()));
let reader_stream = try!(stream.try_clone());
let reader_requests = requests.clone();
@@ -227,11 +231,12 @@ impl<Reply> Client<Reply>
stream: stream,
}),
requests: requests,
reader_guard: reader_guard,
reader_guard: Some(reader_guard),
_request: std::marker::PhantomData,
})
}
pub fn rpc<Request>(&self, request: &Request) -> Result<Reply>
pub fn rpc(&self, request: &Request) -> Result<Reply>
where Request: serde::ser::Serialize + Send + 'static
{
let (tx, rx) = channel();
@@ -246,15 +251,18 @@ impl<Reply> Client<Reply>
drop(state);
Ok(rx.recv().unwrap())
}
}
pub fn disconnect<Request: serde::Serialize>(self) -> Result<()> {
impl<Request, Reply> Drop for Client<Request, Reply>
where Request: serde::ser::Serialize
{
fn drop(&mut self) {
{
let mut state = self.synced_state.lock().unwrap();
let packet: Packet<Request> = Packet::Shutdown;
try!(serde_json::to_writer(&mut state.stream, &packet));
serde_json::to_writer(&mut state.stream, &packet);
}
self.reader_guard.join().unwrap();
Ok(())
self.reader_guard.take().unwrap().join().unwrap();
}
}

View File

@@ -64,7 +64,7 @@ macro_rules! rpc_service {
)*
}
pub struct Client(tarpc::Client<Reply>);
pub struct Client(tarpc::Client<Request, Reply>);
impl Client {
pub fn new<A>(addr: A) -> Result<Self>
@@ -146,6 +146,7 @@ mod test {
let foo = Foo{message: "Adam".into()};
let want = Foo{message: format!("Hello, {}", &foo.message)};
assert_eq!(want, client.hello(Foo{message: "Adam".into()}).unwrap());
drop(client);
shutdown.shutdown();
}
}