Move items in the rpc module to the top level.

The rpc module doesn't carry its weight. The whole darn project is RPC related!
This commit is contained in:
Tim Kuehn
2021-03-06 14:26:09 -08:00
parent 3b422eb179
commit b3eb8d0b7a
13 changed files with 141 additions and 157 deletions

View File

@@ -327,7 +327,13 @@ where
return Poll::Pending;
}
while let Poll::Pending = self.as_mut().project().transport.poll_ready(cx)? {
while self
.as_mut()
.project()
.transport
.poll_ready(cx)?
.is_pending()
{
// We can't yield a request-to-be-sent before the transport is capable of buffering it.
ready!(self.as_mut().project().transport.poll_flush(cx)?);
}
@@ -355,7 +361,13 @@ where
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> PollIo<(context::Context, u64)> {
while let Poll::Pending = self.as_mut().project().transport.poll_ready(cx)? {
while self
.as_mut()
.project()
.transport
.poll_ready(cx)?
.is_pending()
{
ready!(self.as_mut().project().transport.poll_flush(cx)?);
}
@@ -901,7 +913,7 @@ mod tests {
match self {
Poll::Ready(Some(Ok(t))) => Poll::Ready(Some(t)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Err(e))) => panic!(e.to_string()),
Poll::Ready(Some(Err(e))) => panic!("{}", e.to_string()),
Poll::Pending => Poll::Pending,
}
}
@@ -910,7 +922,7 @@ mod tests {
match self {
Poll::Ready(Some(Ok(t))) => Some(t),
Poll::Ready(None) => None,
Poll::Ready(Some(Err(e))) => panic!(e.to_string()),
Poll::Ready(Some(Err(e))) => panic!("{}", e.to_string()),
Poll::Pending => panic!("Pending"),
}
}

View File

@@ -203,9 +203,6 @@
#![allow(clippy::type_complexity)]
#![cfg_attr(docsrs, feature(doc_cfg))]
pub mod rpc;
pub use rpc::*;
#[cfg(feature = "serde1")]
pub use serde;
@@ -300,3 +297,126 @@ pub use tarpc_plugins::service;
/// Note that this won't touch functions unless they have been annotated with
/// `async`, meaning that this should not break existing code.
pub use tarpc_plugins::server;
pub mod client;
pub mod context;
pub mod server;
pub mod transport;
pub(crate) mod util;
pub use crate::{client::Client, server::Server, transport::sealed::Transport};
use anyhow::Context as _;
use futures::task::*;
use std::{fmt::Display, io, time::SystemTime};
/// A message from a client to a server.
#[derive(Debug)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum ClientMessage<T> {
/// A request initiated by a user. The server responds to a request by invoking a
/// service-provided request handler. The handler completes with a [`response`](Response), which
/// the server sends back to the client.
Request(Request<T>),
/// A command to cancel an in-flight request, automatically sent by the client when a response
/// future is dropped.
///
/// When received, the server will immediately cancel the main task (top-level future) of the
/// request handler for the associated request. Any tasks spawned by the request handler will
/// not be canceled, because the framework layer does not
/// know about them.
Cancel {
/// The trace context associates the message with a specific chain of causally-related actions,
/// possibly orchestrated across many distributed systems.
#[cfg_attr(feature = "serde1", serde(default))]
trace_context: trace::Context,
/// The ID of the request to cancel.
request_id: u64,
},
}
/// A request from a client to a server.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Request<T> {
/// Trace context, deadline, and other cross-cutting concerns.
pub context: context::Context,
/// Uniquely identifies the request across all requests sent over a single channel.
pub id: u64,
/// The request body.
pub message: T,
}
/// A response from a server to a client.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Response<T> {
/// The ID of the request being responded to.
pub request_id: u64,
/// The response body, or an error if the request failed.
pub message: Result<T, ServerError>,
}
/// An error response from a server to a client.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct ServerError {
#[cfg_attr(
feature = "serde1",
serde(serialize_with = "util::serde::serialize_io_error_kind_as_u32")
)]
#[cfg_attr(
feature = "serde1",
serde(deserialize_with = "util::serde::deserialize_io_error_kind_from_u32")
)]
/// The type of error that occurred to fail the request.
pub kind: io::ErrorKind,
/// A message describing more detail about the error that occurred.
pub detail: Option<String>,
}
impl From<ServerError> for io::Error {
fn from(e: ServerError) -> io::Error {
io::Error::new(e.kind, e.detail.unwrap_or_default())
}
}
impl<T> Request<T> {
/// Returns the deadline for this request.
pub fn deadline(&self) -> &SystemTime {
&self.context.deadline
}
}
pub(crate) type PollIo<T> = Poll<Option<io::Result<T>>>;
pub(crate) trait PollContext<T> {
fn context<C>(self, context: C) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static;
fn with_context<C, F>(self, f: F) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static,
F: FnOnce() -> C;
}
impl<T> PollContext<T> for PollIo<T> {
fn context<C>(self, context: C) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static,
{
self.map(|o| o.map(|r| r.context(context)))
}
fn with_context<C, F>(self, f: F) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static,
F: FnOnce() -> C,
{
self.map(|o| o.map(|r| r.with_context(f)))
}
}

View File

@@ -1,148 +0,0 @@
// Copyright 2018 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![deny(missing_docs, missing_debug_implementations)]
//! An RPC framework providing client and server.
//!
//! Features:
//! * RPC deadlines, both client- and server-side.
//! * Cascading cancellation (works with multiple hops).
//! * Configurable limits
//! * In-flight requests, both client and server-side.
//! * Server-side limit is per-connection.
//! * When the server reaches the in-flight request maximum, it returns a throttled error
//! to the client.
//! * When the client reaches the in-flight request max, messages are buffered up to a
//! configurable maximum, beyond which the requests are back-pressured.
//! * Server connections.
//! * Total and per-IP limits.
//! * When an incoming connection is accepted, if already at maximum, the connection is
//! dropped.
//! * Transport agnostic.
pub mod client;
pub mod context;
pub mod server;
pub mod transport;
pub(crate) mod util;
pub use crate::{client::Client, server::Server, trace, transport::sealed::Transport};
use anyhow::Context as _;
use futures::task::*;
use std::{fmt::Display, io, time::SystemTime};
/// A message from a client to a server.
#[derive(Debug)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum ClientMessage<T> {
/// A request initiated by a user. The server responds to a request by invoking a
/// service-provided request handler. The handler completes with a [`response`](Response), which
/// the server sends back to the client.
Request(Request<T>),
/// A command to cancel an in-flight request, automatically sent by the client when a response
/// future is dropped.
///
/// When received, the server will immediately cancel the main task (top-level future) of the
/// request handler for the associated request. Any tasks spawned by the request handler will
/// not be canceled, because the framework layer does not
/// know about them.
Cancel {
/// The trace context associates the message with a specific chain of causally-related actions,
/// possibly orchestrated across many distributed systems.
#[cfg_attr(feature = "serde1", serde(default))]
trace_context: trace::Context,
/// The ID of the request to cancel.
request_id: u64,
},
}
/// A request from a client to a server.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Request<T> {
/// Trace context, deadline, and other cross-cutting concerns.
pub context: context::Context,
/// Uniquely identifies the request across all requests sent over a single channel.
pub id: u64,
/// The request body.
pub message: T,
}
/// A response from a server to a client.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Response<T> {
/// The ID of the request being responded to.
pub request_id: u64,
/// The response body, or an error if the request failed.
pub message: Result<T, ServerError>,
}
/// An error response from a server to a client.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct ServerError {
#[cfg_attr(
feature = "serde1",
serde(serialize_with = "util::serde::serialize_io_error_kind_as_u32")
)]
#[cfg_attr(
feature = "serde1",
serde(deserialize_with = "util::serde::deserialize_io_error_kind_from_u32")
)]
/// The type of error that occurred to fail the request.
pub kind: io::ErrorKind,
/// A message describing more detail about the error that occurred.
pub detail: Option<String>,
}
impl From<ServerError> for io::Error {
fn from(e: ServerError) -> io::Error {
io::Error::new(e.kind, e.detail.unwrap_or_default())
}
}
impl<T> Request<T> {
/// Returns the deadline for this request.
pub fn deadline(&self) -> &SystemTime {
&self.context.deadline
}
}
pub(crate) type PollIo<T> = Poll<Option<io::Result<T>>>;
pub(crate) trait PollContext<T> {
fn context<C>(self, context: C) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static;
fn with_context<C, F>(self, f: F) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static,
F: FnOnce() -> C;
}
impl<T> PollContext<T> for PollIo<T> {
fn context<C>(self, context: C) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static,
{
self.map(|o| o.map(|r| r.context(context)))
}
fn with_context<C, F>(self, f: F) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static,
F: FnOnce() -> C,
{
self.map(|o| o.map(|r| r.with_context(f)))
}
}

View File

@@ -484,7 +484,7 @@ where
cx: &mut Context<'_>,
) -> PollIo<(context::Context, Response<C::Resp>)> {
// Ensure there's room to write a response.
while let Poll::Pending = self.as_mut().project().channel.poll_ready(cx)? {
while self.as_mut().project().channel.poll_ready(cx)?.is_pending() {
ready!(self.as_mut().project().channel.poll_flush(cx)?);
}

View File

@@ -36,8 +36,8 @@ where
/// `max_in_flight_requests`.
pub fn new(inner: C, max_in_flight_requests: usize) -> Self {
Throttler {
inner,
max_in_flight_requests,
inner,
}
}
}