mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0c08d5e8ca | ||
|
|
75b15fe2aa | ||
|
|
863a08d87e | ||
|
|
49ba8f8b1b | ||
|
|
d832209da3 | ||
|
|
584426d414 | ||
|
|
50eb80c883 | ||
|
|
1f0c80d8c9 |
16
.github/workflows/main.yml
vendored
16
.github/workflows/main.yml
vendored
@@ -14,10 +14,10 @@ jobs:
|
|||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- name: Cancel previous
|
- name: Cancel previous
|
||||||
uses: styfle/cancel-workflow-action@0.7.0
|
uses: styfle/cancel-workflow-action@0.10.0
|
||||||
with:
|
with:
|
||||||
access_token: ${{ github.token }}
|
access_token: ${{ github.token }}
|
||||||
- uses: actions/checkout@v1
|
- uses: actions/checkout@v3
|
||||||
- uses: actions-rs/toolchain@v1
|
- uses: actions-rs/toolchain@v1
|
||||||
with:
|
with:
|
||||||
profile: minimal
|
profile: minimal
|
||||||
@@ -38,10 +38,10 @@ jobs:
|
|||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- name: Cancel previous
|
- name: Cancel previous
|
||||||
uses: styfle/cancel-workflow-action@0.7.0
|
uses: styfle/cancel-workflow-action@0.10.0
|
||||||
with:
|
with:
|
||||||
access_token: ${{ github.token }}
|
access_token: ${{ github.token }}
|
||||||
- uses: actions/checkout@v1
|
- uses: actions/checkout@v3
|
||||||
- uses: actions-rs/toolchain@v1
|
- uses: actions-rs/toolchain@v1
|
||||||
with:
|
with:
|
||||||
profile: minimal
|
profile: minimal
|
||||||
@@ -76,10 +76,10 @@ jobs:
|
|||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- name: Cancel previous
|
- name: Cancel previous
|
||||||
uses: styfle/cancel-workflow-action@0.7.0
|
uses: styfle/cancel-workflow-action@0.10.0
|
||||||
with:
|
with:
|
||||||
access_token: ${{ github.token }}
|
access_token: ${{ github.token }}
|
||||||
- uses: actions/checkout@v1
|
- uses: actions/checkout@v3
|
||||||
- uses: actions-rs/toolchain@v1
|
- uses: actions-rs/toolchain@v1
|
||||||
with:
|
with:
|
||||||
profile: minimal
|
profile: minimal
|
||||||
@@ -96,10 +96,10 @@ jobs:
|
|||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- name: Cancel previous
|
- name: Cancel previous
|
||||||
uses: styfle/cancel-workflow-action@0.7.0
|
uses: styfle/cancel-workflow-action@0.10.0
|
||||||
with:
|
with:
|
||||||
access_token: ${{ github.token }}
|
access_token: ${{ github.token }}
|
||||||
- uses: actions/checkout@v1
|
- uses: actions/checkout@v3
|
||||||
- uses: actions-rs/toolchain@v1
|
- uses: actions-rs/toolchain@v1
|
||||||
with:
|
with:
|
||||||
profile: minimal
|
profile: minimal
|
||||||
|
|||||||
@@ -67,7 +67,7 @@ Some other features of tarpc:
|
|||||||
Add to your `Cargo.toml` dependencies:
|
Add to your `Cargo.toml` dependencies:
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
tarpc = "0.29"
|
tarpc = "0.31"
|
||||||
```
|
```
|
||||||
|
|
||||||
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
||||||
@@ -82,7 +82,7 @@ your `Cargo.toml`:
|
|||||||
```toml
|
```toml
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
tarpc = { version = "0.29", features = ["tokio1"] }
|
tarpc = { version = "0.31", features = ["tokio1"] }
|
||||||
tokio = { version = "1.0", features = ["macros"] }
|
tokio = { version = "1.0", features = ["macros"] }
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,10 @@
|
|||||||
|
## 0.31.0 (2022-11-03)
|
||||||
|
|
||||||
|
### New Features
|
||||||
|
|
||||||
|
This release adds Unix Domain Sockets to the `serde_transport` module.
|
||||||
|
To use it, enable the "unix" feature. See the docs for more information.
|
||||||
|
|
||||||
## 0.30.0 (2022-08-12)
|
## 0.30.0 (2022-08-12)
|
||||||
|
|
||||||
### Breaking Changes
|
### Breaking Changes
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "tarpc-example-service"
|
name = "tarpc-example-service"
|
||||||
version = "0.12.0"
|
version = "0.13.0"
|
||||||
rust-version = "1.56"
|
rust-version = "1.56"
|
||||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
@@ -21,7 +21,7 @@ futures = "0.3"
|
|||||||
opentelemetry = { version = "0.17", features = ["rt-tokio"] }
|
opentelemetry = { version = "0.17", features = ["rt-tokio"] }
|
||||||
opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] }
|
opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] }
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
tarpc = { version = "0.30", path = "../tarpc", features = ["full"] }
|
tarpc = { version = "0.31", path = "../tarpc", features = ["full"] }
|
||||||
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] }
|
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] }
|
||||||
tracing = { version = "0.1" }
|
tracing = { version = "0.1" }
|
||||||
tracing-opentelemetry = "0.17"
|
tracing-opentelemetry = "0.17"
|
||||||
|
|||||||
@@ -54,6 +54,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
// JSON transport is provided by the json_transport tarpc module. It makes it easy
|
// JSON transport is provided by the json_transport tarpc module. It makes it easy
|
||||||
// to start up a serde-powered json serialization strategy over TCP.
|
// to start up a serde-powered json serialization strategy over TCP.
|
||||||
let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?;
|
let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?;
|
||||||
|
tracing::info!("Listening on port {}", listener.local_addr().port());
|
||||||
listener.config_mut().max_frame_length(usize::MAX);
|
listener.config_mut().max_frame_length(usize::MAX);
|
||||||
listener
|
listener
|
||||||
// Ignore accept errors.
|
// Ignore accept errors.
|
||||||
|
|||||||
@@ -285,7 +285,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
|
|||||||
args,
|
args,
|
||||||
method_attrs: &rpcs.iter().map(|rpc| &*rpc.attrs).collect::<Vec<_>>(),
|
method_attrs: &rpcs.iter().map(|rpc| &*rpc.attrs).collect::<Vec<_>>(),
|
||||||
method_idents: &methods,
|
method_idents: &methods,
|
||||||
request_names: &*request_names,
|
request_names: &request_names,
|
||||||
attrs,
|
attrs,
|
||||||
rpcs,
|
rpcs,
|
||||||
return_types: &rpcs
|
return_types: &rpcs
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "tarpc"
|
name = "tarpc"
|
||||||
version = "0.30.0"
|
version = "0.31.0"
|
||||||
rust-version = "1.58.0"
|
rust-version = "1.58.0"
|
||||||
authors = [
|
authors = [
|
||||||
"Adam Wright <adam.austin.wright@gmail.com>",
|
"Adam Wright <adam.austin.wright@gmail.com>",
|
||||||
@@ -13,7 +13,7 @@ homepage = "https://github.com/google/tarpc"
|
|||||||
repository = "https://github.com/google/tarpc"
|
repository = "https://github.com/google/tarpc"
|
||||||
keywords = ["rpc", "network", "server", "api", "microservices"]
|
keywords = ["rpc", "network", "server", "api", "microservices"]
|
||||||
categories = ["asynchronous", "network-programming"]
|
categories = ["asynchronous", "network-programming"]
|
||||||
readme = "../README.md"
|
readme = "README.md"
|
||||||
description = "An RPC framework for Rust with a focus on ease of use."
|
description = "An RPC framework for Rust with a focus on ease of use."
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
@@ -25,6 +25,7 @@ serde-transport = ["serde1", "tokio1", "tokio-serde", "tokio-util/codec"]
|
|||||||
serde-transport-json = ["tokio-serde/json"]
|
serde-transport-json = ["tokio-serde/json"]
|
||||||
serde-transport-bincode = ["tokio-serde/bincode"]
|
serde-transport-bincode = ["tokio-serde/bincode"]
|
||||||
tcp = ["tokio/net"]
|
tcp = ["tokio/net"]
|
||||||
|
unix = ["tokio/net"]
|
||||||
|
|
||||||
full = [
|
full = [
|
||||||
"serde1",
|
"serde1",
|
||||||
@@ -33,6 +34,7 @@ full = [
|
|||||||
"serde-transport-json",
|
"serde-transport-json",
|
||||||
"serde-transport-bincode",
|
"serde-transport-bincode",
|
||||||
"tcp",
|
"tcp",
|
||||||
|
"unix",
|
||||||
]
|
]
|
||||||
|
|
||||||
[badges]
|
[badges]
|
||||||
|
|||||||
@@ -628,7 +628,7 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn response_completes_request_future() {
|
async fn response_completes_request_future() {
|
||||||
let (mut dispatch, mut _channel, mut server_channel) = set_up();
|
let (mut dispatch, mut _channel, mut server_channel) = set_up();
|
||||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||||
let (tx, mut rx) = oneshot::channel();
|
let (tx, mut rx) = oneshot::channel();
|
||||||
|
|
||||||
dispatch
|
dispatch
|
||||||
@@ -656,7 +656,7 @@ mod tests {
|
|||||||
request_id: 3,
|
request_id: 3,
|
||||||
});
|
});
|
||||||
// resp's drop() is run, which should send a cancel message.
|
// resp's drop() is run, which should send a cancel message.
|
||||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||||
assert_eq!(canceled_requests.poll_recv(cx), Poll::Ready(Some(3)));
|
assert_eq!(canceled_requests.poll_recv(cx), Poll::Ready(Some(3)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -679,14 +679,14 @@ mod tests {
|
|||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
drop(cancellation);
|
drop(cancellation);
|
||||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||||
assert_eq!(canceled_requests.poll_recv(cx), Poll::Ready(None));
|
assert_eq!(canceled_requests.poll_recv(cx), Poll::Ready(None));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn stage_request() {
|
async fn stage_request() {
|
||||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||||
let (tx, mut rx) = oneshot::channel();
|
let (tx, mut rx) = oneshot::channel();
|
||||||
|
|
||||||
let _resp = send_request(&mut channel, "hi", tx, &mut rx).await;
|
let _resp = send_request(&mut channel, "hi", tx, &mut rx).await;
|
||||||
@@ -704,7 +704,7 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn stage_request_channel_dropped_doesnt_panic() {
|
async fn stage_request_channel_dropped_doesnt_panic() {
|
||||||
let (mut dispatch, mut channel, mut server_channel) = set_up();
|
let (mut dispatch, mut channel, mut server_channel) = set_up();
|
||||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||||
let (tx, mut rx) = oneshot::channel();
|
let (tx, mut rx) = oneshot::channel();
|
||||||
|
|
||||||
let _ = send_request(&mut channel, "hi", tx, &mut rx).await;
|
let _ = send_request(&mut channel, "hi", tx, &mut rx).await;
|
||||||
@@ -726,7 +726,7 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn stage_request_response_future_dropped_is_canceled_before_sending() {
|
async fn stage_request_response_future_dropped_is_canceled_before_sending() {
|
||||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||||
let (tx, mut rx) = oneshot::channel();
|
let (tx, mut rx) = oneshot::channel();
|
||||||
|
|
||||||
let _ = send_request(&mut channel, "hi", tx, &mut rx).await;
|
let _ = send_request(&mut channel, "hi", tx, &mut rx).await;
|
||||||
@@ -742,7 +742,7 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn stage_request_response_future_dropped_is_canceled_after_sending() {
|
async fn stage_request_response_future_dropped_is_canceled_after_sending() {
|
||||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||||
let (tx, mut rx) = oneshot::channel();
|
let (tx, mut rx) = oneshot::channel();
|
||||||
|
|
||||||
let req = send_request(&mut channel, "hi", tx, &mut rx).await;
|
let req = send_request(&mut channel, "hi", tx, &mut rx).await;
|
||||||
@@ -763,7 +763,7 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn stage_request_response_closed_skipped() {
|
async fn stage_request_response_closed_skipped() {
|
||||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||||
let (tx, mut rx) = oneshot::channel();
|
let (tx, mut rx) = oneshot::channel();
|
||||||
|
|
||||||
// Test that a request future that's closed its receiver but not yet canceled its request --
|
// Test that a request future that's closed its receiver but not yet canceled its request --
|
||||||
@@ -796,7 +796,7 @@ mod tests {
|
|||||||
|
|
||||||
let dispatch = RequestDispatch::<String, String, _> {
|
let dispatch = RequestDispatch::<String, String, _> {
|
||||||
transport: client_channel.fuse(),
|
transport: client_channel.fuse(),
|
||||||
pending_requests: pending_requests,
|
pending_requests,
|
||||||
canceled_requests,
|
canceled_requests,
|
||||||
in_flight_requests: InFlightRequests::default(),
|
in_flight_requests: InFlightRequests::default(),
|
||||||
config: Config::default(),
|
config: Config::default(),
|
||||||
|
|||||||
@@ -277,6 +277,270 @@ pub mod tcp {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(all(unix, feature = "unix"))]
|
||||||
|
#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "unix"))))]
|
||||||
|
/// Unix Domain Socket support for generic transport using Tokio.
|
||||||
|
pub mod unix {
|
||||||
|
use {
|
||||||
|
super::*,
|
||||||
|
futures::ready,
|
||||||
|
std::{marker::PhantomData, path::Path},
|
||||||
|
tokio::net::{unix::SocketAddr, UnixListener, UnixStream},
|
||||||
|
tokio_util::codec::length_delimited,
|
||||||
|
};
|
||||||
|
|
||||||
|
impl<Item, SinkItem, Codec> Transport<UnixStream, Item, SinkItem, Codec> {
|
||||||
|
/// Returns the socket address of the remote half of the underlying [`UnixStream`].
|
||||||
|
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
|
||||||
|
self.inner.get_ref().get_ref().peer_addr()
|
||||||
|
}
|
||||||
|
/// Returns the socket address of the local half of the underlying [`UnixStream`].
|
||||||
|
pub fn local_addr(&self) -> io::Result<SocketAddr> {
|
||||||
|
self.inner.get_ref().get_ref().local_addr()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A connection Future that also exposes the length-delimited framing config.
|
||||||
|
#[must_use]
|
||||||
|
#[pin_project]
|
||||||
|
pub struct Connect<T, Item, SinkItem, CodecFn> {
|
||||||
|
#[pin]
|
||||||
|
inner: T,
|
||||||
|
codec_fn: CodecFn,
|
||||||
|
config: length_delimited::Builder,
|
||||||
|
ghost: PhantomData<(fn(SinkItem), fn() -> Item)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, Item, SinkItem, Codec, CodecFn> Future for Connect<T, Item, SinkItem, CodecFn>
|
||||||
|
where
|
||||||
|
T: Future<Output = io::Result<UnixStream>>,
|
||||||
|
Item: for<'de> Deserialize<'de>,
|
||||||
|
SinkItem: Serialize,
|
||||||
|
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
||||||
|
CodecFn: Fn() -> Codec,
|
||||||
|
{
|
||||||
|
type Output = io::Result<Transport<UnixStream, Item, SinkItem, Codec>>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||||
|
let io = ready!(self.as_mut().project().inner.poll(cx))?;
|
||||||
|
Poll::Ready(Ok(new(self.config.new_framed(io), (self.codec_fn)())))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, Item, SinkItem, CodecFn> Connect<T, Item, SinkItem, CodecFn> {
|
||||||
|
/// Returns an immutable reference to the length-delimited codec's config.
|
||||||
|
pub fn config(&self) -> &length_delimited::Builder {
|
||||||
|
&self.config
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a mutable reference to the length-delimited codec's config.
|
||||||
|
pub fn config_mut(&mut self) -> &mut length_delimited::Builder {
|
||||||
|
&mut self.config
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Connects to socket named by `path`, wrapping the connection in a Unix Domain Socket
|
||||||
|
/// transport.
|
||||||
|
pub fn connect<P, Item, SinkItem, Codec, CodecFn>(
|
||||||
|
path: P,
|
||||||
|
codec_fn: CodecFn,
|
||||||
|
) -> Connect<impl Future<Output = io::Result<UnixStream>>, Item, SinkItem, CodecFn>
|
||||||
|
where
|
||||||
|
P: AsRef<Path>,
|
||||||
|
Item: for<'de> Deserialize<'de>,
|
||||||
|
SinkItem: Serialize,
|
||||||
|
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
||||||
|
CodecFn: Fn() -> Codec,
|
||||||
|
{
|
||||||
|
Connect {
|
||||||
|
inner: UnixStream::connect(path),
|
||||||
|
codec_fn,
|
||||||
|
config: LengthDelimitedCodec::builder(),
|
||||||
|
ghost: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Listens on the socket named by `path`, wrapping accepted connections in Unix Domain Socket
|
||||||
|
/// transports.
|
||||||
|
pub async fn listen<P, Item, SinkItem, Codec, CodecFn>(
|
||||||
|
path: P,
|
||||||
|
codec_fn: CodecFn,
|
||||||
|
) -> io::Result<Incoming<Item, SinkItem, Codec, CodecFn>>
|
||||||
|
where
|
||||||
|
P: AsRef<Path>,
|
||||||
|
Item: for<'de> Deserialize<'de>,
|
||||||
|
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
||||||
|
CodecFn: Fn() -> Codec,
|
||||||
|
{
|
||||||
|
let listener = UnixListener::bind(path)?;
|
||||||
|
let local_addr = listener.local_addr()?;
|
||||||
|
Ok(Incoming {
|
||||||
|
listener,
|
||||||
|
codec_fn,
|
||||||
|
local_addr,
|
||||||
|
config: LengthDelimitedCodec::builder(),
|
||||||
|
ghost: PhantomData,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A [`UnixListener`] that wraps connections in [transports](Transport).
|
||||||
|
#[pin_project]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Incoming<Item, SinkItem, Codec, CodecFn> {
|
||||||
|
listener: UnixListener,
|
||||||
|
local_addr: SocketAddr,
|
||||||
|
codec_fn: CodecFn,
|
||||||
|
config: length_delimited::Builder,
|
||||||
|
ghost: PhantomData<(fn() -> Item, fn(SinkItem), Codec)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Item, SinkItem, Codec, CodecFn> Incoming<Item, SinkItem, Codec, CodecFn> {
|
||||||
|
/// Returns the the socket address being listened on.
|
||||||
|
pub fn local_addr(&self) -> &SocketAddr {
|
||||||
|
&self.local_addr
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns an immutable reference to the length-delimited codec's config.
|
||||||
|
pub fn config(&self) -> &length_delimited::Builder {
|
||||||
|
&self.config
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a mutable reference to the length-delimited codec's config.
|
||||||
|
pub fn config_mut(&mut self) -> &mut length_delimited::Builder {
|
||||||
|
&mut self.config
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Item, SinkItem, Codec, CodecFn> Stream for Incoming<Item, SinkItem, Codec, CodecFn>
|
||||||
|
where
|
||||||
|
Item: for<'de> Deserialize<'de>,
|
||||||
|
SinkItem: Serialize,
|
||||||
|
Codec: Serializer<SinkItem> + Deserializer<Item>,
|
||||||
|
CodecFn: Fn() -> Codec,
|
||||||
|
{
|
||||||
|
type Item = io::Result<Transport<UnixStream, Item, SinkItem, Codec>>;
|
||||||
|
|
||||||
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
let conn: UnixStream = ready!(self.as_mut().project().listener.poll_accept(cx)?).0;
|
||||||
|
Poll::Ready(Some(Ok(new(
|
||||||
|
self.config.new_framed(conn),
|
||||||
|
(self.codec_fn)(),
|
||||||
|
))))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A temporary `PathBuf` that lives in `std::env::temp_dir` and is removed on drop.
|
||||||
|
pub struct TempPathBuf(std::path::PathBuf);
|
||||||
|
|
||||||
|
impl TempPathBuf {
|
||||||
|
/// A named socket that results in `<tempdir>/<name>`
|
||||||
|
pub fn new<S: AsRef<str>>(name: S) -> Self {
|
||||||
|
let mut sock = std::env::temp_dir();
|
||||||
|
sock.push(name.as_ref());
|
||||||
|
Self(sock)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Appends a random hex string to the socket name resulting in
|
||||||
|
/// `<tempdir>/<name>_<xxxxx>`
|
||||||
|
pub fn with_random<S: AsRef<str>>(name: S) -> Self {
|
||||||
|
Self::new(format!("{}_{:016x}", name.as_ref(), rand::random::<u64>()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsRef<std::path::Path> for TempPathBuf {
|
||||||
|
fn as_ref(&self) -> &std::path::Path {
|
||||||
|
self.0.as_path()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for TempPathBuf {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// This will remove the file pointed to by this PathBuf if it exists, however Err's can
|
||||||
|
// be returned such as attempting to remove a non-existing file, or one which we don't
|
||||||
|
// have permission to remove. In these cases the Err is swallowed
|
||||||
|
let _ = std::fs::remove_file(&self.0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use tokio_serde::formats::SymmetricalJson;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn temp_path_buf_non_random() {
|
||||||
|
let sock = TempPathBuf::new("test");
|
||||||
|
let mut good = std::env::temp_dir();
|
||||||
|
good.push("test");
|
||||||
|
assert_eq!(sock.as_ref(), good);
|
||||||
|
assert_eq!(sock.as_ref().file_name().unwrap(), "test");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn temp_path_buf_random() {
|
||||||
|
let sock = TempPathBuf::with_random("test");
|
||||||
|
let good = std::env::temp_dir();
|
||||||
|
assert!(sock.as_ref().starts_with(good));
|
||||||
|
// Since there are 16 random characters we just assert the file_name has the right name
|
||||||
|
// and starts with the correct string 'test_'
|
||||||
|
// file name: test_xxxxxxxxxxxxxxxx
|
||||||
|
// test = 4
|
||||||
|
// _ = 1
|
||||||
|
// <hex> = 16
|
||||||
|
// total = 21
|
||||||
|
let fname = sock.as_ref().file_name().unwrap().to_string_lossy();
|
||||||
|
assert!(fname.starts_with("test_"));
|
||||||
|
assert_eq!(fname.len(), 21);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn temp_path_buf_non_existing() {
|
||||||
|
let sock = TempPathBuf::with_random("test");
|
||||||
|
let sock_path = std::path::PathBuf::from(sock.as_ref());
|
||||||
|
|
||||||
|
// No actual file has been created yet
|
||||||
|
assert!(!sock_path.exists());
|
||||||
|
// Should not panic
|
||||||
|
std::mem::drop(sock);
|
||||||
|
assert!(!sock_path.exists());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn temp_path_buf_existing_file() {
|
||||||
|
let sock = TempPathBuf::with_random("test");
|
||||||
|
let sock_path = std::path::PathBuf::from(sock.as_ref());
|
||||||
|
let _file = std::fs::File::create(&sock).unwrap();
|
||||||
|
assert!(sock_path.exists());
|
||||||
|
std::mem::drop(sock);
|
||||||
|
assert!(!sock_path.exists());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn temp_path_buf_preexisting_file() {
|
||||||
|
let mut pre_existing = std::env::temp_dir();
|
||||||
|
pre_existing.push("test");
|
||||||
|
let _file = std::fs::File::create(&pre_existing).unwrap();
|
||||||
|
let sock = TempPathBuf::new("test");
|
||||||
|
let sock_path = std::path::PathBuf::from(sock.as_ref());
|
||||||
|
assert!(sock_path.exists());
|
||||||
|
std::mem::drop(sock);
|
||||||
|
assert!(!sock_path.exists());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn temp_path_buf_for_socket() {
|
||||||
|
let sock = TempPathBuf::with_random("test");
|
||||||
|
// Save path for testing after drop
|
||||||
|
let sock_path = std::path::PathBuf::from(sock.as_ref());
|
||||||
|
// create the actual socket
|
||||||
|
let _ = listen(&sock, SymmetricalJson::<String>::default).await;
|
||||||
|
assert!(sock_path.exists());
|
||||||
|
std::mem::drop(sock);
|
||||||
|
assert!(!sock_path.exists());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::Transport;
|
use super::Transport;
|
||||||
@@ -291,7 +555,7 @@ mod tests {
|
|||||||
use tokio_serde::formats::SymmetricalJson;
|
use tokio_serde::formats::SymmetricalJson;
|
||||||
|
|
||||||
fn ctx() -> Context<'static> {
|
fn ctx() -> Context<'static> {
|
||||||
Context::from_waker(&noop_waker_ref())
|
Context::from_waker(noop_waker_ref())
|
||||||
}
|
}
|
||||||
|
|
||||||
struct TestIo(Cursor<Vec<u8>>);
|
struct TestIo(Cursor<Vec<u8>>);
|
||||||
@@ -393,4 +657,24 @@ mod tests {
|
|||||||
assert_matches!(transport.next().await, None);
|
assert_matches!(transport.next().await, None);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(all(unix, feature = "unix"))]
|
||||||
|
#[tokio::test]
|
||||||
|
async fn uds() -> io::Result<()> {
|
||||||
|
use super::unix;
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
let sock = unix::TempPathBuf::with_random("uds");
|
||||||
|
let mut listener = unix::listen(&sock, SymmetricalJson::<String>::default).await?;
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut transport = listener.next().await.unwrap().unwrap();
|
||||||
|
let message = transport.next().await.unwrap().unwrap();
|
||||||
|
transport.send(message).await.unwrap();
|
||||||
|
});
|
||||||
|
let mut transport = unix::connect(&sock, SymmetricalJson::<String>::default).await?;
|
||||||
|
transport.send(String::from("test")).await?;
|
||||||
|
assert_matches!(transport.next().await, Some(Ok(s)) if s == "test");
|
||||||
|
assert_matches!(transport.next().await, None);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -837,9 +837,10 @@ mod tests {
|
|||||||
channel::Channel<Response<Resp>, ClientMessage<Req>>,
|
channel::Channel<Response<Resp>, ClientMessage<Req>>,
|
||||||
) {
|
) {
|
||||||
let (tx, rx) = crate::transport::channel::bounded(capacity);
|
let (tx, rx) = crate::transport::channel::bounded(capacity);
|
||||||
let mut config = Config::default();
|
|
||||||
// Add 1 because capacity 0 is not supported (but is supported by transport::channel::bounded).
|
// Add 1 because capacity 0 is not supported (but is supported by transport::channel::bounded).
|
||||||
config.pending_response_buffer = capacity + 1;
|
let config = Config {
|
||||||
|
pending_response_buffer: capacity + 1,
|
||||||
|
};
|
||||||
(Box::pin(BaseChannel::new(config, rx).requests()), tx)
|
(Box::pin(BaseChannel::new(config, rx).requests()), tx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -176,7 +176,7 @@ mod tests {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
let mut abortable_future = Box::new(Abortable::new(pending::<()>(), abort_registration));
|
let mut abortable_future = Box::new(Abortable::new(pending::<()>(), abort_registration));
|
||||||
|
|
||||||
assert_eq!(in_flight_requests.cancel_request(0), true);
|
assert!(in_flight_requests.cancel_request(0));
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
abortable_future.poll_unpin(&mut noop_context()),
|
abortable_future.poll_unpin(&mut noop_context()),
|
||||||
Poll::Ready(Err(_))
|
Poll::Ready(Err(_))
|
||||||
|
|||||||
@@ -282,7 +282,7 @@ where
|
|||||||
fn ctx() -> Context<'static> {
|
fn ctx() -> Context<'static> {
|
||||||
use futures::task::*;
|
use futures::task::*;
|
||||||
|
|
||||||
Context::from_waker(&noop_waker_ref())
|
Context::from_waker(noop_waker_ref())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
@@ -134,5 +134,5 @@ impl<T> PollExt for Poll<Option<T>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn cx() -> Context<'static> {
|
pub fn cx() -> Context<'static> {
|
||||||
Context::from_waker(&noop_waker_ref())
|
Context::from_waker(noop_waker_ref())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
error: unused `Connect` that must be used
|
error: unused `tarpc::serde_transport::tcp::Connect` that must be used
|
||||||
--> tests/compile_fail/serde_transport/must_use_tcp_connect.rs:7:9
|
--> tests/compile_fail/serde_transport/must_use_tcp_connect.rs:7:9
|
||||||
|
|
|
|
||||||
7 | serde_transport::tcp::connect::<_, (), (), _, _>("0.0.0.0:0", Json::default);
|
7 | serde_transport::tcp::connect::<_, (), (), _, _>("0.0.0.0:0", Json::default);
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use tarpc::{
|
|||||||
use tokio_serde::formats::Json;
|
use tokio_serde::formats::Json;
|
||||||
|
|
||||||
#[tarpc::derive_serde]
|
#[tarpc::derive_serde]
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
pub enum TestData {
|
pub enum TestData {
|
||||||
Black,
|
Black,
|
||||||
White,
|
White,
|
||||||
|
|||||||
@@ -108,7 +108,7 @@ async fn dropped_channel_aborts_in_flight_requests() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
#[cfg(all(feature = "serde-transport", feature = "tcp"))]
|
#[cfg(all(feature = "serde-transport", feature = "tcp"))]
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn serde() -> anyhow::Result<()> {
|
async fn serde_tcp() -> anyhow::Result<()> {
|
||||||
use tarpc::serde_transport;
|
use tarpc::serde_transport;
|
||||||
use tokio_serde::formats::Json;
|
use tokio_serde::formats::Json;
|
||||||
|
|
||||||
@@ -136,6 +136,37 @@ async fn serde() -> anyhow::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(all(feature = "serde-transport", feature = "unix", unix))]
|
||||||
|
#[tokio::test]
|
||||||
|
async fn serde_uds() -> anyhow::Result<()> {
|
||||||
|
use tarpc::serde_transport;
|
||||||
|
use tokio_serde::formats::Json;
|
||||||
|
|
||||||
|
let _ = tracing_subscriber::fmt::try_init();
|
||||||
|
|
||||||
|
let sock = tarpc::serde_transport::unix::TempPathBuf::with_random("uds");
|
||||||
|
let transport = tarpc::serde_transport::unix::listen(&sock, Json::default).await?;
|
||||||
|
tokio::spawn(
|
||||||
|
transport
|
||||||
|
.take(1)
|
||||||
|
.filter_map(|r| async { r.ok() })
|
||||||
|
.map(BaseChannel::with_defaults)
|
||||||
|
.execute(Server.serve()),
|
||||||
|
);
|
||||||
|
|
||||||
|
let transport = serde_transport::unix::connect(&sock, Json::default).await?;
|
||||||
|
let client = ServiceClient::new(client::Config::default(), transport).spawn();
|
||||||
|
|
||||||
|
// Save results using socket so we can clean the socket even if our test assertions fail
|
||||||
|
let res1 = client.add(context::current(), 1, 2).await;
|
||||||
|
let res2 = client.hey(context::current(), "Tim".to_string()).await;
|
||||||
|
|
||||||
|
assert_matches!(res1, Ok(3));
|
||||||
|
assert_matches!(res2, Ok(ref s) if s == "Hey, Tim.");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn concurrent() -> anyhow::Result<()> {
|
async fn concurrent() -> anyhow::Result<()> {
|
||||||
let _ = tracing_subscriber::fmt::try_init();
|
let _ = tracing_subscriber::fmt::try_init();
|
||||||
|
|||||||
Reference in New Issue
Block a user