mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Compare commits
26 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9b90f6ae51 | ||
|
|
bbfc8ac352 | ||
|
|
ad86a967ba | ||
|
|
58a0eced19 | ||
|
|
46fffd13e7 | ||
|
|
6c8d4be462 | ||
|
|
e3a517bf0d | ||
|
|
f4e22bdc2e | ||
|
|
46f56fbdc0 | ||
|
|
8665655592 | ||
|
|
4569d26d81 | ||
|
|
b8b92ddb5f | ||
|
|
8dd3390876 | ||
|
|
06c420b60c | ||
|
|
a7fb4d22cc | ||
|
|
b1cd5f34e5 | ||
|
|
088e5f8f2c | ||
|
|
4e0be5b626 | ||
|
|
5516034bbc | ||
|
|
06544faa5a | ||
|
|
39737b720a | ||
|
|
0f36985440 | ||
|
|
959bb691cd | ||
|
|
2a3162c5fa | ||
|
|
0cc976b729 | ||
|
|
4d2d3f24c6 |
@@ -33,7 +33,7 @@ arguments to tarpc fns.
|
||||
Add to your `Cargo.toml` dependencies:
|
||||
|
||||
```toml
|
||||
tarpc = "0.14.0"
|
||||
tarpc = "0.15.0"
|
||||
```
|
||||
|
||||
The `service!` macro expands to a collection of items that form an
|
||||
@@ -52,7 +52,7 @@ Here's a small service.
|
||||
|
||||
|
||||
use futures::{
|
||||
compat::TokioDefaultSpawner,
|
||||
compat::Executor01CompatExt,
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
@@ -121,7 +121,7 @@ async fn run() -> io::Result<()> {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
tokio::run(run()
|
||||
.map_err(|e| eprintln!("Oh no: {}", e))
|
||||
.boxed()
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
cargo-features = ["rename-dependency"]
|
||||
|
||||
[package]
|
||||
name = "tarpc-bincode-transport"
|
||||
version = "0.3.0"
|
||||
version = "0.5.0"
|
||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||
edition = '2018'
|
||||
license = "MIT"
|
||||
@@ -15,26 +13,24 @@ readme = "../README.md"
|
||||
description = "A bincode-based transport for tarpc services."
|
||||
|
||||
[dependencies]
|
||||
bincode = { version = "1.0", features = ["i128"] }
|
||||
bincode = "1"
|
||||
futures-preview = { version = "0.3.0-alpha.14", features = ["compat"] }
|
||||
futures_legacy = { version = "0.1", package = "futures" }
|
||||
pin-utils = "0.1.0-alpha.3"
|
||||
rpc = { package = "tarpc-lib", version = "0.2", path = "../rpc", features = ["serde1"] }
|
||||
pin-utils = "0.1.0-alpha.4"
|
||||
rpc = { package = "tarpc-lib", version = "0.4", path = "../rpc", features = ["serde1"] }
|
||||
serde = "1.0"
|
||||
tokio-io = "0.1"
|
||||
async-bincode = "0.4"
|
||||
tokio-tcp = "0.1"
|
||||
|
||||
[target.'cfg(not(test))'.dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat"] }
|
||||
|
||||
[dev-dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
|
||||
env_logger = "0.5"
|
||||
env_logger = "0.6"
|
||||
humantime = "1.0"
|
||||
libtest = "0.0.1"
|
||||
log = "0.4"
|
||||
rand = "0.5"
|
||||
rand = "0.6"
|
||||
tokio = "0.1"
|
||||
tokio-executor = "0.1"
|
||||
tokio-reactor = "0.1"
|
||||
tokio-serde = "0.2"
|
||||
tokio-serde = "0.3"
|
||||
tokio-timer = "0.2"
|
||||
|
||||
@@ -1,150 +0,0 @@
|
||||
use futures::{compat::Stream01CompatExt, prelude::*, ready};
|
||||
use futures_legacy::{
|
||||
executor::{
|
||||
self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01,
|
||||
UnsafeNotify as UnsafeNotify01,
|
||||
},
|
||||
Async as Async01, AsyncSink as AsyncSink01, Sink as Sink01, Stream as Stream01,
|
||||
};
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{self, LocalWaker, Poll},
|
||||
};
|
||||
|
||||
/// A shim to convert a 0.1 Sink + Stream to a 0.3 Sink + Stream.
|
||||
#[derive(Debug)]
|
||||
pub struct Compat<S, SinkItem> {
|
||||
staged_item: Option<SinkItem>,
|
||||
inner: S,
|
||||
}
|
||||
|
||||
impl<S, SinkItem> Compat<S, SinkItem> {
|
||||
/// Returns a new Compat.
|
||||
pub fn new(inner: S) -> Self {
|
||||
Compat {
|
||||
inner,
|
||||
staged_item: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Unwraps Compat, returning the inner value.
|
||||
pub fn into_inner(self) -> S {
|
||||
self.inner
|
||||
}
|
||||
|
||||
/// Returns a reference to the value wrapped by Compat.
|
||||
pub fn get_ref(&self) -> &S {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, SinkItem> Stream for Compat<S, SinkItem>
|
||||
where
|
||||
S: Stream01,
|
||||
{
|
||||
type Item = Result<S::Item, S::Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
|
||||
unsafe {
|
||||
let inner = &mut Pin::get_mut_unchecked(self).inner;
|
||||
let mut compat = inner.compat();
|
||||
let compat = Pin::new_unchecked(&mut compat);
|
||||
match ready!(compat.poll_next(waker)) {
|
||||
None => Poll::Ready(None),
|
||||
Some(Ok(next)) => Poll::Ready(Some(Ok(next))),
|
||||
Some(Err(e)) => Poll::Ready(Some(Err(e))),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, SinkItem> Sink for Compat<S, SinkItem>
|
||||
where
|
||||
S: Sink01<SinkItem = SinkItem>,
|
||||
{
|
||||
type SinkItem = SinkItem;
|
||||
type SinkError = S::SinkError;
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), S::SinkError> {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
assert!(me.staged_item.is_none());
|
||||
me.staged_item = Some(item);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.staged_item.take() {
|
||||
Some(staged_item) => match me.inner.start_send(staged_item) {
|
||||
Ok(AsyncSink01::Ready) => Poll::Ready(Ok(())),
|
||||
Ok(AsyncSink01::NotReady(item)) => {
|
||||
me.staged_item = Some(item);
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
},
|
||||
None => Poll::Ready(Ok(())),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.inner.poll_complete() {
|
||||
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
|
||||
Ok(Async01::NotReady) => Poll::Pending,
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.inner.close() {
|
||||
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
|
||||
Ok(Async01::NotReady) => Poll::Pending,
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct WakerToHandle<'a>(&'a LocalWaker);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NotifyWaker(task::Waker);
|
||||
|
||||
impl Notify01 for NotifyWaker {
|
||||
fn notify(&self, _: usize) {
|
||||
self.0.wake();
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl UnsafeNotify01 for NotifyWaker {
|
||||
unsafe fn clone_raw(&self) -> NotifyHandle01 {
|
||||
let ptr = Box::new(NotifyWaker(self.0.clone()));
|
||||
|
||||
NotifyHandle01::new(Box::into_raw(ptr))
|
||||
}
|
||||
|
||||
unsafe fn drop_raw(&self) {
|
||||
let ptr: *const dyn UnsafeNotify01 = self;
|
||||
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
|
||||
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
|
||||
unsafe { NotifyWaker(handle.0.clone().into_waker()).clone_raw() }
|
||||
}
|
||||
}
|
||||
@@ -6,22 +6,11 @@
|
||||
|
||||
//! A TCP [`Transport`] that serializes as bincode.
|
||||
|
||||
#![feature(
|
||||
futures_api,
|
||||
pin,
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await
|
||||
)]
|
||||
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
|
||||
#![deny(missing_docs, missing_debug_implementations)]
|
||||
|
||||
use self::compat::Compat;
|
||||
use async_bincode::{AsyncBincodeStream, AsyncDestination};
|
||||
use futures::{
|
||||
compat::{Compat01As03, Future01CompatExt, Stream01CompatExt},
|
||||
prelude::*,
|
||||
ready,
|
||||
};
|
||||
use futures::{compat::*, prelude::*, ready};
|
||||
use pin_utils::unsafe_pinned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
@@ -30,29 +19,20 @@ use std::{
|
||||
marker::PhantomData,
|
||||
net::SocketAddr,
|
||||
pin::Pin,
|
||||
task::{LocalWaker, Poll},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_tcp::{TcpListener, TcpStream};
|
||||
|
||||
mod compat;
|
||||
|
||||
/// A transport that serializes to, and deserializes from, a [`TcpStream`].
|
||||
#[derive(Debug)]
|
||||
pub struct Transport<S, Item, SinkItem> {
|
||||
inner: Compat<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>,
|
||||
}
|
||||
|
||||
impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
|
||||
/// Returns the transport underlying the bincode transport.
|
||||
pub fn into_inner(self) -> S {
|
||||
self.inner.into_inner().into_inner()
|
||||
}
|
||||
inner: Compat01As03Sink<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>,
|
||||
}
|
||||
|
||||
impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
|
||||
unsafe_pinned!(
|
||||
inner: Compat<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>
|
||||
inner: Compat01As03Sink<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>
|
||||
);
|
||||
}
|
||||
|
||||
@@ -63,8 +43,8 @@ where
|
||||
{
|
||||
type Item = io::Result<Item>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<Item>>> {
|
||||
match self.inner().poll_next(waker) {
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<io::Result<Item>>> {
|
||||
match self.inner().poll_next(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Ready(Some(Ok(next))) => Poll::Ready(Some(Ok(next))),
|
||||
@@ -75,30 +55,29 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Item, SinkItem> Sink for Transport<S, Item, SinkItem>
|
||||
impl<S, Item, SinkItem> Sink<SinkItem> for Transport<S, Item, SinkItem>
|
||||
where
|
||||
S: AsyncWrite,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
type SinkItem = SinkItem;
|
||||
type SinkError = io::Error;
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
|
||||
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
|
||||
self.inner()
|
||||
.start_send(item)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_ready(waker))
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_ready(cx))
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_flush(waker))
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_flush(cx))
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_close(waker))
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_close(cx))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,7 +118,7 @@ where
|
||||
impl<S, Item, SinkItem> From<S> for Transport<S, Item, SinkItem> {
|
||||
fn from(inner: S) -> Self {
|
||||
Transport {
|
||||
inner: Compat::new(AsyncBincodeStream::from(inner).for_async()),
|
||||
inner: Compat01As03Sink::new(AsyncBincodeStream::from(inner).for_async()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -195,8 +174,8 @@ where
|
||||
{
|
||||
type Item = io::Result<Transport<TcpStream, Item, SinkItem>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
|
||||
let next = ready!(self.incoming().poll_next(waker)?);
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let next = ready!(self.incoming().poll_next(cx)?);
|
||||
Poll::Ready(next.map(|conn| Ok(new(conn))))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,10 +15,8 @@
|
||||
async_await
|
||||
)]
|
||||
|
||||
extern crate test;
|
||||
|
||||
use self::test::stats::Stats;
|
||||
use futures::{compat::TokioDefaultSpawner, prelude::*};
|
||||
use futures::{compat::Executor01CompatExt, prelude::*};
|
||||
use libtest::stats::Stats;
|
||||
use rpc::{
|
||||
client, context,
|
||||
server::{Handler, Server},
|
||||
@@ -101,7 +99,7 @@ async fn bench() -> io::Result<()> {
|
||||
#[test]
|
||||
fn bench_small_packet() -> io::Result<()> {
|
||||
env_logger::init();
|
||||
rpc::init(TokioDefaultSpawner);
|
||||
rpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat());
|
||||
println!("done");
|
||||
|
||||
@@ -9,9 +9,9 @@
|
||||
#![feature(generators, await_macro, async_await, futures_api)]
|
||||
|
||||
use futures::{
|
||||
compat::{Future01CompatExt, TokioDefaultSpawner},
|
||||
compat::{Executor01CompatExt, Future01CompatExt},
|
||||
prelude::*,
|
||||
stream,
|
||||
stream::FuturesUnordered,
|
||||
};
|
||||
use log::{info, trace};
|
||||
use rand::distributions::{Distribution, Normal};
|
||||
@@ -126,7 +126,10 @@ async fn run() -> io::Result<()> {
|
||||
let response = client.call(ctx, "ping".into());
|
||||
requests.push(response.map(move |r| (trace_id, r)));
|
||||
}
|
||||
let (fastest_response, _) = await!(stream::futures_unordered(requests).into_future());
|
||||
let (fastest_response, _) = await!(requests
|
||||
.into_iter()
|
||||
.collect::<FuturesUnordered<_>>()
|
||||
.into_future());
|
||||
let (trace_id, resp) = fastest_response.unwrap();
|
||||
info!("[{}] fastest_response = {:?}", trace_id, resp);
|
||||
|
||||
@@ -136,7 +139,7 @@ async fn run() -> io::Result<()> {
|
||||
#[test]
|
||||
fn cancel_slower() -> io::Result<()> {
|
||||
env_logger::init();
|
||||
rpc::init(TokioDefaultSpawner);
|
||||
rpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
tokio::run(run().boxed().map_err(|e| panic!(e)).compat());
|
||||
Ok(())
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
#![feature(generators, await_macro, async_await, futures_api)]
|
||||
|
||||
use futures::{
|
||||
compat::{Future01CompatExt, TokioDefaultSpawner},
|
||||
compat::{Executor01CompatExt, Future01CompatExt},
|
||||
prelude::*,
|
||||
};
|
||||
use log::{error, info, trace};
|
||||
@@ -105,7 +105,7 @@ async fn run() -> io::Result<()> {
|
||||
#[test]
|
||||
fn ping_pong() -> io::Result<()> {
|
||||
env_logger::init();
|
||||
rpc::init(TokioDefaultSpawner);
|
||||
rpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
tokio::run(
|
||||
run()
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
cargo-features = ["rename-dependency"]
|
||||
|
||||
[package]
|
||||
name = "tarpc-example-service"
|
||||
version = "0.2.0"
|
||||
version = "0.4.0"
|
||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||
edition = "2018"
|
||||
license = "MIT"
|
||||
@@ -15,11 +13,11 @@ readme = "../README.md"
|
||||
description = "An example server built on tarpc."
|
||||
|
||||
[dependencies]
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" }
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.5", path = "../bincode-transport" }
|
||||
clap = "2.0"
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.14", features = ["compat"] }
|
||||
serde = { version = "1.0" }
|
||||
tarpc = { version = "0.14", path = "../tarpc", features = ["serde1"] }
|
||||
tarpc = { version = "0.16", path = "../tarpc", features = ["serde1"] }
|
||||
tokio = "0.1"
|
||||
tokio-executor = "0.1"
|
||||
|
||||
|
||||
@@ -4,16 +4,10 @@
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
#![feature(
|
||||
futures_api,
|
||||
pin,
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await
|
||||
)]
|
||||
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
|
||||
|
||||
use clap::{App, Arg};
|
||||
use futures::{compat::TokioDefaultSpawner, prelude::*};
|
||||
use futures::{compat::Executor01CompatExt, prelude::*};
|
||||
use std::{io, net::SocketAddr};
|
||||
use tarpc::{client, context};
|
||||
|
||||
@@ -59,7 +53,7 @@ fn main() {
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
let server_addr = flags.value_of("server_addr").unwrap();
|
||||
let server_addr = server_addr
|
||||
@@ -68,7 +62,7 @@ fn main() {
|
||||
|
||||
let name = flags.value_of("name").unwrap();
|
||||
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
tokio::run(
|
||||
run(server_addr, name.into())
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
#![feature(
|
||||
futures_api,
|
||||
pin,
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await,
|
||||
|
||||
@@ -4,17 +4,11 @@
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
#![feature(
|
||||
futures_api,
|
||||
pin,
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await
|
||||
)]
|
||||
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
|
||||
|
||||
use clap::{App, Arg};
|
||||
use futures::{
|
||||
compat::TokioDefaultSpawner,
|
||||
compat::Executor01CompatExt,
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
@@ -79,7 +73,7 @@ fn main() {
|
||||
.parse()
|
||||
.unwrap_or_else(|e| panic!(r#"--port value "{}" invalid: {}"#, port, e));
|
||||
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
tokio::run(
|
||||
run(([0, 0, 0, 0], port).into())
|
||||
|
||||
@@ -96,7 +96,7 @@ do
|
||||
diff="$diff$(cargo fmt -- --skip-children --write-mode=diff $file)"
|
||||
fi
|
||||
done
|
||||
if grep --quiet "^Diff at line" <<< "$diff"; then
|
||||
if grep --quiet "^[-+]" <<< "$diff"; then
|
||||
FMTRESULT=1
|
||||
fi
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tarpc-plugins"
|
||||
version = "0.5.0"
|
||||
version = "0.5.1"
|
||||
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
||||
license = "MIT"
|
||||
documentation = "https://docs.rs/tarpc-plugins"
|
||||
@@ -15,7 +15,7 @@ description = "Proc macros for tarpc."
|
||||
travis-ci = { repository = "google/tarpc" }
|
||||
|
||||
[dependencies]
|
||||
itertools = "0.7"
|
||||
itertools = "0.8"
|
||||
syn = { version = "0.15", features = ["full", "extra-traits"] }
|
||||
quote = "0.6"
|
||||
proc-macro2 = "0.4"
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
cargo-features = ["rename-dependency"]
|
||||
|
||||
[package]
|
||||
name = "tarpc-lib"
|
||||
version = "0.2.0"
|
||||
version = "0.4.0"
|
||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||
edition = '2018'
|
||||
license = "MIT"
|
||||
@@ -20,19 +18,16 @@ serde1 = ["trace/serde", "serde", "serde/derive"]
|
||||
|
||||
[dependencies]
|
||||
fnv = "1.0"
|
||||
futures-preview = { version = "0.3.0-alpha.14", features = ["compat"] }
|
||||
humantime = "1.0"
|
||||
log = "0.4"
|
||||
pin-utils = "0.1.0-alpha.3"
|
||||
rand = "0.5"
|
||||
pin-utils = "0.1.0-alpha.4"
|
||||
rand = "0.6"
|
||||
tokio-timer = "0.2"
|
||||
trace = { package = "tarpc-trace", version = "0.1", path = "../trace" }
|
||||
trace = { package = "tarpc-trace", version = "0.2", path = "../trace" }
|
||||
serde = { optional = true, version = "1.0" }
|
||||
|
||||
[target.'cfg(not(test))'.dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat"] }
|
||||
|
||||
[dev-dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
|
||||
futures-test-preview = { version = "0.3.0-alpha.9" }
|
||||
env_logger = "0.5"
|
||||
futures-test-preview = { version = "0.3.0-alpha.14" }
|
||||
env_logger = "0.6"
|
||||
tokio = "0.1"
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
use crate::{
|
||||
context,
|
||||
util::{deadline_compat, AsDuration, Compact},
|
||||
ClientMessage, ClientMessageKind, Request, Response, Transport,
|
||||
ClientMessage, ClientMessageKind, PollIo, Request, Response, Transport,
|
||||
};
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{
|
||||
@@ -15,7 +15,7 @@ use futures::{
|
||||
prelude::*,
|
||||
ready,
|
||||
stream::Fuse,
|
||||
task::LocalWaker,
|
||||
task::Context,
|
||||
Poll,
|
||||
};
|
||||
use humantime::format_rfc3339;
|
||||
@@ -62,17 +62,22 @@ impl<Req, Resp> Clone for Channel<Req, Resp> {
|
||||
#[derive(Debug)]
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
struct Send<'a, Req, Resp> {
|
||||
fut: MapOkDispatchResponse<
|
||||
MapErrConnectionReset<futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>>>,
|
||||
Resp,
|
||||
>,
|
||||
fut: MapOkDispatchResponse<SendMapErrConnectionReset<'a, Req, Resp>, Resp>,
|
||||
}
|
||||
|
||||
type SendMapErrConnectionReset<'a, Req, Resp> = MapErrConnectionReset<
|
||||
futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>, DispatchRequest<Req, Resp>>,
|
||||
>;
|
||||
|
||||
impl<'a, Req, Resp> Send<'a, Req, Resp> {
|
||||
unsafe_pinned!(
|
||||
fut: MapOkDispatchResponse<
|
||||
MapErrConnectionReset<
|
||||
futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>>,
|
||||
futures::sink::Send<
|
||||
'a,
|
||||
mpsc::Sender<DispatchRequest<Req, Resp>>,
|
||||
DispatchRequest<Req, Resp>,
|
||||
>,
|
||||
>,
|
||||
Resp,
|
||||
>
|
||||
@@ -82,8 +87,8 @@ impl<'a, Req, Resp> Send<'a, Req, Resp> {
|
||||
impl<'a, Req, Resp> Future for Send<'a, Req, Resp> {
|
||||
type Output = io::Result<DispatchResponse<Resp>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
self.fut().poll(lw)
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.as_mut().fut().poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,15 +106,15 @@ impl<'a, Req, Resp> Call<'a, Req, Resp> {
|
||||
impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
|
||||
type Output = io::Result<Resp>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
self.fut().poll(lw)
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.as_mut().fut().poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp> Channel<Req, Resp> {
|
||||
/// Sends a request to the dispatch task to forward to the server, returning a [`Future`] that
|
||||
/// resolves when the request is sent (not when the response is received).
|
||||
fn send<'a>(&'a mut self, mut ctx: context::Context, request: Req) -> Send<'a, Req, Resp> {
|
||||
fn send(&mut self, mut ctx: context::Context, request: Req) -> Send<Req, Resp> {
|
||||
// Convert the context to the call context.
|
||||
ctx.trace_context.parent_id = Some(ctx.trace_context.span_id);
|
||||
ctx.trace_context.span_id = SpanId::random(&mut rand::thread_rng());
|
||||
@@ -150,7 +155,7 @@ impl<Req, Resp> Channel<Req, Resp> {
|
||||
|
||||
/// Sends a request to the dispatch task to forward to the server, returning a [`Future`] that
|
||||
/// resolves to the response.
|
||||
pub fn call<'a>(&'a mut self, context: context::Context, request: Req) -> Call<'a, Req, Resp> {
|
||||
pub fn call(&mut self, context: context::Context, request: Req) -> Call<Req, Resp> {
|
||||
Call {
|
||||
fut: AndThenIdent::new(self.send(context, request)),
|
||||
}
|
||||
@@ -177,16 +182,16 @@ impl<Resp> DispatchResponse<Resp> {
|
||||
impl<Resp> Future for DispatchResponse<Resp> {
|
||||
type Output = io::Result<Resp>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<Resp>> {
|
||||
let resp = ready!(self.response.poll_unpin(waker));
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<Resp>> {
|
||||
let resp = ready!(self.response.poll_unpin(cx));
|
||||
|
||||
self.complete = true;
|
||||
|
||||
Poll::Ready(match resp {
|
||||
Ok(resp) => Ok(resp.message?),
|
||||
Err(e) => Err({
|
||||
let trace_id = *self.ctx().trace_id();
|
||||
let server_addr = *self.server_addr();
|
||||
let trace_id = *self.as_mut().ctx().trace_id();
|
||||
let server_addr = *self.as_mut().server_addr();
|
||||
|
||||
if e.is_elapsed() {
|
||||
io::Error::new(
|
||||
@@ -258,6 +263,7 @@ where
|
||||
{
|
||||
let (to_dispatch, pending_requests) = mpsc::channel(config.pending_request_buffer);
|
||||
let (cancellation, canceled_requests) = cancellations();
|
||||
let canceled_requests = canceled_requests.fuse();
|
||||
|
||||
crate::spawn(
|
||||
RequestDispatch {
|
||||
@@ -296,7 +302,7 @@ struct RequestDispatch<Req, Resp, C> {
|
||||
/// Requests waiting to be written to the wire.
|
||||
pending_requests: Fuse<mpsc::Receiver<DispatchRequest<Req, Resp>>>,
|
||||
/// Requests that were dropped.
|
||||
canceled_requests: CanceledRequests,
|
||||
canceled_requests: Fuse<CanceledRequests>,
|
||||
/// Requests already written to the wire that haven't yet received responses.
|
||||
in_flight_requests: FnvHashMap<u64, InFlightData<Resp>>,
|
||||
/// Configures limits to prevent unlimited resource usage.
|
||||
@@ -313,30 +319,30 @@ where
|
||||
{
|
||||
unsafe_pinned!(server_addr: SocketAddr);
|
||||
unsafe_pinned!(in_flight_requests: FnvHashMap<u64, InFlightData<Resp>>);
|
||||
unsafe_pinned!(canceled_requests: CanceledRequests);
|
||||
unsafe_pinned!(canceled_requests: Fuse<CanceledRequests>);
|
||||
unsafe_pinned!(pending_requests: Fuse<mpsc::Receiver<DispatchRequest<Req, Resp>>>);
|
||||
unsafe_pinned!(transport: Fuse<C>);
|
||||
|
||||
fn pump_read(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<()>>> {
|
||||
Poll::Ready(match ready!(self.transport().poll_next(waker)?) {
|
||||
fn pump_read(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> {
|
||||
Poll::Ready(match ready!(self.as_mut().transport().poll_next(cx)?) {
|
||||
Some(response) => {
|
||||
self.complete(response);
|
||||
Some(Ok(()))
|
||||
}
|
||||
None => {
|
||||
trace!("[{}] read half closed", self.server_addr());
|
||||
trace!("[{}] read half closed", self.as_mut().server_addr());
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn pump_write(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<()>>> {
|
||||
fn pump_write(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> {
|
||||
enum ReceiverStatus {
|
||||
NotReady,
|
||||
Closed,
|
||||
}
|
||||
|
||||
let pending_requests_status = match self.poll_next_request(waker)? {
|
||||
let pending_requests_status = match self.poll_next_request(cx)? {
|
||||
Poll::Ready(Some(dispatch_request)) => {
|
||||
self.write_request(dispatch_request)?;
|
||||
return Poll::Ready(Some(Ok(())));
|
||||
@@ -345,7 +351,7 @@ where
|
||||
Poll::Pending => ReceiverStatus::NotReady,
|
||||
};
|
||||
|
||||
let canceled_requests_status = match self.poll_next_cancellation(waker)? {
|
||||
let canceled_requests_status = match self.poll_next_cancellation(cx)? {
|
||||
Poll::Ready(Some((context, request_id))) => {
|
||||
self.write_cancel(context, request_id)?;
|
||||
return Poll::Ready(Some(Ok(())));
|
||||
@@ -356,12 +362,12 @@ where
|
||||
|
||||
match (pending_requests_status, canceled_requests_status) {
|
||||
(ReceiverStatus::Closed, ReceiverStatus::Closed) => {
|
||||
ready!(self.transport().poll_flush(waker)?);
|
||||
ready!(self.as_mut().transport().poll_flush(cx)?);
|
||||
Poll::Ready(None)
|
||||
}
|
||||
(ReceiverStatus::NotReady, _) | (_, ReceiverStatus::NotReady) => {
|
||||
// No more messages to process, so flush any messages buffered in the transport.
|
||||
ready!(self.transport().poll_flush(waker)?);
|
||||
ready!(self.as_mut().transport().poll_flush(cx)?);
|
||||
|
||||
// Even if we fully-flush, we return Pending, because we have no more requests
|
||||
// or cancellations right now.
|
||||
@@ -373,12 +379,12 @@ where
|
||||
/// Yields the next pending request, if one is ready to be sent.
|
||||
fn poll_next_request(
|
||||
self: &mut Pin<&mut Self>,
|
||||
waker: &LocalWaker,
|
||||
) -> Poll<Option<io::Result<DispatchRequest<Req, Resp>>>> {
|
||||
if self.in_flight_requests().len() >= self.config.max_in_flight_requests {
|
||||
cx: &mut Context<'_>,
|
||||
) -> PollIo<DispatchRequest<Req, Resp>> {
|
||||
if self.as_mut().in_flight_requests().len() >= self.config.max_in_flight_requests {
|
||||
info!(
|
||||
"At in-flight request capacity ({}/{}).",
|
||||
self.in_flight_requests().len(),
|
||||
self.as_mut().in_flight_requests().len(),
|
||||
self.config.max_in_flight_requests
|
||||
);
|
||||
|
||||
@@ -387,13 +393,13 @@ where
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
while let Poll::Pending = self.transport().poll_ready(waker)? {
|
||||
while let Poll::Pending = self.as_mut().transport().poll_ready(cx)? {
|
||||
// We can't yield a request-to-be-sent before the transport is capable of buffering it.
|
||||
ready!(self.transport().poll_flush(waker)?);
|
||||
ready!(self.as_mut().transport().poll_flush(cx)?);
|
||||
}
|
||||
|
||||
loop {
|
||||
match ready!(self.pending_requests().poll_next_unpin(waker)) {
|
||||
match ready!(self.as_mut().pending_requests().poll_next_unpin(cx)) {
|
||||
Some(request) => {
|
||||
if request.response_completion.is_canceled() {
|
||||
trace!(
|
||||
@@ -406,7 +412,7 @@ where
|
||||
return Poll::Ready(Some(Ok(request)));
|
||||
}
|
||||
None => {
|
||||
trace!("[{}] pending_requests closed", self.server_addr());
|
||||
trace!("[{}] pending_requests closed", self.as_mut().server_addr());
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
}
|
||||
@@ -416,29 +422,35 @@ where
|
||||
/// Yields the next pending cancellation, and, if one is ready, cancels the associated request.
|
||||
fn poll_next_cancellation(
|
||||
self: &mut Pin<&mut Self>,
|
||||
waker: &LocalWaker,
|
||||
) -> Poll<Option<io::Result<(context::Context, u64)>>> {
|
||||
while let Poll::Pending = self.transport().poll_ready(waker)? {
|
||||
ready!(self.transport().poll_flush(waker)?);
|
||||
cx: &mut Context<'_>,
|
||||
) -> PollIo<(context::Context, u64)> {
|
||||
while let Poll::Pending = self.as_mut().transport().poll_ready(cx)? {
|
||||
ready!(self.as_mut().transport().poll_flush(cx)?);
|
||||
}
|
||||
|
||||
loop {
|
||||
match ready!(self.canceled_requests().poll_next_unpin(waker)) {
|
||||
let cancellation = self.as_mut().canceled_requests().poll_next_unpin(cx);
|
||||
match ready!(cancellation) {
|
||||
Some(request_id) => {
|
||||
if let Some(in_flight_data) = self.in_flight_requests().remove(&request_id) {
|
||||
self.in_flight_requests().compact(0.1);
|
||||
if let Some(in_flight_data) =
|
||||
self.as_mut().in_flight_requests().remove(&request_id)
|
||||
{
|
||||
self.as_mut().in_flight_requests().compact(0.1);
|
||||
|
||||
debug!(
|
||||
"[{}/{}] Removed request.",
|
||||
in_flight_data.ctx.trace_id(),
|
||||
self.server_addr()
|
||||
self.as_mut().server_addr()
|
||||
);
|
||||
|
||||
return Poll::Ready(Some(Ok((in_flight_data.ctx, request_id))));
|
||||
}
|
||||
}
|
||||
None => {
|
||||
trace!("[{}] canceled_requests closed.", self.server_addr());
|
||||
trace!(
|
||||
"[{}] canceled_requests closed.",
|
||||
self.as_mut().server_addr()
|
||||
);
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
}
|
||||
@@ -458,8 +470,8 @@ where
|
||||
deadline: dispatch_request.ctx.deadline,
|
||||
}),
|
||||
};
|
||||
self.transport().start_send(request)?;
|
||||
self.in_flight_requests().insert(
|
||||
self.as_mut().transport().start_send(request)?;
|
||||
self.as_mut().in_flight_requests().insert(
|
||||
request_id,
|
||||
InFlightData {
|
||||
ctx: dispatch_request.ctx,
|
||||
@@ -479,20 +491,28 @@ where
|
||||
trace_context: context.trace_context,
|
||||
message: ClientMessageKind::Cancel { request_id },
|
||||
};
|
||||
self.transport().start_send(cancel)?;
|
||||
trace!("[{}/{}] Cancel message sent.", trace_id, self.server_addr());
|
||||
return Ok(());
|
||||
self.as_mut().transport().start_send(cancel)?;
|
||||
trace!(
|
||||
"[{}/{}] Cancel message sent.",
|
||||
trace_id,
|
||||
self.as_mut().server_addr()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sends a server response to the client task that initiated the associated request.
|
||||
fn complete(self: &mut Pin<&mut Self>, response: Response<Resp>) -> bool {
|
||||
if let Some(in_flight_data) = self.in_flight_requests().remove(&response.request_id) {
|
||||
self.in_flight_requests().compact(0.1);
|
||||
if let Some(in_flight_data) = self
|
||||
.as_mut()
|
||||
.in_flight_requests()
|
||||
.remove(&response.request_id)
|
||||
{
|
||||
self.as_mut().in_flight_requests().compact(0.1);
|
||||
|
||||
trace!(
|
||||
"[{}/{}] Received response.",
|
||||
in_flight_data.ctx.trace_id(),
|
||||
self.server_addr()
|
||||
self.as_mut().server_addr()
|
||||
);
|
||||
let _ = in_flight_data.response_completion.send(response);
|
||||
return true;
|
||||
@@ -500,7 +520,7 @@ where
|
||||
|
||||
debug!(
|
||||
"[{}] No in-flight request found for request_id = {}.",
|
||||
self.server_addr(),
|
||||
self.as_mut().server_addr(),
|
||||
response.request_id
|
||||
);
|
||||
|
||||
@@ -517,15 +537,15 @@ where
|
||||
{
|
||||
type Output = io::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
trace!("[{}] RequestDispatch::poll", self.server_addr());
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
trace!("[{}] RequestDispatch::poll", self.as_mut().server_addr());
|
||||
loop {
|
||||
match (self.pump_read(waker)?, self.pump_write(waker)?) {
|
||||
match (self.pump_read(cx)?, self.pump_write(cx)?) {
|
||||
(read, write @ Poll::Ready(None)) => {
|
||||
if self.in_flight_requests().is_empty() {
|
||||
if self.as_mut().in_flight_requests().is_empty() {
|
||||
info!(
|
||||
"[{}] Shutdown: write half closed, and no requests in flight.",
|
||||
self.server_addr()
|
||||
self.as_mut().server_addr()
|
||||
);
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
@@ -534,7 +554,7 @@ where
|
||||
_ => {
|
||||
trace!(
|
||||
"[{}] read: {:?}, write: {:?}, (not ready)",
|
||||
self.server_addr(),
|
||||
self.as_mut().server_addr(),
|
||||
read,
|
||||
write,
|
||||
);
|
||||
@@ -545,7 +565,7 @@ where
|
||||
(read @ Poll::Ready(Some(())), write) | (read, write @ Poll::Ready(Some(()))) => {
|
||||
trace!(
|
||||
"[{}] read: {:?}, write: {:?}",
|
||||
self.server_addr(),
|
||||
self.as_mut().server_addr(),
|
||||
read,
|
||||
write,
|
||||
)
|
||||
@@ -553,7 +573,7 @@ where
|
||||
(read, write) => {
|
||||
trace!(
|
||||
"[{}] read: {:?}, write: {:?} (not ready)",
|
||||
self.server_addr(),
|
||||
self.as_mut().server_addr(),
|
||||
read,
|
||||
write,
|
||||
);
|
||||
@@ -607,8 +627,8 @@ impl RequestCancellation {
|
||||
impl Stream for CanceledRequests {
|
||||
type Item = u64;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<u64>> {
|
||||
self.0.poll_next_unpin(waker)
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u64>> {
|
||||
self.0.poll_next_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -639,8 +659,8 @@ where
|
||||
{
|
||||
type Output = io::Result<Fut::Ok>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
match self.future().try_poll(lw) {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match self.as_mut().future().try_poll(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(result) => {
|
||||
self.finished().take().expect(
|
||||
@@ -679,11 +699,12 @@ where
|
||||
{
|
||||
type Output = Result<DispatchResponse<Resp>, Fut::Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
match self.future().try_poll(lw) {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match self.as_mut().future().try_poll(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(result) => {
|
||||
let response = self
|
||||
.as_mut()
|
||||
.response()
|
||||
.take()
|
||||
.expect("MapOk must not be polled after it returned `Poll::Ready`");
|
||||
@@ -721,8 +742,8 @@ where
|
||||
{
|
||||
type Output = Result<Fut2::Ok, Fut2::Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
self.try_chain().poll(lw, |result| match result {
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.try_chain().poll(cx, |result| match result {
|
||||
Ok(ok) => TryChainAction::Future(ok),
|
||||
Err(err) => TryChainAction::Output(Err(err)),
|
||||
})
|
||||
@@ -754,27 +775,31 @@ where
|
||||
TryChain::First(fut1)
|
||||
}
|
||||
|
||||
fn poll<F>(self: Pin<&mut Self>, lw: &LocalWaker, f: F) -> Poll<Result<Fut2::Ok, Fut2::Error>>
|
||||
fn poll<F>(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
f: F,
|
||||
) -> Poll<Result<Fut2::Ok, Fut2::Error>>
|
||||
where
|
||||
F: FnOnce(Result<Fut1::Ok, Fut1::Error>) -> TryChainAction<Fut2>,
|
||||
{
|
||||
let mut f = Some(f);
|
||||
|
||||
// Safe to call `get_mut_unchecked` because we won't move the futures.
|
||||
let this = unsafe { Pin::get_mut_unchecked(self) };
|
||||
// Safe to call `get_unchecked_mut` because we won't move the futures.
|
||||
let this = unsafe { Pin::get_unchecked_mut(self) };
|
||||
|
||||
loop {
|
||||
let output = match this {
|
||||
TryChain::First(fut1) => {
|
||||
// Poll the first future
|
||||
match unsafe { Pin::new_unchecked(fut1) }.try_poll(lw) {
|
||||
match unsafe { Pin::new_unchecked(fut1) }.try_poll(cx) {
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(output) => output,
|
||||
}
|
||||
}
|
||||
TryChain::Second(fut2) => {
|
||||
// Poll the second future
|
||||
return unsafe { Pin::new_unchecked(fut2) }.try_poll(lw);
|
||||
return unsafe { Pin::new_unchecked(fut2) }.try_poll(cx);
|
||||
}
|
||||
TryChain::Empty => {
|
||||
panic!("future must not be polled after it returned `Poll::Ready`");
|
||||
@@ -793,7 +818,9 @@ where
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{CanceledRequests, Channel, RequestCancellation, RequestDispatch};
|
||||
use super::{
|
||||
CanceledRequests, Channel, DispatchResponse, RequestCancellation, RequestDispatch,
|
||||
};
|
||||
use crate::{
|
||||
client::Config,
|
||||
context,
|
||||
@@ -801,8 +828,8 @@ mod tests {
|
||||
ClientMessage, Response,
|
||||
};
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{channel::mpsc, prelude::*, Poll};
|
||||
use futures_test::task::noop_local_waker_ref;
|
||||
use futures::{channel::mpsc, prelude::*, task::Context, Poll};
|
||||
use futures_test::task::noop_waker_ref;
|
||||
use std::{
|
||||
marker,
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||
@@ -814,20 +841,12 @@ mod tests {
|
||||
#[test]
|
||||
fn stage_request() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
|
||||
// Test that a request future dropped before it's processed by dispatch will cause the request
|
||||
// to not be added to the in-flight request map.
|
||||
let _resp = tokio::runtime::current_thread::block_on_all(
|
||||
channel
|
||||
.send(context::current(), "hi".to_string())
|
||||
.boxed()
|
||||
.compat(),
|
||||
);
|
||||
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
let waker = &noop_local_waker_ref();
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
|
||||
let req = dispatch.poll_next_request(waker).ready();
|
||||
let _resp = send_request(&mut channel, "hi");
|
||||
|
||||
let req = dispatch.poll_next_request(cx).ready();
|
||||
assert!(req.is_some());
|
||||
|
||||
let req = req.unwrap();
|
||||
@@ -835,49 +854,77 @@ mod tests {
|
||||
assert_eq!(req.request, "hi".to_string());
|
||||
}
|
||||
|
||||
// Regression test for https://github.com/google/tarpc/issues/220
|
||||
#[test]
|
||||
fn stage_request_response_future_dropped() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
fn stage_request_channel_dropped_doesnt_panic() {
|
||||
let (mut dispatch, mut channel, mut server_channel) = set_up();
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
|
||||
// Test that a request future dropped before it's processed by dispatch will cause the request
|
||||
// to not be added to the in-flight request map.
|
||||
let resp = tokio::runtime::current_thread::block_on_all(
|
||||
channel
|
||||
.send(context::current(), "hi".into())
|
||||
.boxed()
|
||||
.compat(),
|
||||
)
|
||||
.unwrap();
|
||||
drop(resp);
|
||||
let _ = send_request(&mut channel, "hi");
|
||||
drop(channel);
|
||||
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
let waker = &noop_local_waker_ref();
|
||||
|
||||
dispatch.poll_next_cancellation(waker).unwrap();
|
||||
assert!(dispatch.poll_next_request(waker).ready().is_none());
|
||||
assert!(dispatch.as_mut().poll(cx).is_ready());
|
||||
send_response(
|
||||
&mut server_channel,
|
||||
Response {
|
||||
request_id: 0,
|
||||
message: Ok("hello".into()),
|
||||
},
|
||||
);
|
||||
tokio::runtime::current_thread::block_on_all(dispatch.boxed().compat()).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stage_request_response_future_closed() {
|
||||
fn stage_request_response_future_dropped_is_canceled_before_sending() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
|
||||
let _ = send_request(&mut channel, "hi");
|
||||
|
||||
// Drop the channel so polling returns none if no requests are currently ready.
|
||||
drop(channel);
|
||||
// Test that a request future dropped before it's processed by dispatch will cause the request
|
||||
// to not be added to the in-flight request map.
|
||||
assert!(dispatch.poll_next_request(cx).ready().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stage_request_response_future_dropped_is_canceled_after_sending() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
|
||||
let req = send_request(&mut channel, "hi");
|
||||
|
||||
assert!(dispatch.as_mut().pump_write(cx).ready().is_some());
|
||||
assert!(!dispatch.as_mut().in_flight_requests().is_empty());
|
||||
|
||||
// Test that a request future dropped after it's processed by dispatch will cause the request
|
||||
// to be removed from the in-flight request map.
|
||||
drop(req);
|
||||
if let Poll::Ready(Some(_)) = dispatch.as_mut().poll_next_cancellation(cx).unwrap() {
|
||||
// ok
|
||||
} else {
|
||||
panic!("Expected request to be cancelled")
|
||||
};
|
||||
assert!(dispatch.in_flight_requests().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stage_request_response_closed_skipped() {
|
||||
let (mut dispatch, mut channel, _server_channel) = set_up();
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
let cx = &mut Context::from_waker(&noop_waker_ref());
|
||||
|
||||
// Test that a request future that's closed its receiver but not yet canceled its request --
|
||||
// i.e. still in `drop fn` -- will cause the request to not be added to the in-flight request
|
||||
// map.
|
||||
let resp = tokio::runtime::current_thread::block_on_all(
|
||||
channel
|
||||
.send(context::current(), "hi".into())
|
||||
.boxed()
|
||||
.compat(),
|
||||
)
|
||||
.unwrap();
|
||||
drop(resp);
|
||||
drop(channel);
|
||||
let mut resp = send_request(&mut channel, "hi");
|
||||
resp.response.get_mut().close();
|
||||
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
let waker = &noop_local_waker_ref();
|
||||
assert!(dispatch.poll_next_request(waker).ready().is_none());
|
||||
assert!(dispatch.poll_next_request(cx).is_pending());
|
||||
}
|
||||
|
||||
fn set_up() -> (
|
||||
@@ -894,7 +941,7 @@ mod tests {
|
||||
let dispatch = RequestDispatch::<String, String, _> {
|
||||
transport: client_channel.fuse(),
|
||||
pending_requests: pending_requests.fuse(),
|
||||
canceled_requests: CanceledRequests(canceled_requests),
|
||||
canceled_requests: CanceledRequests(canceled_requests).fuse(),
|
||||
in_flight_requests: FnvHashMap::default(),
|
||||
config: Config::default(),
|
||||
server_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
|
||||
@@ -911,6 +958,27 @@ mod tests {
|
||||
(dispatch, channel, server_channel)
|
||||
}
|
||||
|
||||
fn send_request(
|
||||
channel: &mut Channel<String, String>,
|
||||
request: &str,
|
||||
) -> DispatchResponse<String> {
|
||||
tokio::runtime::current_thread::block_on_all(
|
||||
channel
|
||||
.send(context::current(), request.to_string())
|
||||
.boxed()
|
||||
.compat(),
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn send_response(
|
||||
channel: &mut UnboundedChannel<ClientMessage<String>, Response<String>>,
|
||||
response: Response<String>,
|
||||
) {
|
||||
tokio::runtime::current_thread::block_on_all(channel.send(response).boxed().compat())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
trait PollTest {
|
||||
type T;
|
||||
fn unwrap(self) -> Poll<Self::T>;
|
||||
@@ -941,5 +1009,4 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -9,10 +9,9 @@
|
||||
integer_atomics,
|
||||
try_trait,
|
||||
futures_api,
|
||||
pin,
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await,
|
||||
async_await
|
||||
)]
|
||||
#![deny(missing_docs, missing_debug_implementations)]
|
||||
|
||||
@@ -43,17 +42,14 @@ pub(crate) mod util;
|
||||
pub use crate::{client::Client, server::Server, transport::Transport};
|
||||
|
||||
use futures::{
|
||||
task::{Spawn, SpawnError, SpawnExt},
|
||||
task::{Poll, Spawn, SpawnError, SpawnExt},
|
||||
Future,
|
||||
};
|
||||
use std::{cell::RefCell, io, sync::Once, time::SystemTime};
|
||||
|
||||
/// A message from a client to a server.
|
||||
#[derive(Debug)]
|
||||
#[cfg_attr(
|
||||
feature = "serde1",
|
||||
derive(serde::Serialize, serde::Deserialize)
|
||||
)]
|
||||
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[non_exhaustive]
|
||||
pub struct ClientMessage<T> {
|
||||
/// The trace context associates the message with a specific chain of causally-related actions,
|
||||
@@ -65,10 +61,7 @@ pub struct ClientMessage<T> {
|
||||
|
||||
/// Different messages that can be sent from a client to a server.
|
||||
#[derive(Debug)]
|
||||
#[cfg_attr(
|
||||
feature = "serde1",
|
||||
derive(serde::Serialize, serde::Deserialize)
|
||||
)]
|
||||
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[non_exhaustive]
|
||||
pub enum ClientMessageKind<T> {
|
||||
/// A request initiated by a user. The server responds to a request by invoking a
|
||||
@@ -90,10 +83,7 @@ pub enum ClientMessageKind<T> {
|
||||
|
||||
/// A request from a client to a server.
|
||||
#[derive(Debug)]
|
||||
#[cfg_attr(
|
||||
feature = "serde1",
|
||||
derive(serde::Serialize, serde::Deserialize)
|
||||
)]
|
||||
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[non_exhaustive]
|
||||
pub struct Request<T> {
|
||||
/// Uniquely identifies the request across all requests sent over a single channel.
|
||||
@@ -115,10 +105,7 @@ pub struct Request<T> {
|
||||
|
||||
/// A response from a server to a client.
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
#[cfg_attr(
|
||||
feature = "serde1",
|
||||
derive(serde::Serialize, serde::Deserialize)
|
||||
)]
|
||||
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[non_exhaustive]
|
||||
pub struct Response<T> {
|
||||
/// The ID of the request being responded to.
|
||||
@@ -129,10 +116,7 @@ pub struct Response<T> {
|
||||
|
||||
/// An error response from a server to a client.
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
#[cfg_attr(
|
||||
feature = "serde1",
|
||||
derive(serde::Serialize, serde::Deserialize)
|
||||
)]
|
||||
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[non_exhaustive]
|
||||
pub struct ServerError {
|
||||
#[cfg_attr(
|
||||
@@ -162,6 +146,8 @@ impl<T> Request<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) type PollIo<T> = Poll<Option<io::Result<T>>>;
|
||||
|
||||
static INIT: Once = Once::new();
|
||||
static mut SEED_SPAWN: Option<Box<dyn CloneSpawn>> = None;
|
||||
thread_local! {
|
||||
@@ -170,7 +156,7 @@ thread_local! {
|
||||
// INIT must always be called before accessing SPAWN.
|
||||
// Otherwise, accessing SPAWN can trigger undefined behavior due to race conditions.
|
||||
INIT.call_once(|| {});
|
||||
RefCell::new(SEED_SPAWN.clone().expect("init() must be called."))
|
||||
RefCell::new(SEED_SPAWN.as_ref().expect("init() must be called.").box_clone())
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -196,12 +182,6 @@ trait CloneSpawn: Spawn {
|
||||
fn box_clone(&self) -> Box<dyn CloneSpawn>;
|
||||
}
|
||||
|
||||
impl Clone for Box<dyn CloneSpawn> {
|
||||
fn clone(&self) -> Self {
|
||||
self.box_clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Spawn + Clone + 'static> CloneSpawn for S {
|
||||
fn box_clone(&self) -> Box<dyn CloneSpawn> {
|
||||
Box::new(self.clone())
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
use crate::{
|
||||
server::{Channel, Config},
|
||||
util::Compact,
|
||||
ClientMessage, Response, Transport,
|
||||
ClientMessage, PollIo, Response, Transport,
|
||||
};
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{
|
||||
@@ -15,7 +15,7 @@ use futures::{
|
||||
prelude::*,
|
||||
ready,
|
||||
stream::Fuse,
|
||||
task::{LocalWaker, Poll},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use pin_utils::unsafe_pinned;
|
||||
@@ -107,28 +107,28 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
|
||||
}
|
||||
};
|
||||
|
||||
let open_connections = *self.open_connections();
|
||||
if open_connections >= self.config().max_connections {
|
||||
let open_connections = *self.as_mut().open_connections();
|
||||
if open_connections >= self.as_mut().config().max_connections {
|
||||
warn!(
|
||||
"[{}] Shedding connection because the maximum open connections \
|
||||
limit is reached ({}/{}).",
|
||||
peer,
|
||||
open_connections,
|
||||
self.config().max_connections
|
||||
self.as_mut().config().max_connections
|
||||
);
|
||||
return NewConnection::Filtered;
|
||||
}
|
||||
|
||||
let config = self.config.clone();
|
||||
let open_connections_for_ip = self.increment_connections_for_ip(&peer)?;
|
||||
*self.open_connections() += 1;
|
||||
*self.as_mut().open_connections() += 1;
|
||||
|
||||
debug!(
|
||||
"[{}] Opening channel ({}/{} connections for IP, {} total).",
|
||||
peer,
|
||||
open_connections_for_ip,
|
||||
config.max_connections_per_ip,
|
||||
self.open_connections(),
|
||||
self.as_mut().open_connections(),
|
||||
);
|
||||
|
||||
NewConnection::Accepted(Channel {
|
||||
@@ -141,19 +141,19 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
|
||||
}
|
||||
|
||||
fn handle_closed_connection(self: &mut Pin<&mut Self>, addr: &SocketAddr) {
|
||||
*self.open_connections() -= 1;
|
||||
*self.as_mut().open_connections() -= 1;
|
||||
debug!(
|
||||
"[{}] Closing channel. {} open connections remaining.",
|
||||
addr, self.open_connections
|
||||
);
|
||||
self.decrement_connections_for_ip(&addr);
|
||||
self.connections_per_ip().compact(0.1);
|
||||
self.as_mut().connections_per_ip().compact(0.1);
|
||||
}
|
||||
|
||||
fn increment_connections_for_ip(self: &mut Pin<&mut Self>, peer: &SocketAddr) -> Option<usize> {
|
||||
let max_connections_per_ip = self.config().max_connections_per_ip;
|
||||
let max_connections_per_ip = self.as_mut().config().max_connections_per_ip;
|
||||
let mut occupied;
|
||||
let mut connections_per_ip = self.connections_per_ip();
|
||||
let mut connections_per_ip = self.as_mut().connections_per_ip();
|
||||
let occupied = match connections_per_ip.entry(peer.ip()) {
|
||||
Entry::Vacant(vacant) => vacant.insert(0),
|
||||
Entry::Occupied(o) => {
|
||||
@@ -177,7 +177,7 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
|
||||
}
|
||||
|
||||
fn decrement_connections_for_ip(self: &mut Pin<&mut Self>, addr: &SocketAddr) {
|
||||
let should_compact = match self.connections_per_ip().entry(addr.ip()) {
|
||||
let should_compact = match self.as_mut().connections_per_ip().entry(addr.ip()) {
|
||||
Entry::Vacant(_) => {
|
||||
error!("[{}] Got vacant entry when closing connection.", addr);
|
||||
return;
|
||||
@@ -193,26 +193,29 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
|
||||
}
|
||||
};
|
||||
if should_compact {
|
||||
self.connections_per_ip().compact(0.1);
|
||||
self.as_mut().connections_per_ip().compact(0.1);
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_listener<C>(
|
||||
self: &mut Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> Poll<Option<io::Result<NewConnection<Req, Resp, C>>>>
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> PollIo<NewConnection<Req, Resp, C>>
|
||||
where
|
||||
S: Stream<Item = Result<C, io::Error>>,
|
||||
C: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
|
||||
{
|
||||
match ready!(self.listener().poll_next_unpin(cx)?) {
|
||||
match ready!(self.as_mut().listener().poll_next_unpin(cx)?) {
|
||||
Some(codec) => Poll::Ready(Some(Ok(self.handle_new_connection(codec)))),
|
||||
None => Poll::Ready(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
match ready!(self.closed_connections_rx().poll_next_unpin(cx)) {
|
||||
fn poll_closed_connections(
|
||||
self: &mut Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
match ready!(self.as_mut().closed_connections_rx().poll_next_unpin(cx)) {
|
||||
Some(addr) => {
|
||||
self.handle_closed_connection(&addr);
|
||||
Poll::Ready(Ok(()))
|
||||
@@ -229,25 +232,28 @@ where
|
||||
{
|
||||
type Item = io::Result<Channel<Req, Resp, T>>;
|
||||
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> Poll<Option<io::Result<Channel<Req, Resp, T>>>> {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<Channel<Req, Resp, T>> {
|
||||
loop {
|
||||
match (self.poll_listener(cx)?, self.poll_closed_connections(cx)?) {
|
||||
match (
|
||||
self.as_mut().poll_listener(cx)?,
|
||||
self.poll_closed_connections(cx)?,
|
||||
) {
|
||||
(Poll::Ready(Some(NewConnection::Accepted(channel))), _) => {
|
||||
return Poll::Ready(Some(Ok(channel)))
|
||||
return Poll::Ready(Some(Ok(channel)));
|
||||
}
|
||||
(Poll::Ready(Some(NewConnection::Filtered)), _) | (_, Poll::Ready(())) => {
|
||||
trace!("Filtered a connection; {} open.", self.open_connections());
|
||||
trace!(
|
||||
"Filtered a connection; {} open.",
|
||||
self.as_mut().open_connections()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
(Poll::Pending, Poll::Pending) => return Poll::Pending,
|
||||
(Poll::Ready(None), Poll::Pending) => {
|
||||
if *self.open_connections() > 0 {
|
||||
if *self.as_mut().open_connections() > 0 {
|
||||
trace!(
|
||||
"Listener closed; {} open connections.",
|
||||
self.open_connections()
|
||||
self.as_mut().open_connections()
|
||||
);
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
@@ -7,8 +7,8 @@
|
||||
//! Provides a server that concurrently handles many connections sending multiplexed requests.
|
||||
|
||||
use crate::{
|
||||
context::Context, util::deadline_compat, util::AsDuration, util::Compact, ClientMessage,
|
||||
ClientMessageKind, Request, Response, ServerError, Transport,
|
||||
context, util::deadline_compat, util::AsDuration, util::Compact, ClientMessage,
|
||||
ClientMessageKind, PollIo, Request, Response, ServerError, Transport,
|
||||
};
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{
|
||||
@@ -17,7 +17,7 @@ use futures::{
|
||||
prelude::*,
|
||||
ready,
|
||||
stream::Fuse,
|
||||
task::{LocalWaker, Poll},
|
||||
task::{Context, Poll},
|
||||
try_ready,
|
||||
};
|
||||
use humantime::format_rfc3339;
|
||||
@@ -128,18 +128,18 @@ where
|
||||
Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send + 'static,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<()> {
|
||||
while let Some(channel) = ready!(self.incoming().poll_next(cx)) {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
|
||||
while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) {
|
||||
match channel {
|
||||
Ok(channel) => {
|
||||
let peer = channel.client_addr;
|
||||
if let Err(e) =
|
||||
crate::spawn(channel.respond_with(self.request_handler().clone()))
|
||||
crate::spawn(channel.respond_with(self.as_mut().request_handler().clone()))
|
||||
{
|
||||
warn!("[{}] Failed to spawn connection handler: {:?}", peer, e);
|
||||
}
|
||||
@@ -150,7 +150,7 @@ where
|
||||
}
|
||||
}
|
||||
info!("Server shutting down.");
|
||||
return Poll::Ready(());
|
||||
Poll::Ready(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -165,7 +165,7 @@ where
|
||||
/// Responds to all requests with `request_handler`.
|
||||
fn respond_with<F, Fut>(self, request_handler: F) -> Running<Self, F>
|
||||
where
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
Running {
|
||||
@@ -181,7 +181,8 @@ where
|
||||
Req: Send,
|
||||
Resp: Send,
|
||||
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
|
||||
{}
|
||||
{
|
||||
}
|
||||
|
||||
/// Responds to all requests with `request_handler`.
|
||||
/// The server end of an open connection with a client.
|
||||
@@ -229,26 +230,29 @@ where
|
||||
Req: Send,
|
||||
Resp: Send,
|
||||
{
|
||||
pub(crate) fn start_send(
|
||||
self: &mut Pin<&mut Self>,
|
||||
response: Response<Resp>,
|
||||
) -> io::Result<()> {
|
||||
self.transport().start_send(response)
|
||||
pub(crate) fn start_send(mut self: Pin<&mut Self>, response: Response<Resp>) -> io::Result<()> {
|
||||
self.as_mut().transport().start_send(response)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_ready(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
self.transport().poll_ready(cx)
|
||||
pub(crate) fn poll_ready(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
self.as_mut().transport().poll_ready(cx)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_flush(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
self.transport().poll_flush(cx)
|
||||
pub(crate) fn poll_flush(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
self.as_mut().transport().poll_flush(cx)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_next(
|
||||
self: &mut Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> Poll<Option<io::Result<ClientMessage<Req>>>> {
|
||||
self.transport().poll_next(cx)
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> PollIo<ClientMessage<Req>> {
|
||||
self.as_mut().transport().poll_next(cx)
|
||||
}
|
||||
|
||||
/// Returns the address of the client connected to the channel.
|
||||
@@ -260,7 +264,7 @@ where
|
||||
/// responses and resolves when the connection is closed.
|
||||
pub fn respond_with<F, Fut>(self, f: F) -> impl Future<Output = ()>
|
||||
where
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
Req: 'static,
|
||||
Resp: 'static,
|
||||
@@ -286,9 +290,9 @@ where
|
||||
struct ClientHandler<Req, Resp, T, F> {
|
||||
channel: Channel<Req, Resp, T>,
|
||||
/// Responses waiting to be written to the wire.
|
||||
pending_responses: Fuse<mpsc::Receiver<(Context, Response<Resp>)>>,
|
||||
pending_responses: Fuse<mpsc::Receiver<(context::Context, Response<Resp>)>>,
|
||||
/// Handed out to request handlers to fan in responses.
|
||||
responses_tx: mpsc::Sender<(Context, Response<Resp>)>,
|
||||
responses_tx: mpsc::Sender<(context::Context, Response<Resp>)>,
|
||||
/// Number of requests currently being responded to.
|
||||
in_flight_requests: FnvHashMap<u64, AbortHandle>,
|
||||
/// Request handler.
|
||||
@@ -298,8 +302,8 @@ struct ClientHandler<Req, Resp, T, F> {
|
||||
impl<Req, Resp, T, F> ClientHandler<Req, Resp, T, F> {
|
||||
unsafe_pinned!(channel: Channel<Req, Resp, T>);
|
||||
unsafe_pinned!(in_flight_requests: FnvHashMap<u64, AbortHandle>);
|
||||
unsafe_pinned!(pending_responses: Fuse<mpsc::Receiver<(Context, Response<Resp>)>>);
|
||||
unsafe_pinned!(responses_tx: mpsc::Sender<(Context, Response<Resp>)>);
|
||||
unsafe_pinned!(pending_responses: Fuse<mpsc::Receiver<(context::Context, Response<Resp>)>>);
|
||||
unsafe_pinned!(responses_tx: mpsc::Sender<(context::Context, Response<Resp>)>);
|
||||
// For this to be safe, field f must be private, and code in this module must never
|
||||
// construct PinMut<F>.
|
||||
unsafe_unpinned!(f: F);
|
||||
@@ -310,36 +314,36 @@ where
|
||||
Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
/// If at max in-flight requests, check that there's room to immediately write a throttled
|
||||
/// response.
|
||||
fn poll_ready_if_throttling(
|
||||
self: &mut Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
if self.in_flight_requests.len()
|
||||
>= self.channel.config.max_in_flight_requests_per_connection
|
||||
{
|
||||
let peer = self.channel().client_addr;
|
||||
let peer = self.as_mut().channel().client_addr;
|
||||
|
||||
while let Poll::Pending = self.channel().poll_ready(cx)? {
|
||||
while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? {
|
||||
info!(
|
||||
"[{}] In-flight requests at max ({}), and transport is not ready.",
|
||||
peer,
|
||||
self.in_flight_requests().len(),
|
||||
self.as_mut().in_flight_requests().len(),
|
||||
);
|
||||
try_ready!(self.channel().poll_flush(cx));
|
||||
try_ready!(self.as_mut().channel().poll_flush(cx));
|
||||
}
|
||||
}
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn pump_read(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<Option<io::Result<()>>> {
|
||||
ready!(self.poll_ready_if_throttling(cx)?);
|
||||
fn pump_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> {
|
||||
ready!(self.as_mut().poll_ready_if_throttling(cx)?);
|
||||
|
||||
Poll::Ready(match ready!(self.channel().poll_next(cx)?) {
|
||||
Poll::Ready(match ready!(self.as_mut().channel().poll_next(cx)?) {
|
||||
Some(message) => {
|
||||
match message.message {
|
||||
ClientMessageKind::Request(request) => {
|
||||
@@ -359,28 +363,28 @@ where
|
||||
}
|
||||
|
||||
fn pump_write(
|
||||
self: &mut Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
read_half_closed: bool,
|
||||
) -> Poll<Option<io::Result<()>>> {
|
||||
match self.poll_next_response(cx)? {
|
||||
) -> PollIo<()> {
|
||||
match self.as_mut().poll_next_response(cx)? {
|
||||
Poll::Ready(Some((_, response))) => {
|
||||
self.channel().start_send(response)?;
|
||||
self.as_mut().channel().start_send(response)?;
|
||||
Poll::Ready(Some(Ok(())))
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
// Shutdown can't be done before we finish pumping out remaining responses.
|
||||
ready!(self.channel().poll_flush(cx)?);
|
||||
ready!(self.as_mut().channel().poll_flush(cx)?);
|
||||
Poll::Ready(None)
|
||||
}
|
||||
Poll::Pending => {
|
||||
// No more requests to process, so flush any requests buffered in the transport.
|
||||
ready!(self.channel().poll_flush(cx)?);
|
||||
ready!(self.as_mut().channel().poll_flush(cx)?);
|
||||
|
||||
// Being here means there are no staged requests and all written responses are
|
||||
// fully flushed. So, if the read half is closed and there are no in-flight
|
||||
// requests, then we can close the write half.
|
||||
if read_half_closed && self.in_flight_requests().is_empty() {
|
||||
if read_half_closed && self.as_mut().in_flight_requests().is_empty() {
|
||||
Poll::Ready(None)
|
||||
} else {
|
||||
Poll::Pending
|
||||
@@ -390,28 +394,33 @@ where
|
||||
}
|
||||
|
||||
fn poll_next_response(
|
||||
self: &mut Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> Poll<Option<io::Result<(Context, Response<Resp>)>>> {
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> PollIo<(context::Context, Response<Resp>)> {
|
||||
// Ensure there's room to write a response.
|
||||
while let Poll::Pending = self.channel().poll_ready(cx)? {
|
||||
ready!(self.channel().poll_flush(cx)?);
|
||||
while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? {
|
||||
ready!(self.as_mut().channel().poll_flush(cx)?);
|
||||
}
|
||||
|
||||
let peer = self.channel().client_addr;
|
||||
let peer = self.as_mut().channel().client_addr;
|
||||
|
||||
match ready!(self.pending_responses().poll_next(cx)) {
|
||||
match ready!(self.as_mut().pending_responses().poll_next(cx)) {
|
||||
Some((ctx, response)) => {
|
||||
if let Some(_) = self.in_flight_requests().remove(&response.request_id) {
|
||||
self.in_flight_requests().compact(0.1);
|
||||
if self
|
||||
.as_mut()
|
||||
.in_flight_requests()
|
||||
.remove(&response.request_id)
|
||||
.is_some()
|
||||
{
|
||||
self.as_mut().in_flight_requests().compact(0.1);
|
||||
}
|
||||
trace!(
|
||||
"[{}/{}] Staging response. In-flight requests = {}.",
|
||||
ctx.trace_id(),
|
||||
peer,
|
||||
self.in_flight_requests().len(),
|
||||
self.as_mut().in_flight_requests().len(),
|
||||
);
|
||||
return Poll::Ready(Some(Ok((ctx, response))));
|
||||
Poll::Ready(Some(Ok((ctx, response))))
|
||||
}
|
||||
None => {
|
||||
// This branch likely won't happen, since the ClientHandler is holding a Sender.
|
||||
@@ -422,30 +431,37 @@ where
|
||||
}
|
||||
|
||||
fn handle_request(
|
||||
self: &mut Pin<&mut Self>,
|
||||
mut self: Pin<&mut Self>,
|
||||
trace_context: trace::Context,
|
||||
request: Request<Req>,
|
||||
) -> io::Result<()> {
|
||||
let request_id = request.id;
|
||||
let peer = self.channel().client_addr;
|
||||
let ctx = Context {
|
||||
let peer = self.as_mut().channel().client_addr;
|
||||
let ctx = context::Context {
|
||||
deadline: request.deadline,
|
||||
trace_context,
|
||||
};
|
||||
let request = request.message;
|
||||
|
||||
if self.in_flight_requests().len()
|
||||
>= self.channel().config.max_in_flight_requests_per_connection
|
||||
if self.as_mut().in_flight_requests().len()
|
||||
>= self
|
||||
.as_mut()
|
||||
.channel()
|
||||
.config
|
||||
.max_in_flight_requests_per_connection
|
||||
{
|
||||
debug!(
|
||||
"[{}/{}] Client has reached in-flight request limit ({}/{}).",
|
||||
ctx.trace_id(),
|
||||
peer,
|
||||
self.in_flight_requests().len(),
|
||||
self.channel().config.max_in_flight_requests_per_connection
|
||||
self.as_mut().in_flight_requests().len(),
|
||||
self.as_mut()
|
||||
.channel()
|
||||
.config
|
||||
.max_in_flight_requests_per_connection
|
||||
);
|
||||
|
||||
self.channel().start_send(Response {
|
||||
self.as_mut().channel().start_send(Response {
|
||||
request_id,
|
||||
message: Err(ServerError {
|
||||
kind: io::ErrorKind::WouldBlock,
|
||||
@@ -464,10 +480,10 @@ where
|
||||
format_rfc3339(deadline),
|
||||
timeout,
|
||||
);
|
||||
let mut response_tx = self.responses_tx().clone();
|
||||
let mut response_tx = self.as_mut().responses_tx().clone();
|
||||
|
||||
let trace_id = *ctx.trace_id();
|
||||
let response = self.f().clone()(ctx.clone(), request);
|
||||
let response = self.as_mut().f().clone()(ctx, request);
|
||||
let response = deadline_compat::Deadline::new(response, Instant::now() + timeout).then(
|
||||
async move |result| {
|
||||
let response = Response {
|
||||
@@ -491,18 +507,20 @@ where
|
||||
),
|
||||
)
|
||||
})?;
|
||||
self.in_flight_requests().insert(request_id, abort_handle);
|
||||
self.as_mut()
|
||||
.in_flight_requests()
|
||||
.insert(request_id, abort_handle);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cancel_request(self: &mut Pin<&mut Self>, trace_context: &trace::Context, request_id: u64) {
|
||||
fn cancel_request(mut self: Pin<&mut Self>, trace_context: &trace::Context, request_id: u64) {
|
||||
// It's possible the request was already completed, so it's fine
|
||||
// if this is None.
|
||||
if let Some(cancel_handle) = self.in_flight_requests().remove(&request_id) {
|
||||
self.in_flight_requests().compact(0.1);
|
||||
if let Some(cancel_handle) = self.as_mut().in_flight_requests().remove(&request_id) {
|
||||
self.as_mut().in_flight_requests().compact(0.1);
|
||||
|
||||
cancel_handle.abort();
|
||||
let remaining = self.in_flight_requests().len();
|
||||
let remaining = self.as_mut().in_flight_requests().len();
|
||||
trace!(
|
||||
"[{}/{}] Request canceled. In-flight requests = {}",
|
||||
trace_context.trace_id,
|
||||
@@ -525,16 +543,19 @@ where
|
||||
Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
type Output = io::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
trace!("[{}] ClientHandler::poll", self.channel.client_addr);
|
||||
loop {
|
||||
let read = self.pump_read(cx)?;
|
||||
match (read, self.pump_write(cx, read == Poll::Ready(None))?) {
|
||||
let read = self.as_mut().pump_read(cx)?;
|
||||
match (
|
||||
read,
|
||||
self.as_mut().pump_write(cx, read == Poll::Ready(None))?,
|
||||
) {
|
||||
(Poll::Ready(None), Poll::Ready(None)) => {
|
||||
info!("[{}] Client disconnected.", self.channel.client_addr);
|
||||
return Poll::Ready(Ok(()));
|
||||
|
||||
@@ -6,8 +6,8 @@
|
||||
|
||||
//! Transports backed by in-memory channels.
|
||||
|
||||
use crate::Transport;
|
||||
use futures::{channel::mpsc, task::LocalWaker, Poll, Sink, Stream};
|
||||
use crate::{PollIo, Transport};
|
||||
use futures::{channel::mpsc, task::Context, Poll, Sink, Stream};
|
||||
use pin_utils::unsafe_pinned;
|
||||
use std::pin::Pin;
|
||||
use std::{
|
||||
@@ -45,34 +45,33 @@ impl<Item, SinkItem> UnboundedChannel<Item, SinkItem> {
|
||||
impl<Item, SinkItem> Stream for UnboundedChannel<Item, SinkItem> {
|
||||
type Item = Result<Item, io::Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<Option<io::Result<Item>>> {
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<Item> {
|
||||
self.rx().poll_next(cx).map(|option| option.map(Ok))
|
||||
}
|
||||
}
|
||||
|
||||
impl<Item, SinkItem> Sink for UnboundedChannel<Item, SinkItem> {
|
||||
type SinkItem = SinkItem;
|
||||
impl<Item, SinkItem> Sink<SinkItem> for UnboundedChannel<Item, SinkItem> {
|
||||
type SinkError = io::Error;
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
self.tx()
|
||||
.poll_ready(cx)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
}
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
|
||||
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
|
||||
self.tx()
|
||||
.start_send(item)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<Result<(), Self::SinkError>> {
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
|
||||
self.tx()
|
||||
.poll_flush(cx)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
self.tx()
|
||||
.poll_close(cx)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
@@ -80,8 +79,8 @@ impl<Item, SinkItem> Sink for UnboundedChannel<Item, SinkItem> {
|
||||
}
|
||||
|
||||
impl<Item, SinkItem> Transport for UnboundedChannel<Item, SinkItem> {
|
||||
type Item = Item;
|
||||
type SinkItem = SinkItem;
|
||||
type Item = Item;
|
||||
|
||||
fn peer_addr(&self) -> io::Result<SocketAddr> {
|
||||
Ok(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
|
||||
@@ -99,14 +98,15 @@ mod tests {
|
||||
server::{Handler, Server},
|
||||
transport,
|
||||
};
|
||||
use futures::{compat::TokioDefaultSpawner, prelude::*, stream};
|
||||
use futures::compat::Executor01CompatExt;
|
||||
use futures::{prelude::*, stream};
|
||||
use log::trace;
|
||||
use std::io;
|
||||
|
||||
#[test]
|
||||
fn integration() {
|
||||
let _ = env_logger::try_init();
|
||||
crate::init(TokioDefaultSpawner);
|
||||
crate::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
let (client_channel, server_channel) = transport::channel::unbounded();
|
||||
let server = Server::<String, u64>::default()
|
||||
@@ -129,8 +129,11 @@ mod tests {
|
||||
Ok::<_, io::Error>((response1, response2))
|
||||
};
|
||||
|
||||
let (response1, response2) =
|
||||
run_future(server.join(responses.unwrap_or_else(|e| panic!(e)))).1;
|
||||
let (response1, response2) = run_future(future::join(
|
||||
server,
|
||||
responses.unwrap_or_else(|e| panic!(e)),
|
||||
))
|
||||
.1;
|
||||
|
||||
trace!("response1: {:?}, response2: {:?}", response1, response2);
|
||||
|
||||
|
||||
@@ -12,9 +12,10 @@
|
||||
use futures::prelude::*;
|
||||
use std::{
|
||||
io,
|
||||
marker::PhantomData,
|
||||
net::SocketAddr,
|
||||
pin::Pin,
|
||||
task::{LocalWaker, Poll},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
pub mod channel;
|
||||
@@ -23,7 +24,7 @@ pub mod channel;
|
||||
pub trait Transport
|
||||
where
|
||||
Self: Stream<Item = io::Result<<Self as Transport>::Item>>,
|
||||
Self: Sink<SinkItem = <Self as Transport>::SinkItem, SinkError = io::Error>,
|
||||
Self: Sink<<Self as Transport>::SinkItem, SinkError = io::Error>,
|
||||
{
|
||||
/// The type read off the transport.
|
||||
type Item;
|
||||
@@ -37,77 +38,78 @@ where
|
||||
}
|
||||
|
||||
/// Returns a new Transport backed by the given Stream + Sink and connecting addresses.
|
||||
pub fn new<S, Item>(
|
||||
pub fn new<S, SinkItem, Item>(
|
||||
inner: S,
|
||||
peer_addr: SocketAddr,
|
||||
local_addr: SocketAddr,
|
||||
) -> impl Transport<Item = Item, SinkItem = S::SinkItem>
|
||||
) -> impl Transport<Item = Item, SinkItem = SinkItem>
|
||||
where
|
||||
S: Stream<Item = io::Result<Item>>,
|
||||
S: Sink<SinkError = io::Error>,
|
||||
S: Sink<SinkItem, SinkError = io::Error>,
|
||||
{
|
||||
TransportShim {
|
||||
inner,
|
||||
peer_addr,
|
||||
local_addr,
|
||||
_marker: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// A transport created by adding peers to a Stream + Sink.
|
||||
#[derive(Debug)]
|
||||
struct TransportShim<S> {
|
||||
struct TransportShim<S, SinkItem> {
|
||||
peer_addr: SocketAddr,
|
||||
local_addr: SocketAddr,
|
||||
inner: S,
|
||||
_marker: PhantomData<SinkItem>,
|
||||
}
|
||||
|
||||
impl<S> TransportShim<S> {
|
||||
impl<S, SinkItem> TransportShim<S, SinkItem> {
|
||||
pin_utils::unsafe_pinned!(inner: S);
|
||||
}
|
||||
|
||||
impl<S> Stream for TransportShim<S>
|
||||
impl<S, SinkItem> Stream for TransportShim<S, SinkItem>
|
||||
where
|
||||
S: Stream,
|
||||
{
|
||||
type Item = S::Item;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<S::Item>> {
|
||||
self.inner().poll_next(waker)
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
|
||||
self.inner().poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Sink for TransportShim<S>
|
||||
impl<S, Item> Sink<Item> for TransportShim<S, Item>
|
||||
where
|
||||
S: Sink,
|
||||
S: Sink<Item>,
|
||||
{
|
||||
type SinkItem = S::SinkItem;
|
||||
type SinkError = S::SinkError;
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: S::SinkItem) -> Result<(), S::SinkError> {
|
||||
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), S::SinkError> {
|
||||
self.inner().start_send(item)
|
||||
}
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_ready(waker)
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_ready(cx)
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_flush(waker)
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_close(waker)
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_close(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Item> Transport for TransportShim<S>
|
||||
impl<S, SinkItem, Item> Transport for TransportShim<S, SinkItem>
|
||||
where
|
||||
S: Stream + Sink,
|
||||
S: Stream + Sink<SinkItem>,
|
||||
Self: Stream<Item = io::Result<Item>>,
|
||||
Self: Sink<SinkItem = S::SinkItem, SinkError = io::Error>,
|
||||
Self: Sink<SinkItem, SinkError = io::Error>,
|
||||
{
|
||||
type Item = Item;
|
||||
type SinkItem = S::SinkItem;
|
||||
type SinkItem = SinkItem;
|
||||
|
||||
/// The address of the remote peer this transport is in communication with.
|
||||
fn peer_addr(&self) -> io::Result<SocketAddr> {
|
||||
|
||||
@@ -5,10 +5,10 @@
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
use futures::{
|
||||
compat::{Compat01As03, Future01CompatExt},
|
||||
compat::*,
|
||||
prelude::*,
|
||||
ready,
|
||||
task::{LocalWaker, Poll},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use pin_utils::unsafe_pinned;
|
||||
use std::pin::Pin;
|
||||
@@ -50,15 +50,15 @@ where
|
||||
{
|
||||
type Output = Result<T::Ok, timeout::Error<T::Error>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Self::Output> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
// First, try polling the future
|
||||
match self.future().try_poll(waker) {
|
||||
match self.as_mut().future().try_poll(cx) {
|
||||
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
|
||||
Poll::Pending => {}
|
||||
Poll::Ready(Err(e)) => return Poll::Ready(Err(timeout::Error::inner(e))),
|
||||
}
|
||||
|
||||
let delay = self.delay().poll_unpin(waker);
|
||||
let delay = self.delay().poll_unpin(cx);
|
||||
|
||||
// Now check the timer
|
||||
match ready!(delay) {
|
||||
|
||||
@@ -31,6 +31,7 @@ where
|
||||
}
|
||||
|
||||
/// Serializes [`io::ErrorKind`] as a `u32`.
|
||||
#[allow(clippy::trivially_copy_pass_by_ref)] // Exact fn signature required by serde derive
|
||||
pub fn serialize_io_error_kind_as_u32<S>(
|
||||
kind: &io::ErrorKind,
|
||||
serializer: S,
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
cargo-features = ["rename-dependency"]
|
||||
|
||||
[package]
|
||||
name = "tarpc"
|
||||
version = "0.14.1"
|
||||
version = "0.16.0"
|
||||
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
||||
edition = "2018"
|
||||
license = "MIT"
|
||||
@@ -21,22 +19,20 @@ serde1 = ["rpc/serde1", "serde", "serde/derive"]
|
||||
travis-ci = { repository = "google/tarpc" }
|
||||
|
||||
[dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.14", features = ["compat"] }
|
||||
log = "0.4"
|
||||
serde = { optional = true, version = "1.0" }
|
||||
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.4" }
|
||||
tarpc-plugins = { path = "../plugins", version = "0.5.0" }
|
||||
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.2" }
|
||||
|
||||
[target.'cfg(not(test))'.dependencies]
|
||||
futures-preview = "0.3.0-alpha.9"
|
||||
|
||||
[dev-dependencies]
|
||||
bincode = "1.0"
|
||||
bincode = "1"
|
||||
bytes = { version = "0.4", features = ["serde"] }
|
||||
humantime = "1.0"
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" }
|
||||
env_logger = "0.5"
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.5", path = "../bincode-transport" }
|
||||
env_logger = "0.6"
|
||||
libtest = "0.0.1"
|
||||
tokio = "0.1"
|
||||
tokio-executor = "0.1"
|
||||
tokio-tcp = "0.1"
|
||||
pin-utils = "0.1.0-alpha.3"
|
||||
pin-utils = "0.1.0-alpha.4"
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
#![feature(
|
||||
arbitrary_self_types,
|
||||
pin,
|
||||
futures_api,
|
||||
await_macro,
|
||||
async_await,
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
#![feature(
|
||||
futures_api,
|
||||
pin,
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await,
|
||||
@@ -14,7 +13,7 @@
|
||||
)]
|
||||
|
||||
use futures::{
|
||||
compat::TokioDefaultSpawner,
|
||||
compat::Executor01CompatExt,
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
@@ -83,7 +82,7 @@ async fn run() -> io::Result<()> {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
tokio::run(
|
||||
run()
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
#![feature(
|
||||
existential_type,
|
||||
arbitrary_self_types,
|
||||
pin,
|
||||
futures_api,
|
||||
await_macro,
|
||||
async_await,
|
||||
@@ -16,7 +15,7 @@
|
||||
|
||||
use crate::{add::Service as AddService, double::Service as DoubleService};
|
||||
use futures::{
|
||||
compat::TokioDefaultSpawner,
|
||||
compat::Executor01CompatExt,
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
@@ -103,6 +102,6 @@ async fn run() -> io::Result<()> {
|
||||
|
||||
fn main() {
|
||||
env_logger::init();
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
tokio::run(run().map_err(|e| panic!(e)).boxed().compat());
|
||||
}
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
#![feature(
|
||||
pin,
|
||||
async_await,
|
||||
await_macro,
|
||||
futures_api,
|
||||
@@ -19,7 +18,7 @@ mod registry {
|
||||
io,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{LocalWaker, Poll},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tarpc::{
|
||||
client::{self, Client},
|
||||
@@ -54,8 +53,10 @@ mod registry {
|
||||
/// Returns a function that serves requests for the registered services.
|
||||
pub fn serve(
|
||||
self,
|
||||
) -> impl FnOnce(context::Context, ServiceRequest)
|
||||
-> Either<Services::Future, Ready<io::Result<ServiceResponse>>>
|
||||
) -> impl FnOnce(
|
||||
context::Context,
|
||||
ServiceRequest,
|
||||
) -> Either<Services::Future, Ready<io::Result<ServiceResponse>>>
|
||||
+ Clone {
|
||||
let registrations = Arc::new(self.registrations);
|
||||
move |cx, req: ServiceRequest| match registrations.serve(cx, &req) {
|
||||
@@ -212,11 +213,11 @@ mod registry {
|
||||
{
|
||||
type Output = Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Output> {
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Output> {
|
||||
unsafe {
|
||||
match Pin::get_mut_unchecked(self) {
|
||||
Either::Left(car) => Pin::new_unchecked(car).poll(waker),
|
||||
Either::Right(cdr) => Pin::new_unchecked(cdr).poll(waker),
|
||||
match Pin::get_unchecked_mut(self) {
|
||||
Either::Left(car) => Pin::new_unchecked(car).poll(cx),
|
||||
Either::Right(cdr) => Pin::new_unchecked(cdr).poll(cx),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -238,6 +239,7 @@ mod registry {
|
||||
// Example
|
||||
use bytes::Bytes;
|
||||
use futures::{
|
||||
compat::Executor01CompatExt,
|
||||
future::{ready, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
@@ -328,7 +330,8 @@ impl<Services: registry::MaybeServe + Sync> BincodeRegistry<Services> {
|
||||
fn serve(
|
||||
self,
|
||||
) -> impl FnOnce(
|
||||
context::Context, registry::ServiceRequest
|
||||
context::Context,
|
||||
registry::ServiceRequest,
|
||||
) -> registry::Either<
|
||||
Services::Future,
|
||||
Ready<io::Result<registry::ServiceResponse>>,
|
||||
@@ -406,6 +409,6 @@ async fn run() -> io::Result<()> {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
tarpc::init(futures::compat::TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
|
||||
}
|
||||
|
||||
@@ -9,13 +9,7 @@
|
||||
#![feature(async_await, external_doc)]
|
||||
#![cfg_attr(
|
||||
test,
|
||||
feature(
|
||||
pin,
|
||||
futures_api,
|
||||
await_macro,
|
||||
proc_macro_hygiene,
|
||||
arbitrary_self_types
|
||||
)
|
||||
feature(futures_api, await_macro, proc_macro_hygiene, arbitrary_self_types)
|
||||
)]
|
||||
|
||||
#[doc(hidden)]
|
||||
|
||||
@@ -177,15 +177,15 @@ macro_rules! service {
|
||||
impl<S: Service> ::std::future::Future for ResponseFut<S> {
|
||||
type Output = ::std::io::Result<Response>;
|
||||
|
||||
fn poll(self: ::std::pin::Pin<&mut Self>, waker: &::std::task::LocalWaker)
|
||||
fn poll(self: ::std::pin::Pin<&mut Self>, cx: &mut ::std::task::Context<'_>)
|
||||
-> ::std::task::Poll<::std::io::Result<Response>>
|
||||
{
|
||||
unsafe {
|
||||
match ::std::pin::Pin::get_mut_unchecked(self) {
|
||||
match ::std::pin::Pin::get_unchecked_mut(self) {
|
||||
$(
|
||||
ResponseFut::$fn_name(resp) =>
|
||||
::std::pin::Pin::new_unchecked(resp)
|
||||
.poll(waker)
|
||||
.poll(cx)
|
||||
.map(Response::$fn_name)
|
||||
.map(Ok),
|
||||
)*
|
||||
@@ -282,7 +282,7 @@ mod syntax_test {
|
||||
#[cfg(test)]
|
||||
mod functional_test {
|
||||
use futures::{
|
||||
compat::TokioDefaultSpawner,
|
||||
compat::Executor01CompatExt,
|
||||
future::{ready, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
@@ -315,7 +315,7 @@ mod functional_test {
|
||||
#[test]
|
||||
fn sequential() {
|
||||
let _ = env_logger::try_init();
|
||||
rpc::init(TokioDefaultSpawner);
|
||||
rpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
let test = async {
|
||||
let (tx, rx) = channel::unbounded();
|
||||
@@ -344,7 +344,7 @@ mod functional_test {
|
||||
#[test]
|
||||
fn concurrent() {
|
||||
let _ = env_logger::try_init();
|
||||
rpc::init(TokioDefaultSpawner);
|
||||
rpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
let test = async {
|
||||
let (tx, rx) = channel::unbounded();
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
#![feature(
|
||||
test,
|
||||
arbitrary_self_types,
|
||||
pin,
|
||||
integer_atomics,
|
||||
futures_api,
|
||||
generators,
|
||||
@@ -16,10 +15,8 @@
|
||||
proc_macro_hygiene
|
||||
)]
|
||||
|
||||
extern crate test;
|
||||
|
||||
use self::test::stats::Stats;
|
||||
use futures::{compat::TokioDefaultSpawner, future, prelude::*};
|
||||
use futures::{compat::Executor01CompatExt, future, prelude::*};
|
||||
use libtest::stats::Stats;
|
||||
use rpc::{
|
||||
client, context,
|
||||
server::{Handler, Server},
|
||||
@@ -120,7 +117,7 @@ async fn bench() -> io::Result<()> {
|
||||
#[test]
|
||||
fn bench_small_packet() {
|
||||
env_logger::init();
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat())
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tarpc-trace"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
authors = ["tikue <tikue@google.com>"]
|
||||
edition = '2018'
|
||||
license = "MIT"
|
||||
@@ -13,7 +13,7 @@ readme = "../README.md"
|
||||
description = "foundations for tracing in tarpc"
|
||||
|
||||
[dependencies]
|
||||
rand = "0.5"
|
||||
rand = "0.6"
|
||||
|
||||
[dependencies.serde]
|
||||
version = "1.0"
|
||||
|
||||
@@ -27,10 +27,7 @@ use std::{
|
||||
/// Consists of a span identifying an event, an optional parent span identifying a causal event
|
||||
/// that triggered the current span, and a trace with which all related spans are associated.
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
|
||||
#[cfg_attr(
|
||||
feature = "serde",
|
||||
derive(serde::Serialize, serde::Deserialize)
|
||||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
pub struct Context {
|
||||
/// An identifier of the trace associated with the current context. A trace ID is typically
|
||||
/// created at a root span and passed along through all causal events.
|
||||
@@ -50,18 +47,12 @@ pub struct Context {
|
||||
/// A 128-bit UUID identifying a trace. All spans caused by the same originating span share the
|
||||
/// same trace ID.
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
|
||||
#[cfg_attr(
|
||||
feature = "serde",
|
||||
derive(serde::Serialize, serde::Deserialize)
|
||||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
pub struct TraceId(u128);
|
||||
|
||||
/// A 64-bit identifier of a span within a trace. The identifier is unique within the span's trace.
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
|
||||
#[cfg_attr(
|
||||
feature = "serde",
|
||||
derive(serde::Serialize, serde::Deserialize)
|
||||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
pub struct SpanId(u64);
|
||||
|
||||
impl Context {
|
||||
@@ -80,7 +71,7 @@ impl TraceId {
|
||||
/// Returns a random trace ID that can be assumed to be globally unique if `rng` generates
|
||||
/// actually-random numbers.
|
||||
pub fn random<R: Rng>(rng: &mut R) -> Self {
|
||||
TraceId((rng.next_u64() as u128) << mem::size_of::<u64>() | rng.next_u64() as u128)
|
||||
TraceId(u128::from(rng.next_u64()) << mem::size_of::<u64>() | u128::from(rng.next_u64()))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user