11 Commits

Author SHA1 Message Date
Tim Kuehn
4e0be5b626 Publish tarpc v0.15.0 2019-03-26 21:13:41 -07:00
Artem Vorotnikov
5516034bbc Use libtest crate (#213) 2019-03-24 22:29:01 -07:00
Artem Vorotnikov
06544faa5a Update to futures 0.3.0-alpha.13 (#211) 2019-02-26 09:32:41 -08:00
Tim Kuehn
39737b720a Cargo fmt 2019-01-17 10:37:16 -08:00
Tim Kuehn
0f36985440 Update for latest changes to futures.
Fixes #209.
2019-01-17 10:37:03 -08:00
Tyler Bindon
959bb691cd Update regex to match diffs output by cargo fmt. (#208)
It appears the header of the diffs output by cargo fmt have changed. It now says "Diff in /blah/blah/blah.rs at line 99:" Matching on lines starting with + or - should be more future-proof against changes to the surroundings.
2018-12-09 01:59:35 -08:00
Tim
2a3162c5fa Cargo feature 'rename-dependency' is stabilized 2018-11-21 11:03:41 -08:00
Tim Kuehn
0cc976b729 cargo fmt 2018-11-06 17:01:27 -08:00
Tim Kuehn
4d2d3f24c6 Address Clippy lints 2018-11-06 17:00:15 -08:00
Tim Kuehn
2c7c64841f Add symlink tarpc/README.md -> README.md 2018-10-29 16:11:01 -07:00
Tim Kuehn
4ea142d0f3 Remove coverage badge.
It hasn't been updated in over 2 years.
2018-10-29 11:40:09 -07:00
33 changed files with 319 additions and 494 deletions

View File

@@ -1,6 +1,5 @@
## tarpc: Tim & Adam's RPC lib
[![Travis-CI Status](https://travis-ci.org/google/tarpc.png?branch=master)](https://travis-ci.org/google/tarpc)
[![Coverage Status](https://coveralls.io/repos/github/google/tarpc/badge.svg?branch=master)](https://coveralls.io/github/google/tarpc?branch=master)
[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg)](LICENSE)
[![Latest Version](https://img.shields.io/crates/v/tarpc.svg)](https://crates.io/crates/tarpc)
[![Join the chat at https://gitter.im/tarpc/Lobby](https://badges.gitter.im/tarpc/Lobby.svg)](https://gitter.im/tarpc/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
@@ -34,7 +33,7 @@ arguments to tarpc fns.
Add to your `Cargo.toml` dependencies:
```toml
tarpc = "0.14.0"
tarpc = "0.15.0"
```
The `service!` macro expands to a collection of items that form an
@@ -53,7 +52,7 @@ Here's a small service.
use futures::{
compat::TokioDefaultSpawner,
compat::Executor01CompatExt,
future::{self, Ready},
prelude::*,
};
@@ -122,7 +121,7 @@ async fn run() -> io::Result<()> {
}
fn main() {
tarpc::init(TokioDefaultSpawner);
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(run()
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()

View File

@@ -1,8 +1,6 @@
cargo-features = ["rename-dependency"]
[package]
name = "tarpc-bincode-transport"
version = "0.3.0"
version = "0.4.0"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = '2018'
license = "MIT"
@@ -17,24 +15,25 @@ description = "A bincode-based transport for tarpc services."
[dependencies]
bincode = { version = "1.0", features = ["i128"] }
futures_legacy = { version = "0.1", package = "futures" }
pin-utils = "0.1.0-alpha.3"
rpc = { package = "tarpc-lib", version = "0.2", path = "../rpc", features = ["serde1"] }
pin-utils = "0.1.0-alpha.4"
rpc = { package = "tarpc-lib", version = "0.3", path = "../rpc", features = ["serde1"] }
serde = "1.0"
tokio-io = "0.1"
async-bincode = "0.4"
tokio-tcp = "0.1"
[target.'cfg(not(test))'.dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["compat"] }
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
[dev-dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
env_logger = "0.5"
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
env_logger = "0.6"
humantime = "1.0"
libtest = "0.0.1"
log = "0.4"
rand = "0.5"
rand = "0.6"
tokio = "0.1"
tokio-executor = "0.1"
tokio-reactor = "0.1"
tokio-serde = "0.2"
tokio-serde = "0.3"
tokio-timer = "0.2"

View File

@@ -8,7 +8,7 @@ use futures_legacy::{
};
use std::{
pin::Pin,
task::{self, LocalWaker, Poll},
task::{self, Poll, Waker},
};
/// A shim to convert a 0.1 Sink + Stream to a 0.3 Sink + Stream.
@@ -44,9 +44,9 @@ where
{
type Item = Result<S::Item, S::Error>;
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<Self::Item>> {
unsafe {
let inner = &mut Pin::get_mut_unchecked(self).inner;
let inner = &mut Pin::get_unchecked_mut(self).inner;
let mut compat = inner.compat();
let compat = Pin::new_unchecked(&mut compat);
match ready!(compat.poll_next(waker)) {
@@ -66,17 +66,17 @@ where
type SinkError = S::SinkError;
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), S::SinkError> {
let me = unsafe { Pin::get_mut_unchecked(self) };
let me = unsafe { Pin::get_unchecked_mut(self) };
assert!(me.staged_item.is_none());
me.staged_item = Some(item);
Ok(())
}
fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_mut_unchecked(self) };
let me = unsafe { Pin::get_unchecked_mut(self) };
match me.staged_item.take() {
Some(staged_item) => match me.inner.start_send(staged_item) {
Ok(AsyncSink01::Ready) => Poll::Ready(Ok(())),
@@ -91,11 +91,11 @@ where
})
}
fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_mut_unchecked(self) };
let me = unsafe { Pin::get_unchecked_mut(self) };
match me.inner.poll_complete() {
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
Ok(Async01::NotReady) => Poll::Pending,
@@ -104,11 +104,11 @@ where
})
}
fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_mut_unchecked(self) };
let me = unsafe { Pin::get_unchecked_mut(self) };
match me.inner.close() {
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
Ok(Async01::NotReady) => Poll::Pending,
@@ -119,7 +119,7 @@ where
}
#[derive(Clone, Debug)]
struct WakerToHandle<'a>(&'a LocalWaker);
struct WakerToHandle<'a>(&'a Waker);
#[derive(Debug)]
struct NotifyWaker(task::Waker);
@@ -145,6 +145,6 @@ unsafe impl UnsafeNotify01 for NotifyWaker {
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
unsafe { NotifyWaker(handle.0.clone().into_waker()).clone_raw() }
unsafe { NotifyWaker(handle.0.clone()).clone_raw() }
}
}

View File

@@ -6,13 +6,7 @@
//! A TCP [`Transport`] that serializes as bincode.
#![feature(
futures_api,
pin,
arbitrary_self_types,
await_macro,
async_await
)]
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
#![deny(missing_docs, missing_debug_implementations)]
use self::compat::Compat;
@@ -30,7 +24,7 @@ use std::{
marker::PhantomData,
net::SocketAddr,
pin::Pin,
task::{LocalWaker, Poll},
task::{Poll, Waker},
};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::{TcpListener, TcpStream};
@@ -63,7 +57,7 @@ where
{
type Item = io::Result<Item>;
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<Item>>> {
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<io::Result<Item>>> {
match self.inner().poll_next(waker) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
@@ -83,21 +77,21 @@ where
type SinkItem = SinkItem;
type SinkError = io::Error;
fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
self.inner()
.start_send(item)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> {
convert(self.inner().poll_ready(waker))
}
fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> {
convert(self.inner().poll_flush(waker))
}
fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> {
convert(self.inner().poll_close(waker))
}
}
@@ -195,7 +189,7 @@ where
{
type Item = io::Result<Transport<TcpStream, Item, SinkItem>>;
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<Self::Item>> {
let next = ready!(self.incoming().poll_next(waker)?);
Poll::Ready(next.map(|conn| Ok(new(conn))))
}

View File

@@ -15,10 +15,8 @@
async_await
)]
extern crate test;
use self::test::stats::Stats;
use futures::{compat::TokioDefaultSpawner, prelude::*};
use futures::{compat::Executor01CompatExt, prelude::*};
use libtest::stats::Stats;
use rpc::{
client, context,
server::{Handler, Server},
@@ -101,7 +99,7 @@ async fn bench() -> io::Result<()> {
#[test]
fn bench_small_packet() -> io::Result<()> {
env_logger::init();
rpc::init(TokioDefaultSpawner);
rpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat());
println!("done");

View File

@@ -9,7 +9,7 @@
#![feature(generators, await_macro, async_await, futures_api)]
use futures::{
compat::{Future01CompatExt, TokioDefaultSpawner},
compat::{Executor01CompatExt, Future01CompatExt},
prelude::*,
stream,
};
@@ -136,7 +136,7 @@ async fn run() -> io::Result<()> {
#[test]
fn cancel_slower() -> io::Result<()> {
env_logger::init();
rpc::init(TokioDefaultSpawner);
rpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(run().boxed().map_err(|e| panic!(e)).compat());
Ok(())

View File

@@ -9,7 +9,7 @@
#![feature(generators, await_macro, async_await, futures_api)]
use futures::{
compat::{Future01CompatExt, TokioDefaultSpawner},
compat::{Executor01CompatExt, Future01CompatExt},
prelude::*,
};
use log::{error, info, trace};
@@ -105,7 +105,7 @@ async fn run() -> io::Result<()> {
#[test]
fn ping_pong() -> io::Result<()> {
env_logger::init();
rpc::init(TokioDefaultSpawner);
rpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(
run()

View File

@@ -1,8 +1,6 @@
cargo-features = ["rename-dependency"]
[package]
name = "tarpc-example-service"
version = "0.2.0"
version = "0.3.0"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = "2018"
license = "MIT"
@@ -15,11 +13,11 @@ readme = "../README.md"
description = "An example server built on tarpc."
[dependencies]
bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" }
bincode-transport = { package = "tarpc-bincode-transport", version = "0.4", path = "../bincode-transport" }
clap = "2.0"
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
serde = { version = "1.0" }
tarpc = { version = "0.14", path = "../tarpc", features = ["serde1"] }
tarpc = { version = "0.15", path = "../tarpc", features = ["serde1"] }
tokio = "0.1"
tokio-executor = "0.1"

View File

@@ -4,16 +4,10 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![feature(
futures_api,
pin,
arbitrary_self_types,
await_macro,
async_await
)]
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
use clap::{App, Arg};
use futures::{compat::TokioDefaultSpawner, prelude::*};
use futures::{compat::Executor01CompatExt, prelude::*};
use std::{io, net::SocketAddr};
use tarpc::{client, context};
@@ -59,7 +53,7 @@ fn main() {
)
.get_matches();
tarpc::init(TokioDefaultSpawner);
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
let server_addr = flags.value_of("server_addr").unwrap();
let server_addr = server_addr
@@ -68,7 +62,7 @@ fn main() {
let name = flags.value_of("name").unwrap();
tarpc::init(TokioDefaultSpawner);
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(
run(server_addr, name.into())

View File

@@ -6,7 +6,6 @@
#![feature(
futures_api,
pin,
arbitrary_self_types,
await_macro,
async_await,

View File

@@ -4,17 +4,11 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![feature(
futures_api,
pin,
arbitrary_self_types,
await_macro,
async_await
)]
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
use clap::{App, Arg};
use futures::{
compat::TokioDefaultSpawner,
compat::Executor01CompatExt,
future::{self, Ready},
prelude::*,
};
@@ -79,7 +73,7 @@ fn main() {
.parse()
.unwrap_or_else(|e| panic!(r#"--port value "{}" invalid: {}"#, port, e));
tarpc::init(TokioDefaultSpawner);
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(
run(([0, 0, 0, 0], port).into())

View File

@@ -96,7 +96,7 @@ do
diff="$diff$(cargo fmt -- --skip-children --write-mode=diff $file)"
fi
done
if grep --quiet "^Diff at line" <<< "$diff"; then
if grep --quiet "^[-+]" <<< "$diff"; then
FMTRESULT=1
fi

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc-plugins"
version = "0.5.0"
version = "0.5.1"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
license = "MIT"
documentation = "https://docs.rs/tarpc-plugins"
@@ -15,7 +15,7 @@ description = "Proc macros for tarpc."
travis-ci = { repository = "google/tarpc" }
[dependencies]
itertools = "0.7"
itertools = "0.8"
syn = { version = "0.15", features = ["full", "extra-traits"] }
quote = "0.6"
proc-macro2 = "0.4"

View File

@@ -1,8 +1,6 @@
cargo-features = ["rename-dependency"]
[package]
name = "tarpc-lib"
version = "0.2.0"
version = "0.3.0"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = '2018'
license = "MIT"
@@ -22,17 +20,17 @@ serde1 = ["trace/serde", "serde", "serde/derive"]
fnv = "1.0"
humantime = "1.0"
log = "0.4"
pin-utils = "0.1.0-alpha.3"
rand = "0.5"
pin-utils = "0.1.0-alpha.4"
rand = "0.6"
tokio-timer = "0.2"
trace = { package = "tarpc-trace", version = "0.1", path = "../trace" }
trace = { package = "tarpc-trace", version = "0.2", path = "../trace" }
serde = { optional = true, version = "1.0" }
[target.'cfg(not(test))'.dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["compat"] }
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
[dev-dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
futures-test-preview = { version = "0.3.0-alpha.9" }
env_logger = "0.5"
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
futures-test-preview = { version = "0.3.0-alpha.13" }
env_logger = "0.6"
tokio = "0.1"

View File

@@ -7,7 +7,7 @@
use crate::{
context,
util::{deadline_compat, AsDuration, Compact},
ClientMessage, ClientMessageKind, Request, Response, Transport,
ClientMessage, ClientMessageKind, PollIo, Request, Response, Transport,
};
use fnv::FnvHashMap;
use futures::{
@@ -15,7 +15,7 @@ use futures::{
prelude::*,
ready,
stream::Fuse,
task::LocalWaker,
task::Waker,
Poll,
};
use humantime::format_rfc3339;
@@ -62,12 +62,12 @@ impl<Req, Resp> Clone for Channel<Req, Resp> {
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
struct Send<'a, Req, Resp> {
fut: MapOkDispatchResponse<
MapErrConnectionReset<futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>>>,
Resp,
>,
fut: MapOkDispatchResponse<SendMapErrConnectionReset<'a, Req, Resp>, Resp>,
}
type SendMapErrConnectionReset<'a, Req, Resp> =
MapErrConnectionReset<futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>>>;
impl<'a, Req, Resp> Send<'a, Req, Resp> {
unsafe_pinned!(
fut: MapOkDispatchResponse<
@@ -82,8 +82,8 @@ impl<'a, Req, Resp> Send<'a, Req, Resp> {
impl<'a, Req, Resp> Future for Send<'a, Req, Resp> {
type Output = io::Result<DispatchResponse<Resp>>;
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
self.fut().poll(lw)
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
self.as_mut().fut().poll(waker)
}
}
@@ -101,15 +101,15 @@ impl<'a, Req, Resp> Call<'a, Req, Resp> {
impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
type Output = io::Result<Resp>;
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
self.fut().poll(lw)
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
self.as_mut().fut().poll(waker)
}
}
impl<Req, Resp> Channel<Req, Resp> {
/// Sends a request to the dispatch task to forward to the server, returning a [`Future`] that
/// resolves when the request is sent (not when the response is received).
fn send<'a>(&'a mut self, mut ctx: context::Context, request: Req) -> Send<'a, Req, Resp> {
fn send(&mut self, mut ctx: context::Context, request: Req) -> Send<Req, Resp> {
// Convert the context to the call context.
ctx.trace_context.parent_id = Some(ctx.trace_context.span_id);
ctx.trace_context.span_id = SpanId::random(&mut rand::thread_rng());
@@ -150,7 +150,7 @@ impl<Req, Resp> Channel<Req, Resp> {
/// Sends a request to the dispatch task to forward to the server, returning a [`Future`] that
/// resolves to the response.
pub fn call<'a>(&'a mut self, context: context::Context, request: Req) -> Call<'a, Req, Resp> {
pub fn call(&mut self, context: context::Context, request: Req) -> Call<Req, Resp> {
Call {
fut: AndThenIdent::new(self.send(context, request)),
}
@@ -177,7 +177,7 @@ impl<Resp> DispatchResponse<Resp> {
impl<Resp> Future for DispatchResponse<Resp> {
type Output = io::Result<Resp>;
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<Resp>> {
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<Resp>> {
let resp = ready!(self.response.poll_unpin(waker));
self.complete = true;
@@ -185,8 +185,8 @@ impl<Resp> Future for DispatchResponse<Resp> {
Poll::Ready(match resp {
Ok(resp) => Ok(resp.message?),
Err(e) => Err({
let trace_id = *self.ctx().trace_id();
let server_addr = *self.server_addr();
let trace_id = *self.as_mut().ctx().trace_id();
let server_addr = *self.as_mut().server_addr();
if e.is_elapsed() {
io::Error::new(
@@ -317,20 +317,20 @@ where
unsafe_pinned!(pending_requests: Fuse<mpsc::Receiver<DispatchRequest<Req, Resp>>>);
unsafe_pinned!(transport: Fuse<C>);
fn pump_read(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<()>>> {
Poll::Ready(match ready!(self.transport().poll_next(waker)?) {
fn pump_read(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> {
Poll::Ready(match ready!(self.as_mut().transport().poll_next(waker)?) {
Some(response) => {
self.complete(response);
Some(Ok(()))
}
None => {
trace!("[{}] read half closed", self.server_addr());
trace!("[{}] read half closed", self.as_mut().server_addr());
None
}
})
}
fn pump_write(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<()>>> {
fn pump_write(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> {
enum ReceiverStatus {
NotReady,
Closed,
@@ -356,12 +356,12 @@ where
match (pending_requests_status, canceled_requests_status) {
(ReceiverStatus::Closed, ReceiverStatus::Closed) => {
ready!(self.transport().poll_flush(waker)?);
ready!(self.as_mut().transport().poll_flush(waker)?);
Poll::Ready(None)
}
(ReceiverStatus::NotReady, _) | (_, ReceiverStatus::NotReady) => {
// No more messages to process, so flush any messages buffered in the transport.
ready!(self.transport().poll_flush(waker)?);
ready!(self.as_mut().transport().poll_flush(waker)?);
// Even if we fully-flush, we return Pending, because we have no more requests
// or cancellations right now.
@@ -373,12 +373,12 @@ where
/// Yields the next pending request, if one is ready to be sent.
fn poll_next_request(
self: &mut Pin<&mut Self>,
waker: &LocalWaker,
) -> Poll<Option<io::Result<DispatchRequest<Req, Resp>>>> {
if self.in_flight_requests().len() >= self.config.max_in_flight_requests {
waker: &Waker,
) -> PollIo<DispatchRequest<Req, Resp>> {
if self.as_mut().in_flight_requests().len() >= self.config.max_in_flight_requests {
info!(
"At in-flight request capacity ({}/{}).",
self.in_flight_requests().len(),
self.as_mut().in_flight_requests().len(),
self.config.max_in_flight_requests
);
@@ -387,13 +387,13 @@ where
return Poll::Pending;
}
while let Poll::Pending = self.transport().poll_ready(waker)? {
while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? {
// We can't yield a request-to-be-sent before the transport is capable of buffering it.
ready!(self.transport().poll_flush(waker)?);
ready!(self.as_mut().transport().poll_flush(waker)?);
}
loop {
match ready!(self.pending_requests().poll_next_unpin(waker)) {
match ready!(self.as_mut().pending_requests().poll_next_unpin(waker)) {
Some(request) => {
if request.response_completion.is_canceled() {
trace!(
@@ -406,7 +406,7 @@ where
return Poll::Ready(Some(Ok(request)));
}
None => {
trace!("[{}] pending_requests closed", self.server_addr());
trace!("[{}] pending_requests closed", self.as_mut().server_addr());
return Poll::Ready(None);
}
}
@@ -416,29 +416,34 @@ where
/// Yields the next pending cancellation, and, if one is ready, cancels the associated request.
fn poll_next_cancellation(
self: &mut Pin<&mut Self>,
waker: &LocalWaker,
) -> Poll<Option<io::Result<(context::Context, u64)>>> {
while let Poll::Pending = self.transport().poll_ready(waker)? {
ready!(self.transport().poll_flush(waker)?);
waker: &Waker,
) -> PollIo<(context::Context, u64)> {
while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? {
ready!(self.as_mut().transport().poll_flush(waker)?);
}
loop {
match ready!(self.canceled_requests().poll_next_unpin(waker)) {
match ready!(self.as_mut().canceled_requests().poll_next_unpin(waker)) {
Some(request_id) => {
if let Some(in_flight_data) = self.in_flight_requests().remove(&request_id) {
self.in_flight_requests().compact(0.1);
if let Some(in_flight_data) =
self.as_mut().in_flight_requests().remove(&request_id)
{
self.as_mut().in_flight_requests().compact(0.1);
debug!(
"[{}/{}] Removed request.",
in_flight_data.ctx.trace_id(),
self.server_addr()
self.as_mut().server_addr()
);
return Poll::Ready(Some(Ok((in_flight_data.ctx, request_id))));
}
}
None => {
trace!("[{}] canceled_requests closed.", self.server_addr());
trace!(
"[{}] canceled_requests closed.",
self.as_mut().server_addr()
);
return Poll::Ready(None);
}
}
@@ -458,8 +463,8 @@ where
deadline: dispatch_request.ctx.deadline,
}),
};
self.transport().start_send(request)?;
self.in_flight_requests().insert(
self.as_mut().transport().start_send(request)?;
self.as_mut().in_flight_requests().insert(
request_id,
InFlightData {
ctx: dispatch_request.ctx,
@@ -479,20 +484,28 @@ where
trace_context: context.trace_context,
message: ClientMessageKind::Cancel { request_id },
};
self.transport().start_send(cancel)?;
trace!("[{}/{}] Cancel message sent.", trace_id, self.server_addr());
return Ok(());
self.as_mut().transport().start_send(cancel)?;
trace!(
"[{}/{}] Cancel message sent.",
trace_id,
self.as_mut().server_addr()
);
Ok(())
}
/// Sends a server response to the client task that initiated the associated request.
fn complete(self: &mut Pin<&mut Self>, response: Response<Resp>) -> bool {
if let Some(in_flight_data) = self.in_flight_requests().remove(&response.request_id) {
self.in_flight_requests().compact(0.1);
if let Some(in_flight_data) = self
.as_mut()
.in_flight_requests()
.remove(&response.request_id)
{
self.as_mut().in_flight_requests().compact(0.1);
trace!(
"[{}/{}] Received response.",
in_flight_data.ctx.trace_id(),
self.server_addr()
self.as_mut().server_addr()
);
let _ = in_flight_data.response_completion.send(response);
return true;
@@ -500,7 +513,7 @@ where
debug!(
"[{}] No in-flight request found for request_id = {}.",
self.server_addr(),
self.as_mut().server_addr(),
response.request_id
);
@@ -517,15 +530,15 @@ where
{
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
trace!("[{}] RequestDispatch::poll", self.server_addr());
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> {
trace!("[{}] RequestDispatch::poll", self.as_mut().server_addr());
loop {
match (self.pump_read(waker)?, self.pump_write(waker)?) {
(read, write @ Poll::Ready(None)) => {
if self.in_flight_requests().is_empty() {
if self.as_mut().in_flight_requests().is_empty() {
info!(
"[{}] Shutdown: write half closed, and no requests in flight.",
self.server_addr()
self.as_mut().server_addr()
);
return Poll::Ready(Ok(()));
}
@@ -534,7 +547,7 @@ where
_ => {
trace!(
"[{}] read: {:?}, write: {:?}, (not ready)",
self.server_addr(),
self.as_mut().server_addr(),
read,
write,
);
@@ -545,7 +558,7 @@ where
(read @ Poll::Ready(Some(())), write) | (read, write @ Poll::Ready(Some(()))) => {
trace!(
"[{}] read: {:?}, write: {:?}",
self.server_addr(),
self.as_mut().server_addr(),
read,
write,
)
@@ -553,7 +566,7 @@ where
(read, write) => {
trace!(
"[{}] read: {:?}, write: {:?} (not ready)",
self.server_addr(),
self.as_mut().server_addr(),
read,
write,
);
@@ -607,7 +620,7 @@ impl RequestCancellation {
impl Stream for CanceledRequests {
type Item = u64;
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<u64>> {
fn poll_next(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<u64>> {
self.0.poll_next_unpin(waker)
}
}
@@ -639,8 +652,8 @@ where
{
type Output = io::Result<Fut::Ok>;
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
match self.future().try_poll(lw) {
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
match self.as_mut().future().try_poll(waker) {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => {
self.finished().take().expect(
@@ -679,11 +692,12 @@ where
{
type Output = Result<DispatchResponse<Resp>, Fut::Error>;
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
match self.future().try_poll(lw) {
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
match self.as_mut().future().try_poll(waker) {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => {
let response = self
.as_mut()
.response()
.take()
.expect("MapOk must not be polled after it returned `Poll::Ready`");
@@ -721,8 +735,8 @@ where
{
type Output = Result<Fut2::Ok, Fut2::Error>;
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
self.try_chain().poll(lw, |result| match result {
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
self.try_chain().poll(waker, |result| match result {
Ok(ok) => TryChainAction::Future(ok),
Err(err) => TryChainAction::Output(Err(err)),
})
@@ -754,27 +768,27 @@ where
TryChain::First(fut1)
}
fn poll<F>(self: Pin<&mut Self>, lw: &LocalWaker, f: F) -> Poll<Result<Fut2::Ok, Fut2::Error>>
fn poll<F>(self: Pin<&mut Self>, waker: &Waker, f: F) -> Poll<Result<Fut2::Ok, Fut2::Error>>
where
F: FnOnce(Result<Fut1::Ok, Fut1::Error>) -> TryChainAction<Fut2>,
{
let mut f = Some(f);
// Safe to call `get_mut_unchecked` because we won't move the futures.
let this = unsafe { Pin::get_mut_unchecked(self) };
// Safe to call `get_unchecked_mut` because we won't move the futures.
let this = unsafe { Pin::get_unchecked_mut(self) };
loop {
let output = match this {
TryChain::First(fut1) => {
// Poll the first future
match unsafe { Pin::new_unchecked(fut1) }.try_poll(lw) {
match unsafe { Pin::new_unchecked(fut1) }.try_poll(waker) {
Poll::Pending => return Poll::Pending,
Poll::Ready(output) => output,
}
}
TryChain::Second(fut2) => {
// Poll the second future
return unsafe { Pin::new_unchecked(fut2) }.try_poll(lw);
return unsafe { Pin::new_unchecked(fut2) }.try_poll(waker);
}
TryChain::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`");
@@ -802,7 +816,7 @@ mod tests {
};
use fnv::FnvHashMap;
use futures::{channel::mpsc, prelude::*, Poll};
use futures_test::task::noop_local_waker_ref;
use futures_test::task::noop_waker_ref;
use std::{
marker,
net::{IpAddr, Ipv4Addr, SocketAddr},
@@ -825,7 +839,7 @@ mod tests {
);
let mut dispatch = Pin::new(&mut dispatch);
let waker = &noop_local_waker_ref();
let waker = &noop_waker_ref();
let req = dispatch.poll_next_request(waker).ready();
assert!(req.is_some());
@@ -852,7 +866,7 @@ mod tests {
drop(channel);
let mut dispatch = Pin::new(&mut dispatch);
let waker = &noop_local_waker_ref();
let waker = &noop_waker_ref();
dispatch.poll_next_cancellation(waker).unwrap();
assert!(dispatch.poll_next_request(waker).ready().is_none());
@@ -876,7 +890,7 @@ mod tests {
drop(channel);
let mut dispatch = Pin::new(&mut dispatch);
let waker = &noop_local_waker_ref();
let waker = &noop_waker_ref();
assert!(dispatch.poll_next_request(waker).ready().is_none());
}

View File

@@ -9,10 +9,9 @@
integer_atomics,
try_trait,
futures_api,
pin,
arbitrary_self_types,
await_macro,
async_await,
async_await
)]
#![deny(missing_docs, missing_debug_implementations)]
@@ -43,17 +42,14 @@ pub(crate) mod util;
pub use crate::{client::Client, server::Server, transport::Transport};
use futures::{
task::{Spawn, SpawnError, SpawnExt},
task::{Poll, Spawn, SpawnError, SpawnExt},
Future,
};
use std::{cell::RefCell, io, sync::Once, time::SystemTime};
/// A message from a client to a server.
#[derive(Debug)]
#[cfg_attr(
feature = "serde1",
derive(serde::Serialize, serde::Deserialize)
)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct ClientMessage<T> {
/// The trace context associates the message with a specific chain of causally-related actions,
@@ -65,10 +61,7 @@ pub struct ClientMessage<T> {
/// Different messages that can be sent from a client to a server.
#[derive(Debug)]
#[cfg_attr(
feature = "serde1",
derive(serde::Serialize, serde::Deserialize)
)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum ClientMessageKind<T> {
/// A request initiated by a user. The server responds to a request by invoking a
@@ -90,10 +83,7 @@ pub enum ClientMessageKind<T> {
/// A request from a client to a server.
#[derive(Debug)]
#[cfg_attr(
feature = "serde1",
derive(serde::Serialize, serde::Deserialize)
)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct Request<T> {
/// Uniquely identifies the request across all requests sent over a single channel.
@@ -115,10 +105,7 @@ pub struct Request<T> {
/// A response from a server to a client.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(
feature = "serde1",
derive(serde::Serialize, serde::Deserialize)
)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct Response<T> {
/// The ID of the request being responded to.
@@ -129,10 +116,7 @@ pub struct Response<T> {
/// An error response from a server to a client.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(
feature = "serde1",
derive(serde::Serialize, serde::Deserialize)
)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct ServerError {
#[cfg_attr(
@@ -162,6 +146,8 @@ impl<T> Request<T> {
}
}
pub(crate) type PollIo<T> = Poll<Option<io::Result<T>>>;
static INIT: Once = Once::new();
static mut SEED_SPAWN: Option<Box<dyn CloneSpawn>> = None;
thread_local! {

View File

@@ -7,7 +7,7 @@
use crate::{
server::{Channel, Config},
util::Compact,
ClientMessage, Response, Transport,
ClientMessage, PollIo, Response, Transport,
};
use fnv::FnvHashMap;
use futures::{
@@ -15,7 +15,7 @@ use futures::{
prelude::*,
ready,
stream::Fuse,
task::{LocalWaker, Poll},
task::{Poll, Waker},
};
use log::{debug, error, info, trace, warn};
use pin_utils::unsafe_pinned;
@@ -107,28 +107,28 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
}
};
let open_connections = *self.open_connections();
if open_connections >= self.config().max_connections {
let open_connections = *self.as_mut().open_connections();
if open_connections >= self.as_mut().config().max_connections {
warn!(
"[{}] Shedding connection because the maximum open connections \
limit is reached ({}/{}).",
peer,
open_connections,
self.config().max_connections
self.as_mut().config().max_connections
);
return NewConnection::Filtered;
}
let config = self.config.clone();
let open_connections_for_ip = self.increment_connections_for_ip(&peer)?;
*self.open_connections() += 1;
*self.as_mut().open_connections() += 1;
debug!(
"[{}] Opening channel ({}/{} connections for IP, {} total).",
peer,
open_connections_for_ip,
config.max_connections_per_ip,
self.open_connections(),
self.as_mut().open_connections(),
);
NewConnection::Accepted(Channel {
@@ -141,19 +141,19 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
}
fn handle_closed_connection(self: &mut Pin<&mut Self>, addr: &SocketAddr) {
*self.open_connections() -= 1;
*self.as_mut().open_connections() -= 1;
debug!(
"[{}] Closing channel. {} open connections remaining.",
addr, self.open_connections
);
self.decrement_connections_for_ip(&addr);
self.connections_per_ip().compact(0.1);
self.as_mut().connections_per_ip().compact(0.1);
}
fn increment_connections_for_ip(self: &mut Pin<&mut Self>, peer: &SocketAddr) -> Option<usize> {
let max_connections_per_ip = self.config().max_connections_per_ip;
let max_connections_per_ip = self.as_mut().config().max_connections_per_ip;
let mut occupied;
let mut connections_per_ip = self.connections_per_ip();
let mut connections_per_ip = self.as_mut().connections_per_ip();
let occupied = match connections_per_ip.entry(peer.ip()) {
Entry::Vacant(vacant) => vacant.insert(0),
Entry::Occupied(o) => {
@@ -177,7 +177,7 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
}
fn decrement_connections_for_ip(self: &mut Pin<&mut Self>, addr: &SocketAddr) {
let should_compact = match self.connections_per_ip().entry(addr.ip()) {
let should_compact = match self.as_mut().connections_per_ip().entry(addr.ip()) {
Entry::Vacant(_) => {
error!("[{}] Got vacant entry when closing connection.", addr);
return;
@@ -193,26 +193,23 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
}
};
if should_compact {
self.connections_per_ip().compact(0.1);
self.as_mut().connections_per_ip().compact(0.1);
}
}
fn poll_listener<C>(
self: &mut Pin<&mut Self>,
cx: &LocalWaker,
) -> Poll<Option<io::Result<NewConnection<Req, Resp, C>>>>
fn poll_listener<C>(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<NewConnection<Req, Resp, C>>
where
S: Stream<Item = Result<C, io::Error>>,
C: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
{
match ready!(self.listener().poll_next_unpin(cx)?) {
match ready!(self.as_mut().listener().poll_next_unpin(cx)?) {
Some(codec) => Poll::Ready(Some(Ok(self.handle_new_connection(codec)))),
None => Poll::Ready(None),
}
}
fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
match ready!(self.closed_connections_rx().poll_next_unpin(cx)) {
fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
match ready!(self.as_mut().closed_connections_rx().poll_next_unpin(cx)) {
Some(addr) => {
self.handle_closed_connection(&addr);
Poll::Ready(Ok(()))
@@ -229,25 +226,28 @@ where
{
type Item = io::Result<Channel<Req, Resp, T>>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &LocalWaker,
) -> Poll<Option<io::Result<Channel<Req, Resp, T>>>> {
fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<Channel<Req, Resp, T>> {
loop {
match (self.poll_listener(cx)?, self.poll_closed_connections(cx)?) {
match (
self.as_mut().poll_listener(cx)?,
self.poll_closed_connections(cx)?,
) {
(Poll::Ready(Some(NewConnection::Accepted(channel))), _) => {
return Poll::Ready(Some(Ok(channel)))
return Poll::Ready(Some(Ok(channel)));
}
(Poll::Ready(Some(NewConnection::Filtered)), _) | (_, Poll::Ready(())) => {
trace!("Filtered a connection; {} open.", self.open_connections());
trace!(
"Filtered a connection; {} open.",
self.as_mut().open_connections()
);
continue;
}
(Poll::Pending, Poll::Pending) => return Poll::Pending,
(Poll::Ready(None), Poll::Pending) => {
if *self.open_connections() > 0 {
if *self.as_mut().open_connections() > 0 {
trace!(
"Listener closed; {} open connections.",
self.open_connections()
self.as_mut().open_connections()
);
return Poll::Pending;
}

View File

@@ -8,7 +8,7 @@
use crate::{
context::Context, util::deadline_compat, util::AsDuration, util::Compact, ClientMessage,
ClientMessageKind, Request, Response, ServerError, Transport,
ClientMessageKind, PollIo, Request, Response, ServerError, Transport,
};
use fnv::FnvHashMap;
use futures::{
@@ -17,7 +17,7 @@ use futures::{
prelude::*,
ready,
stream::Fuse,
task::{LocalWaker, Poll},
task::{Poll, Waker},
try_ready,
};
use humantime::format_rfc3339;
@@ -133,13 +133,13 @@ where
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<()> {
while let Some(channel) = ready!(self.incoming().poll_next(cx)) {
fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<()> {
while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) {
match channel {
Ok(channel) => {
let peer = channel.client_addr;
if let Err(e) =
crate::spawn(channel.respond_with(self.request_handler().clone()))
crate::spawn(channel.respond_with(self.as_mut().request_handler().clone()))
{
warn!("[{}] Failed to spawn connection handler: {:?}", peer, e);
}
@@ -150,7 +150,7 @@ where
}
}
info!("Server shutting down.");
return Poll::Ready(());
Poll::Ready(())
}
}
@@ -181,7 +181,8 @@ where
Req: Send,
Resp: Send,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
{}
{
}
/// Responds to all requests with `request_handler`.
/// The server end of an open connection with a client.
@@ -229,26 +230,20 @@ where
Req: Send,
Resp: Send,
{
pub(crate) fn start_send(
self: &mut Pin<&mut Self>,
response: Response<Resp>,
) -> io::Result<()> {
self.transport().start_send(response)
pub(crate) fn start_send(mut self: Pin<&mut Self>, response: Response<Resp>) -> io::Result<()> {
self.as_mut().transport().start_send(response)
}
pub(crate) fn poll_ready(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
self.transport().poll_ready(cx)
pub(crate) fn poll_ready(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
self.as_mut().transport().poll_ready(cx)
}
pub(crate) fn poll_flush(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
self.transport().poll_flush(cx)
pub(crate) fn poll_flush(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
self.as_mut().transport().poll_flush(cx)
}
pub(crate) fn poll_next(
self: &mut Pin<&mut Self>,
cx: &LocalWaker,
) -> Poll<Option<io::Result<ClientMessage<Req>>>> {
self.transport().poll_next(cx)
pub(crate) fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<ClientMessage<Req>> {
self.as_mut().transport().poll_next(cx)
}
/// Returns the address of the client connected to the channel.
@@ -315,31 +310,28 @@ where
{
/// If at max in-flight requests, check that there's room to immediately write a throttled
/// response.
fn poll_ready_if_throttling(
self: &mut Pin<&mut Self>,
cx: &LocalWaker,
) -> Poll<io::Result<()>> {
fn poll_ready_if_throttling(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
if self.in_flight_requests.len()
>= self.channel.config.max_in_flight_requests_per_connection
{
let peer = self.channel().client_addr;
let peer = self.as_mut().channel().client_addr;
while let Poll::Pending = self.channel().poll_ready(cx)? {
while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? {
info!(
"[{}] In-flight requests at max ({}), and transport is not ready.",
peer,
self.in_flight_requests().len(),
self.as_mut().in_flight_requests().len(),
);
try_ready!(self.channel().poll_flush(cx));
try_ready!(self.as_mut().channel().poll_flush(cx));
}
}
Poll::Ready(Ok(()))
}
fn pump_read(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<Option<io::Result<()>>> {
ready!(self.poll_ready_if_throttling(cx)?);
fn pump_read(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<()> {
ready!(self.as_mut().poll_ready_if_throttling(cx)?);
Poll::Ready(match ready!(self.channel().poll_next(cx)?) {
Poll::Ready(match ready!(self.as_mut().channel().poll_next(cx)?) {
Some(message) => {
match message.message {
ClientMessageKind::Request(request) => {
@@ -358,29 +350,25 @@ where
})
}
fn pump_write(
self: &mut Pin<&mut Self>,
cx: &LocalWaker,
read_half_closed: bool,
) -> Poll<Option<io::Result<()>>> {
match self.poll_next_response(cx)? {
fn pump_write(mut self: Pin<&mut Self>, cx: &Waker, read_half_closed: bool) -> PollIo<()> {
match self.as_mut().poll_next_response(cx)? {
Poll::Ready(Some((_, response))) => {
self.channel().start_send(response)?;
self.as_mut().channel().start_send(response)?;
Poll::Ready(Some(Ok(())))
}
Poll::Ready(None) => {
// Shutdown can't be done before we finish pumping out remaining responses.
ready!(self.channel().poll_flush(cx)?);
ready!(self.as_mut().channel().poll_flush(cx)?);
Poll::Ready(None)
}
Poll::Pending => {
// No more requests to process, so flush any requests buffered in the transport.
ready!(self.channel().poll_flush(cx)?);
ready!(self.as_mut().channel().poll_flush(cx)?);
// Being here means there are no staged requests and all written responses are
// fully flushed. So, if the read half is closed and there are no in-flight
// requests, then we can close the write half.
if read_half_closed && self.in_flight_requests().is_empty() {
if read_half_closed && self.as_mut().in_flight_requests().is_empty() {
Poll::Ready(None)
} else {
Poll::Pending
@@ -390,28 +378,33 @@ where
}
fn poll_next_response(
self: &mut Pin<&mut Self>,
cx: &LocalWaker,
) -> Poll<Option<io::Result<(Context, Response<Resp>)>>> {
mut self: Pin<&mut Self>,
cx: &Waker,
) -> PollIo<(Context, Response<Resp>)> {
// Ensure there's room to write a response.
while let Poll::Pending = self.channel().poll_ready(cx)? {
ready!(self.channel().poll_flush(cx)?);
while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? {
ready!(self.as_mut().channel().poll_flush(cx)?);
}
let peer = self.channel().client_addr;
let peer = self.as_mut().channel().client_addr;
match ready!(self.pending_responses().poll_next(cx)) {
match ready!(self.as_mut().pending_responses().poll_next(cx)) {
Some((ctx, response)) => {
if let Some(_) = self.in_flight_requests().remove(&response.request_id) {
self.in_flight_requests().compact(0.1);
if self
.as_mut()
.in_flight_requests()
.remove(&response.request_id)
.is_some()
{
self.as_mut().in_flight_requests().compact(0.1);
}
trace!(
"[{}/{}] Staging response. In-flight requests = {}.",
ctx.trace_id(),
peer,
self.in_flight_requests().len(),
self.as_mut().in_flight_requests().len(),
);
return Poll::Ready(Some(Ok((ctx, response))));
Poll::Ready(Some(Ok((ctx, response))))
}
None => {
// This branch likely won't happen, since the ClientHandler is holding a Sender.
@@ -422,30 +415,37 @@ where
}
fn handle_request(
self: &mut Pin<&mut Self>,
mut self: Pin<&mut Self>,
trace_context: trace::Context,
request: Request<Req>,
) -> io::Result<()> {
let request_id = request.id;
let peer = self.channel().client_addr;
let peer = self.as_mut().channel().client_addr;
let ctx = Context {
deadline: request.deadline,
trace_context,
};
let request = request.message;
if self.in_flight_requests().len()
>= self.channel().config.max_in_flight_requests_per_connection
if self.as_mut().in_flight_requests().len()
>= self
.as_mut()
.channel()
.config
.max_in_flight_requests_per_connection
{
debug!(
"[{}/{}] Client has reached in-flight request limit ({}/{}).",
ctx.trace_id(),
peer,
self.in_flight_requests().len(),
self.channel().config.max_in_flight_requests_per_connection
self.as_mut().in_flight_requests().len(),
self.as_mut()
.channel()
.config
.max_in_flight_requests_per_connection
);
self.channel().start_send(Response {
self.as_mut().channel().start_send(Response {
request_id,
message: Err(ServerError {
kind: io::ErrorKind::WouldBlock,
@@ -464,10 +464,10 @@ where
format_rfc3339(deadline),
timeout,
);
let mut response_tx = self.responses_tx().clone();
let mut response_tx = self.as_mut().responses_tx().clone();
let trace_id = *ctx.trace_id();
let response = self.f().clone()(ctx.clone(), request);
let response = self.as_mut().f().clone()(ctx, request);
let response = deadline_compat::Deadline::new(response, Instant::now() + timeout).then(
async move |result| {
let response = Response {
@@ -491,18 +491,20 @@ where
),
)
})?;
self.in_flight_requests().insert(request_id, abort_handle);
self.as_mut()
.in_flight_requests()
.insert(request_id, abort_handle);
Ok(())
}
fn cancel_request(self: &mut Pin<&mut Self>, trace_context: &trace::Context, request_id: u64) {
fn cancel_request(mut self: Pin<&mut Self>, trace_context: &trace::Context, request_id: u64) {
// It's possible the request was already completed, so it's fine
// if this is None.
if let Some(cancel_handle) = self.in_flight_requests().remove(&request_id) {
self.in_flight_requests().compact(0.1);
if let Some(cancel_handle) = self.as_mut().in_flight_requests().remove(&request_id) {
self.as_mut().in_flight_requests().compact(0.1);
cancel_handle.abort();
let remaining = self.in_flight_requests().len();
let remaining = self.as_mut().in_flight_requests().len();
trace!(
"[{}/{}] Request canceled. In-flight requests = {}",
trace_context.trace_id,
@@ -530,11 +532,14 @@ where
{
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
trace!("[{}] ClientHandler::poll", self.channel.client_addr);
loop {
let read = self.pump_read(cx)?;
match (read, self.pump_write(cx, read == Poll::Ready(None))?) {
let read = self.as_mut().pump_read(cx)?;
match (
read,
self.as_mut().pump_write(cx, read == Poll::Ready(None))?,
) {
(Poll::Ready(None), Poll::Ready(None)) => {
info!("[{}] Client disconnected.", self.channel.client_addr);
return Poll::Ready(Ok(()));

View File

@@ -6,8 +6,8 @@
//! Transports backed by in-memory channels.
use crate::Transport;
use futures::{channel::mpsc, task::LocalWaker, Poll, Sink, Stream};
use crate::{PollIo, Transport};
use futures::{channel::mpsc, task::Waker, Poll, Sink, Stream};
use pin_utils::unsafe_pinned;
use std::pin::Pin;
use std::{
@@ -45,7 +45,7 @@ impl<Item, SinkItem> UnboundedChannel<Item, SinkItem> {
impl<Item, SinkItem> Stream for UnboundedChannel<Item, SinkItem> {
type Item = Result<Item, io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<Option<io::Result<Item>>> {
fn poll_next(self: Pin<&mut Self>, cx: &Waker) -> PollIo<Item> {
self.rx().poll_next(cx).map(|option| option.map(Ok))
}
}
@@ -54,25 +54,25 @@ impl<Item, SinkItem> Sink for UnboundedChannel<Item, SinkItem> {
type SinkItem = SinkItem;
type SinkError = io::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
fn poll_ready(self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
self.tx()
.poll_ready(cx)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
}
fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
self.tx()
.start_send(item)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<Result<(), Self::SinkError>> {
fn poll_flush(self: Pin<&mut Self>, cx: &Waker) -> Poll<Result<(), Self::SinkError>> {
self.tx()
.poll_flush(cx)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
}
fn poll_close(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
fn poll_close(self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
self.tx()
.poll_close(cx)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
@@ -99,14 +99,15 @@ mod tests {
server::{Handler, Server},
transport,
};
use futures::{compat::TokioDefaultSpawner, prelude::*, stream};
use futures::compat::Executor01CompatExt;
use futures::{prelude::*, stream};
use log::trace;
use std::io;
#[test]
fn integration() {
let _ = env_logger::try_init();
crate::init(TokioDefaultSpawner);
crate::init(tokio::executor::DefaultExecutor::current().compat());
let (client_channel, server_channel) = transport::channel::unbounded();
let server = Server::<String, u64>::default()

View File

@@ -14,7 +14,7 @@ use std::{
io,
net::SocketAddr,
pin::Pin,
task::{LocalWaker, Poll},
task::{Poll, Waker},
};
pub mod channel;
@@ -71,7 +71,7 @@ where
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<S::Item>> {
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<S::Item>> {
self.inner().poll_next(waker)
}
}
@@ -83,19 +83,19 @@ where
type SinkItem = S::SinkItem;
type SinkError = S::SinkError;
fn start_send(mut self: Pin<&mut Self>, item: S::SinkItem) -> Result<(), S::SinkError> {
fn start_send(self: Pin<&mut Self>, item: S::SinkItem) -> Result<(), S::SinkError> {
self.inner().start_send(item)
}
fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
self.inner().poll_ready(waker)
}
fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
self.inner().poll_flush(waker)
}
fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
self.inner().poll_close(waker)
}
}

View File

@@ -8,7 +8,7 @@ use futures::{
compat::{Compat01As03, Future01CompatExt},
prelude::*,
ready,
task::{LocalWaker, Poll},
task::{Poll, Waker},
};
use pin_utils::unsafe_pinned;
use std::pin::Pin;
@@ -50,9 +50,9 @@ where
{
type Output = Result<T::Ok, timeout::Error<T::Error>>;
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
// First, try polling the future
match self.future().try_poll(waker) {
match self.as_mut().future().try_poll(waker) {
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
Poll::Pending => {}
Poll::Ready(Err(e)) => return Poll::Ready(Err(timeout::Error::inner(e))),

View File

@@ -31,6 +31,7 @@ where
}
/// Serializes [`io::ErrorKind`] as a `u32`.
#[allow(clippy::trivially_copy_pass_by_ref)] // Exact fn signature required by serde derive
pub fn serialize_io_error_kind_as_u32<S>(
kind: &io::ErrorKind,
serializer: S,

View File

@@ -1,8 +1,6 @@
cargo-features = ["rename-dependency"]
[package]
name = "tarpc"
version = "0.14.0"
version = "0.15.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018"
license = "MIT"
@@ -24,19 +22,20 @@ travis-ci = { repository = "google/tarpc" }
log = "0.4"
serde = { optional = true, version = "1.0" }
tarpc-plugins = { path = "../plugins", version = "0.5.0" }
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.2" }
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.3" }
[target.'cfg(not(test))'.dependencies]
futures-preview = "0.3.0-alpha.9"
futures-preview = "0.3.0-alpha.13"
[dev-dependencies]
bincode = "1.0"
bytes = { version = "0.4", features = ["serde"] }
humantime = "1.0"
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" }
env_logger = "0.5"
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
bincode-transport = { package = "tarpc-bincode-transport", version = "0.4", path = "../bincode-transport" }
env_logger = "0.6"
libtest = "0.0.1"
tokio = "0.1"
tokio-executor = "0.1"
tokio-tcp = "0.1"
pin-utils = "0.1.0-alpha.3"
pin-utils = "0.1.0-alpha.4"

1
tarpc/README.md Symbolic link
View File

@@ -0,0 +1 @@
../README.md

View File

@@ -6,7 +6,6 @@
#![feature(
arbitrary_self_types,
pin,
futures_api,
await_macro,
async_await,

View File

@@ -6,7 +6,6 @@
#![feature(
futures_api,
pin,
arbitrary_self_types,
await_macro,
async_await,
@@ -14,7 +13,7 @@
)]
use futures::{
compat::TokioDefaultSpawner,
compat::Executor01CompatExt,
future::{self, Ready},
prelude::*,
};
@@ -83,7 +82,7 @@ async fn run() -> io::Result<()> {
}
fn main() {
tarpc::init(TokioDefaultSpawner);
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(
run()

View File

@@ -7,7 +7,6 @@
#![feature(
existential_type,
arbitrary_self_types,
pin,
futures_api,
await_macro,
async_await,
@@ -16,7 +15,7 @@
use crate::{add::Service as AddService, double::Service as DoubleService};
use futures::{
compat::TokioDefaultSpawner,
compat::Executor01CompatExt,
future::{self, Ready},
prelude::*,
};
@@ -103,6 +102,6 @@ async fn run() -> io::Result<()> {
fn main() {
env_logger::init();
tarpc::init(TokioDefaultSpawner);
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(run().map_err(|e| panic!(e)).boxed().compat());
}

View File

@@ -1,5 +1,4 @@
#![feature(
pin,
async_await,
await_macro,
futures_api,
@@ -19,7 +18,7 @@ mod registry {
io,
pin::Pin,
sync::Arc,
task::{LocalWaker, Poll},
task::{Poll, Waker},
};
use tarpc::{
client::{self, Client},
@@ -54,8 +53,10 @@ mod registry {
/// Returns a function that serves requests for the registered services.
pub fn serve(
self,
) -> impl FnOnce(context::Context, ServiceRequest)
-> Either<Services::Future, Ready<io::Result<ServiceResponse>>>
) -> impl FnOnce(
context::Context,
ServiceRequest,
) -> Either<Services::Future, Ready<io::Result<ServiceResponse>>>
+ Clone {
let registrations = Arc::new(self.registrations);
move |cx, req: ServiceRequest| match registrations.serve(cx, &req) {
@@ -212,9 +213,9 @@ mod registry {
{
type Output = Output;
fn poll(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Output> {
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Output> {
unsafe {
match Pin::get_mut_unchecked(self) {
match Pin::get_unchecked_mut(self) {
Either::Left(car) => Pin::new_unchecked(car).poll(waker),
Either::Right(cdr) => Pin::new_unchecked(cdr).poll(waker),
}
@@ -238,6 +239,7 @@ mod registry {
// Example
use bytes::Bytes;
use futures::{
compat::Executor01CompatExt,
future::{ready, Ready},
prelude::*,
};
@@ -328,7 +330,8 @@ impl<Services: registry::MaybeServe + Sync> BincodeRegistry<Services> {
fn serve(
self,
) -> impl FnOnce(
context::Context, registry::ServiceRequest
context::Context,
registry::ServiceRequest,
) -> registry::Either<
Services::Future,
Ready<io::Result<registry::ServiceResponse>>,
@@ -406,6 +409,6 @@ async fn run() -> io::Result<()> {
}
fn main() {
tarpc::init(futures::compat::TokioDefaultSpawner);
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
}

View File

@@ -4,155 +4,12 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
//! ## tarpc: Tim & Adam's RPC lib
//! [![Travis-CI Status](https://travis-ci.org/google/tarpc.png?branch=master)](https://travis-ci.org/google/tarpc)
//! [![Coverage Status](https://coveralls.io/repos/github/google/tarpc/badge.svg?branch=master)](https://coveralls.io/github/google/tarpc?branch=master)
//! [![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg)](LICENSE)
//! [![Latest Version](https://img.shields.io/crates/v/tarpc.svg)](https://crates.io/crates/tarpc)
//! [![Join the chat at https://gitter.im/tarpc/Lobby](https://badges.gitter.im/tarpc/Lobby.svg)](https://gitter.im/tarpc/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
//!
//! *Disclaimer*: This is not an official Google product.
//!
//! tarpc is an RPC framework for rust with a focus on ease of use. Defining a
//! service can be done in just a few lines of code, and most of the boilerplate of
//! writing a server is taken care of for you.
//!
//! [Documentation](https://docs.rs/crate/tarpc/)
//!
//! ## What is an RPC framework?
//! "RPC" stands for "Remote Procedure Call," a function call where the work of
//! producing the return value is being done somewhere else. When an rpc function is
//! invoked, behind the scenes the function contacts some other process somewhere
//! and asks them to evaluate the function instead. The original function then
//! returns the value produced by the other process.
//!
//! RPC frameworks are a fundamental building block of most microservices-oriented
//! architectures. Two well-known ones are [gRPC](http://www.grpc.io) and
//! [Cap'n Proto](https://capnproto.org/).
//!
//! tarpc differentiates itself from other RPC frameworks by defining the schema in code,
//! rather than in a separate language such as .proto. This means there's no separate compilation
//! process, and no cognitive context switching between different languages. Additionally, it
//! works with the community-backed library serde: any serde-serializable type can be used as
//! arguments to tarpc fns.
//!
//! ## Usage
//! Add to your `Cargo.toml` dependencies:
//!
//! ```toml
//! tarpc = "0.14.0"
//! ```
//!
//! The `service!` macro expands to a collection of items that form an
//! rpc service. In the above example, the macro is called within the
//! `hello_service` module. This module will contain a `Client` stub and `Service` trait. There is
//! These generated types make it easy and ergonomic to write servers without dealing with serialization
//! directly. Simply implement one of the generated traits, and you're off to the
//! races!
//!
//! ## Example
//!
//! Here's a small service.
//!
//! ```rust
//! #![feature(futures_api, pin, arbitrary_self_types, await_macro, async_await, proc_macro_hygiene)]
//!
//!
//! use futures::{
//! compat::TokioDefaultSpawner,
//! future::{self, Ready},
//! prelude::*,
//! };
//! use tarpc::{
//! client, context,
//! server::{self, Handler},
//! };
//! use std::io;
//!
//! // This is the service definition. It looks a lot like a trait definition.
//! // It defines one RPC, hello, which takes one arg, name, and returns a String.
//! tarpc::service! {
//! /// Returns a greeting for name.
//! rpc hello(name: String) -> String;
//! }
//!
//! // This is the type that implements the generated Service trait. It is the business logic
//! // and is used to start the server.
//! #[derive(Clone)]
//! struct HelloServer;
//!
//! impl Service for HelloServer {
//! // Each defined rpc generates two items in the trait, a fn that serves the RPC, and
//! // an associated type representing the future output by the fn.
//!
//! type HelloFut = Ready<String>;
//!
//! fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
//! future::ready(format!("Hello, {}!", name))
//! }
//! }
//!
//! async fn run() -> io::Result<()> {
//! // bincode_transport is provided by the associated crate bincode-transport. It makes it easy
//! // to start up a serde-powered bincode serialization strategy over TCP.
//! let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
//! let addr = transport.local_addr();
//!
//! // The server is configured with the defaults.
//! let server = server::new(server::Config::default())
//! // Server can listen on any type that implements the Transport trait.
//! .incoming(transport)
//! // Close the stream after the client connects
//! .take(1)
//! // serve is generated by the service! macro. It takes as input any type implementing
//! // the generated Service trait.
//! .respond_with(serve(HelloServer));
//!
//! tokio_executor::spawn(server.unit_error().boxed().compat());
//!
//! let transport = await!(bincode_transport::connect(&addr))?;
//!
//! // new_stub is generated by the service! macro. Like Server, it takes a config and any
//! // Transport as input, and returns a Client, also generated by the macro.
//! // by the service mcro.
//! let mut client = await!(new_stub(client::Config::default(), transport))?;
//!
//! // The client has an RPC method for each RPC defined in service!. It takes the same args
//! // as defined, with the addition of a Context, which is always the first arg. The Context
//! // specifies a deadline and trace information which can be helpful in debugging requests.
//! let hello = await!(client.hello(context::current(), "Stim".to_string()))?;
//!
//! println!("{}", hello);
//!
//! Ok(())
//! }
//!
//! fn main() {
//! tarpc::init(TokioDefaultSpawner);
//! tokio::run(run()
//! .map_err(|e| eprintln!("Oh no: {}", e))
//! .boxed()
//! .compat(),
//! );
//! }
//! ```
//!
//! ## Service Documentation
//!
//! Use `cargo doc` as you normally would to see the documentation created for all
//! items expanded by a `service!` invocation.
#![doc(include = "../README.md")]
#![deny(missing_docs, missing_debug_implementations)]
#![feature(async_await)]
#![feature(async_await, external_doc)]
#![cfg_attr(
test,
feature(
pin,
futures_api,
await_macro,
proc_macro_hygiene,
arbitrary_self_types
)
feature(futures_api, await_macro, proc_macro_hygiene, arbitrary_self_types)
)]
#[doc(hidden)]

View File

@@ -177,11 +177,11 @@ macro_rules! service {
impl<S: Service> ::std::future::Future for ResponseFut<S> {
type Output = ::std::io::Result<Response>;
fn poll(self: ::std::pin::Pin<&mut Self>, waker: &::std::task::LocalWaker)
fn poll(self: ::std::pin::Pin<&mut Self>, waker: &::std::task::Waker)
-> ::std::task::Poll<::std::io::Result<Response>>
{
unsafe {
match ::std::pin::Pin::get_mut_unchecked(self) {
match ::std::pin::Pin::get_unchecked_mut(self) {
$(
ResponseFut::$fn_name(resp) =>
::std::pin::Pin::new_unchecked(resp)
@@ -282,7 +282,7 @@ mod syntax_test {
#[cfg(test)]
mod functional_test {
use futures::{
compat::TokioDefaultSpawner,
compat::Executor01CompatExt,
future::{ready, Ready},
prelude::*,
};
@@ -315,7 +315,7 @@ mod functional_test {
#[test]
fn sequential() {
let _ = env_logger::try_init();
rpc::init(TokioDefaultSpawner);
rpc::init(tokio::executor::DefaultExecutor::current().compat());
let test = async {
let (tx, rx) = channel::unbounded();
@@ -344,7 +344,7 @@ mod functional_test {
#[test]
fn concurrent() {
let _ = env_logger::try_init();
rpc::init(TokioDefaultSpawner);
rpc::init(tokio::executor::DefaultExecutor::current().compat());
let test = async {
let (tx, rx) = channel::unbounded();

View File

@@ -7,7 +7,6 @@
#![feature(
test,
arbitrary_self_types,
pin,
integer_atomics,
futures_api,
generators,
@@ -16,10 +15,8 @@
proc_macro_hygiene
)]
extern crate test;
use self::test::stats::Stats;
use futures::{compat::TokioDefaultSpawner, future, prelude::*};
use futures::{compat::Executor01CompatExt, future, prelude::*};
use libtest::stats::Stats;
use rpc::{
client, context,
server::{Handler, Server},
@@ -120,7 +117,7 @@ async fn bench() -> io::Result<()> {
#[test]
fn bench_small_packet() {
env_logger::init();
tarpc::init(TokioDefaultSpawner);
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat())
}

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc-trace"
version = "0.1.0"
version = "0.2.0"
authors = ["tikue <tikue@google.com>"]
edition = '2018'
license = "MIT"
@@ -13,7 +13,7 @@ readme = "../README.md"
description = "foundations for tracing in tarpc"
[dependencies]
rand = "0.5"
rand = "0.6"
[dependencies.serde]
version = "1.0"

View File

@@ -27,10 +27,7 @@ use std::{
/// Consists of a span identifying an event, an optional parent span identifying a causal event
/// that triggered the current span, and a trace with which all related spans are associated.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
#[cfg_attr(
feature = "serde",
derive(serde::Serialize, serde::Deserialize)
)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Context {
/// An identifier of the trace associated with the current context. A trace ID is typically
/// created at a root span and passed along through all causal events.
@@ -50,18 +47,12 @@ pub struct Context {
/// A 128-bit UUID identifying a trace. All spans caused by the same originating span share the
/// same trace ID.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
#[cfg_attr(
feature = "serde",
derive(serde::Serialize, serde::Deserialize)
)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TraceId(u128);
/// A 64-bit identifier of a span within a trace. The identifier is unique within the span's trace.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
#[cfg_attr(
feature = "serde",
derive(serde::Serialize, serde::Deserialize)
)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SpanId(u64);
impl Context {
@@ -80,7 +71,7 @@ impl TraceId {
/// Returns a random trace ID that can be assumed to be globally unique if `rng` generates
/// actually-random numbers.
pub fn random<R: Rng>(rng: &mut R) -> Self {
TraceId((rng.next_u64() as u128) << mem::size_of::<u64>() | rng.next_u64() as u128)
TraceId(u128::from(rng.next_u64()) << mem::size_of::<u64>() | u128::from(rng.next_u64()))
}
}