mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-01-25 03:50:29 +01:00
Service registry (#204)
# Changes
## Client is now a trait
And `Channel<Req, Resp>` implements `Client<Req, Resp>`. Previously, `Client<Req, Resp>` was a thin wrapper around `Channel<Req, Resp>`.
This was changed to allow for mapping the request and response types. For example, you can take a `channel: Channel<Req, Resp>` and do:
```rust
channel
.with_request(|req: Req2| -> Req { ... })
.map_response(|resp: Resp| -> Resp2 { ... })
```
...which returns a type that implements `Client<Req2, Resp2>`.
### Why would you want to map request and response types?
The main benefit of this is that it enables creating different client types backed by the same channel. For example, you could run multiple clients multiplexing requests over a single `TcpStream`. I have a demo in `tarpc/examples/service_registry.rs` showing how you might do this with a bincode transport. I am considering factoring the service registry portion of that out into an actual library, because it's doing pretty cool stuff. For this PR, though, it'll just be part of the example.
## Client::new is now client::new
This is pretty minor, but necessary because async fns cannot currently be defined in traits. I changed `Server::new` to match this as well.
## Macro-generated Clients are generic over the backing Client.
This is a natural consequence of the above change. However, it is transparent to the user by keeping `Channel<Req, Resp>` as the default type for the `<C: Client>` type parameter. `new_stub` returns `Client<Channel<Req, Resp>>`, and other clients can be created via the `From` trait.
## example-service/ now has two binaries, one for client and one for server.
This serves as a "realistic" example of how one might set up a service. The other examples all run the client and server in the same binary, which isn't realistic in distributed systems use cases.
## `service!` trait fns take self by value.
Services are already cloned per request, so this just passes on that flexibility to the trait implementers.
# Open Questions
In the service registry example, multiple services are running on a single port, and thus multiple clients are sending requests over a single `TcpStream`. This has implications for throttling: [`max_in_flight_requests_per_connection`](https://github.com/google/tarpc/blob/master/rpc/src/server/mod.rs#L57-L60) will set a maximum for the sum of requests for all clients sharing a single connection. I think this is reasonable behavior, but users may expect this setting to act like `max_in_flight_requests_per_client`.
Fixes #103, #153, #205
This commit is contained in:
@@ -10,7 +10,13 @@ use crate::{
|
||||
ClientMessage, Response, Transport,
|
||||
};
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{channel::mpsc, prelude::*, ready, stream::Fuse, task::{LocalWaker, Poll}};
|
||||
use futures::{
|
||||
channel::mpsc,
|
||||
prelude::*,
|
||||
ready,
|
||||
stream::Fuse,
|
||||
task::{LocalWaker, Poll},
|
||||
};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use pin_utils::unsafe_pinned;
|
||||
use std::{
|
||||
@@ -205,10 +211,7 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_closed_connections(
|
||||
self: &mut Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> Poll<io::Result<()>> {
|
||||
fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
match ready!(self.closed_connections_rx().poll_next_unpin(cx)) {
|
||||
Some(addr) => {
|
||||
self.handle_closed_connection(&addr);
|
||||
|
||||
@@ -43,6 +43,12 @@ pub struct Server<Req, Resp> {
|
||||
ghost: PhantomData<(Req, Resp)>,
|
||||
}
|
||||
|
||||
impl<Req, Resp> Default for Server<Req, Resp> {
|
||||
fn default() -> Self {
|
||||
new(Config::default())
|
||||
}
|
||||
}
|
||||
|
||||
/// Settings that control the behavior of the server.
|
||||
#[non_exhaustive]
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -75,15 +81,15 @@ impl Default for Config {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp> Server<Req, Resp> {
|
||||
/// Returns a new server with configuration specified `config`.
|
||||
pub fn new(config: Config) -> Self {
|
||||
Server {
|
||||
config,
|
||||
ghost: PhantomData,
|
||||
}
|
||||
/// Returns a new server with configuration specified `config`.
|
||||
pub fn new<Req, Resp>(config: Config) -> Server<Req, Resp> {
|
||||
Server {
|
||||
config,
|
||||
ghost: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp> Server<Req, Resp> {
|
||||
/// Returns the config for this server.
|
||||
pub fn config(&self) -> &Config {
|
||||
&self.config
|
||||
@@ -122,7 +128,7 @@ where
|
||||
Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send + 'static,
|
||||
F: FnMut(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
type Output = ();
|
||||
@@ -132,7 +138,8 @@ where
|
||||
match channel {
|
||||
Ok(channel) => {
|
||||
let peer = channel.client_addr;
|
||||
if let Err(e) = crate::spawn(channel.respond_with(self.request_handler().clone()))
|
||||
if let Err(e) =
|
||||
crate::spawn(channel.respond_with(self.request_handler().clone()))
|
||||
{
|
||||
warn!("[{}] Failed to spawn connection handler: {:?}", peer, e);
|
||||
}
|
||||
@@ -158,7 +165,7 @@ where
|
||||
/// Responds to all requests with `request_handler`.
|
||||
fn respond_with<F, Fut>(self, request_handler: F) -> Running<Self, F>
|
||||
where
|
||||
F: FnMut(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
Running {
|
||||
@@ -222,21 +229,18 @@ where
|
||||
Req: Send,
|
||||
Resp: Send,
|
||||
{
|
||||
pub(crate) fn start_send(self: &mut Pin<&mut Self>, response: Response<Resp>) -> io::Result<()> {
|
||||
pub(crate) fn start_send(
|
||||
self: &mut Pin<&mut Self>,
|
||||
response: Response<Resp>,
|
||||
) -> io::Result<()> {
|
||||
self.transport().start_send(response)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_ready(
|
||||
self: &mut Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> Poll<io::Result<()>> {
|
||||
pub(crate) fn poll_ready(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
self.transport().poll_ready(cx)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_flush(
|
||||
self: &mut Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> Poll<io::Result<()>> {
|
||||
pub(crate) fn poll_flush(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
self.transport().poll_flush(cx)
|
||||
}
|
||||
|
||||
@@ -256,7 +260,7 @@ where
|
||||
/// responses and resolves when the connection is closed.
|
||||
pub fn respond_with<F, Fut>(self, f: F) -> impl Future<Output = ()>
|
||||
where
|
||||
F: FnMut(Context, Req) -> Fut + Send + 'static,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
Req: 'static,
|
||||
Resp: 'static,
|
||||
@@ -271,7 +275,8 @@ where
|
||||
pending_responses: responses,
|
||||
responses_tx,
|
||||
in_flight_requests: FnvHashMap::default(),
|
||||
}.unwrap_or_else(move |e| {
|
||||
}
|
||||
.unwrap_or_else(move |e| {
|
||||
info!("[{}] ClientHandler errored out: {}", peer, e);
|
||||
})
|
||||
}
|
||||
@@ -305,7 +310,7 @@ where
|
||||
Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
|
||||
F: FnMut(Context, Req) -> Fut + Send + 'static,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
/// If at max in-flight requests, check that there's room to immediately write a throttled
|
||||
@@ -462,7 +467,7 @@ where
|
||||
let mut response_tx = self.responses_tx().clone();
|
||||
|
||||
let trace_id = *ctx.trace_id();
|
||||
let response = self.f()(ctx.clone(), request);
|
||||
let response = self.f().clone()(ctx.clone(), request);
|
||||
let response = deadline_compat::Deadline::new(response, Instant::now() + timeout).then(
|
||||
async move |result| {
|
||||
let response = Response {
|
||||
@@ -477,16 +482,15 @@ where
|
||||
},
|
||||
);
|
||||
let (abortable_response, abort_handle) = abortable(response);
|
||||
crate::spawn(abortable_response.map(|_| ()))
|
||||
.map_err(|e| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!(
|
||||
"Could not spawn response task. Is shutdown: {}",
|
||||
e.is_shutdown()
|
||||
),
|
||||
)
|
||||
})?;
|
||||
crate::spawn(abortable_response.map(|_| ())).map_err(|e| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!(
|
||||
"Could not spawn response task. Is shutdown: {}",
|
||||
e.is_shutdown()
|
||||
),
|
||||
)
|
||||
})?;
|
||||
self.in_flight_requests().insert(request_id, abort_handle);
|
||||
Ok(())
|
||||
}
|
||||
@@ -521,7 +525,7 @@ where
|
||||
Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
|
||||
F: FnMut(Context, Req) -> Fut + Send + 'static,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
type Output = io::Result<()>;
|
||||
|
||||
Reference in New Issue
Block a user