mirror of
https://github.com/OMGeeky/tarpc.git
synced 2025-12-30 16:18:56 +01:00
Factor out HashMap in reader into its own struct.
Precipitated by the clippy warning about complex types, but definitely makes the code more readable, as well.
This commit is contained in:
@@ -304,8 +304,55 @@ struct Packet<T> {
|
||||
message: T,
|
||||
}
|
||||
|
||||
struct RpcFutures<Reply>(Result<HashMap<u64, Sender<Reply>>>);
|
||||
|
||||
impl<Reply> RpcFutures<Reply> {
|
||||
fn new() -> RpcFutures<Reply> {
|
||||
RpcFutures(Ok(HashMap::new()))
|
||||
}
|
||||
|
||||
fn insert_tx(&mut self, id: u64, tx: Sender<Reply>) -> Result<()> {
|
||||
match self.0 {
|
||||
Ok(ref mut requests) => {
|
||||
requests.insert(id, tx);
|
||||
Ok(())
|
||||
}
|
||||
Err(ref e) => Err(e.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_tx(&mut self, id: u64) -> Result<()> {
|
||||
match self.0 {
|
||||
Ok(ref mut requests) => {
|
||||
requests.remove(&id);
|
||||
Ok(())
|
||||
}
|
||||
Err(ref e) => Err(e.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
fn complete_reply(&mut self, id: u64, reply: Reply) {
|
||||
self.0
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.remove(&id)
|
||||
.unwrap()
|
||||
.send(reply)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn set_error(&mut self, err: bincode::serde::DeserializeError) {
|
||||
let map = mem::replace(&mut self.0, Err(err.into()));
|
||||
map.unwrap().clear();
|
||||
}
|
||||
|
||||
fn get_error(&self) -> Error {
|
||||
self.0.as_ref().err().unwrap().clone()
|
||||
}
|
||||
}
|
||||
|
||||
struct Reader<Reply> {
|
||||
requests: Arc<Mutex<Result<HashMap<u64, Sender<Reply>>>>>,
|
||||
requests: Arc<Mutex<RpcFutures<Reply>>>,
|
||||
}
|
||||
|
||||
impl<Reply> Reader<Reply> {
|
||||
@@ -321,18 +368,13 @@ impl<Reply> Reader<Reply> {
|
||||
message: reply
|
||||
}) => {
|
||||
debug!("Client: received message, id={}", id);
|
||||
let mut requests = self.requests.lock().unwrap();
|
||||
let mut requests = requests.as_mut().unwrap();
|
||||
let reply_tx = requests.remove(&id).unwrap();
|
||||
reply_tx.send(reply).unwrap();
|
||||
self.requests.lock().unwrap().complete_reply(id, reply);
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("Client: reader thread encountered an unexpected error while parsing; \
|
||||
returning now. Error: {:?}",
|
||||
err);
|
||||
let mut guard = self.requests.lock().unwrap();
|
||||
let map = mem::replace(&mut *guard, Err(err.into()));
|
||||
map.unwrap().clear();
|
||||
self.requests.lock().unwrap().set_error(err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -356,7 +398,7 @@ pub struct Client<Request, Reply>
|
||||
where Request: serde::ser::Serialize
|
||||
{
|
||||
synced_state: Mutex<SyncedClientState>,
|
||||
requests: Arc<Mutex<Result<HashMap<u64, Sender<Reply>>>>>,
|
||||
requests: Arc<Mutex<RpcFutures<Reply>>>,
|
||||
reader_guard: Option<thread::JoinHandle<()>>,
|
||||
timeout: Option<Duration>,
|
||||
_request: PhantomData<Request>,
|
||||
@@ -370,7 +412,7 @@ impl<Request, Reply> Client<Request, Reply>
|
||||
/// for both reads and writes.
|
||||
pub fn new<A: ToSocketAddrs>(addr: A, timeout: Option<Duration>) -> io::Result<Self> {
|
||||
let stream = try!(TcpStream::connect(addr));
|
||||
let requests = Arc::new(Mutex::new(Ok(HashMap::new())));
|
||||
let requests = Arc::new(Mutex::new(RpcFutures::new()));
|
||||
let reader_stream = try!(stream.try_clone());
|
||||
let reader = Reader { requests: requests.clone() };
|
||||
let reader_guard = thread::spawn(move || reader.read(reader_stream));
|
||||
@@ -393,12 +435,9 @@ impl<Request, Reply> Client<Request, Reply>
|
||||
let (tx, rx) = channel();
|
||||
let mut state = self.synced_state.lock().unwrap();
|
||||
let id = increment(&mut state.next_id);
|
||||
{
|
||||
match *self.requests.lock().unwrap() {
|
||||
Ok(ref mut requests) => {
|
||||
requests.insert(id, tx);
|
||||
}
|
||||
Err(ref e) => return Err(e.clone()),
|
||||
{ // block required to drop lock asap
|
||||
if let Err(e) = self.requests.lock().unwrap().insert_tx(id, tx) {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
let packet = Packet {
|
||||
@@ -414,21 +453,14 @@ impl<Request, Reply> Client<Request, Reply>
|
||||
warn!("Client: failed to write packet.\nPacket: {:?}\nError: {:?}",
|
||||
packet,
|
||||
err);
|
||||
match *self.requests.lock().unwrap() {
|
||||
Ok(ref mut requests) => {
|
||||
requests.remove(&id);
|
||||
return Err(err.into());
|
||||
}
|
||||
Err(ref e) => return Err(e.clone()),
|
||||
if let Err(e) = self.requests.lock().unwrap().remove_tx(id) {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
drop(state);
|
||||
match rx.recv() {
|
||||
Ok(msg) => Ok(msg),
|
||||
Err(_) => {
|
||||
let guard = self.requests.lock().unwrap();
|
||||
Err(guard.as_ref().err().unwrap().clone())
|
||||
}
|
||||
Err(_) => Err(self.requests.lock().unwrap().get_error()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user