3 Commits

Author SHA1 Message Date
Tim Kuehn
3bf247ae40 Prepare release of 0.29.0 2022-05-26 16:34:08 -07:00
Tim Kuehn
69442d2368 Serialize RPC deadline as a Duration.
Duration was previously serialized as SystemTime. However, absolute
times run into problems with clock skew: if the remote machine's clock
is too far in the future, the RPC deadline will be exceeded before
request processing can begin. Conversely, if the remote machine's clock
is too far in the past, the RPC deadline will not be enforced.

By converting the absolute deadline to a relative duration, clock skew
is no longer relevant, as the remote machine will convert the deadline
into a time relative to its own clock. This mirrors how the gRPC HTTP2
protocol includes a Timeout in the request headers [0] but the SDK uses
timestamps [1]. Keeping the absolute time in the core APIs maintains all
the benefits of today, namely, natural deadline propagation between RPC
hops when using the current context.

This serialization strategy means that, generally, the remote machine's
deadline will be slightly in the future compared to the local machine.
Depending on network transfer latencies, this could be microseconds to
milliseconds, or worse in the worst case. Because the deadline is not
intended for high-precision scenarios, I don't view this is as
problematic.

Because this change only affects the serialization layer, local
transports that bypass serialization are not affected.

[0] https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
[1] https://grpc.io/blog/deadlines/#setting-a-deadline
2022-05-26 14:14:49 -07:00
Tim Kuehn
e135e39504 Add rpc.deadline tag to Opentelemetry traces. 2022-05-26 13:53:03 -07:00
9 changed files with 70 additions and 15 deletions

View File

@@ -67,7 +67,7 @@ Some other features of tarpc:
Add to your `Cargo.toml` dependencies: Add to your `Cargo.toml` dependencies:
```toml ```toml
tarpc = "0.28" tarpc = "0.29"
``` ```
The `tarpc::service` attribute expands to a collection of items that form an rpc service. The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -82,7 +82,7 @@ your `Cargo.toml`:
```toml ```toml
anyhow = "1.0" anyhow = "1.0"
futures = "0.3" futures = "0.3"
tarpc = { version = "0.28", features = ["tokio1"] } tarpc = { version = "0.29", features = ["tokio1"] }
tokio = { version = "1.0", features = ["macros"] } tokio = { version = "1.0", features = ["macros"] }
``` ```

View File

@@ -1,3 +1,11 @@
## 0.29.0 (2022-05-26)
### Breaking Changes
`Context.deadline` is now serialized as a Duration. This prevents clock skew from affecting deadline
behavior. For more details see https://github.com/google/tarpc/pull/367 and its [related
issue](https://github.com/google/tarpc/issues/366).
## 0.28.0 (2022-04-06) ## 0.28.0 (2022-04-06)
### Breaking Changes ### Breaking Changes
@@ -16,7 +24,7 @@ the server drop its connections more quickly.
### Breaking Changes ### Breaking Changes
### RPC error type is changing #### RPC error type is changing
RPC return types are changing from `Result<Response, io::Error>` to `Result<Response, RPC return types are changing from `Result<Response, io::Error>` to `Result<Response,
tarpc::client::RpcError>`. tarpc::client::RpcError>`.

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tarpc-example-service" name = "tarpc-example-service"
version = "0.10.0" version = "0.11.0"
rust-version = "1.56" rust-version = "1.56"
authors = ["Tim Kuehn <tikue@google.com>"] authors = ["Tim Kuehn <tikue@google.com>"]
edition = "2021" edition = "2021"
@@ -21,7 +21,7 @@ futures = "0.3"
opentelemetry = { version = "0.16", features = ["rt-tokio"] } opentelemetry = { version = "0.16", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] } opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] }
rand = "0.8" rand = "0.8"
tarpc = { version = "0.28", path = "../tarpc", features = ["full"] } tarpc = { version = "0.29", path = "../tarpc", features = ["full"] }
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] } tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] }
tracing = { version = "0.1" } tracing = { version = "0.1" }
tracing-opentelemetry = "0.15" tracing-opentelemetry = "0.15"

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tarpc" name = "tarpc"
version = "0.28.0" version = "0.29.0"
rust-version = "1.58.0" rust-version = "1.58.0"
authors = [ authors = [
"Adam Wright <adam.austin.wright@gmail.com>", "Adam Wright <adam.austin.wright@gmail.com>",

View File

@@ -6,7 +6,6 @@
use crate::{add::Add as AddService, double::Double as DoubleService}; use crate::{add::Add as AddService, double::Double as DoubleService};
use futures::{future, prelude::*}; use futures::{future, prelude::*};
use std::env;
use tarpc::{ use tarpc::{
client, context, client, context,
server::{incoming::Incoming, BaseChannel}, server::{incoming::Incoming, BaseChannel},
@@ -56,9 +55,9 @@ impl DoubleService for DoubleServer {
} }
fn init_tracing(service_name: &str) -> anyhow::Result<()> { fn init_tracing(service_name: &str) -> anyhow::Result<()> {
env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12");
let tracer = opentelemetry_jaeger::new_pipeline() let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name(service_name) .with_service_name(service_name)
.with_auto_split_batch(true)
.with_max_packet_size(2usize.pow(13)) .with_max_packet_size(2usize.pow(13))
.install_batch(opentelemetry::runtime::Tokio)?; .install_batch(opentelemetry::runtime::Tokio)?;

View File

@@ -114,6 +114,7 @@ impl<Req, Resp> Channel<Req, Resp> {
skip(self, ctx, request_name, request), skip(self, ctx, request_name, request),
fields( fields(
rpc.trace_id = tracing::field::Empty, rpc.trace_id = tracing::field::Empty,
rpc.deadline = %humantime::format_rfc3339(ctx.deadline),
otel.kind = "client", otel.kind = "client",
otel.name = request_name) otel.name = request_name)
)] )]
@@ -519,11 +520,7 @@ where
}, },
}); });
self.start_send(request)?; self.start_send(request)?;
let deadline = ctx.deadline; tracing::info!("SendRequest");
tracing::info!(
tarpc.deadline = %humantime::format_rfc3339(deadline),
"SendRequest"
);
drop(entered); drop(entered);
self.in_flight_requests() self.in_flight_requests()

View File

@@ -28,6 +28,8 @@ pub struct Context {
/// When the client expects the request to be complete by. The server should cancel the request /// When the client expects the request to be complete by. The server should cancel the request
/// if it is not complete by this time. /// if it is not complete by this time.
#[cfg_attr(feature = "serde1", serde(default = "ten_seconds_from_now"))] #[cfg_attr(feature = "serde1", serde(default = "ten_seconds_from_now"))]
// Serialized as a Duration to prevent clock skew issues.
#[cfg_attr(feature = "serde1", serde(with = "absolute_to_relative_time"))]
pub deadline: SystemTime, pub deadline: SystemTime,
/// Uniquely identifies requests originating from the same source. /// Uniquely identifies requests originating from the same source.
/// When a service handles a request by making requests itself, those requests should /// When a service handles a request by making requests itself, those requests should
@@ -36,6 +38,54 @@ pub struct Context {
pub trace_context: trace::Context, pub trace_context: trace::Context,
} }
#[cfg(feature = "serde1")]
mod absolute_to_relative_time {
pub use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub use std::time::{Duration, SystemTime};
pub fn serialize<S>(deadline: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let deadline = deadline
.duration_since(SystemTime::now())
.unwrap_or(Duration::ZERO);
deadline.serialize(serializer)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
where
D: Deserializer<'de>,
{
let deadline = Duration::deserialize(deserializer)?;
Ok(SystemTime::now() + deadline)
}
#[cfg(test)]
#[derive(serde::Serialize, serde::Deserialize)]
struct AbsoluteToRelative(#[serde(with = "self")] SystemTime);
#[test]
fn test_serialize() {
let now = SystemTime::now();
let deadline = now + Duration::from_secs(10);
let serialized_deadline = bincode::serialize(&AbsoluteToRelative(deadline)).unwrap();
let deserialized_deadline: Duration = bincode::deserialize(&serialized_deadline).unwrap();
// TODO: how to avoid flakiness?
assert!(deserialized_deadline > Duration::from_secs(9));
}
#[test]
fn test_deserialize() {
let deadline = Duration::from_secs(10);
let serialized_deadline = bincode::serialize(&deadline).unwrap();
let AbsoluteToRelative(deserialized_deadline) =
bincode::deserialize(&serialized_deadline).unwrap();
// TODO: how to avoid flakiness?
assert!(deserialized_deadline > SystemTime::now() + Duration::from_secs(9));
}
}
assert_impl_all!(Context: Send, Sync); assert_impl_all!(Context: Send, Sync);
fn ten_seconds_from_now() -> SystemTime { fn ten_seconds_from_now() -> SystemTime {

View File

@@ -54,7 +54,7 @@
//! Add to your `Cargo.toml` dependencies: //! Add to your `Cargo.toml` dependencies:
//! //!
//! ```toml //! ```toml
//! tarpc = "0.28" //! tarpc = "0.29"
//! ``` //! ```
//! //!
//! The `tarpc::service` attribute expands to a collection of items that form an rpc service. //! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -69,7 +69,7 @@
//! ```toml //! ```toml
//! anyhow = "1.0" //! anyhow = "1.0"
//! futures = "0.3" //! futures = "0.3"
//! tarpc = { version = "0.28", features = ["tokio1"] } //! tarpc = { version = "0.29", features = ["tokio1"] }
//! tokio = { version = "1.0", features = ["macros"] } //! tokio = { version = "1.0", features = ["macros"] }
//! ``` //! ```
//! //!

View File

@@ -161,6 +161,7 @@ where
let span = info_span!( let span = info_span!(
"RPC", "RPC",
rpc.trace_id = %request.context.trace_id(), rpc.trace_id = %request.context.trace_id(),
rpc.deadline = %humantime::format_rfc3339(request.context.deadline),
otel.kind = "server", otel.kind = "server",
otel.name = tracing::field::Empty, otel.name = tracing::field::Empty,
); );