mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Cargo fmt
This commit is contained in:
@@ -425,7 +425,9 @@ where
|
||||
loop {
|
||||
match ready!(self.as_mut().canceled_requests().poll_next_unpin(waker)) {
|
||||
Some(request_id) => {
|
||||
if let Some(in_flight_data) = self.as_mut().in_flight_requests().remove(&request_id) {
|
||||
if let Some(in_flight_data) =
|
||||
self.as_mut().in_flight_requests().remove(&request_id)
|
||||
{
|
||||
self.as_mut().in_flight_requests().compact(0.1);
|
||||
|
||||
debug!(
|
||||
@@ -438,7 +440,10 @@ where
|
||||
}
|
||||
}
|
||||
None => {
|
||||
trace!("[{}] canceled_requests closed.", self.as_mut().server_addr());
|
||||
trace!(
|
||||
"[{}] canceled_requests closed.",
|
||||
self.as_mut().server_addr()
|
||||
);
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
}
|
||||
@@ -480,13 +485,21 @@ where
|
||||
message: ClientMessageKind::Cancel { request_id },
|
||||
};
|
||||
self.as_mut().transport().start_send(cancel)?;
|
||||
trace!("[{}/{}] Cancel message sent.", trace_id, self.as_mut().server_addr());
|
||||
trace!(
|
||||
"[{}/{}] Cancel message sent.",
|
||||
trace_id,
|
||||
self.as_mut().server_addr()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sends a server response to the client task that initiated the associated request.
|
||||
fn complete(self: &mut Pin<&mut Self>, response: Response<Resp>) -> bool {
|
||||
if let Some(in_flight_data) = self.as_mut().in_flight_requests().remove(&response.request_id) {
|
||||
if let Some(in_flight_data) = self
|
||||
.as_mut()
|
||||
.in_flight_requests()
|
||||
.remove(&response.request_id)
|
||||
{
|
||||
self.as_mut().in_flight_requests().compact(0.1);
|
||||
|
||||
trace!(
|
||||
|
||||
@@ -231,12 +231,18 @@ where
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo<Channel<Req, Resp, T>> {
|
||||
loop {
|
||||
match (self.as_mut().poll_listener(cx)?, self.poll_closed_connections(cx)?) {
|
||||
match (
|
||||
self.as_mut().poll_listener(cx)?,
|
||||
self.poll_closed_connections(cx)?,
|
||||
) {
|
||||
(Poll::Ready(Some(NewConnection::Accepted(channel))), _) => {
|
||||
return Poll::Ready(Some(Ok(channel)))
|
||||
return Poll::Ready(Some(Ok(channel)));
|
||||
}
|
||||
(Poll::Ready(Some(NewConnection::Filtered)), _) | (_, Poll::Ready(())) => {
|
||||
trace!("Filtered a connection; {} open.", self.as_mut().open_connections());
|
||||
trace!(
|
||||
"Filtered a connection; {} open.",
|
||||
self.as_mut().open_connections()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
(Poll::Pending, Poll::Pending) => return Poll::Pending,
|
||||
|
||||
@@ -230,10 +230,7 @@ where
|
||||
Req: Send,
|
||||
Resp: Send,
|
||||
{
|
||||
pub(crate) fn start_send(
|
||||
mut self: Pin<&mut Self>,
|
||||
response: Response<Resp>,
|
||||
) -> io::Result<()> {
|
||||
pub(crate) fn start_send(mut self: Pin<&mut Self>, response: Response<Resp>) -> io::Result<()> {
|
||||
self.as_mut().transport().start_send(response)
|
||||
}
|
||||
|
||||
@@ -316,10 +313,7 @@ where
|
||||
{
|
||||
/// If at max in-flight requests, check that there's room to immediately write a throttled
|
||||
/// response.
|
||||
fn poll_ready_if_throttling(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> Poll<io::Result<()>> {
|
||||
fn poll_ready_if_throttling(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
if self.in_flight_requests.len()
|
||||
>= self.channel.config.max_in_flight_requests_per_connection
|
||||
{
|
||||
@@ -359,11 +353,7 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
fn pump_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
read_half_closed: bool,
|
||||
) -> PollIo<()> {
|
||||
fn pump_write(mut self: Pin<&mut Self>, cx: &LocalWaker, read_half_closed: bool) -> PollIo<()> {
|
||||
match self.as_mut().poll_next_response(cx)? {
|
||||
Poll::Ready(Some((_, response))) => {
|
||||
self.as_mut().channel().start_send(response)?;
|
||||
@@ -441,14 +431,21 @@ where
|
||||
let request = request.message;
|
||||
|
||||
if self.as_mut().in_flight_requests().len()
|
||||
>= self.as_mut().channel().config.max_in_flight_requests_per_connection
|
||||
>= self
|
||||
.as_mut()
|
||||
.channel()
|
||||
.config
|
||||
.max_in_flight_requests_per_connection
|
||||
{
|
||||
debug!(
|
||||
"[{}/{}] Client has reached in-flight request limit ({}/{}).",
|
||||
ctx.trace_id(),
|
||||
peer,
|
||||
self.as_mut().in_flight_requests().len(),
|
||||
self.as_mut().channel().config.max_in_flight_requests_per_connection
|
||||
self.as_mut()
|
||||
.channel()
|
||||
.config
|
||||
.max_in_flight_requests_per_connection
|
||||
);
|
||||
|
||||
self.as_mut().channel().start_send(Response {
|
||||
@@ -497,7 +494,9 @@ where
|
||||
),
|
||||
)
|
||||
})?;
|
||||
self.as_mut().in_flight_requests().insert(request_id, abort_handle);
|
||||
self.as_mut()
|
||||
.in_flight_requests()
|
||||
.insert(request_id, abort_handle);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -540,7 +539,10 @@ where
|
||||
trace!("[{}] ClientHandler::poll", self.channel.client_addr);
|
||||
loop {
|
||||
let read = self.as_mut().pump_read(cx)?;
|
||||
match (read, self.as_mut().pump_write(cx, read == Poll::Ready(None))?) {
|
||||
match (
|
||||
read,
|
||||
self.as_mut().pump_write(cx, read == Poll::Ready(None))?,
|
||||
) {
|
||||
(Poll::Ready(None), Poll::Ready(None)) => {
|
||||
info!("[{}] Client disconnected.", self.channel.client_addr);
|
||||
return Poll::Ready(Ok(()));
|
||||
|
||||
@@ -53,8 +53,10 @@ mod registry {
|
||||
/// Returns a function that serves requests for the registered services.
|
||||
pub fn serve(
|
||||
self,
|
||||
) -> impl FnOnce(context::Context, ServiceRequest)
|
||||
-> Either<Services::Future, Ready<io::Result<ServiceResponse>>>
|
||||
) -> impl FnOnce(
|
||||
context::Context,
|
||||
ServiceRequest,
|
||||
) -> Either<Services::Future, Ready<io::Result<ServiceResponse>>>
|
||||
+ Clone {
|
||||
let registrations = Arc::new(self.registrations);
|
||||
move |cx, req: ServiceRequest| match registrations.serve(cx, &req) {
|
||||
@@ -327,7 +329,8 @@ impl<Services: registry::MaybeServe + Sync> BincodeRegistry<Services> {
|
||||
fn serve(
|
||||
self,
|
||||
) -> impl FnOnce(
|
||||
context::Context, registry::ServiceRequest
|
||||
context::Context,
|
||||
registry::ServiceRequest,
|
||||
) -> registry::Either<
|
||||
Services::Future,
|
||||
Ready<io::Result<registry::ServiceResponse>>,
|
||||
|
||||
@@ -9,12 +9,7 @@
|
||||
#![feature(async_await, external_doc)]
|
||||
#![cfg_attr(
|
||||
test,
|
||||
feature(
|
||||
futures_api,
|
||||
await_macro,
|
||||
proc_macro_hygiene,
|
||||
arbitrary_self_types
|
||||
)
|
||||
feature(futures_api, await_macro, proc_macro_hygiene, arbitrary_self_types)
|
||||
)]
|
||||
|
||||
#[doc(hidden)]
|
||||
|
||||
Reference in New Issue
Block a user