mirror of
https://github.com/OMGeeky/tarpc.git
synced 2025-12-28 23:27:25 +01:00
Add request hooks to the Serve trait.
This allows plugging in horizontal functionality, such as authorization, throttling, or latency recording, that should run before and/or after execution of every request, regardless of the request type. The tracing example is updated to show off both client stubs as well as server hooks. As part of this change, there were some changes to the Serve trait: 1. Serve's output type is now a Result<Response, ServerError>.. Serve previously did not allow returning ServerErrors, which prevented using Serve for horizontal functionality like throttling or auth. Now, Serve's output type is Result<Resp, ServerError>, making Serve a more natural integration point for horizontal capabilities. 2. Serve's generic Request type changed to an associated type. The primary benefit of the generic type is that it allows one type to impl a trait multiple times (for example, u64 impls TryFrom<usize>, TryFrom<u128>, etc.). In the case of Serve impls, while it is theoretically possible to contrive a type that could serve multiple request types, in practice I don't expect that to be needed. Most users will use the Serve impl generated by #[tarpc::service], which only ever serves one type of request.
This commit is contained in:
@@ -548,9 +548,10 @@ impl<'a> ServiceGenerator<'a> {
|
||||
} = self;
|
||||
|
||||
quote! {
|
||||
impl<S> tarpc::server::Serve<#request_ident> for #server_ident<S>
|
||||
impl<S> tarpc::server::Serve for #server_ident<S>
|
||||
where S: #service_ident
|
||||
{
|
||||
type Req = #request_ident;
|
||||
type Resp = #response_ident;
|
||||
type Fut = #response_fut_ident<S>;
|
||||
|
||||
@@ -670,10 +671,10 @@ impl<'a> ServiceGenerator<'a> {
|
||||
|
||||
quote! {
|
||||
impl<S: #service_ident> std::future::Future for #response_fut_ident<S> {
|
||||
type Output = #response_ident;
|
||||
type Output = Result<#response_ident, tarpc::ServerError>;
|
||||
|
||||
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>)
|
||||
-> std::task::Poll<#response_ident>
|
||||
-> std::task::Poll<Result<#response_ident, tarpc::ServerError>>
|
||||
{
|
||||
unsafe {
|
||||
match std::pin::Pin::get_unchecked_mut(self) {
|
||||
@@ -681,7 +682,8 @@ impl<'a> ServiceGenerator<'a> {
|
||||
#response_fut_ident::#camel_case_idents(resp) =>
|
||||
std::pin::Pin::new_unchecked(resp)
|
||||
.poll(cx)
|
||||
.map(#response_ident::#camel_case_idents),
|
||||
.map(#response_ident::#camel_case_idents)
|
||||
.map(Ok),
|
||||
)*
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,13 +4,32 @@
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
use crate::{add::Add as AddService, double::Double as DoubleService};
|
||||
use futures::{future, prelude::*};
|
||||
use tarpc::{
|
||||
client, context,
|
||||
server::{incoming::Incoming, BaseChannel},
|
||||
tokio_serde::formats::Json,
|
||||
#![feature(type_alias_impl_trait)]
|
||||
|
||||
use crate::{
|
||||
add::{Add as AddService, AddStub},
|
||||
double::Double as DoubleService,
|
||||
};
|
||||
use futures::{future, prelude::*};
|
||||
use std::{
|
||||
io,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
use tarpc::{
|
||||
client::{
|
||||
self,
|
||||
stub::{load_balance, retry},
|
||||
RpcError,
|
||||
},
|
||||
context, serde_transport,
|
||||
server::{incoming::Incoming, BaseChannel, Serve},
|
||||
tokio_serde::formats::Json,
|
||||
ClientMessage, Response, ServerError, Transport,
|
||||
};
|
||||
use tokio::net::TcpStream;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
pub mod add {
|
||||
@@ -40,12 +59,16 @@ impl AddService for AddServer {
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct DoubleServer {
|
||||
add_client: add::AddClient,
|
||||
struct DoubleServer<Stub> {
|
||||
add_client: add::AddClient<Stub>,
|
||||
}
|
||||
|
||||
#[tarpc::server]
|
||||
impl DoubleService for DoubleServer {
|
||||
impl<Stub> DoubleService for DoubleServer<Stub>
|
||||
where
|
||||
Stub: AddStub + Clone + Send + Sync + 'static,
|
||||
for<'a> Stub::RespFut<'a>: Send,
|
||||
{
|
||||
async fn double(self, _: context::Context, x: i32) -> Result<i32, String> {
|
||||
self.add_client
|
||||
.add(context::current(), x, x)
|
||||
@@ -70,22 +93,79 @@ fn init_tracing(service_name: &str) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn listen_on_random_port<Item, SinkItem>() -> anyhow::Result<(
|
||||
impl Stream<Item = serde_transport::Transport<TcpStream, Item, SinkItem, Json<Item, SinkItem>>>,
|
||||
std::net::SocketAddr,
|
||||
)>
|
||||
where
|
||||
Item: for<'de> serde::Deserialize<'de>,
|
||||
SinkItem: serde::Serialize,
|
||||
{
|
||||
let listener = tarpc::serde_transport::tcp::listen("localhost:0", Json::default)
|
||||
.await?
|
||||
.filter_map(|r| future::ready(r.ok()))
|
||||
.take(1);
|
||||
let addr = listener.get_ref().get_ref().local_addr();
|
||||
Ok((listener, addr))
|
||||
}
|
||||
|
||||
fn make_stub<Req, Resp, const N: usize>(
|
||||
backends: [impl Transport<ClientMessage<Arc<Req>>, Response<Resp>> + Send + Sync + 'static; N],
|
||||
) -> retry::Retry<
|
||||
impl Fn(&Result<Resp, RpcError>, u32) -> bool + Clone,
|
||||
load_balance::RoundRobin<client::Channel<Arc<Req>, Resp>>,
|
||||
>
|
||||
where
|
||||
Req: Send + Sync + 'static,
|
||||
Resp: Send + Sync + 'static,
|
||||
{
|
||||
let stub = load_balance::RoundRobin::new(
|
||||
backends
|
||||
.into_iter()
|
||||
.map(|transport| tarpc::client::new(client::Config::default(), transport).spawn())
|
||||
.collect(),
|
||||
);
|
||||
let stub = retry::Retry::new(stub, |resp, attempts| {
|
||||
if let Err(e) = resp {
|
||||
tracing::warn!("Got an error: {e:?}");
|
||||
attempts < 3
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
stub
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
init_tracing("tarpc_tracing_example")?;
|
||||
|
||||
let add_listener = tarpc::serde_transport::tcp::listen("localhost:0", Json::default)
|
||||
.await?
|
||||
.filter_map(|r| future::ready(r.ok()));
|
||||
let addr = add_listener.get_ref().local_addr();
|
||||
let add_server = add_listener
|
||||
let (add_listener1, addr1) = listen_on_random_port().await?;
|
||||
let (add_listener2, addr2) = listen_on_random_port().await?;
|
||||
let something_bad_happened = Arc::new(AtomicBool::new(false));
|
||||
let server = AddServer.serve().before(move |_: &mut _, _: &_| {
|
||||
let something_bad_happened = something_bad_happened.clone();
|
||||
async move {
|
||||
if something_bad_happened.fetch_xor(true, Ordering::Relaxed) {
|
||||
Err(ServerError::new(
|
||||
io::ErrorKind::NotFound,
|
||||
"Gamma Ray!".into(),
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
});
|
||||
let add_server = add_listener1
|
||||
.chain(add_listener2)
|
||||
.map(BaseChannel::with_defaults)
|
||||
.take(1)
|
||||
.execute(AddServer.serve());
|
||||
.execute(server);
|
||||
tokio::spawn(add_server);
|
||||
|
||||
let to_add_server = tarpc::serde_transport::tcp::connect(addr, Json::default).await?;
|
||||
let add_client = add::AddClient::new(client::Config::default(), to_add_server).spawn();
|
||||
let add_client = add::AddClient::from(make_stub([
|
||||
tarpc::serde_transport::tcp::connect(addr1, Json::default).await?,
|
||||
tarpc::serde_transport::tcp::connect(addr2, Json::default).await?,
|
||||
]));
|
||||
|
||||
let double_listener = tarpc::serde_transport::tcp::listen("localhost:0", Json::default)
|
||||
.await?
|
||||
|
||||
@@ -21,10 +21,11 @@ use futures::{
|
||||
};
|
||||
use in_flight_requests::{AlreadyExistsError, InFlightRequests};
|
||||
use pin_project::pin_project;
|
||||
use std::{convert::TryFrom, error::Error, fmt, marker::PhantomData, mem, pin::Pin, sync::Arc};
|
||||
use std::{convert::TryFrom, error::Error, fmt, marker::PhantomData, pin::Pin, sync::Arc};
|
||||
use tracing::{info_span, instrument::Instrument, Span};
|
||||
|
||||
mod in_flight_requests;
|
||||
pub mod request_hook;
|
||||
#[cfg(test)]
|
||||
mod testing;
|
||||
|
||||
@@ -39,6 +40,10 @@ pub mod incoming;
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
|
||||
pub mod tokio;
|
||||
|
||||
use request_hook::{
|
||||
AfterRequest, AfterRequestHook, BeforeAndAfterRequestHook, BeforeRequest, BeforeRequestHook,
|
||||
};
|
||||
|
||||
/// Settings that control the behavior of [channels](Channel).
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Config {
|
||||
@@ -67,32 +72,212 @@ impl Config {
|
||||
}
|
||||
|
||||
/// Equivalent to a `FnOnce(Req) -> impl Future<Output = Resp>`.
|
||||
pub trait Serve<Req> {
|
||||
pub trait Serve {
|
||||
/// Type of request.
|
||||
type Req;
|
||||
|
||||
/// Type of response.
|
||||
type Resp;
|
||||
|
||||
/// Type of response future.
|
||||
type Fut: Future<Output = Self::Resp>;
|
||||
type Fut: Future<Output = Result<Self::Resp, ServerError>>;
|
||||
|
||||
/// Responds to a single request.
|
||||
fn serve(self, ctx: context::Context, req: Self::Req) -> Self::Fut;
|
||||
|
||||
/// Extracts a method name from the request.
|
||||
fn method(&self, _request: &Req) -> Option<&'static str> {
|
||||
fn method(&self, _request: &Self::Req) -> Option<&'static str> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Responds to a single request.
|
||||
fn serve(self, ctx: context::Context, req: Req) -> Self::Fut;
|
||||
/// Runs a hook before execution of the request.
|
||||
///
|
||||
/// If the hook returns an error, the request will not be executed and the error will be
|
||||
/// returned instead.
|
||||
///
|
||||
/// The hook can also modify the request context. This could be used, for example, to enforce a
|
||||
/// maximum deadline on all requests.
|
||||
///
|
||||
/// Any type that implements [`BeforeRequest`] can be used as the hook. Types that implement
|
||||
/// `FnMut(&mut Context, &RequestType) -> impl Future<Output = Result<(), ServerError>>` can
|
||||
/// also be used.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use futures::{executor::block_on, future};
|
||||
/// use tarpc::{context, ServerError, server::{Serve, serve}};
|
||||
/// use std::io;
|
||||
///
|
||||
/// let serve = serve(|_ctx, i| async move { Ok(i + 1) })
|
||||
/// .before(|_ctx: &mut context::Context, req: &i32| {
|
||||
/// future::ready(
|
||||
/// if *req == 1 {
|
||||
/// Err(ServerError::new(
|
||||
/// io::ErrorKind::Other,
|
||||
/// format!("I don't like {req}")))
|
||||
/// } else {
|
||||
/// Ok(())
|
||||
/// })
|
||||
/// });
|
||||
/// let response = serve.serve(context::current(), 1);
|
||||
/// assert!(block_on(response).is_err());
|
||||
/// ```
|
||||
fn before<Hook>(self, hook: Hook) -> BeforeRequestHook<Self, Hook>
|
||||
where
|
||||
Hook: BeforeRequest<Self::Req>,
|
||||
Self: Sized,
|
||||
{
|
||||
BeforeRequestHook::new(self, hook)
|
||||
}
|
||||
|
||||
/// Runs a hook after completion of a request.
|
||||
///
|
||||
/// The hook can modify the request context and the response.
|
||||
///
|
||||
/// Any type that implements [`AfterRequest`] can be used as the hook. Types that implement
|
||||
/// `FnMut(&mut Context, &mut Result<ResponseType, ServerError>) -> impl Future<Output = ()>`
|
||||
/// can also be used.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use futures::{executor::block_on, future};
|
||||
/// use tarpc::{context, ServerError, server::{Serve, serve}};
|
||||
/// use std::io;
|
||||
///
|
||||
/// let serve = serve(
|
||||
/// |_ctx, i| async move {
|
||||
/// if i == 1 {
|
||||
/// Err(ServerError::new(
|
||||
/// io::ErrorKind::Other,
|
||||
/// format!("{i} is the loneliest number")))
|
||||
/// } else {
|
||||
/// Ok(i + 1)
|
||||
/// }
|
||||
/// })
|
||||
/// .after(|_ctx: &mut context::Context, resp: &mut Result<i32, ServerError>| {
|
||||
/// if let Err(e) = resp {
|
||||
/// eprintln!("server error: {e:?}");
|
||||
/// }
|
||||
/// future::ready(())
|
||||
/// });
|
||||
///
|
||||
/// let response = serve.serve(context::current(), 1);
|
||||
/// assert!(block_on(response).is_err());
|
||||
/// ```
|
||||
fn after<Hook>(self, hook: Hook) -> AfterRequestHook<Self, Hook>
|
||||
where
|
||||
Hook: AfterRequest<Self::Resp>,
|
||||
Self: Sized,
|
||||
{
|
||||
AfterRequestHook::new(self, hook)
|
||||
}
|
||||
|
||||
/// Runs a hook before and after execution of the request.
|
||||
///
|
||||
/// If the hook returns an error, the request will not be executed and the error will be
|
||||
/// returned instead.
|
||||
///
|
||||
/// The hook can also modify the request context and the response. This could be used, for
|
||||
/// example, to enforce a maximum deadline on all requests.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use futures::{executor::block_on, future};
|
||||
/// use tarpc::{
|
||||
/// context, ServerError, server::{Serve, serve, request_hook::{BeforeRequest, AfterRequest}}
|
||||
/// };
|
||||
/// use std::{io, time::Instant};
|
||||
///
|
||||
/// struct PrintLatency(Instant);
|
||||
///
|
||||
/// impl<Req> BeforeRequest<Req> for PrintLatency {
|
||||
/// type Fut<'a> = future::Ready<Result<(), ServerError>> where Self: 'a, Req: 'a;
|
||||
///
|
||||
/// fn before<'a>(&'a mut self, _: &'a mut context::Context, _: &'a Req) -> Self::Fut<'a> {
|
||||
/// self.0 = Instant::now();
|
||||
/// future::ready(Ok(()))
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// impl<Resp> AfterRequest<Resp> for PrintLatency {
|
||||
/// type Fut<'a> = future::Ready<()> where Self:'a, Resp:'a;
|
||||
///
|
||||
/// fn after<'a>(
|
||||
/// &'a mut self,
|
||||
/// _: &'a mut context::Context,
|
||||
/// _: &'a mut Result<Resp, ServerError>,
|
||||
/// ) -> Self::Fut<'a> {
|
||||
/// tracing::info!("Elapsed: {:?}", self.0.elapsed());
|
||||
/// future::ready(())
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// let serve = serve(|_ctx, i| async move {
|
||||
/// Ok(i + 1)
|
||||
/// }).before_and_after(PrintLatency(Instant::now()));
|
||||
/// let response = serve.serve(context::current(), 1);
|
||||
/// assert!(block_on(response).is_ok());
|
||||
/// ```
|
||||
fn before_and_after<Hook>(
|
||||
self,
|
||||
hook: Hook,
|
||||
) -> BeforeAndAfterRequestHook<Self::Req, Self::Resp, Self, Hook>
|
||||
where
|
||||
Hook: BeforeRequest<Self::Req> + AfterRequest<Self::Resp>,
|
||||
Self: Sized,
|
||||
{
|
||||
BeforeAndAfterRequestHook::new(self, hook)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp, Fut, F> Serve<Req> for F
|
||||
/// A Serve wrapper around a Fn.
|
||||
#[derive(Debug)]
|
||||
pub struct ServeFn<Req, Resp, F> {
|
||||
f: F,
|
||||
data: PhantomData<fn(Req) -> Resp>,
|
||||
}
|
||||
|
||||
impl<Req, Resp, F> Clone for ServeFn<Req, Resp, F>
|
||||
where
|
||||
F: Clone,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
f: self.f.clone(),
|
||||
data: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp, F> Copy for ServeFn<Req, Resp, F> where F: Copy {}
|
||||
|
||||
/// Creates a [`Serve`] wrapper around a `FnOnce(context::Context, Req) -> impl Future<Output =
|
||||
/// Result<Resp, ServerError>>`.
|
||||
pub fn serve<Req, Resp, Fut, F>(f: F) -> ServeFn<Req, Resp, F>
|
||||
where
|
||||
F: FnOnce(context::Context, Req) -> Fut,
|
||||
Fut: Future<Output = Resp>,
|
||||
Fut: Future<Output = Result<Resp, ServerError>>,
|
||||
{
|
||||
ServeFn {
|
||||
f,
|
||||
data: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp, Fut, F> Serve for ServeFn<Req, Resp, F>
|
||||
where
|
||||
F: FnOnce(context::Context, Req) -> Fut,
|
||||
Fut: Future<Output = Result<Resp, ServerError>>,
|
||||
{
|
||||
type Req = Req;
|
||||
type Resp = Resp;
|
||||
type Fut = Fut;
|
||||
|
||||
fn serve(self, ctx: context::Context, req: Req) -> Self::Fut {
|
||||
self(ctx, req)
|
||||
(self.f)(ctx, req)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,7 +305,7 @@ pub struct BaseChannel<Req, Resp, T> {
|
||||
/// Holds data necessary to clean up in-flight requests.
|
||||
in_flight_requests: InFlightRequests,
|
||||
/// Types the request and response.
|
||||
ghost: PhantomData<(Req, Resp)>,
|
||||
ghost: PhantomData<(fn() -> Req, fn(Resp))>,
|
||||
}
|
||||
|
||||
impl<Req, Resp, T> BaseChannel<Req, Resp, T>
|
||||
@@ -307,6 +492,34 @@ where
|
||||
/// This is a terminal operation. After calling `requests`, the channel cannot be retrieved,
|
||||
/// and the only way to complete requests is via [`Requests::execute`] or
|
||||
/// [`InFlightRequest::execute`].
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use tarpc::{
|
||||
/// context,
|
||||
/// client::{self, NewClient},
|
||||
/// server::{self, BaseChannel, Channel, serve},
|
||||
/// transport,
|
||||
/// };
|
||||
/// use futures::prelude::*;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let (tx, rx) = transport::channel::unbounded();
|
||||
/// let server = BaseChannel::new(server::Config::default(), rx);
|
||||
/// let NewClient { client, dispatch } = client::new(client::Config::default(), tx);
|
||||
/// tokio::spawn(dispatch);
|
||||
///
|
||||
/// let mut requests = server.requests();
|
||||
/// tokio::spawn(async move {
|
||||
/// while let Some(Ok(request)) = requests.next().await {
|
||||
/// tokio::spawn(request.execute(serve(|_, i| async move { Ok(i + 1) })));
|
||||
/// }
|
||||
/// });
|
||||
/// assert_eq!(client.call(context::current(), "AddOne", 1).await.unwrap(), 2);
|
||||
/// }
|
||||
/// ```
|
||||
fn requests(self) -> Requests<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
@@ -323,12 +536,28 @@ where
|
||||
/// Runs the channel until completion by executing all requests using the given service
|
||||
/// function. Request handlers are run concurrently by [spawning](::tokio::spawn) on tokio's
|
||||
/// default executor.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use tarpc::{context, client, server::{self, BaseChannel, Channel, serve}, transport};
|
||||
/// use futures::prelude::*;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let (tx, rx) = transport::channel::unbounded();
|
||||
/// let client = client::new(client::Config::default(), tx).spawn();
|
||||
/// let channel = BaseChannel::new(server::Config::default(), rx);
|
||||
/// tokio::spawn(channel.execute(serve(|_, i| async move { Ok(i + 1) })));
|
||||
/// assert_eq!(client.call(context::current(), "AddOne", 1).await.unwrap(), 2);
|
||||
/// }
|
||||
/// ```
|
||||
#[cfg(feature = "tokio1")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
|
||||
fn execute<S>(self, serve: S) -> self::tokio::TokioChannelExecutor<Requests<Self>, S>
|
||||
where
|
||||
Self: Sized,
|
||||
S: Serve<Self::Req, Resp = Self::Resp> + Send + 'static,
|
||||
S: Serve<Req = Self::Req, Resp = Self::Resp> + Send + 'static,
|
||||
S::Fut: Send,
|
||||
Self::Req: Send + 'static,
|
||||
Self::Resp: Send + 'static,
|
||||
@@ -690,29 +919,6 @@ impl<Req, Res> InFlightRequest<Req, Res> {
|
||||
&self.request
|
||||
}
|
||||
|
||||
/// Respond without executing a service function. Useful for early aborts (e.g. for throttling).
|
||||
pub async fn respond(self, response: Result<Res, ServerError>) {
|
||||
let Self {
|
||||
response_tx,
|
||||
response_guard,
|
||||
request: Request { id: request_id, .. },
|
||||
span,
|
||||
..
|
||||
} = self;
|
||||
let _entered = span.enter();
|
||||
tracing::info!("CompleteRequest");
|
||||
let response = Response {
|
||||
request_id,
|
||||
message: response,
|
||||
};
|
||||
let _ = response_tx.send(response).await;
|
||||
tracing::info!("BufferResponse");
|
||||
// Request processing has completed, meaning either the channel canceled the request or
|
||||
// a request was sent back to the channel. Either way, the channel will clean up the
|
||||
// request data, so the request does not need to be canceled.
|
||||
mem::forget(response_guard);
|
||||
}
|
||||
|
||||
/// Returns a [future](Future) that executes the request using the given [service
|
||||
/// function](Serve). The service function's output is automatically sent back to the [Channel]
|
||||
/// that yielded this request. The request will be executed in the scope of this request's
|
||||
@@ -727,9 +933,39 @@ impl<Req, Res> InFlightRequest<Req, Res> {
|
||||
///
|
||||
/// If the returned Future is dropped before completion, a cancellation message will be sent to
|
||||
/// the Channel to clean up associated request state.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use tarpc::{
|
||||
/// context,
|
||||
/// client::{self, NewClient},
|
||||
/// server::{self, BaseChannel, Channel, serve},
|
||||
/// transport,
|
||||
/// };
|
||||
/// use futures::prelude::*;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let (tx, rx) = transport::channel::unbounded();
|
||||
/// let server = BaseChannel::new(server::Config::default(), rx);
|
||||
/// let NewClient { client, dispatch } = client::new(client::Config::default(), tx);
|
||||
/// tokio::spawn(dispatch);
|
||||
///
|
||||
/// tokio::spawn(async move {
|
||||
/// let mut requests = server.requests();
|
||||
/// while let Some(Ok(in_flight_request)) = requests.next().await {
|
||||
/// in_flight_request.execute(serve(|_, i| async move { Ok(i + 1) })).await;
|
||||
/// }
|
||||
///
|
||||
/// });
|
||||
/// assert_eq!(client.call(context::current(), "AddOne", 1).await.unwrap(), 2);
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
pub async fn execute<S>(self, serve: S)
|
||||
where
|
||||
S: Serve<Req, Resp = Res>,
|
||||
S: Serve<Req = Req, Resp = Res>,
|
||||
{
|
||||
let Self {
|
||||
response_tx,
|
||||
@@ -747,11 +983,11 @@ impl<Req, Res> InFlightRequest<Req, Res> {
|
||||
span.record("otel.name", method.unwrap_or(""));
|
||||
let _ = Abortable::new(
|
||||
async move {
|
||||
let response = serve.serve(context, message).await;
|
||||
let message = serve.serve(context, message).await;
|
||||
tracing::info!("CompleteRequest");
|
||||
let response = Response {
|
||||
request_id,
|
||||
message: Ok(response),
|
||||
message,
|
||||
};
|
||||
let _ = response_tx.send(response).await;
|
||||
tracing::info!("BufferResponse");
|
||||
@@ -795,11 +1031,14 @@ where
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{in_flight_requests::AlreadyExistsError, BaseChannel, Channel, Config, Requests};
|
||||
use super::{
|
||||
in_flight_requests::AlreadyExistsError, serve, AfterRequest, BaseChannel, BeforeRequest,
|
||||
Channel, Config, Requests, Serve,
|
||||
};
|
||||
use crate::{
|
||||
context, trace,
|
||||
transport::channel::{self, UnboundedChannel},
|
||||
ClientMessage, Request, Response,
|
||||
ClientMessage, Request, Response, ServerError,
|
||||
};
|
||||
use assert_matches::assert_matches;
|
||||
use futures::{
|
||||
@@ -808,7 +1047,12 @@ mod tests {
|
||||
Future,
|
||||
};
|
||||
use futures_test::task::noop_context;
|
||||
use std::{pin::Pin, task::Poll};
|
||||
use std::{
|
||||
io,
|
||||
pin::Pin,
|
||||
task::Poll,
|
||||
time::{Duration, Instant, SystemTime},
|
||||
};
|
||||
|
||||
fn test_channel<Req, Resp>() -> (
|
||||
Pin<Box<BaseChannel<Req, Resp, UnboundedChannel<ClientMessage<Req>, Response<Resp>>>>>,
|
||||
@@ -869,6 +1113,101 @@ mod tests {
|
||||
Abortable::new(pending(), abort_registration)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_serve() {
|
||||
let serve = serve(|_, i| async move { Ok(i) });
|
||||
assert_matches!(serve.serve(context::current(), 7).await, Ok(7));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn serve_before_mutates_context() -> anyhow::Result<()> {
|
||||
struct SetDeadline(SystemTime);
|
||||
type SetDeadlineFut<'a, Req: 'a> = impl Future<Output = Result<(), ServerError>> + 'a;
|
||||
impl<Req> BeforeRequest<Req> for SetDeadline {
|
||||
type Fut<'a> = SetDeadlineFut<'a, Req> where Self: 'a, Req: 'a;
|
||||
fn before<'a>(
|
||||
&'a mut self,
|
||||
ctx: &'a mut context::Context,
|
||||
_: &'a Req,
|
||||
) -> Self::Fut<'a> {
|
||||
async move {
|
||||
ctx.deadline = self.0;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let some_time = SystemTime::UNIX_EPOCH + Duration::from_secs(37);
|
||||
let some_other_time = SystemTime::UNIX_EPOCH + Duration::from_secs(83);
|
||||
|
||||
let serve = serve(move |ctx: context::Context, i| async move {
|
||||
assert_eq!(ctx.deadline, some_time);
|
||||
Ok(i)
|
||||
});
|
||||
let deadline_hook = serve.before(SetDeadline(some_time));
|
||||
let mut ctx = context::current();
|
||||
ctx.deadline = some_other_time;
|
||||
deadline_hook.serve(ctx, 7).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn serve_before_and_after() -> anyhow::Result<()> {
|
||||
let _ = tracing_subscriber::fmt::try_init();
|
||||
|
||||
struct PrintLatency {
|
||||
start: Instant,
|
||||
}
|
||||
impl PrintLatency {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
start: Instant::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
type StartFut<'a, Req: 'a> = impl Future<Output = Result<(), ServerError>> + 'a;
|
||||
type EndFut<'a, Resp: 'a> = impl Future<Output = ()> + 'a;
|
||||
impl<Req> BeforeRequest<Req> for PrintLatency {
|
||||
type Fut<'a> = StartFut<'a, Req> where Self: 'a, Req: 'a;
|
||||
fn before<'a>(&'a mut self, _: &'a mut context::Context, _: &'a Req) -> Self::Fut<'a> {
|
||||
async move {
|
||||
self.start = Instant::now();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<Resp> AfterRequest<Resp> for PrintLatency {
|
||||
type Fut<'a> = EndFut<'a, Resp> where Self: 'a, Resp: 'a;
|
||||
fn after<'a>(
|
||||
&'a mut self,
|
||||
_: &'a mut context::Context,
|
||||
_: &'a mut Result<Resp, ServerError>,
|
||||
) -> Self::Fut<'a> {
|
||||
async move {
|
||||
tracing::info!("Elapsed: {:?}", self.start.elapsed());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let serve = serve(move |_: context::Context, i| async move { Ok(i) });
|
||||
serve
|
||||
.before_and_after(PrintLatency::new())
|
||||
.serve(context::current(), 7)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn serve_before_error_aborts_request() -> anyhow::Result<()> {
|
||||
let serve = serve(|_, _| async { panic!("Shouldn't get here") });
|
||||
let deadline_hook = serve.before(|_: &mut context::Context, _: &i32| async {
|
||||
Err(ServerError::new(io::ErrorKind::Other, "oops".into()))
|
||||
});
|
||||
let resp: Result<i32, _> = deadline_hook.serve(context::current(), 7).await;
|
||||
assert_matches!(resp, Err(_));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn base_channel_start_send_duplicate_request_returns_error() {
|
||||
let (mut channel, _tx) = test_channel::<(), ()>();
|
||||
@@ -1069,7 +1408,7 @@ mod tests {
|
||||
Poll::Ready(Some(Ok(request))) => request,
|
||||
result => panic!("Unexpected result: {:?}", result),
|
||||
};
|
||||
request.execute(|_, _| async {}).await;
|
||||
request.execute(serve(|_, _| async { Ok(()) })).await;
|
||||
assert!(requests
|
||||
.as_mut()
|
||||
.channel_pin_mut()
|
||||
|
||||
@@ -35,7 +35,7 @@ where
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
|
||||
fn execute<S>(self, serve: S) -> TokioServerExecutor<Self, S>
|
||||
where
|
||||
S: Serve<C::Req, Resp = C::Resp>,
|
||||
S: Serve<Req = C::Req, Resp = C::Resp>,
|
||||
{
|
||||
TokioServerExecutor::new(self, serve)
|
||||
}
|
||||
|
||||
22
tarpc/src/server/request_hook.rs
Normal file
22
tarpc/src/server/request_hook.rs
Normal file
@@ -0,0 +1,22 @@
|
||||
// Copyright 2022 Google LLC
|
||||
//
|
||||
// Use of this source code is governed by an MIT-style
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
//! Hooks for horizontal functionality that can run either before or after a request is executed.
|
||||
|
||||
/// A request hook that runs before a request is executed.
|
||||
mod before;
|
||||
|
||||
/// A request hook that runs after a request is completed.
|
||||
mod after;
|
||||
|
||||
/// A request hook that runs both before a request is executed and after it is completed.
|
||||
mod before_and_after;
|
||||
|
||||
pub use {
|
||||
after::{AfterRequest, AfterRequestHook},
|
||||
before::{BeforeRequest, BeforeRequestHook},
|
||||
before_and_after::BeforeAndAfterRequestHook,
|
||||
};
|
||||
89
tarpc/src/server/request_hook/after.rs
Normal file
89
tarpc/src/server/request_hook/after.rs
Normal file
@@ -0,0 +1,89 @@
|
||||
// Copyright 2022 Google LLC
|
||||
//
|
||||
// Use of this source code is governed by an MIT-style
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
//! Provides a hook that runs after request execution.
|
||||
|
||||
use crate::{context, server::Serve, ServerError};
|
||||
use futures::prelude::*;
|
||||
|
||||
/// A hook that runs after request execution.
|
||||
pub trait AfterRequest<Resp> {
|
||||
/// The type of future returned by the hook.
|
||||
type Fut<'a>: Future<Output = ()>
|
||||
where
|
||||
Self: 'a,
|
||||
Resp: 'a;
|
||||
|
||||
/// The function that is called after request execution.
|
||||
///
|
||||
/// The hook can modify the request context and the response.
|
||||
fn after<'a>(
|
||||
&'a mut self,
|
||||
ctx: &'a mut context::Context,
|
||||
resp: &'a mut Result<Resp, ServerError>,
|
||||
) -> Self::Fut<'a>;
|
||||
}
|
||||
|
||||
impl<F, Fut, Resp> AfterRequest<Resp> for F
|
||||
where
|
||||
F: FnMut(&mut context::Context, &mut Result<Resp, ServerError>) -> Fut,
|
||||
Fut: Future<Output = ()>,
|
||||
{
|
||||
type Fut<'a> = Fut where Self: 'a, Resp: 'a;
|
||||
|
||||
fn after<'a>(
|
||||
&'a mut self,
|
||||
ctx: &'a mut context::Context,
|
||||
resp: &'a mut Result<Resp, ServerError>,
|
||||
) -> Self::Fut<'a> {
|
||||
self(ctx, resp)
|
||||
}
|
||||
}
|
||||
|
||||
/// A Service function that runs a hook after request execution.
|
||||
pub struct AfterRequestHook<Serv, Hook> {
|
||||
serve: Serv,
|
||||
hook: Hook,
|
||||
}
|
||||
|
||||
impl<Serv, Hook> AfterRequestHook<Serv, Hook> {
|
||||
pub(crate) fn new(serve: Serv, hook: Hook) -> Self {
|
||||
Self { serve, hook }
|
||||
}
|
||||
}
|
||||
|
||||
impl<Serv: Clone, Hook: Clone> Clone for AfterRequestHook<Serv, Hook> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
serve: self.serve.clone(),
|
||||
hook: self.hook.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Serv, Hook> Serve for AfterRequestHook<Serv, Hook>
|
||||
where
|
||||
Serv: Serve,
|
||||
Hook: AfterRequest<Serv::Resp>,
|
||||
{
|
||||
type Req = Serv::Req;
|
||||
type Resp = Serv::Resp;
|
||||
type Fut = AfterRequestHookFut<Serv, Hook>;
|
||||
|
||||
fn serve(self, mut ctx: context::Context, req: Serv::Req) -> Self::Fut {
|
||||
async move {
|
||||
let AfterRequestHook {
|
||||
serve, mut hook, ..
|
||||
} = self;
|
||||
let mut resp = serve.serve(ctx, req).await;
|
||||
hook.after(&mut ctx, &mut resp).await;
|
||||
resp
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type AfterRequestHookFut<Serv: Serve, Hook: AfterRequest<Serv::Resp>> =
|
||||
impl Future<Output = Result<Serv::Resp, ServerError>>;
|
||||
84
tarpc/src/server/request_hook/before.rs
Normal file
84
tarpc/src/server/request_hook/before.rs
Normal file
@@ -0,0 +1,84 @@
|
||||
// Copyright 2022 Google LLC
|
||||
//
|
||||
// Use of this source code is governed by an MIT-style
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
//! Provides a hook that runs before request execution.
|
||||
|
||||
use crate::{context, server::Serve, ServerError};
|
||||
use futures::prelude::*;
|
||||
|
||||
/// A hook that runs before request execution.
|
||||
pub trait BeforeRequest<Req> {
|
||||
/// The type of future returned by the hook.
|
||||
type Fut<'a>: Future<Output = Result<(), ServerError>>
|
||||
where
|
||||
Self: 'a,
|
||||
Req: 'a;
|
||||
|
||||
/// The function that is called before request execution.
|
||||
///
|
||||
/// If this function returns an error, the request will not be executed and the error will be
|
||||
/// returned instead.
|
||||
///
|
||||
/// This function can also modify the request context. This could be used, for example, to
|
||||
/// enforce a maximum deadline on all requests.
|
||||
fn before<'a>(&'a mut self, ctx: &'a mut context::Context, req: &'a Req) -> Self::Fut<'a>;
|
||||
}
|
||||
|
||||
impl<F, Fut, Req> BeforeRequest<Req> for F
|
||||
where
|
||||
F: FnMut(&mut context::Context, &Req) -> Fut,
|
||||
Fut: Future<Output = Result<(), ServerError>>,
|
||||
{
|
||||
type Fut<'a> = Fut where Self: 'a, Req: 'a;
|
||||
|
||||
fn before<'a>(&'a mut self, ctx: &'a mut context::Context, req: &'a Req) -> Self::Fut<'a> {
|
||||
self(ctx, req)
|
||||
}
|
||||
}
|
||||
|
||||
/// A Service function that runs a hook before request execution.
|
||||
pub struct BeforeRequestHook<Serv, Hook> {
|
||||
serve: Serv,
|
||||
hook: Hook,
|
||||
}
|
||||
|
||||
impl<Serv, Hook> BeforeRequestHook<Serv, Hook> {
|
||||
pub(crate) fn new(serve: Serv, hook: Hook) -> Self {
|
||||
Self { serve, hook }
|
||||
}
|
||||
}
|
||||
|
||||
impl<Serv: Clone, Hook: Clone> Clone for BeforeRequestHook<Serv, Hook> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
serve: self.serve.clone(),
|
||||
hook: self.hook.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Serv, Hook> Serve for BeforeRequestHook<Serv, Hook>
|
||||
where
|
||||
Serv: Serve,
|
||||
Hook: BeforeRequest<Serv::Req>,
|
||||
{
|
||||
type Req = Serv::Req;
|
||||
type Resp = Serv::Resp;
|
||||
type Fut = BeforeRequestHookFut<Serv, Hook>;
|
||||
|
||||
fn serve(self, mut ctx: context::Context, req: Self::Req) -> Self::Fut {
|
||||
let BeforeRequestHook {
|
||||
serve, mut hook, ..
|
||||
} = self;
|
||||
async move {
|
||||
hook.before(&mut ctx, &req).await?;
|
||||
serve.serve(ctx, req).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type BeforeRequestHookFut<Serv: Serve, Hook: BeforeRequest<Serv::Req>> =
|
||||
impl Future<Output = Result<Serv::Resp, ServerError>>;
|
||||
70
tarpc/src/server/request_hook/before_and_after.rs
Normal file
70
tarpc/src/server/request_hook/before_and_after.rs
Normal file
@@ -0,0 +1,70 @@
|
||||
// Copyright 2022 Google LLC
|
||||
//
|
||||
// Use of this source code is governed by an MIT-style
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
//! Provides a hook that runs both before and after request execution.
|
||||
|
||||
use super::{after::AfterRequest, before::BeforeRequest};
|
||||
use crate::{context, server::Serve, ServerError};
|
||||
use futures::prelude::*;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
/// A Service function that runs a hook both before and after request execution.
|
||||
pub struct BeforeAndAfterRequestHook<Req, Resp, Serv, Hook> {
|
||||
serve: Serv,
|
||||
hook: Hook,
|
||||
fns: PhantomData<(fn(Req), fn(Resp))>,
|
||||
}
|
||||
|
||||
impl<Req, Resp, Serv, Hook> BeforeAndAfterRequestHook<Req, Resp, Serv, Hook> {
|
||||
pub(crate) fn new(serve: Serv, hook: Hook) -> Self {
|
||||
Self {
|
||||
serve,
|
||||
hook,
|
||||
fns: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp, Serv: Clone, Hook: Clone> Clone
|
||||
for BeforeAndAfterRequestHook<Req, Resp, Serv, Hook>
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
serve: self.serve.clone(),
|
||||
hook: self.hook.clone(),
|
||||
fns: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp, Serv, Hook> Serve for BeforeAndAfterRequestHook<Req, Resp, Serv, Hook>
|
||||
where
|
||||
Serv: Serve<Req = Req, Resp = Resp>,
|
||||
Hook: BeforeRequest<Req> + AfterRequest<Resp>,
|
||||
{
|
||||
type Req = Req;
|
||||
type Resp = Resp;
|
||||
type Fut = BeforeAndAfterRequestHookFut<Req, Resp, Serv, Hook>;
|
||||
|
||||
fn serve(self, mut ctx: context::Context, req: Req) -> Self::Fut {
|
||||
async move {
|
||||
let BeforeAndAfterRequestHook {
|
||||
serve, mut hook, ..
|
||||
} = self;
|
||||
hook.before(&mut ctx, &req).await?;
|
||||
let mut resp = serve.serve(ctx, req).await;
|
||||
hook.after(&mut ctx, &mut resp).await;
|
||||
resp
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type BeforeAndAfterRequestHookFut<
|
||||
Req,
|
||||
Resp,
|
||||
Serv: Serve<Req = Req, Resp = Resp>,
|
||||
Hook: BeforeRequest<Req> + AfterRequest<Resp>,
|
||||
> = impl Future<Output = Result<Serv::Resp, ServerError>>;
|
||||
@@ -55,9 +55,25 @@ where
|
||||
{
|
||||
/// Executes all requests using the given service function. Requests are handled concurrently
|
||||
/// by [spawning](::tokio::spawn) each handler on tokio's default executor.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use tarpc::{context, client, server::{self, BaseChannel, Channel, serve}, transport};
|
||||
/// use futures::prelude::*;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let (tx, rx) = transport::channel::unbounded();
|
||||
/// let requests = BaseChannel::new(server::Config::default(), rx).requests();
|
||||
/// let client = client::new(client::Config::default(), tx).spawn();
|
||||
/// tokio::spawn(requests.execute(serve(|_, i| async move { Ok(i + 1) })));
|
||||
/// assert_eq!(client.call(context::current(), "AddOne", 1).await.unwrap(), 2);
|
||||
/// }
|
||||
/// ```
|
||||
pub fn execute<S>(self, serve: S) -> TokioChannelExecutor<Self, S>
|
||||
where
|
||||
S: Serve<C::Req, Resp = C::Resp> + Send + 'static,
|
||||
S: Serve<Req = C::Req, Resp = C::Resp> + Send + 'static,
|
||||
{
|
||||
TokioChannelExecutor { inner: self, serve }
|
||||
}
|
||||
@@ -69,7 +85,7 @@ where
|
||||
C: Channel + Send + 'static,
|
||||
C::Req: Send + 'static,
|
||||
C::Resp: Send + 'static,
|
||||
Se: Serve<C::Req, Resp = C::Resp> + Send + 'static + Clone,
|
||||
Se: Serve<Req = C::Req, Resp = C::Resp> + Send + 'static + Clone,
|
||||
Se::Fut: Send,
|
||||
{
|
||||
type Output = ();
|
||||
@@ -88,7 +104,7 @@ where
|
||||
C: Channel + 'static,
|
||||
C::Req: Send + 'static,
|
||||
C::Resp: Send + 'static,
|
||||
S: Serve<C::Req, Resp = C::Resp> + Send + 'static + Clone,
|
||||
S: Serve<Req = C::Req, Resp = C::Resp> + Send + 'static + Clone,
|
||||
S::Fut: Send,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
@@ -150,12 +150,14 @@ impl<Item, SinkItem> Sink<SinkItem> for Channel<Item, SinkItem> {
|
||||
#[cfg(feature = "tokio1")]
|
||||
mod tests {
|
||||
use crate::{
|
||||
client, context,
|
||||
server::{incoming::Incoming, BaseChannel},
|
||||
client::{self, RpcError},
|
||||
context,
|
||||
server::{incoming::Incoming, serve, BaseChannel},
|
||||
transport::{
|
||||
self,
|
||||
channel::{Channel, UnboundedChannel},
|
||||
},
|
||||
ServerError,
|
||||
};
|
||||
use assert_matches::assert_matches;
|
||||
use futures::{prelude::*, stream};
|
||||
@@ -177,25 +179,25 @@ mod tests {
|
||||
tokio::spawn(
|
||||
stream::once(future::ready(server_channel))
|
||||
.map(BaseChannel::with_defaults)
|
||||
.execute(|_ctx, request: String| {
|
||||
future::ready(request.parse::<u64>().map_err(|_| {
|
||||
io::Error::new(
|
||||
.execute(serve(|_ctx, request: String| async move {
|
||||
request.parse::<u64>().map_err(|_| {
|
||||
ServerError::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
format!("{request:?} is not an int"),
|
||||
)
|
||||
}))
|
||||
}),
|
||||
})
|
||||
})),
|
||||
);
|
||||
|
||||
let client = client::new(client::Config::default(), client_channel).spawn();
|
||||
|
||||
let response1 = client.call(context::current(), "", "123".into()).await?;
|
||||
let response2 = client.call(context::current(), "", "abc".into()).await?;
|
||||
let response1 = client.call(context::current(), "", "123".into()).await;
|
||||
let response2 = client.call(context::current(), "", "abc".into()).await;
|
||||
|
||||
trace!("response1: {:?}, response2: {:?}", response1, response2);
|
||||
|
||||
assert_matches!(response1, Ok(123));
|
||||
assert_matches!(response2, Err(ref e) if e.kind() == io::ErrorKind::InvalidInput);
|
||||
assert_matches!(response2, Err(RpcError::Server(e)) if e.kind == io::ErrorKind::InvalidInput);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user