mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Merge branch 'code-refactor' into 'master'
Code refactor Factor out HashMap in reader into its own struct. Precipitated by the clippy warning about complex types, but definitely makes the code more readable, as well. This should be merged after #8 See merge request !9
This commit is contained in:
@@ -312,8 +312,54 @@ struct Packet<T> {
|
|||||||
message: T,
|
message: T,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct RpcFutures<Reply>(Result<HashMap<u64, Sender<Reply>>>);
|
||||||
|
|
||||||
|
impl<Reply> RpcFutures<Reply> {
|
||||||
|
fn new() -> RpcFutures<Reply> {
|
||||||
|
RpcFutures(Ok(HashMap::new()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert_tx(&mut self, id: u64, tx: Sender<Reply>) -> Result<()> {
|
||||||
|
match self.0 {
|
||||||
|
Ok(ref mut requests) => {
|
||||||
|
requests.insert(id, tx);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(ref e) => Err(e.clone()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_tx(&mut self, id: u64) -> Result<()> {
|
||||||
|
match self.0 {
|
||||||
|
Ok(ref mut requests) => {
|
||||||
|
requests.remove(&id);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(ref e) => Err(e.clone()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn complete_reply(&mut self, id: u64, reply: Reply) {
|
||||||
|
self.0
|
||||||
|
.as_mut()
|
||||||
|
.unwrap()
|
||||||
|
.remove(&id)
|
||||||
|
.unwrap()
|
||||||
|
.send(reply)
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_error(&mut self, err: bincode::serde::DeserializeError) {
|
||||||
|
let _ = mem::replace(&mut self.0, Err(err.into()));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_error(&self) -> Error {
|
||||||
|
self.0.as_ref().err().unwrap().clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct Reader<Reply> {
|
struct Reader<Reply> {
|
||||||
requests: Arc<Mutex<Result<HashMap<u64, Sender<Reply>>>>>,
|
requests: Arc<Mutex<RpcFutures<Reply>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Reply> Reader<Reply> {
|
impl<Reply> Reader<Reply> {
|
||||||
@@ -329,18 +375,13 @@ impl<Reply> Reader<Reply> {
|
|||||||
message: reply
|
message: reply
|
||||||
}) => {
|
}) => {
|
||||||
debug!("Client: received message, id={}", id);
|
debug!("Client: received message, id={}", id);
|
||||||
let mut requests = self.requests.lock().unwrap();
|
self.requests.lock().unwrap().complete_reply(id, reply);
|
||||||
let mut requests = requests.as_mut().unwrap();
|
|
||||||
let reply_tx = requests.remove(&id).unwrap();
|
|
||||||
reply_tx.send(reply).unwrap();
|
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!("Client: reader thread encountered an unexpected error while parsing; \
|
warn!("Client: reader thread encountered an unexpected error while parsing; \
|
||||||
returning now. Error: {:?}",
|
returning now. Error: {:?}",
|
||||||
err);
|
err);
|
||||||
let mut guard = self.requests.lock().unwrap();
|
self.requests.lock().unwrap().set_error(err);
|
||||||
let map = mem::replace(&mut *guard, Err(err.into()));
|
|
||||||
map.unwrap().clear();
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -364,7 +405,7 @@ pub struct Client<Request, Reply>
|
|||||||
where Request: serde::ser::Serialize
|
where Request: serde::ser::Serialize
|
||||||
{
|
{
|
||||||
synced_state: Mutex<SyncedClientState>,
|
synced_state: Mutex<SyncedClientState>,
|
||||||
requests: Arc<Mutex<Result<HashMap<u64, Sender<Reply>>>>>,
|
requests: Arc<Mutex<RpcFutures<Reply>>>,
|
||||||
reader_guard: Option<thread::JoinHandle<()>>,
|
reader_guard: Option<thread::JoinHandle<()>>,
|
||||||
timeout: Option<Duration>,
|
timeout: Option<Duration>,
|
||||||
_request: PhantomData<Request>,
|
_request: PhantomData<Request>,
|
||||||
@@ -378,7 +419,7 @@ impl<Request, Reply> Client<Request, Reply>
|
|||||||
/// for both reads and writes.
|
/// for both reads and writes.
|
||||||
pub fn new<A: ToSocketAddrs>(addr: A, timeout: Option<Duration>) -> io::Result<Self> {
|
pub fn new<A: ToSocketAddrs>(addr: A, timeout: Option<Duration>) -> io::Result<Self> {
|
||||||
let stream = try!(TcpStream::connect(addr));
|
let stream = try!(TcpStream::connect(addr));
|
||||||
let requests = Arc::new(Mutex::new(Ok(HashMap::new())));
|
let requests = Arc::new(Mutex::new(RpcFutures::new()));
|
||||||
let reader_stream = try!(stream.try_clone());
|
let reader_stream = try!(stream.try_clone());
|
||||||
let reader = Reader { requests: requests.clone() };
|
let reader = Reader { requests: requests.clone() };
|
||||||
let reader_guard = thread::spawn(move || reader.read(reader_stream));
|
let reader_guard = thread::spawn(move || reader.read(reader_stream));
|
||||||
@@ -401,14 +442,7 @@ impl<Request, Reply> Client<Request, Reply>
|
|||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
let mut state = self.synced_state.lock().unwrap();
|
let mut state = self.synced_state.lock().unwrap();
|
||||||
let id = increment(&mut state.next_id);
|
let id = increment(&mut state.next_id);
|
||||||
{
|
try!(self.requests.lock().unwrap().insert_tx(id, tx));
|
||||||
match *self.requests.lock().unwrap() {
|
|
||||||
Ok(ref mut requests) => {
|
|
||||||
requests.insert(id, tx);
|
|
||||||
}
|
|
||||||
Err(ref e) => return Err(e.clone()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let packet = Packet {
|
let packet = Packet {
|
||||||
rpc_id: id,
|
rpc_id: id,
|
||||||
message: request,
|
message: request,
|
||||||
@@ -422,21 +456,12 @@ impl<Request, Reply> Client<Request, Reply>
|
|||||||
warn!("Client: failed to write packet.\nPacket: {:?}\nError: {:?}",
|
warn!("Client: failed to write packet.\nPacket: {:?}\nError: {:?}",
|
||||||
packet,
|
packet,
|
||||||
err);
|
err);
|
||||||
match *self.requests.lock().unwrap() {
|
try!(self.requests.lock().unwrap().remove_tx(id));
|
||||||
Ok(ref mut requests) => {
|
|
||||||
requests.remove(&id);
|
|
||||||
return Err(err.into());
|
|
||||||
}
|
|
||||||
Err(ref e) => return Err(e.clone()),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
drop(state);
|
drop(state);
|
||||||
match rx.recv() {
|
match rx.recv() {
|
||||||
Ok(msg) => Ok(msg),
|
Ok(msg) => Ok(msg),
|
||||||
Err(_) => {
|
Err(_) => Err(self.requests.lock().unwrap().get_error()),
|
||||||
let guard = self.requests.lock().unwrap();
|
|
||||||
Err(guard.as_ref().err().unwrap().clone())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user