This commit is contained in:
Artem Vorotnikov
2019-04-03 20:36:31 +03:00
parent b8b92ddb5f
commit 4569d26d81
2 changed files with 19 additions and 8 deletions

View File

@@ -65,14 +65,19 @@ struct Send<'a, Req, Resp> {
fut: MapOkDispatchResponse<SendMapErrConnectionReset<'a, Req, Resp>, Resp>, fut: MapOkDispatchResponse<SendMapErrConnectionReset<'a, Req, Resp>, Resp>,
} }
type SendMapErrConnectionReset<'a, Req, Resp> = type SendMapErrConnectionReset<'a, Req, Resp> = MapErrConnectionReset<
MapErrConnectionReset<futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>, DispatchRequest<Req, Resp>>>; futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>, DispatchRequest<Req, Resp>>,
>;
impl<'a, Req, Resp> Send<'a, Req, Resp> { impl<'a, Req, Resp> Send<'a, Req, Resp> {
unsafe_pinned!( unsafe_pinned!(
fut: MapOkDispatchResponse< fut: MapOkDispatchResponse<
MapErrConnectionReset< MapErrConnectionReset<
futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>, DispatchRequest<Req, Resp>>, futures::sink::Send<
'a,
mpsc::Sender<DispatchRequest<Req, Resp>>,
DispatchRequest<Req, Resp>,
>,
>, >,
Resp, Resp,
> >
@@ -874,7 +879,6 @@ mod tests {
let _ = send_request(&mut channel, "hi"); let _ = send_request(&mut channel, "hi");
// Drop the channel so polling returns none if no requests are currently ready. // Drop the channel so polling returns none if no requests are currently ready.
drop(channel); drop(channel);
// Test that a request future dropped before it's processed by dispatch will cause the request // Test that a request future dropped before it's processed by dispatch will cause the request
@@ -893,13 +897,14 @@ mod tests {
assert!(dispatch.as_mut().pump_write(waker).ready().is_some()); assert!(dispatch.as_mut().pump_write(waker).ready().is_some());
assert!(!dispatch.as_mut().in_flight_requests().is_empty()); assert!(!dispatch.as_mut().in_flight_requests().is_empty());
// Test that a request future dropped after it's processed by dispatch will cause the request // Test that a request future dropped after it's processed by dispatch will cause the request
// to be removed from the in-flight request map. // to be removed from the in-flight request map.
drop(req); drop(req);
if let Poll::Ready(Some(_)) = dispatch.as_mut().poll_next_cancellation(waker).unwrap() { if let Poll::Ready(Some(_)) = dispatch.as_mut().poll_next_cancellation(waker).unwrap() {
// ok // ok
} else { panic!("Expected request to be cancelled")}; } else {
panic!("Expected request to be cancelled")
};
assert!(dispatch.in_flight_requests().is_empty()); assert!(dispatch.in_flight_requests().is_empty());
} }
@@ -958,7 +963,8 @@ mod tests {
.send(context::current(), request.to_string()) .send(context::current(), request.to_string())
.boxed() .boxed()
.compat(), .compat(),
).unwrap() )
.unwrap()
} }
fn send_response( fn send_response(

View File

@@ -45,8 +45,13 @@ use futures::{
task::{Poll, Spawn, SpawnError, SpawnExt}, task::{Poll, Spawn, SpawnError, SpawnExt},
Future, Future,
}; };
use std::{cell::RefCell, io, sync::{Arc, Once}, time::SystemTime};
use parking_lot::Mutex; use parking_lot::Mutex;
use std::{
cell::RefCell,
io,
sync::{Arc, Once},
time::SystemTime,
};
/// A message from a client to a server. /// A message from a client to a server.
#[derive(Debug)] #[derive(Debug)]