mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0c08d5e8ca | ||
|
|
75b15fe2aa | ||
|
|
863a08d87e | ||
|
|
49ba8f8b1b | ||
|
|
d832209da3 | ||
|
|
584426d414 | ||
|
|
50eb80c883 | ||
|
|
1f0c80d8c9 | ||
|
|
99bf3e62a3 | ||
|
|
68863e3db0 | ||
|
|
453ba1c074 | ||
|
|
e3eac1b4f5 | ||
|
|
0e102288a5 | ||
|
|
4c8ba41b2f | ||
|
|
946c627579 | ||
|
|
104dd71bba | ||
|
|
012c481861 | ||
|
|
dc12bd09aa | ||
|
|
2594ea8ce9 | ||
|
|
839b87e394 | ||
|
|
57d0638a99 |
16
.github/workflows/main.yml
vendored
16
.github/workflows/main.yml
vendored
@@ -14,10 +14,10 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Cancel previous
|
||||
uses: styfle/cancel-workflow-action@0.7.0
|
||||
uses: styfle/cancel-workflow-action@0.10.0
|
||||
with:
|
||||
access_token: ${{ github.token }}
|
||||
- uses: actions/checkout@v1
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
profile: minimal
|
||||
@@ -38,10 +38,10 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Cancel previous
|
||||
uses: styfle/cancel-workflow-action@0.7.0
|
||||
uses: styfle/cancel-workflow-action@0.10.0
|
||||
with:
|
||||
access_token: ${{ github.token }}
|
||||
- uses: actions/checkout@v1
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
profile: minimal
|
||||
@@ -76,10 +76,10 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Cancel previous
|
||||
uses: styfle/cancel-workflow-action@0.7.0
|
||||
uses: styfle/cancel-workflow-action@0.10.0
|
||||
with:
|
||||
access_token: ${{ github.token }}
|
||||
- uses: actions/checkout@v1
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
profile: minimal
|
||||
@@ -96,10 +96,10 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Cancel previous
|
||||
uses: styfle/cancel-workflow-action@0.7.0
|
||||
uses: styfle/cancel-workflow-action@0.10.0
|
||||
with:
|
||||
access_token: ${{ github.token }}
|
||||
- uses: actions/checkout@v1
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
profile: minimal
|
||||
|
||||
@@ -67,7 +67,7 @@ Some other features of tarpc:
|
||||
Add to your `Cargo.toml` dependencies:
|
||||
|
||||
```toml
|
||||
tarpc = "0.29"
|
||||
tarpc = "0.31"
|
||||
```
|
||||
|
||||
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
||||
@@ -82,7 +82,7 @@ your `Cargo.toml`:
|
||||
```toml
|
||||
anyhow = "1.0"
|
||||
futures = "0.3"
|
||||
tarpc = { version = "0.29", features = ["tokio1"] }
|
||||
tarpc = { version = "0.31", features = ["tokio1"] }
|
||||
tokio = { version = "1.0", features = ["macros"] }
|
||||
```
|
||||
|
||||
|
||||
26
RELEASES.md
26
RELEASES.md
@@ -1,3 +1,29 @@
|
||||
## 0.31.0 (2022-11-03)
|
||||
|
||||
### New Features
|
||||
|
||||
This release adds Unix Domain Sockets to the `serde_transport` module.
|
||||
To use it, enable the "unix" feature. See the docs for more information.
|
||||
|
||||
## 0.30.0 (2022-08-12)
|
||||
|
||||
### Breaking Changes
|
||||
|
||||
- Some types that impl Future are now annotated with `#[must_use]`. Code that previously created
|
||||
these types but did not use them will now receive a warning. Code that disallows warnings will
|
||||
receive a compilation error.
|
||||
|
||||
### Fixes
|
||||
|
||||
- Servers will more reliably clean up request state for requests with long deadlines when response
|
||||
processing is aborted without sending a response.
|
||||
|
||||
### Other Changes
|
||||
|
||||
- `TrackedRequest` now contains a response guard that can be used to ensure state cleanup for
|
||||
aborted requests. (This was already handled automatically by `InFlightRequests`).
|
||||
- When the feature serde-transport is enabled, the crate tokio_serde is now re-exported.
|
||||
|
||||
## 0.29.0 (2022-05-26)
|
||||
|
||||
### Breaking Changes
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tarpc-example-service"
|
||||
version = "0.11.0"
|
||||
version = "0.13.0"
|
||||
rust-version = "1.56"
|
||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||
edition = "2021"
|
||||
@@ -18,14 +18,14 @@ anyhow = "1.0"
|
||||
clap = { version = "3.0.0-rc.9", features = ["derive"] }
|
||||
log = "0.4"
|
||||
futures = "0.3"
|
||||
opentelemetry = { version = "0.16", features = ["rt-tokio"] }
|
||||
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] }
|
||||
opentelemetry = { version = "0.17", features = ["rt-tokio"] }
|
||||
opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] }
|
||||
rand = "0.8"
|
||||
tarpc = { version = "0.29", path = "../tarpc", features = ["full"] }
|
||||
tarpc = { version = "0.31", path = "../tarpc", features = ["full"] }
|
||||
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] }
|
||||
tracing = { version = "0.1" }
|
||||
tracing-opentelemetry = "0.15"
|
||||
tracing-subscriber = "0.2"
|
||||
tracing-opentelemetry = "0.17"
|
||||
tracing-subscriber = {version = "0.3", features = ["env-filter"]}
|
||||
|
||||
[lib]
|
||||
name = "service"
|
||||
|
||||
@@ -54,6 +54,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
// JSON transport is provided by the json_transport tarpc module. It makes it easy
|
||||
// to start up a serde-powered json serialization strategy over TCP.
|
||||
let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?;
|
||||
tracing::info!("Listening on port {}", listener.local_addr().port());
|
||||
listener.config_mut().max_frame_length(usize::MAX);
|
||||
listener
|
||||
// Ignore accept errors.
|
||||
|
||||
9
plugins/LICENSE
Normal file
9
plugins/LICENSE
Normal file
@@ -0,0 +1,9 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright 2016 Google Inc. All Rights Reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
@@ -285,7 +285,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
|
||||
args,
|
||||
method_attrs: &rpcs.iter().map(|rpc| &*rpc.attrs).collect::<Vec<_>>(),
|
||||
method_idents: &methods,
|
||||
request_names: &*request_names,
|
||||
request_names: &request_names,
|
||||
attrs,
|
||||
rpcs,
|
||||
return_types: &rpcs
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tarpc"
|
||||
version = "0.29.0"
|
||||
version = "0.31.0"
|
||||
rust-version = "1.58.0"
|
||||
authors = [
|
||||
"Adam Wright <adam.austin.wright@gmail.com>",
|
||||
@@ -13,7 +13,7 @@ homepage = "https://github.com/google/tarpc"
|
||||
repository = "https://github.com/google/tarpc"
|
||||
keywords = ["rpc", "network", "server", "api", "microservices"]
|
||||
categories = ["asynchronous", "network-programming"]
|
||||
readme = "../README.md"
|
||||
readme = "README.md"
|
||||
description = "An RPC framework for Rust with a focus on ease of use."
|
||||
|
||||
[features]
|
||||
@@ -25,6 +25,7 @@ serde-transport = ["serde1", "tokio1", "tokio-serde", "tokio-util/codec"]
|
||||
serde-transport-json = ["tokio-serde/json"]
|
||||
serde-transport-bincode = ["tokio-serde/bincode"]
|
||||
tcp = ["tokio/net"]
|
||||
unix = ["tokio/net"]
|
||||
|
||||
full = [
|
||||
"serde1",
|
||||
@@ -33,6 +34,7 @@ full = [
|
||||
"serde-transport-json",
|
||||
"serde-transport-bincode",
|
||||
"tcp",
|
||||
"unix",
|
||||
]
|
||||
|
||||
[badges]
|
||||
@@ -50,7 +52,7 @@ static_assertions = "1.1.0"
|
||||
tarpc-plugins = { path = "../plugins", version = "0.12" }
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1", features = ["time"] }
|
||||
tokio-util = { version = "0.6.9", features = ["time"] }
|
||||
tokio-util = { version = "0.7.3", features = ["time"] }
|
||||
tokio-serde = { optional = true, version = "0.8" }
|
||||
tracing = { version = "0.1", default-features = false, features = [
|
||||
"attributes",
|
||||
|
||||
9
tarpc/LICENSE
Normal file
9
tarpc/LICENSE
Normal file
@@ -0,0 +1,9 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright 2016 Google Inc. All Rights Reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
@@ -1,8 +1,9 @@
|
||||
use tarpc::context::Context;
|
||||
use tarpc::serde_transport as transport;
|
||||
use tarpc::server::{BaseChannel, Channel};
|
||||
use tarpc::{context::Context, tokio_serde::formats::Bincode};
|
||||
use tarpc::tokio_serde::formats::Bincode;
|
||||
use tarpc::tokio_util::codec::length_delimited::LengthDelimitedCodec;
|
||||
use tokio::net::{UnixListener, UnixStream};
|
||||
use tokio_util::codec::length_delimited::LengthDelimitedCodec;
|
||||
|
||||
#[tarpc::service]
|
||||
pub trait PingService {
|
||||
|
||||
@@ -52,9 +52,9 @@ use tarpc::{
|
||||
client, context,
|
||||
serde_transport::tcp,
|
||||
server::{self, Channel},
|
||||
tokio_serde::formats::Json,
|
||||
};
|
||||
use tokio::net::ToSocketAddrs;
|
||||
use tokio_serde::formats::Json;
|
||||
use tracing::info;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
@@ -129,7 +129,6 @@ impl Subscriber {
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Subscription {
|
||||
subscriber: subscriber::SubscriberClient,
|
||||
topics: Vec<String>,
|
||||
}
|
||||
|
||||
@@ -210,7 +209,6 @@ impl Publisher {
|
||||
self.clients.lock().unwrap().insert(
|
||||
subscriber_addr,
|
||||
Subscription {
|
||||
subscriber: subscriber.clone(),
|
||||
topics: topics.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
@@ -9,8 +9,8 @@ use futures::{future, prelude::*};
|
||||
use tarpc::{
|
||||
client, context,
|
||||
server::{incoming::Incoming, BaseChannel},
|
||||
tokio_serde::formats::Json,
|
||||
};
|
||||
use tokio_serde::formats::Json;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
pub mod add {
|
||||
|
||||
49
tarpc/src/cancellations.rs
Normal file
49
tarpc/src/cancellations.rs
Normal file
@@ -0,0 +1,49 @@
|
||||
use futures::{prelude::*, task::*};
|
||||
use std::pin::Pin;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// Sends request cancellation signals.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RequestCancellation(mpsc::UnboundedSender<u64>);
|
||||
|
||||
/// A stream of IDs of requests that have been canceled.
|
||||
#[derive(Debug)]
|
||||
pub struct CanceledRequests(mpsc::UnboundedReceiver<u64>);
|
||||
|
||||
/// Returns a channel to send request cancellation messages.
|
||||
pub fn cancellations() -> (RequestCancellation, CanceledRequests) {
|
||||
// Unbounded because messages are sent in the drop fn. This is fine, because it's still
|
||||
// bounded by the number of in-flight requests.
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
(RequestCancellation(tx), CanceledRequests(rx))
|
||||
}
|
||||
|
||||
impl RequestCancellation {
|
||||
/// Cancels the request with ID `request_id`.
|
||||
///
|
||||
/// No validation is done of `request_id`. There is no way to know if the request id provided
|
||||
/// corresponds to a request actually tracked by the backing channel. `RequestCancellation` is
|
||||
/// a one-way communication channel.
|
||||
///
|
||||
/// Once request data is cleaned up, a response will never be received by the client. This is
|
||||
/// useful primarily when request processing ends prematurely for requests with long deadlines
|
||||
/// which would otherwise continue to be tracked by the backing channel—a kind of leak.
|
||||
pub fn cancel(&self, request_id: u64) {
|
||||
let _ = self.0.send(request_id);
|
||||
}
|
||||
}
|
||||
|
||||
impl CanceledRequests {
|
||||
/// Polls for a cancelled request.
|
||||
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
|
||||
self.0.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for CanceledRequests {
|
||||
type Item = u64;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u64>> {
|
||||
self.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
@@ -8,7 +8,10 @@
|
||||
|
||||
mod in_flight_requests;
|
||||
|
||||
use crate::{context, trace, ClientMessage, Request, Response, ServerError, Transport};
|
||||
use crate::{
|
||||
cancellations::{cancellations, CanceledRequests, RequestCancellation},
|
||||
context, trace, ClientMessage, Request, Response, ServerError, Transport,
|
||||
};
|
||||
use futures::{prelude::*, ready, stream::Fuse, task::*};
|
||||
use in_flight_requests::{DeadlineExceededError, InFlightRequests};
|
||||
use pin_project::pin_project;
|
||||
@@ -126,7 +129,7 @@ impl<Req, Resp> Channel<Req, Resp> {
|
||||
) -> Result<Resp, RpcError> {
|
||||
let span = Span::current();
|
||||
ctx.trace_context = trace::Context::try_from(&span).unwrap_or_else(|_| {
|
||||
tracing::warn!(
|
||||
tracing::trace!(
|
||||
"OpenTelemetry subscriber not installed; making unsampled child context."
|
||||
);
|
||||
ctx.trace_context.new_child()
|
||||
@@ -255,6 +258,7 @@ where
|
||||
|
||||
/// Handles the lifecycle of requests, writing requests to the wire, managing cancellations,
|
||||
/// and dispatching responses to the appropriate channel.
|
||||
#[must_use]
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct RequestDispatch<Req, Resp, C> {
|
||||
@@ -391,11 +395,7 @@ where
|
||||
// Receiving Poll::Ready(None) when polling expired requests never indicates "Closed",
|
||||
// because there can temporarily be zero in-flight rquests. Therefore, there is no need to
|
||||
// track the status like is done with pending and cancelled requests.
|
||||
if let Poll::Ready(Some(_)) = self
|
||||
.in_flight_requests()
|
||||
.poll_expired(cx)
|
||||
.map_err(ChannelError::Timer)?
|
||||
{
|
||||
if let Poll::Ready(Some(_)) = self.in_flight_requests().poll_expired(cx) {
|
||||
// Expired requests are considered complete; there is no compelling reason to send a
|
||||
// cancellation message to the server, since it will have already exhausted its
|
||||
// allotted processing time.
|
||||
@@ -602,49 +602,9 @@ struct DispatchRequest<Req, Resp> {
|
||||
pub response_completion: oneshot::Sender<Result<Response<Resp>, DeadlineExceededError>>,
|
||||
}
|
||||
|
||||
/// Sends request cancellation signals.
|
||||
#[derive(Debug, Clone)]
|
||||
struct RequestCancellation(mpsc::UnboundedSender<u64>);
|
||||
|
||||
/// A stream of IDs of requests that have been canceled.
|
||||
#[derive(Debug)]
|
||||
struct CanceledRequests(mpsc::UnboundedReceiver<u64>);
|
||||
|
||||
/// Returns a channel to send request cancellation messages.
|
||||
fn cancellations() -> (RequestCancellation, CanceledRequests) {
|
||||
// Unbounded because messages are sent in the drop fn. This is fine, because it's still
|
||||
// bounded by the number of in-flight requests.
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
(RequestCancellation(tx), CanceledRequests(rx))
|
||||
}
|
||||
|
||||
impl RequestCancellation {
|
||||
/// Cancels the request with ID `request_id`.
|
||||
fn cancel(&self, request_id: u64) {
|
||||
let _ = self.0.send(request_id);
|
||||
}
|
||||
}
|
||||
|
||||
impl CanceledRequests {
|
||||
fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
|
||||
self.0.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for CanceledRequests {
|
||||
type Item = u64;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u64>> {
|
||||
self.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{
|
||||
cancellations, CanceledRequests, Channel, DispatchRequest, RequestCancellation,
|
||||
RequestDispatch, ResponseGuard,
|
||||
};
|
||||
use super::{cancellations, Channel, DispatchRequest, RequestDispatch, ResponseGuard};
|
||||
use crate::{
|
||||
client::{
|
||||
in_flight_requests::{DeadlineExceededError, InFlightRequests},
|
||||
@@ -668,7 +628,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn response_completes_request_future() {
|
||||
let (mut dispatch, mut _channel, mut server_channel) = set_up();
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
|
||||
dispatch
|
||||
@@ -696,8 +656,8 @@ mod tests {
|
||||
request_id: 3,
|
||||
});
|
||||
// resp's drop() is run, which should send a cancel message.
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
assert_eq!(canceled_requests.0.poll_recv(cx), Poll::Ready(Some(3)));
|
||||
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||
assert_eq!(canceled_requests.poll_recv(cx), Poll::Ready(Some(3)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -719,18 +679,19 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
drop(cancellation);
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
assert_eq!(canceled_requests.0.poll_recv(cx), Poll::Ready(None));
|
||||
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||
assert_eq!(canceled_requests.poll_recv(cx), Poll::Ready(None));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stage_request() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
|
||||
let _resp = send_request(&mut channel, "hi", tx, &mut rx).await;
|
||||
|
||||
#[allow(unstable_name_collisions)]
|
||||
let req = dispatch.as_mut().poll_next_request(cx).ready();
|
||||
assert!(req.is_some());
|
||||
|
||||
@@ -743,7 +704,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn stage_request_channel_dropped_doesnt_panic() {
|
||||
let (mut dispatch, mut channel, mut server_channel) = set_up();
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
|
||||
let _ = send_request(&mut channel, "hi", tx, &mut rx).await;
|
||||
@@ -761,10 +722,11 @@ mod tests {
|
||||
dispatch.await.unwrap();
|
||||
}
|
||||
|
||||
#[allow(unstable_name_collisions)]
|
||||
#[tokio::test]
|
||||
async fn stage_request_response_future_dropped_is_canceled_before_sending() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
|
||||
let _ = send_request(&mut channel, "hi", tx, &mut rx).await;
|
||||
@@ -776,10 +738,11 @@ mod tests {
|
||||
assert!(dispatch.as_mut().poll_next_request(cx).ready().is_none());
|
||||
}
|
||||
|
||||
#[allow(unstable_name_collisions)]
|
||||
#[tokio::test]
|
||||
async fn stage_request_response_future_dropped_is_canceled_after_sending() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
|
||||
let req = send_request(&mut channel, "hi", tx, &mut rx).await;
|
||||
@@ -800,7 +763,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn stage_request_response_closed_skipped() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
|
||||
// Test that a request future that's closed its receiver but not yet canceled its request --
|
||||
@@ -828,18 +791,17 @@ mod tests {
|
||||
let _ = tracing_subscriber::fmt().with_test_writer().try_init();
|
||||
|
||||
let (to_dispatch, pending_requests) = mpsc::channel(1);
|
||||
let (cancel_tx, canceled_requests) = mpsc::unbounded_channel();
|
||||
let (cancellation, canceled_requests) = cancellations();
|
||||
let (client_channel, server_channel) = transport::channel::unbounded();
|
||||
|
||||
let dispatch = RequestDispatch::<String, String, _> {
|
||||
transport: client_channel.fuse(),
|
||||
pending_requests: pending_requests,
|
||||
canceled_requests: CanceledRequests(canceled_requests),
|
||||
pending_requests,
|
||||
canceled_requests,
|
||||
in_flight_requests: InFlightRequests::default(),
|
||||
config: Config::default(),
|
||||
};
|
||||
|
||||
let cancellation = RequestCancellation(cancel_tx);
|
||||
let channel = Channel {
|
||||
to_dispatch,
|
||||
cancellation,
|
||||
@@ -864,13 +826,13 @@ mod tests {
|
||||
request: request.to_string(),
|
||||
response_completion,
|
||||
};
|
||||
channel.to_dispatch.send(request).await.unwrap();
|
||||
|
||||
ResponseGuard {
|
||||
let response_guard = ResponseGuard {
|
||||
response,
|
||||
cancellation: &channel.cancellation,
|
||||
request_id,
|
||||
}
|
||||
};
|
||||
channel.to_dispatch.send(request).await.unwrap();
|
||||
response_guard
|
||||
}
|
||||
|
||||
async fn send_response(
|
||||
|
||||
@@ -117,12 +117,9 @@ impl<Resp> InFlightRequests<Resp> {
|
||||
|
||||
/// Yields a request that has expired, completing it with a TimedOut error.
|
||||
/// The caller should send cancellation messages for any yielded request ID.
|
||||
pub fn poll_expired(
|
||||
&mut self,
|
||||
cx: &mut Context,
|
||||
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
|
||||
self.deadlines.poll_expired(cx).map_ok(|expired| {
|
||||
let request_id = expired.into_inner();
|
||||
pub fn poll_expired(&mut self, cx: &mut Context) -> Poll<Option<u64>> {
|
||||
self.deadlines.poll_expired(cx).map(|expired| {
|
||||
let request_id = expired?.into_inner();
|
||||
if let Some(request_data) = self.request_data.remove(&request_id) {
|
||||
let _entered = request_data.span.enter();
|
||||
tracing::error!("DeadlineExceeded");
|
||||
@@ -131,7 +128,7 @@ impl<Resp> InFlightRequests<Resp> {
|
||||
.response_completion
|
||||
.send(Err(DeadlineExceededError));
|
||||
}
|
||||
request_id
|
||||
Some(request_id)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -209,7 +209,7 @@
|
||||
pub use serde;
|
||||
|
||||
#[cfg(feature = "serde-transport")]
|
||||
pub use tokio_serde;
|
||||
pub use {tokio_serde, tokio_util};
|
||||
|
||||
#[cfg(feature = "serde-transport")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))]
|
||||
@@ -300,6 +300,7 @@ pub use tarpc_plugins::service;
|
||||
/// `async`, meaning that this should not break existing code.
|
||||
pub use tarpc_plugins::server;
|
||||
|
||||
pub(crate) mod cancellations;
|
||||
pub mod client;
|
||||
pub mod context;
|
||||
pub mod server;
|
||||
|
||||
@@ -149,6 +149,7 @@ pub mod tcp {
|
||||
}
|
||||
|
||||
/// A connection Future that also exposes the length-delimited framing config.
|
||||
#[must_use]
|
||||
#[pin_project]
|
||||
pub struct Connect<T, Item, SinkItem, CodecFn> {
|
||||
#[pin]
|
||||
@@ -276,6 +277,270 @@ pub mod tcp {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "unix"))]
|
||||
#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "unix"))))]
|
||||
/// Unix Domain Socket support for generic transport using Tokio.
|
||||
pub mod unix {
|
||||
use {
|
||||
super::*,
|
||||
futures::ready,
|
||||
std::{marker::PhantomData, path::Path},
|
||||
tokio::net::{unix::SocketAddr, UnixListener, UnixStream},
|
||||
tokio_util::codec::length_delimited,
|
||||
};
|
||||
|
||||
impl<Item, SinkItem, Codec> Transport<UnixStream, Item, SinkItem, Codec> {
|
||||
/// Returns the socket address of the remote half of the underlying [`UnixStream`].
|
||||
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
|
||||
self.inner.get_ref().get_ref().peer_addr()
|
||||
}
|
||||
/// Returns the socket address of the local half of the underlying [`UnixStream`].
|
||||
pub fn local_addr(&self) -> io::Result<SocketAddr> {
|
||||
self.inner.get_ref().get_ref().local_addr()
|
||||
}
|
||||
}
|
||||
|
||||
/// A connection Future that also exposes the length-delimited framing config.
|
||||
#[must_use]
|
||||
#[pin_project]
|
||||
pub struct Connect<T, Item, SinkItem, CodecFn> {
|
||||
#[pin]
|
||||
inner: T,
|
||||
codec_fn: CodecFn,
|
||||
config: length_delimited::Builder,
|
||||
ghost: PhantomData<(fn(SinkItem), fn() -> Item)>,
|
||||
}
|
||||
|
||||
impl<T, Item, SinkItem, Codec, CodecFn> Future for Connect<T, Item, SinkItem, CodecFn>
|
||||
where
|
||||
T: Future<Output = io::Result<UnixStream>>,
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
SinkItem: Serialize,
|
||||
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
||||
CodecFn: Fn() -> Codec,
|
||||
{
|
||||
type Output = io::Result<Transport<UnixStream, Item, SinkItem, Codec>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let io = ready!(self.as_mut().project().inner.poll(cx))?;
|
||||
Poll::Ready(Ok(new(self.config.new_framed(io), (self.codec_fn)())))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Item, SinkItem, CodecFn> Connect<T, Item, SinkItem, CodecFn> {
|
||||
/// Returns an immutable reference to the length-delimited codec's config.
|
||||
pub fn config(&self) -> &length_delimited::Builder {
|
||||
&self.config
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to the length-delimited codec's config.
|
||||
pub fn config_mut(&mut self) -> &mut length_delimited::Builder {
|
||||
&mut self.config
|
||||
}
|
||||
}
|
||||
|
||||
/// Connects to socket named by `path`, wrapping the connection in a Unix Domain Socket
|
||||
/// transport.
|
||||
pub fn connect<P, Item, SinkItem, Codec, CodecFn>(
|
||||
path: P,
|
||||
codec_fn: CodecFn,
|
||||
) -> Connect<impl Future<Output = io::Result<UnixStream>>, Item, SinkItem, CodecFn>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
SinkItem: Serialize,
|
||||
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
||||
CodecFn: Fn() -> Codec,
|
||||
{
|
||||
Connect {
|
||||
inner: UnixStream::connect(path),
|
||||
codec_fn,
|
||||
config: LengthDelimitedCodec::builder(),
|
||||
ghost: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Listens on the socket named by `path`, wrapping accepted connections in Unix Domain Socket
|
||||
/// transports.
|
||||
pub async fn listen<P, Item, SinkItem, Codec, CodecFn>(
|
||||
path: P,
|
||||
codec_fn: CodecFn,
|
||||
) -> io::Result<Incoming<Item, SinkItem, Codec, CodecFn>>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
||||
CodecFn: Fn() -> Codec,
|
||||
{
|
||||
let listener = UnixListener::bind(path)?;
|
||||
let local_addr = listener.local_addr()?;
|
||||
Ok(Incoming {
|
||||
listener,
|
||||
codec_fn,
|
||||
local_addr,
|
||||
config: LengthDelimitedCodec::builder(),
|
||||
ghost: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
/// A [`UnixListener`] that wraps connections in [transports](Transport).
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct Incoming<Item, SinkItem, Codec, CodecFn> {
|
||||
listener: UnixListener,
|
||||
local_addr: SocketAddr,
|
||||
codec_fn: CodecFn,
|
||||
config: length_delimited::Builder,
|
||||
ghost: PhantomData<(fn() -> Item, fn(SinkItem), Codec)>,
|
||||
}
|
||||
|
||||
impl<Item, SinkItem, Codec, CodecFn> Incoming<Item, SinkItem, Codec, CodecFn> {
|
||||
/// Returns the the socket address being listened on.
|
||||
pub fn local_addr(&self) -> &SocketAddr {
|
||||
&self.local_addr
|
||||
}
|
||||
|
||||
/// Returns an immutable reference to the length-delimited codec's config.
|
||||
pub fn config(&self) -> &length_delimited::Builder {
|
||||
&self.config
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to the length-delimited codec's config.
|
||||
pub fn config_mut(&mut self) -> &mut length_delimited::Builder {
|
||||
&mut self.config
|
||||
}
|
||||
}
|
||||
|
||||
impl<Item, SinkItem, Codec, CodecFn> Stream for Incoming<Item, SinkItem, Codec, CodecFn>
|
||||
where
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
SinkItem: Serialize,
|
||||
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
||||
CodecFn: Fn() -> Codec,
|
||||
{
|
||||
type Item = io::Result<Transport<UnixStream, Item, SinkItem, Codec>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let conn: UnixStream = ready!(self.as_mut().project().listener.poll_accept(cx)?).0;
|
||||
Poll::Ready(Some(Ok(new(
|
||||
self.config.new_framed(conn),
|
||||
(self.codec_fn)(),
|
||||
))))
|
||||
}
|
||||
}
|
||||
|
||||
/// A temporary `PathBuf` that lives in `std::env::temp_dir` and is removed on drop.
|
||||
pub struct TempPathBuf(std::path::PathBuf);
|
||||
|
||||
impl TempPathBuf {
|
||||
/// A named socket that results in `<tempdir>/<name>`
|
||||
pub fn new<S: AsRef<str>>(name: S) -> Self {
|
||||
let mut sock = std::env::temp_dir();
|
||||
sock.push(name.as_ref());
|
||||
Self(sock)
|
||||
}
|
||||
|
||||
/// Appends a random hex string to the socket name resulting in
|
||||
/// `<tempdir>/<name>_<xxxxx>`
|
||||
pub fn with_random<S: AsRef<str>>(name: S) -> Self {
|
||||
Self::new(format!("{}_{:016x}", name.as_ref(), rand::random::<u64>()))
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<std::path::Path> for TempPathBuf {
|
||||
fn as_ref(&self) -> &std::path::Path {
|
||||
self.0.as_path()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TempPathBuf {
|
||||
fn drop(&mut self) {
|
||||
// This will remove the file pointed to by this PathBuf if it exists, however Err's can
|
||||
// be returned such as attempting to remove a non-existing file, or one which we don't
|
||||
// have permission to remove. In these cases the Err is swallowed
|
||||
let _ = std::fs::remove_file(&self.0);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tokio_serde::formats::SymmetricalJson;
|
||||
|
||||
#[test]
|
||||
fn temp_path_buf_non_random() {
|
||||
let sock = TempPathBuf::new("test");
|
||||
let mut good = std::env::temp_dir();
|
||||
good.push("test");
|
||||
assert_eq!(sock.as_ref(), good);
|
||||
assert_eq!(sock.as_ref().file_name().unwrap(), "test");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn temp_path_buf_random() {
|
||||
let sock = TempPathBuf::with_random("test");
|
||||
let good = std::env::temp_dir();
|
||||
assert!(sock.as_ref().starts_with(good));
|
||||
// Since there are 16 random characters we just assert the file_name has the right name
|
||||
// and starts with the correct string 'test_'
|
||||
// file name: test_xxxxxxxxxxxxxxxx
|
||||
// test = 4
|
||||
// _ = 1
|
||||
// <hex> = 16
|
||||
// total = 21
|
||||
let fname = sock.as_ref().file_name().unwrap().to_string_lossy();
|
||||
assert!(fname.starts_with("test_"));
|
||||
assert_eq!(fname.len(), 21);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn temp_path_buf_non_existing() {
|
||||
let sock = TempPathBuf::with_random("test");
|
||||
let sock_path = std::path::PathBuf::from(sock.as_ref());
|
||||
|
||||
// No actual file has been created yet
|
||||
assert!(!sock_path.exists());
|
||||
// Should not panic
|
||||
std::mem::drop(sock);
|
||||
assert!(!sock_path.exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn temp_path_buf_existing_file() {
|
||||
let sock = TempPathBuf::with_random("test");
|
||||
let sock_path = std::path::PathBuf::from(sock.as_ref());
|
||||
let _file = std::fs::File::create(&sock).unwrap();
|
||||
assert!(sock_path.exists());
|
||||
std::mem::drop(sock);
|
||||
assert!(!sock_path.exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn temp_path_buf_preexisting_file() {
|
||||
let mut pre_existing = std::env::temp_dir();
|
||||
pre_existing.push("test");
|
||||
let _file = std::fs::File::create(&pre_existing).unwrap();
|
||||
let sock = TempPathBuf::new("test");
|
||||
let sock_path = std::path::PathBuf::from(sock.as_ref());
|
||||
assert!(sock_path.exists());
|
||||
std::mem::drop(sock);
|
||||
assert!(!sock_path.exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn temp_path_buf_for_socket() {
|
||||
let sock = TempPathBuf::with_random("test");
|
||||
// Save path for testing after drop
|
||||
let sock_path = std::path::PathBuf::from(sock.as_ref());
|
||||
// create the actual socket
|
||||
let _ = listen(&sock, SymmetricalJson::<String>::default).await;
|
||||
assert!(sock_path.exists());
|
||||
std::mem::drop(sock);
|
||||
assert!(!sock_path.exists());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::Transport;
|
||||
@@ -290,7 +555,7 @@ mod tests {
|
||||
use tokio_serde::formats::SymmetricalJson;
|
||||
|
||||
fn ctx() -> Context<'static> {
|
||||
Context::from_waker(&noop_waker_ref())
|
||||
Context::from_waker(noop_waker_ref())
|
||||
}
|
||||
|
||||
struct TestIo(Cursor<Vec<u8>>);
|
||||
@@ -392,4 +657,24 @@ mod tests {
|
||||
assert_matches!(transport.next().await, None);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "unix"))]
|
||||
#[tokio::test]
|
||||
async fn uds() -> io::Result<()> {
|
||||
use super::unix;
|
||||
use super::*;
|
||||
|
||||
let sock = unix::TempPathBuf::with_random("uds");
|
||||
let mut listener = unix::listen(&sock, SymmetricalJson::<String>::default).await?;
|
||||
tokio::spawn(async move {
|
||||
let mut transport = listener.next().await.unwrap().unwrap();
|
||||
let message = transport.next().await.unwrap().unwrap();
|
||||
transport.send(message).await.unwrap();
|
||||
});
|
||||
let mut transport = unix::connect(&sock, SymmetricalJson::<String>::default).await?;
|
||||
transport.send(String::from("test")).await?;
|
||||
assert_matches!(transport.next().await, Some(Ok(s)) if s == "test");
|
||||
assert_matches!(transport.next().await, None);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
//! Provides a server that concurrently handles many connections sending multiplexed requests.
|
||||
|
||||
use crate::{
|
||||
cancellations::{cancellations, CanceledRequests, RequestCancellation},
|
||||
context::{self, SpanExt},
|
||||
trace, ClientMessage, Request, Response, Transport,
|
||||
};
|
||||
@@ -20,7 +21,14 @@ use futures::{
|
||||
};
|
||||
use in_flight_requests::{AlreadyExistsError, InFlightRequests};
|
||||
use pin_project::pin_project;
|
||||
use std::{convert::TryFrom, error::Error, fmt, marker::PhantomData, pin::Pin};
|
||||
use std::{
|
||||
convert::TryFrom,
|
||||
error::Error,
|
||||
fmt,
|
||||
marker::PhantomData,
|
||||
mem::{self, ManuallyDrop},
|
||||
pin::Pin,
|
||||
};
|
||||
use tracing::{info_span, instrument::Instrument, Span};
|
||||
|
||||
mod in_flight_requests;
|
||||
@@ -111,6 +119,11 @@ pub struct BaseChannel<Req, Resp, T> {
|
||||
/// Writes responses to the wire and reads requests off the wire.
|
||||
#[pin]
|
||||
transport: Fuse<T>,
|
||||
/// In-flight requests that were dropped by the server before completion.
|
||||
#[pin]
|
||||
canceled_requests: CanceledRequests,
|
||||
/// Notifies `canceled_requests` when a request is canceled.
|
||||
request_cancellation: RequestCancellation,
|
||||
/// Holds data necessary to clean up in-flight requests.
|
||||
in_flight_requests: InFlightRequests,
|
||||
/// Types the request and response.
|
||||
@@ -123,9 +136,12 @@ where
|
||||
{
|
||||
/// Creates a new channel backed by `transport` and configured with `config`.
|
||||
pub fn new(config: Config, transport: T) -> Self {
|
||||
let (request_cancellation, canceled_requests) = cancellations();
|
||||
BaseChannel {
|
||||
config,
|
||||
transport: transport.fuse(),
|
||||
canceled_requests,
|
||||
request_cancellation,
|
||||
in_flight_requests: InFlightRequests::default(),
|
||||
ghost: PhantomData,
|
||||
}
|
||||
@@ -150,12 +166,18 @@ where
|
||||
self.as_mut().project().in_flight_requests
|
||||
}
|
||||
|
||||
fn canceled_requests_pin_mut<'a>(
|
||||
self: &'a mut Pin<&mut Self>,
|
||||
) -> Pin<&'a mut CanceledRequests> {
|
||||
self.as_mut().project().canceled_requests
|
||||
}
|
||||
|
||||
fn transport_pin_mut<'a>(self: &'a mut Pin<&mut Self>) -> Pin<&'a mut Fuse<T>> {
|
||||
self.as_mut().project().transport
|
||||
}
|
||||
|
||||
fn start_request(
|
||||
self: Pin<&mut Self>,
|
||||
mut self: Pin<&mut Self>,
|
||||
mut request: Request<Req>,
|
||||
) -> Result<TrackedRequest<Req>, AlreadyExistsError> {
|
||||
let span = info_span!(
|
||||
@@ -175,7 +197,7 @@ where
|
||||
});
|
||||
let entered = span.enter();
|
||||
tracing::info!("ReceiveRequest");
|
||||
let start = self.project().in_flight_requests.start_request(
|
||||
let start = self.in_flight_requests_mut().start_request(
|
||||
request.id,
|
||||
request.context.deadline,
|
||||
span.clone(),
|
||||
@@ -184,9 +206,13 @@ where
|
||||
Ok(abort_registration) => {
|
||||
drop(entered);
|
||||
Ok(TrackedRequest {
|
||||
request,
|
||||
abort_registration,
|
||||
span,
|
||||
response_guard: ManuallyDrop::new(ResponseGuard {
|
||||
request_id: request.id,
|
||||
request_cancellation: self.request_cancellation.clone(),
|
||||
}),
|
||||
request,
|
||||
})
|
||||
}
|
||||
Err(AlreadyExistsError) => {
|
||||
@@ -213,6 +239,8 @@ pub struct TrackedRequest<Req> {
|
||||
pub abort_registration: AbortRegistration,
|
||||
/// A span representing the server processing of this request.
|
||||
pub span: Span,
|
||||
/// An inert response guard. Becomes active in an InFlightRequest.
|
||||
pub response_guard: ManuallyDrop<ResponseGuard>,
|
||||
}
|
||||
|
||||
/// The server end of an open connection with a client, receiving requests from, and sending
|
||||
@@ -231,13 +259,15 @@ pub struct TrackedRequest<Req> {
|
||||
/// [`Sink::send`](futures::sink::SinkExt::send) - A user is free to manually read requests
|
||||
/// from, and send responses into, a Channel in lieu of the previous methods. Channels stream
|
||||
/// [`TrackedRequests`](TrackedRequest), which, in addition to the request itself, contains the
|
||||
/// server [`Span`] and request lifetime [`AbortRegistration`]. Wrapping response
|
||||
/// logic in an [`Abortable`] future using the abort registration will ensure that the response
|
||||
/// does not execute longer than the request deadline. The `Channel` itself will clean up
|
||||
/// request state once either the deadline expires, or a cancellation message is received, or a
|
||||
/// response is sent. Because there is no guarantee that a cancellation message will ever be
|
||||
/// received for a request, or that requests come with reasonably short deadlines, services
|
||||
/// should strive to clean up Channel resources by sending a response for every request.
|
||||
/// server [`Span`], request lifetime [`AbortRegistration`], and an inert [`ResponseGuard`].
|
||||
/// Wrapping response logic in an [`Abortable`] future using the abort registration will ensure
|
||||
/// that the response does not execute longer than the request deadline. The `Channel` itself
|
||||
/// will clean up request state once either the deadline expires, or the response guard is
|
||||
/// dropped, or a response is sent.
|
||||
///
|
||||
/// Channels must be implemented using the decorator pattern: the only way to create a
|
||||
/// `TrackedRequest` is to get one from another `Channel`. Ultimately, all `TrackedRequests` are
|
||||
/// created by [`BaseChannel`].
|
||||
pub trait Channel
|
||||
where
|
||||
Self: Transport<Response<<Self as Channel>::Resp>, TrackedRequest<<Self as Channel>::Req>>,
|
||||
@@ -334,20 +364,45 @@ where
|
||||
type Item = Result<TrackedRequest<Req>, ChannelError<T::Error>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
#[derive(Debug)]
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
enum ReceiverStatus {
|
||||
Ready,
|
||||
Pending,
|
||||
Closed,
|
||||
}
|
||||
|
||||
impl ReceiverStatus {
|
||||
fn combine(self, other: Self) -> Self {
|
||||
use ReceiverStatus::*;
|
||||
match (self, other) {
|
||||
(Ready, _) | (_, Ready) => Ready,
|
||||
(Closed, Closed) => Closed,
|
||||
(Pending, Closed) | (Closed, Pending) | (Pending, Pending) => Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use ReceiverStatus::*;
|
||||
|
||||
loop {
|
||||
let expiration_status = match self
|
||||
.in_flight_requests_mut()
|
||||
.poll_expired(cx)
|
||||
.map_err(ChannelError::Timer)?
|
||||
{
|
||||
let cancellation_status = match self.canceled_requests_pin_mut().poll_recv(cx) {
|
||||
Poll::Ready(Some(request_id)) => {
|
||||
if let Some(span) = self.in_flight_requests_mut().remove_request(request_id) {
|
||||
let _entered = span.enter();
|
||||
tracing::info!("ResponseCancelled");
|
||||
}
|
||||
Ready
|
||||
}
|
||||
// Pending cancellations don't block Channel closure, because all they do is ensure
|
||||
// the Channel's internal state is cleaned up. But Channel closure also cleans up
|
||||
// the Channel state, so there's no reason to wait on a cancellation before
|
||||
// closing.
|
||||
//
|
||||
// Ready(None) can't happen, since `self` holds a Cancellation.
|
||||
Poll::Pending | Poll::Ready(None) => Closed,
|
||||
};
|
||||
|
||||
let expiration_status = match self.in_flight_requests_mut().poll_expired(cx) {
|
||||
// No need to send a response, since the client wouldn't be waiting for one
|
||||
// anymore.
|
||||
Poll::Ready(Some(_)) => Ready,
|
||||
@@ -395,10 +450,13 @@ where
|
||||
expiration_status,
|
||||
request_status
|
||||
);
|
||||
match (expiration_status, request_status) {
|
||||
(Ready, _) | (_, Ready) => continue,
|
||||
(Closed, Closed) => return Poll::Ready(None),
|
||||
(Pending, Closed) | (Closed, Pending) | (Pending, Pending) => return Poll::Pending,
|
||||
match cancellation_status
|
||||
.combine(expiration_status)
|
||||
.combine(request_status)
|
||||
{
|
||||
Ready => continue,
|
||||
Closed => return Poll::Ready(None),
|
||||
Pending => return Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -420,9 +478,7 @@ where
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, response: Response<Resp>) -> Result<(), Self::Error> {
|
||||
if let Some(span) = self
|
||||
.as_mut()
|
||||
.project()
|
||||
.in_flight_requests
|
||||
.in_flight_requests_mut()
|
||||
.remove_request(response.request_id)
|
||||
{
|
||||
let _entered = span.enter();
|
||||
@@ -499,6 +555,11 @@ impl<C> Requests<C>
|
||||
where
|
||||
C: Channel,
|
||||
{
|
||||
/// Returns a reference to the inner channel over which messages are sent and received.
|
||||
pub fn channel(&self) -> &C {
|
||||
&self.channel
|
||||
}
|
||||
|
||||
/// Returns the inner channel over which messages are sent and received.
|
||||
pub fn channel_pin_mut<'a>(self: &'a mut Pin<&mut Self>) -> Pin<&'a mut C> {
|
||||
self.as_mut().project().channel
|
||||
@@ -515,12 +576,22 @@ where
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<InFlightRequest<C::Req, C::Resp>, C::Error>>> {
|
||||
self.channel_pin_mut()
|
||||
.poll_next(cx)
|
||||
.map_ok(|request| InFlightRequest {
|
||||
request,
|
||||
response_tx: self.responses_tx.clone(),
|
||||
})
|
||||
self.channel_pin_mut().poll_next(cx).map_ok(
|
||||
|TrackedRequest {
|
||||
request,
|
||||
abort_registration,
|
||||
span,
|
||||
response_guard,
|
||||
}| {
|
||||
InFlightRequest {
|
||||
request,
|
||||
abort_registration,
|
||||
span,
|
||||
response_guard: ManuallyDrop::into_inner(response_guard),
|
||||
response_tx: self.responses_tx.clone(),
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn pump_write(
|
||||
@@ -597,17 +668,37 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// A fail-safe to ensure requests are properly canceled if request processing is aborted before
|
||||
/// completing.
|
||||
#[derive(Debug)]
|
||||
pub struct ResponseGuard {
|
||||
request_cancellation: RequestCancellation,
|
||||
request_id: u64,
|
||||
}
|
||||
|
||||
impl Drop for ResponseGuard {
|
||||
fn drop(&mut self) {
|
||||
self.request_cancellation.cancel(self.request_id);
|
||||
}
|
||||
}
|
||||
|
||||
/// A request produced by [Channel::requests].
|
||||
///
|
||||
/// If dropped without calling [`execute`](InFlightRequest::execute), a cancellation message will
|
||||
/// be sent to the Channel to clean up associated request state.
|
||||
#[derive(Debug)]
|
||||
pub struct InFlightRequest<Req, Res> {
|
||||
request: TrackedRequest<Req>,
|
||||
request: Request<Req>,
|
||||
abort_registration: AbortRegistration,
|
||||
response_guard: ResponseGuard,
|
||||
span: Span,
|
||||
response_tx: mpsc::Sender<Response<Res>>,
|
||||
}
|
||||
|
||||
impl<Req, Res> InFlightRequest<Req, Res> {
|
||||
/// Returns a reference to the request.
|
||||
pub fn get(&self) -> &Request<Req> {
|
||||
&self.request.request
|
||||
&self.request
|
||||
}
|
||||
|
||||
/// Returns a [future](Future) that executes the request using the given [service
|
||||
@@ -621,22 +712,23 @@ impl<Req, Res> InFlightRequest<Req, Res> {
|
||||
/// message](ClientMessage::Cancel) for this request.
|
||||
/// 2. The request [deadline](crate::context::Context::deadline) is reached.
|
||||
/// 3. The service function completes.
|
||||
///
|
||||
/// If the returned Future is dropped before completion, a cancellation message will be sent to
|
||||
/// the Channel to clean up associated request state.
|
||||
pub async fn execute<S>(self, serve: S)
|
||||
where
|
||||
S: Serve<Req, Resp = Res>,
|
||||
{
|
||||
let Self {
|
||||
response_tx,
|
||||
response_guard,
|
||||
abort_registration,
|
||||
span,
|
||||
request:
|
||||
TrackedRequest {
|
||||
abort_registration,
|
||||
span,
|
||||
request:
|
||||
Request {
|
||||
context,
|
||||
message,
|
||||
id: request_id,
|
||||
},
|
||||
Request {
|
||||
context,
|
||||
message,
|
||||
id: request_id,
|
||||
},
|
||||
} = self;
|
||||
let method = serve.method(&message);
|
||||
@@ -657,6 +749,10 @@ impl<Req, Res> InFlightRequest<Req, Res> {
|
||||
)
|
||||
.instrument(span)
|
||||
.await;
|
||||
// Request processing has completed, meaning either the channel canceled the request or
|
||||
// a request was sent back to the channel. Either way, the channel will clean up the
|
||||
// request data, so the request does not need to be canceled.
|
||||
mem::forget(response_guard);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -741,9 +837,10 @@ mod tests {
|
||||
channel::Channel<Response<Resp>, ClientMessage<Req>>,
|
||||
) {
|
||||
let (tx, rx) = crate::transport::channel::bounded(capacity);
|
||||
let mut config = Config::default();
|
||||
// Add 1 because capacity 0 is not supported (but is supported by transport::channel::bounded).
|
||||
config.pending_response_buffer = capacity + 1;
|
||||
let config = Config {
|
||||
pending_response_buffer: capacity + 1,
|
||||
};
|
||||
(Box::pin(BaseChannel::new(config, rx).requests()), tx)
|
||||
}
|
||||
|
||||
@@ -932,6 +1029,44 @@ mod tests {
|
||||
assert_eq!(channel.in_flight_requests(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn in_flight_request_drop_cancels_request() {
|
||||
let (mut requests, mut tx) = test_requests::<(), ()>();
|
||||
tx.send(fake_request(())).await.unwrap();
|
||||
|
||||
let request = match requests.as_mut().poll_next(&mut noop_context()) {
|
||||
Poll::Ready(Some(Ok(request))) => request,
|
||||
result => panic!("Unexpected result: {:?}", result),
|
||||
};
|
||||
drop(request);
|
||||
|
||||
let poll = requests
|
||||
.as_mut()
|
||||
.channel_pin_mut()
|
||||
.poll_next(&mut noop_context());
|
||||
assert!(poll.is_pending());
|
||||
let in_flight_requests = requests.channel().in_flight_requests();
|
||||
assert_eq!(in_flight_requests, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn in_flight_requests_successful_execute_doesnt_cancel_request() {
|
||||
let (mut requests, mut tx) = test_requests::<(), ()>();
|
||||
tx.send(fake_request(())).await.unwrap();
|
||||
|
||||
let request = match requests.as_mut().poll_next(&mut noop_context()) {
|
||||
Poll::Ready(Some(Ok(request))) => request,
|
||||
result => panic!("Unexpected result: {:?}", result),
|
||||
};
|
||||
request.execute(|_, _| async {}).await;
|
||||
assert!(requests
|
||||
.as_mut()
|
||||
.channel_pin_mut()
|
||||
.canceled_requests
|
||||
.poll_recv(&mut noop_context())
|
||||
.is_pending());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn requests_poll_next_response_returns_pending_when_buffer_full() {
|
||||
let (mut requests, _tx) = test_bounded_requests::<(), ()>(0);
|
||||
|
||||
@@ -94,16 +94,14 @@ impl InFlightRequests {
|
||||
}
|
||||
|
||||
/// Yields a request that has expired, aborting any ongoing processing of that request.
|
||||
pub fn poll_expired(
|
||||
&mut self,
|
||||
cx: &mut Context,
|
||||
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
|
||||
pub fn poll_expired(&mut self, cx: &mut Context) -> Poll<Option<u64>> {
|
||||
if self.deadlines.is_empty() {
|
||||
// TODO(https://github.com/tokio-rs/tokio/issues/4161)
|
||||
// This is a workaround for DelayQueue not always treating this case correctly.
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
self.deadlines.poll_expired(cx).map_ok(|expired| {
|
||||
self.deadlines.poll_expired(cx).map(|expired| {
|
||||
let expired = expired?;
|
||||
if let Some(RequestData {
|
||||
abort_handle, span, ..
|
||||
}) = self.request_data.remove(expired.get_ref())
|
||||
@@ -113,7 +111,7 @@ impl InFlightRequests {
|
||||
abort_handle.abort();
|
||||
tracing::error!("DeadlineExceeded");
|
||||
}
|
||||
expired.into_inner()
|
||||
Some(expired.into_inner())
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -161,7 +159,7 @@ mod tests {
|
||||
|
||||
assert_matches!(
|
||||
in_flight_requests.poll_expired(&mut noop_context()),
|
||||
Poll::Ready(Some(Ok(_)))
|
||||
Poll::Ready(Some(_))
|
||||
);
|
||||
assert_matches!(
|
||||
abortable_future.poll_unpin(&mut noop_context()),
|
||||
@@ -178,7 +176,7 @@ mod tests {
|
||||
.unwrap();
|
||||
let mut abortable_future = Box::new(Abortable::new(pending::<()>(), abort_registration));
|
||||
|
||||
assert_eq!(in_flight_requests.cancel_request(0), true);
|
||||
assert!(in_flight_requests.cancel_request(0));
|
||||
assert_matches!(
|
||||
abortable_future.poll_unpin(&mut noop_context()),
|
||||
Poll::Ready(Err(_))
|
||||
|
||||
@@ -282,7 +282,7 @@ where
|
||||
fn ctx() -> Context<'static> {
|
||||
use futures::task::*;
|
||||
|
||||
Context::from_waker(&noop_waker_ref())
|
||||
Context::from_waker(noop_waker_ref())
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -5,13 +5,14 @@
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
use crate::{
|
||||
cancellations::{cancellations, CanceledRequests, RequestCancellation},
|
||||
context,
|
||||
server::{Channel, Config, TrackedRequest},
|
||||
server::{Channel, Config, ResponseGuard, TrackedRequest},
|
||||
Request, Response,
|
||||
};
|
||||
use futures::{task::*, Sink, Stream};
|
||||
use pin_project::pin_project;
|
||||
use std::{collections::VecDeque, io, pin::Pin, time::SystemTime};
|
||||
use std::{collections::VecDeque, io, mem::ManuallyDrop, pin::Pin, time::SystemTime};
|
||||
use tracing::Span;
|
||||
|
||||
#[pin_project]
|
||||
@@ -22,6 +23,8 @@ pub(crate) struct FakeChannel<In, Out> {
|
||||
pub sink: VecDeque<Out>,
|
||||
pub config: Config,
|
||||
pub in_flight_requests: super::in_flight_requests::InFlightRequests,
|
||||
pub request_cancellation: RequestCancellation,
|
||||
pub canceled_requests: CanceledRequests,
|
||||
}
|
||||
|
||||
impl<In, Out> Stream for FakeChannel<In, Out>
|
||||
@@ -86,6 +89,7 @@ where
|
||||
impl<Req, Resp> FakeChannel<io::Result<TrackedRequest<Req>>, Response<Resp>> {
|
||||
pub fn push_req(&mut self, id: u64, message: Req) {
|
||||
let (_, abort_registration) = futures::future::AbortHandle::new_pair();
|
||||
let (request_cancellation, _) = cancellations();
|
||||
self.stream.push_back(Ok(TrackedRequest {
|
||||
request: Request {
|
||||
context: context::Context {
|
||||
@@ -97,17 +101,24 @@ impl<Req, Resp> FakeChannel<io::Result<TrackedRequest<Req>>, Response<Resp>> {
|
||||
},
|
||||
abort_registration,
|
||||
span: Span::none(),
|
||||
response_guard: ManuallyDrop::new(ResponseGuard {
|
||||
request_cancellation,
|
||||
request_id: id,
|
||||
}),
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
impl FakeChannel<(), ()> {
|
||||
pub fn default<Req, Resp>() -> FakeChannel<io::Result<TrackedRequest<Req>>, Response<Resp>> {
|
||||
let (request_cancellation, canceled_requests) = cancellations();
|
||||
FakeChannel {
|
||||
stream: Default::default(),
|
||||
sink: Default::default(),
|
||||
config: Default::default(),
|
||||
in_flight_requests: Default::default(),
|
||||
request_cancellation,
|
||||
canceled_requests,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -123,5 +134,5 @@ impl<T> PollExt for Poll<Option<T>> {
|
||||
}
|
||||
|
||||
pub fn cx() -> Context<'static> {
|
||||
Context::from_waker(&noop_waker_ref())
|
||||
Context::from_waker(noop_waker_ref())
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::pin::Pin;
|
||||
/// A future that drives the server by [spawning](tokio::spawn) a [`TokioChannelExecutor`](TokioChannelExecutor)
|
||||
/// for each new channel. Returned by
|
||||
/// [`Incoming::execute`](crate::server::incoming::Incoming::execute).
|
||||
#[must_use]
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct TokioServerExecutor<T, S> {
|
||||
@@ -23,6 +24,7 @@ impl<T, S> TokioServerExecutor<T, S> {
|
||||
/// A future that drives the server by [spawning](tokio::spawn) each [response
|
||||
/// handler](super::InFlightRequest::execute) on tokio's default executor. Returned by
|
||||
/// [`Channel::execute`](crate::server::Channel::execute).
|
||||
#[must_use]
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct TokioChannelExecutor<T, S> {
|
||||
|
||||
@@ -2,4 +2,8 @@
|
||||
fn ui() {
|
||||
let t = trybuild::TestCases::new();
|
||||
t.compile_fail("tests/compile_fail/*.rs");
|
||||
#[cfg(feature = "tokio1")]
|
||||
t.compile_fail("tests/compile_fail/tokio/*.rs");
|
||||
#[cfg(all(feature = "serde-transport", feature = "tcp"))]
|
||||
t.compile_fail("tests/compile_fail/serde_transport/*.rs");
|
||||
}
|
||||
|
||||
15
tarpc/tests/compile_fail/must_use_request_dispatch.rs
Normal file
15
tarpc/tests/compile_fail/must_use_request_dispatch.rs
Normal file
@@ -0,0 +1,15 @@
|
||||
use tarpc::client;
|
||||
|
||||
#[tarpc::service]
|
||||
trait World {
|
||||
async fn hello(name: String) -> String;
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let (client_transport, _) = tarpc::transport::channel::unbounded();
|
||||
|
||||
#[deny(unused_must_use)]
|
||||
{
|
||||
WorldClient::new(client::Config::default(), client_transport).dispatch;
|
||||
}
|
||||
}
|
||||
11
tarpc/tests/compile_fail/must_use_request_dispatch.stderr
Normal file
11
tarpc/tests/compile_fail/must_use_request_dispatch.stderr
Normal file
@@ -0,0 +1,11 @@
|
||||
error: unused `RequestDispatch` that must be used
|
||||
--> tests/compile_fail/must_use_request_dispatch.rs:13:9
|
||||
|
|
||||
13 | WorldClient::new(client::Config::default(), client_transport).dispatch;
|
||||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
|
||||
note: the lint level is defined here
|
||||
--> tests/compile_fail/must_use_request_dispatch.rs:11:12
|
||||
|
|
||||
11 | #[deny(unused_must_use)]
|
||||
| ^^^^^^^^^^^^^^^
|
||||
@@ -0,0 +1,9 @@
|
||||
use tarpc::serde_transport;
|
||||
use tokio_serde::formats::Json;
|
||||
|
||||
fn main() {
|
||||
#[deny(unused_must_use)]
|
||||
{
|
||||
serde_transport::tcp::connect::<_, (), (), _, _>("0.0.0.0:0", Json::default);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
error: unused `tarpc::serde_transport::tcp::Connect` that must be used
|
||||
--> tests/compile_fail/serde_transport/must_use_tcp_connect.rs:7:9
|
||||
|
|
||||
7 | serde_transport::tcp::connect::<_, (), (), _, _>("0.0.0.0:0", Json::default);
|
||||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
|
||||
note: the lint level is defined here
|
||||
--> tests/compile_fail/serde_transport/must_use_tcp_connect.rs:5:12
|
||||
|
|
||||
5 | #[deny(unused_must_use)]
|
||||
| ^^^^^^^^^^^^^^^
|
||||
29
tarpc/tests/compile_fail/tokio/must_use_channel_executor.rs
Normal file
29
tarpc/tests/compile_fail/tokio/must_use_channel_executor.rs
Normal file
@@ -0,0 +1,29 @@
|
||||
use tarpc::{
|
||||
context,
|
||||
server::{self, Channel},
|
||||
};
|
||||
|
||||
#[tarpc::service]
|
||||
trait World {
|
||||
async fn hello(name: String) -> String;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct HelloServer;
|
||||
|
||||
#[tarpc::server]
|
||||
impl World for HelloServer {
|
||||
async fn hello(self, _: context::Context, name: String) -> String {
|
||||
format!("Hello, {name}!")
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let (_, server_transport) = tarpc::transport::channel::unbounded();
|
||||
let server = server::BaseChannel::with_defaults(server_transport);
|
||||
|
||||
#[deny(unused_must_use)]
|
||||
{
|
||||
server.execute(HelloServer.serve());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
error: unused `TokioChannelExecutor` that must be used
|
||||
--> tests/compile_fail/tokio/must_use_channel_executor.rs:27:9
|
||||
|
|
||||
27 | server.execute(HelloServer.serve());
|
||||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
|
||||
note: the lint level is defined here
|
||||
--> tests/compile_fail/tokio/must_use_channel_executor.rs:25:12
|
||||
|
|
||||
25 | #[deny(unused_must_use)]
|
||||
| ^^^^^^^^^^^^^^^
|
||||
30
tarpc/tests/compile_fail/tokio/must_use_server_executor.rs
Normal file
30
tarpc/tests/compile_fail/tokio/must_use_server_executor.rs
Normal file
@@ -0,0 +1,30 @@
|
||||
use futures::stream::once;
|
||||
use tarpc::{
|
||||
context,
|
||||
server::{self, incoming::Incoming},
|
||||
};
|
||||
|
||||
#[tarpc::service]
|
||||
trait World {
|
||||
async fn hello(name: String) -> String;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct HelloServer;
|
||||
|
||||
#[tarpc::server]
|
||||
impl World for HelloServer {
|
||||
async fn hello(self, _: context::Context, name: String) -> String {
|
||||
format!("Hello, {name}!")
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let (_, server_transport) = tarpc::transport::channel::unbounded();
|
||||
let server = once(async move { server::BaseChannel::with_defaults(server_transport) });
|
||||
|
||||
#[deny(unused_must_use)]
|
||||
{
|
||||
server.execute(HelloServer.serve());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
error: unused `TokioServerExecutor` that must be used
|
||||
--> tests/compile_fail/tokio/must_use_server_executor.rs:28:9
|
||||
|
|
||||
28 | server.execute(HelloServer.serve());
|
||||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
|
||||
note: the lint level is defined here
|
||||
--> tests/compile_fail/tokio/must_use_server_executor.rs:26:12
|
||||
|
|
||||
26 | #[deny(unused_must_use)]
|
||||
| ^^^^^^^^^^^^^^^
|
||||
@@ -7,7 +7,7 @@ use tarpc::{
|
||||
use tokio_serde::formats::Json;
|
||||
|
||||
#[tarpc::derive_serde]
|
||||
#[derive(Debug, PartialEq)]
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum TestData {
|
||||
Black,
|
||||
White,
|
||||
|
||||
@@ -108,7 +108,7 @@ async fn dropped_channel_aborts_in_flight_requests() -> anyhow::Result<()> {
|
||||
|
||||
#[cfg(all(feature = "serde-transport", feature = "tcp"))]
|
||||
#[tokio::test]
|
||||
async fn serde() -> anyhow::Result<()> {
|
||||
async fn serde_tcp() -> anyhow::Result<()> {
|
||||
use tarpc::serde_transport;
|
||||
use tokio_serde::formats::Json;
|
||||
|
||||
@@ -136,6 +136,37 @@ async fn serde() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "serde-transport", feature = "unix", unix))]
|
||||
#[tokio::test]
|
||||
async fn serde_uds() -> anyhow::Result<()> {
|
||||
use tarpc::serde_transport;
|
||||
use tokio_serde::formats::Json;
|
||||
|
||||
let _ = tracing_subscriber::fmt::try_init();
|
||||
|
||||
let sock = tarpc::serde_transport::unix::TempPathBuf::with_random("uds");
|
||||
let transport = tarpc::serde_transport::unix::listen(&sock, Json::default).await?;
|
||||
tokio::spawn(
|
||||
transport
|
||||
.take(1)
|
||||
.filter_map(|r| async { r.ok() })
|
||||
.map(BaseChannel::with_defaults)
|
||||
.execute(Server.serve()),
|
||||
);
|
||||
|
||||
let transport = serde_transport::unix::connect(&sock, Json::default).await?;
|
||||
let client = ServiceClient::new(client::Config::default(), transport).spawn();
|
||||
|
||||
// Save results using socket so we can clean the socket even if our test assertions fail
|
||||
let res1 = client.add(context::current(), 1, 2).await;
|
||||
let res2 = client.hey(context::current(), "Tim".to_string()).await;
|
||||
|
||||
assert_matches!(res1, Ok(3));
|
||||
assert_matches!(res2, Ok(ref s) if s == "Hey, Tim.");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn concurrent() -> anyhow::Result<()> {
|
||||
let _ = tracing_subscriber::fmt::try_init();
|
||||
|
||||
Reference in New Issue
Block a user