8 Commits

Author SHA1 Message Date
Tim Kuehn
0c08d5e8ca Prepare release of v0.31.0 2022-11-03 13:29:46 -07:00
Tim Kuehn
75b15fe2aa Address clippy lint 2022-10-07 10:51:45 -07:00
Tim Kuehn
863a08d87e In example-service, print the port the server is listened on.
This is helpful when passing starting the server with --port 0.
2022-10-06 20:58:54 -07:00
Tim Kuehn
49ba8f8b1b Zero-pad the random number suffix of TempPathBufs.
This way, the hex number is always 16 digits, which is helpful for test
verification as well as simple consistency.
2022-10-03 18:50:50 -07:00
Kevin K
d832209da3 feat: Unix domain sockets with serde transports (#380)
* adds support for Unix Domain Socket generic transports
* adds a TempPathBuf that lives in temp and is removed on drop
2022-10-03 18:07:29 -07:00
royrustdev
584426d414 fix clippy warnings #378 2022-09-19 23:26:07 -07:00
royrustdev
50eb80c883 reference latest tarpc version in readme 2022-09-19 21:58:21 -07:00
royrustdev
1f0c80d8c9 bump github actions 2022-09-15 11:17:58 -07:00
16 changed files with 359 additions and 33 deletions

View File

@@ -14,10 +14,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Cancel previous
uses: styfle/cancel-workflow-action@0.7.0
uses: styfle/cancel-workflow-action@0.10.0
with:
access_token: ${{ github.token }}
- uses: actions/checkout@v1
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
profile: minimal
@@ -38,10 +38,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Cancel previous
uses: styfle/cancel-workflow-action@0.7.0
uses: styfle/cancel-workflow-action@0.10.0
with:
access_token: ${{ github.token }}
- uses: actions/checkout@v1
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
profile: minimal
@@ -76,10 +76,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Cancel previous
uses: styfle/cancel-workflow-action@0.7.0
uses: styfle/cancel-workflow-action@0.10.0
with:
access_token: ${{ github.token }}
- uses: actions/checkout@v1
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
profile: minimal
@@ -96,10 +96,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Cancel previous
uses: styfle/cancel-workflow-action@0.7.0
uses: styfle/cancel-workflow-action@0.10.0
with:
access_token: ${{ github.token }}
- uses: actions/checkout@v1
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
profile: minimal

View File

@@ -67,7 +67,7 @@ Some other features of tarpc:
Add to your `Cargo.toml` dependencies:
```toml
tarpc = "0.29"
tarpc = "0.31"
```
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -82,7 +82,7 @@ your `Cargo.toml`:
```toml
anyhow = "1.0"
futures = "0.3"
tarpc = { version = "0.29", features = ["tokio1"] }
tarpc = { version = "0.31", features = ["tokio1"] }
tokio = { version = "1.0", features = ["macros"] }
```

View File

@@ -1,3 +1,10 @@
## 0.31.0 (2022-11-03)
### New Features
This release adds Unix Domain Sockets to the `serde_transport` module.
To use it, enable the "unix" feature. See the docs for more information.
## 0.30.0 (2022-08-12)
### Breaking Changes

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc-example-service"
version = "0.12.0"
version = "0.13.0"
rust-version = "1.56"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = "2021"
@@ -21,7 +21,7 @@ futures = "0.3"
opentelemetry = { version = "0.17", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] }
rand = "0.8"
tarpc = { version = "0.30", path = "../tarpc", features = ["full"] }
tarpc = { version = "0.31", path = "../tarpc", features = ["full"] }
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] }
tracing = { version = "0.1" }
tracing-opentelemetry = "0.17"

View File

@@ -54,6 +54,7 @@ async fn main() -> anyhow::Result<()> {
// JSON transport is provided by the json_transport tarpc module. It makes it easy
// to start up a serde-powered json serialization strategy over TCP.
let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?;
tracing::info!("Listening on port {}", listener.local_addr().port());
listener.config_mut().max_frame_length(usize::MAX);
listener
// Ignore accept errors.

View File

@@ -285,7 +285,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
args,
method_attrs: &rpcs.iter().map(|rpc| &*rpc.attrs).collect::<Vec<_>>(),
method_idents: &methods,
request_names: &*request_names,
request_names: &request_names,
attrs,
rpcs,
return_types: &rpcs

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc"
version = "0.30.0"
version = "0.31.0"
rust-version = "1.58.0"
authors = [
"Adam Wright <adam.austin.wright@gmail.com>",
@@ -13,7 +13,7 @@ homepage = "https://github.com/google/tarpc"
repository = "https://github.com/google/tarpc"
keywords = ["rpc", "network", "server", "api", "microservices"]
categories = ["asynchronous", "network-programming"]
readme = "../README.md"
readme = "README.md"
description = "An RPC framework for Rust with a focus on ease of use."
[features]
@@ -25,6 +25,7 @@ serde-transport = ["serde1", "tokio1", "tokio-serde", "tokio-util/codec"]
serde-transport-json = ["tokio-serde/json"]
serde-transport-bincode = ["tokio-serde/bincode"]
tcp = ["tokio/net"]
unix = ["tokio/net"]
full = [
"serde1",
@@ -33,6 +34,7 @@ full = [
"serde-transport-json",
"serde-transport-bincode",
"tcp",
"unix",
]
[badges]

View File

@@ -628,7 +628,7 @@ mod tests {
#[tokio::test]
async fn response_completes_request_future() {
let (mut dispatch, mut _channel, mut server_channel) = set_up();
let cx = &mut Context::from_waker(&noop_waker_ref());
let cx = &mut Context::from_waker(noop_waker_ref());
let (tx, mut rx) = oneshot::channel();
dispatch
@@ -656,7 +656,7 @@ mod tests {
request_id: 3,
});
// resp's drop() is run, which should send a cancel message.
let cx = &mut Context::from_waker(&noop_waker_ref());
let cx = &mut Context::from_waker(noop_waker_ref());
assert_eq!(canceled_requests.poll_recv(cx), Poll::Ready(Some(3)));
}
@@ -679,14 +679,14 @@ mod tests {
.await
.unwrap();
drop(cancellation);
let cx = &mut Context::from_waker(&noop_waker_ref());
let cx = &mut Context::from_waker(noop_waker_ref());
assert_eq!(canceled_requests.poll_recv(cx), Poll::Ready(None));
}
#[tokio::test]
async fn stage_request() {
let (mut dispatch, mut channel, _server_channel) = set_up();
let cx = &mut Context::from_waker(&noop_waker_ref());
let cx = &mut Context::from_waker(noop_waker_ref());
let (tx, mut rx) = oneshot::channel();
let _resp = send_request(&mut channel, "hi", tx, &mut rx).await;
@@ -704,7 +704,7 @@ mod tests {
#[tokio::test]
async fn stage_request_channel_dropped_doesnt_panic() {
let (mut dispatch, mut channel, mut server_channel) = set_up();
let cx = &mut Context::from_waker(&noop_waker_ref());
let cx = &mut Context::from_waker(noop_waker_ref());
let (tx, mut rx) = oneshot::channel();
let _ = send_request(&mut channel, "hi", tx, &mut rx).await;
@@ -726,7 +726,7 @@ mod tests {
#[tokio::test]
async fn stage_request_response_future_dropped_is_canceled_before_sending() {
let (mut dispatch, mut channel, _server_channel) = set_up();
let cx = &mut Context::from_waker(&noop_waker_ref());
let cx = &mut Context::from_waker(noop_waker_ref());
let (tx, mut rx) = oneshot::channel();
let _ = send_request(&mut channel, "hi", tx, &mut rx).await;
@@ -742,7 +742,7 @@ mod tests {
#[tokio::test]
async fn stage_request_response_future_dropped_is_canceled_after_sending() {
let (mut dispatch, mut channel, _server_channel) = set_up();
let cx = &mut Context::from_waker(&noop_waker_ref());
let cx = &mut Context::from_waker(noop_waker_ref());
let (tx, mut rx) = oneshot::channel();
let req = send_request(&mut channel, "hi", tx, &mut rx).await;
@@ -763,7 +763,7 @@ mod tests {
#[tokio::test]
async fn stage_request_response_closed_skipped() {
let (mut dispatch, mut channel, _server_channel) = set_up();
let cx = &mut Context::from_waker(&noop_waker_ref());
let cx = &mut Context::from_waker(noop_waker_ref());
let (tx, mut rx) = oneshot::channel();
// Test that a request future that's closed its receiver but not yet canceled its request --
@@ -796,7 +796,7 @@ mod tests {
let dispatch = RequestDispatch::<String, String, _> {
transport: client_channel.fuse(),
pending_requests: pending_requests,
pending_requests,
canceled_requests,
in_flight_requests: InFlightRequests::default(),
config: Config::default(),

View File

@@ -277,6 +277,270 @@ pub mod tcp {
}
}
#[cfg(all(unix, feature = "unix"))]
#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "unix"))))]
/// Unix Domain Socket support for generic transport using Tokio.
pub mod unix {
use {
super::*,
futures::ready,
std::{marker::PhantomData, path::Path},
tokio::net::{unix::SocketAddr, UnixListener, UnixStream},
tokio_util::codec::length_delimited,
};
impl<Item, SinkItem, Codec> Transport<UnixStream, Item, SinkItem, Codec> {
/// Returns the socket address of the remote half of the underlying [`UnixStream`].
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.inner.get_ref().get_ref().peer_addr()
}
/// Returns the socket address of the local half of the underlying [`UnixStream`].
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner.get_ref().get_ref().local_addr()
}
}
/// A connection Future that also exposes the length-delimited framing config.
#[must_use]
#[pin_project]
pub struct Connect<T, Item, SinkItem, CodecFn> {
#[pin]
inner: T,
codec_fn: CodecFn,
config: length_delimited::Builder,
ghost: PhantomData<(fn(SinkItem), fn() -> Item)>,
}
impl<T, Item, SinkItem, Codec, CodecFn> Future for Connect<T, Item, SinkItem, CodecFn>
where
T: Future<Output = io::Result<UnixStream>>,
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{
type Output = io::Result<Transport<UnixStream, Item, SinkItem, Codec>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let io = ready!(self.as_mut().project().inner.poll(cx))?;
Poll::Ready(Ok(new(self.config.new_framed(io), (self.codec_fn)())))
}
}
impl<T, Item, SinkItem, CodecFn> Connect<T, Item, SinkItem, CodecFn> {
/// Returns an immutable reference to the length-delimited codec's config.
pub fn config(&self) -> &length_delimited::Builder {
&self.config
}
/// Returns a mutable reference to the length-delimited codec's config.
pub fn config_mut(&mut self) -> &mut length_delimited::Builder {
&mut self.config
}
}
/// Connects to socket named by `path`, wrapping the connection in a Unix Domain Socket
/// transport.
pub fn connect<P, Item, SinkItem, Codec, CodecFn>(
path: P,
codec_fn: CodecFn,
) -> Connect<impl Future<Output = io::Result<UnixStream>>, Item, SinkItem, CodecFn>
where
P: AsRef<Path>,
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{
Connect {
inner: UnixStream::connect(path),
codec_fn,
config: LengthDelimitedCodec::builder(),
ghost: PhantomData,
}
}
/// Listens on the socket named by `path`, wrapping accepted connections in Unix Domain Socket
/// transports.
pub async fn listen<P, Item, SinkItem, Codec, CodecFn>(
path: P,
codec_fn: CodecFn,
) -> io::Result<Incoming<Item, SinkItem, Codec, CodecFn>>
where
P: AsRef<Path>,
Item: for<'de> Deserialize<'de>,
Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{
let listener = UnixListener::bind(path)?;
let local_addr = listener.local_addr()?;
Ok(Incoming {
listener,
codec_fn,
local_addr,
config: LengthDelimitedCodec::builder(),
ghost: PhantomData,
})
}
/// A [`UnixListener`] that wraps connections in [transports](Transport).
#[pin_project]
#[derive(Debug)]
pub struct Incoming<Item, SinkItem, Codec, CodecFn> {
listener: UnixListener,
local_addr: SocketAddr,
codec_fn: CodecFn,
config: length_delimited::Builder,
ghost: PhantomData<(fn() -> Item, fn(SinkItem), Codec)>,
}
impl<Item, SinkItem, Codec, CodecFn> Incoming<Item, SinkItem, Codec, CodecFn> {
/// Returns the the socket address being listened on.
pub fn local_addr(&self) -> &SocketAddr {
&self.local_addr
}
/// Returns an immutable reference to the length-delimited codec's config.
pub fn config(&self) -> &length_delimited::Builder {
&self.config
}
/// Returns a mutable reference to the length-delimited codec's config.
pub fn config_mut(&mut self) -> &mut length_delimited::Builder {
&mut self.config
}
}
impl<Item, SinkItem, Codec, CodecFn> Stream for Incoming<Item, SinkItem, Codec, CodecFn>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{
type Item = io::Result<Transport<UnixStream, Item, SinkItem, Codec>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let conn: UnixStream = ready!(self.as_mut().project().listener.poll_accept(cx)?).0;
Poll::Ready(Some(Ok(new(
self.config.new_framed(conn),
(self.codec_fn)(),
))))
}
}
/// A temporary `PathBuf` that lives in `std::env::temp_dir` and is removed on drop.
pub struct TempPathBuf(std::path::PathBuf);
impl TempPathBuf {
/// A named socket that results in `<tempdir>/<name>`
pub fn new<S: AsRef<str>>(name: S) -> Self {
let mut sock = std::env::temp_dir();
sock.push(name.as_ref());
Self(sock)
}
/// Appends a random hex string to the socket name resulting in
/// `<tempdir>/<name>_<xxxxx>`
pub fn with_random<S: AsRef<str>>(name: S) -> Self {
Self::new(format!("{}_{:016x}", name.as_ref(), rand::random::<u64>()))
}
}
impl AsRef<std::path::Path> for TempPathBuf {
fn as_ref(&self) -> &std::path::Path {
self.0.as_path()
}
}
impl Drop for TempPathBuf {
fn drop(&mut self) {
// This will remove the file pointed to by this PathBuf if it exists, however Err's can
// be returned such as attempting to remove a non-existing file, or one which we don't
// have permission to remove. In these cases the Err is swallowed
let _ = std::fs::remove_file(&self.0);
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio_serde::formats::SymmetricalJson;
#[test]
fn temp_path_buf_non_random() {
let sock = TempPathBuf::new("test");
let mut good = std::env::temp_dir();
good.push("test");
assert_eq!(sock.as_ref(), good);
assert_eq!(sock.as_ref().file_name().unwrap(), "test");
}
#[test]
fn temp_path_buf_random() {
let sock = TempPathBuf::with_random("test");
let good = std::env::temp_dir();
assert!(sock.as_ref().starts_with(good));
// Since there are 16 random characters we just assert the file_name has the right name
// and starts with the correct string 'test_'
// file name: test_xxxxxxxxxxxxxxxx
// test = 4
// _ = 1
// <hex> = 16
// total = 21
let fname = sock.as_ref().file_name().unwrap().to_string_lossy();
assert!(fname.starts_with("test_"));
assert_eq!(fname.len(), 21);
}
#[test]
fn temp_path_buf_non_existing() {
let sock = TempPathBuf::with_random("test");
let sock_path = std::path::PathBuf::from(sock.as_ref());
// No actual file has been created yet
assert!(!sock_path.exists());
// Should not panic
std::mem::drop(sock);
assert!(!sock_path.exists());
}
#[test]
fn temp_path_buf_existing_file() {
let sock = TempPathBuf::with_random("test");
let sock_path = std::path::PathBuf::from(sock.as_ref());
let _file = std::fs::File::create(&sock).unwrap();
assert!(sock_path.exists());
std::mem::drop(sock);
assert!(!sock_path.exists());
}
#[test]
fn temp_path_buf_preexisting_file() {
let mut pre_existing = std::env::temp_dir();
pre_existing.push("test");
let _file = std::fs::File::create(&pre_existing).unwrap();
let sock = TempPathBuf::new("test");
let sock_path = std::path::PathBuf::from(sock.as_ref());
assert!(sock_path.exists());
std::mem::drop(sock);
assert!(!sock_path.exists());
}
#[tokio::test]
async fn temp_path_buf_for_socket() {
let sock = TempPathBuf::with_random("test");
// Save path for testing after drop
let sock_path = std::path::PathBuf::from(sock.as_ref());
// create the actual socket
let _ = listen(&sock, SymmetricalJson::<String>::default).await;
assert!(sock_path.exists());
std::mem::drop(sock);
assert!(!sock_path.exists());
}
}
}
#[cfg(test)]
mod tests {
use super::Transport;
@@ -291,7 +555,7 @@ mod tests {
use tokio_serde::formats::SymmetricalJson;
fn ctx() -> Context<'static> {
Context::from_waker(&noop_waker_ref())
Context::from_waker(noop_waker_ref())
}
struct TestIo(Cursor<Vec<u8>>);
@@ -393,4 +657,24 @@ mod tests {
assert_matches!(transport.next().await, None);
Ok(())
}
#[cfg(all(unix, feature = "unix"))]
#[tokio::test]
async fn uds() -> io::Result<()> {
use super::unix;
use super::*;
let sock = unix::TempPathBuf::with_random("uds");
let mut listener = unix::listen(&sock, SymmetricalJson::<String>::default).await?;
tokio::spawn(async move {
let mut transport = listener.next().await.unwrap().unwrap();
let message = transport.next().await.unwrap().unwrap();
transport.send(message).await.unwrap();
});
let mut transport = unix::connect(&sock, SymmetricalJson::<String>::default).await?;
transport.send(String::from("test")).await?;
assert_matches!(transport.next().await, Some(Ok(s)) if s == "test");
assert_matches!(transport.next().await, None);
Ok(())
}
}

View File

@@ -837,9 +837,10 @@ mod tests {
channel::Channel<Response<Resp>, ClientMessage<Req>>,
) {
let (tx, rx) = crate::transport::channel::bounded(capacity);
let mut config = Config::default();
// Add 1 because capacity 0 is not supported (but is supported by transport::channel::bounded).
config.pending_response_buffer = capacity + 1;
let config = Config {
pending_response_buffer: capacity + 1,
};
(Box::pin(BaseChannel::new(config, rx).requests()), tx)
}

View File

@@ -176,7 +176,7 @@ mod tests {
.unwrap();
let mut abortable_future = Box::new(Abortable::new(pending::<()>(), abort_registration));
assert_eq!(in_flight_requests.cancel_request(0), true);
assert!(in_flight_requests.cancel_request(0));
assert_matches!(
abortable_future.poll_unpin(&mut noop_context()),
Poll::Ready(Err(_))

View File

@@ -282,7 +282,7 @@ where
fn ctx() -> Context<'static> {
use futures::task::*;
Context::from_waker(&noop_waker_ref())
Context::from_waker(noop_waker_ref())
}
#[test]

View File

@@ -134,5 +134,5 @@ impl<T> PollExt for Poll<Option<T>> {
}
pub fn cx() -> Context<'static> {
Context::from_waker(&noop_waker_ref())
Context::from_waker(noop_waker_ref())
}

View File

@@ -1,4 +1,4 @@
error: unused `Connect` that must be used
error: unused `tarpc::serde_transport::tcp::Connect` that must be used
--> tests/compile_fail/serde_transport/must_use_tcp_connect.rs:7:9
|
7 | serde_transport::tcp::connect::<_, (), (), _, _>("0.0.0.0:0", Json::default);

View File

@@ -7,7 +7,7 @@ use tarpc::{
use tokio_serde::formats::Json;
#[tarpc::derive_serde]
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Eq)]
pub enum TestData {
Black,
White,

View File

@@ -108,7 +108,7 @@ async fn dropped_channel_aborts_in_flight_requests() -> anyhow::Result<()> {
#[cfg(all(feature = "serde-transport", feature = "tcp"))]
#[tokio::test]
async fn serde() -> anyhow::Result<()> {
async fn serde_tcp() -> anyhow::Result<()> {
use tarpc::serde_transport;
use tokio_serde::formats::Json;
@@ -136,6 +136,37 @@ async fn serde() -> anyhow::Result<()> {
Ok(())
}
#[cfg(all(feature = "serde-transport", feature = "unix", unix))]
#[tokio::test]
async fn serde_uds() -> anyhow::Result<()> {
use tarpc::serde_transport;
use tokio_serde::formats::Json;
let _ = tracing_subscriber::fmt::try_init();
let sock = tarpc::serde_transport::unix::TempPathBuf::with_random("uds");
let transport = tarpc::serde_transport::unix::listen(&sock, Json::default).await?;
tokio::spawn(
transport
.take(1)
.filter_map(|r| async { r.ok() })
.map(BaseChannel::with_defaults)
.execute(Server.serve()),
);
let transport = serde_transport::unix::connect(&sock, Json::default).await?;
let client = ServiceClient::new(client::Config::default(), transport).spawn();
// Save results using socket so we can clean the socket even if our test assertions fail
let res1 = client.add(context::current(), 1, 2).await;
let res2 = client.hey(context::current(), "Tim".to_string()).await;
assert_matches!(res1, Ok(3));
assert_matches!(res2, Ok(ref s) if s == "Hey, Tim.");
Ok(())
}
#[tokio::test]
async fn concurrent() -> anyhow::Result<()> {
let _ = tracing_subscriber::fmt::try_init();