# Changes
## Client is now a trait
And `Channel<Req, Resp>` implements `Client<Req, Resp>`. Previously, `Client<Req, Resp>` was a thin wrapper around `Channel<Req, Resp>`.
This was changed to allow for mapping the request and response types. For example, you can take a `channel: Channel<Req, Resp>` and do:
```rust
channel
    .with_request(|req: Req2| -> Req { ... })
    .map_response(|resp: Resp| -> Resp2 { ... })
```
...which returns a type that implements `Client<Req2, Resp2>`.
### Why would you want to map request and response types?
The main benefit of this is that it enables creating different client types backed by the same channel. For example, you could run multiple clients multiplexing requests over a single `TcpStream`. I have a demo in `tarpc/examples/service_registry.rs` showing how you might do this with a bincode transport. I am considering factoring out the service registry portion of that to an actual library, because it's doing pretty cool stuff. For this PR, though, it'll just be part of the example.
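For concreteness, here is a minimal sketch of the mapping, assuming hypothetical `RegistryRequest`/`RegistryResponse` wire enums and a cheaply cloneable channel (neither is part of tarpc's API; only `with_request` and `map_response` are):
```rust
// Hypothetical wire types shared by every service on the connection.
enum RegistryRequest {
    Hello(String),
    Add(i32, i32),
}
enum RegistryResponse {
    Hello(String),
    Add(i32),
}

// Given `channel: Channel<RegistryRequest, RegistryResponse>`, each service
// gets its own typed view of the shared connection:
let hello_client = channel
    .clone()
    .with_request(RegistryRequest::Hello) // String -> RegistryRequest
    .map_response(|resp: RegistryResponse| match resp {
        RegistryResponse::Hello(greeting) => greeting,
        _ => panic!("mismatched response variant"),
    });
// `hello_client` implements Client<String, String>.
```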
## Client::new is now client::new
This is pretty minor, but necessary because async fns can't currently exist on traits. I changed `Server::new` to match this as well.
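For example, client construction in the test at the bottom of this page now reads:
```rust
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
let client = await!(client::new::<String, String, _>(config, conn))?;
```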
## Macro-generated Clients are generic over the backing Client.
This is a natural consequence of the above change. However, it is transparent to the user by keeping `Channel<Req, Resp>` as the default type for the `<C: Client>` type parameter. `new_stub` returns `Client<Channel<Req, Resp>>`, and other clients can be created via the `From` trait.
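As a rough, self-contained analog of the pattern (not the macro's literal output; all names here are illustrative):
```rust
use std::marker::PhantomData;

// Stand-in for rpc's Channel<Req, Resp>.
struct Channel<Req, Resp>(PhantomData<(Req, Resp)>);

// The generated stub is generic over its backing client, defaulting to
// Channel, so existing call sites keep compiling unchanged.
struct ServiceClient<C = Channel<String, String>> {
    backing: C,
}

// Alternative backings come in via From.
impl<C> From<C> for ServiceClient<C> {
    fn from(backing: C) -> Self {
        ServiceClient { backing }
    }
}
```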
## example-service/ now has two binaries, one for client and one for server.
This serves as a "realistic" example of how one might set up a service. The other examples all run the client and server in the same binary, which isn't realistic in distributed systems use cases.
## `service!` trait fns take self by value.
Services are already cloned per request, so this just passes on that flexibility to the trait implementers.
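A sketch of what that buys an implementer, with a simplified synchronous trait standing in for the one `service!` generates:
```rust
// Simplified stand-in for a service!-generated trait.
trait Hello: Clone {
    fn hello(self, name: String) -> String;
}

#[derive(Clone)]
struct HelloServer {
    greeting: String,
}

impl Hello for HelloServer {
    fn hello(self, name: String) -> String {
        // `self` is this request's clone, so owned state can be moved
        // straight into the response without another copy.
        format!("{}, {}!", self.greeting, name)
    }
}
```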
# Open Questions
In the service registry example, multiple services are running on a single port, and thus multiple clients are sending requests over a single `TcpStream`. This has implications for throttling: [`max_in_flight_requests_per_connection`](https://github.com/google/tarpc/blob/master/rpc/src/server/mod.rs#L57-L60) will set a maximum for the sum of requests for all clients sharing a single connection. I think this is reasonable behavior, but users may expect this setting to act like `max_in_flight_requests_per_client`.
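To make the distinction concrete, assuming the server config is set the same way as the client config in the test below (the mutation style and the number are illustrative; only the field name comes from the linked code):
```rust
let mut config = server::Config::default();
// Shared budget: all clients multiplexed on one TcpStream draw from
// these 100 in-flight requests; no client is individually guaranteed 100.
config.max_in_flight_requests_per_connection = 100;
```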
Fixes #103 #153 #205
The control-flow test exercising these changes:
```rust
// Copyright 2018 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

//! Tests client/server control flow.

#![feature(generators, await_macro, async_await, futures_api)]

use futures::{
    compat::{Future01CompatExt, TokioDefaultSpawner},
    prelude::*,
};
use log::{error, info, trace};
use rand::distributions::{Distribution, Normal};
use rpc::{client, context, server::Server};
use std::{
    io,
    time::{Duration, Instant, SystemTime},
};
use tokio::timer::Delay;

pub trait AsDuration {
    /// Delay of 0 if self is in the past.
    fn as_duration(&self) -> Duration;
}

impl AsDuration for SystemTime {
    fn as_duration(&self) -> Duration {
        self.duration_since(SystemTime::now()).unwrap_or_default()
    }
}

async fn run() -> io::Result<()> {
    // Bind to an OS-assigned port and accept a single connection.
    let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
    let addr = listener.local_addr();
    let server = Server::<String, String>::default()
        .incoming(listener)
        .take(1)
        .for_each(async move |channel| {
            let channel = if let Ok(channel) = channel {
                channel
            } else {
                return;
            };
            let client_addr = *channel.client_addr();
            // Echo each request back after a randomized delay.
            let handler = channel.respond_with(move |ctx, request| {
                // Sleep for a time sampled from a normal distribution with:
                // - mean: 1/2 the deadline.
                // - std dev: 1/2 the deadline.
                let deadline: Duration = ctx.deadline.as_duration();
                let deadline_millis = deadline.as_secs() * 1000 + deadline.subsec_millis() as u64;
                let distribution =
                    Normal::new(deadline_millis as f64 / 2., deadline_millis as f64 / 2.);
                let delay_millis = distribution.sample(&mut rand::thread_rng()).max(0.);
                let delay = Duration::from_millis(delay_millis as u64);

                trace!(
                    "[{}/{}] Responding to request in {:?}.",
                    ctx.trace_id(),
                    client_addr,
                    delay,
                );

                let sleep = Delay::new(Instant::now() + delay).compat();
                async {
                    await!(sleep).unwrap();
                    Ok(request)
                }
            });
            tokio_executor::spawn(handler.unit_error().boxed().compat());
        });

    tokio_executor::spawn(server.unit_error().boxed().compat());

    let mut config = client::Config::default();
    config.max_in_flight_requests = 10;
    config.pending_request_buffer = 10;

    let conn = await!(tarpc_bincode_transport::connect(&addr))?;
    let client = await!(client::new::<String, String, _>(config, conn))?;

    // Fan out 100 concurrent pings, one per clone of the client.
    let clients = (1..=100u32).map(|_| client.clone()).collect::<Vec<_>>();
    for mut client in clients {
        let ctx = context::current();
        tokio_executor::spawn(
            async move {
                let trace_id = *ctx.trace_id();
                let response = client.call(ctx, "ping".into());
                match await!(response) {
                    Ok(response) => info!("[{}] response: {}", trace_id, response),
                    Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e),
                }
            }
            .unit_error()
            .boxed()
            .compat(),
        );
    }

    Ok(())
}

#[test]
fn ping_pong() -> io::Result<()> {
    env_logger::init();
    rpc::init(TokioDefaultSpawner);

    tokio::run(
        run()
            .map_ok(|_| println!("done"))
            .map_err(|e| panic!(e.to_string()))
            .boxed()
            .compat(),
    );

    Ok(())
}
```