Add JSON transport (#219)

This commit is contained in:
Artem Vorotnikov
2019-05-21 04:45:41 +03:00
committed by Tim
parent e6ab69c314
commit 950ad5187c
7 changed files with 605 additions and 0 deletions

View File

@@ -5,6 +5,7 @@ members = [
"rpc",
"trace",
"bincode-transport",
"json-transport",
"tarpc",
"plugins",
]

37
json-transport/Cargo.toml Normal file
View File

@@ -0,0 +1,37 @@
[package]
name = "tarpc-json-transport"
version = "0.1.0"
authors = ["Artem Vorotnikov <artem@vorotnikov.me>"]
edition = '2018'
license = "MIT"
documentation = "https://docs.rs/tarpc-json-transport"
homepage = "https://github.com/google/tarpc"
repository = "https://github.com/google/tarpc"
keywords = ["rpc", "network", "json", "serde", "tarpc"]
categories = ["asynchronous", "network-programming"]
readme = "../README.md"
description = "A JSON-based transport for tarpc services."
[dependencies]
futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
futures_legacy = { version = "0.1", package = "futures" }
pin-utils = "0.1.0-alpha.4"
rpc = { package = "tarpc-lib", version = "0.6", path = "../rpc", features = ["serde1"] }
serde = "1.0"
serde_json = "1.0"
tokio = "0.1"
tokio-io = "0.1"
tokio-serde-json = "0.2"
tokio-tcp = "0.1"
[dev-dependencies]
env_logger = "0.6"
humantime = "1.0"
libtest = "0.0.1"
log = "0.4"
rand = "0.6"
tokio = "0.1"
tokio-executor = "0.1"
tokio-reactor = "0.1"
tokio-serde = "0.3"
tokio-timer = "0.2"

View File

@@ -0,0 +1 @@
edition = "2018"

203
json-transport/src/lib.rs Normal file
View File

@@ -0,0 +1,203 @@
// Copyright 2019 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
//! A TCP [`Transport`] that serializes as JSON.
#![feature(arbitrary_self_types, async_await)]
#![deny(missing_docs)]
use futures::{compat::*, prelude::*, ready};
use pin_utils::unsafe_pinned;
use serde::{Deserialize, Serialize};
use std::{
error::Error,
io,
marker::PhantomData,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use tokio::codec::{length_delimited::LengthDelimitedCodec, Framed};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_serde_json::*;
use tokio_tcp::{TcpListener, TcpStream};
/// A transport that serializes to, and deserializes from, a [`TcpStream`].
pub struct Transport<S: AsyncWrite, Item, SinkItem> {
inner: Compat01As03Sink<
ReadJson<WriteJson<Framed<S, LengthDelimitedCodec>, SinkItem>, Item>,
SinkItem,
>,
}
impl<S: AsyncWrite, Item, SinkItem> Transport<S, Item, SinkItem> {
unsafe_pinned!(
inner:
Compat01As03Sink<
ReadJson<WriteJson<Framed<S, LengthDelimitedCodec>, SinkItem>, Item>,
SinkItem,
>
);
}
impl<S, Item, SinkItem> Stream for Transport<S, Item, SinkItem>
where
S: AsyncWrite + AsyncRead,
Item: for<'a> Deserialize<'a>,
{
type Item = io::Result<Item>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<io::Result<Item>>> {
match self.inner().poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(next))) => Poll::Ready(Some(Ok(next))),
Poll::Ready(Some(Err(e))) => {
Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, e))))
}
}
}
}
impl<S, Item, SinkItem> Sink<SinkItem> for Transport<S, Item, SinkItem>
where
S: AsyncWrite,
SinkItem: Serialize,
{
type SinkError = io::Error;
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
self.inner()
.start_send(item)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
convert(self.inner().poll_ready(cx))
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
convert(self.inner().poll_flush(cx))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
convert(self.inner().poll_close(cx))
}
}
fn convert<E: Into<Box<Error + Send + Sync>>>(poll: Poll<Result<(), E>>) -> Poll<io::Result<()>> {
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
}
}
impl<Item, SinkItem> rpc::Transport for Transport<TcpStream, Item, SinkItem>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
type Item = Item;
type SinkItem = SinkItem;
fn peer_addr(&self) -> io::Result<SocketAddr> {
self.inner
.get_ref()
.get_ref()
.get_ref()
.get_ref()
.peer_addr()
}
fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner
.get_ref()
.get_ref()
.get_ref()
.get_ref()
.local_addr()
}
}
/// Returns a new JSON transport that reads from and writes to `io`.
pub fn new<Item, SinkItem>(io: TcpStream) -> Transport<TcpStream, Item, SinkItem>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
Transport::from(io)
}
impl<S: AsyncWrite + AsyncRead, Item: serde::de::DeserializeOwned, SinkItem: Serialize> From<S>
for Transport<S, Item, SinkItem>
{
fn from(inner: S) -> Self {
Transport {
inner: Compat01As03Sink::new(ReadJson::new(WriteJson::new(Framed::new(
inner,
LengthDelimitedCodec::new(),
)))),
}
}
}
/// Connects to `addr`, wrapping the connection in a JSON transport.
pub async fn connect<Item, SinkItem>(
addr: &SocketAddr,
) -> io::Result<Transport<TcpStream, Item, SinkItem>>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
Ok(new(TcpStream::connect(addr).compat().await?))
}
/// Listens on `addr`, wrapping accepted connections in JSON transports.
pub fn listen<Item, SinkItem>(addr: &SocketAddr) -> io::Result<Incoming<Item, SinkItem>>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
let listener = TcpListener::bind(addr)?;
let local_addr = listener.local_addr()?;
let incoming = listener.incoming().compat();
Ok(Incoming {
incoming,
local_addr,
ghost: PhantomData,
})
}
/// A [`TcpListener`] that wraps connections in JSON transports.
#[derive(Debug)]
pub struct Incoming<Item, SinkItem> {
incoming: Compat01As03<tokio_tcp::Incoming>,
local_addr: SocketAddr,
ghost: PhantomData<(Item, SinkItem)>,
}
impl<Item, SinkItem> Incoming<Item, SinkItem> {
unsafe_pinned!(incoming: Compat01As03<tokio_tcp::Incoming>);
/// Returns the address being listened on.
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
}
impl<Item, SinkItem> Stream for Incoming<Item, SinkItem>
where
Item: for<'a> Deserialize<'a>,
SinkItem: Serialize,
{
type Item = io::Result<Transport<TcpStream, Item, SinkItem>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = ready!(self.incoming().poll_next(cx)?);
Poll::Ready(next.map(|conn| Ok(new(conn))))
}
}

View File

@@ -0,0 +1,101 @@
// Copyright 2019 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
//! Tests client/server control flow.
#![feature(test, integer_atomics, async_await)]
use futures::{compat::Executor01CompatExt, prelude::*};
use libtest::stats::Stats;
use rpc::{
client, context,
server::{Handler, Server},
};
use std::{
io,
time::{Duration, Instant},
};
async fn bench() -> io::Result<()> {
let listener = tarpc_json_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = listener.local_addr();
tokio_executor::spawn(
Server::<u32, u32>::default()
.incoming(listener)
.take(1)
.respond_with(|_ctx, request| futures::future::ready(Ok(request)))
.unit_error()
.boxed()
.compat(),
);
let conn = tarpc_json_transport::connect(&addr).await?;
let client = &mut client::new::<u32, u32, _>(client::Config::default(), conn).await?;
let total = 10_000usize;
let mut successful = 0u32;
let mut unsuccessful = 0u32;
let mut durations = vec![];
for _ in 1..=total {
let now = Instant::now();
let response = client.call(context::current(), 0u32).await;
let elapsed = now.elapsed();
match response {
Ok(_) => successful += 1,
Err(_) => unsuccessful += 1,
};
durations.push(elapsed);
}
let durations_nanos = durations
.iter()
.map(|duration| duration.as_secs() as f64 * 1E9 + duration.subsec_nanos() as f64)
.collect::<Vec<_>>();
let (lower, median, upper) = durations_nanos.quartiles();
println!("Of {} runs:", durations_nanos.len());
println!("\tSuccessful: {}", successful);
println!("\tUnsuccessful: {}", unsuccessful);
println!(
"\tMean: {:?}",
Duration::from_nanos(durations_nanos.mean() as u64)
);
println!("\tMedian: {:?}", Duration::from_nanos(median as u64));
println!(
"\tStd Dev: {:?}",
Duration::from_nanos(durations_nanos.std_dev() as u64)
);
println!(
"\tMin: {:?}",
Duration::from_nanos(durations_nanos.min() as u64)
);
println!(
"\tMax: {:?}",
Duration::from_nanos(durations_nanos.max() as u64)
);
println!(
"\tQuartiles: ({:?}, {:?}, {:?})",
Duration::from_nanos(lower as u64),
Duration::from_nanos(median as u64),
Duration::from_nanos(upper as u64)
);
Ok(())
}
#[test]
fn bench_small_packet() -> io::Result<()> {
env_logger::init();
rpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat());
println!("done");
Ok(())
}

View File

@@ -0,0 +1,143 @@
// Copyright 2019 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
//! Tests client/server control flow.
#![feature(async_await)]
use futures::{
compat::{Executor01CompatExt, Future01CompatExt},
prelude::*,
stream::FuturesUnordered,
};
use log::{info, trace};
use rand::distributions::{Distribution, Normal};
use rpc::{client, context, server::Server};
use std::{
io,
time::{Duration, Instant, SystemTime},
};
use tokio::timer::Delay;
pub trait AsDuration {
/// Delay of 0 if self is in the past
fn as_duration(&self) -> Duration;
}
impl AsDuration for SystemTime {
fn as_duration(&self) -> Duration {
self.duration_since(SystemTime::now()).unwrap_or_default()
}
}
async fn run() -> io::Result<()> {
let listener = tarpc_json_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = listener.local_addr();
let server = Server::<String, String>::default()
.incoming(listener)
.take(1)
.for_each(async move |channel| {
let channel = if let Ok(channel) = channel {
channel
} else {
return;
};
let client_addr = *channel.client_addr();
let handler = channel.respond_with(move |ctx, request| {
// Sleep for a time sampled from a normal distribution with:
// - mean: 1/2 the deadline.
// - std dev: 1/2 the deadline.
let deadline: Duration = ctx.deadline.as_duration();
let deadline_millis = deadline.as_secs() * 1000 + deadline.subsec_millis() as u64;
let distribution =
Normal::new(deadline_millis as f64 / 2., deadline_millis as f64 / 2.);
let delay_millis = distribution.sample(&mut rand::thread_rng()).max(0.);
let delay = Duration::from_millis(delay_millis as u64);
trace!(
"[{}/{}] Responding to request in {:?}.",
ctx.trace_id(),
client_addr,
delay,
);
let wait = Delay::new(Instant::now() + delay).compat();
async move {
wait.await.unwrap();
Ok(request)
}
});
tokio_executor::spawn(handler.unit_error().boxed().compat());
});
tokio_executor::spawn(server.unit_error().boxed().compat());
let conn = tarpc_json_transport::connect(&addr).await?;
let client = client::new::<String, String, _>(client::Config::default(), conn).await?;
// Proxy service
let listener = tarpc_json_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = listener.local_addr();
let proxy_server = Server::<String, String>::default()
.incoming(listener)
.take(1)
.for_each(move |channel| {
let client = client.clone();
async move {
let channel = if let Ok(channel) = channel {
channel
} else {
return;
};
let client_addr = *channel.client_addr();
let handler = channel.respond_with(move |ctx, request| {
trace!("[{}/{}] Proxying request.", ctx.trace_id(), client_addr);
let mut client = client.clone();
async move { client.call(ctx, request).await }
});
tokio_executor::spawn(handler.unit_error().boxed().compat());
}
});
tokio_executor::spawn(proxy_server.unit_error().boxed().compat());
let mut config = client::Config::default();
config.max_in_flight_requests = 10;
config.pending_request_buffer = 10;
let client =
client::new::<String, String, _>(config, tarpc_json_transport::connect(&addr).await?)
.await?;
// Make 3 speculative requests, returning only the quickest.
let mut clients: Vec<_> = (1..=3u32).map(|_| client.clone()).collect();
let mut requests = vec![];
for client in &mut clients {
let mut ctx = context::current();
ctx.deadline = SystemTime::now() + Duration::from_millis(200);
let trace_id = *ctx.trace_id();
let response = client.call(ctx, "ping".into());
requests.push(response.map(move |r| (trace_id, r)));
}
let (fastest_response, _) = requests
.into_iter()
.collect::<FuturesUnordered<_>>()
.into_future()
.await;
let (trace_id, resp) = fastest_response.unwrap();
info!("[{}] fastest_response = {:?}", trace_id, resp);
Ok::<_, io::Error>(())
}
#[test]
fn cancel_slower() -> io::Result<()> {
env_logger::init();
rpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(run().boxed().map_err(|e| panic!(e)).compat());
Ok(())
}

View File

@@ -0,0 +1,119 @@
// Copyright 2019 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
//! Tests client/server control flow.
#![feature(async_await)]
use futures::{
compat::{Executor01CompatExt, Future01CompatExt},
prelude::*,
};
use log::{error, info, trace};
use rand::distributions::{Distribution, Normal};
use rpc::{client, context, server::Server};
use std::{
io,
time::{Duration, Instant, SystemTime},
};
use tokio::timer::Delay;
pub trait AsDuration {
/// Delay of 0 if self is in the past
fn as_duration(&self) -> Duration;
}
impl AsDuration for SystemTime {
fn as_duration(&self) -> Duration {
self.duration_since(SystemTime::now()).unwrap_or_default()
}
}
async fn run() -> io::Result<()> {
let listener = tarpc_json_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = listener.local_addr();
let server = Server::<String, String>::default()
.incoming(listener)
.take(1)
.for_each(async move |channel| {
let channel = if let Ok(channel) = channel {
channel
} else {
return;
};
let client_addr = *channel.client_addr();
let handler = channel.respond_with(move |ctx, request| {
// Sleep for a time sampled from a normal distribution with:
// - mean: 1/2 the deadline.
// - std dev: 1/2 the deadline.
let deadline: Duration = ctx.deadline.as_duration();
let deadline_millis = deadline.as_secs() * 1000 + deadline.subsec_millis() as u64;
let distribution =
Normal::new(deadline_millis as f64 / 2., deadline_millis as f64 / 2.);
let delay_millis = distribution.sample(&mut rand::thread_rng()).max(0.);
let delay = Duration::from_millis(delay_millis as u64);
trace!(
"[{}/{}] Responding to request in {:?}.",
ctx.trace_id(),
client_addr,
delay,
);
let sleep = Delay::new(Instant::now() + delay).compat();
async {
sleep.await.unwrap();
Ok(request)
}
});
tokio_executor::spawn(handler.unit_error().boxed().compat());
});
tokio_executor::spawn(server.unit_error().boxed().compat());
let mut config = client::Config::default();
config.max_in_flight_requests = 10;
config.pending_request_buffer = 10;
let conn = tarpc_json_transport::connect(&addr).await?;
let client = client::new::<String, String, _>(config, conn).await?;
let clients = (1..=100u32).map(|_| client.clone()).collect::<Vec<_>>();
for mut client in clients {
let ctx = context::current();
tokio_executor::spawn(
async move {
let trace_id = *ctx.trace_id();
let response = client.call(ctx, "ping".into());
match response.await {
Ok(response) => info!("[{}] response: {}", trace_id, response),
Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e),
}
}
.unit_error()
.boxed()
.compat(),
);
}
Ok(())
}
#[test]
fn ping_pong() -> io::Result<()> {
env_logger::init();
rpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(
run()
.map_ok(|_| println!("done"))
.map_err(|e| panic!(e.to_string()))
.boxed()
.compat(),
);
Ok(())
}