19 Commits

Author SHA1 Message Date
Tim Kuehn
bc982c5584 Prepare release of v0.24.1 2020-12-28 15:42:11 -08:00
Logan Magee
d440e12c19 Bump tokio to 1.0 (#337)
Co-authored-by: Artem Vorotnikov <artem@vorotnikov.me>
2020-12-23 22:49:02 -08:00
Frederik-Baetens
bc8128af69 add serde derivation alias macro (#333) 2020-11-13 14:36:59 -08:00
Tim Kuehn
1d87c14262 Fix github actions config - take 3 2020-11-12 12:33:10 -08:00
Tim Kuehn
ca929c2178 Fix github actions config - take 2 2020-11-12 12:24:46 -08:00
Tim Kuehn
569039734b Fix github actions config 2020-11-12 12:13:10 -08:00
Tim Kuehn
3d43310e6a Make 'cargo test' succeed again 2020-11-12 11:59:39 -08:00
Tim Kuehn
d21cbddb0d Cargo test should pass without features enabled 2020-11-12 11:57:08 -08:00
Frederik-Baetens
25aa857edf Reexport/tokio serde (#332)
Re-export tokio_serde when the serde-transport feature is enabled.
2020-11-09 12:56:46 -08:00
Frederik-Baetens
0bb2e2bbbe re-export serde (#330)
* re-export serde
* make serde re-export dependent on serde1 feature flag
* update missing_async compile test case
2020-11-09 11:42:28 -08:00
chansuke
dc376343d6 Remove #[derive(Debug)] from library structs (#327)
* Remove `#[derive(Debug)]` from library structs
* Add manual debug impl for backward compatibility
2020-11-04 11:24:57 -08:00
Artem Vorotnikov
2e7d1f8a88 Bump dependencies (#328) 2020-10-31 09:43:40 -07:00
Tim Kuehn
6314591c65 Add tokio's macros feature to readme example's dependencies 2020-10-30 17:29:14 -07:00
Tim Kuehn
7dd7494420 Prepare v0.23.1 release 2020-10-29 18:54:35 -07:00
Tim Kuehn
6c10e3649f Fix tokio required features 2020-10-29 18:53:04 -07:00
Tim Kuehn
4c6dee13d2 cargo fmt 2020-10-29 00:44:15 -07:00
Bernardo Meurer
e45abe953a tarpc: enable tokio's time feature (#325) 2020-10-29 00:43:38 -07:00
Tim Kuehn
dec3e491b5 Fix unused import 2020-10-27 15:52:11 -07:00
Kitsu
6ce341cf79 Add example for custom transport usage (#322) 2020-10-23 14:28:26 -07:00
22 changed files with 260 additions and 60 deletions

View File

@@ -28,6 +28,25 @@ jobs:
profile: minimal
toolchain: stable
override: true
- uses: actions-rs/cargo@v1
with:
command: test
- uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path tarpc/Cargo.toml --features serde1
- uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path tarpc/Cargo.toml --features tokio1
- uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path tarpc/Cargo.toml --features serde-transport
- uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path tarpc/Cargo.toml --features tcp
- uses: actions-rs/cargo@v1
with:
command: test

View File

@@ -59,7 +59,7 @@ Some other features of tarpc:
Add to your `Cargo.toml` dependencies:
```toml
tarpc = "0.23.0"
tarpc = "0.24"
```
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -68,12 +68,13 @@ Simply implement the generated service trait, and you're off to the races!
## Example
For this example, in addition to tarpc, also add two other dependencies to
This example uses [tokio](https://tokio.rs), so add the following dependencies to
your `Cargo.toml`:
```toml
futures = "0.3"
tokio = "0.3"
futures = "1.0"
tarpc = { version = "0.24", features = ["tokio1"] }
tokio = { version = "1.0", features = ["macros"] }
```
In the following example, we use an in-process channel for communication between

View File

@@ -1,3 +1,13 @@
## 0.24.1 (2020-12-28)
### Breaking Changes
Upgrades tokio to 1.0.
## 0.24.0 (2020-12-28)
This release was yanked.
## 0.23.0 (2020-10-19)
### Breaking Changes

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc-example-service"
version = "0.6.0"
version = "0.8.0"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = "2018"
license = "MIT"
@@ -17,10 +17,8 @@ clap = "2.33"
env_logger = "0.8"
futures = "0.3"
serde = { version = "1.0" }
tarpc = { version = "0.23", path = "../tarpc", features = ["full"] }
tokio = { version = "0.3", features = ["full"] }
tokio-serde = { version = "0.6", features = ["json"] }
tokio-util = { version = "0.4", features = ["codec"] }
tarpc = { version = "0.24", path = "../tarpc", features = ["full"] }
tokio = { version = "1", features = ["full"] }
[lib]
name = "service"

View File

@@ -6,8 +6,7 @@
use clap::{App, Arg};
use std::{io, net::SocketAddr};
use tarpc::{client, context};
use tokio_serde::formats::Json;
use tarpc::{client, context, tokio_serde::formats::Json};
#[tokio::main]
async fn main() -> io::Result<()> {

View File

@@ -14,8 +14,8 @@ use std::{
use tarpc::{
context,
server::{self, Channel, Handler},
tokio_serde::formats::Json,
};
use tokio_serde::formats::Json;
// This is the type that implements the generated World trait. It is the business logic
// and is used to start the server.

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc-plugins"
version = "0.8.0"
version = "0.9.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018"
license = "MIT"

View File

@@ -215,6 +215,19 @@ impl Parse for DeriveSerde {
}
}
/// Generates:
/// - derive of Debug, serde Serialize & Deserialize
/// - serde crate annotation
#[proc_macro_attribute]
pub fn derive_serde(_attr: TokenStream, item: TokenStream) -> TokenStream {
let mut gen: proc_macro2::TokenStream = quote! {
#[derive(tarpc::serde::Serialize, tarpc::serde::Deserialize)]
#[serde(crate = "tarpc::serde")]
};
gen.extend(proc_macro2::TokenStream::from(item));
proc_macro::TokenStream::from(gen)
}
/// Generates:
/// - service trait
/// - serve fn
@@ -240,7 +253,10 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
let args: &[&[PatType]] = &rpcs.iter().map(|rpc| &*rpc.args).collect::<Vec<_>>();
let response_fut_name = &format!("{}ResponseFut", ident.unraw());
let derive_serialize = if derive_serde.0 {
Some(quote!(#[derive(serde::Serialize, serde::Deserialize)]))
Some(
quote! {#[derive(tarpc::serde::Serialize, tarpc::serde::Deserialize)]
#[serde(crate = "tarpc::serde")]},
)
} else {
None
};

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc"
version = "0.23.0"
version = "0.24.1"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018"
license = "MIT"
@@ -16,9 +16,9 @@ description = "An RPC framework for Rust with a focus on ease of use."
default = []
serde1 = ["tarpc-plugins/serde1", "serde", "serde/derive"]
tokio1 = []
serde-transport = ["tokio-serde", "tokio-util/codec"]
tcp = ["tokio/net", "tokio/stream"]
tokio1 = ["tokio/rt-multi-thread"]
serde-transport = ["serde1", "tokio1", "tokio-serde/json", "tokio-util/codec"]
tcp = ["tokio/net"]
full = ["serde1", "tokio1", "serde-transport", "tcp"]
@@ -35,28 +35,32 @@ pin-project = "1.0"
rand = "0.7"
serde = { optional = true, version = "1.0", features = ["derive"] }
static_assertions = "1.1.0"
tarpc-plugins = { path = "../plugins", version = "0.8" }
tokio = { version = "0.3" }
tokio-util = { optional = true, version = "0.4" }
tokio-serde = { optional = true, version = "0.6" }
tarpc-plugins = { path = "../plugins", version = "0.9" }
tokio = { version = "1", features = ["time"] }
tokio-util = { optional = true, version = "0.6" }
tokio-serde = { optional = true, version = "0.8" }
[dev-dependencies]
assert_matches = "1.4"
bincode = "1.3"
bytes = { version = "0.5", features = ["serde"] }
bytes = { version = "1", features = ["serde"] }
env_logger = "0.8"
flate2 = "1.0"
log = "0.4"
pin-utils = "0.1.0-alpha"
serde_bytes = "0.11"
tokio = { version = "0.3", features = ["full"] }
tokio-serde = { version = "0.6", features = ["json", "bincode"] }
tokio = { version = "1", features = ["full"] }
tokio-serde = { version = "0.8", features = ["json", "bincode"] }
trybuild = "1.0"
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[[example]]
name = "compression"
required-features = ["serde-transport", "tcp"]
[[example]]
name = "server_calling_server"
required-features = ["full"]
@@ -68,3 +72,15 @@ required-features = ["full"]
[[example]]
name = "pubsub"
required-features = ["full"]
[[example]]
name = "custom_transport"
required-features = ["serde1", "tokio1", "serde-transport"]
[[test]]
name = "service_functional"
required-features = ["serde-transport"]
[[test]]
name = "dataservice"
required-features = ["serde-transport", "tcp"]

View File

@@ -0,0 +1,52 @@
use futures::future;
use tarpc::context::Context;
use tarpc::serde_transport as transport;
use tarpc::server::{BaseChannel, Channel};
use tokio::net::{UnixListener, UnixStream};
use tokio_serde::formats::Bincode;
use tokio_util::codec::length_delimited::LengthDelimitedCodec;
#[tarpc::service]
pub trait PingService {
async fn ping();
}
#[derive(Clone)]
struct Service;
impl PingService for Service {
type PingFut = future::Ready<()>;
fn ping(self, _: Context) -> Self::PingFut {
future::ready(())
}
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let bind_addr = "/tmp/tarpc_on_unix_example.sock";
let _ = std::fs::remove_file(bind_addr);
let listener = UnixListener::bind(bind_addr).unwrap();
let codec_builder = LengthDelimitedCodec::builder();
tokio::spawn(async move {
loop {
let (conn, _addr) = listener.accept().await.unwrap();
let framed = codec_builder.new_framed(conn);
let transport = transport::new(framed, Bincode::default());
let fut = BaseChannel::with_defaults(transport)
.respond_with(Service.serve())
.execute();
tokio::spawn(fut);
}
});
let conn = UnixStream::connect(bind_addr).await?;
let transport = transport::new(codec_builder.new_framed(conn), Bincode::default());
PingServiceClient::new(Default::default(), transport)
.spawn()?
.ping(tarpc::context::current())
.await
}

View File

@@ -3,7 +3,6 @@
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
//! *Disclaimer*: This is not an official Google product.
//!
//! tarpc is an RPC framework for rust with a focus on ease of use. Defining a
@@ -47,7 +46,7 @@
//! Add to your `Cargo.toml` dependencies:
//!
//! ```toml
//! tarpc = "0.23.0"
//! tarpc = "0.24"
//! ```
//!
//! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -56,12 +55,13 @@
//!
//! ## Example
//!
//! For this example, in addition to tarpc, also add two other dependencies to
//! This example uses [tokio](https://tokio.rs), so add the following dependencies to
//! your `Cargo.toml`:
//!
//! ```toml
//! futures = "0.3"
//! tokio = "0.3"
//! futures = "1.0"
//! tarpc = { version = "0.24", features = ["tokio1"] }
//! tokio = { version = "1.0", features = ["macros"] }
//! ```
//!
//! In the following example, we use an in-process channel for communication between
@@ -165,6 +165,9 @@
//! # future::ready(format!("Hello, {}!", name))
//! # }
//! # }
//! # #[cfg(not(feature = "tokio1"))]
//! # fn main() {}
//! # #[cfg(feature = "tokio1")]
//! #[tokio::main]
//! async fn main() -> io::Result<()> {
//! let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
@@ -203,12 +206,21 @@
pub mod rpc;
pub use rpc::*;
#[cfg(feature = "serde1")]
pub use serde;
#[cfg(feature = "serde-transport")]
pub use tokio_serde;
#[cfg(feature = "serde-transport")]
#[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))]
pub mod serde_transport;
pub mod trace;
#[cfg(feature = "serde1")]
pub use tarpc_plugins::derive_serde;
/// The main macro that creates RPC services.
///
/// Rpc methods are specified, mirroring trait syntax:

View File

@@ -55,7 +55,7 @@ pub enum ClientMessage<T> {
Cancel {
/// The trace context associates the message with a specific chain of causally-related actions,
/// possibly orchestrated across many distributed systems.
#[cfg_attr(feature = "serde", serde(default))]
#[cfg_attr(feature = "serde1", serde(default))]
trace_context: trace::Context,
/// The ID of the request to cancel.
request_id: u64,

View File

@@ -8,6 +8,7 @@
use crate::context;
use futures::prelude::*;
use std::fmt;
use std::io;
/// Provides a [`Client`] backed by a transport.
@@ -127,7 +128,6 @@ impl Default for Config {
/// A channel and dispatch pair. The dispatch drives the sending and receiving of requests
/// and must be polled continuously or spawned.
#[derive(Debug)]
pub struct NewClient<C, D> {
/// The new client.
pub client: C,
@@ -153,3 +153,9 @@ where
Ok(self.client)
}
}
impl<C, D> fmt::Debug for NewClient<C, D> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "NewClient")
}
}

View File

@@ -36,7 +36,6 @@ pub use self::{
};
/// Manages clients, serving multiplexed requests over each connection.
#[derive(Debug)]
pub struct Server<Req, Resp> {
config: Config,
ghost: PhantomData<(Req, Resp)>,
@@ -48,6 +47,12 @@ impl<Req, Resp> Default for Server<Req, Resp> {
}
}
impl<Req, Resp> fmt::Debug for Server<Req, Resp> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "Server")
}
}
/// Settings that control the behavior of the server.
#[derive(Clone, Debug)]
pub struct Config {
@@ -167,7 +172,6 @@ where
/// BaseChannel lifts a Transport to a Channel by tracking in-flight requests.
#[pin_project]
#[derive(Debug)]
pub struct BaseChannel<Req, Resp, T> {
config: Config,
/// Writes responses to the wire and reads requests off the wire.
@@ -236,6 +240,12 @@ where
}
}
impl<Req, Resp, T> fmt::Debug for BaseChannel<Req, Resp, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "BaseChannel")
}
}
/// The server end of an open connection with a client, streaming in requests from, and sinking
/// responses to, the client.
///
@@ -384,7 +394,6 @@ where
/// A running handler serving all requests coming over a channel.
#[pin_project]
#[derive(Debug)]
pub struct ClientHandler<C, S>
where
C: Channel,
@@ -508,9 +517,17 @@ where
}
}
impl<C, S> fmt::Debug for ClientHandler<C, S>
where
C: Channel,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "ClientHandler")
}
}
/// A future fulfilling a single client request.
#[pin_project]
#[derive(Debug)]
pub struct RequestHandler<F, R> {
#[pin]
resp: Abortable<Resp<F, R>>,
@@ -528,8 +545,13 @@ where
}
}
impl<F, R> fmt::Debug for RequestHandler<F, R> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "RequestHandler")
}
}
#[pin_project]
#[derive(Debug)]
struct Resp<F, R> {
state: RespState,
request_id: u64,
@@ -614,6 +636,12 @@ where
}
}
impl<F, R> fmt::Debug for Resp<F, R> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "Resp")
}
}
impl<C, S> Stream for ClientHandler<C, S>
where
C: Channel,

View File

@@ -78,6 +78,7 @@ impl<Item, SinkItem> Sink<SinkItem> for UnboundedChannel<Item, SinkItem> {
}
#[cfg(test)]
#[cfg(feature = "tokio1")]
mod tests {
use crate::{
client, context,
@@ -89,7 +90,6 @@ mod tests {
use log::trace;
use std::io;
#[cfg(feature = "tokio1")]
#[tokio::test]
async fn integration() -> io::Result<()> {
let _ = env_logger::try_init();

View File

@@ -10,8 +10,8 @@ use std::{
time::{Duration, SystemTime},
};
#[cfg(feature = "serde")]
#[cfg_attr(docsrs, doc(cfg(feature = "serde")))]
#[cfg(feature = "serde1")]
#[cfg_attr(docsrs, doc(cfg(feature = "serde1")))]
pub mod serde;
/// Extension trait for [SystemTimes](SystemTime) in the future, i.e. deadlines.

View File

@@ -14,10 +14,7 @@ use serde::{Deserialize, Serialize};
use std::{error::Error, io, pin::Pin};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_serde::{Framed as SerdeFramed, *};
use tokio_util::codec::{
length_delimited::{self, LengthDelimitedCodec},
Framed,
};
use tokio_util::codec::{length_delimited::LengthDelimitedCodec, Framed};
/// A transport that serializes to, and deserializes from, a byte stream.
#[pin_project]
@@ -130,6 +127,7 @@ pub mod tcp {
futures::ready,
std::{marker::PhantomData, net::SocketAddr},
tokio::net::{TcpListener, TcpStream, ToSocketAddrs},
tokio_util::codec::length_delimited,
};
mod private {

View File

@@ -27,7 +27,7 @@ use std::{
/// Consists of a span identifying an event, an optional parent span identifying a causal event
/// that triggered the current span, and a trace with which all related spans are associated.
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Context {
/// An identifier of the trace associated with the current context. A trace ID is typically
/// created at a root span and passed along through all causal events.
@@ -47,12 +47,12 @@ pub struct Context {
/// A 128-bit UUID identifying a trace. All spans caused by the same originating span share the
/// same trace ID.
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct TraceId(u128);
/// A 64-bit identifier of a span within a trace. The identifier is unique within the span's trace.
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct SpanId(u64);
impl Context {

View File

@@ -1,4 +1,4 @@
#[tarpc::service]
#[tarpc::service(derive_serde = false)]
trait World {
async fn hello(name: String) -> String;
}

View File

@@ -9,11 +9,3 @@ error: hint: `#[tarpc::server]` only rewrites async fns, and `fn hello` is not a
|
10 | fn hello(name: String) -> String {
| ^^
error[E0433]: failed to resolve: use of undeclared type or module `serde`
--> $DIR/tarpc_server_missing_async.rs:1:1
|
1 | #[tarpc::service]
| ^^^^^^^^^^^^^^^^^ use of undeclared type or module `serde`
|
= note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)

View File

@@ -0,0 +1,51 @@
use futures::prelude::*;
use std::io;
use tarpc::serde_transport;
use tarpc::{client, context, server::Handler};
use tokio_serde::formats::Json;
#[tarpc::derive_serde]
#[derive(Debug, PartialEq)]
pub enum TestData {
Black,
White,
}
#[tarpc::service]
pub trait ColorProtocol {
async fn get_opposite_color(color: TestData) -> TestData;
}
#[derive(Clone)]
struct ColorServer;
#[tarpc::server]
impl ColorProtocol for ColorServer {
async fn get_opposite_color(self, _: context::Context, color: TestData) -> TestData {
match color {
TestData::White => TestData::Black,
TestData::Black => TestData::White,
}
}
}
#[tokio::test]
async fn test_call() -> io::Result<()> {
let transport = tarpc::serde_transport::tcp::listen("localhost:56797", Json::default).await?;
let addr = transport.local_addr();
tokio::spawn(
tarpc::Server::default()
.incoming(transport.take(1).filter_map(|r| async { r.ok() }))
.respond_with(ColorServer.serve()),
);
let transport = serde_transport::tcp::connect(addr, Json::default).await?;
let mut client = ColorProtocolClient::new(client::Config::default(), transport).spawn()?;
let color = client
.get_opposite_color(context::current(), TestData::White)
.await?;
assert_eq!(color, TestData::Black);
Ok(())
}

View File

@@ -6,12 +6,11 @@ use futures::{
use std::io;
use tarpc::{
client::{self},
context, serde_transport,
context,
server::{self, BaseChannel, Channel, Handler},
transport::channel,
};
use tokio::join;
use tokio_serde::formats::Json;
#[tarpc_plugins::service]
trait Service {
@@ -58,12 +57,15 @@ async fn sequential() -> io::Result<()> {
Ok(())
}
#[cfg(feature = "serde1")]
#[cfg(all(feature = "serde-transport", feature = "tcp"))]
#[tokio::test]
async fn serde() -> io::Result<()> {
use tarpc::serde_transport;
use tokio_serde::formats::Json;
let _ = env_logger::try_init();
let transport = serde_transport::tcp::listen("localhost:56789", Json::default).await?;
let transport = tarpc::serde_transport::tcp::listen("localhost:56789", Json::default).await?;
let addr = transport.local_addr();
tokio::spawn(
tarpc::Server::default()