From 950ad5187c1c91c027127f3ee8462da26758dccf Mon Sep 17 00:00:00 2001
From: Artem Vorotnikov
Date: Tue, 21 May 2019 04:45:41 +0300
Subject: [PATCH] Add JSON transport (#219)

---
 Cargo.toml                       |   1 +
 json-transport/Cargo.toml        |  37 ++++++
 json-transport/rustfmt.toml      |   1 +
 json-transport/src/lib.rs        | 203 +++++++++++++++++++++++++++++++
 json-transport/tests/bench.rs    | 101 +++++++++++++++
 json-transport/tests/cancel.rs   | 143 ++++++++++++++++++++++
 json-transport/tests/pushback.rs | 119 ++++++++++++++++++
 7 files changed, 605 insertions(+)
 create mode 100644 json-transport/Cargo.toml
 create mode 100644 json-transport/rustfmt.toml
 create mode 100644 json-transport/src/lib.rs
 create mode 100644 json-transport/tests/bench.rs
 create mode 100644 json-transport/tests/cancel.rs
 create mode 100644 json-transport/tests/pushback.rs

diff --git a/Cargo.toml b/Cargo.toml
index 3ab4bb2..5848ebf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,6 +5,7 @@ members = [
     "rpc",
     "trace",
     "bincode-transport",
+    "json-transport",
    "tarpc",
     "plugins",
 ]
diff --git a/json-transport/Cargo.toml b/json-transport/Cargo.toml
new file mode 100644
index 0000000..8613f86
--- /dev/null
+++ b/json-transport/Cargo.toml
@@ -0,0 +1,37 @@
+[package]
+name = "tarpc-json-transport"
+version = "0.1.0"
+authors = ["Artem Vorotnikov"]
+edition = '2018'
+license = "MIT"
+documentation = "https://docs.rs/tarpc-json-transport"
+homepage = "https://github.com/google/tarpc"
+repository = "https://github.com/google/tarpc"
+keywords = ["rpc", "network", "json", "serde", "tarpc"]
+categories = ["asynchronous", "network-programming"]
+readme = "../README.md"
+description = "A JSON-based transport for tarpc services."
+
+[dependencies]
+futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
+futures_legacy = { version = "0.1", package = "futures" }
+pin-utils = "0.1.0-alpha.4"
+rpc = { package = "tarpc-lib", version = "0.6", path = "../rpc", features = ["serde1"] }
+serde = "1.0"
+serde_json = "1.0"
+tokio = "0.1"
+tokio-io = "0.1"
+tokio-serde-json = "0.2"
+tokio-tcp = "0.1"
+
+[dev-dependencies]
+env_logger = "0.6"
+humantime = "1.0"
+libtest = "0.0.1"
+log = "0.4"
+rand = "0.6"
+tokio = "0.1"
+tokio-executor = "0.1"
+tokio-reactor = "0.1"
+tokio-serde = "0.3"
+tokio-timer = "0.2"
diff --git a/json-transport/rustfmt.toml b/json-transport/rustfmt.toml
new file mode 100644
index 0000000..32a9786
--- /dev/null
+++ b/json-transport/rustfmt.toml
@@ -0,0 +1 @@
+edition = "2018"
diff --git a/json-transport/src/lib.rs b/json-transport/src/lib.rs
new file mode 100644
index 0000000..e1c9109
--- /dev/null
+++ b/json-transport/src/lib.rs
@@ -0,0 +1,203 @@
+// Copyright 2019 Google LLC
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file or at
+// https://opensource.org/licenses/MIT.
+
+//! A TCP [`Transport`] that serializes as JSON.
+
+#![feature(arbitrary_self_types, async_await)]
+#![deny(missing_docs)]
+
+use futures::{compat::*, prelude::*, ready};
+use pin_utils::unsafe_pinned;
+use serde::{Deserialize, Serialize};
+use std::{
+    error::Error,
+    io,
+    marker::PhantomData,
+    net::SocketAddr,
+    pin::Pin,
+    task::{Context, Poll},
+};
+use tokio::codec::{length_delimited::LengthDelimitedCodec, Framed};
+use tokio_io::{AsyncRead, AsyncWrite};
+use tokio_serde_json::*;
+use tokio_tcp::{TcpListener, TcpStream};
+
+/// A transport that serializes to, and deserializes from, a [`TcpStream`].
+pub struct Transport<S, Item, SinkItem> {
+    inner: Compat01As03Sink<
+        ReadJson<WriteJson<Framed<S, LengthDelimitedCodec>, SinkItem>, Item>,
+        SinkItem,
+    >,
+}
+
+impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
+    unsafe_pinned!(
+        inner:
+            Compat01As03Sink<
+                ReadJson<WriteJson<Framed<S, LengthDelimitedCodec>, SinkItem>, Item>,
+                SinkItem,
+            >
+    );
+}
+
+impl<S, Item, SinkItem> Stream for Transport<S, Item, SinkItem>
+where
+    S: AsyncWrite + AsyncRead,
+    Item: for<'a> Deserialize<'a>,
+{
+    type Item = io::Result<Item>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<io::Result<Item>>> {
+        match self.inner().poll_next(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Ready(Some(Ok(next))) => Poll::Ready(Some(Ok(next))),
+            Poll::Ready(Some(Err(e))) => {
+                Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, e))))
+            }
+        }
+    }
+}
+
+impl<S, Item, SinkItem> Sink<SinkItem> for Transport<S, Item, SinkItem>
+where
+    S: AsyncWrite,
+    SinkItem: Serialize,
+{
+    type SinkError = io::Error;
+
+    fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
+        self.inner()
+            .start_send(item)
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+    }
+
+    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        convert(self.inner().poll_ready(cx))
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        convert(self.inner().poll_flush(cx))
+    }
+
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        convert(self.inner().poll_close(cx))
+    }
+}
+
+fn convert<E: Into<Box<dyn Error + Send + Sync>>>(poll: Poll<Result<(), E>>) -> Poll<io::Result<()>> {
+    match poll {
+        Poll::Pending => Poll::Pending,
+        Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
+        Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
+    }
+}
+
+impl<Item, SinkItem> rpc::Transport for Transport<TcpStream, Item, SinkItem>
+where
+    Item: for<'de> Deserialize<'de>,
+    SinkItem: Serialize,
+{
+    type Item = Item;
+    type SinkItem = SinkItem;
+
+    fn peer_addr(&self) -> io::Result<SocketAddr> {
+        self.inner
+            .get_ref()
+            .get_ref()
+            .get_ref()
+            .get_ref()
+            .peer_addr()
+    }
+
+    fn local_addr(&self) -> io::Result<SocketAddr> {
+        self.inner
+            .get_ref()
+            .get_ref()
+            .get_ref()
+            .get_ref()
+            .local_addr()
+    }
+}
+
+/// Returns a new JSON transport that reads from and writes to `io`.
+pub fn new<Item, SinkItem>(io: TcpStream) -> Transport<TcpStream, Item, SinkItem>
+where
+    Item: for<'de> Deserialize<'de>,
+    SinkItem: Serialize,
+{
+    Transport::from(io)
+}
+
+impl<S: AsyncWrite + AsyncRead, Item, SinkItem> From<S>
+    for Transport<S, Item, SinkItem>
+{
+    fn from(inner: S) -> Self {
+        Transport {
+            inner: Compat01As03Sink::new(ReadJson::new(WriteJson::new(Framed::new(
+                inner,
+                LengthDelimitedCodec::new(),
+            )))),
+        }
+    }
+}
+
+/// Connects to `addr`, wrapping the connection in a JSON transport.
+pub async fn connect<Item, SinkItem>(
+    addr: &SocketAddr,
+) -> io::Result<Transport<TcpStream, Item, SinkItem>>
+where
+    Item: for<'de> Deserialize<'de>,
+    SinkItem: Serialize,
+{
+    Ok(new(TcpStream::connect(addr).compat().await?))
+}
+
+/// Listens on `addr`, wrapping accepted connections in JSON transports.
+pub fn listen<Item, SinkItem>(addr: &SocketAddr) -> io::Result<Incoming<Item, SinkItem>>
+where
+    Item: for<'de> Deserialize<'de>,
+    SinkItem: Serialize,
+{
+    let listener = TcpListener::bind(addr)?;
+    let local_addr = listener.local_addr()?;
+    let incoming = listener.incoming().compat();
+    Ok(Incoming {
+        incoming,
+        local_addr,
+        ghost: PhantomData,
+    })
+}
+
+/// A [`TcpListener`] that wraps connections in JSON transports.
+#[derive(Debug)]
+pub struct Incoming<Item, SinkItem> {
+    incoming: Compat01As03<tokio_tcp::Incoming>,
+    local_addr: SocketAddr,
+    ghost: PhantomData<(Item, SinkItem)>,
+}
+
+impl<Item, SinkItem> Incoming<Item, SinkItem> {
+    unsafe_pinned!(incoming: Compat01As03<tokio_tcp::Incoming>);
+
+    /// Returns the address being listened on.
+    pub fn local_addr(&self) -> SocketAddr {
+        self.local_addr
+    }
+}
+
+impl<Item, SinkItem> Stream for Incoming<Item, SinkItem>
+where
+    Item: for<'a> Deserialize<'a>,
+    SinkItem: Serialize,
+{
+    type Item = io::Result<Transport<TcpStream, Item, SinkItem>>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let next = ready!(self.incoming().poll_next(cx)?);
+        Poll::Ready(next.map(|conn| Ok(new(conn))))
+    }
+}
diff --git a/json-transport/tests/bench.rs b/json-transport/tests/bench.rs
new file mode 100644
index 0000000..f440985
--- /dev/null
+++ b/json-transport/tests/bench.rs
@@ -0,0 +1,101 @@
+// Copyright 2019 Google LLC
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file or at
+// https://opensource.org/licenses/MIT.
+
+//! Tests client/server control flow.
+
+#![feature(test, integer_atomics, async_await)]
+
+use futures::{compat::Executor01CompatExt, prelude::*};
+use libtest::stats::Stats;
+use rpc::{
+    client, context,
+    server::{Handler, Server},
+};
+use std::{
+    io,
+    time::{Duration, Instant},
+};
+
+async fn bench() -> io::Result<()> {
+    let listener = tarpc_json_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
+    let addr = listener.local_addr();
+
+    tokio_executor::spawn(
+        Server::<u32, u32>::default()
+            .incoming(listener)
+            .take(1)
+            .respond_with(|_ctx, request| futures::future::ready(Ok(request)))
+            .unit_error()
+            .boxed()
+            .compat(),
+    );
+
+    let conn = tarpc_json_transport::connect(&addr).await?;
+    let client = &mut client::new::<u32, u32, _>(client::Config::default(), conn).await?;
+
+    let total = 10_000usize;
+    let mut successful = 0u32;
+    let mut unsuccessful = 0u32;
+    let mut durations = vec![];
+    for _ in 1..=total {
+        let now = Instant::now();
+        let response = client.call(context::current(), 0u32).await;
+        let elapsed = now.elapsed();
+
+        match response {
+            Ok(_) => successful += 1,
+            Err(_) => unsuccessful += 1,
+        };
+        durations.push(elapsed);
+    }
+
+    let durations_nanos = durations
+        .iter()
+        .map(|duration| duration.as_secs() as f64 * 1E9 + duration.subsec_nanos() as f64)
+        .collect::<Vec<f64>>();
+
+    let (lower, median, upper) = durations_nanos.quartiles();
+
+    println!("Of {} runs:", durations_nanos.len());
+    println!("\tSuccessful: {}", successful);
+    println!("\tUnsuccessful: {}", unsuccessful);
+    println!(
+        "\tMean: {:?}",
+        Duration::from_nanos(durations_nanos.mean() as u64)
+    );
+    println!("\tMedian: {:?}", Duration::from_nanos(median as u64));
+    println!(
+        "\tStd Dev: {:?}",
+        Duration::from_nanos(durations_nanos.std_dev() as u64)
+    );
+    println!(
+        "\tMin: {:?}",
+        Duration::from_nanos(durations_nanos.min() as u64)
+    );
+    println!(
+        "\tMax: {:?}",
+        Duration::from_nanos(durations_nanos.max() as u64)
+    );
+    println!(
+        "\tQuartiles: ({:?}, {:?}, {:?})",
+        Duration::from_nanos(lower as u64),
+        Duration::from_nanos(median as u64),
+        Duration::from_nanos(upper as u64)
+    );
+
+    Ok(())
+}
+
+#[test]
+fn bench_small_packet() -> io::Result<()> {
+    env_logger::init();
+    rpc::init(tokio::executor::DefaultExecutor::current().compat());
+
+    tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat());
+    println!("done");
+
+    Ok(())
+}
diff --git a/json-transport/tests/cancel.rs b/json-transport/tests/cancel.rs
new file mode 100644
index 0000000..9f62cf7
--- /dev/null
+++ b/json-transport/tests/cancel.rs
@@ -0,0 +1,143 @@
+// Copyright 2019 Google LLC
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file or at
+// https://opensource.org/licenses/MIT.
+
+//! Tests client/server control flow.
+
+#![feature(async_await)]
+
+use futures::{
+    compat::{Executor01CompatExt, Future01CompatExt},
+    prelude::*,
+    stream::FuturesUnordered,
+};
+use log::{info, trace};
+use rand::distributions::{Distribution, Normal};
+use rpc::{client, context, server::Server};
+use std::{
+    io,
+    time::{Duration, Instant, SystemTime},
+};
+use tokio::timer::Delay;
+
+pub trait AsDuration {
+    /// Delay of 0 if self is in the past
+    fn as_duration(&self) -> Duration;
+}
+
+impl AsDuration for SystemTime {
+    fn as_duration(&self) -> Duration {
+        self.duration_since(SystemTime::now()).unwrap_or_default()
+    }
+}
+
+async fn run() -> io::Result<()> {
+    let listener = tarpc_json_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
+    let addr = listener.local_addr();
+    let server = Server::<String, String>::default()
+        .incoming(listener)
+        .take(1)
+        .for_each(async move |channel| {
+            let channel = if let Ok(channel) = channel {
+                channel
+            } else {
+                return;
+            };
+            let client_addr = *channel.client_addr();
+            let handler = channel.respond_with(move |ctx, request| {
+                // Sleep for a time sampled from a normal distribution with:
+                // - mean: 1/2 the deadline.
+                // - std dev: 1/2 the deadline.
+                let deadline: Duration = ctx.deadline.as_duration();
+                let deadline_millis = deadline.as_secs() * 1000 + deadline.subsec_millis() as u64;
+                let distribution =
+                    Normal::new(deadline_millis as f64 / 2., deadline_millis as f64 / 2.);
+                let delay_millis = distribution.sample(&mut rand::thread_rng()).max(0.);
+                let delay = Duration::from_millis(delay_millis as u64);
+
+                trace!(
+                    "[{}/{}] Responding to request in {:?}.",
+                    ctx.trace_id(),
+                    client_addr,
+                    delay,
+                );
+
+                let wait = Delay::new(Instant::now() + delay).compat();
+                async move {
+                    wait.await.unwrap();
+                    Ok(request)
+                }
+            });
+            tokio_executor::spawn(handler.unit_error().boxed().compat());
+        });
+
+    tokio_executor::spawn(server.unit_error().boxed().compat());
+
+    let conn = tarpc_json_transport::connect(&addr).await?;
+    let client = client::new::<String, String, _>(client::Config::default(), conn).await?;
+
+    // Proxy service
+    let listener = tarpc_json_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
+    let addr = listener.local_addr();
+    let proxy_server = Server::<String, String>::default()
+        .incoming(listener)
+        .take(1)
+        .for_each(move |channel| {
+            let client = client.clone();
+            async move {
+                let channel = if let Ok(channel) = channel {
+                    channel
+                } else {
+                    return;
+                };
+                let client_addr = *channel.client_addr();
+                let handler = channel.respond_with(move |ctx, request| {
+                    trace!("[{}/{}] Proxying request.", ctx.trace_id(), client_addr);
+                    let mut client = client.clone();
+                    async move { client.call(ctx, request).await }
+                });
+                tokio_executor::spawn(handler.unit_error().boxed().compat());
+            }
+        });
+
+    tokio_executor::spawn(proxy_server.unit_error().boxed().compat());
+
+    let mut config = client::Config::default();
+    config.max_in_flight_requests = 10;
+    config.pending_request_buffer = 10;
+
+    let client =
+        client::new::<String, String, _>(config, tarpc_json_transport::connect(&addr).await?)
+            .await?;
+
+    // Make 3 speculative requests, returning only the quickest.
+    let mut clients: Vec<_> = (1..=3u32).map(|_| client.clone()).collect();
+    let mut requests = vec![];
+    for client in &mut clients {
+        let mut ctx = context::current();
+        ctx.deadline = SystemTime::now() + Duration::from_millis(200);
+        let trace_id = *ctx.trace_id();
+        let response = client.call(ctx, "ping".into());
+        requests.push(response.map(move |r| (trace_id, r)));
+    }
+    let (fastest_response, _) = requests
+        .into_iter()
+        .collect::<FuturesUnordered<_>>()
+        .into_future()
+        .await;
+    let (trace_id, resp) = fastest_response.unwrap();
+    info!("[{}] fastest_response = {:?}", trace_id, resp);
+
+    Ok::<_, io::Error>(())
+}
+
+#[test]
+fn cancel_slower() -> io::Result<()> {
+    env_logger::init();
+    rpc::init(tokio::executor::DefaultExecutor::current().compat());
+
+    tokio::run(run().boxed().map_err(|e| panic!(e)).compat());
+    Ok(())
+}
diff --git a/json-transport/tests/pushback.rs b/json-transport/tests/pushback.rs
new file mode 100644
index 0000000..81f8f0e
--- /dev/null
+++ b/json-transport/tests/pushback.rs
@@ -0,0 +1,119 @@
+// Copyright 2019 Google LLC
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file or at
+// https://opensource.org/licenses/MIT.
+
+//! Tests client/server control flow.
+
+#![feature(async_await)]
+
+use futures::{
+    compat::{Executor01CompatExt, Future01CompatExt},
+    prelude::*,
+};
+use log::{error, info, trace};
+use rand::distributions::{Distribution, Normal};
+use rpc::{client, context, server::Server};
+use std::{
+    io,
+    time::{Duration, Instant, SystemTime},
+};
+use tokio::timer::Delay;
+
+pub trait AsDuration {
+    /// Delay of 0 if self is in the past
+    fn as_duration(&self) -> Duration;
+}
+
+impl AsDuration for SystemTime {
+    fn as_duration(&self) -> Duration {
+        self.duration_since(SystemTime::now()).unwrap_or_default()
+    }
+}
+
+async fn run() -> io::Result<()> {
+    let listener = tarpc_json_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
+    let addr = listener.local_addr();
+    let server = Server::<String, String>::default()
+        .incoming(listener)
+        .take(1)
+        .for_each(async move |channel| {
+            let channel = if let Ok(channel) = channel {
+                channel
+            } else {
+                return;
+            };
+            let client_addr = *channel.client_addr();
+            let handler = channel.respond_with(move |ctx, request| {
+                // Sleep for a time sampled from a normal distribution with:
+                // - mean: 1/2 the deadline.
+                // - std dev: 1/2 the deadline.
+                let deadline: Duration = ctx.deadline.as_duration();
+                let deadline_millis = deadline.as_secs() * 1000 + deadline.subsec_millis() as u64;
+                let distribution =
+                    Normal::new(deadline_millis as f64 / 2., deadline_millis as f64 / 2.);
+                let delay_millis = distribution.sample(&mut rand::thread_rng()).max(0.);
+                let delay = Duration::from_millis(delay_millis as u64);
+
+                trace!(
+                    "[{}/{}] Responding to request in {:?}.",
+                    ctx.trace_id(),
+                    client_addr,
+                    delay,
+                );
+
+                let sleep = Delay::new(Instant::now() + delay).compat();
+                async {
+                    sleep.await.unwrap();
+                    Ok(request)
+                }
+            });
+            tokio_executor::spawn(handler.unit_error().boxed().compat());
+        });
+
+    tokio_executor::spawn(server.unit_error().boxed().compat());
+
+    let mut config = client::Config::default();
+    config.max_in_flight_requests = 10;
+    config.pending_request_buffer = 10;
+
+    let conn = tarpc_json_transport::connect(&addr).await?;
+    let client = client::new::<String, String, _>(config, conn).await?;
+
+    let clients = (1..=100u32).map(|_| client.clone()).collect::<Vec<_>>();
+    for mut client in clients {
+        let ctx = context::current();
+        tokio_executor::spawn(
+            async move {
+                let trace_id = *ctx.trace_id();
+                let response = client.call(ctx, "ping".into());
+                match response.await {
+                    Ok(response) => info!("[{}] response: {}", trace_id, response),
+                    Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e),
+                }
+            }
+                .unit_error()
+                .boxed()
+                .compat(),
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn ping_pong() -> io::Result<()> {
+    env_logger::init();
+    rpc::init(tokio::executor::DefaultExecutor::current().compat());
+
+    tokio::run(
+        run()
+            .map_ok(|_| println!("done"))
+            .map_err(|e| panic!(e.to_string()))
+            .boxed()
+            .compat(),
+    );
+
+    Ok(())
+}
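For reviewers who want to try the new crate, a minimal round trip distilled from the tests in this patch might look like the sketch below. It is illustrative only, not part of the diff: it assumes the same nightly features (async_await), the tarpc-lib 0.6 APIs used above (rpc::server::Server, rpc::client, rpc::context, rpc::init), a String echo service, and the same turbofish on client::new as in the (reconstructed) tests; the function name echo_round_trip is hypothetical.

#![feature(async_await)]

use futures::{compat::Executor01CompatExt, prelude::*};
use rpc::{
    client, context,
    server::{Handler, Server},
};
use std::io;

async fn echo_round_trip() -> io::Result<()> {
    // Bind an ephemeral port; each accepted TcpStream is wrapped in a JSON Transport.
    let listener = tarpc_json_transport::listen(&"127.0.0.1:0".parse().unwrap())?;
    let addr = listener.local_addr();

    // Serve a single connection, echoing every String request back to the caller.
    tokio_executor::spawn(
        Server::<String, String>::default()
            .incoming(listener)
            .take(1)
            .respond_with(|_ctx, request| futures::future::ready(Ok(request)))
            .unit_error()
            .boxed()
            .compat(),
    );

    // Dial the server with a matching JSON transport and issue one call.
    let conn = tarpc_json_transport::connect(&addr).await?;
    let mut client = client::new::<String, String, _>(client::Config::default(), conn).await?;
    let reply = client.call(context::current(), "hello".to_string()).await?;
    assert_eq!(reply, "hello");
    Ok(())
}

fn main() -> io::Result<()> {
    // Same bootstrapping as the tests: route tarpc's internal spawns through tokio 0.1.
    rpc::init(tokio::executor::DefaultExecutor::current().compat());
    tokio::run(
        echo_round_trip()
            .map_err(|e| panic!(e.to_string()))
            .boxed()
            .compat(),
    );
    Ok(())
}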