27 Commits

Author SHA1 Message Date
Tim Kuehn
4e0be5b626 Publish tarpc v0.15.0 2019-03-26 21:13:41 -07:00
Artem Vorotnikov
5516034bbc Use libtest crate (#213) 2019-03-24 22:29:01 -07:00
Artem Vorotnikov
06544faa5a Update to futures 0.3.0-alpha.13 (#211) 2019-02-26 09:32:41 -08:00
Tim Kuehn
39737b720a Cargo fmt 2019-01-17 10:37:16 -08:00
Tim Kuehn
0f36985440 Update for latest changes to futures.
Fixes #209.
2019-01-17 10:37:03 -08:00
Tyler Bindon
959bb691cd Update regex to match diffs output by cargo fmt. (#208)
It appears the header of the diffs output by cargo fmt has changed; it now says "Diff in /blah/blah/blah.rs at line 99:". Matching on lines starting with + or - should be more future-proof against changes to the surroundings.
2018-12-09 01:59:35 -08:00
Tim
2a3162c5fa Cargo feature 'rename-dependency' is stabilized 2018-11-21 11:03:41 -08:00
Tim Kuehn
0cc976b729 cargo fmt 2018-11-06 17:01:27 -08:00
Tim Kuehn
4d2d3f24c6 Address Clippy lints 2018-11-06 17:00:15 -08:00
Tim Kuehn
2c7c64841f Add symlink tarpc/README.md -> README.md 2018-10-29 16:11:01 -07:00
Tim Kuehn
4ea142d0f3 Remove coverage badge.
It hasn't been updated in over 2 years.
2018-10-29 11:40:09 -07:00
Tim Kuehn
00751d2518 external_doc doesn't work with crates.io yet :( 2018-10-29 11:05:09 -07:00
Tim Kuehn
4394a52b65 Add doc tests to .travis.yml 2018-10-29 10:55:12 -07:00
Tim Kuehn
70938501d7 Use external_doc for the tarpc package. This will ensure our README is always up-to-date. 2018-10-29 10:53:34 -07:00
Tim Kuehn
d5f5cf4300 Bump versions. 2018-10-29 10:43:41 -07:00
Tim Kuehn
e2c4164d8c Remove unused feature enablements from tarpc 2018-10-25 11:44:38 -07:00
Tim Kuehn
78124ef7a8 Cargo fmt 2018-10-25 11:44:18 -07:00
Tim Kuehn
096d354b7e Remove unused features 2018-10-25 11:41:08 -07:00
Tim
7ad0e4b070 Service registry (#204)
# Changes

## Client is now a trait
And `Channel<Req, Resp>` implements `Client<Req, Resp>`. Previously, `Client<Req, Resp>` was a thin wrapper around `Channel<Req, Resp>`.

This was changed to allow for mapping the request and response types. For example, you can take a `channel: Channel<Req, Resp>` and do:

```rust
channel
    .with_request(|req: Req2| -> Req { ... })
    .map_response(|resp: Resp| -> Resp2 { ... })
```

...which returns a type that implements `Client<Req2, Resp2>`.

### Why would you want to map request and response types?

The main benefit of this is that it enables creating different client types backed by the same channel. For example, you could run multiple clients multiplexing requests over a single `TcpStream`. I have a demo in `tarpc/examples/service_registry.rs` showing how you might do this with a bincode transport. I am considering factoring the service registry portion of that out into an actual library, because it's doing pretty cool stuff. For this PR, though, it'll just be part of the example.
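The mapping idea can be sketched without any of tarpc's machinery. Below, `Channel` is a plain in-process stand-in (not tarpc's real `Channel`), and the `with_request`/`map_response` names are borrowed from the description above; everything else is illustrative.

```rust
// Minimal in-process sketch of request/response mapping. `Channel` here is a
// stand-in: it just invokes a boxed handler instead of going over a transport.
struct Channel<Req, Resp> {
    handler: Box<dyn FnMut(Req) -> Resp>,
}

impl<Req: 'static, Resp: 'static> Channel<Req, Resp> {
    fn call(&mut self, req: Req) -> Resp {
        (self.handler)(req)
    }

    /// Adapts the request type: callers send `Req2`, the channel still sees `Req`.
    fn with_request<Req2: 'static>(
        mut self,
        mut f: impl FnMut(Req2) -> Req + 'static,
    ) -> Channel<Req2, Resp> {
        Channel {
            handler: Box::new(move |req2| self.call(f(req2))),
        }
    }

    /// Adapts the response type: the channel returns `Resp`, callers see `Resp2`.
    fn map_response<Resp2: 'static>(
        mut self,
        mut f: impl FnMut(Resp) -> Resp2 + 'static,
    ) -> Channel<Req, Resp2> {
        Channel {
            handler: Box::new(move |req| f(self.call(req))),
        }
    }
}

fn adapted_call() -> usize {
    // The underlying channel speaks String -> String...
    let echo = Channel {
        handler: Box::new(|s: String| format!("got {}", s)),
    };
    // ...but this client sends u32 and receives usize, without the channel changing.
    let mut client = echo
        .with_request(|n: u32| n.to_string())
        .map_response(|s: String| s.len());
    client.call(7)
}

fn main() {
    println!("{}", adapted_call());
}
```

The same shape is what lets several differently-typed clients share one underlying connection: each adapter owns a view of the channel, while the wire format stays fixed.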

## Client::new is now client::new

This is pretty minor, but necessary because async fns can't currently exist on traits. I changed `Server::new` to match this as well.

## Macro-generated Clients are generic over the backing Client.

This is a natural consequence of the above change. However, it is transparent to the user by keeping `Channel<Req, Resp>` as the default type for the `<C: Client>` type parameter. `new_stub` returns `Client<Channel<Req, Resp>>`, and other clients can be created via the `From` trait.

## example-service/ now has two binaries, one for client and one for server.

This serves as a "realistic" example of how one might set up a service. The other examples all run the client and server in the same binary, which isn't realistic in distributed systems use cases.

## `service!` trait fns take self by value.

Services are already cloned per request, so this just passes on that flexibility to the trait implementers.
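A minimal sketch of what self-by-value buys implementers: because the dispatcher clones the service per request anyway, each handler call can own (and freely consume) its clone. The `HelloServer`/`serve_all` names below are illustrative, not the generated `service!` API.

```rust
// Sketch of the self-by-value calling convention: one clone per request,
// so the handler owns its state outright.
#[derive(Clone)]
struct HelloServer {
    greeting: String,
}

impl HelloServer {
    // `self` by value: this call may move or consume its own clone.
    fn hello(self, name: String) -> String {
        format!("{}, {}!", self.greeting, name)
    }
}

fn serve_all(server: &HelloServer, names: &[&str]) -> Vec<String> {
    names
        .iter()
        .map(|name| server.clone().hello(name.to_string())) // clone per request
        .collect()
}

fn main() {
    let server = HelloServer { greeting: "Hello".to_string() };
    for reply in serve_all(&server, &["Alice", "Bob"]) {
        println!("{}", reply);
    }
}
```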

# Open Questions

In the service registry example, multiple services are running on a single port, and thus multiple clients are sending requests over a single `TcpStream`. This has implications for throttling: [`max_in_flight_requests_per_connection`](https://github.com/google/tarpc/blob/master/rpc/src/server/mod.rs#L57-L60) will set a maximum for the sum of requests for all clients sharing a single connection. I think this is reasonable behavior, but users may expect this setting to act like `max_in_flight_requests_per_client`.
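The per-connection semantics can be sketched as a single in-flight counter shared by every client on the connection. This is an illustrative model of the behavior described above, not tarpc's implementation; only the `max_in_flight_requests_per_connection` name comes from the text.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

// One budget per connection, shared by all clients multiplexed over it.
struct Connection {
    in_flight: AtomicUsize,
    max_in_flight_requests_per_connection: usize,
}

impl Connection {
    /// Reserves one in-flight slot, returning false if the connection is at its limit.
    fn try_start_request(&self) -> bool {
        let mut current = self.in_flight.load(Ordering::SeqCst);
        loop {
            if current >= self.max_in_flight_requests_per_connection {
                return false;
            }
            match self.in_flight.compare_exchange(
                current,
                current + 1,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => return true,
                Err(actual) => current = actual, // raced with another client; retry
            }
        }
    }
}

fn demo() -> (bool, bool) {
    let conn = Arc::new(Connection {
        in_flight: AtomicUsize::new(0),
        max_in_flight_requests_per_connection: 2,
    });
    // Two "clients" sharing one connection draw from the same budget.
    let (client_a, client_b) = (Arc::clone(&conn), Arc::clone(&conn));
    client_a.try_start_request(); // slot 1
    client_b.try_start_request(); // slot 2: the limit is not per-client
    (client_a.try_start_request(), client_b.try_start_request())
}

fn main() {
    // Both clients are now throttled, even though each only started one request.
    assert_eq!(demo(), (false, false));
}
```

Under a per-client interpretation, each client would have had its own budget of 2 and neither call would be rejected here; that gap is exactly the open question.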

Fixes #103 #153 #205
2018-10-25 11:22:55 -07:00
Tim
64755d5329 Update futures 2018-10-19 11:19:25 -07:00
Tim Kuehn
3071422132 Helper fn to create transports 2018-10-18 00:24:26 -07:00
Tim Kuehn
8847330dbe impl From<S> for bincode::Transport<S> 2018-10-18 00:24:08 -07:00
Tim Kuehn
6d396520f4 Don't allow empty service invocations 2018-10-18 00:23:34 -07:00
Tim Kuehn
79a2f7fe2f Replace tokio-serde-bincode with async-bincode 2018-10-17 20:24:31 -07:00
Tim Kuehn
af66841f68 Remove keyword 2018-10-17 11:59:09 -07:00
Tim
1ab4cfdff9 Make Request and Response enums' docs public, because they show up in the serve fn. 2018-10-16 23:02:52 -07:00
Tim
f7e03eeeb7 Fix up readme 2018-10-16 22:28:57 -07:00
44 changed files with 1764 additions and 1215 deletions

View File

@@ -9,4 +9,5 @@ os:
- linux
script:
- cargo test --all --all-features
- cargo test --all-targets --all-features
- cargo test --doc --all-features

View File

@@ -1,6 +1,5 @@
## tarpc: Tim & Adam's RPC lib
[![Travis-CI Status](https://travis-ci.org/google/tarpc.png?branch=master)](https://travis-ci.org/google/tarpc)
[![Coverage Status](https://coveralls.io/repos/github/google/tarpc/badge.svg?branch=master)](https://coveralls.io/github/google/tarpc?branch=master)
[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg)](LICENSE)
[![Latest Version](https://img.shields.io/crates/v/tarpc.svg)](https://crates.io/crates/tarpc)
[![Join the chat at https://gitter.im/tarpc/Lobby](https://badges.gitter.im/tarpc/Lobby.svg)](https://gitter.im/tarpc/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
@@ -31,17 +30,12 @@ works with the community-backed library serde: any serde-serializable type can be used as
arguments to tarpc fns.
## Usage
**NB**: *this example is for master. Are you looking for other
[versions](https://docs.rs/tarpc)?*
Add to your `Cargo.toml` dependencies:
```toml
tarpc = "0.12.0"
tarpc-plugins = "0.4.0"
tarpc = "0.15.0"
```
The `service!` macro expands to a collection of items that form an
rpc service. In the above example, the macro is called within the
`hello_service` module. This module will contain a `Client` stub and `Service` trait. There is
@@ -49,29 +43,29 @@ These generated types make it easy and ergonomic to write servers without dealing
directly. Simply implement one of the generated traits, and you're off to the
races!
## Example:
## Example
Here's a small service.
```rust
#![feature(plugin, futures_api, pin, arbitrary_self_types, await_macro, async_await)]
#![plugin(tarpc_plugins)]
#![feature(futures_api, pin, arbitrary_self_types, await_macro, async_await, proc_macro_hygiene)]
use futures::{
compat::TokioDefaultSpawner,
compat::Executor01CompatExt,
future::{self, Ready},
prelude::*,
spawn,
};
use tarpc::rpc::{
use tarpc::{
client, context,
server::{self, Handler, Server},
server::{self, Handler},
};
use std::io;
// This is the service definition. It looks a lot like a trait definition.
// It defines one RPC, hello, which takes one arg, name, and returns a String.
tarpc::service! {
/// Returns a greeting for name.
rpc hello(name: String) -> String;
}
@@ -86,7 +80,7 @@ impl Service for HelloServer {
type HelloFut = Ready<String>;
fn hello(&self, _: context::Context, name: String) -> Self::HelloFut {
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name))
}
}
@@ -98,7 +92,7 @@ async fn run() -> io::Result<()> {
let addr = transport.local_addr();
// The server is configured with the defaults.
let server = Server::new(server::Config::default())
let server = server::new(server::Config::default())
// Server can listen on any type that implements the Transport trait.
.incoming(transport)
// Close the stream after the client connects
@@ -107,14 +101,14 @@ async fn run() -> io::Result<()> {
// the generated Service trait.
.respond_with(serve(HelloServer));
spawn!(server).unwrap();
tokio_executor::spawn(server.unit_error().boxed().compat());
let transport = await!(bincode_transport::connect(&addr))?;
// new_stub is generated by the service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service macro.
let mut client = await!(new_stub(client::Config::default(), transport));
let mut client = await!(new_stub(client::Config::default(), transport))?;
// The client has an RPC method for each RPC defined in service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
@@ -127,10 +121,11 @@ async fn run() -> io::Result<()> {
}
fn main() {
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(run()
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat(TokioDefaultSpawner),
.compat(),
);
}
```
@@ -139,13 +134,3 @@ fn main() {
Use `cargo doc` as you normally would to see the documentation created for all
items expanded by a `service!` invocation.
## Contributing
To contribute to tarpc, please see [CONTRIBUTING](CONTRIBUTING.md).
## License
tarpc is distributed under the terms of the MIT license.
See [LICENSE](LICENSE) for details.

View File

@@ -1,8 +1,6 @@
cargo-features = ["rename-dependency"]
[package]
name = "tarpc-bincode-transport"
version = "0.1.0"
version = "0.4.0"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = '2018'
license = "MIT"
@@ -16,28 +14,26 @@ description = "A bincode-based transport for tarpc services."
[dependencies]
bincode = { version = "1.0", features = ["i128"] }
bytes = "0.4"
futures_legacy = { version = "0.1", package = "futures" }
pin-utils = "0.1.0-alpha.2"
rpc = { package = "tarpc-lib", version = "0.1", path = "../rpc", features = ["serde1"] }
pin-utils = "0.1.0-alpha.4"
rpc = { package = "tarpc-lib", version = "0.3", path = "../rpc", features = ["serde1"] }
serde = "1.0"
tokio = "0.1"
tokio-io = "0.1"
tokio-serde-bincode = "0.1"
async-bincode = "0.4"
tokio-tcp = "0.1"
tokio-serde = "0.2"
[target.'cfg(not(test))'.dependencies]
futures-preview = { version = "0.3.0-alpha.8", features = ["compat"] }
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
[dev-dependencies]
futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] }
env_logger = "0.5"
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
env_logger = "0.6"
humantime = "1.0"
libtest = "0.0.1"
log = "0.4"
rand = "0.5"
rand = "0.6"
tokio = "0.1"
tokio-executor = "0.1"
tokio-reactor = "0.1"
tokio-serde = "0.2"
tokio-serde = "0.3"
tokio-timer = "0.2"

View File

@@ -1 +1 @@
edition = "Edition2018"
edition = "2018"

View File

@@ -0,0 +1,150 @@
use futures::{compat::Stream01CompatExt, prelude::*, ready};
use futures_legacy::{
executor::{
self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01,
UnsafeNotify as UnsafeNotify01,
},
Async as Async01, AsyncSink as AsyncSink01, Sink as Sink01, Stream as Stream01,
};
use std::{
pin::Pin,
task::{self, Poll, Waker},
};
/// A shim to convert a 0.1 Sink + Stream to a 0.3 Sink + Stream.
#[derive(Debug)]
pub struct Compat<S, SinkItem> {
staged_item: Option<SinkItem>,
inner: S,
}
impl<S, SinkItem> Compat<S, SinkItem> {
/// Returns a new Compat.
pub fn new(inner: S) -> Self {
Compat {
inner,
staged_item: None,
}
}
/// Unwraps Compat, returning the inner value.
pub fn into_inner(self) -> S {
self.inner
}
/// Returns a reference to the value wrapped by Compat.
pub fn get_ref(&self) -> &S {
&self.inner
}
}
impl<S, SinkItem> Stream for Compat<S, SinkItem>
where
S: Stream01,
{
type Item = Result<S::Item, S::Error>;
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<Self::Item>> {
unsafe {
let inner = &mut Pin::get_unchecked_mut(self).inner;
let mut compat = inner.compat();
let compat = Pin::new_unchecked(&mut compat);
match ready!(compat.poll_next(waker)) {
None => Poll::Ready(None),
Some(Ok(next)) => Poll::Ready(Some(Ok(next))),
Some(Err(e)) => Poll::Ready(Some(Err(e))),
}
}
}
}
impl<S, SinkItem> Sink for Compat<S, SinkItem>
where
S: Sink01<SinkItem = SinkItem>,
{
type SinkItem = SinkItem;
type SinkError = S::SinkError;
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), S::SinkError> {
let me = unsafe { Pin::get_unchecked_mut(self) };
assert!(me.staged_item.is_none());
me.staged_item = Some(item);
Ok(())
}
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_unchecked_mut(self) };
match me.staged_item.take() {
Some(staged_item) => match me.inner.start_send(staged_item) {
Ok(AsyncSink01::Ready) => Poll::Ready(Ok(())),
Ok(AsyncSink01::NotReady(item)) => {
me.staged_item = Some(item);
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
},
None => Poll::Ready(Ok(())),
}
})
}
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_unchecked_mut(self) };
match me.inner.poll_complete() {
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
Ok(Async01::NotReady) => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
}
})
}
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_unchecked_mut(self) };
match me.inner.close() {
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
Ok(Async01::NotReady) => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
}
})
}
}
#[derive(Clone, Debug)]
struct WakerToHandle<'a>(&'a Waker);
#[derive(Debug)]
struct NotifyWaker(task::Waker);
impl Notify01 for NotifyWaker {
fn notify(&self, _: usize) {
self.0.wake();
}
}
unsafe impl UnsafeNotify01 for NotifyWaker {
unsafe fn clone_raw(&self) -> NotifyHandle01 {
let ptr = Box::new(NotifyWaker(self.0.clone()));
NotifyHandle01::new(Box::into_raw(ptr))
}
unsafe fn drop_raw(&self) {
let ptr: *const dyn UnsafeNotify01 = self;
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
}
}
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
unsafe { NotifyWaker(handle.0.clone()).clone_raw() }
}
}

View File

@@ -6,79 +6,147 @@
//! A TCP [`Transport`] that serializes as bincode.
#![feature(
futures_api,
pin,
arbitrary_self_types,
underscore_imports,
await_macro,
async_await,
)]
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
#![deny(missing_docs, missing_debug_implementations)]
mod vendored;
use bytes::{Bytes, BytesMut};
use crate::vendored::tokio_serde_bincode::{IoErrorWrapper, ReadBincode, WriteBincode};
use self::compat::Compat;
use async_bincode::{AsyncBincodeStream, AsyncDestination};
use futures::{
Poll,
compat::{Compat01As03, Future01CompatExt, Stream01CompatExt},
prelude::*,
ready, task,
};
use futures_legacy::{
executor::{
self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01,
UnsafeNotify as UnsafeNotify01,
},
sink::SinkMapErr as SinkMapErr01,
sink::With as With01,
stream::MapErr as MapErr01,
Async as Async01, AsyncSink as AsyncSink01, Sink as Sink01, Stream as Stream01,
ready,
};
use pin_utils::unsafe_pinned;
use serde::{Deserialize, Serialize};
use std::{fmt, io, marker::PhantomData, net::SocketAddr, pin::Pin, task::LocalWaker};
use tokio::codec::{Framed, LengthDelimitedCodec, length_delimited};
use tokio_tcp::{self, TcpListener, TcpStream};
use std::{
error::Error,
io,
marker::PhantomData,
net::SocketAddr,
pin::Pin,
task::{Poll, Waker},
};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::{TcpListener, TcpStream};
/// Returns a new bincode transport that reads from and writes to `io`.
pub fn new<Item, SinkItem>(io: TcpStream) -> Transport<Item, SinkItem>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
let peer_addr = io.peer_addr();
let local_addr = io.local_addr();
let inner = length_delimited::Builder::new()
.max_frame_length(8_000_000)
.new_framed(io)
.map_err(IoErrorWrapper as _)
.sink_map_err(IoErrorWrapper as _)
.with(freeze as _);
let inner = WriteBincode::new(inner);
let inner = ReadBincode::new(inner);
mod compat;
Transport {
inner,
staged_item: None,
peer_addr,
local_addr,
/// A transport that serializes to, and deserializes from, a [`TcpStream`].
#[derive(Debug)]
pub struct Transport<S, Item, SinkItem> {
inner: Compat<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>,
}
impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
/// Returns the transport underlying the bincode transport.
pub fn into_inner(self) -> S {
self.inner.into_inner().into_inner()
}
}
fn freeze(bytes: BytesMut) -> Result<Bytes, IoErrorWrapper> {
Ok(bytes.freeze())
impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
unsafe_pinned!(
inner: Compat<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>
);
}
/// Connects to `addr`, wrapping the connection in a bincode transport.
pub async fn connect<Item, SinkItem>(addr: &SocketAddr) -> io::Result<Transport<Item, SinkItem>>
impl<S, Item, SinkItem> Stream for Transport<S, Item, SinkItem>
where
S: AsyncRead,
Item: for<'a> Deserialize<'a>,
{
type Item = io::Result<Item>;
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<io::Result<Item>>> {
match self.inner().poll_next(waker) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(next))) => Poll::Ready(Some(Ok(next))),
Poll::Ready(Some(Err(e))) => {
Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, e))))
}
}
}
}
impl<S, Item, SinkItem> Sink for Transport<S, Item, SinkItem>
where
S: AsyncWrite,
SinkItem: Serialize,
{
type SinkItem = SinkItem;
type SinkError = io::Error;
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
self.inner()
.start_send(item)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> {
convert(self.inner().poll_ready(waker))
}
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> {
convert(self.inner().poll_flush(waker))
}
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> {
convert(self.inner().poll_close(waker))
}
}
fn convert<E: Into<Box<Error + Send + Sync>>>(poll: Poll<Result<(), E>>) -> Poll<io::Result<()>> {
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
}
}
impl<Item, SinkItem> rpc::Transport for Transport<TcpStream, Item, SinkItem>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
let stream = await!(TcpStream::connect(addr).compat())?;
Ok(new(stream))
type Item = Item;
type SinkItem = SinkItem;
fn peer_addr(&self) -> io::Result<SocketAddr> {
self.inner.get_ref().get_ref().peer_addr()
}
fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner.get_ref().get_ref().local_addr()
}
}
/// Returns a new bincode transport that reads from and writes to `io`.
pub fn new<Item, SinkItem>(io: TcpStream) -> Transport<TcpStream, Item, SinkItem>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
Transport::from(io)
}
impl<S, Item, SinkItem> From<S> for Transport<S, Item, SinkItem> {
fn from(inner: S) -> Self {
Transport {
inner: Compat::new(AsyncBincodeStream::from(inner).for_async()),
}
}
}
/// Connects to `addr`, wrapping the connection in a bincode transport.
pub async fn connect<Item, SinkItem>(
addr: &SocketAddr,
) -> io::Result<Transport<TcpStream, Item, SinkItem>>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
Ok(new(await!(TcpStream::connect(addr).compat())?))
}
/// Listens on `addr`, wrapping accepted connections in bincode transports.
@@ -119,168 +187,10 @@ where
Item: for<'a> Deserialize<'a>,
SinkItem: Serialize,
{
type Item = io::Result<Transport<Item, SinkItem>>;
type Item = io::Result<Transport<TcpStream, Item, SinkItem>>;
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<Self::Item>> {
let next = ready!(self.incoming().poll_next(waker)?);
Poll::Ready(next.map(|conn| Ok(new(conn))))
}
}
/// A transport that serializes to, and deserializes from, a [`TcpStream`].
pub struct Transport<Item, SinkItem> {
inner: ReadBincode<
WriteBincode<
With01<
SinkMapErr01<
MapErr01<
Framed<tokio_tcp::TcpStream, LengthDelimitedCodec>,
fn(std::io::Error) -> IoErrorWrapper,
>,
fn(std::io::Error) -> IoErrorWrapper,
>,
BytesMut,
fn(BytesMut) -> Result<Bytes, IoErrorWrapper>,
Result<Bytes, IoErrorWrapper>
>,
SinkItem,
>,
Item,
>,
staged_item: Option<SinkItem>,
peer_addr: io::Result<SocketAddr>,
local_addr: io::Result<SocketAddr>,
}
impl<Item, SinkItem> fmt::Debug for Transport<Item, SinkItem> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Transport")
}
}
impl<Item, SinkItem> Stream for Transport<Item, SinkItem>
where
Item: for<'a> Deserialize<'a>,
{
type Item = io::Result<Item>;
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<Item>>> {
unsafe {
let inner = &mut Pin::get_mut_unchecked(self).inner;
let mut compat = inner.compat();
let compat = Pin::new_unchecked(&mut compat);
match ready!(compat.poll_next(waker)) {
None => Poll::Ready(None),
Some(Ok(next)) => Poll::Ready(Some(Ok(next))),
Some(Err(e)) => Poll::Ready(Some(Err(e.0))),
}
}
}
}
impl<Item, SinkItem> Sink for Transport<Item, SinkItem>
where
SinkItem: Serialize,
{
type SinkItem = SinkItem;
type SinkError = io::Error;
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
let me = unsafe { Pin::get_mut_unchecked(self) };
assert!(me.staged_item.is_none());
me.staged_item = Some(item);
Ok(())
}
fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_mut_unchecked(self) };
match me.staged_item.take() {
Some(staged_item) => match me.inner.start_send(staged_item)? {
AsyncSink01::Ready => Poll::Ready(Ok(())),
AsyncSink01::NotReady(item) => {
me.staged_item = Some(item);
Poll::Pending
}
},
None => Poll::Ready(Ok(())),
}
})
}
fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_mut_unchecked(self) };
match me.inner.poll_complete()? {
Async01::Ready(()) => Poll::Ready(Ok(())),
Async01::NotReady => Poll::Pending,
}
})
}
fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_mut_unchecked(self) };
match me.inner.get_mut().close()? {
Async01::Ready(()) => Poll::Ready(Ok(())),
Async01::NotReady => Poll::Pending,
}
})
}
}
impl<Item, SinkItem> rpc::Transport for Transport<Item, SinkItem>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
type Item = Item;
type SinkItem = SinkItem;
fn peer_addr(&self) -> io::Result<SocketAddr> {
// TODO: should just access from the inner transport.
// https://github.com/alexcrichton/tokio-serde-bincode/issues/4
Ok(*self.peer_addr.as_ref().unwrap())
}
fn local_addr(&self) -> io::Result<SocketAddr> {
Ok(*self.local_addr.as_ref().unwrap())
}
}
#[derive(Clone, Debug)]
struct WakerToHandle<'a>(&'a LocalWaker);
#[derive(Debug)]
struct NotifyWaker(task::Waker);
impl Notify01 for NotifyWaker {
fn notify(&self, _: usize) {
self.0.wake();
}
}
unsafe impl UnsafeNotify01 for NotifyWaker {
unsafe fn clone_raw(&self) -> NotifyHandle01 {
let ptr = Box::new(NotifyWaker(self.0.clone()));
NotifyHandle01::new(Box::into_raw(ptr))
}
unsafe fn drop_raw(&self) {
let ptr: *const dyn UnsafeNotify01 = self;
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
}
}
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
unsafe { NotifyWaker(handle.0.clone().into_waker()).clone_raw() }
}
}

View File

@@ -1,7 +0,0 @@
// Copyright 2018 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
pub(crate) mod tokio_serde_bincode;

View File

@@ -1,224 +0,0 @@
//! `Stream` and `Sink` adaptors for serializing and deserializing values using
//! Bincode.
//!
//! This crate provides adaptors for going from a stream or sink of buffers
//! ([`Bytes`]) to a stream or sink of values by performing Bincode encoding or
//! decoding. It is expected that each yielded buffer contains a single
//! serialized Bincode value. The specific strategy by which this is done is left
//! up to the user. One option is to use using [`length_delimited`] from
//! [tokio-io].
//!
//! [`Bytes`]: https://docs.rs/bytes/0.4/bytes/struct.Bytes.html
//! [`length_delimited`]: http://alexcrichton.com/tokio-io/tokio_io/codec/length_delimited/index.html
//! [tokio-io]: http://github.com/alexcrichton/tokio-io
//! [examples]: https://github.com/carllerche/tokio-serde-json/tree/master/examples
#![allow(missing_debug_implementations)]
use bincode::Error;
use bytes::{Bytes, BytesMut};
use futures_legacy::{Poll, Sink, StartSend, Stream};
use serde::{Deserialize, Serialize};
use std::io;
use tokio_serde::{Deserializer, FramedRead, FramedWrite, Serializer};
use std::marker::PhantomData;
/// Adapts a stream of Bincode encoded buffers to a stream of values by
/// deserializing them.
///
/// `ReadBincode` implements `Stream` by polling the inner buffer stream and
/// deserializing the buffer as Bincode. It expects that each yielded buffer
/// represents a single Bincode value and does not contain any extra trailing
/// bytes.
pub(crate) struct ReadBincode<T, U> {
inner: FramedRead<T, U, Bincode<U>>,
}
/// Adapts a buffer sink to a value sink by serializing the values as Bincode.
///
/// `WriteBincode` implements `Sink` by serializing the submitted values to a
/// buffer. The buffer is then sent to the inner stream, which is responsible
/// for handling framing on the wire.
pub(crate) struct WriteBincode<T: Sink, U> {
inner: FramedWrite<T, U, Bincode<U>>,
}
struct Bincode<T> {
ghost: PhantomData<T>,
}
impl<T, U> ReadBincode<T, U>
where
T: Stream<Error = IoErrorWrapper>,
U: for<'de> Deserialize<'de>,
Bytes: From<T::Item>,
{
/// Creates a new `ReadBincode` with the given buffer stream.
pub fn new(inner: T) -> ReadBincode<T, U> {
let json = Bincode { ghost: PhantomData };
ReadBincode {
inner: FramedRead::new(inner, json),
}
}
}
impl<T, U> ReadBincode<T, U> {
/// Returns a mutable reference to the underlying stream wrapped by
/// `ReadBincode`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_mut(&mut self) -> &mut T {
self.inner.get_mut()
}
}
impl<T, U> Stream for ReadBincode<T, U>
where
T: Stream<Error = IoErrorWrapper>,
U: for<'de> Deserialize<'de>,
Bytes: From<T::Item>,
{
type Item = U;
type Error = <T as Stream>::Error;
fn poll(&mut self) -> Poll<Option<U>, Self::Error> {
self.inner.poll()
}
}
impl<T, U> Sink for ReadBincode<T, U>
where
T: Sink,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
self.get_mut().start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.get_mut().poll_complete()
}
fn close(&mut self) -> Poll<(), T::SinkError> {
self.get_mut().close()
}
}
pub(crate) struct IoErrorWrapper(pub io::Error);
impl From<Box<bincode::ErrorKind>> for IoErrorWrapper {
fn from(e: Box<bincode::ErrorKind>) -> Self {
IoErrorWrapper(match *e {
bincode::ErrorKind::Io(e) => e,
bincode::ErrorKind::InvalidUtf8Encoding(e) => {
io::Error::new(io::ErrorKind::InvalidInput, e)
}
bincode::ErrorKind::InvalidBoolEncoding(e) => {
io::Error::new(io::ErrorKind::InvalidInput, e.to_string())
}
bincode::ErrorKind::InvalidTagEncoding(e) => {
io::Error::new(io::ErrorKind::InvalidInput, e.to_string())
}
bincode::ErrorKind::InvalidCharEncoding => {
io::Error::new(io::ErrorKind::InvalidInput, "Invalid char encoding")
}
bincode::ErrorKind::DeserializeAnyNotSupported => {
io::Error::new(io::ErrorKind::InvalidInput, "Deserialize Any not supported")
}
bincode::ErrorKind::SizeLimit => {
io::Error::new(io::ErrorKind::InvalidInput, "Size limit exceeded")
}
bincode::ErrorKind::SequenceMustHaveLength => {
io::Error::new(io::ErrorKind::InvalidInput, "Sequence must have length")
}
bincode::ErrorKind::Custom(s) => io::Error::new(io::ErrorKind::Other, s),
})
}
}
impl From<IoErrorWrapper> for io::Error {
fn from(wrapper: IoErrorWrapper) -> io::Error {
wrapper.0
}
}
impl<T, U> WriteBincode<T, U>
where
T: Sink<SinkItem = BytesMut, SinkError = IoErrorWrapper>,
U: Serialize,
{
/// Creates a new `WriteBincode` with the given buffer sink.
pub fn new(inner: T) -> WriteBincode<T, U> {
let json = Bincode { ghost: PhantomData };
WriteBincode {
inner: FramedWrite::new(inner, json),
}
}
}
impl<T: Sink, U> WriteBincode<T, U> {
/// Returns a mutable reference to the underlying sink wrapped by
/// `WriteBincode`.
///
/// Note that care should be taken to not tamper with the underlying sink as
/// it may corrupt the sequence of frames otherwise being worked with.
pub fn get_mut(&mut self) -> &mut T {
self.inner.get_mut()
}
}
impl<T, U> Sink for WriteBincode<T, U>
where
T: Sink<SinkItem = BytesMut, SinkError = IoErrorWrapper>,
U: Serialize,
{
type SinkItem = U;
type SinkError = <T as Sink>::SinkError;
fn start_send(&mut self, item: U) -> StartSend<U, Self::SinkError> {
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.inner.poll_complete()
}
fn close(&mut self) -> Poll<(), Self::SinkError> {
self.inner.poll_complete()
}
}
impl<T, U> Stream for WriteBincode<T, U>
where
T: Stream + Sink,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
self.get_mut().poll()
}
}
impl<T> Deserializer<T> for Bincode<T>
where
T: for<'de> Deserialize<'de>,
{
type Error = Error;
fn deserialize(&mut self, src: &Bytes) -> Result<T, Error> {
bincode::deserialize(src)
}
}
impl<T: Serialize> Serializer<T> for Bincode<T> {
type Error = Error;
fn serialize(&mut self, item: &T) -> Result<BytesMut, Self::Error> {
bincode::serialize(item).map(Into::into)
}
}

View File

@@ -15,14 +15,11 @@
async_await
)]
extern crate test;
use self::test::stats::Stats;
use futures::{compat::TokioDefaultSpawner, prelude::*};
use futures::{compat::Executor01CompatExt, prelude::*};
use libtest::stats::Stats;
use rpc::{
client::{self, Client},
context,
server::{self, Handler, Server},
client, context,
server::{Handler, Server},
};
use std::{
io,
@@ -34,17 +31,17 @@ async fn bench() -> io::Result<()> {
let addr = listener.local_addr();
tokio_executor::spawn(
Server::<u32, u32>::new(server::Config::default())
Server::<u32, u32>::default()
.incoming(listener)
.take(1)
.respond_with(|_ctx, request| futures::future::ready(Ok(request)))
.unit_error()
.boxed()
.compat()
.compat(),
);
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
let client = &mut await!(Client::<u32, u32>::new(client::Config::default(), conn))?;
let client = &mut await!(client::new::<u32, u32, _>(client::Config::default(), conn))?;
let total = 10_000usize;
let mut successful = 0u32;
@@ -102,14 +99,9 @@ async fn bench() -> io::Result<()> {
#[test]
fn bench_small_packet() -> io::Result<()> {
env_logger::init();
rpc::init(TokioDefaultSpawner);
rpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(
bench()
.map_err(|e| panic!(e.to_string()))
.boxed()
.compat(),
);
tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat());
println!("done");
Ok(())

View File

@@ -6,20 +6,16 @@
//! Tests client/server control flow.
#![feature(generators, await_macro, async_await, futures_api,)]
#![feature(generators, await_macro, async_await, futures_api)]
use futures::{
compat::{Future01CompatExt, TokioDefaultSpawner},
compat::{Executor01CompatExt, Future01CompatExt},
prelude::*,
stream,
};
use log::{info, trace};
use rand::distributions::{Distribution, Normal};
use rpc::{
client::{self, Client},
context,
server::{self, Server},
};
use rpc::{client, context, server::Server};
use std::{
io,
time::{Duration, Instant, SystemTime},
@@ -40,7 +36,7 @@ impl AsDuration for SystemTime {
async fn run() -> io::Result<()> {
let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = listener.local_addr();
let server = Server::<String, String>::new(server::Config::default())
let server = Server::<String, String>::default()
.incoming(listener)
.take(1)
.for_each(async move |channel| {
@@ -80,7 +76,7 @@ async fn run() -> io::Result<()> {
tokio_executor::spawn(server.unit_error().boxed().compat());
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
let client = await!(Client::<String, String>::new(
let client = await!(client::new::<String, String, _>(
client::Config::default(),
conn
))?;
@@ -88,7 +84,7 @@ async fn run() -> io::Result<()> {
// Proxy service
let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = listener.local_addr();
let proxy_server = Server::<String, String>::new(server::Config::default())
let proxy_server = Server::<String, String>::default()
.incoming(listener)
.take(1)
.for_each(move |channel| {
@@ -115,7 +111,7 @@ async fn run() -> io::Result<()> {
config.max_in_flight_requests = 10;
config.pending_request_buffer = 10;
let client = await!(Client::<String, String>::new(
let client = await!(client::new::<String, String, _>(
config,
await!(tarpc_bincode_transport::connect(&addr))?
))?;
@@ -140,13 +136,8 @@ async fn run() -> io::Result<()> {
#[test]
fn cancel_slower() -> io::Result<()> {
env_logger::init();
rpc::init(TokioDefaultSpawner);
rpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(
run()
.boxed()
.map_err(|e| panic!(e))
.compat(),
);
tokio::run(run().boxed().map_err(|e| panic!(e)).compat());
Ok(())
}

View File

@@ -6,19 +6,15 @@
//! Tests client/server control flow.
#![feature(generators, await_macro, async_await, futures_api,)]
#![feature(generators, await_macro, async_await, futures_api)]
use futures::{
compat::{Future01CompatExt, TokioDefaultSpawner},
compat::{Executor01CompatExt, Future01CompatExt},
prelude::*,
};
use log::{error, info, trace};
use rand::distributions::{Distribution, Normal};
use rpc::{
client::{self, Client},
context,
server::{self, Server},
};
use rpc::{client, context, server::Server};
use std::{
io,
time::{Duration, Instant, SystemTime},
@@ -39,7 +35,7 @@ impl AsDuration for SystemTime {
async fn run() -> io::Result<()> {
let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = listener.local_addr();
let server = Server::<String, String>::new(server::Config::default())
let server = Server::<String, String>::default()
.incoming(listener)
.take(1)
.for_each(async move |channel| {
@@ -83,7 +79,7 @@ async fn run() -> io::Result<()> {
config.pending_request_buffer = 10;
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
let client = await!(Client::<String, String>::new(config, conn))?;
let client = await!(client::new::<String, String, _>(config, conn))?;
let clients = (1..=100u32).map(|_| client.clone()).collect::<Vec<_>>();
for mut client in clients {
@@ -96,7 +92,10 @@ async fn run() -> io::Result<()> {
Ok(response) => info!("[{}] response: {}", trace_id, response),
Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e),
}
}.unit_error().boxed().compat()
}
.unit_error()
.boxed()
.compat(),
);
}
@@ -106,7 +105,7 @@ async fn run() -> io::Result<()> {
#[test]
fn ping_pong() -> io::Result<()> {
env_logger::init();
rpc::init(TokioDefaultSpawner);
rpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(
run()

View File

@@ -1,24 +1,23 @@
cargo-features = ["rename-dependency"]
[package]
name = "tarpc-example-service"
version = "0.1.0"
version = "0.3.0"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = "2018"
license = "MIT"
documentation = "https://docs.rs/tarpc-example-service"
homepage = "https://github.com/google/tarpc"
repository = "https://github.com/google/tarpc"
keywords = ["rpc", "network", "server", "api", "microservices", "example"]
keywords = ["rpc", "network", "server", "microservices", "example"]
categories = ["asynchronous", "network-programming"]
readme = "../README.md"
description = "An example server built on tarpc."
[dependencies]
bincode-transport = { package = "tarpc-bincode-transport", version = "0.1", path = "../bincode-transport" }
futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] }
bincode-transport = { package = "tarpc-bincode-transport", version = "0.4", path = "../bincode-transport" }
clap = "2.0"
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
serde = { version = "1.0" }
tarpc = { version = "0.13", path = "../tarpc", features = ["serde1"] }
tarpc = { version = "0.15", path = "../tarpc", features = ["serde1"] }
tokio = "0.1"
tokio-executor = "0.1"
@@ -28,4 +27,8 @@ path = "src/lib.rs"
[[bin]]
name = "server"
path = "src/main.rs"
path = "src/server.rs"
[[bin]]
name = "client"
path = "src/client.rs"

View File

@@ -0,0 +1,73 @@
// Copyright 2018 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
use clap::{App, Arg};
use futures::{compat::Executor01CompatExt, prelude::*};
use std::{io, net::SocketAddr};
use tarpc::{client, context};
async fn run(server_addr: SocketAddr, name: String) -> io::Result<()> {
let transport = await!(bincode_transport::connect(&server_addr))?;
// new_stub is generated by the service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service macro.
let mut client = await!(service::new_stub(client::Config::default(), transport))?;
// The client has an RPC method for each RPC defined in service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = await!(client.hello(context::current(), name))?;
println!("{}", hello);
Ok(())
}
fn main() {
let flags = App::new("Hello Client")
.version("0.1")
.author("Tim <tikue@google.com>")
.about("Say hello!")
.arg(
Arg::with_name("server_addr")
.long("server_addr")
.value_name("ADDRESS")
.help("Sets the server address to connect to.")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("name")
.short("n")
.long("name")
.value_name("STRING")
.help("Sets the name to say hello to.")
.required(true)
.takes_value(true),
)
.get_matches();
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
let server_addr = flags.value_of("server_addr").unwrap();
let server_addr = server_addr
.parse()
.unwrap_or_else(|e| panic!(r#"--server_addr value "{}" invalid: {}"#, server_addr, e));
let name = flags.value_of("name").unwrap();
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(
run(server_addr, name.into())
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat(),
);
}
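The address handling in `main` above — parse the flag value, panic with a readable message on bad input — can be exercised in isolation. A minimal sketch (the literal address is just a sample value standing in for clap's `--server_addr` output):

```rust
use std::net::SocketAddr;

fn main() {
    // Sample flag value; in the client binary this comes from clap.
    let server_addr = "127.0.0.1:8080";
    let addr: SocketAddr = server_addr
        .parse()
        .unwrap_or_else(|e| panic!(r#"--server_addr value "{}" invalid: {}"#, server_addr, e));
    // A well-formed host:port string parses into ip + port components.
    assert_eq!(addr.port(), 8080);
    println!("ok");
}
```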

View File

@@ -6,11 +6,10 @@
#![feature(
futures_api,
pin,
arbitrary_self_types,
await_macro,
async_await,
proc_macro_hygiene,
proc_macro_hygiene
)]
// This is the service definition. It looks a lot like a trait definition.

View File

@@ -1,85 +0,0 @@
// Copyright 2018 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![feature(
futures_api,
pin,
arbitrary_self_types,
await_macro,
async_await,
)]
use futures::{
compat::TokioDefaultSpawner,
future::{self, Ready},
prelude::*,
};
use tarpc::{
client, context,
server::{self, Handler, Server},
};
use std::io;
// This is the type that implements the generated Service trait. It is the business logic
// and is used to start the server.
#[derive(Clone)]
struct HelloServer;
impl service::Service for HelloServer {
// Each defined rpc generates two items in the trait, a fn that serves the RPC, and
// an associated type representing the future output by the fn.
type HelloFut = Ready<String>;
fn hello(&self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name))
}
}
async fn run() -> io::Result<()> {
// bincode_transport is provided by the associated crate bincode-transport. It makes it easy
// to start up a serde-powered bincode serialization strategy over TCP.
let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = transport.local_addr();
// The server is configured with the defaults.
let server = Server::new(server::Config::default())
// Server can listen on any type that implements the Transport trait.
.incoming(transport)
// Close the stream after the client connects
.take(1)
// serve is generated by the service! macro. It takes as input any type implementing
// the generated Service trait.
.respond_with(service::serve(HelloServer));
tokio_executor::spawn(server.unit_error().boxed().compat());
let transport = await!(bincode_transport::connect(&addr))?;
// new_stub is generated by the service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service macro.
let mut client = await!(service::new_stub(client::Config::default(), transport))?;
// The client has an RPC method for each RPC defined in service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = await!(client.hello(context::current(), "Stim".to_string()))?;
println!("{}", hello);
Ok(())
}
fn main() {
tarpc::init(TokioDefaultSpawner);
tokio::run(run()
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat()
);
}

View File

@@ -0,0 +1,84 @@
// Copyright 2018 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
use clap::{App, Arg};
use futures::{
compat::Executor01CompatExt,
future::{self, Ready},
prelude::*,
};
use std::{io, net::SocketAddr};
use tarpc::{
context,
server::{Handler, Server},
};
// This is the type that implements the generated Service trait. It is the business logic
// and is used to start the server.
#[derive(Clone)]
struct HelloServer;
impl service::Service for HelloServer {
// Each defined rpc generates two items in the trait, a fn that serves the RPC, and
// an associated type representing the future output by the fn.
type HelloFut = Ready<String>;
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name))
}
}
async fn run(server_addr: SocketAddr) -> io::Result<()> {
// bincode_transport is provided by the associated crate bincode-transport. It makes it easy
// to start up a serde-powered bincode serialization strategy over TCP.
let transport = bincode_transport::listen(&server_addr)?;
// The server is configured with the defaults.
let server = Server::default()
// Server can listen on any type that implements the Transport trait.
.incoming(transport)
// serve is generated by the service! macro. It takes as input any type implementing
// the generated Service trait.
.respond_with(service::serve(HelloServer));
await!(server);
Ok(())
}
fn main() {
let flags = App::new("Hello Server")
.version("0.1")
.author("Tim <tikue@google.com>")
.about("Say hello!")
.arg(
Arg::with_name("port")
.short("p")
.long("port")
.value_name("NUMBER")
.help("Sets the port number to listen on")
.required(true)
.takes_value(true),
)
.get_matches();
let port = flags.value_of("port").unwrap();
let port = port
.parse()
.unwrap_or_else(|e| panic!(r#"--port value "{}" invalid: {}"#, port, e));
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(
run(([0, 0, 0, 0], port).into())
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat(),
);
}

View File

@@ -96,7 +96,7 @@ do
diff="$diff$(cargo fmt -- --skip-children --write-mode=diff $file)"
fi
done
if grep --quiet "^Diff at line" <<< "$diff"; then
if grep --quiet "^[-+]" <<< "$diff"; then
FMTRESULT=1
fi
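The hunk above switches the fmt check from matching the diff header line to matching changed lines directly, which is more robust against future changes to `cargo fmt`'s diff framing. A minimal sketch of that check, with a made-up diff string:

```shell
# Sample diff output in the newer `cargo fmt` style (hypothetical content).
diff="Diff in /blah/blah/blah.rs at line 99:
-    foo()
+    bar()"

FMTRESULT=0
# Any line starting with + or - means formatting changes are needed.
if printf '%s\n' "$diff" | grep -q "^[-+]"; then
    FMTRESULT=1
fi
echo "FMTRESULT=$FMTRESULT"
```

Running this prints `FMTRESULT=1`, since the sample diff contains changed lines.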

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc-plugins"
version = "0.5.0"
version = "0.5.1"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
license = "MIT"
documentation = "https://docs.rs/tarpc-plugins"
@@ -15,7 +15,7 @@ description = "Proc macros for tarpc."
travis-ci = { repository = "google/tarpc" }
[dependencies]
itertools = "0.7"
itertools = "0.8"
syn = { version = "0.15", features = ["full", "extra-traits"] }
quote = "0.6"
proc-macro2 = "0.4"

View File

@@ -1 +1 @@
edition = "Edition2018"
edition = "2018"

View File

@@ -4,31 +4,35 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
extern crate itertools;
extern crate proc_macro;
extern crate proc_macro2;
extern crate syn;
extern crate itertools;
extern crate quote;
extern crate syn;
use proc_macro::TokenStream;
use itertools::Itertools;
use quote::ToTokens;
use syn::{Ident, TraitItemType, TypePath, parse};
use proc_macro2::Span;
use quote::ToTokens;
use std::str::FromStr;
use syn::{parse, Ident, TraitItemType, TypePath};
#[proc_macro]
pub fn snake_to_camel(input: TokenStream) -> TokenStream {
let i = input.clone();
let mut assoc_type = parse::<TraitItemType>(input).unwrap_or_else(|_| panic!("Could not parse trait item from:\n{}", i));
let mut assoc_type = parse::<TraitItemType>(input)
.unwrap_or_else(|_| panic!("Could not parse trait item from:\n{}", i));
let old_ident = convert(&mut assoc_type.ident);
for mut attr in &mut assoc_type.attrs {
if let Some(pair) = attr.path.segments.first() {
if pair.value().ident == "doc" {
attr.tts = proc_macro2::TokenStream::from_str(&attr.tts.to_string().replace("{}", &old_ident)).unwrap();
attr.tts = proc_macro2::TokenStream::from_str(
&attr.tts.to_string().replace("{}", &old_ident),
)
.unwrap();
}
}
}
@@ -41,12 +45,7 @@ pub fn ty_snake_to_camel(input: TokenStream) -> TokenStream {
let mut path = parse::<TypePath>(input).unwrap();
// Only capitalize the final segment
convert(&mut path.path
.segments
.last_mut()
.unwrap()
.into_value()
.ident);
convert(&mut path.path.segments.last_mut().unwrap().into_value().ident);
path.into_token_stream().into()
}
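The `snake_to_camel` macro above renames identifiers while rewriting their doc attributes; the renaming itself boils down to a string-level case conversion. A stdlib sketch of that conversion — the real macro operates on `syn::Ident`s, and this helper is an assumption about the behavior of its internal `convert`, not the actual implementation:

```rust
// Assumed behavior of the identifier conversion: drop underscores and
// capitalize the first letter of each word.
fn snake_to_camel(ident: &str) -> String {
    ident
        .split('_')
        .filter(|word| !word.is_empty())
        .map(|word| {
            let mut chars = word.chars();
            match chars.next() {
                Some(first) => first.to_uppercase().chain(chars).collect(),
                None => String::new(),
            }
        })
        .collect()
}

fn main() {
    assert_eq!(snake_to_camel("hello_fut"), "HelloFut");
    assert_eq!(snake_to_camel("ty_snake_to_camel"), "TySnakeToCamel");
    println!("ok");
}
```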

View File

@@ -1,8 +1,6 @@
cargo-features = ["rename-dependency"]
[package]
name = "tarpc-lib"
version = "0.1.0"
version = "0.3.0"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = '2018'
license = "MIT"
@@ -22,17 +20,17 @@ serde1 = ["trace/serde", "serde", "serde/derive"]
fnv = "1.0"
humantime = "1.0"
log = "0.4"
pin-utils = "0.1.0-alpha.2"
rand = "0.5"
pin-utils = "0.1.0-alpha.4"
rand = "0.6"
tokio-timer = "0.2"
trace = { package = "tarpc-trace", version = "0.1", path = "../trace" }
trace = { package = "tarpc-trace", version = "0.2", path = "../trace" }
serde = { optional = true, version = "1.0" }
[target.'cfg(not(test))'.dependencies]
futures-preview = { version = "0.3.0-alpha.8", features = ["compat"] }
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
[dev-dependencies]
futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] }
futures-test-preview = { version = "0.3.0-alpha.8" }
env_logger = "0.5"
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
futures-test-preview = { version = "0.3.0-alpha.13" }
env_logger = "0.6"
tokio = "0.1"

View File

@@ -1 +1 @@
edition = "Edition2018"
edition = "2018"

View File

@@ -7,22 +7,23 @@
use crate::{
context,
util::{deadline_compat, AsDuration, Compact},
ClientMessage, ClientMessageKind, Request, Response, Transport,
ClientMessage, ClientMessageKind, PollIo, Request, Response, Transport,
};
use fnv::FnvHashMap;
use futures::{
Poll,
channel::{mpsc, oneshot},
prelude::*,
ready,
stream::Fuse,
task::LocalWaker,
task::Waker,
Poll,
};
use humantime::format_rfc3339;
use log::{debug, error, info, trace};
use pin_utils::unsafe_pinned;
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::{
io,
marker::{self, Unpin},
net::SocketAddr,
pin::Pin,
sync::{
@@ -37,7 +38,7 @@ use super::Config;
/// Handles communication from the client to request dispatch.
#[derive(Debug)]
pub(crate) struct Channel<Req, Resp> {
pub struct Channel<Req, Resp> {
to_dispatch: mpsc::Sender<DispatchRequest<Req, Resp>>,
/// Channel to send a cancel message to the dispatcher.
cancellation: RequestCancellation,
@@ -57,14 +58,58 @@ impl<Req, Resp> Clone for Channel<Req, Resp> {
}
}
/// A future returned by [`Channel::send`] that resolves to a server response.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
struct Send<'a, Req, Resp> {
fut: MapOkDispatchResponse<SendMapErrConnectionReset<'a, Req, Resp>, Resp>,
}
type SendMapErrConnectionReset<'a, Req, Resp> =
MapErrConnectionReset<futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>>>;
impl<'a, Req, Resp> Send<'a, Req, Resp> {
unsafe_pinned!(
fut: MapOkDispatchResponse<
MapErrConnectionReset<
futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>>,
>,
Resp,
>
);
}
impl<'a, Req, Resp> Future for Send<'a, Req, Resp> {
type Output = io::Result<DispatchResponse<Resp>>;
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
self.as_mut().fut().poll(waker)
}
}
/// A future returned by [`Channel::call`] that resolves to a server response.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Call<'a, Req, Resp> {
fut: AndThenIdent<Send<'a, Req, Resp>, DispatchResponse<Resp>>,
}
impl<'a, Req, Resp> Call<'a, Req, Resp> {
unsafe_pinned!(fut: AndThenIdent<Send<'a, Req, Resp>, DispatchResponse<Resp>>);
}
impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
type Output = io::Result<Resp>;
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
self.as_mut().fut().poll(waker)
}
}
impl<Req, Resp> Channel<Req, Resp> {
/// Sends a request to the dispatch task to forward to the server, returning a [`Future`] that
/// resolves when the request is sent (not when the response is received).
pub(crate) async fn send(
&mut self,
mut ctx: context::Context,
request: Req,
) -> io::Result<DispatchResponse<Resp>> {
fn send(&mut self, mut ctx: context::Context, request: Req) -> Send<Req, Resp> {
// Convert the context to the call context.
ctx.trace_context.parent_id = Some(ctx.trace_context.span_id);
ctx.trace_context.span_id = SpanId::random(&mut rand::thread_rng());
@@ -82,38 +127,40 @@ impl<Req, Resp> Channel<Req, Resp> {
let (response_completion, response) = oneshot::channel();
let cancellation = self.cancellation.clone();
let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
await!(self.to_dispatch.send(DispatchRequest {
ctx,
request_id,
request,
response_completion,
})).map_err(|_| io::Error::from(io::ErrorKind::ConnectionReset))?;
Ok(DispatchResponse {
response: deadline_compat::Deadline::new(response, deadline),
complete: false,
request_id,
cancellation,
ctx,
server_addr: self.server_addr,
})
let server_addr = self.server_addr;
Send {
fut: MapOkDispatchResponse::new(
MapErrConnectionReset::new(self.to_dispatch.send(DispatchRequest {
ctx,
request_id,
request,
response_completion,
})),
DispatchResponse {
response: deadline_compat::Deadline::new(response, deadline),
complete: false,
request_id,
cancellation,
ctx,
server_addr,
},
),
}
}
/// Sends a request to the dispatch task to forward to the server, returning a [`Future`] that
/// resolves to the response.
pub(crate) async fn call(
&mut self,
context: context::Context,
request: Req,
) -> io::Result<Resp> {
let response_future = await!(self.send(context, request))?;
await!(response_future)
pub fn call(&mut self, context: context::Context, request: Req) -> Call<Req, Resp> {
Call {
fut: AndThenIdent::new(self.send(context, request)),
}
}
}
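`Channel::send` above draws its `request_id` from a shared atomic counter with `fetch_add`, which is what keeps ids unique across cloned client handles. A minimal sketch of that scheme — `AtomicUsize` is used here for illustration, since the field's exact integer type is not shown in this hunk:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

fn main() {
    // All clones of a client share one counter behind an Arc.
    let next_request_id = Arc::new(AtomicUsize::new(0));
    let cloned = Arc::clone(&next_request_id);

    // fetch_add returns the previous value, so each call yields a fresh id.
    let first = next_request_id.fetch_add(1, Ordering::Relaxed);
    let second = cloned.fetch_add(1, Ordering::Relaxed);
    assert_eq!((first, second), (0, 1));
    println!("ok");
}
```

`Ordering::Relaxed` suffices because only the uniqueness of the counter values matters, not any ordering relative to other memory operations.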
/// A server response that is completed by request dispatch when the corresponding response
/// arrives off the wire.
#[derive(Debug)]
pub struct DispatchResponse<Resp> {
struct DispatchResponse<Resp> {
response: deadline_compat::Deadline<oneshot::Receiver<Response<Resp>>>,
ctx: context::Context,
complete: bool,
@@ -130,7 +177,7 @@ impl<Resp> DispatchResponse<Resp> {
impl<Resp> Future for DispatchResponse<Resp> {
type Output = io::Result<Resp>;
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<Resp>> {
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<Resp>> {
let resp = ready!(self.response.poll_unpin(waker));
self.complete = true;
@@ -138,8 +185,8 @@ impl<Resp> Future for DispatchResponse<Resp> {
Poll::Ready(match resp {
Ok(resp) => Ok(resp.message?),
Err(e) => Err({
let trace_id = *self.ctx().trace_id();
let server_addr = *self.server_addr();
let trace_id = *self.as_mut().ctx().trace_id();
let server_addr = *self.as_mut().server_addr();
if e.is_elapsed() {
io::Error::new(
@@ -205,9 +252,9 @@ pub async fn spawn<Req, Resp, C>(
server_addr: SocketAddr,
) -> io::Result<Channel<Req, Resp>>
where
Req: Send,
Resp: Send,
C: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>> + Send,
Req: marker::Send + 'static,
Resp: marker::Send + 'static,
C: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>> + marker::Send + 'static,
{
let (to_dispatch, pending_requests) = mpsc::channel(config.pending_request_buffer);
let (cancellation, canceled_requests) = cancellations();
@@ -220,16 +267,18 @@ where
transport: transport.fuse(),
in_flight_requests: FnvHashMap::default(),
pending_requests: pending_requests.fuse(),
}.unwrap_or_else(move |e| error!("[{}] Connection broken: {}", server_addr, e))
).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!(
"Could not spawn client dispatch task. Is shutdown: {}",
e.is_shutdown()
),
)
})?;
}
.unwrap_or_else(move |e| error!("[{}] Connection broken: {}", server_addr, e)),
)
.map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!(
"Could not spawn client dispatch task. Is shutdown: {}",
e.is_shutdown()
),
)
})?;
Ok(Channel {
to_dispatch,
@@ -258,8 +307,8 @@ struct RequestDispatch<Req, Resp, C> {
impl<Req, Resp, C> RequestDispatch<Req, Resp, C>
where
Req: Send,
Resp: Send,
Req: marker::Send,
Resp: marker::Send,
C: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>>,
{
unsafe_pinned!(server_addr: SocketAddr);
@@ -268,20 +317,20 @@ where
unsafe_pinned!(pending_requests: Fuse<mpsc::Receiver<DispatchRequest<Req, Resp>>>);
unsafe_pinned!(transport: Fuse<C>);
fn pump_read(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<()>>> {
Poll::Ready(match ready!(self.transport().poll_next(waker)?) {
fn pump_read(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> {
Poll::Ready(match ready!(self.as_mut().transport().poll_next(waker)?) {
Some(response) => {
self.complete(response);
Some(Ok(()))
}
None => {
trace!("[{}] read half closed", self.server_addr());
trace!("[{}] read half closed", self.as_mut().server_addr());
None
}
})
}
fn pump_write(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<()>>> {
fn pump_write(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> {
enum ReceiverStatus {
NotReady,
Closed,
@@ -307,12 +356,12 @@ where
match (pending_requests_status, canceled_requests_status) {
(ReceiverStatus::Closed, ReceiverStatus::Closed) => {
ready!(self.transport().poll_flush(waker)?);
ready!(self.as_mut().transport().poll_flush(waker)?);
Poll::Ready(None)
}
(ReceiverStatus::NotReady, _) | (_, ReceiverStatus::NotReady) => {
// No more messages to process, so flush any messages buffered in the transport.
ready!(self.transport().poll_flush(waker)?);
ready!(self.as_mut().transport().poll_flush(waker)?);
// Even if we fully-flush, we return Pending, because we have no more requests
// or cancellations right now.
@@ -324,12 +373,12 @@ where
/// Yields the next pending request, if one is ready to be sent.
fn poll_next_request(
self: &mut Pin<&mut Self>,
waker: &LocalWaker,
) -> Poll<Option<io::Result<DispatchRequest<Req, Resp>>>> {
if self.in_flight_requests().len() >= self.config.max_in_flight_requests {
waker: &Waker,
) -> PollIo<DispatchRequest<Req, Resp>> {
if self.as_mut().in_flight_requests().len() >= self.config.max_in_flight_requests {
info!(
"At in-flight request capacity ({}/{}).",
self.in_flight_requests().len(),
self.as_mut().in_flight_requests().len(),
self.config.max_in_flight_requests
);
@@ -338,13 +387,13 @@ where
return Poll::Pending;
}
while let Poll::Pending = self.transport().poll_ready(waker)? {
while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? {
// We can't yield a request-to-be-sent before the transport is capable of buffering it.
ready!(self.transport().poll_flush(waker)?);
ready!(self.as_mut().transport().poll_flush(waker)?);
}
loop {
match ready!(self.pending_requests().poll_next_unpin(waker)) {
match ready!(self.as_mut().pending_requests().poll_next_unpin(waker)) {
Some(request) => {
if request.response_completion.is_canceled() {
trace!(
@@ -357,7 +406,7 @@ where
return Poll::Ready(Some(Ok(request)));
}
None => {
trace!("[{}] pending_requests closed", self.server_addr());
trace!("[{}] pending_requests closed", self.as_mut().server_addr());
return Poll::Ready(None);
}
}
@@ -367,29 +416,34 @@ where
/// Yields the next pending cancellation, and, if one is ready, cancels the associated request.
fn poll_next_cancellation(
self: &mut Pin<&mut Self>,
waker: &LocalWaker,
) -> Poll<Option<io::Result<(context::Context, u64)>>> {
while let Poll::Pending = self.transport().poll_ready(waker)? {
ready!(self.transport().poll_flush(waker)?);
waker: &Waker,
) -> PollIo<(context::Context, u64)> {
while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? {
ready!(self.as_mut().transport().poll_flush(waker)?);
}
loop {
match ready!(self.canceled_requests().poll_next_unpin(waker)) {
match ready!(self.as_mut().canceled_requests().poll_next_unpin(waker)) {
Some(request_id) => {
if let Some(in_flight_data) = self.in_flight_requests().remove(&request_id) {
self.in_flight_requests().compact(0.1);
if let Some(in_flight_data) =
self.as_mut().in_flight_requests().remove(&request_id)
{
self.as_mut().in_flight_requests().compact(0.1);
debug!(
"[{}/{}] Removed request.",
in_flight_data.ctx.trace_id(),
self.server_addr()
self.as_mut().server_addr()
);
return Poll::Ready(Some(Ok((in_flight_data.ctx, request_id))));
}
}
None => {
trace!("[{}] canceled_requests closed.", self.server_addr());
trace!(
"[{}] canceled_requests closed.",
self.as_mut().server_addr()
);
return Poll::Ready(None);
}
}
@@ -409,8 +463,8 @@ where
deadline: dispatch_request.ctx.deadline,
}),
};
self.transport().start_send(request)?;
self.in_flight_requests().insert(
self.as_mut().transport().start_send(request)?;
self.as_mut().in_flight_requests().insert(
request_id,
InFlightData {
ctx: dispatch_request.ctx,
@@ -430,20 +484,28 @@ where
trace_context: context.trace_context,
message: ClientMessageKind::Cancel { request_id },
};
self.transport().start_send(cancel)?;
trace!("[{}/{}] Cancel message sent.", trace_id, self.server_addr());
return Ok(());
self.as_mut().transport().start_send(cancel)?;
trace!(
"[{}/{}] Cancel message sent.",
trace_id,
self.as_mut().server_addr()
);
Ok(())
}
/// Sends a server response to the client task that initiated the associated request.
fn complete(self: &mut Pin<&mut Self>, response: Response<Resp>) -> bool {
if let Some(in_flight_data) = self.in_flight_requests().remove(&response.request_id) {
self.in_flight_requests().compact(0.1);
if let Some(in_flight_data) = self
.as_mut()
.in_flight_requests()
.remove(&response.request_id)
{
self.as_mut().in_flight_requests().compact(0.1);
trace!(
"[{}/{}] Received response.",
in_flight_data.ctx.trace_id(),
self.server_addr()
self.as_mut().server_addr()
);
let _ = in_flight_data.response_completion.send(response);
return true;
@@ -451,7 +513,7 @@ where
debug!(
"[{}] No in-flight request found for request_id = {}.",
self.server_addr(),
self.as_mut().server_addr(),
response.request_id
);
@@ -462,21 +524,21 @@ where
impl<Req, Resp, C> Future for RequestDispatch<Req, Resp, C>
where
Req: Send,
Resp: Send,
Req: marker::Send,
Resp: marker::Send,
C: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>>,
{
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
trace!("[{}] RequestDispatch::poll", self.server_addr());
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> {
trace!("[{}] RequestDispatch::poll", self.as_mut().server_addr());
loop {
match (self.pump_read(waker)?, self.pump_write(waker)?) {
(read, write @ Poll::Ready(None)) => {
if self.in_flight_requests().is_empty() {
if self.as_mut().in_flight_requests().is_empty() {
info!(
"[{}] Shutdown: write half closed, and no requests in flight.",
self.server_addr()
self.as_mut().server_addr()
);
return Poll::Ready(Ok(()));
}
@@ -485,7 +547,7 @@ where
_ => {
trace!(
"[{}] read: {:?}, write: {:?}, (not ready)",
self.server_addr(),
self.as_mut().server_addr(),
read,
write,
);
@@ -496,7 +558,7 @@ where
(read @ Poll::Ready(Some(())), write) | (read, write @ Poll::Ready(Some(()))) => {
trace!(
"[{}] read: {:?}, write: {:?}",
self.server_addr(),
self.as_mut().server_addr(),
read,
write,
)
@@ -504,7 +566,7 @@ where
(read, write) => {
trace!(
"[{}] read: {:?}, write: {:?} (not ready)",
self.server_addr(),
self.as_mut().server_addr(),
read,
write,
);
@@ -558,11 +620,191 @@ impl RequestCancellation {
impl Stream for CanceledRequests {
type Item = u64;
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<u64>> {
fn poll_next(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<u64>> {
self.0.poll_next_unpin(waker)
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
struct MapErrConnectionReset<Fut> {
future: Fut,
finished: Option<()>,
}
impl<Fut> MapErrConnectionReset<Fut> {
unsafe_pinned!(future: Fut);
unsafe_unpinned!(finished: Option<()>);
fn new(future: Fut) -> MapErrConnectionReset<Fut> {
MapErrConnectionReset {
future,
finished: Some(()),
}
}
}
impl<Fut: Unpin> Unpin for MapErrConnectionReset<Fut> {}
impl<Fut> Future for MapErrConnectionReset<Fut>
where
Fut: TryFuture,
{
type Output = io::Result<Fut::Ok>;
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
match self.as_mut().future().try_poll(waker) {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => {
self.finished().take().expect(
"MapErrConnectionReset must not be polled after it returned `Poll::Ready`",
);
Poll::Ready(result.map_err(|_| io::Error::from(io::ErrorKind::ConnectionReset)))
}
}
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
struct MapOkDispatchResponse<Fut, Resp> {
future: Fut,
response: Option<DispatchResponse<Resp>>,
}
impl<Fut, Resp> MapOkDispatchResponse<Fut, Resp> {
unsafe_pinned!(future: Fut);
unsafe_unpinned!(response: Option<DispatchResponse<Resp>>);
fn new(future: Fut, response: DispatchResponse<Resp>) -> MapOkDispatchResponse<Fut, Resp> {
MapOkDispatchResponse {
future,
response: Some(response),
}
}
}
impl<Fut: Unpin, Resp> Unpin for MapOkDispatchResponse<Fut, Resp> {}
impl<Fut, Resp> Future for MapOkDispatchResponse<Fut, Resp>
where
Fut: TryFuture,
{
type Output = Result<DispatchResponse<Resp>, Fut::Error>;
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
match self.as_mut().future().try_poll(waker) {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => {
let response = self
.as_mut()
.response()
.take()
.expect("MapOk must not be polled after it returned `Poll::Ready`");
Poll::Ready(result.map(|_| response))
}
}
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
struct AndThenIdent<Fut1, Fut2> {
try_chain: TryChain<Fut1, Fut2>,
}
impl<Fut1, Fut2> AndThenIdent<Fut1, Fut2>
where
Fut1: TryFuture<Ok = Fut2>,
Fut2: TryFuture,
{
unsafe_pinned!(try_chain: TryChain<Fut1, Fut2>);
/// Creates a new `AndThenIdent`.
fn new(future: Fut1) -> AndThenIdent<Fut1, Fut2> {
AndThenIdent {
try_chain: TryChain::new(future),
}
}
}
impl<Fut1, Fut2> Future for AndThenIdent<Fut1, Fut2>
where
Fut1: TryFuture<Ok = Fut2>,
Fut2: TryFuture<Error = Fut1::Error>,
{
type Output = Result<Fut2::Ok, Fut2::Error>;
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
self.try_chain().poll(waker, |result| match result {
Ok(ok) => TryChainAction::Future(ok),
Err(err) => TryChainAction::Output(Err(err)),
})
}
}
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
enum TryChain<Fut1, Fut2> {
First(Fut1),
Second(Fut2),
Empty,
}
enum TryChainAction<Fut2>
where
Fut2: TryFuture,
{
Future(Fut2),
Output(Result<Fut2::Ok, Fut2::Error>),
}
impl<Fut1, Fut2> TryChain<Fut1, Fut2>
where
Fut1: TryFuture<Ok = Fut2>,
Fut2: TryFuture,
{
fn new(fut1: Fut1) -> TryChain<Fut1, Fut2> {
TryChain::First(fut1)
}
fn poll<F>(self: Pin<&mut Self>, waker: &Waker, f: F) -> Poll<Result<Fut2::Ok, Fut2::Error>>
where
F: FnOnce(Result<Fut1::Ok, Fut1::Error>) -> TryChainAction<Fut2>,
{
let mut f = Some(f);
// Safe to call `get_unchecked_mut` because we won't move the futures.
let this = unsafe { Pin::get_unchecked_mut(self) };
loop {
let output = match this {
TryChain::First(fut1) => {
// Poll the first future
match unsafe { Pin::new_unchecked(fut1) }.try_poll(waker) {
Poll::Pending => return Poll::Pending,
Poll::Ready(output) => output,
}
}
TryChain::Second(fut2) => {
// Poll the second future
return unsafe { Pin::new_unchecked(fut2) }.try_poll(waker);
}
TryChain::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`");
}
};
*this = TryChain::Empty; // Drop fut1
let f = f.take().unwrap();
match f(output) {
TryChainAction::Future(fut2) => *this = TryChain::Second(fut2),
TryChainAction::Output(output) => return Poll::Ready(output),
}
}
}
}
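`TryChain` is a three-state machine: run the first step, hand its output to a closure that either supplies a second step or a final result, and refuse to be driven again once done. A simplified synchronous sketch of the same state machine, using concrete steps (parsing, then doubling) in place of futures — the names and steps are illustrative, not tarpc's:

```rust
// States mirror TryChain: First holds step one, Second holds step two,
// Empty means the chain has completed and must not be driven again.
enum Chain {
    First(String),
    Second(u64),
    Empty,
}

fn drive(mut chain: Chain) -> Result<u64, std::num::ParseIntError> {
    loop {
        let output = match &chain {
            Chain::First(s) => s.parse::<u64>(),
            Chain::Second(n) => return Ok(n * 2),
            Chain::Empty => panic!("must not be driven after completion"),
        };
        chain = Chain::Empty; // drop step one, as TryChain drops fut1
        match output {
            Ok(n) => chain = Chain::Second(n), // like TryChainAction::Future
            Err(e) => return Err(e),           // like TryChainAction::Output
        }
    }
}

fn main() {
    assert_eq!(drive(Chain::First("21".to_string())).unwrap(), 42);
    assert!(drive(Chain::First("abc".to_string())).is_err());
    println!("ok");
}
```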
#[cfg(test)]
mod tests {
use super::{CanceledRequests, Channel, RequestCancellation, RequestDispatch};
@@ -573,9 +815,10 @@ mod tests {
ClientMessage, Response,
};
use fnv::FnvHashMap;
use futures::{channel::mpsc, prelude::*, Poll};
use futures_test::task::noop_waker_ref;
use std::{
marker,
net::{IpAddr, Ipv4Addr, SocketAddr},
pin::Pin,
sync::atomic::AtomicU64,
@@ -596,7 +839,7 @@ mod tests {
);
let mut dispatch = Pin::new(&mut dispatch);
let waker = &noop_waker_ref();
let req = dispatch.poll_next_request(waker).ready();
assert!(req.is_some());
@@ -617,12 +860,13 @@ mod tests {
.send(context::current(), "hi".into())
.boxed()
.compat(),
)
.unwrap();
drop(resp);
drop(channel);
let mut dispatch = Pin::new(&mut dispatch);
let waker = &noop_waker_ref();
dispatch.poll_next_cancellation(waker).unwrap();
assert!(dispatch.poll_next_request(waker).ready().is_none());
@@ -640,12 +884,13 @@ mod tests {
.send(context::current(), "hi".into())
.boxed()
.compat(),
)
.unwrap();
drop(resp);
drop(channel);
let mut dispatch = Pin::new(&mut dispatch);
let waker = &noop_waker_ref();
assert!(dispatch.poll_next_request(waker).ready().is_none());
}
@@ -688,7 +933,7 @@ mod tests {
impl<T, E> PollTest for Poll<Option<Result<T, E>>>
where
E: ::std::fmt::Display + marker::Send + 'static,
{
type T = Option<T>;

View File

@@ -6,27 +6,103 @@
//! Provides a client that connects to a server and sends multiplexed requests.
use crate::{context, ClientMessage, Response, Transport};
use futures::prelude::*;
use log::warn;
use std::{
io,
net::{Ipv4Addr, SocketAddr},
};
mod dispatch;
/// Provides a [`Client`] backed by a transport.
pub mod channel;
pub use self::channel::Channel;
/// Sends multiplexed requests to, and receives responses from, a server.
pub trait Client<'a, Req> {
/// The response type.
type Response;
/// The future response.
type Future: Future<Output = io::Result<Self::Response>> + 'a;
/// Initiates a request, sending it to the dispatch task.
///
/// Returns a [`Future`] that resolves to this client and the future response
/// once the request is successfully enqueued.
///
/// [`Future`]: futures::Future
fn call(&'a mut self, ctx: context::Context, request: Req) -> Self::Future;
/// Returns a Client that applies a post-processing function to the returned response.
fn map_response<F, R>(self, f: F) -> MapResponse<Self, F>
where
F: FnMut(Self::Response) -> R,
Self: Sized,
{
MapResponse { inner: self, f }
}
/// Returns a Client that applies a pre-processing function to the request.
fn with_request<F, Req2>(self, f: F) -> WithRequest<Self, F>
where
F: FnMut(Req2) -> Req,
Self: Sized,
{
WithRequest { inner: self, f }
}
}
/// A Client that applies a function to the returned response.
#[derive(Clone, Debug)]
pub struct MapResponse<C, F> {
inner: C,
f: F,
}
impl<'a, C, F, Req, Resp, Resp2> Client<'a, Req> for MapResponse<C, F>
where
C: Client<'a, Req, Response = Resp>,
F: FnMut(Resp) -> Resp2 + 'a,
{
type Response = Resp2;
type Future = futures::future::MapOk<<C as Client<'a, Req>>::Future, &'a mut F>;
fn call(&'a mut self, ctx: context::Context, request: Req) -> Self::Future {
self.inner.call(ctx, request).map_ok(&mut self.f)
}
}
/// A Client that applies a pre-processing function to the request.
#[derive(Clone, Debug)]
pub struct WithRequest<C, F> {
inner: C,
f: F,
}
impl<'a, C, F, Req, Req2, Resp> Client<'a, Req2> for WithRequest<C, F>
where
C: Client<'a, Req, Response = Resp>,
F: FnMut(Req2) -> Req,
{
type Response = Resp;
type Future = <C as Client<'a, Req>>::Future;
fn call(&'a mut self, ctx: context::Context, request: Req2) -> Self::Future {
self.inner.call(ctx, (self.f)(request))
}
}
impl<'a, Req, Resp> Client<'a, Req> for Channel<Req, Resp>
where
Req: 'a,
Resp: 'a,
{
type Response = Resp;
type Future = channel::Call<'a, Req, Resp>;
fn call(&'a mut self, ctx: context::Context, request: Req) -> channel::Call<'a, Req, Resp> {
self.call(ctx, request)
}
}
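The PR description explains the point of making `Client` a trait: a `Channel<Req, Resp>` can be wrapped so both its request and response types change without touching the transport. A simplified, synchronous sketch of that adapter pattern (the trait, structs, and closures below are illustrative stand-ins, not tarpc's async API):

```rust
// A stripped-down Client: no futures, no context, just request -> response.
trait SimpleClient<Req> {
    type Response;
    fn call(&mut self, request: Req) -> Self::Response;
}

// Base client: speaks String -> u64.
struct ParseClient;

impl SimpleClient<String> for ParseClient {
    type Response = u64;
    fn call(&mut self, request: String) -> u64 {
        request.parse().unwrap_or(0)
    }
}

// Post-process the response, like MapResponse.
struct MapResponse<C, F> { inner: C, f: F }

impl<C, F, Req, R2> SimpleClient<Req> for MapResponse<C, F>
where
    C: SimpleClient<Req>,
    F: FnMut(C::Response) -> R2,
{
    type Response = R2;
    fn call(&mut self, request: Req) -> R2 {
        (self.f)(self.inner.call(request))
    }
}

// Pre-process the request, like WithRequest.
struct WithRequest<C, F> { inner: C, f: F }

impl<C, F, Req, Req2> SimpleClient<Req2> for WithRequest<C, F>
where
    C: SimpleClient<Req>,
    F: FnMut(Req2) -> Req,
{
    type Response = C::Response;
    fn call(&mut self, request: Req2) -> C::Response {
        self.inner.call((self.f)(request))
    }
}

fn main() {
    // u32 request -> String -> base client -> u64 response -> formatted String.
    let mut client = MapResponse {
        inner: WithRequest { inner: ParseClient, f: |n: u32| n.to_string() },
        f: |resp: u64| format!("got {}", resp),
    };
    assert_eq!(client.call(21), "got 21");
    println!("ok");
}
```

The same layering works in the real trait because `MapResponse` and `WithRequest` only constrain the inner client's associated types, so adapters compose in any order.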
@@ -53,39 +129,23 @@ impl Default for Config {
}
}
/// Creates a new Client by wrapping a [`Transport`] and spawning a dispatch task
/// that manages the lifecycle of requests.
///
/// Must only be called from an executor.
pub async fn new<Req, Resp, T>(config: Config, transport: T) -> io::Result<Channel<Req, Resp>>
where
Req: Send + 'static,
Resp: Send + 'static,
T: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>> + Send + 'static,
{
let server_addr = transport.peer_addr().unwrap_or_else(|e| {
warn!(
"Setting peer to unspecified because peer could not be determined: {}",
e
);
SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0)
});
Ok(await!(channel::spawn(config, transport, server_addr))?)
}

View File

@@ -5,21 +5,13 @@
// https://opensource.org/licenses/MIT.
#![feature(
const_fn,
non_exhaustive,
integer_atomics,
try_trait,
futures_api,
await_macro,
gen_future,
decl_macro,
async_await
)]
#![deny(missing_docs, missing_debug_implementations)]
@@ -49,15 +41,15 @@ pub(crate) mod util;
pub use crate::{client::Client, server::Server, transport::Transport};
use futures::{
task::{Poll, Spawn, SpawnError, SpawnExt},
Future,
};
use std::{cell::RefCell, io, sync::Once, time::SystemTime};
/// A message from a client to a server.
#[derive(Debug)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct ClientMessage<T> {
/// The trace context associates the message with a specific chain of causally-related actions,
@@ -69,10 +61,7 @@ pub struct ClientMessage<T> {
/// Different messages that can be sent from a client to a server.
#[derive(Debug)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum ClientMessageKind<T> {
/// A request initiated by a user. The server responds to a request by invoking a
@@ -94,10 +83,7 @@ pub enum ClientMessageKind<T> {
/// A request from a client to a server.
#[derive(Debug)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct Request<T> {
/// Uniquely identifies the request across all requests sent over a single channel.
@@ -119,10 +105,7 @@ pub struct Request<T> {
/// A response from a server to a client.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct Response<T> {
/// The ID of the request being responded to.
@@ -133,10 +116,7 @@ pub struct Response<T> {
/// An error response from a server to a client.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct ServerError {
#[cfg_attr(
@@ -166,6 +146,8 @@ impl<T> Request<T> {
}
}
pub(crate) type PollIo<T> = Poll<Option<io::Result<T>>>;
static INIT: Once = Once::new();
static mut SEED_SPAWN: Option<Box<dyn CloneSpawn>> = None;
thread_local! {
@@ -193,9 +175,7 @@ pub fn init(spawn: impl Spawn + Clone + 'static) {
}
pub(crate) fn spawn(future: impl Future<Output = ()> + Send + 'static) -> Result<(), SpawnError> {
SPAWN.with(|spawn| spawn.borrow_mut().spawn(future))
}
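The `static INIT: Once` above ensures the seed spawner is installed exactly once no matter how many times `init` is called. A minimal runnable sketch of that one-time-initialization pattern, with an atomic counter standing in for the spawner state (the names here are illustrative):

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Once;

static INIT: Once = Once::new();
static INIT_COUNT: AtomicU32 = AtomicU32::new(0);

// Repeated calls are safe: Once runs the closure on the first call only.
fn init() {
    INIT.call_once(|| {
        INIT_COUNT.fetch_add(1, Ordering::SeqCst);
    });
}

fn main() {
    init();
    init();
    init();
    // The initialization body ran exactly once.
    assert_eq!(INIT_COUNT.load(Ordering::SeqCst), 1);
    println!("ok");
}
```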
trait CloneSpawn: Spawn {

View File

@@ -7,10 +7,16 @@
use crate::{
server::{Channel, Config},
util::Compact,
ClientMessage, PollIo, Response, Transport,
};
use fnv::FnvHashMap;
use futures::{
channel::mpsc,
prelude::*,
ready,
stream::Fuse,
task::{Poll, Waker},
};
use log::{debug, error, info, trace, warn};
use pin_utils::unsafe_pinned;
use std::{
@@ -101,28 +107,28 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
}
};
let open_connections = *self.as_mut().open_connections();
if open_connections >= self.as_mut().config().max_connections {
warn!(
"[{}] Shedding connection because the maximum open connections \
limit is reached ({}/{}).",
peer,
open_connections,
self.as_mut().config().max_connections
);
return NewConnection::Filtered;
}
let config = self.config.clone();
let open_connections_for_ip = self.increment_connections_for_ip(&peer)?;
*self.as_mut().open_connections() += 1;
debug!(
"[{}] Opening channel ({}/{} connections for IP, {} total).",
peer,
open_connections_for_ip,
config.max_connections_per_ip,
self.as_mut().open_connections(),
);
NewConnection::Accepted(Channel {
@@ -135,19 +141,19 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
}
fn handle_closed_connection(self: &mut Pin<&mut Self>, addr: &SocketAddr) {
*self.as_mut().open_connections() -= 1;
debug!(
"[{}] Closing channel. {} open connections remaining.",
addr, self.open_connections
);
self.decrement_connections_for_ip(&addr);
self.as_mut().connections_per_ip().compact(0.1);
}
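The `ConnectionFilter` bookkeeping above boils down to: bump a per-IP count when a connection opens, shed it if the IP is at its limit, and decrement (dropping empty entries) when it closes. A simplified sketch of that accounting with the `HashMap` entry API — the struct and limits below are illustrative, not tarpc's:

```rust
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr};

struct Filter {
    connections_per_ip: HashMap<IpAddr, usize>,
    max_per_ip: usize,
}

impl Filter {
    // Returns the new count if the connection is accepted, None if shed.
    fn open(&mut self, ip: IpAddr) -> Option<usize> {
        let count = self.connections_per_ip.entry(ip).or_insert(0);
        if *count >= self.max_per_ip {
            return None; // shed: per-IP limit reached
        }
        *count += 1;
        Some(*count)
    }

    fn close(&mut self, ip: IpAddr) {
        if let Some(count) = self.connections_per_ip.get_mut(&ip) {
            *count -= 1;
            if *count == 0 {
                self.connections_per_ip.remove(&ip); // keep the map compact
            }
        }
    }
}

fn main() {
    let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
    let mut filter = Filter { connections_per_ip: HashMap::new(), max_per_ip: 2 };
    assert_eq!(filter.open(ip), Some(1));
    assert_eq!(filter.open(ip), Some(2));
    assert_eq!(filter.open(ip), None); // over the per-IP limit
    filter.close(ip);
    assert_eq!(filter.open(ip), Some(2)); // a slot freed up
    println!("ok");
}
```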
fn increment_connections_for_ip(self: &mut Pin<&mut Self>, peer: &SocketAddr) -> Option<usize> {
let max_connections_per_ip = self.as_mut().config().max_connections_per_ip;
let mut connections_per_ip = self.as_mut().connections_per_ip();
let occupied = match connections_per_ip.entry(peer.ip()) {
Entry::Vacant(vacant) => vacant.insert(0),
Entry::Occupied(o) => {
@@ -171,7 +177,7 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
}
fn decrement_connections_for_ip(self: &mut Pin<&mut Self>, addr: &SocketAddr) {
let should_compact = match self.as_mut().connections_per_ip().entry(addr.ip()) {
Entry::Vacant(_) => {
error!("[{}] Got vacant entry when closing connection.", addr);
return;
@@ -187,29 +193,23 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
}
};
if should_compact {
self.as_mut().connections_per_ip().compact(0.1);
}
}
fn poll_listener<C>(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<NewConnection<Req, Resp, C>>
where
S: Stream<Item = Result<C, io::Error>>,
C: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
{
match ready!(self.as_mut().listener().poll_next_unpin(cx)?) {
Some(codec) => Poll::Ready(Some(Ok(self.handle_new_connection(codec)))),
None => Poll::Ready(None),
}
}
fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
match ready!(self.as_mut().closed_connections_rx().poll_next_unpin(cx)) {
Some(addr) => {
self.handle_closed_connection(&addr);
Poll::Ready(Ok(()))
@@ -226,25 +226,28 @@ where
{
type Item = io::Result<Channel<Req, Resp, T>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<Channel<Req, Resp, T>> {
loop {
match (
self.as_mut().poll_listener(cx)?,
self.poll_closed_connections(cx)?,
) {
(Poll::Ready(Some(NewConnection::Accepted(channel))), _) => {
return Poll::Ready(Some(Ok(channel)));
}
(Poll::Ready(Some(NewConnection::Filtered)), _) | (_, Poll::Ready(())) => {
trace!(
"Filtered a connection; {} open.",
self.as_mut().open_connections()
);
continue;
}
(Poll::Pending, Poll::Pending) => return Poll::Pending,
(Poll::Ready(None), Poll::Pending) => {
if *self.as_mut().open_connections() > 0 {
trace!(
"Listener closed; {} open connections.",
self.as_mut().open_connections()
);
return Poll::Pending;
}

View File

@@ -8,7 +8,7 @@
use crate::{
context::Context, util::deadline_compat, util::AsDuration, util::Compact, ClientMessage,
ClientMessageKind, PollIo, Request, Response, ServerError, Transport,
};
use fnv::FnvHashMap;
use futures::{
@@ -17,7 +17,7 @@ use futures::{
prelude::*,
ready,
stream::Fuse,
task::{Poll, Waker},
try_ready,
};
use humantime::format_rfc3339;
@@ -43,6 +43,12 @@ pub struct Server<Req, Resp> {
ghost: PhantomData<(Req, Resp)>,
}
impl<Req, Resp> Default for Server<Req, Resp> {
fn default() -> Self {
new(Config::default())
}
}
/// Settings that control the behavior of the server.
#[non_exhaustive]
#[derive(Clone, Debug)]
@@ -75,15 +81,15 @@ impl Default for Config {
}
}
/// Returns a new server with configuration specified `config`.
pub fn new<Req, Resp>(config: Config) -> Server<Req, Resp> {
Server {
config,
ghost: PhantomData,
}
}
impl<Req, Resp> Server<Req, Resp> {
/// Returns the config for this server.
pub fn config(&self) -> &Config {
&self.config
@@ -122,17 +128,18 @@ where
Req: Send + 'static,
Resp: Send + 'static,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send + 'static,
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<()> {
while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) {
match channel {
Ok(channel) => {
let peer = channel.client_addr;
if let Err(e) =
crate::spawn(channel.respond_with(self.as_mut().request_handler().clone()))
{
warn!("[{}] Failed to spawn connection handler: {:?}", peer, e);
}
@@ -143,7 +150,7 @@ where
}
}
info!("Server shutting down.");
Poll::Ready(())
}
}
@@ -158,7 +165,7 @@ where
/// Responds to all requests with `request_handler`.
fn respond_with<F, Fut>(self, request_handler: F) -> Running<Self, F>
where
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{
Running {
@@ -174,7 +181,8 @@ where
Req: Send,
Resp: Send,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
{
}
/// The server end of an open connection with a client.
@@ -222,29 +230,20 @@ where
Req: Send,
Resp: Send,
{
pub(crate) fn start_send(mut self: Pin<&mut Self>, response: Response<Resp>) -> io::Result<()> {
self.as_mut().transport().start_send(response)
}
pub(crate) fn poll_ready(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
self.as_mut().transport().poll_ready(cx)
}
pub(crate) fn poll_flush(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
self.as_mut().transport().poll_flush(cx)
}
pub(crate) fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<ClientMessage<Req>> {
self.as_mut().transport().poll_next(cx)
}
/// Returns the address of the client connected to the channel.
@@ -256,7 +255,7 @@ where
/// responses and resolves when the connection is closed.
pub fn respond_with<F, Fut>(self, f: F) -> impl Future<Output = ()>
where
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
Req: 'static,
Resp: 'static,
@@ -271,7 +270,8 @@ where
pending_responses: responses,
responses_tx,
in_flight_requests: FnvHashMap::default(),
}
.unwrap_or_else(move |e| {
info!("[{}] ClientHandler errored out: {}", peer, e);
})
}
@@ -305,36 +305,33 @@ where
Req: Send + 'static,
Resp: Send + 'static,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{
/// If at max in-flight requests, check that there's room to immediately write a throttled
/// response.
fn poll_ready_if_throttling(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
if self.in_flight_requests.len()
>= self.channel.config.max_in_flight_requests_per_connection
{
let peer = self.as_mut().channel().client_addr;
while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? {
info!(
"[{}] In-flight requests at max ({}), and transport is not ready.",
peer,
self.as_mut().in_flight_requests().len(),
);
try_ready!(self.as_mut().channel().poll_flush(cx));
}
}
Poll::Ready(Ok(()))
}
fn pump_read(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<()> {
ready!(self.as_mut().poll_ready_if_throttling(cx)?);
Poll::Ready(match ready!(self.as_mut().channel().poll_next(cx)?) {
Some(message) => {
match message.message {
ClientMessageKind::Request(request) => {
@@ -353,29 +350,25 @@ where
})
}
fn pump_write(mut self: Pin<&mut Self>, cx: &Waker, read_half_closed: bool) -> PollIo<()> {
match self.as_mut().poll_next_response(cx)? {
Poll::Ready(Some((_, response))) => {
self.as_mut().channel().start_send(response)?;
Poll::Ready(Some(Ok(())))
}
Poll::Ready(None) => {
// Shutdown can't be done before we finish pumping out remaining responses.
ready!(self.as_mut().channel().poll_flush(cx)?);
Poll::Ready(None)
}
Poll::Pending => {
// No more requests to process, so flush any requests buffered in the transport.
ready!(self.as_mut().channel().poll_flush(cx)?);
// Being here means there are no staged requests and all written responses are
// fully flushed. So, if the read half is closed and there are no in-flight
// requests, then we can close the write half.
if read_half_closed && self.as_mut().in_flight_requests().is_empty() {
Poll::Ready(None)
} else {
Poll::Pending
@@ -385,28 +378,33 @@ where
}
fn poll_next_response(
mut self: Pin<&mut Self>,
cx: &Waker,
) -> PollIo<(Context, Response<Resp>)> {
// Ensure there's room to write a response.
while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? {
ready!(self.as_mut().channel().poll_flush(cx)?);
}
let peer = self.as_mut().channel().client_addr;
match ready!(self.as_mut().pending_responses().poll_next(cx)) {
Some((ctx, response)) => {
if self
.as_mut()
.in_flight_requests()
.remove(&response.request_id)
.is_some()
{
self.as_mut().in_flight_requests().compact(0.1);
trace!(
"[{}/{}] Staging response. In-flight requests = {}.",
ctx.trace_id(),
peer,
self.as_mut().in_flight_requests().len(),
);
Poll::Ready(Some(Ok((ctx, response))))
}
None => {
// This branch likely won't happen, since the ClientHandler is holding a Sender.
@@ -417,30 +415,37 @@ where
}
fn handle_request(
mut self: Pin<&mut Self>,
trace_context: trace::Context,
request: Request<Req>,
) -> io::Result<()> {
let request_id = request.id;
let peer = self.as_mut().channel().client_addr;
let ctx = Context {
deadline: request.deadline,
trace_context,
};
let request = request.message;
if self.as_mut().in_flight_requests().len()
>= self
.as_mut()
.channel()
.config
.max_in_flight_requests_per_connection
{
debug!(
"[{}/{}] Client has reached in-flight request limit ({}/{}).",
ctx.trace_id(),
peer,
self.as_mut().in_flight_requests().len(),
self.as_mut()
.channel()
.config
.max_in_flight_requests_per_connection
);
self.as_mut().channel().start_send(Response {
request_id,
message: Err(ServerError {
kind: io::ErrorKind::WouldBlock,
@@ -459,10 +464,10 @@ where
format_rfc3339(deadline),
timeout,
);
let mut response_tx = self.as_mut().responses_tx().clone();
let trace_id = *ctx.trace_id();
let response = self.as_mut().f().clone()(ctx, request);
let response = deadline_compat::Deadline::new(response, Instant::now() + timeout).then(
async move |result| {
let response = Response {
@@ -477,28 +482,29 @@ where
},
);
let (abortable_response, abort_handle) = abortable(response);
crate::spawn(abortable_response.map(|_| ())).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!(
"Could not spawn response task. Is shutdown: {}",
e.is_shutdown()
),
)
})?;
self.as_mut()
.in_flight_requests()
.insert(request_id, abort_handle);
Ok(())
}
fn cancel_request(mut self: Pin<&mut Self>, trace_context: &trace::Context, request_id: u64) {
// It's possible the request was already completed, so it's fine
// if this is None.
if let Some(cancel_handle) = self.as_mut().in_flight_requests().remove(&request_id) {
self.as_mut().in_flight_requests().compact(0.1);
cancel_handle.abort();
let remaining = self.as_mut().in_flight_requests().len();
trace!(
"[{}/{}] Request canceled. In-flight requests = {}",
trace_context.trace_id,
@@ -521,16 +527,19 @@ where
Req: Send + 'static,
Resp: Send + 'static,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
trace!("[{}] ClientHandler::poll", self.channel.client_addr);
loop {
let read = self.as_mut().pump_read(cx)?;
match (
read,
self.as_mut().pump_write(cx, read == Poll::Ready(None))?,
) {
(Poll::Ready(None), Poll::Ready(None)) => {
info!("[{}] Client disconnected.", self.channel.client_addr);
return Poll::Ready(Ok(()));

View File

@@ -6,8 +6,8 @@
//! Transports backed by in-memory channels.
use crate::{PollIo, Transport};
use futures::{channel::mpsc, task::Waker, Poll, Sink, Stream};
use pin_utils::unsafe_pinned;
use std::pin::Pin;
use std::{
@@ -45,7 +45,7 @@ impl<Item, SinkItem> UnboundedChannel<Item, SinkItem> {
impl<Item, SinkItem> Stream for UnboundedChannel<Item, SinkItem> {
type Item = Result<Item, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &Waker) -> PollIo<Item> {
self.rx().poll_next(cx).map(|option| option.map(Ok))
}
}
@@ -54,28 +54,25 @@ impl<Item, SinkItem> Sink for UnboundedChannel<Item, SinkItem> {
type SinkItem = SinkItem;
type SinkError = io::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
self.tx()
.poll_ready(cx)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
}
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
self.tx()
.start_send(item)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
}
fn poll_flush(self: Pin<&mut Self>, cx: &Waker) -> Poll<Result<(), Self::SinkError>> {
self.tx()
.poll_flush(cx)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
}
fn poll_close(self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
self.tx()
.poll_close(cx)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
@@ -97,18 +94,23 @@ impl<Item, SinkItem> Transport for UnboundedChannel<Item, SinkItem> {
#[cfg(test)]
mod tests {
use crate::{
client, context,
server::{Handler, Server},
transport,
};
use futures::compat::Executor01CompatExt;
use futures::{prelude::*, stream};
use log::trace;
use std::io;
#[test]
fn integration() {
let _ = env_logger::try_init();
crate::init(tokio::executor::DefaultExecutor::current().compat());
let (client_channel, server_channel) = transport::channel::unbounded();
let server = Server::<String, u64>::new(server::Config::default())
let server = Server::<String, u64>::default()
.incoming(stream::once(future::ready(Ok(server_channel))))
.respond_with(|_ctx, request| {
future::ready(request.parse::<u64>().map_err(|_| {
@@ -120,7 +122,7 @@ mod tests {
});
let responses = async {
let mut client = await!(Client::new(client::Config::default(), client_channel))?;
let mut client = await!(client::new(client::Config::default(), client_channel))?;
let response1 = await!(client.call(context::current(), "123".into()));
let response2 = await!(client.call(context::current(), "abc".into()));


@@ -10,7 +10,12 @@
//! can be plugged in, using whatever protocol it wants.
use futures::prelude::*;
use std::{io, net::SocketAddr};
use std::{
io,
net::SocketAddr,
pin::Pin,
task::{Poll, Waker},
};
pub mod channel;
@@ -30,3 +35,87 @@ where
/// The address of the local half of this transport.
fn local_addr(&self) -> io::Result<SocketAddr>;
}
/// Returns a new Transport backed by the given Stream + Sink and connecting addresses.
pub fn new<S, Item>(
inner: S,
peer_addr: SocketAddr,
local_addr: SocketAddr,
) -> impl Transport<Item = Item, SinkItem = S::SinkItem>
where
S: Stream<Item = io::Result<Item>>,
S: Sink<SinkError = io::Error>,
{
TransportShim {
inner,
peer_addr,
local_addr,
}
}
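The `new` constructor in this hunk just pairs a `Stream + Sink` with two addresses and delegates everything else to the wrapped value. A minimal sketch of the same delegation pattern in today's stable Rust, using `std::io::Read` in place of the futures-preview `Stream`/`Sink` traits (`Shim` is an illustrative name, not part of tarpc):

```rust
use std::io::{self, Read};
use std::net::SocketAddr;

/// Illustrative analogue of `TransportShim`: store the peer/local addresses
/// and forward all I/O calls to the wrapped value unchanged.
struct Shim<S> {
    inner: S,
    peer_addr: SocketAddr,
    local_addr: SocketAddr,
}

impl<S> Shim<S> {
    /// The address of the remote peer, as in `Transport::peer_addr`.
    fn peer_addr(&self) -> io::Result<SocketAddr> {
        Ok(self.peer_addr)
    }
    /// The address of the local half, as in `Transport::local_addr`.
    fn local_addr(&self) -> io::Result<SocketAddr> {
        Ok(self.local_addr)
    }
}

impl<S: Read> Read for Shim<S> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Pure delegation, like TransportShim's poll_next/poll_ready/etc.
        self.inner.read(buf)
    }
}
```

The wrapper adds no behavior of its own; only `peer_addr`/`local_addr` are new, which is exactly why `TransportShim` can implement `Transport` for any conforming `Stream + Sink`.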
/// A transport created by adding peers to a Stream + Sink.
#[derive(Debug)]
struct TransportShim<S> {
peer_addr: SocketAddr,
local_addr: SocketAddr,
inner: S,
}
impl<S> TransportShim<S> {
pin_utils::unsafe_pinned!(inner: S);
}
impl<S> Stream for TransportShim<S>
where
S: Stream,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<S::Item>> {
self.inner().poll_next(waker)
}
}
impl<S> Sink for TransportShim<S>
where
S: Sink,
{
type SinkItem = S::SinkItem;
type SinkError = S::SinkError;
fn start_send(self: Pin<&mut Self>, item: S::SinkItem) -> Result<(), S::SinkError> {
self.inner().start_send(item)
}
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
self.inner().poll_ready(waker)
}
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
self.inner().poll_flush(waker)
}
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
self.inner().poll_close(waker)
}
}
impl<S, Item> Transport for TransportShim<S>
where
S: Stream + Sink,
Self: Stream<Item = io::Result<Item>>,
Self: Sink<SinkItem = S::SinkItem, SinkError = io::Error>,
{
type Item = Item;
type SinkItem = S::SinkItem;
/// The address of the remote peer this transport is in communication with.
fn peer_addr(&self) -> io::Result<SocketAddr> {
Ok(self.peer_addr)
}
/// The address of the local half of this transport.
fn local_addr(&self) -> io::Result<SocketAddr> {
Ok(self.local_addr)
}
}


@@ -7,7 +7,8 @@
use futures::{
compat::{Compat01As03, Future01CompatExt},
prelude::*,
ready, task::{Poll, LocalWaker},
ready,
task::{Poll, Waker},
};
use pin_utils::unsafe_pinned;
use std::pin::Pin;
@@ -49,10 +50,9 @@ where
{
type Output = Result<T::Ok, timeout::Error<T::Error>>;
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
// First, try polling the future
match self.future().try_poll(waker) {
match self.as_mut().future().try_poll(waker) {
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
Poll::Pending => {}
Poll::Ready(Err(e)) => return Poll::Ready(Err(timeout::Error::inner(e))),


@@ -31,6 +31,7 @@ where
}
/// Serializes [`io::ErrorKind`] as a `u32`.
#[allow(clippy::trivially_copy_pass_by_ref)] // Exact fn signature required by serde derive
pub fn serialize_io_error_kind_as_u32<S>(
kind: &io::ErrorKind,
serializer: S,
@@ -59,7 +60,8 @@ where
Other => 16,
UnexpectedEof => 17,
_ => 16,
}.serialize(serializer)
}
.serialize(serializer)
}
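This hunk's `match` encodes `io::ErrorKind` variants as stable integers, with unknown kinds collapsing into the `Other` bucket. A reduced stable-Rust sketch of that mapping — only the arms visible in the diff (`Other => 16`, `UnexpectedEof => 17`, the `_ => 16` catch-all) are taken from the source; the first two arms here are illustrative:

```rust
use std::io;

/// Reduced sketch of the ErrorKind -> u32 mapping used when serializing
/// io::Error. Most arms are elided; the catch-all collapses any unknown
/// kind to 16 (the code for `Other`), exactly as in the diff above.
fn kind_to_u32(kind: io::ErrorKind) -> u32 {
    use std::io::ErrorKind::*;
    match kind {
        NotFound => 0,          // illustrative
        PermissionDenied => 1,  // illustrative
        Other => 16,
        UnexpectedEof => 17,
        // ErrorKind is #[non_exhaustive]; kinds added later map to Other.
        _ => 16,
    }
}
```

The catch-all is what makes the encoding forward-compatible: a client built against a newer `std` with extra `ErrorKind` variants still produces a value an older peer can decode.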
/// Deserializes [`io::ErrorKind`] from a `u32`.


@@ -1,8 +1,6 @@
cargo-features = ["rename-dependency"]
[package]
name = "tarpc"
version = "0.13.0"
version = "0.15.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018"
license = "MIT"
@@ -24,15 +22,20 @@ travis-ci = { repository = "google/tarpc" }
log = "0.4"
serde = { optional = true, version = "1.0" }
tarpc-plugins = { path = "../plugins", version = "0.5.0" }
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.1" }
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.3" }
[target.'cfg(not(test))'.dependencies]
futures-preview = "0.3.0-alpha.8"
futures-preview = "0.3.0-alpha.13"
[dev-dependencies]
bincode = "1.0"
bytes = { version = "0.4", features = ["serde"] }
humantime = "1.0"
futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] }
bincode-transport = { package = "tarpc-bincode-transport", version = "0.1", path = "../bincode-transport" }
env_logger = "0.5"
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
bincode-transport = { package = "tarpc-bincode-transport", version = "0.4", path = "../bincode-transport" }
env_logger = "0.6"
libtest = "0.0.1"
tokio = "0.1"
tokio-executor = "0.1"
tokio-tcp = "0.1"
pin-utils = "0.1.0-alpha.4"

tarpc/README.md Symbolic link

@@ -0,0 +1 @@
../README.md


@@ -6,12 +6,11 @@
#![feature(
arbitrary_self_types,
pin,
futures_api,
await_macro,
async_await,
existential_type,
proc_macro_hygiene,
proc_macro_hygiene
)]
use futures::{
@@ -55,7 +54,7 @@ struct Subscriber {
impl subscriber::Service for Subscriber {
type ReceiveFut = Ready<()>;
fn receive(&self, _: context::Context, message: String) -> Self::ReceiveFut {
fn receive(self, _: context::Context, message: String) -> Self::ReceiveFut {
println!("{} received message: {}", self.id, message);
future::ready(())
}
@@ -66,13 +65,13 @@ impl Subscriber {
let incoming = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = incoming.local_addr();
tokio_executor::spawn(
Server::new(config)
server::new(config)
.incoming(incoming)
.take(1)
.respond_with(subscriber::serve(Subscriber { id }))
.unit_error()
.boxed()
.compat()
.compat(),
);
Ok(addr)
}
@@ -94,7 +93,7 @@ impl Publisher {
impl publisher::Service for Publisher {
existential type BroadcastFut: Future<Output = ()>;
fn broadcast(&self, _: context::Context, message: String) -> Self::BroadcastFut {
fn broadcast(self, _: context::Context, message: String) -> Self::BroadcastFut {
async fn broadcast(clients: Arc<Mutex<HashMap<u32, subscriber::Client>>>, message: String) {
let mut clients = clients.lock().unwrap().clone();
for client in clients.values_mut() {
@@ -110,7 +109,7 @@ impl publisher::Service for Publisher {
existential type SubscribeFut: Future<Output = Result<(), String>>;
fn subscribe(&self, _: context::Context, id: u32, addr: SocketAddr) -> Self::SubscribeFut {
fn subscribe(self, _: context::Context, id: u32, addr: SocketAddr) -> Self::SubscribeFut {
async fn subscribe(
clients: Arc<Mutex<HashMap<u32, subscriber::Client>>>,
id: u32,
@@ -128,7 +127,7 @@ impl publisher::Service for Publisher {
existential type UnsubscribeFut: Future<Output = ()>;
fn unsubscribe(&self, _: context::Context, id: u32) -> Self::UnsubscribeFut {
fn unsubscribe(self, _: context::Context, id: u32) -> Self::UnsubscribeFut {
println!("Unsubscribing {}", id);
let mut clients = self.clients.lock().unwrap();
if let None = clients.remove(&id) {
@@ -146,13 +145,13 @@ async fn run() -> io::Result<()> {
let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let publisher_addr = transport.local_addr();
tokio_executor::spawn(
Server::new(server::Config::default())
Server::default()
.incoming(transport)
.take(1)
.respond_with(publisher::serve(Publisher::new()))
.unit_error()
.boxed()
.compat()
.compat(),
);
let subscriber1 = await!(Subscriber::listen(0, server::Config::default()))?;
@@ -180,12 +179,6 @@ async fn run() -> io::Result<()> {
}
fn main() {
tokio::run(
run()
.boxed()
.map_err(|e| panic!(e))
.boxed()
.compat(),
);
tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
thread::sleep(Duration::from_millis(100));
}


@@ -6,20 +6,20 @@
#![feature(
futures_api,
pin,
arbitrary_self_types,
await_macro,
async_await,
proc_macro_hygiene,
proc_macro_hygiene
)]
use futures::{
compat::Executor01CompatExt,
future::{self, Ready},
prelude::*,
};
use rpc::{
client, context,
server::{self, Handler, Server},
server::{Handler, Server},
};
use std::io;
@@ -41,7 +41,7 @@ impl Service for HelloServer {
type HelloFut = Ready<String>;
fn hello(&self, _: context::Context, name: String) -> Self::HelloFut {
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name))
}
}
@@ -53,7 +53,7 @@ async fn run() -> io::Result<()> {
let addr = transport.local_addr();
// The server is configured with the defaults.
let server = Server::new(server::Config::default())
let server = Server::default()
// Server can listen on any type that implements the Transport trait.
.incoming(transport)
// Close the stream after the client connects
@@ -82,6 +82,8 @@ async fn run() -> io::Result<()> {
}
fn main() {
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(
run()
.map_err(|e| eprintln!("Oh no: {}", e))


@@ -7,21 +7,21 @@
#![feature(
existential_type,
arbitrary_self_types,
pin,
futures_api,
await_macro,
async_await,
proc_macro_hygiene,
proc_macro_hygiene
)]
use crate::{add::Service as AddService, double::Service as DoubleService};
use futures::{
compat::Executor01CompatExt,
future::{self, Ready},
prelude::*,
};
use rpc::{
client, context,
server::{self, Handler, Server},
server::{Handler, Server},
};
use std::io;
@@ -45,7 +45,7 @@ struct AddServer;
impl AddService for AddServer {
type AddFut = Ready<i32>;
fn add(&self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
fn add(self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
future::ready(x + y)
}
}
@@ -58,7 +58,7 @@ struct DoubleServer {
impl DoubleService for DoubleServer {
existential type DoubleFut: Future<Output = Result<i32, String>> + Send;
fn double(&self, _: context::Context, x: i32) -> Self::DoubleFut {
fn double(self, _: context::Context, x: i32) -> Self::DoubleFut {
async fn double(mut client: add::Client, x: i32) -> Result<i32, String> {
let result = await!(client.add(context::current(), x, x));
result.map_err(|e| e.to_string())
@@ -71,7 +71,7 @@ impl DoubleService for DoubleServer {
async fn run() -> io::Result<()> {
let add_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = add_listener.local_addr();
let add_server = Server::new(server::Config::default())
let add_server = Server::default()
.incoming(add_listener)
.take(1)
.respond_with(add::serve(AddServer));
@@ -82,7 +82,7 @@ async fn run() -> io::Result<()> {
let double_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = double_listener.local_addr();
let double_server = rpc::Server::new(server::Config::default())
let double_server = rpc::Server::default()
.incoming(double_listener)
.take(1)
.respond_with(double::serve(DoubleServer { add_client }));
@@ -102,10 +102,6 @@ async fn run() -> io::Result<()> {
fn main() {
env_logger::init();
tokio::run(
run()
.map_err(|e| panic!(e))
.boxed()
.compat(),
);
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(run().map_err(|e| panic!(e)).boxed().compat());
}


@@ -0,0 +1,414 @@
#![feature(
async_await,
await_macro,
futures_api,
arbitrary_self_types,
proc_macro_hygiene,
impl_trait_in_bindings
)]
mod registry {
use bytes::Bytes;
use futures::{
future::{ready, Ready},
prelude::*,
};
use serde::{Deserialize, Serialize};
use std::{
io,
pin::Pin,
sync::Arc,
task::{Poll, Waker},
};
use tarpc::{
client::{self, Client},
context,
};
/// A request to a named service.
#[derive(Serialize, Deserialize)]
pub struct ServiceRequest {
service_name: String,
request: Bytes,
}
/// A response from a named service.
#[derive(Serialize, Deserialize)]
pub struct ServiceResponse {
response: Bytes,
}
/// A list of registered services.
pub struct Registry<Services> {
registrations: Services,
}
impl Default for Registry<Nil> {
fn default() -> Self {
Registry { registrations: Nil }
}
}
impl<Services: MaybeServe + Sync> Registry<Services> {
/// Returns a function that serves requests for the registered services.
pub fn serve(
self,
) -> impl FnOnce(
context::Context,
ServiceRequest,
) -> Either<Services::Future, Ready<io::Result<ServiceResponse>>>
+ Clone {
let registrations = Arc::new(self.registrations);
move |cx, req: ServiceRequest| match registrations.serve(cx, &req) {
Some(serve) => Either::Left(serve),
None => Either::Right(ready(Err(io::Error::new(
io::ErrorKind::NotFound,
format!("Service '{}' not registered", req.service_name),
)))),
}
}
/// Registers `serve` with the given `name` using the given serialization scheme.
pub fn register<S, Req, Resp, RespFut, Ser, De>(
self,
name: String,
serve: S,
deserialize: De,
serialize: Ser,
) -> Registry<Registration<impl Serve + Send + 'static, Services>>
where
Req: Send,
S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone,
RespFut: Future<Output = io::Result<Resp>> + Send + 'static,
De: FnOnce(Bytes) -> io::Result<Req> + Send + 'static + Clone,
Ser: FnOnce(Resp) -> io::Result<Bytes> + Send + 'static + Clone,
{
let registrations = Registration {
name: name,
serve: move |cx, req: Bytes| {
async move {
let req = deserialize.clone()(req)?;
let response = await!(serve.clone()(cx, req))?;
let response = serialize.clone()(response)?;
Ok(ServiceResponse { response })
}
},
rest: self.registrations,
};
Registry { registrations }
}
}
/// Creates a client that sends requests to a service
/// named `service_name`, over the given channel, using
/// the specified serialization scheme.
pub fn new_client<Req, Resp, Ser, De>(
service_name: String,
channel: &client::Channel<ServiceRequest, ServiceResponse>,
mut serialize: Ser,
mut deserialize: De,
) -> client::MapResponse<
client::WithRequest<
client::Channel<ServiceRequest, ServiceResponse>,
impl FnMut(Req) -> ServiceRequest,
>,
impl FnMut(ServiceResponse) -> Resp,
>
where
Req: Send + 'static,
Resp: Send + 'static,
De: FnMut(Bytes) -> io::Result<Resp> + Clone + Send + 'static,
Ser: FnMut(Req) -> io::Result<Bytes> + Clone + Send + 'static,
{
channel
.clone()
.with_request(move |req| {
ServiceRequest {
service_name: service_name.clone(),
// TODO: shouldn't need to unwrap here. Maybe with_request should allow for
// returning Result.
request: serialize(req).unwrap(),
}
})
// TODO: same thing. Maybe this should be more like and_then rather than map.
.map_response(move |resp| deserialize(resp.response).unwrap())
}
/// Serves a request.
///
/// This trait is mostly an implementation detail that isn't used outside of the registry
/// internals.
pub trait Serve: Clone + Send + 'static {
type Response: Future<Output = io::Result<ServiceResponse>> + Send + 'static;
fn serve(self, cx: context::Context, request: Bytes) -> Self::Response;
}
/// Serves a request if the request is for a registered service.
///
/// This trait is mostly an implementation detail that isn't used outside of the registry
/// internals.
pub trait MaybeServe: Send + 'static {
type Future: Future<Output = io::Result<ServiceResponse>> + Send + 'static;
fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option<Self::Future>;
}
/// A registry starting with service S, followed by Rest.
///
/// This type is mostly an implementation detail that is not used directly
/// outside of the registry internals.
pub struct Registration<S, Rest> {
/// The registered service's name. Must be unique across all registered services.
name: String,
/// The registered service.
serve: S,
/// Any remaining registered services.
rest: Rest,
}
/// An empty registry.
///
/// This type is mostly an implementation detail that is not used directly
/// outside of the registry internals.
pub struct Nil;
impl MaybeServe for Nil {
type Future = futures::future::Ready<io::Result<ServiceResponse>>;
fn serve(&self, _: context::Context, _: &ServiceRequest) -> Option<Self::Future> {
None
}
}
impl<S, Rest> MaybeServe for Registration<S, Rest>
where
S: Serve,
Rest: MaybeServe,
{
type Future = Either<S::Response, Rest::Future>;
fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option<Self::Future> {
if self.name == request.service_name {
Some(Either::Left(
self.serve.clone().serve(cx, request.request.clone()),
))
} else {
self.rest.serve(cx, request).map(Either::Right)
}
}
}
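`Nil` and `Registration` form a type-level cons list: each node checks whether the request names its own service and otherwise recurses into `rest`, with `Nil` terminating the search. The lookup structure can be sketched in stable Rust with plain values standing in for serve functions (`Node`/`Lookup` are illustrative names, not from the source):

```rust
/// Empty end of the list, like the registry's `Nil`.
struct Nil;

/// One registration followed by the rest of the list,
/// like the registry's `Registration<S, Rest>`.
struct Node<Rest> {
    name: &'static str,
    value: i32, // stands in for the registered serve function
    rest: Rest,
}

/// Analogue of `MaybeServe`: try this node, else defer to the rest.
trait Lookup {
    fn lookup(&self, name: &str) -> Option<i32>;
}

impl Lookup for Nil {
    fn lookup(&self, _: &str) -> Option<i32> {
        None // no service matched anywhere in the list
    }
}

impl<Rest: Lookup> Lookup for Node<Rest> {
    fn lookup(&self, name: &str) -> Option<i32> {
        if self.name == name {
            Some(self.value)
        } else {
            self.rest.lookup(name) // recurse into the tail
        }
    }
}
```

Because each `register` call nests another `Node` type around the previous list, the whole registry is a single concrete type and dispatch involves no boxing — the same design choice the registry makes with `Registration<impl Serve, Services>`.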
/// Wraps either of two future types that both resolve to the same output type.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub enum Either<Left, Right> {
Left(Left),
Right(Right),
}
impl<Output, Left, Right> Future for Either<Left, Right>
where
Left: Future<Output = Output>,
Right: Future<Output = Output>,
{
type Output = Output;
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Output> {
unsafe {
match Pin::get_unchecked_mut(self) {
Either::Left(car) => Pin::new_unchecked(car).poll(waker),
Either::Right(cdr) => Pin::new_unchecked(cdr).poll(waker),
}
}
}
}
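The `Either` future above works because both variants resolve to the same output type, so the enum itself can implement `Future` by projecting the pin into whichever variant is present. A stable-Rust sketch of the same idea (using `Context` in place of the pre-stabilization `&Waker` polling API; `poll_once` is a hypothetical helper added here for testing):

```rust
use std::future::{ready, Future, Ready};
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Sketch of the registry's `Either`: both variants share an output type,
/// so the enum implements `Future` by delegating to the active variant.
enum Either<L, R> {
    Left(L),
    Right(R),
}

impl<Out, L, R> Future for Either<L, R>
where
    L: Future<Output = Out>,
    R: Future<Output = Out>,
{
    type Output = Out;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Out> {
        // Safety: the inner future is never moved out of the pinned enum,
        // mirroring the unsafe projection in the diff above.
        unsafe {
            match self.get_unchecked_mut() {
                Either::Left(l) => Pin::new_unchecked(l).poll(cx),
                Either::Right(r) => Pin::new_unchecked(r).poll(cx),
            }
        }
    }
}

/// Polls an Unpin future once with a no-op waker; sufficient for
/// already-ready futures.
fn poll_once<F: Future + Unpin>(f: &mut F) -> Poll<F::Output> {
    fn clone(p: *const ()) -> RawWaker {
        RawWaker::new(p, &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) };
    let mut cx = Context::from_waker(&waker);
    Pin::new(f).poll(&mut cx)
}
```

This is what lets `Registry::serve` return one concrete future type even though a request may be handled by either the matched service or the not-found fallback.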
impl<Resp, F> Serve for F
where
F: FnOnce(context::Context, Bytes) -> Resp + Clone + Send + 'static,
Resp: Future<Output = io::Result<ServiceResponse>> + Send + 'static,
{
type Response = Resp;
fn serve(self, cx: context::Context, request: Bytes) -> Resp {
self(cx, request)
}
}
}
// Example
use bytes::Bytes;
use futures::{
compat::Executor01CompatExt,
future::{ready, Ready},
prelude::*,
};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
io,
sync::{Arc, RwLock},
};
use tarpc::{client, context, server::Handler};
fn deserialize<Req>(req: Bytes) -> io::Result<Req>
where
Req: for<'a> Deserialize<'a> + Send,
{
bincode::deserialize(req.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
fn serialize<Resp>(resp: Resp) -> io::Result<Bytes>
where
Resp: Serialize,
{
Ok(bincode::serialize(&resp)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
.into())
}
mod write_service {
tarpc::service! {
rpc write(key: String, value: String);
}
}
mod read_service {
tarpc::service! {
rpc read(key: String) -> Option<String>;
}
}
#[derive(Debug, Default, Clone)]
struct Server {
data: Arc<RwLock<HashMap<String, String>>>,
}
impl write_service::Service for Server {
type WriteFut = Ready<()>;
fn write(self, _: context::Context, key: String, value: String) -> Self::WriteFut {
self.data.write().unwrap().insert(key, value);
ready(())
}
}
impl read_service::Service for Server {
type ReadFut = Ready<Option<String>>;
fn read(self, _: context::Context, key: String) -> Self::ReadFut {
ready(self.data.read().unwrap().get(&key).cloned())
}
}
trait DefaultSpawn {
fn spawn(self);
}
impl<F> DefaultSpawn for F
where
F: Future<Output = ()> + Send + 'static,
{
fn spawn(self) {
tokio_executor::spawn(self.unit_error().boxed().compat())
}
}
struct BincodeRegistry<Services> {
registry: registry::Registry<Services>,
}
impl Default for BincodeRegistry<registry::Nil> {
fn default() -> Self {
BincodeRegistry {
registry: registry::Registry::default(),
}
}
}
impl<Services: registry::MaybeServe + Sync> BincodeRegistry<Services> {
fn serve(
self,
) -> impl FnOnce(
context::Context,
registry::ServiceRequest,
) -> registry::Either<
Services::Future,
Ready<io::Result<registry::ServiceResponse>>,
> + Clone {
self.registry.serve()
}
fn register<S, Req, Resp, RespFut>(
self,
name: String,
serve: S,
) -> BincodeRegistry<registry::Registration<impl registry::Serve + Send + 'static, Services>>
where
Req: for<'a> Deserialize<'a> + Send + 'static,
Resp: Serialize + 'static,
S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone,
RespFut: Future<Output = io::Result<Resp>> + Send + 'static,
{
let registry = self.registry.register(name, serve, deserialize, serialize);
BincodeRegistry { registry }
}
}
pub fn new_client<Req, Resp>(
service_name: String,
channel: &client::Channel<registry::ServiceRequest, registry::ServiceResponse>,
) -> client::MapResponse<
client::WithRequest<
client::Channel<registry::ServiceRequest, registry::ServiceResponse>,
impl FnMut(Req) -> registry::ServiceRequest,
>,
impl FnMut(registry::ServiceResponse) -> Resp,
>
where
Req: Serialize + Send + 'static,
Resp: for<'a> Deserialize<'a> + Send + 'static,
{
registry::new_client(service_name, channel, serialize, deserialize)
}
async fn run() -> io::Result<()> {
let server = Server::default();
let registry = BincodeRegistry::default()
.register(
"WriteService".to_string(),
write_service::serve(server.clone()),
)
.register(
"ReadService".to_string(),
read_service::serve(server.clone()),
);
let listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let server_addr = listener.local_addr();
let server = tarpc::Server::default()
.incoming(listener)
.take(1)
.respond_with(registry.serve());
tokio_executor::spawn(server.unit_error().boxed().compat());
let transport = await!(bincode_transport::connect(&server_addr))?;
let channel = await!(client::new(client::Config::default(), transport))?;
let write_client = new_client("WriteService".to_string(), &channel);
let mut write_client = write_service::Client::from(write_client);
let read_client = new_client("ReadService".to_string(), &channel);
let mut read_client = read_service::Client::from(read_client);
await!(write_client.write(context::current(), "key".to_string(), "val".to_string()))?;
let val = await!(read_client.read(context::current(), "key".to_string()))?;
println!("{:?}", val);
Ok(())
}
fn main() {
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
}


@@ -1 +1 @@
edition = "Edition2018"
edition = "2018"


@@ -4,123 +4,13 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
//! tarpc is an RPC framework for rust with a focus on ease of use. Defining a
//! service can be done in just a few lines of code, and most of the boilerplate of
//! writing a server is taken care of for you.
//!
//! ## What is an RPC framework?
//! "RPC" stands for "Remote Procedure Call," a function call where the work of
//! producing the return value is being done somewhere else. When an rpc function is
//! invoked, behind the scenes the function contacts some other process somewhere
//! and asks them to evaluate the function instead. The original function then
//! returns the value produced by the other process.
//!
//! RPC frameworks are a fundamental building block of most microservices-oriented
//! architectures. Two well-known ones are [gRPC](http://www.grpc.io) and
//! [Cap'n Proto](https://capnproto.org/).
//!
//! tarpc differentiates itself from other RPC frameworks by defining the schema in code,
//! rather than in a separate language such as .proto. This means there's no separate compilation
//! process, and no cognitive context switching between different languages. Additionally, it
//! works with the community-backed library serde: any serde-serializable type can be used as
//! arguments to tarpc fns.
//!
//! ## Example
//!
//! Here's a small service.
//!
//! ```rust
//! #![feature(futures_api, pin, arbitrary_self_types, await_macro, async_await, proc_macro_hygiene)]
//!
//!
//! use futures::{
//! compat::TokioDefaultSpawner,
//! future::{self, Ready},
//! prelude::*,
//! };
//! use tarpc::{
//! client, context,
//! server::{self, Handler, Server},
//! };
//! use std::io;
//!
//! // This is the service definition. It looks a lot like a trait definition.
//! // It defines one RPC, hello, which takes one arg, name, and returns a String.
//! tarpc::service! {
//! /// Returns a greeting for name.
//! rpc hello(name: String) -> String;
//! }
//!
//! // This is the type that implements the generated Service trait. It is the business logic
//! // and is used to start the server.
//! #[derive(Clone)]
//! struct HelloServer;
//!
//! impl Service for HelloServer {
//! // Each defined rpc generates two items in the trait, a fn that serves the RPC, and
//! // an associated type representing the future output by the fn.
//!
//! type HelloFut = Ready<String>;
//!
//! fn hello(&self, _: context::Context, name: String) -> Self::HelloFut {
//! future::ready(format!("Hello, {}!", name))
//! }
//! }
//!
//! async fn run() -> io::Result<()> {
//! // bincode_transport is provided by the associated crate bincode-transport. It makes it easy
//! // to start up a serde-powered bincode serialization strategy over TCP.
//! let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
//! let addr = transport.local_addr();
//!
//! // The server is configured with the defaults.
//! let server = Server::new(server::Config::default())
//! // Server can listen on any type that implements the Transport trait.
//! .incoming(transport)
//! // Close the stream after the client connects
//! .take(1)
//! // serve is generated by the service! macro. It takes as input any type implementing
//! // the generated Service trait.
//! .respond_with(serve(HelloServer));
//!
//! tokio_executor::spawn(server.unit_error().boxed().compat());
//!
//! let transport = await!(bincode_transport::connect(&addr))?;
//!
//! // new_stub is generated by the service! macro. Like Server, it takes a config and any
//! // Transport as input, and returns a Client, also generated by the macro.
//! // by the service mcro.
//! let mut client = await!(new_stub(client::Config::default(), transport))?;
//!
//! // The client has an RPC method for each RPC defined in service!. It takes the same args
//! // as defined, with the addition of a Context, which is always the first arg. The Context
//! // specifies a deadline and trace information which can be helpful in debugging requests.
//! let hello = await!(client.hello(context::current(), "Stim".to_string()))?;
//!
//! println!("{}", hello);
//!
//! Ok(())
//! }
//!
//! fn main() {
//! tarpc::init(TokioDefaultSpawner);
//! tokio::run(run()
//! .map_err(|e| eprintln!("Oh no: {}", e))
//! .boxed()
//! .compat(),
//! );
//! }
//! ```
#![doc(include = "../README.md")]
#![deny(missing_docs, missing_debug_implementations)]
#![feature(
futures_api,
pin,
await_macro,
async_await,
decl_macro,
#![feature(async_await, external_doc)]
#![cfg_attr(
test,
feature(futures_api, await_macro, proc_macro_hygiene, arbitrary_self_types)
)]
#![cfg_attr(test, feature(proc_macro_hygiene, arbitrary_self_types))]
#[doc(hidden)]
pub use futures;


@@ -51,6 +51,9 @@ macro_rules! add_serde_if_enabled {
///
#[macro_export]
macro_rules! service {
() => {
compile_error!("Must define at least one RPC method.");
};
// Entry point
(
$(
@@ -112,24 +115,26 @@ macro_rules! service {
)*
) => {
$crate::add_serde_if_enabled! {
/// The request sent over the wire from the client to the server.
#[derive(Debug)]
#[doc(hidden)]
#[allow(non_camel_case_types, unused)]
pub enum Request__ {
pub enum Request {
$(
$(#[$attr])*
$fn_name{ $($arg: $in_,)* }
),*
}
}
$crate::add_serde_if_enabled! {
/// The response sent over the wire from the server to the client.
#[derive(Debug)]
#[doc(hidden)]
#[allow(non_camel_case_types, unused)]
pub enum Response__ {
pub enum Response {
$(
$(#[$attr])*
$fn_name($out)
),*
}
@@ -149,37 +154,39 @@ macro_rules! service {
}
$(#[$attr])*
fn $fn_name(&self, ctx: $crate::context::Context, $($arg:$in_),*) -> $crate::ty_snake_to_camel!(Self::$fn_name);
fn $fn_name(self, ctx: $crate::context::Context, $($arg:$in_),*) -> $crate::ty_snake_to_camel!(Self::$fn_name);
)*
}
// TODO: use an existential type instead of this when existential types work.
/// A future resolving to a server [`Response`].
#[allow(non_camel_case_types)]
pub enum Response<S: Service> {
pub enum ResponseFut<S: Service> {
$(
$(#[$attr])*
$fn_name($crate::ty_snake_to_camel!(<S as Service>::$fn_name)),
)*
}
impl<S: Service> ::std::fmt::Debug for Response<S> {
impl<S: Service> ::std::fmt::Debug for ResponseFut<S> {
fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
fmt.debug_struct("Response").finish()
}
}
impl<S: Service> ::std::future::Future for Response<S> {
type Output = ::std::io::Result<Response__>;
impl<S: Service> ::std::future::Future for ResponseFut<S> {
type Output = ::std::io::Result<Response>;
fn poll(self: ::std::pin::Pin<&mut Self>, waker: &::std::task::LocalWaker)
-> ::std::task::Poll<::std::io::Result<Response__>>
fn poll(self: ::std::pin::Pin<&mut Self>, waker: &::std::task::Waker)
-> ::std::task::Poll<::std::io::Result<Response>>
{
unsafe {
match ::std::pin::Pin::get_mut_unchecked(self) {
match ::std::pin::Pin::get_unchecked_mut(self) {
$(
Response::$fn_name(resp) =>
ResponseFut::$fn_name(resp) =>
::std::pin::Pin::new_unchecked(resp)
.poll(waker)
.map(Response__::$fn_name)
.map(Response::$fn_name)
.map(Ok),
)*
}
@@ -189,13 +196,13 @@ macro_rules! service {
/// Returns a serving function to use with rpc::server::Server.
pub fn serve<S: Service>(service: S)
-> impl FnMut($crate::context::Context, Request__) -> Response<S> + Send + 'static + Clone {
-> impl FnOnce($crate::context::Context, Request) -> ResponseFut<S> + Send + 'static + Clone {
move |ctx, req| {
match req {
$(
Request__::$fn_name{ $($arg,)* } => {
let resp = Service::$fn_name(&mut service.clone(), ctx, $($arg),*);
Response::$fn_name(resp)
Request::$fn_name{ $($arg,)* } => {
let resp = Service::$fn_name(service.clone(), ctx, $($arg),*);
ResponseFut::$fn_name(resp)
}
)*
}
@@ -205,30 +212,40 @@ macro_rules! service {
#[allow(unused)]
#[derive(Clone, Debug)]
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
pub struct Client($crate::client::Client<Request__, Response__>);
pub struct Client<C = $crate::client::Channel<Request, Response>>(C);
/// Returns a new client stub that sends requests over the given transport.
pub async fn new_stub<T>(config: $crate::client::Config, transport: T)
-> ::std::io::Result<Client>
where
T: $crate::Transport<
Item = $crate::Response<Response__>,
SinkItem = $crate::ClientMessage<Request__>> + Send,
Item = $crate::Response<Response>,
SinkItem = $crate::ClientMessage<Request>> + Send + 'static,
{
Ok(Client(await!($crate::client::Client::new(config, transport))?))
Ok(Client(await!($crate::client::new(config, transport))?))
}
impl Client {
impl<C> From<C> for Client<C>
where for <'a> C: $crate::Client<'a, Request, Response = Response>
{
fn from(client: C) -> Self {
Client(client)
}
}
impl<C> Client<C>
where for<'a> C: $crate::Client<'a, Request, Response = Response>
{
$(
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&mut self, ctx: $crate::context::Context, $($arg: $in_),*)
-> impl ::std::future::Future<Output = ::std::io::Result<$out>> + '_ {
let request__ = Request__::$fn_name { $($arg,)* };
let resp = self.0.call(ctx, request__);
let request__ = Request::$fn_name { $($arg,)* };
let resp = $crate::Client::call(&mut self.0, ctx, request__);
async move {
match await!(resp)? {
Response__::$fn_name(msg__) => ::std::result::Result::Ok(msg__),
Response::$fn_name(msg__) => ::std::result::Result::Ok(msg__),
_ => unreachable!(),
}
}
@@ -265,15 +282,11 @@ mod syntax_test {
#[cfg(test)]
mod functional_test {
use futures::{
compat::TokioDefaultSpawner,
compat::Executor01CompatExt,
future::{ready, Ready},
prelude::*,
};
use rpc::{
client, context,
server::{self, Handler},
transport::channel,
};
use rpc::{client, context, server::Handler, transport::channel};
use std::io;
use tokio::runtime::current_thread;
@@ -288,13 +301,13 @@ mod functional_test {
impl Service for Server {
type AddFut = Ready<i32>;
fn add(&self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
fn add(self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
ready(x + y)
}
type HeyFut = Ready<String>;
fn hey(&self, _: context::Context, name: String) -> Self::HeyFut {
fn hey(self, _: context::Context, name: String) -> Self::HeyFut {
ready(format!("Hey, {}.", name))
}
}
@@ -302,17 +315,17 @@ mod functional_test {
#[test]
fn sequential() {
let _ = env_logger::try_init();
rpc::init(TokioDefaultSpawner);
rpc::init(tokio::executor::DefaultExecutor::current().compat());
let test = async {
let (tx, rx) = channel::unbounded();
tokio_executor::spawn(
rpc::Server::new(server::Config::default())
crate::Server::default()
.incoming(stream::once(ready(Ok(rx))))
.respond_with(serve(Server))
.unit_error()
.boxed()
.compat()
.compat(),
);
let mut client = await!(new_stub(client::Config::default(), tx))?;
@@ -331,17 +344,17 @@ mod functional_test {
#[test]
fn concurrent() {
let _ = env_logger::try_init();
rpc::init(TokioDefaultSpawner);
rpc::init(tokio::executor::DefaultExecutor::current().compat());
let test = async {
let (tx, rx) = channel::unbounded();
tokio_executor::spawn(
rpc::Server::new(server::Config::default())
rpc::Server::default()
.incoming(stream::once(ready(Ok(rx))))
.respond_with(serve(Server))
.unit_error()
.boxed()
.compat()
.compat(),
);
let client = await!(new_stub(client::Config::default(), tx))?;
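The hunks above change the generated client from wrapping a concrete `Channel` to delegating to any `C` implementing the `Client` trait, which is what lets request/response types be mapped. A minimal standalone sketch of that delegation pattern (hypothetical names, not the real tarpc API):

```rust
// A simplified stand-in for tarpc's Client trait: anything that can
// turn a request into a response.
trait Call {
    type Response;
    fn call(&mut self, request: String) -> Self::Response;
}

// The generated service client is a newtype over any C: Call, like the
// `Client<C>` newtype in the diff above.
struct EchoClient<C: Call>(C);

impl<C: Call<Response = String>> EchoClient<C> {
    // Each service method delegates to the underlying client's `call`.
    fn echo(&mut self, msg: String) -> String {
        self.0.call(msg)
    }
}

// A trivial in-process "transport" that echoes requests back.
struct Loopback;
impl Call for Loopback {
    type Response = String;
    fn call(&mut self, request: String) -> String {
        request
    }
}

fn main() {
    let mut client = EchoClient(Loopback);
    println!("{}", client.echo("hi".to_string()));
}
```

Because the newtype is generic over `C`, any adapter that maps request or response types before forwarding to an inner client also satisfies the bound, which is the motivation given in the PR description.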

@@ -7,22 +7,19 @@
#![feature(
test,
arbitrary_self_types,
pin,
integer_atomics,
futures_api,
generators,
await_macro,
async_await,
proc_macro_hygiene,
proc_macro_hygiene
)]
extern crate test;
use self::test::stats::Stats;
use futures::{compat::TokioDefaultSpawner, future, prelude::*};
use futures::{compat::Executor01CompatExt, future, prelude::*};
use libtest::stats::Stats;
use rpc::{
client, context,
server::{self, Handler, Server},
server::{Handler, Server},
};
use std::{
io,
@@ -41,7 +38,7 @@ struct Serve;
impl ack::Service for Serve {
type AckFut = future::Ready<()>;
fn ack(&self, _: context::Context) -> Self::AckFut {
fn ack(self, _: context::Context) -> Self::AckFut {
future::ready(())
}
}
@@ -51,13 +48,13 @@ async fn bench() -> io::Result<()> {
let addr = listener.local_addr();
tokio_executor::spawn(
Server::new(server::Config::default())
Server::default()
.incoming(listener)
.take(1)
.respond_with(ack::serve(Serve))
.unit_error()
.boxed()
.compat()
.compat(),
);
let conn = await!(bincode_transport::connect(&addr))?;
@@ -120,12 +117,7 @@ async fn bench() -> io::Result<()> {
#[test]
fn bench_small_packet() {
env_logger::init();
tarpc::init(TokioDefaultSpawner);
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(
bench()
.map_err(|e| panic!(e.to_string()))
.boxed()
.compat(),
)
tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat())
}
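The `TokioDefaultSpawner` → `Executor01CompatExt` swap above replaces one compat shim with another; both rely on the same extension-trait adapter idiom, where calling `.compat()` wraps a value in a bridge type exposing a different interface. A minimal sketch of that idiom with hypothetical names (not the real futures-compat API):

```rust
// Extension trait: a blanket impl gives every type a `.compat()` method
// that wraps it in a bridge type (hypothetical names for illustration).
trait CompatExt: Sized {
    fn compat(self) -> Compat<Self> {
        Compat(self)
    }
}

// Blanket impl: any sized type gains `.compat()`.
impl<T> CompatExt for T {}

// The bridge type owns the inner value and exposes a new interface,
// the way futures' Compat wrappers adapt 0.1 <-> 0.3 traits.
struct Compat<T>(T);

impl<T: std::fmt::Display> Compat<T> {
    fn describe(&self) -> String {
        format!("compat({})", self.0)
    }
}

fn main() {
    let bridged = 42.compat();
    println!("{}", bridged.describe());
}
```

In the real crate, `Executor01CompatExt::compat` performs the same move for executors: it wraps a tokio 0.1 executor in a type implementing the futures 0.3 `Spawn` trait.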

@@ -1,6 +1,6 @@
[package]
name = "tarpc-trace"
version = "0.1.0"
version = "0.2.0"
authors = ["tikue <tikue@google.com>"]
edition = '2018'
license = "MIT"
@@ -13,7 +13,7 @@ readme = "../README.md"
description = "foundations for tracing in tarpc"
[dependencies]
rand = "0.5"
rand = "0.6"
[dependencies.serde]
version = "1.0"

@@ -1 +1 @@
edition = "Edition2018"
edition = "2018"

@@ -27,10 +27,7 @@ use std::{
/// Consists of a span identifying an event, an optional parent span identifying a causal event
/// that triggered the current span, and a trace with which all related spans are associated.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
#[cfg_attr(
feature = "serde",
derive(serde::Serialize, serde::Deserialize)
)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Context {
/// An identifier of the trace associated with the current context. A trace ID is typically
/// created at a root span and passed along through all causal events.
@@ -50,18 +47,12 @@ pub struct Context {
/// A 128-bit UUID identifying a trace. All spans caused by the same originating span share the
/// same trace ID.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
#[cfg_attr(
feature = "serde",
derive(serde::Serialize, serde::Deserialize)
)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TraceId(u128);
/// A 64-bit identifier of a span within a trace. The identifier is unique within the span's trace.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
#[cfg_attr(
feature = "serde",
derive(serde::Serialize, serde::Deserialize)
)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SpanId(u64);
impl Context {
@@ -80,7 +71,7 @@ impl TraceId {
/// Returns a random trace ID that can be assumed to be globally unique if `rng` generates
/// actually-random numbers.
pub fn random<R: Rng>(rng: &mut R) -> Self {
TraceId((rng.next_u64() as u128) << mem::size_of::<u64>() | rng.next_u64() as u128)
TraceId(u128::from(rng.next_u64()) << mem::size_of::<u64>() | u128::from(rng.next_u64()))
}
}
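The final hunk replaces `as u128` casts with lossless `u128::from` conversions when building a 128-bit trace ID from two 64-bit random values. A standalone sketch of the packing step (note the high half must be shifted by the bit width of `u64`, i.e. 64 bits):

```rust
// Pack two 64-bit halves into one 128-bit identifier: widen each half
// losslessly, shift the high half up by u64's bit width, then OR.
fn pack(high: u64, low: u64) -> u128 {
    (u128::from(high) << 64) | u128::from(low)
}

fn main() {
    let id = pack(0x1, 0x2);
    assert_eq!(id, (1u128 << 64) | 2);
    // 32 hex digits: high half in the top 16, low half in the bottom 16.
    println!("{:#034x}", id);
}
```

`u128::from` is preferred over `as` here because `From` conversions are guaranteed lossless, which is also what the Clippy lint behind this change (`cast_lossless`) enforces.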