mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-01-06 19:45:25 +01:00
Port to std::task::Context
This commit is contained in:
@@ -15,7 +15,7 @@ use futures::{
|
||||
prelude::*,
|
||||
ready,
|
||||
stream::Fuse,
|
||||
task::Waker,
|
||||
task::Context,
|
||||
Poll,
|
||||
};
|
||||
use humantime::format_rfc3339;
|
||||
@@ -87,8 +87,8 @@ impl<'a, Req, Resp> Send<'a, Req, Resp> {
|
||||
impl<'a, Req, Resp> Future for Send<'a, Req, Resp> {
|
||||
type Output = io::Result<DispatchResponse<Resp>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
self.as_mut().fut().poll(waker)
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.as_mut().fut().poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -106,8 +106,8 @@ impl<'a, Req, Resp> Call<'a, Req, Resp> {
|
||||
impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
|
||||
type Output = io::Result<Resp>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
self.as_mut().fut().poll(waker)
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.as_mut().fut().poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,8 +182,8 @@ impl<Resp> DispatchResponse<Resp> {
|
||||
impl<Resp> Future for DispatchResponse<Resp> {
|
||||
type Output = io::Result<Resp>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<Resp>> {
|
||||
let resp = ready!(self.response.poll_unpin(waker));
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<Resp>> {
|
||||
let resp = ready!(self.response.poll_unpin(cx));
|
||||
|
||||
self.complete = true;
|
||||
|
||||
@@ -323,8 +323,8 @@ where
|
||||
unsafe_pinned!(pending_requests: Fuse<mpsc::Receiver<DispatchRequest<Req, Resp>>>);
|
||||
unsafe_pinned!(transport: Fuse<C>);
|
||||
|
||||
fn pump_read(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> {
|
||||
Poll::Ready(match ready!(self.as_mut().transport().poll_next(waker)?) {
|
||||
fn pump_read(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> {
|
||||
Poll::Ready(match ready!(self.as_mut().transport().poll_next(cx)?) {
|
||||
Some(response) => {
|
||||
self.complete(response);
|
||||
Some(Ok(()))
|
||||
@@ -336,13 +336,13 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
fn pump_write(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> {
|
||||
fn pump_write(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> {
|
||||
enum ReceiverStatus {
|
||||
NotReady,
|
||||
Closed,
|
||||
}
|
||||
|
||||
let pending_requests_status = match self.poll_next_request(waker)? {
|
||||
let pending_requests_status = match self.poll_next_request(cx)? {
|
||||
Poll::Ready(Some(dispatch_request)) => {
|
||||
self.write_request(dispatch_request)?;
|
||||
return Poll::Ready(Some(Ok(())));
|
||||
@@ -351,7 +351,7 @@ where
|
||||
Poll::Pending => ReceiverStatus::NotReady,
|
||||
};
|
||||
|
||||
let canceled_requests_status = match self.poll_next_cancellation(waker)? {
|
||||
let canceled_requests_status = match self.poll_next_cancellation(cx)? {
|
||||
Poll::Ready(Some((context, request_id))) => {
|
||||
self.write_cancel(context, request_id)?;
|
||||
return Poll::Ready(Some(Ok(())));
|
||||
@@ -362,12 +362,12 @@ where
|
||||
|
||||
match (pending_requests_status, canceled_requests_status) {
|
||||
(ReceiverStatus::Closed, ReceiverStatus::Closed) => {
|
||||
ready!(self.as_mut().transport().poll_flush(waker)?);
|
||||
ready!(self.as_mut().transport().poll_flush(cx)?);
|
||||
Poll::Ready(None)
|
||||
}
|
||||
(ReceiverStatus::NotReady, _) | (_, ReceiverStatus::NotReady) => {
|
||||
// No more messages to process, so flush any messages buffered in the transport.
|
||||
ready!(self.as_mut().transport().poll_flush(waker)?);
|
||||
ready!(self.as_mut().transport().poll_flush(cx)?);
|
||||
|
||||
// Even if we fully-flush, we return Pending, because we have no more requests
|
||||
// or cancellations right now.
|
||||
@@ -379,7 +379,7 @@ where
|
||||
/// Yields the next pending request, if one is ready to be sent.
|
||||
fn poll_next_request(
|
||||
self: &mut Pin<&mut Self>,
|
||||
waker: &Waker,
|
||||
cx: &mut Context<'_>,
|
||||
) -> PollIo<DispatchRequest<Req, Resp>> {
|
||||
if self.as_mut().in_flight_requests().len() >= self.config.max_in_flight_requests {
|
||||
info!(
|
||||
@@ -393,13 +393,13 @@ where
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? {
|
||||
while let Poll::Pending = self.as_mut().transport().poll_ready(cx)? {
|
||||
// We can't yield a request-to-be-sent before the transport is capable of buffering it.
|
||||
ready!(self.as_mut().transport().poll_flush(waker)?);
|
||||
ready!(self.as_mut().transport().poll_flush(cx)?);
|
||||
}
|
||||
|
||||
loop {
|
||||
match ready!(self.as_mut().pending_requests().poll_next_unpin(waker)) {
|
||||
match ready!(self.as_mut().pending_requests().poll_next_unpin(cx)) {
|
||||
Some(request) => {
|
||||
if request.response_completion.is_canceled() {
|
||||
trace!(
|
||||
@@ -422,14 +422,14 @@ where
|
||||
/// Yields the next pending cancellation, and, if one is ready, cancels the associated request.
|
||||
fn poll_next_cancellation(
|
||||
self: &mut Pin<&mut Self>,
|
||||
waker: &Waker,
|
||||
cx: &mut Context<'_>,
|
||||
) -> PollIo<(context::Context, u64)> {
|
||||
while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? {
|
||||
ready!(self.as_mut().transport().poll_flush(waker)?);
|
||||
while let Poll::Pending = self.as_mut().transport().poll_ready(cx)? {
|
||||
ready!(self.as_mut().transport().poll_flush(cx)?);
|
||||
}
|
||||
|
||||
loop {
|
||||
let cancellation = self.as_mut().canceled_requests().poll_next_unpin(waker);
|
||||
let cancellation = self.as_mut().canceled_requests().poll_next_unpin(cx);
|
||||
match ready!(cancellation) {
|
||||
Some(request_id) => {
|
||||
if let Some(in_flight_data) =
|
||||
@@ -537,10 +537,10 @@ where
|
||||
{
|
||||
type Output = io::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
trace!("[{}] RequestDispatch::poll", self.as_mut().server_addr());
|
||||
loop {
|
||||
match (self.pump_read(waker)?, self.pump_write(waker)?) {
|
||||
match (self.pump_read(cx)?, self.pump_write(cx)?) {
|
||||
(read, write @ Poll::Ready(None)) => {
|
||||
if self.as_mut().in_flight_requests().is_empty() {
|
||||
info!(
|
||||
@@ -627,8 +627,8 @@ impl RequestCancellation {
|
||||
impl Stream for CanceledRequests {
|
||||
type Item = u64;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<u64>> {
|
||||
self.0.poll_next_unpin(waker)
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u64>> {
|
||||
self.0.poll_next_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -659,8 +659,8 @@ where
|
||||
{
|
||||
type Output = io::Result<Fut::Ok>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
match self.as_mut().future().try_poll(waker) {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match self.as_mut().future().try_poll(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(result) => {
|
||||
self.finished().take().expect(
|
||||
@@ -699,8 +699,8 @@ where
|
||||
{
|
||||
type Output = Result<DispatchResponse<Resp>, Fut::Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
match self.as_mut().future().try_poll(waker) {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match self.as_mut().future().try_poll(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(result) => {
|
||||
let response = self
|
||||
@@ -742,8 +742,8 @@ where
|
||||
{
|
||||
type Output = Result<Fut2::Ok, Fut2::Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
self.try_chain().poll(waker, |result| match result {
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.try_chain().poll(cx, |result| match result {
|
||||
Ok(ok) => TryChainAction::Future(ok),
|
||||
Err(err) => TryChainAction::Output(Err(err)),
|
||||
})
|
||||
@@ -775,7 +775,11 @@ where
|
||||
TryChain::First(fut1)
|
||||
}
|
||||
|
||||
fn poll<F>(self: Pin<&mut Self>, waker: &Waker, f: F) -> Poll<Result<Fut2::Ok, Fut2::Error>>
|
||||
fn poll<F>(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
f: F,
|
||||
) -> Poll<Result<Fut2::Ok, Fut2::Error>>
|
||||
where
|
||||
F: FnOnce(Result<Fut1::Ok, Fut1::Error>) -> TryChainAction<Fut2>,
|
||||
{
|
||||
@@ -788,14 +792,14 @@ where
|
||||
let output = match this {
|
||||
TryChain::First(fut1) => {
|
||||
// Poll the first future
|
||||
match unsafe { Pin::new_unchecked(fut1) }.try_poll(waker) {
|
||||
match unsafe { Pin::new_unchecked(fut1) }.try_poll(cx) {
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(output) => output,
|
||||
}
|
||||
}
|
||||
TryChain::Second(fut2) => {
|
||||
// Poll the second future
|
||||
return unsafe { Pin::new_unchecked(fut2) }.try_poll(waker);
|
||||
return unsafe { Pin::new_unchecked(fut2) }.try_poll(cx);
|
||||
}
|
||||
TryChain::Empty => {
|
||||
panic!("future must not be polled after it returned `Poll::Ready`");
|
||||
@@ -824,7 +828,7 @@ mod tests {
|
||||
ClientMessage, Response,
|
||||
};
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{channel::mpsc, prelude::*, Poll};
|
||||
use futures::{channel::mpsc, prelude::*, task::Context, Poll};
|
||||
use futures_test::task::noop_waker_ref;
|
||||
use std::{
|
||||
marker,
|
||||
@@ -838,11 +842,11 @@ mod tests {
|
||||
fn stage_request() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
let waker = &noop_waker_ref();
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
|
||||
let _resp = send_request(&mut channel, "hi");
|
||||
|
||||
let req = dispatch.poll_next_request(waker).ready();
|
||||
let req = dispatch.poll_next_request(cx).ready();
|
||||
assert!(req.is_some());
|
||||
|
||||
let req = req.unwrap();
|
||||
@@ -855,12 +859,12 @@ mod tests {
|
||||
fn stage_request_channel_dropped_doesnt_panic() {
|
||||
let (mut dispatch, mut channel, mut server_channel) = set_up();
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
let waker = &noop_waker_ref();
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
|
||||
let _ = send_request(&mut channel, "hi");
|
||||
drop(channel);
|
||||
|
||||
assert!(dispatch.as_mut().poll(waker).is_ready());
|
||||
assert!(dispatch.as_mut().poll(cx).is_ready());
|
||||
send_response(
|
||||
&mut server_channel,
|
||||
Response {
|
||||
@@ -875,7 +879,7 @@ mod tests {
|
||||
fn stage_request_response_future_dropped_is_canceled_before_sending() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
let waker = &noop_waker_ref();
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
|
||||
let _ = send_request(&mut channel, "hi");
|
||||
|
||||
@@ -883,24 +887,24 @@ mod tests {
|
||||
drop(channel);
|
||||
// Test that a request future dropped before it's processed by dispatch will cause the request
|
||||
// to not be added to the in-flight request map.
|
||||
assert!(dispatch.poll_next_request(waker).ready().is_none());
|
||||
assert!(dispatch.poll_next_request(cx).ready().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stage_request_response_future_dropped_is_canceled_after_sending() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
let waker = &noop_waker_ref();
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
|
||||
let req = send_request(&mut channel, "hi");
|
||||
|
||||
assert!(dispatch.as_mut().pump_write(waker).ready().is_some());
|
||||
assert!(dispatch.as_mut().pump_write(cx).ready().is_some());
|
||||
assert!(!dispatch.as_mut().in_flight_requests().is_empty());
|
||||
|
||||
// Test that a request future dropped after it's processed by dispatch will cause the request
|
||||
// to be removed from the in-flight request map.
|
||||
drop(req);
|
||||
if let Poll::Ready(Some(_)) = dispatch.as_mut().poll_next_cancellation(waker).unwrap() {
|
||||
if let Poll::Ready(Some(_)) = dispatch.as_mut().poll_next_cancellation(cx).unwrap() {
|
||||
// ok
|
||||
} else {
|
||||
panic!("Expected request to be cancelled")
|
||||
@@ -912,7 +916,7 @@ mod tests {
|
||||
fn stage_request_response_closed_skipped() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
let waker = &noop_waker_ref();
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
|
||||
// Test that a request future that's closed its receiver but not yet canceled its request --
|
||||
// i.e. still in `drop fn` -- will cause the request to not be added to the in-flight request
|
||||
@@ -920,7 +924,7 @@ mod tests {
|
||||
let mut resp = send_request(&mut channel, "hi");
|
||||
resp.response.get_mut().close();
|
||||
|
||||
assert!(dispatch.poll_next_request(waker).is_pending());
|
||||
assert!(dispatch.poll_next_request(cx).is_pending());
|
||||
}
|
||||
|
||||
fn set_up() -> (
|
||||
|
||||
@@ -15,7 +15,7 @@ use futures::{
|
||||
prelude::*,
|
||||
ready,
|
||||
stream::Fuse,
|
||||
task::{Poll, Waker},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use pin_utils::unsafe_pinned;
|
||||
@@ -197,7 +197,10 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_listener<C>(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<NewConnection<Req, Resp, C>>
|
||||
fn poll_listener<C>(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> PollIo<NewConnection<Req, Resp, C>>
|
||||
where
|
||||
S: Stream<Item = Result<C, io::Error>>,
|
||||
C: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
|
||||
@@ -208,7 +211,10 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
|
||||
fn poll_closed_connections(
|
||||
self: &mut Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
match ready!(self.as_mut().closed_connections_rx().poll_next_unpin(cx)) {
|
||||
Some(addr) => {
|
||||
self.handle_closed_connection(&addr);
|
||||
@@ -226,7 +232,7 @@ where
|
||||
{
|
||||
type Item = io::Result<Channel<Req, Resp, T>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<Channel<Req, Resp, T>> {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<Channel<Req, Resp, T>> {
|
||||
loop {
|
||||
match (
|
||||
self.as_mut().poll_listener(cx)?,
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
//! Provides a server that concurrently handles many connections sending multiplexed requests.
|
||||
|
||||
use crate::{
|
||||
context::Context, util::deadline_compat, util::AsDuration, util::Compact, ClientMessage,
|
||||
context, util::deadline_compat, util::AsDuration, util::Compact, ClientMessage,
|
||||
ClientMessageKind, PollIo, Request, Response, ServerError, Transport,
|
||||
};
|
||||
use fnv::FnvHashMap;
|
||||
@@ -17,7 +17,7 @@ use futures::{
|
||||
prelude::*,
|
||||
ready,
|
||||
stream::Fuse,
|
||||
task::{Poll, Waker},
|
||||
task::{Context, Poll},
|
||||
try_ready,
|
||||
};
|
||||
use humantime::format_rfc3339;
|
||||
@@ -128,12 +128,12 @@ where
|
||||
Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send + 'static,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<()> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
|
||||
while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) {
|
||||
match channel {
|
||||
Ok(channel) => {
|
||||
@@ -165,7 +165,7 @@ where
|
||||
/// Responds to all requests with `request_handler`.
|
||||
fn respond_with<F, Fut>(self, request_handler: F) -> Running<Self, F>
|
||||
where
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
Running {
|
||||
@@ -234,15 +234,24 @@ where
|
||||
self.as_mut().transport().start_send(response)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_ready(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
|
||||
pub(crate) fn poll_ready(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
self.as_mut().transport().poll_ready(cx)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_flush(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
|
||||
pub(crate) fn poll_flush(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
self.as_mut().transport().poll_flush(cx)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<ClientMessage<Req>> {
|
||||
pub(crate) fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> PollIo<ClientMessage<Req>> {
|
||||
self.as_mut().transport().poll_next(cx)
|
||||
}
|
||||
|
||||
@@ -255,7 +264,7 @@ where
|
||||
/// responses and resolves when the connection is closed.
|
||||
pub fn respond_with<F, Fut>(self, f: F) -> impl Future<Output = ()>
|
||||
where
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
Req: 'static,
|
||||
Resp: 'static,
|
||||
@@ -281,9 +290,9 @@ where
|
||||
struct ClientHandler<Req, Resp, T, F> {
|
||||
channel: Channel<Req, Resp, T>,
|
||||
/// Responses waiting to be written to the wire.
|
||||
pending_responses: Fuse<mpsc::Receiver<(Context, Response<Resp>)>>,
|
||||
pending_responses: Fuse<mpsc::Receiver<(context::Context, Response<Resp>)>>,
|
||||
/// Handed out to request handlers to fan in responses.
|
||||
responses_tx: mpsc::Sender<(Context, Response<Resp>)>,
|
||||
responses_tx: mpsc::Sender<(context::Context, Response<Resp>)>,
|
||||
/// Number of requests currently being responded to.
|
||||
in_flight_requests: FnvHashMap<u64, AbortHandle>,
|
||||
/// Request handler.
|
||||
@@ -293,8 +302,8 @@ struct ClientHandler<Req, Resp, T, F> {
|
||||
impl<Req, Resp, T, F> ClientHandler<Req, Resp, T, F> {
|
||||
unsafe_pinned!(channel: Channel<Req, Resp, T>);
|
||||
unsafe_pinned!(in_flight_requests: FnvHashMap<u64, AbortHandle>);
|
||||
unsafe_pinned!(pending_responses: Fuse<mpsc::Receiver<(Context, Response<Resp>)>>);
|
||||
unsafe_pinned!(responses_tx: mpsc::Sender<(Context, Response<Resp>)>);
|
||||
unsafe_pinned!(pending_responses: Fuse<mpsc::Receiver<(context::Context, Response<Resp>)>>);
|
||||
unsafe_pinned!(responses_tx: mpsc::Sender<(context::Context, Response<Resp>)>);
|
||||
// For this to be safe, field f must be private, and code in this module must never
|
||||
// construct PinMut<F>.
|
||||
unsafe_unpinned!(f: F);
|
||||
@@ -305,12 +314,15 @@ where
|
||||
Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
/// If at max in-flight requests, check that there's room to immediately write a throttled
|
||||
/// response.
|
||||
fn poll_ready_if_throttling(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
|
||||
fn poll_ready_if_throttling(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
if self.in_flight_requests.len()
|
||||
>= self.channel.config.max_in_flight_requests_per_connection
|
||||
{
|
||||
@@ -328,7 +340,7 @@ where
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn pump_read(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<()> {
|
||||
fn pump_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> {
|
||||
ready!(self.as_mut().poll_ready_if_throttling(cx)?);
|
||||
|
||||
Poll::Ready(match ready!(self.as_mut().channel().poll_next(cx)?) {
|
||||
@@ -350,7 +362,11 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
fn pump_write(mut self: Pin<&mut Self>, cx: &Waker, read_half_closed: bool) -> PollIo<()> {
|
||||
fn pump_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
read_half_closed: bool,
|
||||
) -> PollIo<()> {
|
||||
match self.as_mut().poll_next_response(cx)? {
|
||||
Poll::Ready(Some((_, response))) => {
|
||||
self.as_mut().channel().start_send(response)?;
|
||||
@@ -379,8 +395,8 @@ where
|
||||
|
||||
fn poll_next_response(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &Waker,
|
||||
) -> PollIo<(Context, Response<Resp>)> {
|
||||
cx: &mut Context<'_>,
|
||||
) -> PollIo<(context::Context, Response<Resp>)> {
|
||||
// Ensure there's room to write a response.
|
||||
while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? {
|
||||
ready!(self.as_mut().channel().poll_flush(cx)?);
|
||||
@@ -421,7 +437,7 @@ where
|
||||
) -> io::Result<()> {
|
||||
let request_id = request.id;
|
||||
let peer = self.as_mut().channel().client_addr;
|
||||
let ctx = Context {
|
||||
let ctx = context::Context {
|
||||
deadline: request.deadline,
|
||||
trace_context,
|
||||
};
|
||||
@@ -527,12 +543,12 @@ where
|
||||
Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
type Output = io::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
trace!("[{}] ClientHandler::poll", self.channel.client_addr);
|
||||
loop {
|
||||
let read = self.as_mut().pump_read(cx)?;
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
//! Transports backed by in-memory channels.
|
||||
|
||||
use crate::{PollIo, Transport};
|
||||
use futures::{channel::mpsc, task::Waker, Poll, Sink, Stream};
|
||||
use futures::{channel::mpsc, task::Context, Poll, Sink, Stream};
|
||||
use pin_utils::unsafe_pinned;
|
||||
use std::pin::Pin;
|
||||
use std::{
|
||||
@@ -45,7 +45,7 @@ impl<Item, SinkItem> UnboundedChannel<Item, SinkItem> {
|
||||
impl<Item, SinkItem> Stream for UnboundedChannel<Item, SinkItem> {
|
||||
type Item = Result<Item, io::Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &Waker) -> PollIo<Item> {
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<Item> {
|
||||
self.rx().poll_next(cx).map(|option| option.map(Ok))
|
||||
}
|
||||
}
|
||||
@@ -53,7 +53,7 @@ impl<Item, SinkItem> Stream for UnboundedChannel<Item, SinkItem> {
|
||||
impl<Item, SinkItem> Sink<SinkItem> for UnboundedChannel<Item, SinkItem> {
|
||||
type SinkError = io::Error;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
self.tx()
|
||||
.poll_ready(cx)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
@@ -65,13 +65,13 @@ impl<Item, SinkItem> Sink<SinkItem> for UnboundedChannel<Item, SinkItem> {
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &Waker) -> Poll<Result<(), Self::SinkError>> {
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
|
||||
self.tx()
|
||||
.poll_flush(cx)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
self.tx()
|
||||
.poll_close(cx)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
|
||||
@@ -15,7 +15,7 @@ use std::{
|
||||
marker::PhantomData,
|
||||
net::SocketAddr,
|
||||
pin::Pin,
|
||||
task::{Poll, Waker},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
pub mod channel;
|
||||
@@ -74,8 +74,8 @@ where
|
||||
{
|
||||
type Item = S::Item;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<S::Item>> {
|
||||
self.inner().poll_next(waker)
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
|
||||
self.inner().poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,16 +89,16 @@ where
|
||||
self.inner().start_send(item)
|
||||
}
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_ready(waker)
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_ready(cx)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_flush(waker)
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_close(waker)
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_close(cx)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,10 +5,10 @@
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
use futures::{
|
||||
compat::{Compat01As03, Future01CompatExt},
|
||||
compat::*,
|
||||
prelude::*,
|
||||
ready,
|
||||
task::{Poll, Waker},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use pin_utils::unsafe_pinned;
|
||||
use std::pin::Pin;
|
||||
@@ -50,15 +50,15 @@ where
|
||||
{
|
||||
type Output = Result<T::Ok, timeout::Error<T::Error>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
// First, try polling the future
|
||||
match self.as_mut().future().try_poll(waker) {
|
||||
match self.as_mut().future().try_poll(cx) {
|
||||
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
|
||||
Poll::Pending => {}
|
||||
Poll::Ready(Err(e)) => return Poll::Ready(Err(timeout::Error::inner(e))),
|
||||
}
|
||||
|
||||
let delay = self.delay().poll_unpin(waker);
|
||||
let delay = self.delay().poll_unpin(cx);
|
||||
|
||||
// Now check the timer
|
||||
match ready!(delay) {
|
||||
|
||||
Reference in New Issue
Block a user