From 7b6e98da7b045ce8f522c242ea39f8121ab9316d Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Mon, 15 Jul 2019 22:11:55 -0700 Subject: [PATCH] Replace transport integration tests with unit tests. I want 'cargo test' to run faster. --- bincode-transport/Cargo.toml | 13 +-- bincode-transport/src/lib.rs | 47 ++++++++++ bincode-transport/tests/bench.rs | 104 --------------------- bincode-transport/tests/cancel.rs | 139 ---------------------------- bincode-transport/tests/pushback.rs | 114 ----------------------- json-transport/Cargo.toml | 13 +-- json-transport/src/lib.rs | 38 ++++++++ json-transport/tests/bench.rs | 104 --------------------- json-transport/tests/cancel.rs | 139 ---------------------------- json-transport/tests/pushback.rs | 119 ------------------------ 10 files changed, 89 insertions(+), 741 deletions(-) delete mode 100644 bincode-transport/tests/bench.rs delete mode 100644 bincode-transport/tests/cancel.rs delete mode 100644 bincode-transport/tests/pushback.rs delete mode 100644 json-transport/tests/bench.rs delete mode 100644 json-transport/tests/cancel.rs delete mode 100644 json-transport/tests/pushback.rs diff --git a/bincode-transport/Cargo.toml b/bincode-transport/Cargo.toml index 141152b..aaff2b2 100644 --- a/bincode-transport/Cargo.toml +++ b/bincode-transport/Cargo.toml @@ -23,14 +23,5 @@ async-bincode = "0.4" tokio-tcp = "0.1" [dev-dependencies] -env_logger = "0.6" -humantime = "1.0" -log = "0.4" -rand = "0.7" -rand_distr = "0.2" -rpc = { package = "tarpc-lib", version = "0.6", path = "../rpc", features = ["serde1"] } -tokio = "0.1" -tokio-executor = "0.1" -tokio-reactor = "0.1" -tokio-serde = "0.3" -tokio-timer = "0.2" +futures-test-preview = { version = "0.3.0-alpha.17" } +assert_matches = "1.0" diff --git a/bincode-transport/src/lib.rs b/bincode-transport/src/lib.rs index bc42242..80b8ff1 100644 --- a/bincode-transport/src/lib.rs +++ b/bincode-transport/src/lib.rs @@ -182,3 +182,50 @@ where Poll::Ready(next.map(|conn| Ok(new(conn)))) } } + +#[cfg(test)] +mod tests { + use super::Transport; + use assert_matches::assert_matches; + use futures::{Sink, Stream}; + use futures_test::task::noop_waker_ref; + use pin_utils::pin_mut; + use std::{ + io::Cursor, + task::{Context, Poll}, + }; + + fn ctx() -> Context<'static> { + Context::from_waker(&noop_waker_ref()) + } + + #[test] + fn test_stream() { + // Frame is big endian; bincode is little endian. A bit confusing! + let reader = *b"\x00\x00\x00\x1e\x16\x00\x00\x00\x00\x00\x00\x00Test one, check check."; + let reader: Box<[u8]> = Box::new(reader); + let transport = Transport::<_, String, String>::from(Cursor::new(reader)); + pin_mut!(transport); + + assert_matches!( + transport.poll_next(&mut ctx()), + Poll::Ready(Some(Ok(ref s))) if s == "Test one, check check."); + } + + #[test] + fn test_sink() { + let writer: &mut [u8] = &mut [0; 34]; + let transport = Transport::<_, String, String>::from(Cursor::new(&mut *writer)); + pin_mut!(transport); + + assert_matches!(transport.as_mut().poll_ready(&mut ctx()), Poll::Ready(Ok(()))); + assert_matches!(transport .as_mut() .start_send("Test one, check check.".into()), Ok(())); + assert_matches!(transport.poll_flush(&mut ctx()), Poll::Ready(Ok(()))); + assert_eq!( + writer, + <&[u8]>::from( + b"\x00\x00\x00\x1e\x16\x00\x00\x00\x00\x00\x00\x00Test one, check check." + ) + ); + } +} diff --git a/bincode-transport/tests/bench.rs b/bincode-transport/tests/bench.rs deleted file mode 100644 index c77cffe..0000000 --- a/bincode-transport/tests/bench.rs +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2018 Google LLC -// -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -//! Tests client/server control flow. - -#![feature(test, integer_atomics, async_await)] - -extern crate test; - -use futures::{compat::Executor01CompatExt, prelude::*}; -use test::stats::Stats; -use rpc::{ - client, context, - server::{Handler, Server}, -}; -use std::{ - io, - time::{Duration, Instant}, -}; - -async fn bench() -> io::Result<()> { - let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? - .filter_map(|r| future::ready(r.ok())); - let addr = listener.get_ref().local_addr(); - - tokio_executor::spawn( - Server::::default() - .incoming(listener) - .take(1) - .respond_with(|_ctx, request| futures::future::ready(Ok(request))) - .unit_error() - .boxed() - .compat(), - ); - - let conn = tarpc_bincode_transport::connect(&addr).await?; - let client = &mut client::new::(client::Config::default(), conn).await?; - - let total = 10_000usize; - let mut successful = 0u32; - let mut unsuccessful = 0u32; - let mut durations = vec![]; - for _ in 1..=total { - let now = Instant::now(); - let response = client.call(context::current(), 0u32).await; - let elapsed = now.elapsed(); - - match response { - Ok(_) => successful += 1, - Err(_) => unsuccessful += 1, - }; - durations.push(elapsed); - } - - let durations_nanos = durations - .iter() - .map(|duration| duration.as_secs() as f64 * 1E9 + duration.subsec_nanos() as f64) - .collect::>(); - - let (lower, median, upper) = durations_nanos.quartiles(); - - println!("Of {} runs:", durations_nanos.len()); - println!("\tSuccessful: {}", successful); - println!("\tUnsuccessful: {}", unsuccessful); - println!( - "\tMean: {:?}", - Duration::from_nanos(durations_nanos.mean() as u64) - ); - println!("\tMedian: {:?}", Duration::from_nanos(median as u64)); - println!( - "\tStd Dev: {:?}", - Duration::from_nanos(durations_nanos.std_dev() as u64) - ); - println!( - "\tMin: {:?}", - Duration::from_nanos(durations_nanos.min() as u64) - ); - println!( - "\tMax: {:?}", - Duration::from_nanos(durations_nanos.max() as u64) - ); - println!( - "\tQuartiles: ({:?}, {:?}, {:?})", - Duration::from_nanos(lower as u64), - Duration::from_nanos(median as u64), - Duration::from_nanos(upper as u64) - ); - - Ok(()) -} - -#[test] -fn bench_small_packet() -> io::Result<()> { - env_logger::init(); - rpc::init(tokio::executor::DefaultExecutor::current().compat()); - - tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat()); - println!("done"); - - Ok(()) -} diff --git a/bincode-transport/tests/cancel.rs b/bincode-transport/tests/cancel.rs deleted file mode 100644 index 38087bb..0000000 --- a/bincode-transport/tests/cancel.rs +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2018 Google LLC -// -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -//! Tests client/server control flow. - -#![feature(async_await)] -#![feature(async_closure)] - -use futures::{ - compat::{Executor01CompatExt, Future01CompatExt}, - prelude::*, - stream::FuturesUnordered, -}; -use log::{info, trace}; -use rand_distr::{Distribution, Normal}; -use rpc::{ - client, context, - server::{Channel, Server}, -}; -use std::{ - io, - time::{Duration, Instant, SystemTime}, -}; -use tokio::timer::Delay; - -pub trait AsDuration { - /// Delay of 0 if self is in the past - fn as_duration(&self) -> Duration; -} - -impl AsDuration for SystemTime { - fn as_duration(&self) -> Duration { - self.duration_since(SystemTime::now()).unwrap_or_default() - } -} - -async fn run() -> io::Result<()> { - let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? - .filter_map(|r| future::ready(r.ok())); - let addr = listener.get_ref().local_addr(); - let server = Server::::default() - .incoming(listener) - .take(1) - .for_each(async move |channel| { - let client_addr = channel.get_ref().peer_addr().unwrap(); - let handler = channel.respond_with(move |ctx, request| { - // Sleep for a time sampled from a normal distribution with: - // - mean: 1/2 the deadline. - // - std dev: 1/2 the deadline. - let deadline: Duration = ctx.deadline.as_duration(); - let deadline_millis = deadline.as_secs() * 1000 + deadline.subsec_millis() as u64; - let distribution = - Normal::new(deadline_millis as f64 / 2., deadline_millis as f64 / 2.).unwrap(); - let delay_millis = distribution.sample(&mut rand::thread_rng()).max(0.); - let delay = Duration::from_millis(delay_millis as u64); - - trace!( - "[{}/{}] Responding to request in {:?}.", - ctx.trace_id(), - client_addr, - delay, - ); - - let wait = Delay::new(Instant::now() + delay).compat(); - async move { - wait.await.unwrap(); - Ok(request) - } - }); - tokio_executor::spawn(handler.unit_error().boxed().compat()); - }); - - tokio_executor::spawn(server.unit_error().boxed().compat()); - - let conn = tarpc_bincode_transport::connect(&addr).await?; - let client = client::new::(client::Config::default(), conn).await?; - - // Proxy service - let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? - .filter_map(|r| future::ready(r.ok())); - let addr = listener.get_ref().local_addr(); - let proxy_server = Server::::default() - .incoming(listener) - .take(1) - .for_each(move |channel| { - let client = client.clone(); - async move { - let client_addr = channel.get_ref().peer_addr().unwrap(); - let handler = channel.respond_with(move |ctx, request| { - trace!("[{}/{}] Proxying request.", ctx.trace_id(), client_addr); - let mut client = client.clone(); - async move { client.call(ctx, request).await } - }); - tokio_executor::spawn(handler.unit_error().boxed().compat()); - } - }); - - tokio_executor::spawn(proxy_server.unit_error().boxed().compat()); - - let mut config = client::Config::default(); - config.max_in_flight_requests = 10; - config.pending_request_buffer = 10; - - let client = - client::new::(config, tarpc_bincode_transport::connect(&addr).await?) - .await?; - - // Make 3 speculative requests, returning only the quickest. - let mut clients: Vec<_> = (1..=3u32).map(|_| client.clone()).collect(); - let mut requests = vec![]; - for client in &mut clients { - let mut ctx = context::current(); - ctx.deadline = SystemTime::now() + Duration::from_millis(200); - let trace_id = *ctx.trace_id(); - let response = client.call(ctx, "ping".into()); - requests.push(response.map(move |r| (trace_id, r))); - } - let (fastest_response, _) = requests - .into_iter() - .collect::>() - .into_future() - .await; - let (trace_id, resp) = fastest_response.unwrap(); - info!("[{}] fastest_response = {:?}", trace_id, resp); - - Ok::<_, io::Error>(()) -} - -#[test] -fn cancel_slower() -> io::Result<()> { - env_logger::init(); - rpc::init(tokio::executor::DefaultExecutor::current().compat()); - - tokio::run(run().boxed().map_err(|e| panic!(e)).compat()); - Ok(()) -} diff --git a/bincode-transport/tests/pushback.rs b/bincode-transport/tests/pushback.rs deleted file mode 100644 index 8f54547..0000000 --- a/bincode-transport/tests/pushback.rs +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright 2018 Google LLC -// -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -//! Tests client/server control flow. - -#![feature(async_await)] -#![feature(async_closure)] - -use futures::{ - compat::{Executor01CompatExt, Future01CompatExt}, - prelude::*, -}; -use log::{error, info, trace}; -use rand_distr::{Distribution, Normal}; -use rpc::{ - client, context, - server::{Channel, Handler, Server}, -}; -use std::{ - io, - time::{Duration, Instant, SystemTime}, -}; -use tokio::timer::Delay; - -pub trait AsDuration { - /// Delay of 0 if self is in the past - fn as_duration(&self) -> Duration; -} - -impl AsDuration for SystemTime { - fn as_duration(&self) -> Duration { - self.duration_since(SystemTime::now()).unwrap_or_default() - } -} - -async fn run() -> io::Result<()> { - let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? - .filter_map(|r| future::ready(r.ok())); - let addr = listener.get_ref().local_addr(); - let server = Server::::default() - .incoming(listener) - .take(1) - .max_concurrent_requests_per_channel(19) - .for_each(async move |channel| { - let client_addr = channel.get_ref().get_ref().peer_addr().unwrap(); - let handler = channel.respond_with(move |ctx, request| { - // Sleep for a time sampled from a normal distribution with: - // - mean: 1/2 the deadline. - // - std dev: 1/2 the deadline. - let deadline: Duration = ctx.deadline.as_duration(); - let deadline_millis = deadline.as_secs() * 1000 + deadline.subsec_millis() as u64; - let distribution = - Normal::new(deadline_millis as f64 / 2., deadline_millis as f64 / 2.).unwrap(); - let delay_millis = distribution.sample(&mut rand::thread_rng()).max(0.); - let delay = Duration::from_millis(delay_millis as u64); - - trace!( - "[{}/{}] Responding to request in {:?}.", - ctx.trace_id(), - client_addr, - delay, - ); - - let sleep = Delay::new(Instant::now() + delay).compat(); - async { - sleep.await.unwrap(); - Ok(request) - } - }); - tokio_executor::spawn(handler.unit_error().boxed().compat()); - }); - - tokio_executor::spawn(server.unit_error().boxed().compat()); - - let mut config = client::Config::default(); - config.max_in_flight_requests = 20; - config.pending_request_buffer = 10; - - let conn = tarpc_bincode_transport::connect(&addr).await?; - let client = client::new::(config, conn).await?; - - let clients = (1..=100u32).map(|_| client.clone()).collect::>(); - for mut client in clients { - let ctx = context::current(); - tokio_executor::spawn( - async move { - let trace_id = *ctx.trace_id(); - let response = client.call(ctx, "ping".into()); - match response.await { - Ok(response) => info!("[{}] response: {}", trace_id, response), - Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e), - } - } - .unit_error() - .boxed() - .compat(), - ); - } - - Ok(()) -} - -#[test] -fn pushback() -> io::Result<()> { - env_logger::init(); - rpc::init(tokio::executor::DefaultExecutor::current().compat()); - - tokio::run(run().map_err(|e| panic!(e.to_string())).boxed().compat()); - - Ok(()) -} diff --git a/json-transport/Cargo.toml b/json-transport/Cargo.toml index 212db90..31451cb 100644 --- a/json-transport/Cargo.toml +++ b/json-transport/Cargo.toml @@ -24,14 +24,5 @@ tokio-serde-json = "0.2" tokio-tcp = "0.1" [dev-dependencies] -env_logger = "0.6" -humantime = "1.0" -log = "0.4" -rand = "0.7" -rand_distr = "0.2" -rpc = { package = "tarpc-lib", version = "0.6", path = "../rpc", features = ["serde1"] } -tokio = "0.1" -tokio-executor = "0.1" -tokio-reactor = "0.1" -tokio-serde = "0.3" -tokio-timer = "0.2" +futures-test-preview = { version = "0.3.0-alpha.17" } +assert_matches = "1.0" diff --git a/json-transport/src/lib.rs b/json-transport/src/lib.rs index 54aaf98..c12f059 100644 --- a/json-transport/src/lib.rs +++ b/json-transport/src/lib.rs @@ -198,3 +198,41 @@ where Poll::Ready(next.map(|conn| Ok(new(conn)))) } } + +#[cfg(test)] +mod tests { + use assert_matches::assert_matches; + use futures::{Sink, Stream}; + use futures_test::task::noop_waker_ref; + use pin_utils::pin_mut; + use std::{io::Cursor, task::{Context, Poll}}; + use super::Transport; + + fn ctx() -> Context<'static> { + Context::from_waker(&noop_waker_ref()) + } + + #[test] + fn test_stream() { + let reader = *b"\x00\x00\x00\x18\"Test one, check check.\""; + let reader: Box<[u8]> = Box::new(reader); + let transport = Transport::<_, String, String>::from(Cursor::new(reader)); + pin_mut!(transport); + + assert_matches!( + transport.poll_next(&mut ctx()), + Poll::Ready(Some(Ok(ref s))) if s == "Test one, check check."); + } + + #[test] + fn test_sink() { + let writer: &mut [u8] = &mut [0; 28]; + let transport = Transport::<_, String, String>::from(Cursor::new(&mut *writer)); + pin_mut!(transport); + + assert_matches!(transport.as_mut().poll_ready(&mut ctx()), Poll::Ready(Ok(()))); + assert_matches!(transport.as_mut().start_send("Test one, check check.".into()), Ok(())); + assert_matches!(transport.poll_flush(&mut ctx()), Poll::Ready(Ok(()))); + assert_eq!(writer, b"\x00\x00\x00\x18\"Test one, check check.\""); + } +} diff --git a/json-transport/tests/bench.rs b/json-transport/tests/bench.rs deleted file mode 100644 index 0a27bcf..0000000 --- a/json-transport/tests/bench.rs +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2019 Google LLC -// -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -//! Tests client/server control flow. - -#![feature(test, integer_atomics, async_await)] - -extern crate test; - -use futures::{compat::Executor01CompatExt, prelude::*}; -use test::stats::Stats; -use rpc::{ - client, context, - server::{Handler, Server}, -}; -use std::{ - io, - time::{Duration, Instant}, -}; - -async fn bench() -> io::Result<()> { - let listener = tarpc_json_transport::listen(&"0.0.0.0:0".parse().unwrap())? - .filter_map(|r| future::ready(r.ok())); - let addr = listener.get_ref().local_addr(); - - tokio_executor::spawn( - Server::::default() - .incoming(listener) - .take(1) - .respond_with(|_ctx, request| futures::future::ready(Ok(request))) - .unit_error() - .boxed() - .compat(), - ); - - let conn = tarpc_json_transport::connect(&addr).await?; - let client = &mut client::new::(client::Config::default(), conn).await?; - - let total = 10_000usize; - let mut successful = 0u32; - let mut unsuccessful = 0u32; - let mut durations = vec![]; - for _ in 1..=total { - let now = Instant::now(); - let response = client.call(context::current(), 0u32).await; - let elapsed = now.elapsed(); - - match response { - Ok(_) => successful += 1, - Err(_) => unsuccessful += 1, - }; - durations.push(elapsed); - } - - let durations_nanos = durations - .iter() - .map(|duration| duration.as_secs() as f64 * 1E9 + duration.subsec_nanos() as f64) - .collect::>(); - - let (lower, median, upper) = durations_nanos.quartiles(); - - println!("Of {} runs:", durations_nanos.len()); - println!("\tSuccessful: {}", successful); - println!("\tUnsuccessful: {}", unsuccessful); - println!( - "\tMean: {:?}", - Duration::from_nanos(durations_nanos.mean() as u64) - ); - println!("\tMedian: {:?}", Duration::from_nanos(median as u64)); - println!( - "\tStd Dev: {:?}", - Duration::from_nanos(durations_nanos.std_dev() as u64) - ); - println!( - "\tMin: {:?}", - Duration::from_nanos(durations_nanos.min() as u64) - ); - println!( - "\tMax: {:?}", - Duration::from_nanos(durations_nanos.max() as u64) - ); - println!( - "\tQuartiles: ({:?}, {:?}, {:?})", - Duration::from_nanos(lower as u64), - Duration::from_nanos(median as u64), - Duration::from_nanos(upper as u64) - ); - - Ok(()) -} - -#[test] -fn bench_small_packet() -> io::Result<()> { - env_logger::init(); - rpc::init(tokio::executor::DefaultExecutor::current().compat()); - - tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat()); - println!("done"); - - Ok(()) -} diff --git a/json-transport/tests/cancel.rs b/json-transport/tests/cancel.rs deleted file mode 100644 index c05a7c1..0000000 --- a/json-transport/tests/cancel.rs +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2019 Google LLC -// -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -//! Tests client/server control flow. - -#![feature(async_await)] -#![feature(async_closure)] - -use futures::{ - compat::{Executor01CompatExt, Future01CompatExt}, - prelude::*, - stream::FuturesUnordered, -}; -use log::{info, trace}; -use rand_distr::{Distribution, Normal}; -use rpc::{ - client, context, - server::{Channel, Server}, -}; -use std::{ - io, - time::{Duration, Instant, SystemTime}, -}; -use tokio::timer::Delay; - -pub trait AsDuration { - /// Delay of 0 if self is in the past - fn as_duration(&self) -> Duration; -} - -impl AsDuration for SystemTime { - fn as_duration(&self) -> Duration { - self.duration_since(SystemTime::now()).unwrap_or_default() - } -} - -async fn run() -> io::Result<()> { - let listener = tarpc_json_transport::listen(&"0.0.0.0:0".parse().unwrap())? - .filter_map(|r| future::ready(r.ok())); - let addr = listener.get_ref().local_addr(); - let server = Server::::default() - .incoming(listener) - .take(1) - .for_each(async move |channel| { - let client_addr = channel.get_ref().peer_addr().unwrap(); - let handler = channel.respond_with(move |ctx, request| { - // Sleep for a time sampled from a normal distribution with: - // - mean: 1/2 the deadline. - // - std dev: 1/2 the deadline. - let deadline: Duration = ctx.deadline.as_duration(); - let deadline_millis = deadline.as_secs() * 1000 + deadline.subsec_millis() as u64; - let distribution = - Normal::new(deadline_millis as f64 / 2., deadline_millis as f64 / 2.).unwrap(); - let delay_millis = distribution.sample(&mut rand::thread_rng()).max(0.); - let delay = Duration::from_millis(delay_millis as u64); - - trace!( - "[{}/{}] Responding to request in {:?}.", - ctx.trace_id(), - client_addr, - delay, - ); - - let wait = Delay::new(Instant::now() + delay).compat(); - async move { - wait.await.unwrap(); - Ok(request) - } - }); - tokio_executor::spawn(handler.unit_error().boxed().compat()); - }); - - tokio_executor::spawn(server.unit_error().boxed().compat()); - - let conn = tarpc_json_transport::connect(&addr).await?; - let client = client::new::(client::Config::default(), conn).await?; - - // Proxy service - let listener = tarpc_json_transport::listen(&"0.0.0.0:0".parse().unwrap())? - .filter_map(|r| future::ready(r.ok())); - let addr = listener.get_ref().local_addr(); - let proxy_server = Server::::default() - .incoming(listener) - .take(1) - .for_each(move |channel| { - let client = client.clone(); - async move { - let client_addr = channel.get_ref().peer_addr().unwrap(); - let handler = channel.respond_with(move |ctx, request| { - trace!("[{}/{}] Proxying request.", ctx.trace_id(), client_addr); - let mut client = client.clone(); - async move { client.call(ctx, request).await } - }); - tokio_executor::spawn(handler.unit_error().boxed().compat()); - } - }); - - tokio_executor::spawn(proxy_server.unit_error().boxed().compat()); - - let mut config = client::Config::default(); - config.max_in_flight_requests = 10; - config.pending_request_buffer = 10; - - let client = - client::new::(config, tarpc_json_transport::connect(&addr).await?) - .await?; - - // Make 3 speculative requests, returning only the quickest. - let mut clients: Vec<_> = (1..=3u32).map(|_| client.clone()).collect(); - let mut requests = vec![]; - for client in &mut clients { - let mut ctx = context::current(); - ctx.deadline = SystemTime::now() + Duration::from_millis(200); - let trace_id = *ctx.trace_id(); - let response = client.call(ctx, "ping".into()); - requests.push(response.map(move |r| (trace_id, r))); - } - let (fastest_response, _) = requests - .into_iter() - .collect::>() - .into_future() - .await; - let (trace_id, resp) = fastest_response.unwrap(); - info!("[{}] fastest_response = {:?}", trace_id, resp); - - Ok::<_, io::Error>(()) -} - -#[test] -fn cancel_slower() -> io::Result<()> { - env_logger::init(); - rpc::init(tokio::executor::DefaultExecutor::current().compat()); - - tokio::run(run().boxed().map_err(|e| panic!(e)).compat()); - Ok(()) -} diff --git a/json-transport/tests/pushback.rs b/json-transport/tests/pushback.rs deleted file mode 100644 index ab82d72..0000000 --- a/json-transport/tests/pushback.rs +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2019 Google LLC -// -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -//! Tests client/server control flow. - -#![feature(async_await)] -#![feature(async_closure)] - -use futures::{ - compat::{Executor01CompatExt, Future01CompatExt}, - prelude::*, -}; -use log::{error, info, trace}; -use rand_distr::{Distribution, Normal}; -use rpc::{ - client, context, - server::{Channel, Server}, -}; -use std::{ - io, - time::{Duration, Instant, SystemTime}, -}; -use tokio::timer::Delay; - -pub trait AsDuration { - /// Delay of 0 if self is in the past - fn as_duration(&self) -> Duration; -} - -impl AsDuration for SystemTime { - fn as_duration(&self) -> Duration { - self.duration_since(SystemTime::now()).unwrap_or_default() - } -} - -async fn run() -> io::Result<()> { - let listener = tarpc_json_transport::listen(&"0.0.0.0:0".parse().unwrap())? - .filter_map(|r| future::ready(r.ok())); - let addr = listener.get_ref().local_addr(); - let server = Server::::default() - .incoming(listener) - .take(1) - .for_each(async move |channel| { - let client_addr = channel.get_ref().peer_addr().unwrap(); - let handler = channel.respond_with(move |ctx, request| { - // Sleep for a time sampled from a normal distribution with: - // - mean: 1/2 the deadline. - // - std dev: 1/2 the deadline. - let deadline: Duration = ctx.deadline.as_duration(); - let deadline_millis = deadline.as_secs() * 1000 + deadline.subsec_millis() as u64; - let distribution = - Normal::new(deadline_millis as f64 / 2., deadline_millis as f64 / 2.).unwrap(); - let delay_millis = distribution.sample(&mut rand::thread_rng()).max(0.); - let delay = Duration::from_millis(delay_millis as u64); - - trace!( - "[{}/{}] Responding to request in {:?}.", - ctx.trace_id(), - client_addr, - delay, - ); - - let sleep = Delay::new(Instant::now() + delay).compat(); - async { - sleep.await.unwrap(); - Ok(request) - } - }); - tokio_executor::spawn(handler.unit_error().boxed().compat()); - }); - - tokio_executor::spawn(server.unit_error().boxed().compat()); - - let mut config = client::Config::default(); - config.max_in_flight_requests = 10; - config.pending_request_buffer = 10; - - let conn = tarpc_json_transport::connect(&addr).await?; - let client = client::new::(config, conn).await?; - - let clients = (1..=100u32).map(|_| client.clone()).collect::>(); - for mut client in clients { - let ctx = context::current(); - tokio_executor::spawn( - async move { - let trace_id = *ctx.trace_id(); - let response = client.call(ctx, "ping".into()); - match response.await { - Ok(response) => info!("[{}] response: {}", trace_id, response), - Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e), - } - } - .unit_error() - .boxed() - .compat(), - ); - } - - Ok(()) -} - -#[test] -fn ping_pong() -> io::Result<()> { - env_logger::init(); - rpc::init(tokio::executor::DefaultExecutor::current().compat()); - - tokio::run( - run() - .map_ok(|_| println!("done")) - .map_err(|e| panic!(e.to_string())) - .boxed() - .compat(), - ); - - Ok(()) -}