mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3bf247ae40 | ||
|
|
69442d2368 | ||
|
|
e135e39504 |
19
RELEASES.md
19
RELEASES.md
@@ -1,22 +1,3 @@
|
||||
## 0.30.0 (2022-08-12)
|
||||
|
||||
### Breaking Changes
|
||||
|
||||
- Some types that impl Future are now annotated with `#[must_use]`. Code that previously created
|
||||
these types but did not use them will now receive a warning. Code that disallows warnings will
|
||||
receive a compilation error.
|
||||
|
||||
### Fixes
|
||||
|
||||
- Servers will more reliably clean up request state for requests with long deadlines when response
|
||||
processing is aborted without sending a response.
|
||||
|
||||
### Other Changes
|
||||
|
||||
- `TrackedRequest` now contains a response guard that can be used to ensure state cleanup for
|
||||
aborted requests. (This was already handled automatically by `InFlightRequests`).
|
||||
- When the feature serde-transport is enabled, the crate tokio_serde is now re-exported.
|
||||
|
||||
## 0.29.0 (2022-05-26)
|
||||
|
||||
### Breaking Changes
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tarpc-example-service"
|
||||
version = "0.12.0"
|
||||
version = "0.11.0"
|
||||
rust-version = "1.56"
|
||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||
edition = "2021"
|
||||
@@ -18,14 +18,14 @@ anyhow = "1.0"
|
||||
clap = { version = "3.0.0-rc.9", features = ["derive"] }
|
||||
log = "0.4"
|
||||
futures = "0.3"
|
||||
opentelemetry = { version = "0.17", features = ["rt-tokio"] }
|
||||
opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] }
|
||||
opentelemetry = { version = "0.16", features = ["rt-tokio"] }
|
||||
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] }
|
||||
rand = "0.8"
|
||||
tarpc = { version = "0.30", path = "../tarpc", features = ["full"] }
|
||||
tarpc = { version = "0.29", path = "../tarpc", features = ["full"] }
|
||||
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] }
|
||||
tracing = { version = "0.1" }
|
||||
tracing-opentelemetry = "0.17"
|
||||
tracing-subscriber = {version = "0.3", features = ["env-filter"]}
|
||||
tracing-opentelemetry = "0.15"
|
||||
tracing-subscriber = "0.2"
|
||||
|
||||
[lib]
|
||||
name = "service"
|
||||
|
||||
@@ -1,9 +0,0 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright 2016 Google Inc. All Rights Reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tarpc"
|
||||
version = "0.30.0"
|
||||
version = "0.29.0"
|
||||
rust-version = "1.58.0"
|
||||
authors = [
|
||||
"Adam Wright <adam.austin.wright@gmail.com>",
|
||||
@@ -50,7 +50,7 @@ static_assertions = "1.1.0"
|
||||
tarpc-plugins = { path = "../plugins", version = "0.12" }
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1", features = ["time"] }
|
||||
tokio-util = { version = "0.7.3", features = ["time"] }
|
||||
tokio-util = { version = "0.6.9", features = ["time"] }
|
||||
tokio-serde = { optional = true, version = "0.8" }
|
||||
tracing = { version = "0.1", default-features = false, features = [
|
||||
"attributes",
|
||||
|
||||
@@ -1,9 +0,0 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright 2016 Google Inc. All Rights Reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
@@ -1,9 +1,8 @@
|
||||
use tarpc::context::Context;
|
||||
use tarpc::serde_transport as transport;
|
||||
use tarpc::server::{BaseChannel, Channel};
|
||||
use tarpc::tokio_serde::formats::Bincode;
|
||||
use tarpc::tokio_util::codec::length_delimited::LengthDelimitedCodec;
|
||||
use tarpc::{context::Context, tokio_serde::formats::Bincode};
|
||||
use tokio::net::{UnixListener, UnixStream};
|
||||
use tokio_util::codec::length_delimited::LengthDelimitedCodec;
|
||||
|
||||
#[tarpc::service]
|
||||
pub trait PingService {
|
||||
|
||||
@@ -52,9 +52,9 @@ use tarpc::{
|
||||
client, context,
|
||||
serde_transport::tcp,
|
||||
server::{self, Channel},
|
||||
tokio_serde::formats::Json,
|
||||
};
|
||||
use tokio::net::ToSocketAddrs;
|
||||
use tokio_serde::formats::Json;
|
||||
use tracing::info;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
@@ -129,6 +129,7 @@ impl Subscriber {
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Subscription {
|
||||
subscriber: subscriber::SubscriberClient,
|
||||
topics: Vec<String>,
|
||||
}
|
||||
|
||||
@@ -209,6 +210,7 @@ impl Publisher {
|
||||
self.clients.lock().unwrap().insert(
|
||||
subscriber_addr,
|
||||
Subscription {
|
||||
subscriber: subscriber.clone(),
|
||||
topics: topics.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
@@ -9,8 +9,8 @@ use futures::{future, prelude::*};
|
||||
use tarpc::{
|
||||
client, context,
|
||||
server::{incoming::Incoming, BaseChannel},
|
||||
tokio_serde::formats::Json,
|
||||
};
|
||||
use tokio_serde::formats::Json;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
pub mod add {
|
||||
|
||||
@@ -1,49 +0,0 @@
|
||||
use futures::{prelude::*, task::*};
|
||||
use std::pin::Pin;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// Sends request cancellation signals.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RequestCancellation(mpsc::UnboundedSender<u64>);
|
||||
|
||||
/// A stream of IDs of requests that have been canceled.
|
||||
#[derive(Debug)]
|
||||
pub struct CanceledRequests(mpsc::UnboundedReceiver<u64>);
|
||||
|
||||
/// Returns a channel to send request cancellation messages.
|
||||
pub fn cancellations() -> (RequestCancellation, CanceledRequests) {
|
||||
// Unbounded because messages are sent in the drop fn. This is fine, because it's still
|
||||
// bounded by the number of in-flight requests.
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
(RequestCancellation(tx), CanceledRequests(rx))
|
||||
}
|
||||
|
||||
impl RequestCancellation {
|
||||
/// Cancels the request with ID `request_id`.
|
||||
///
|
||||
/// No validation is done of `request_id`. There is no way to know if the request id provided
|
||||
/// corresponds to a request actually tracked by the backing channel. `RequestCancellation` is
|
||||
/// a one-way communication channel.
|
||||
///
|
||||
/// Once request data is cleaned up, a response will never be received by the client. This is
|
||||
/// useful primarily when request processing ends prematurely for requests with long deadlines
|
||||
/// which would otherwise continue to be tracked by the backing channel—a kind of leak.
|
||||
pub fn cancel(&self, request_id: u64) {
|
||||
let _ = self.0.send(request_id);
|
||||
}
|
||||
}
|
||||
|
||||
impl CanceledRequests {
|
||||
/// Polls for a cancelled request.
|
||||
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
|
||||
self.0.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for CanceledRequests {
|
||||
type Item = u64;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u64>> {
|
||||
self.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
@@ -8,10 +8,7 @@
|
||||
|
||||
mod in_flight_requests;
|
||||
|
||||
use crate::{
|
||||
cancellations::{cancellations, CanceledRequests, RequestCancellation},
|
||||
context, trace, ClientMessage, Request, Response, ServerError, Transport,
|
||||
};
|
||||
use crate::{context, trace, ClientMessage, Request, Response, ServerError, Transport};
|
||||
use futures::{prelude::*, ready, stream::Fuse, task::*};
|
||||
use in_flight_requests::{DeadlineExceededError, InFlightRequests};
|
||||
use pin_project::pin_project;
|
||||
@@ -129,7 +126,7 @@ impl<Req, Resp> Channel<Req, Resp> {
|
||||
) -> Result<Resp, RpcError> {
|
||||
let span = Span::current();
|
||||
ctx.trace_context = trace::Context::try_from(&span).unwrap_or_else(|_| {
|
||||
tracing::trace!(
|
||||
tracing::warn!(
|
||||
"OpenTelemetry subscriber not installed; making unsampled child context."
|
||||
);
|
||||
ctx.trace_context.new_child()
|
||||
@@ -258,7 +255,6 @@ where
|
||||
|
||||
/// Handles the lifecycle of requests, writing requests to the wire, managing cancellations,
|
||||
/// and dispatching responses to the appropriate channel.
|
||||
#[must_use]
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct RequestDispatch<Req, Resp, C> {
|
||||
@@ -395,7 +391,11 @@ where
|
||||
// Receiving Poll::Ready(None) when polling expired requests never indicates "Closed",
|
||||
// because there can temporarily be zero in-flight rquests. Therefore, there is no need to
|
||||
// track the status like is done with pending and cancelled requests.
|
||||
if let Poll::Ready(Some(_)) = self.in_flight_requests().poll_expired(cx) {
|
||||
if let Poll::Ready(Some(_)) = self
|
||||
.in_flight_requests()
|
||||
.poll_expired(cx)
|
||||
.map_err(ChannelError::Timer)?
|
||||
{
|
||||
// Expired requests are considered complete; there is no compelling reason to send a
|
||||
// cancellation message to the server, since it will have already exhausted its
|
||||
// allotted processing time.
|
||||
@@ -602,9 +602,49 @@ struct DispatchRequest<Req, Resp> {
|
||||
pub response_completion: oneshot::Sender<Result<Response<Resp>, DeadlineExceededError>>,
|
||||
}
|
||||
|
||||
/// Sends request cancellation signals.
|
||||
#[derive(Debug, Clone)]
|
||||
struct RequestCancellation(mpsc::UnboundedSender<u64>);
|
||||
|
||||
/// A stream of IDs of requests that have been canceled.
|
||||
#[derive(Debug)]
|
||||
struct CanceledRequests(mpsc::UnboundedReceiver<u64>);
|
||||
|
||||
/// Returns a channel to send request cancellation messages.
|
||||
fn cancellations() -> (RequestCancellation, CanceledRequests) {
|
||||
// Unbounded because messages are sent in the drop fn. This is fine, because it's still
|
||||
// bounded by the number of in-flight requests.
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
(RequestCancellation(tx), CanceledRequests(rx))
|
||||
}
|
||||
|
||||
impl RequestCancellation {
|
||||
/// Cancels the request with ID `request_id`.
|
||||
fn cancel(&self, request_id: u64) {
|
||||
let _ = self.0.send(request_id);
|
||||
}
|
||||
}
|
||||
|
||||
impl CanceledRequests {
|
||||
fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
|
||||
self.0.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for CanceledRequests {
|
||||
type Item = u64;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u64>> {
|
||||
self.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{cancellations, Channel, DispatchRequest, RequestDispatch, ResponseGuard};
|
||||
use super::{
|
||||
cancellations, CanceledRequests, Channel, DispatchRequest, RequestCancellation,
|
||||
RequestDispatch, ResponseGuard,
|
||||
};
|
||||
use crate::{
|
||||
client::{
|
||||
in_flight_requests::{DeadlineExceededError, InFlightRequests},
|
||||
@@ -657,7 +697,7 @@ mod tests {
|
||||
});
|
||||
// resp's drop() is run, which should send a cancel message.
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
assert_eq!(canceled_requests.poll_recv(cx), Poll::Ready(Some(3)));
|
||||
assert_eq!(canceled_requests.0.poll_recv(cx), Poll::Ready(Some(3)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -680,7 +720,7 @@ mod tests {
|
||||
.unwrap();
|
||||
drop(cancellation);
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
assert_eq!(canceled_requests.poll_recv(cx), Poll::Ready(None));
|
||||
assert_eq!(canceled_requests.0.poll_recv(cx), Poll::Ready(None));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -691,7 +731,6 @@ mod tests {
|
||||
|
||||
let _resp = send_request(&mut channel, "hi", tx, &mut rx).await;
|
||||
|
||||
#[allow(unstable_name_collisions)]
|
||||
let req = dispatch.as_mut().poll_next_request(cx).ready();
|
||||
assert!(req.is_some());
|
||||
|
||||
@@ -722,7 +761,6 @@ mod tests {
|
||||
dispatch.await.unwrap();
|
||||
}
|
||||
|
||||
#[allow(unstable_name_collisions)]
|
||||
#[tokio::test]
|
||||
async fn stage_request_response_future_dropped_is_canceled_before_sending() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
@@ -738,7 +776,6 @@ mod tests {
|
||||
assert!(dispatch.as_mut().poll_next_request(cx).ready().is_none());
|
||||
}
|
||||
|
||||
#[allow(unstable_name_collisions)]
|
||||
#[tokio::test]
|
||||
async fn stage_request_response_future_dropped_is_canceled_after_sending() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
@@ -791,17 +828,18 @@ mod tests {
|
||||
let _ = tracing_subscriber::fmt().with_test_writer().try_init();
|
||||
|
||||
let (to_dispatch, pending_requests) = mpsc::channel(1);
|
||||
let (cancellation, canceled_requests) = cancellations();
|
||||
let (cancel_tx, canceled_requests) = mpsc::unbounded_channel();
|
||||
let (client_channel, server_channel) = transport::channel::unbounded();
|
||||
|
||||
let dispatch = RequestDispatch::<String, String, _> {
|
||||
transport: client_channel.fuse(),
|
||||
pending_requests: pending_requests,
|
||||
canceled_requests,
|
||||
canceled_requests: CanceledRequests(canceled_requests),
|
||||
in_flight_requests: InFlightRequests::default(),
|
||||
config: Config::default(),
|
||||
};
|
||||
|
||||
let cancellation = RequestCancellation(cancel_tx);
|
||||
let channel = Channel {
|
||||
to_dispatch,
|
||||
cancellation,
|
||||
@@ -826,13 +864,13 @@ mod tests {
|
||||
request: request.to_string(),
|
||||
response_completion,
|
||||
};
|
||||
let response_guard = ResponseGuard {
|
||||
channel.to_dispatch.send(request).await.unwrap();
|
||||
|
||||
ResponseGuard {
|
||||
response,
|
||||
cancellation: &channel.cancellation,
|
||||
request_id,
|
||||
};
|
||||
channel.to_dispatch.send(request).await.unwrap();
|
||||
response_guard
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_response(
|
||||
|
||||
@@ -117,9 +117,12 @@ impl<Resp> InFlightRequests<Resp> {
|
||||
|
||||
/// Yields a request that has expired, completing it with a TimedOut error.
|
||||
/// The caller should send cancellation messages for any yielded request ID.
|
||||
pub fn poll_expired(&mut self, cx: &mut Context) -> Poll<Option<u64>> {
|
||||
self.deadlines.poll_expired(cx).map(|expired| {
|
||||
let request_id = expired?.into_inner();
|
||||
pub fn poll_expired(
|
||||
&mut self,
|
||||
cx: &mut Context,
|
||||
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
|
||||
self.deadlines.poll_expired(cx).map_ok(|expired| {
|
||||
let request_id = expired.into_inner();
|
||||
if let Some(request_data) = self.request_data.remove(&request_id) {
|
||||
let _entered = request_data.span.enter();
|
||||
tracing::error!("DeadlineExceeded");
|
||||
@@ -128,7 +131,7 @@ impl<Resp> InFlightRequests<Resp> {
|
||||
.response_completion
|
||||
.send(Err(DeadlineExceededError));
|
||||
}
|
||||
Some(request_id)
|
||||
request_id
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -209,7 +209,7 @@
|
||||
pub use serde;
|
||||
|
||||
#[cfg(feature = "serde-transport")]
|
||||
pub use {tokio_serde, tokio_util};
|
||||
pub use tokio_serde;
|
||||
|
||||
#[cfg(feature = "serde-transport")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))]
|
||||
@@ -300,7 +300,6 @@ pub use tarpc_plugins::service;
|
||||
/// `async`, meaning that this should not break existing code.
|
||||
pub use tarpc_plugins::server;
|
||||
|
||||
pub(crate) mod cancellations;
|
||||
pub mod client;
|
||||
pub mod context;
|
||||
pub mod server;
|
||||
|
||||
@@ -149,7 +149,6 @@ pub mod tcp {
|
||||
}
|
||||
|
||||
/// A connection Future that also exposes the length-delimited framing config.
|
||||
#[must_use]
|
||||
#[pin_project]
|
||||
pub struct Connect<T, Item, SinkItem, CodecFn> {
|
||||
#[pin]
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
//! Provides a server that concurrently handles many connections sending multiplexed requests.
|
||||
|
||||
use crate::{
|
||||
cancellations::{cancellations, CanceledRequests, RequestCancellation},
|
||||
context::{self, SpanExt},
|
||||
trace, ClientMessage, Request, Response, Transport,
|
||||
};
|
||||
@@ -21,14 +20,7 @@ use futures::{
|
||||
};
|
||||
use in_flight_requests::{AlreadyExistsError, InFlightRequests};
|
||||
use pin_project::pin_project;
|
||||
use std::{
|
||||
convert::TryFrom,
|
||||
error::Error,
|
||||
fmt,
|
||||
marker::PhantomData,
|
||||
mem::{self, ManuallyDrop},
|
||||
pin::Pin,
|
||||
};
|
||||
use std::{convert::TryFrom, error::Error, fmt, marker::PhantomData, pin::Pin};
|
||||
use tracing::{info_span, instrument::Instrument, Span};
|
||||
|
||||
mod in_flight_requests;
|
||||
@@ -119,11 +111,6 @@ pub struct BaseChannel<Req, Resp, T> {
|
||||
/// Writes responses to the wire and reads requests off the wire.
|
||||
#[pin]
|
||||
transport: Fuse<T>,
|
||||
/// In-flight requests that were dropped by the server before completion.
|
||||
#[pin]
|
||||
canceled_requests: CanceledRequests,
|
||||
/// Notifies `canceled_requests` when a request is canceled.
|
||||
request_cancellation: RequestCancellation,
|
||||
/// Holds data necessary to clean up in-flight requests.
|
||||
in_flight_requests: InFlightRequests,
|
||||
/// Types the request and response.
|
||||
@@ -136,12 +123,9 @@ where
|
||||
{
|
||||
/// Creates a new channel backed by `transport` and configured with `config`.
|
||||
pub fn new(config: Config, transport: T) -> Self {
|
||||
let (request_cancellation, canceled_requests) = cancellations();
|
||||
BaseChannel {
|
||||
config,
|
||||
transport: transport.fuse(),
|
||||
canceled_requests,
|
||||
request_cancellation,
|
||||
in_flight_requests: InFlightRequests::default(),
|
||||
ghost: PhantomData,
|
||||
}
|
||||
@@ -166,18 +150,12 @@ where
|
||||
self.as_mut().project().in_flight_requests
|
||||
}
|
||||
|
||||
fn canceled_requests_pin_mut<'a>(
|
||||
self: &'a mut Pin<&mut Self>,
|
||||
) -> Pin<&'a mut CanceledRequests> {
|
||||
self.as_mut().project().canceled_requests
|
||||
}
|
||||
|
||||
fn transport_pin_mut<'a>(self: &'a mut Pin<&mut Self>) -> Pin<&'a mut Fuse<T>> {
|
||||
self.as_mut().project().transport
|
||||
}
|
||||
|
||||
fn start_request(
|
||||
mut self: Pin<&mut Self>,
|
||||
self: Pin<&mut Self>,
|
||||
mut request: Request<Req>,
|
||||
) -> Result<TrackedRequest<Req>, AlreadyExistsError> {
|
||||
let span = info_span!(
|
||||
@@ -197,7 +175,7 @@ where
|
||||
});
|
||||
let entered = span.enter();
|
||||
tracing::info!("ReceiveRequest");
|
||||
let start = self.in_flight_requests_mut().start_request(
|
||||
let start = self.project().in_flight_requests.start_request(
|
||||
request.id,
|
||||
request.context.deadline,
|
||||
span.clone(),
|
||||
@@ -206,13 +184,9 @@ where
|
||||
Ok(abort_registration) => {
|
||||
drop(entered);
|
||||
Ok(TrackedRequest {
|
||||
request,
|
||||
abort_registration,
|
||||
span,
|
||||
response_guard: ManuallyDrop::new(ResponseGuard {
|
||||
request_id: request.id,
|
||||
request_cancellation: self.request_cancellation.clone(),
|
||||
}),
|
||||
request,
|
||||
})
|
||||
}
|
||||
Err(AlreadyExistsError) => {
|
||||
@@ -239,8 +213,6 @@ pub struct TrackedRequest<Req> {
|
||||
pub abort_registration: AbortRegistration,
|
||||
/// A span representing the server processing of this request.
|
||||
pub span: Span,
|
||||
/// An inert response guard. Becomes active in an InFlightRequest.
|
||||
pub response_guard: ManuallyDrop<ResponseGuard>,
|
||||
}
|
||||
|
||||
/// The server end of an open connection with a client, receiving requests from, and sending
|
||||
@@ -259,15 +231,13 @@ pub struct TrackedRequest<Req> {
|
||||
/// [`Sink::send`](futures::sink::SinkExt::send) - A user is free to manually read requests
|
||||
/// from, and send responses into, a Channel in lieu of the previous methods. Channels stream
|
||||
/// [`TrackedRequests`](TrackedRequest), which, in addition to the request itself, contains the
|
||||
/// server [`Span`], request lifetime [`AbortRegistration`], and an inert [`ResponseGuard`].
|
||||
/// Wrapping response logic in an [`Abortable`] future using the abort registration will ensure
|
||||
/// that the response does not execute longer than the request deadline. The `Channel` itself
|
||||
/// will clean up request state once either the deadline expires, or the response guard is
|
||||
/// dropped, or a response is sent.
|
||||
///
|
||||
/// Channels must be implemented using the decorator pattern: the only way to create a
|
||||
/// `TrackedRequest` is to get one from another `Channel`. Ultimately, all `TrackedRequests` are
|
||||
/// created by [`BaseChannel`].
|
||||
/// server [`Span`] and request lifetime [`AbortRegistration`]. Wrapping response
|
||||
/// logic in an [`Abortable`] future using the abort registration will ensure that the response
|
||||
/// does not execute longer than the request deadline. The `Channel` itself will clean up
|
||||
/// request state once either the deadline expires, or a cancellation message is received, or a
|
||||
/// response is sent. Because there is no guarantee that a cancellation message will ever be
|
||||
/// received for a request, or that requests come with reasonably short deadlines, services
|
||||
/// should strive to clean up Channel resources by sending a response for every request.
|
||||
pub trait Channel
|
||||
where
|
||||
Self: Transport<Response<<Self as Channel>::Resp>, TrackedRequest<<Self as Channel>::Req>>,
|
||||
@@ -364,45 +334,20 @@ where
|
||||
type Item = Result<TrackedRequest<Req>, ChannelError<T::Error>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
#[derive(Debug)]
|
||||
enum ReceiverStatus {
|
||||
Ready,
|
||||
Pending,
|
||||
Closed,
|
||||
}
|
||||
|
||||
impl ReceiverStatus {
|
||||
fn combine(self, other: Self) -> Self {
|
||||
use ReceiverStatus::*;
|
||||
match (self, other) {
|
||||
(Ready, _) | (_, Ready) => Ready,
|
||||
(Closed, Closed) => Closed,
|
||||
(Pending, Closed) | (Closed, Pending) | (Pending, Pending) => Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use ReceiverStatus::*;
|
||||
|
||||
loop {
|
||||
let cancellation_status = match self.canceled_requests_pin_mut().poll_recv(cx) {
|
||||
Poll::Ready(Some(request_id)) => {
|
||||
if let Some(span) = self.in_flight_requests_mut().remove_request(request_id) {
|
||||
let _entered = span.enter();
|
||||
tracing::info!("ResponseCancelled");
|
||||
}
|
||||
Ready
|
||||
}
|
||||
// Pending cancellations don't block Channel closure, because all they do is ensure
|
||||
// the Channel's internal state is cleaned up. But Channel closure also cleans up
|
||||
// the Channel state, so there's no reason to wait on a cancellation before
|
||||
// closing.
|
||||
//
|
||||
// Ready(None) can't happen, since `self` holds a Cancellation.
|
||||
Poll::Pending | Poll::Ready(None) => Closed,
|
||||
};
|
||||
|
||||
let expiration_status = match self.in_flight_requests_mut().poll_expired(cx) {
|
||||
let expiration_status = match self
|
||||
.in_flight_requests_mut()
|
||||
.poll_expired(cx)
|
||||
.map_err(ChannelError::Timer)?
|
||||
{
|
||||
// No need to send a response, since the client wouldn't be waiting for one
|
||||
// anymore.
|
||||
Poll::Ready(Some(_)) => Ready,
|
||||
@@ -450,13 +395,10 @@ where
|
||||
expiration_status,
|
||||
request_status
|
||||
);
|
||||
match cancellation_status
|
||||
.combine(expiration_status)
|
||||
.combine(request_status)
|
||||
{
|
||||
Ready => continue,
|
||||
Closed => return Poll::Ready(None),
|
||||
Pending => return Poll::Pending,
|
||||
match (expiration_status, request_status) {
|
||||
(Ready, _) | (_, Ready) => continue,
|
||||
(Closed, Closed) => return Poll::Ready(None),
|
||||
(Pending, Closed) | (Closed, Pending) | (Pending, Pending) => return Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -478,7 +420,9 @@ where
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, response: Response<Resp>) -> Result<(), Self::Error> {
|
||||
if let Some(span) = self
|
||||
.in_flight_requests_mut()
|
||||
.as_mut()
|
||||
.project()
|
||||
.in_flight_requests
|
||||
.remove_request(response.request_id)
|
||||
{
|
||||
let _entered = span.enter();
|
||||
@@ -555,11 +499,6 @@ impl<C> Requests<C>
|
||||
where
|
||||
C: Channel,
|
||||
{
|
||||
/// Returns a reference to the inner channel over which messages are sent and received.
|
||||
pub fn channel(&self) -> &C {
|
||||
&self.channel
|
||||
}
|
||||
|
||||
/// Returns the inner channel over which messages are sent and received.
|
||||
pub fn channel_pin_mut<'a>(self: &'a mut Pin<&mut Self>) -> Pin<&'a mut C> {
|
||||
self.as_mut().project().channel
|
||||
@@ -576,22 +515,12 @@ where
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<InFlightRequest<C::Req, C::Resp>, C::Error>>> {
|
||||
self.channel_pin_mut().poll_next(cx).map_ok(
|
||||
|TrackedRequest {
|
||||
request,
|
||||
abort_registration,
|
||||
span,
|
||||
response_guard,
|
||||
}| {
|
||||
InFlightRequest {
|
||||
request,
|
||||
abort_registration,
|
||||
span,
|
||||
response_guard: ManuallyDrop::into_inner(response_guard),
|
||||
response_tx: self.responses_tx.clone(),
|
||||
}
|
||||
},
|
||||
)
|
||||
self.channel_pin_mut()
|
||||
.poll_next(cx)
|
||||
.map_ok(|request| InFlightRequest {
|
||||
request,
|
||||
response_tx: self.responses_tx.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn pump_write(
|
||||
@@ -668,37 +597,17 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// A fail-safe to ensure requests are properly canceled if request processing is aborted before
|
||||
/// completing.
|
||||
#[derive(Debug)]
|
||||
pub struct ResponseGuard {
|
||||
request_cancellation: RequestCancellation,
|
||||
request_id: u64,
|
||||
}
|
||||
|
||||
impl Drop for ResponseGuard {
|
||||
fn drop(&mut self) {
|
||||
self.request_cancellation.cancel(self.request_id);
|
||||
}
|
||||
}
|
||||
|
||||
/// A request produced by [Channel::requests].
|
||||
///
|
||||
/// If dropped without calling [`execute`](InFlightRequest::execute), a cancellation message will
|
||||
/// be sent to the Channel to clean up associated request state.
|
||||
#[derive(Debug)]
|
||||
pub struct InFlightRequest<Req, Res> {
|
||||
request: Request<Req>,
|
||||
abort_registration: AbortRegistration,
|
||||
response_guard: ResponseGuard,
|
||||
span: Span,
|
||||
request: TrackedRequest<Req>,
|
||||
response_tx: mpsc::Sender<Response<Res>>,
|
||||
}
|
||||
|
||||
impl<Req, Res> InFlightRequest<Req, Res> {
|
||||
/// Returns a reference to the request.
|
||||
pub fn get(&self) -> &Request<Req> {
|
||||
&self.request
|
||||
&self.request.request
|
||||
}
|
||||
|
||||
/// Returns a [future](Future) that executes the request using the given [service
|
||||
@@ -712,23 +621,22 @@ impl<Req, Res> InFlightRequest<Req, Res> {
|
||||
/// message](ClientMessage::Cancel) for this request.
|
||||
/// 2. The request [deadline](crate::context::Context::deadline) is reached.
|
||||
/// 3. The service function completes.
|
||||
///
|
||||
/// If the returned Future is dropped before completion, a cancellation message will be sent to
|
||||
/// the Channel to clean up associated request state.
|
||||
pub async fn execute<S>(self, serve: S)
|
||||
where
|
||||
S: Serve<Req, Resp = Res>,
|
||||
{
|
||||
let Self {
|
||||
response_tx,
|
||||
response_guard,
|
||||
abort_registration,
|
||||
span,
|
||||
request:
|
||||
Request {
|
||||
context,
|
||||
message,
|
||||
id: request_id,
|
||||
TrackedRequest {
|
||||
abort_registration,
|
||||
span,
|
||||
request:
|
||||
Request {
|
||||
context,
|
||||
message,
|
||||
id: request_id,
|
||||
},
|
||||
},
|
||||
} = self;
|
||||
let method = serve.method(&message);
|
||||
@@ -749,10 +657,6 @@ impl<Req, Res> InFlightRequest<Req, Res> {
|
||||
)
|
||||
.instrument(span)
|
||||
.await;
|
||||
// Request processing has completed, meaning either the channel canceled the request or
|
||||
// a request was sent back to the channel. Either way, the channel will clean up the
|
||||
// request data, so the request does not need to be canceled.
|
||||
mem::forget(response_guard);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1028,44 +932,6 @@ mod tests {
|
||||
assert_eq!(channel.in_flight_requests(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn in_flight_request_drop_cancels_request() {
|
||||
let (mut requests, mut tx) = test_requests::<(), ()>();
|
||||
tx.send(fake_request(())).await.unwrap();
|
||||
|
||||
let request = match requests.as_mut().poll_next(&mut noop_context()) {
|
||||
Poll::Ready(Some(Ok(request))) => request,
|
||||
result => panic!("Unexpected result: {:?}", result),
|
||||
};
|
||||
drop(request);
|
||||
|
||||
let poll = requests
|
||||
.as_mut()
|
||||
.channel_pin_mut()
|
||||
.poll_next(&mut noop_context());
|
||||
assert!(poll.is_pending());
|
||||
let in_flight_requests = requests.channel().in_flight_requests();
|
||||
assert_eq!(in_flight_requests, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn in_flight_requests_successful_execute_doesnt_cancel_request() {
|
||||
let (mut requests, mut tx) = test_requests::<(), ()>();
|
||||
tx.send(fake_request(())).await.unwrap();
|
||||
|
||||
let request = match requests.as_mut().poll_next(&mut noop_context()) {
|
||||
Poll::Ready(Some(Ok(request))) => request,
|
||||
result => panic!("Unexpected result: {:?}", result),
|
||||
};
|
||||
request.execute(|_, _| async {}).await;
|
||||
assert!(requests
|
||||
.as_mut()
|
||||
.channel_pin_mut()
|
||||
.canceled_requests
|
||||
.poll_recv(&mut noop_context())
|
||||
.is_pending());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn requests_poll_next_response_returns_pending_when_buffer_full() {
|
||||
let (mut requests, _tx) = test_bounded_requests::<(), ()>(0);
|
||||
|
||||
@@ -94,14 +94,16 @@ impl InFlightRequests {
|
||||
}
|
||||
|
||||
/// Yields a request that has expired, aborting any ongoing processing of that request.
|
||||
pub fn poll_expired(&mut self, cx: &mut Context) -> Poll<Option<u64>> {
|
||||
pub fn poll_expired(
|
||||
&mut self,
|
||||
cx: &mut Context,
|
||||
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
|
||||
if self.deadlines.is_empty() {
|
||||
// TODO(https://github.com/tokio-rs/tokio/issues/4161)
|
||||
// This is a workaround for DelayQueue not always treating this case correctly.
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
self.deadlines.poll_expired(cx).map(|expired| {
|
||||
let expired = expired?;
|
||||
self.deadlines.poll_expired(cx).map_ok(|expired| {
|
||||
if let Some(RequestData {
|
||||
abort_handle, span, ..
|
||||
}) = self.request_data.remove(expired.get_ref())
|
||||
@@ -111,7 +113,7 @@ impl InFlightRequests {
|
||||
abort_handle.abort();
|
||||
tracing::error!("DeadlineExceeded");
|
||||
}
|
||||
Some(expired.into_inner())
|
||||
expired.into_inner()
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -159,7 +161,7 @@ mod tests {
|
||||
|
||||
assert_matches!(
|
||||
in_flight_requests.poll_expired(&mut noop_context()),
|
||||
Poll::Ready(Some(_))
|
||||
Poll::Ready(Some(Ok(_)))
|
||||
);
|
||||
assert_matches!(
|
||||
abortable_future.poll_unpin(&mut noop_context()),
|
||||
|
||||
@@ -5,14 +5,13 @@
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
use crate::{
|
||||
cancellations::{cancellations, CanceledRequests, RequestCancellation},
|
||||
context,
|
||||
server::{Channel, Config, ResponseGuard, TrackedRequest},
|
||||
server::{Channel, Config, TrackedRequest},
|
||||
Request, Response,
|
||||
};
|
||||
use futures::{task::*, Sink, Stream};
|
||||
use pin_project::pin_project;
|
||||
use std::{collections::VecDeque, io, mem::ManuallyDrop, pin::Pin, time::SystemTime};
|
||||
use std::{collections::VecDeque, io, pin::Pin, time::SystemTime};
|
||||
use tracing::Span;
|
||||
|
||||
#[pin_project]
|
||||
@@ -23,8 +22,6 @@ pub(crate) struct FakeChannel<In, Out> {
|
||||
pub sink: VecDeque<Out>,
|
||||
pub config: Config,
|
||||
pub in_flight_requests: super::in_flight_requests::InFlightRequests,
|
||||
pub request_cancellation: RequestCancellation,
|
||||
pub canceled_requests: CanceledRequests,
|
||||
}
|
||||
|
||||
impl<In, Out> Stream for FakeChannel<In, Out>
|
||||
@@ -89,7 +86,6 @@ where
|
||||
impl<Req, Resp> FakeChannel<io::Result<TrackedRequest<Req>>, Response<Resp>> {
|
||||
pub fn push_req(&mut self, id: u64, message: Req) {
|
||||
let (_, abort_registration) = futures::future::AbortHandle::new_pair();
|
||||
let (request_cancellation, _) = cancellations();
|
||||
self.stream.push_back(Ok(TrackedRequest {
|
||||
request: Request {
|
||||
context: context::Context {
|
||||
@@ -101,24 +97,17 @@ impl<Req, Resp> FakeChannel<io::Result<TrackedRequest<Req>>, Response<Resp>> {
|
||||
},
|
||||
abort_registration,
|
||||
span: Span::none(),
|
||||
response_guard: ManuallyDrop::new(ResponseGuard {
|
||||
request_cancellation,
|
||||
request_id: id,
|
||||
}),
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
impl FakeChannel<(), ()> {
|
||||
pub fn default<Req, Resp>() -> FakeChannel<io::Result<TrackedRequest<Req>>, Response<Resp>> {
|
||||
let (request_cancellation, canceled_requests) = cancellations();
|
||||
FakeChannel {
|
||||
stream: Default::default(),
|
||||
sink: Default::default(),
|
||||
config: Default::default(),
|
||||
in_flight_requests: Default::default(),
|
||||
request_cancellation,
|
||||
canceled_requests,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ use std::pin::Pin;
|
||||
/// A future that drives the server by [spawning](tokio::spawn) a [`TokioChannelExecutor`](TokioChannelExecutor)
|
||||
/// for each new channel. Returned by
|
||||
/// [`Incoming::execute`](crate::server::incoming::Incoming::execute).
|
||||
#[must_use]
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct TokioServerExecutor<T, S> {
|
||||
@@ -24,7 +23,6 @@ impl<T, S> TokioServerExecutor<T, S> {
|
||||
/// A future that drives the server by [spawning](tokio::spawn) each [response
|
||||
/// handler](super::InFlightRequest::execute) on tokio's default executor. Returned by
|
||||
/// [`Channel::execute`](crate::server::Channel::execute).
|
||||
#[must_use]
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct TokioChannelExecutor<T, S> {
|
||||
|
||||
@@ -2,8 +2,4 @@
|
||||
fn ui() {
|
||||
let t = trybuild::TestCases::new();
|
||||
t.compile_fail("tests/compile_fail/*.rs");
|
||||
#[cfg(feature = "tokio1")]
|
||||
t.compile_fail("tests/compile_fail/tokio/*.rs");
|
||||
#[cfg(all(feature = "serde-transport", feature = "tcp"))]
|
||||
t.compile_fail("tests/compile_fail/serde_transport/*.rs");
|
||||
}
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
use tarpc::client;
|
||||
|
||||
#[tarpc::service]
|
||||
trait World {
|
||||
async fn hello(name: String) -> String;
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let (client_transport, _) = tarpc::transport::channel::unbounded();
|
||||
|
||||
#[deny(unused_must_use)]
|
||||
{
|
||||
WorldClient::new(client::Config::default(), client_transport).dispatch;
|
||||
}
|
||||
}
|
||||
@@ -1,11 +0,0 @@
|
||||
error: unused `RequestDispatch` that must be used
|
||||
--> tests/compile_fail/must_use_request_dispatch.rs:13:9
|
||||
|
|
||||
13 | WorldClient::new(client::Config::default(), client_transport).dispatch;
|
||||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
|
||||
note: the lint level is defined here
|
||||
--> tests/compile_fail/must_use_request_dispatch.rs:11:12
|
||||
|
|
||||
11 | #[deny(unused_must_use)]
|
||||
| ^^^^^^^^^^^^^^^
|
||||
@@ -1,9 +0,0 @@
|
||||
use tarpc::serde_transport;
|
||||
use tokio_serde::formats::Json;
|
||||
|
||||
fn main() {
|
||||
#[deny(unused_must_use)]
|
||||
{
|
||||
serde_transport::tcp::connect::<_, (), (), _, _>("0.0.0.0:0", Json::default);
|
||||
}
|
||||
}
|
||||
@@ -1,11 +0,0 @@
|
||||
error: unused `Connect` that must be used
|
||||
--> tests/compile_fail/serde_transport/must_use_tcp_connect.rs:7:9
|
||||
|
|
||||
7 | serde_transport::tcp::connect::<_, (), (), _, _>("0.0.0.0:0", Json::default);
|
||||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
|
||||
note: the lint level is defined here
|
||||
--> tests/compile_fail/serde_transport/must_use_tcp_connect.rs:5:12
|
||||
|
|
||||
5 | #[deny(unused_must_use)]
|
||||
| ^^^^^^^^^^^^^^^
|
||||
@@ -1,29 +0,0 @@
|
||||
use tarpc::{
|
||||
context,
|
||||
server::{self, Channel},
|
||||
};
|
||||
|
||||
#[tarpc::service]
|
||||
trait World {
|
||||
async fn hello(name: String) -> String;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct HelloServer;
|
||||
|
||||
#[tarpc::server]
|
||||
impl World for HelloServer {
|
||||
async fn hello(self, _: context::Context, name: String) -> String {
|
||||
format!("Hello, {name}!")
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let (_, server_transport) = tarpc::transport::channel::unbounded();
|
||||
let server = server::BaseChannel::with_defaults(server_transport);
|
||||
|
||||
#[deny(unused_must_use)]
|
||||
{
|
||||
server.execute(HelloServer.serve());
|
||||
}
|
||||
}
|
||||
@@ -1,11 +0,0 @@
|
||||
error: unused `TokioChannelExecutor` that must be used
|
||||
--> tests/compile_fail/tokio/must_use_channel_executor.rs:27:9
|
||||
|
|
||||
27 | server.execute(HelloServer.serve());
|
||||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
|
||||
note: the lint level is defined here
|
||||
--> tests/compile_fail/tokio/must_use_channel_executor.rs:25:12
|
||||
|
|
||||
25 | #[deny(unused_must_use)]
|
||||
| ^^^^^^^^^^^^^^^
|
||||
@@ -1,30 +0,0 @@
|
||||
use futures::stream::once;
|
||||
use tarpc::{
|
||||
context,
|
||||
server::{self, incoming::Incoming},
|
||||
};
|
||||
|
||||
#[tarpc::service]
|
||||
trait World {
|
||||
async fn hello(name: String) -> String;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct HelloServer;
|
||||
|
||||
#[tarpc::server]
|
||||
impl World for HelloServer {
|
||||
async fn hello(self, _: context::Context, name: String) -> String {
|
||||
format!("Hello, {name}!")
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let (_, server_transport) = tarpc::transport::channel::unbounded();
|
||||
let server = once(async move { server::BaseChannel::with_defaults(server_transport) });
|
||||
|
||||
#[deny(unused_must_use)]
|
||||
{
|
||||
server.execute(HelloServer.serve());
|
||||
}
|
||||
}
|
||||
@@ -1,11 +0,0 @@
|
||||
error: unused `TokioServerExecutor` that must be used
|
||||
--> tests/compile_fail/tokio/must_use_server_executor.rs:28:9
|
||||
|
|
||||
28 | server.execute(HelloServer.serve());
|
||||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
|
||||
note: the lint level is defined here
|
||||
--> tests/compile_fail/tokio/must_use_server_executor.rs:26:12
|
||||
|
|
||||
26 | #[deny(unused_must_use)]
|
||||
| ^^^^^^^^^^^^^^^
|
||||
Reference in New Issue
Block a user