16 Commits

Author SHA1 Message Date
Tim Kuehn
00751d2518 external_doc doesn't work with crates.io yet :( 2018-10-29 11:05:09 -07:00
Tim Kuehn
4394a52b65 Add doc tests to .travis.yml 2018-10-29 10:55:12 -07:00
Tim Kuehn
70938501d7 Use eternal_doc for tarpc package. This will ensure our README is always up-to-date. 2018-10-29 10:53:34 -07:00
Tim Kuehn
d5f5cf4300 Bump versions. 2018-10-29 10:43:41 -07:00
Tim Kuehn
e2c4164d8c Remove unused feature enablements from tarpc 2018-10-25 11:44:38 -07:00
Tim Kuehn
78124ef7a8 Cargo fmt 2018-10-25 11:44:18 -07:00
Tim Kuehn
096d354b7e Remove unused features 2018-10-25 11:41:08 -07:00
Tim
7ad0e4b070 Service registry (#204)
# Changes

## Client is now a trait
And `Channel<Req, Resp>` implements `Client<Req, Resp>`. Previously, `Client<Req, Resp>` was a thin wrapper around `Channel<Req, Resp>`.

This was changed to allow for mapping the request and response types. For example, you can take a `channel: Channel<Req, Resp>` and do:

```rust
channel
    .with_request(|req: Req2| -> Req { ... })
    .map_response(|resp: Resp| -> Resp2 { ... })
```

...which returns a type that implements `Client<Req2, Resp2>`.

### Why would you want to map request and response types?

The main benefit of this is that it enables creating different client types backed by the same channel. For example, you could run multiple clients multiplexing requests over a single `TcpStream`. I have a demo in `tarpc/examples/service_registry.rs` showing how you might do this with a bincode transport. I am considering factoring out the service registry portion of that to an actual library, because it's doing pretty cool stuff. For this PR, though, it'll just be part of the example.

## Client::new is now client::new

This is pretty minor, but necessary because async fns can't currently exist on traits. I changed `Server::new` to match this as well.

## Macro-generated Clients are generic over the backing Client.

This is a natural consequence of the above change. However, it is transparent to the user by keeping `Channel<Req, Resp>` as the default type for the `<C: Client>` type parameter. `new_stub` returns `Client<Channel<Req, Resp>>`, and other clients can be created via the `From` trait.

## example-service/ now has two binaries, one for client and one for server.

This serves as a "realistic" example of how one might set up a service. The other examples all run the client and server in the same binary, which isn't realistic in distributed systems use cases.

## `service!` trait fns take self by value.

Services are already cloned per request, so this just passes on that flexibility to the trait implementers.

# Open Questions

In the service registry example, multiple services are running on a single port, and thus multiple clients are sending requests over a single `TcpStream`. This has implications for throttling: [`max_in_flight_requests_per_connection`](https://github.com/google/tarpc/blob/master/rpc/src/server/mod.rs#L57-L60) will set a maximum for the sum of requests for all clients sharing a single connection. I think this is reasonable behavior, but users may expect this setting to act like `max_in_flight_requests_per_client`.

Fixes #103 #153 #205
2018-10-25 11:22:55 -07:00
Tim
64755d5329 Update futures 2018-10-19 11:19:25 -07:00
Tim Kuehn
3071422132 Helper fn to create transports 2018-10-18 00:24:26 -07:00
Tim Kuehn
8847330dbe impl From<S> for bincode::Transport<S> 2018-10-18 00:24:08 -07:00
Tim Kuehn
6d396520f4 Don't allow empty service invocations 2018-10-18 00:23:34 -07:00
Tim Kuehn
79a2f7fe2f Replace tokio-serde-bincode with async-bincode 2018-10-17 20:24:31 -07:00
Tim Kuehn
af66841f68 Remove keyword 2018-10-17 11:59:09 -07:00
Tim
1ab4cfdff9 Make Request and Resonse enums' docs public, because they show up in the serve fn. 2018-10-16 23:02:52 -07:00
Tim
f7e03eeeb7 Fix up readme 2018-10-16 22:28:57 -07:00
38 changed files with 1544 additions and 820 deletions

View File

@@ -9,4 +9,5 @@ os:
- linux
script:
- cargo test --all --all-features
- cargo test --all-targets --all-features
- cargo test --doc --all-features

View File

@@ -31,17 +31,12 @@ works with the community-backed library serde: any serde-serializable type can b
arguments to tarpc fns.
## Usage
**NB**: *this example is for master. Are you looking for other
[versions](https://docs.rs/tarpc)?*
Add to your `Cargo.toml` dependencies:
```toml
tarpc = "0.12.0"
tarpc-plugins = "0.4.0"
tarpc = "0.14.0"
```
The `service!` macro expands to a collection of items that form an
rpc service. In the above example, the macro is called within the
`hello_service` module. This module will contain a `Client` stub and `Service` trait. There is
@@ -49,29 +44,29 @@ These generated types make it easy and ergonomic to write servers without dealin
directly. Simply implement one of the generated traits, and you're off to the
races!
## Example:
## Example
Here's a small service.
```rust
#![feature(plugin, futures_api, pin, arbitrary_self_types, await_macro, async_await)]
#![plugin(tarpc_plugins)]
#![feature(futures_api, pin, arbitrary_self_types, await_macro, async_await, proc_macro_hygiene)]
use futures::{
compat::TokioDefaultSpawner,
future::{self, Ready},
prelude::*,
spawn,
};
use tarpc::rpc::{
use tarpc::{
client, context,
server::{self, Handler, Server},
server::{self, Handler},
};
use std::io;
// This is the service definition. It looks a lot like a trait definition.
// It defines one RPC, hello, which takes one arg, name, and returns a String.
tarpc::service! {
/// Returns a greeting for name.
rpc hello(name: String) -> String;
}
@@ -86,7 +81,7 @@ impl Service for HelloServer {
type HelloFut = Ready<String>;
fn hello(&self, _: context::Context, name: String) -> Self::HelloFut {
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name))
}
}
@@ -98,7 +93,7 @@ async fn run() -> io::Result<()> {
let addr = transport.local_addr();
// The server is configured with the defaults.
let server = Server::new(server::Config::default())
let server = server::new(server::Config::default())
// Server can listen on any type that implements the Transport trait.
.incoming(transport)
// Close the stream after the client connects
@@ -107,14 +102,14 @@ async fn run() -> io::Result<()> {
// the generated Service trait.
.respond_with(serve(HelloServer));
spawn!(server).unwrap();
tokio_executor::spawn(server.unit_error().boxed().compat());
let transport = await!(bincode_transport::connect(&addr))?;
// new_stub is generated by the service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service mcro.
let mut client = await!(new_stub(client::Config::default(), transport));
let mut client = await!(new_stub(client::Config::default(), transport))?;
// The client has an RPC method for each RPC defined in service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
@@ -127,10 +122,11 @@ async fn run() -> io::Result<()> {
}
fn main() {
tarpc::init(TokioDefaultSpawner);
tokio::run(run()
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat(TokioDefaultSpawner),
.compat(),
);
}
```
@@ -139,13 +135,3 @@ fn main() {
Use `cargo doc` as you normally would to see the documentation created for all
items expanded by a `service!` invocation.
## Contributing
To contribute to tarpc, please see [CONTRIBUTING](CONTRIBUTING.md).
## License
tarpc is distributed under the terms of the MIT license.
See [LICENSE](LICENSE) for details.

View File

@@ -2,7 +2,7 @@ cargo-features = ["rename-dependency"]
[package]
name = "tarpc-bincode-transport"
version = "0.1.0"
version = "0.3.0"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = '2018'
license = "MIT"
@@ -16,22 +16,19 @@ description = "A bincode-based transport for tarpc services."
[dependencies]
bincode = { version = "1.0", features = ["i128"] }
bytes = "0.4"
futures_legacy = { version = "0.1", package = "futures" }
pin-utils = "0.1.0-alpha.2"
rpc = { package = "tarpc-lib", version = "0.1", path = "../rpc", features = ["serde1"] }
pin-utils = "0.1.0-alpha.3"
rpc = { package = "tarpc-lib", version = "0.2", path = "../rpc", features = ["serde1"] }
serde = "1.0"
tokio = "0.1"
tokio-io = "0.1"
tokio-serde-bincode = "0.1"
async-bincode = "0.4"
tokio-tcp = "0.1"
tokio-serde = "0.2"
[target.'cfg(not(test))'.dependencies]
futures-preview = { version = "0.3.0-alpha.8", features = ["compat"] }
futures-preview = { version = "0.3.0-alpha.9", features = ["compat"] }
[dev-dependencies]
futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] }
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
env_logger = "0.5"
humantime = "1.0"
log = "0.4"

View File

@@ -1 +1 @@
edition = "Edition2018"
edition = "2018"

View File

@@ -0,0 +1,150 @@
use futures::{compat::Stream01CompatExt, prelude::*, ready};
use futures_legacy::{
executor::{
self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01,
UnsafeNotify as UnsafeNotify01,
},
Async as Async01, AsyncSink as AsyncSink01, Sink as Sink01, Stream as Stream01,
};
use std::{
pin::Pin,
task::{self, LocalWaker, Poll},
};
/// A shim to convert a 0.1 Sink + Stream to a 0.3 Sink + Stream.
#[derive(Debug)]
pub struct Compat<S, SinkItem> {
staged_item: Option<SinkItem>,
inner: S,
}
impl<S, SinkItem> Compat<S, SinkItem> {
/// Returns a new Compat.
pub fn new(inner: S) -> Self {
Compat {
inner,
staged_item: None,
}
}
/// Unwraps Compat, returning the inner value.
pub fn into_inner(self) -> S {
self.inner
}
/// Returns a reference to the value wrapped by Compat.
pub fn get_ref(&self) -> &S {
&self.inner
}
}
impl<S, SinkItem> Stream for Compat<S, SinkItem>
where
S: Stream01,
{
type Item = Result<S::Item, S::Error>;
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
unsafe {
let inner = &mut Pin::get_mut_unchecked(self).inner;
let mut compat = inner.compat();
let compat = Pin::new_unchecked(&mut compat);
match ready!(compat.poll_next(waker)) {
None => Poll::Ready(None),
Some(Ok(next)) => Poll::Ready(Some(Ok(next))),
Some(Err(e)) => Poll::Ready(Some(Err(e))),
}
}
}
}
impl<S, SinkItem> Sink for Compat<S, SinkItem>
where
S: Sink01<SinkItem = SinkItem>,
{
type SinkItem = SinkItem;
type SinkError = S::SinkError;
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), S::SinkError> {
let me = unsafe { Pin::get_mut_unchecked(self) };
assert!(me.staged_item.is_none());
me.staged_item = Some(item);
Ok(())
}
fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_mut_unchecked(self) };
match me.staged_item.take() {
Some(staged_item) => match me.inner.start_send(staged_item) {
Ok(AsyncSink01::Ready) => Poll::Ready(Ok(())),
Ok(AsyncSink01::NotReady(item)) => {
me.staged_item = Some(item);
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
},
None => Poll::Ready(Ok(())),
}
})
}
fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_mut_unchecked(self) };
match me.inner.poll_complete() {
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
Ok(Async01::NotReady) => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
}
})
}
fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_mut_unchecked(self) };
match me.inner.close() {
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
Ok(Async01::NotReady) => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
}
})
}
}
#[derive(Clone, Debug)]
struct WakerToHandle<'a>(&'a LocalWaker);
#[derive(Debug)]
struct NotifyWaker(task::Waker);
impl Notify01 for NotifyWaker {
fn notify(&self, _: usize) {
self.0.wake();
}
}
unsafe impl UnsafeNotify01 for NotifyWaker {
unsafe fn clone_raw(&self) -> NotifyHandle01 {
let ptr = Box::new(NotifyWaker(self.0.clone()));
NotifyHandle01::new(Box::into_raw(ptr))
}
unsafe fn drop_raw(&self) {
let ptr: *const dyn UnsafeNotify01 = self;
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
}
}
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
unsafe { NotifyWaker(handle.0.clone().into_waker()).clone_raw() }
}
}

View File

@@ -10,75 +10,149 @@
futures_api,
pin,
arbitrary_self_types,
underscore_imports,
await_macro,
async_await,
async_await
)]
#![deny(missing_docs, missing_debug_implementations)]
mod vendored;
use bytes::{Bytes, BytesMut};
use crate::vendored::tokio_serde_bincode::{IoErrorWrapper, ReadBincode, WriteBincode};
use self::compat::Compat;
use async_bincode::{AsyncBincodeStream, AsyncDestination};
use futures::{
Poll,
compat::{Compat01As03, Future01CompatExt, Stream01CompatExt},
prelude::*,
ready, task,
};
use futures_legacy::{
executor::{
self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01,
UnsafeNotify as UnsafeNotify01,
},
sink::SinkMapErr as SinkMapErr01,
sink::With as With01,
stream::MapErr as MapErr01,
Async as Async01, AsyncSink as AsyncSink01, Sink as Sink01, Stream as Stream01,
ready,
};
use pin_utils::unsafe_pinned;
use serde::{Deserialize, Serialize};
use std::{fmt, io, marker::PhantomData, net::SocketAddr, pin::Pin, task::LocalWaker};
use tokio::codec::{Framed, LengthDelimitedCodec, length_delimited};
use tokio_tcp::{self, TcpListener, TcpStream};
use std::{
error::Error,
io,
marker::PhantomData,
net::SocketAddr,
pin::Pin,
task::{LocalWaker, Poll},
};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::{TcpListener, TcpStream};
/// Returns a new bincode transport that reads from and writes to `io`.
pub fn new<Item, SinkItem>(io: TcpStream) -> Transport<Item, SinkItem>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
let peer_addr = io.peer_addr();
let local_addr = io.local_addr();
let inner = length_delimited::Builder::new()
.max_frame_length(8_000_000)
.new_framed(io)
.map_err(IoErrorWrapper as _)
.sink_map_err(IoErrorWrapper as _)
.with(freeze as _);
let inner = WriteBincode::new(inner);
let inner = ReadBincode::new(inner);
mod compat;
Transport {
inner,
staged_item: None,
peer_addr,
local_addr,
/// A transport that serializes to, and deserializes from, a [`TcpStream`].
#[derive(Debug)]
pub struct Transport<S, Item, SinkItem> {
inner: Compat<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>,
}
impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
/// Returns the transport underlying the bincode transport.
pub fn into_inner(self) -> S {
self.inner.into_inner().into_inner()
}
}
fn freeze(bytes: BytesMut) -> Result<Bytes, IoErrorWrapper> {
Ok(bytes.freeze())
impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
unsafe_pinned!(
inner: Compat<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>
);
}
/// Connects to `addr`, wrapping the connection in a bincode transport.
pub async fn connect<Item, SinkItem>(addr: &SocketAddr) -> io::Result<Transport<Item, SinkItem>>
impl<S, Item, SinkItem> Stream for Transport<S, Item, SinkItem>
where
S: AsyncRead,
Item: for<'a> Deserialize<'a>,
{
type Item = io::Result<Item>;
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<Item>>> {
match self.inner().poll_next(waker) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(next))) => Poll::Ready(Some(Ok(next))),
Poll::Ready(Some(Err(e))) => {
Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, e))))
}
}
}
}
impl<S, Item, SinkItem> Sink for Transport<S, Item, SinkItem>
where
S: AsyncWrite,
SinkItem: Serialize,
{
type SinkItem = SinkItem;
type SinkError = io::Error;
fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
self.inner()
.start_send(item)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
convert(self.inner().poll_ready(waker))
}
fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
convert(self.inner().poll_flush(waker))
}
fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
convert(self.inner().poll_close(waker))
}
}
fn convert<E: Into<Box<Error + Send + Sync>>>(poll: Poll<Result<(), E>>) -> Poll<io::Result<()>> {
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
}
}
impl<Item, SinkItem> rpc::Transport for Transport<TcpStream, Item, SinkItem>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
let stream = await!(TcpStream::connect(addr).compat())?;
Ok(new(stream))
type Item = Item;
type SinkItem = SinkItem;
fn peer_addr(&self) -> io::Result<SocketAddr> {
self.inner.get_ref().get_ref().peer_addr()
}
fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner.get_ref().get_ref().local_addr()
}
}
/// Returns a new bincode transport that reads from and writes to `io`.
pub fn new<Item, SinkItem>(io: TcpStream) -> Transport<TcpStream, Item, SinkItem>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
Transport::from(io)
}
impl<S, Item, SinkItem> From<S> for Transport<S, Item, SinkItem> {
fn from(inner: S) -> Self {
Transport {
inner: Compat::new(AsyncBincodeStream::from(inner).for_async()),
}
}
}
/// Connects to `addr`, wrapping the connection in a bincode transport.
pub async fn connect<Item, SinkItem>(
addr: &SocketAddr,
) -> io::Result<Transport<TcpStream, Item, SinkItem>>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
Ok(new(await!(TcpStream::connect(addr).compat())?))
}
/// Listens on `addr`, wrapping accepted connections in bincode transports.
@@ -119,168 +193,10 @@ where
Item: for<'a> Deserialize<'a>,
SinkItem: Serialize,
{
type Item = io::Result<Transport<Item, SinkItem>>;
type Item = io::Result<Transport<TcpStream, Item, SinkItem>>;
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
let next = ready!(self.incoming().poll_next(waker)?);
Poll::Ready(next.map(|conn| Ok(new(conn))))
}
}
/// A transport that serializes to, and deserializes from, a [`TcpStream`].
pub struct Transport<Item, SinkItem> {
inner: ReadBincode<
WriteBincode<
With01<
SinkMapErr01<
MapErr01<
Framed<tokio_tcp::TcpStream, LengthDelimitedCodec>,
fn(std::io::Error) -> IoErrorWrapper,
>,
fn(std::io::Error) -> IoErrorWrapper,
>,
BytesMut,
fn(BytesMut) -> Result<Bytes, IoErrorWrapper>,
Result<Bytes, IoErrorWrapper>
>,
SinkItem,
>,
Item,
>,
staged_item: Option<SinkItem>,
peer_addr: io::Result<SocketAddr>,
local_addr: io::Result<SocketAddr>,
}
impl<Item, SinkItem> fmt::Debug for Transport<Item, SinkItem> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Transport")
}
}
impl<Item, SinkItem> Stream for Transport<Item, SinkItem>
where
Item: for<'a> Deserialize<'a>,
{
type Item = io::Result<Item>;
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<Item>>> {
unsafe {
let inner = &mut Pin::get_mut_unchecked(self).inner;
let mut compat = inner.compat();
let compat = Pin::new_unchecked(&mut compat);
match ready!(compat.poll_next(waker)) {
None => Poll::Ready(None),
Some(Ok(next)) => Poll::Ready(Some(Ok(next))),
Some(Err(e)) => Poll::Ready(Some(Err(e.0))),
}
}
}
}
impl<Item, SinkItem> Sink for Transport<Item, SinkItem>
where
SinkItem: Serialize,
{
type SinkItem = SinkItem;
type SinkError = io::Error;
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
let me = unsafe { Pin::get_mut_unchecked(self) };
assert!(me.staged_item.is_none());
me.staged_item = Some(item);
Ok(())
}
fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_mut_unchecked(self) };
match me.staged_item.take() {
Some(staged_item) => match me.inner.start_send(staged_item)? {
AsyncSink01::Ready => Poll::Ready(Ok(())),
AsyncSink01::NotReady(item) => {
me.staged_item = Some(item);
Poll::Pending
}
},
None => Poll::Ready(Ok(())),
}
})
}
fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_mut_unchecked(self) };
match me.inner.poll_complete()? {
Async01::Ready(()) => Poll::Ready(Ok(())),
Async01::NotReady => Poll::Pending,
}
})
}
fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_mut_unchecked(self) };
match me.inner.get_mut().close()? {
Async01::Ready(()) => Poll::Ready(Ok(())),
Async01::NotReady => Poll::Pending,
}
})
}
}
impl<Item, SinkItem> rpc::Transport for Transport<Item, SinkItem>
where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
type Item = Item;
type SinkItem = SinkItem;
fn peer_addr(&self) -> io::Result<SocketAddr> {
// TODO: should just access from the inner transport.
// https://github.com/alexcrichton/tokio-serde-bincode/issues/4
Ok(*self.peer_addr.as_ref().unwrap())
}
fn local_addr(&self) -> io::Result<SocketAddr> {
Ok(*self.local_addr.as_ref().unwrap())
}
}
#[derive(Clone, Debug)]
struct WakerToHandle<'a>(&'a LocalWaker);
#[derive(Debug)]
struct NotifyWaker(task::Waker);
impl Notify01 for NotifyWaker {
fn notify(&self, _: usize) {
self.0.wake();
}
}
unsafe impl UnsafeNotify01 for NotifyWaker {
unsafe fn clone_raw(&self) -> NotifyHandle01 {
let ptr = Box::new(NotifyWaker(self.0.clone()));
NotifyHandle01::new(Box::into_raw(ptr))
}
unsafe fn drop_raw(&self) {
let ptr: *const dyn UnsafeNotify01 = self;
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
}
}
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
unsafe { NotifyWaker(handle.0.clone().into_waker()).clone_raw() }
}
}

View File

@@ -1,7 +0,0 @@
// Copyright 2018 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
pub(crate) mod tokio_serde_bincode;

View File

@@ -1,224 +0,0 @@
//! `Stream` and `Sink` adaptors for serializing and deserializing values using
//! Bincode.
//!
//! This crate provides adaptors for going from a stream or sink of buffers
//! ([`Bytes`]) to a stream or sink of values by performing Bincode encoding or
//! decoding. It is expected that each yielded buffer contains a single
//! serialized Bincode value. The specific strategy by which this is done is left
//! up to the user. One option is to use using [`length_delimited`] from
//! [tokio-io].
//!
//! [`Bytes`]: https://docs.rs/bytes/0.4/bytes/struct.Bytes.html
//! [`length_delimited`]: http://alexcrichton.com/tokio-io/tokio_io/codec/length_delimited/index.html
//! [tokio-io]: http://github.com/alexcrichton/tokio-io
//! [examples]: https://github.com/carllerche/tokio-serde-json/tree/master/examples
#![allow(missing_debug_implementations)]
use bincode::Error;
use bytes::{Bytes, BytesMut};
use futures_legacy::{Poll, Sink, StartSend, Stream};
use serde::{Deserialize, Serialize};
use std::io;
use tokio_serde::{Deserializer, FramedRead, FramedWrite, Serializer};
use std::marker::PhantomData;
/// Adapts a stream of Bincode encoded buffers to a stream of values by
/// deserializing them.
///
/// `ReadBincode` implements `Stream` by polling the inner buffer stream and
/// deserializing the buffer as Bincode. It expects that each yielded buffer
/// represents a single Bincode value and does not contain any extra trailing
/// bytes.
pub(crate) struct ReadBincode<T, U> {
inner: FramedRead<T, U, Bincode<U>>,
}
/// Adapts a buffer sink to a value sink by serializing the values as Bincode.
///
/// `WriteBincode` implements `Sink` by serializing the submitted values to a
/// buffer. The buffer is then sent to the inner stream, which is responsible
/// for handling framing on the wire.
pub(crate) struct WriteBincode<T: Sink, U> {
inner: FramedWrite<T, U, Bincode<U>>,
}
struct Bincode<T> {
ghost: PhantomData<T>,
}
impl<T, U> ReadBincode<T, U>
where
T: Stream<Error = IoErrorWrapper>,
U: for<'de> Deserialize<'de>,
Bytes: From<T::Item>,
{
/// Creates a new `ReadBincode` with the given buffer stream.
pub fn new(inner: T) -> ReadBincode<T, U> {
let json = Bincode { ghost: PhantomData };
ReadBincode {
inner: FramedRead::new(inner, json),
}
}
}
impl<T, U> ReadBincode<T, U> {
/// Returns a mutable reference to the underlying stream wrapped by
/// `ReadBincode`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_mut(&mut self) -> &mut T {
self.inner.get_mut()
}
}
impl<T, U> Stream for ReadBincode<T, U>
where
T: Stream<Error = IoErrorWrapper>,
U: for<'de> Deserialize<'de>,
Bytes: From<T::Item>,
{
type Item = U;
type Error = <T as Stream>::Error;
fn poll(&mut self) -> Poll<Option<U>, Self::Error> {
self.inner.poll()
}
}
impl<T, U> Sink for ReadBincode<T, U>
where
T: Sink,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
self.get_mut().start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.get_mut().poll_complete()
}
fn close(&mut self) -> Poll<(), T::SinkError> {
self.get_mut().close()
}
}
pub(crate) struct IoErrorWrapper(pub io::Error);
impl From<Box<bincode::ErrorKind>> for IoErrorWrapper {
fn from(e: Box<bincode::ErrorKind>) -> Self {
IoErrorWrapper(match *e {
bincode::ErrorKind::Io(e) => e,
bincode::ErrorKind::InvalidUtf8Encoding(e) => {
io::Error::new(io::ErrorKind::InvalidInput, e)
}
bincode::ErrorKind::InvalidBoolEncoding(e) => {
io::Error::new(io::ErrorKind::InvalidInput, e.to_string())
}
bincode::ErrorKind::InvalidTagEncoding(e) => {
io::Error::new(io::ErrorKind::InvalidInput, e.to_string())
}
bincode::ErrorKind::InvalidCharEncoding => {
io::Error::new(io::ErrorKind::InvalidInput, "Invalid char encoding")
}
bincode::ErrorKind::DeserializeAnyNotSupported => {
io::Error::new(io::ErrorKind::InvalidInput, "Deserialize Any not supported")
}
bincode::ErrorKind::SizeLimit => {
io::Error::new(io::ErrorKind::InvalidInput, "Size limit exceeded")
}
bincode::ErrorKind::SequenceMustHaveLength => {
io::Error::new(io::ErrorKind::InvalidInput, "Sequence must have length")
}
bincode::ErrorKind::Custom(s) => io::Error::new(io::ErrorKind::Other, s),
})
}
}
impl From<IoErrorWrapper> for io::Error {
fn from(wrapper: IoErrorWrapper) -> io::Error {
wrapper.0
}
}
impl<T, U> WriteBincode<T, U>
where
T: Sink<SinkItem = BytesMut, SinkError = IoErrorWrapper>,
U: Serialize,
{
/// Creates a new `WriteBincode` with the given buffer sink.
pub fn new(inner: T) -> WriteBincode<T, U> {
let json = Bincode { ghost: PhantomData };
WriteBincode {
inner: FramedWrite::new(inner, json),
}
}
}
impl<T: Sink, U> WriteBincode<T, U> {
/// Returns a mutable reference to the underlying sink wrapped by
/// `WriteBincode`.
///
/// Note that care should be taken to not tamper with the underlying sink as
/// it may corrupt the sequence of frames otherwise being worked with.
pub fn get_mut(&mut self) -> &mut T {
self.inner.get_mut()
}
}
impl<T, U> Sink for WriteBincode<T, U>
where
T: Sink<SinkItem = BytesMut, SinkError = IoErrorWrapper>,
U: Serialize,
{
type SinkItem = U;
type SinkError = <T as Sink>::SinkError;
fn start_send(&mut self, item: U) -> StartSend<U, Self::SinkError> {
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.inner.poll_complete()
}
fn close(&mut self) -> Poll<(), Self::SinkError> {
self.inner.poll_complete()
}
}
impl<T, U> Stream for WriteBincode<T, U>
where
T: Stream + Sink,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
self.get_mut().poll()
}
}
impl<T> Deserializer<T> for Bincode<T>
where
T: for<'de> Deserialize<'de>,
{
type Error = Error;
fn deserialize(&mut self, src: &Bytes) -> Result<T, Error> {
bincode::deserialize(src)
}
}
impl<T: Serialize> Serializer<T> for Bincode<T> {
type Error = Error;
fn serialize(&mut self, item: &T) -> Result<BytesMut, Self::Error> {
bincode::serialize(item).map(Into::into)
}
}

View File

@@ -20,9 +20,8 @@ extern crate test;
use self::test::stats::Stats;
use futures::{compat::TokioDefaultSpawner, prelude::*};
use rpc::{
client::{self, Client},
context,
server::{self, Handler, Server},
client, context,
server::{Handler, Server},
};
use std::{
io,
@@ -34,17 +33,17 @@ async fn bench() -> io::Result<()> {
let addr = listener.local_addr();
tokio_executor::spawn(
Server::<u32, u32>::new(server::Config::default())
Server::<u32, u32>::default()
.incoming(listener)
.take(1)
.respond_with(|_ctx, request| futures::future::ready(Ok(request)))
.unit_error()
.boxed()
.compat()
.compat(),
);
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
let client = &mut await!(Client::<u32, u32>::new(client::Config::default(), conn))?;
let client = &mut await!(client::new::<u32, u32, _>(client::Config::default(), conn))?;
let total = 10_000usize;
let mut successful = 0u32;
@@ -104,12 +103,7 @@ fn bench_small_packet() -> io::Result<()> {
env_logger::init();
rpc::init(TokioDefaultSpawner);
tokio::run(
bench()
.map_err(|e| panic!(e.to_string()))
.boxed()
.compat(),
);
tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat());
println!("done");
Ok(())

View File

@@ -6,7 +6,7 @@
//! Tests client/server control flow.
#![feature(generators, await_macro, async_await, futures_api,)]
#![feature(generators, await_macro, async_await, futures_api)]
use futures::{
compat::{Future01CompatExt, TokioDefaultSpawner},
@@ -15,11 +15,7 @@ use futures::{
};
use log::{info, trace};
use rand::distributions::{Distribution, Normal};
use rpc::{
client::{self, Client},
context,
server::{self, Server},
};
use rpc::{client, context, server::Server};
use std::{
io,
time::{Duration, Instant, SystemTime},
@@ -40,7 +36,7 @@ impl AsDuration for SystemTime {
async fn run() -> io::Result<()> {
let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = listener.local_addr();
let server = Server::<String, String>::new(server::Config::default())
let server = Server::<String, String>::default()
.incoming(listener)
.take(1)
.for_each(async move |channel| {
@@ -80,7 +76,7 @@ async fn run() -> io::Result<()> {
tokio_executor::spawn(server.unit_error().boxed().compat());
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
let client = await!(Client::<String, String>::new(
let client = await!(client::new::<String, String, _>(
client::Config::default(),
conn
))?;
@@ -88,7 +84,7 @@ async fn run() -> io::Result<()> {
// Proxy service
let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = listener.local_addr();
let proxy_server = Server::<String, String>::new(server::Config::default())
let proxy_server = Server::<String, String>::default()
.incoming(listener)
.take(1)
.for_each(move |channel| {
@@ -115,7 +111,7 @@ async fn run() -> io::Result<()> {
config.max_in_flight_requests = 10;
config.pending_request_buffer = 10;
let client = await!(Client::<String, String>::new(
let client = await!(client::new::<String, String, _>(
config,
await!(tarpc_bincode_transport::connect(&addr))?
))?;
@@ -142,11 +138,6 @@ fn cancel_slower() -> io::Result<()> {
env_logger::init();
rpc::init(TokioDefaultSpawner);
tokio::run(
run()
.boxed()
.map_err(|e| panic!(e))
.compat(),
);
tokio::run(run().boxed().map_err(|e| panic!(e)).compat());
Ok(())
}

View File

@@ -6,7 +6,7 @@
//! Tests client/server control flow.
#![feature(generators, await_macro, async_await, futures_api,)]
#![feature(generators, await_macro, async_await, futures_api)]
use futures::{
compat::{Future01CompatExt, TokioDefaultSpawner},
@@ -14,11 +14,7 @@ use futures::{
};
use log::{error, info, trace};
use rand::distributions::{Distribution, Normal};
use rpc::{
client::{self, Client},
context,
server::{self, Server},
};
use rpc::{client, context, server::Server};
use std::{
io,
time::{Duration, Instant, SystemTime},
@@ -39,7 +35,7 @@ impl AsDuration for SystemTime {
async fn run() -> io::Result<()> {
let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = listener.local_addr();
let server = Server::<String, String>::new(server::Config::default())
let server = Server::<String, String>::default()
.incoming(listener)
.take(1)
.for_each(async move |channel| {
@@ -83,7 +79,7 @@ async fn run() -> io::Result<()> {
config.pending_request_buffer = 10;
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
let client = await!(Client::<String, String>::new(config, conn))?;
let client = await!(client::new::<String, String, _>(config, conn))?;
let clients = (1..=100u32).map(|_| client.clone()).collect::<Vec<_>>();
for mut client in clients {
@@ -96,7 +92,10 @@ async fn run() -> io::Result<()> {
Ok(response) => info!("[{}] response: {}", trace_id, response),
Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e),
}
}.unit_error().boxed().compat()
}
.unit_error()
.boxed()
.compat(),
);
}

View File

@@ -2,23 +2,24 @@ cargo-features = ["rename-dependency"]
[package]
name = "tarpc-example-service"
version = "0.1.0"
version = "0.2.0"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = "2018"
license = "MIT"
documentation = "https://docs.rs/tarpc-example-service"
homepage = "https://github.com/google/tarpc"
repository = "https://github.com/google/tarpc"
keywords = ["rpc", "network", "server", "api", "microservices", "example"]
keywords = ["rpc", "network", "server", "microservices", "example"]
categories = ["asynchronous", "network-programming"]
readme = "../README.md"
description = "An example server built on tarpc."
[dependencies]
bincode-transport = { package = "tarpc-bincode-transport", version = "0.1", path = "../bincode-transport" }
futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] }
bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" }
clap = "2.0"
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
serde = { version = "1.0" }
tarpc = { version = "0.13", path = "../tarpc", features = ["serde1"] }
tarpc = { version = "0.14", path = "../tarpc", features = ["serde1"] }
tokio = "0.1"
tokio-executor = "0.1"
@@ -28,4 +29,8 @@ path = "src/lib.rs"
[[bin]]
name = "server"
path = "src/main.rs"
path = "src/server.rs"
[[bin]]
name = "client"
path = "src/client.rs"

View File

@@ -0,0 +1,79 @@
// Copyright 2018 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![feature(
futures_api,
pin,
arbitrary_self_types,
await_macro,
async_await
)]
use clap::{App, Arg};
use futures::{compat::TokioDefaultSpawner, prelude::*};
use std::{io, net::SocketAddr};
use tarpc::{client, context};
async fn run(server_addr: SocketAddr, name: String) -> io::Result<()> {
let transport = await!(bincode_transport::connect(&server_addr))?;
// new_stub is generated by the service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service mcro.
let mut client = await!(service::new_stub(client::Config::default(), transport))?;
// The client has an RPC method for each RPC defined in service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = await!(client.hello(context::current(), name))?;
println!("{}", hello);
Ok(())
}
fn main() {
let flags = App::new("Hello Client")
.version("0.1")
.author("Tim <tikue@google.com>")
.about("Say hello!")
.arg(
Arg::with_name("server_addr")
.long("server_addr")
.value_name("ADDRESS")
.help("Sets the server address to connect to.")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("name")
.short("n")
.long("name")
.value_name("STRING")
.help("Sets the name to say hello to.")
.required(true)
.takes_value(true),
)
.get_matches();
tarpc::init(TokioDefaultSpawner);
let server_addr = flags.value_of("server_addr").unwrap();
let server_addr = server_addr
.parse()
.unwrap_or_else(|e| panic!(r#"--server_addr value "{}" invalid: {}"#, server_addr, e));
let name = flags.value_of("name").unwrap();
tarpc::init(TokioDefaultSpawner);
tokio::run(
run(server_addr, name.into())
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat(),
);
}

View File

@@ -10,7 +10,7 @@
arbitrary_self_types,
await_macro,
async_await,
proc_macro_hygiene,
proc_macro_hygiene
)]
// This is the service definition. It looks a lot like a trait definition.

View File

@@ -9,19 +9,20 @@
pin,
arbitrary_self_types,
await_macro,
async_await,
async_await
)]
use clap::{App, Arg};
use futures::{
compat::TokioDefaultSpawner,
future::{self, Ready},
prelude::*,
};
use std::{io, net::SocketAddr};
use tarpc::{
client, context,
server::{self, Handler, Server},
context,
server::{Handler, Server},
};
use std::io;
// This is the type that implements the generated Service trait. It is the business logic
// and is used to start the server.
@@ -34,52 +35,56 @@ impl service::Service for HelloServer {
type HelloFut = Ready<String>;
fn hello(&self, _: context::Context, name: String) -> Self::HelloFut {
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name))
}
}
async fn run() -> io::Result<()> {
async fn run(server_addr: SocketAddr) -> io::Result<()> {
// bincode_transport is provided by the associated crate bincode-transport. It makes it easy
// to start up a serde-powered bincode serialization strategy over TCP.
let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = transport.local_addr();
let transport = bincode_transport::listen(&server_addr)?;
// The server is configured with the defaults.
let server = Server::new(server::Config::default())
let server = Server::default()
// Server can listen on any type that implements the Transport trait.
.incoming(transport)
// Close the stream after the client connects
.take(1)
// serve is generated by the service! macro. It takes as input any type implementing
// the generated Service trait.
.respond_with(service::serve(HelloServer));
tokio_executor::spawn(server.unit_error().boxed().compat());
let transport = await!(bincode_transport::connect(&addr))?;
// new_stub is generated by the service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service mcro.
let mut client = await!(service::new_stub(client::Config::default(), transport))?;
// The client has an RPC method for each RPC defined in service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = await!(client.hello(context::current(), "Stim".to_string()))?;
println!("{}", hello);
await!(server);
Ok(())
}
fn main() {
let flags = App::new("Hello Server")
.version("0.1")
.author("Tim <tikue@google.com>")
.about("Say hello!")
.arg(
Arg::with_name("port")
.short("p")
.long("port")
.value_name("NUMBER")
.help("Sets the port number to listen on")
.required(true)
.takes_value(true),
)
.get_matches();
let port = flags.value_of("port").unwrap();
let port = port
.parse()
.unwrap_or_else(|e| panic!(r#"--port value "{}" invalid: {}"#, port, e));
tarpc::init(TokioDefaultSpawner);
tokio::run(run()
tokio::run(
run(([0, 0, 0, 0], port).into())
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat()
.compat(),
);
}

View File

@@ -1 +1 @@
edition = "Edition2018"
edition = "2018"

View File

@@ -4,31 +4,35 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
extern crate itertools;
extern crate proc_macro;
extern crate proc_macro2;
extern crate syn;
extern crate itertools;
extern crate quote;
extern crate syn;
use proc_macro::TokenStream;
use itertools::Itertools;
use quote::ToTokens;
use syn::{Ident, TraitItemType, TypePath, parse};
use proc_macro2::Span;
use quote::ToTokens;
use std::str::FromStr;
use syn::{parse, Ident, TraitItemType, TypePath};
#[proc_macro]
pub fn snake_to_camel(input: TokenStream) -> TokenStream {
let i = input.clone();
let mut assoc_type = parse::<TraitItemType>(input).unwrap_or_else(|_| panic!("Could not parse trait item from:\n{}", i));
let mut assoc_type = parse::<TraitItemType>(input)
.unwrap_or_else(|_| panic!("Could not parse trait item from:\n{}", i));
let old_ident = convert(&mut assoc_type.ident);
for mut attr in &mut assoc_type.attrs {
if let Some(pair) = attr.path.segments.first() {
if pair.value().ident == "doc" {
attr.tts = proc_macro2::TokenStream::from_str(&attr.tts.to_string().replace("{}", &old_ident)).unwrap();
attr.tts = proc_macro2::TokenStream::from_str(
&attr.tts.to_string().replace("{}", &old_ident),
)
.unwrap();
}
}
}
@@ -41,12 +45,7 @@ pub fn ty_snake_to_camel(input: TokenStream) -> TokenStream {
let mut path = parse::<TypePath>(input).unwrap();
// Only capitalize the final segment
convert(&mut path.path
.segments
.last_mut()
.unwrap()
.into_value()
.ident);
convert(&mut path.path.segments.last_mut().unwrap().into_value().ident);
path.into_token_stream().into()
}

View File

@@ -2,7 +2,7 @@ cargo-features = ["rename-dependency"]
[package]
name = "tarpc-lib"
version = "0.1.0"
version = "0.2.0"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = '2018'
license = "MIT"
@@ -22,17 +22,17 @@ serde1 = ["trace/serde", "serde", "serde/derive"]
fnv = "1.0"
humantime = "1.0"
log = "0.4"
pin-utils = "0.1.0-alpha.2"
pin-utils = "0.1.0-alpha.3"
rand = "0.5"
tokio-timer = "0.2"
trace = { package = "tarpc-trace", version = "0.1", path = "../trace" }
serde = { optional = true, version = "1.0" }
[target.'cfg(not(test))'.dependencies]
futures-preview = { version = "0.3.0-alpha.8", features = ["compat"] }
futures-preview = { version = "0.3.0-alpha.9", features = ["compat"] }
[dev-dependencies]
futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] }
futures-test-preview = { version = "0.3.0-alpha.8" }
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
futures-test-preview = { version = "0.3.0-alpha.9" }
env_logger = "0.5"
tokio = "0.1"

View File

@@ -1 +1 @@
edition = "Edition2018"
edition = "2018"

View File

@@ -11,18 +11,19 @@ use crate::{
};
use fnv::FnvHashMap;
use futures::{
Poll,
channel::{mpsc, oneshot},
prelude::*,
ready,
stream::Fuse,
task::LocalWaker,
Poll,
};
use humantime::format_rfc3339;
use log::{debug, error, info, trace};
use pin_utils::unsafe_pinned;
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::{
io,
marker::{self, Unpin},
net::SocketAddr,
pin::Pin,
sync::{
@@ -37,7 +38,7 @@ use super::Config;
/// Handles communication from the client to request dispatch.
#[derive(Debug)]
pub(crate) struct Channel<Req, Resp> {
pub struct Channel<Req, Resp> {
to_dispatch: mpsc::Sender<DispatchRequest<Req, Resp>>,
/// Channel to send a cancel message to the dispatcher.
cancellation: RequestCancellation,
@@ -57,14 +58,58 @@ impl<Req, Resp> Clone for Channel<Req, Resp> {
}
}
/// A future returned by [`Channel::send`] that resolves to a server response.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
struct Send<'a, Req, Resp> {
fut: MapOkDispatchResponse<
MapErrConnectionReset<futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>>>,
Resp,
>,
}
impl<'a, Req, Resp> Send<'a, Req, Resp> {
unsafe_pinned!(
fut: MapOkDispatchResponse<
MapErrConnectionReset<
futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>>,
>,
Resp,
>
);
}
impl<'a, Req, Resp> Future for Send<'a, Req, Resp> {
type Output = io::Result<DispatchResponse<Resp>>;
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
self.fut().poll(lw)
}
}
/// A future returned by [`Channel::call`] that resolves to a server response.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Call<'a, Req, Resp> {
fut: AndThenIdent<Send<'a, Req, Resp>, DispatchResponse<Resp>>,
}
impl<'a, Req, Resp> Call<'a, Req, Resp> {
unsafe_pinned!(fut: AndThenIdent<Send<'a, Req, Resp>, DispatchResponse<Resp>>);
}
impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
type Output = io::Result<Resp>;
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
self.fut().poll(lw)
}
}
impl<Req, Resp> Channel<Req, Resp> {
/// Sends a request to the dispatch task to forward to the server, returning a [`Future`] that
/// resolves when the request is sent (not when the response is received).
pub(crate) async fn send(
&mut self,
mut ctx: context::Context,
request: Req,
) -> io::Result<DispatchResponse<Resp>> {
fn send<'a>(&'a mut self, mut ctx: context::Context, request: Req) -> Send<'a, Req, Resp> {
// Convert the context to the call context.
ctx.trace_context.parent_id = Some(ctx.trace_context.span_id);
ctx.trace_context.span_id = SpanId::random(&mut rand::thread_rng());
@@ -82,38 +127,40 @@ impl<Req, Resp> Channel<Req, Resp> {
let (response_completion, response) = oneshot::channel();
let cancellation = self.cancellation.clone();
let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
await!(self.to_dispatch.send(DispatchRequest {
ctx,
request_id,
request,
response_completion,
})).map_err(|_| io::Error::from(io::ErrorKind::ConnectionReset))?;
Ok(DispatchResponse {
response: deadline_compat::Deadline::new(response, deadline),
complete: false,
request_id,
cancellation,
ctx,
server_addr: self.server_addr,
})
let server_addr = self.server_addr;
Send {
fut: MapOkDispatchResponse::new(
MapErrConnectionReset::new(self.to_dispatch.send(DispatchRequest {
ctx,
request_id,
request,
response_completion,
})),
DispatchResponse {
response: deadline_compat::Deadline::new(response, deadline),
complete: false,
request_id,
cancellation,
ctx,
server_addr,
},
),
}
}
/// Sends a request to the dispatch task to forward to the server, returning a [`Future`] that
/// resolves to the response.
pub(crate) async fn call(
&mut self,
context: context::Context,
request: Req,
) -> io::Result<Resp> {
let response_future = await!(self.send(context, request))?;
await!(response_future)
pub fn call<'a>(&'a mut self, context: context::Context, request: Req) -> Call<'a, Req, Resp> {
Call {
fut: AndThenIdent::new(self.send(context, request)),
}
}
}
/// A server response that is completed by request dispatch when the corresponding response
/// arrives off the wire.
#[derive(Debug)]
pub struct DispatchResponse<Resp> {
struct DispatchResponse<Resp> {
response: deadline_compat::Deadline<oneshot::Receiver<Response<Resp>>>,
ctx: context::Context,
complete: bool,
@@ -205,9 +252,9 @@ pub async fn spawn<Req, Resp, C>(
server_addr: SocketAddr,
) -> io::Result<Channel<Req, Resp>>
where
Req: Send,
Resp: Send,
C: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>> + Send,
Req: marker::Send + 'static,
Resp: marker::Send + 'static,
C: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>> + marker::Send + 'static,
{
let (to_dispatch, pending_requests) = mpsc::channel(config.pending_request_buffer);
let (cancellation, canceled_requests) = cancellations();
@@ -220,16 +267,18 @@ where
transport: transport.fuse(),
in_flight_requests: FnvHashMap::default(),
pending_requests: pending_requests.fuse(),
}.unwrap_or_else(move |e| error!("[{}] Connection broken: {}", server_addr, e))
).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!(
"Could not spawn client dispatch task. Is shutdown: {}",
e.is_shutdown()
),
)
})?;
}
.unwrap_or_else(move |e| error!("[{}] Connection broken: {}", server_addr, e)),
)
.map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!(
"Could not spawn client dispatch task. Is shutdown: {}",
e.is_shutdown()
),
)
})?;
Ok(Channel {
to_dispatch,
@@ -258,8 +307,8 @@ struct RequestDispatch<Req, Resp, C> {
impl<Req, Resp, C> RequestDispatch<Req, Resp, C>
where
Req: Send,
Resp: Send,
Req: marker::Send,
Resp: marker::Send,
C: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>>,
{
unsafe_pinned!(server_addr: SocketAddr);
@@ -462,8 +511,8 @@ where
impl<Req, Resp, C> Future for RequestDispatch<Req, Resp, C>
where
Req: Send,
Resp: Send,
Req: marker::Send,
Resp: marker::Send,
C: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>>,
{
type Output = io::Result<()>;
@@ -563,6 +612,185 @@ impl Stream for CanceledRequests {
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
struct MapErrConnectionReset<Fut> {
future: Fut,
finished: Option<()>,
}
impl<Fut> MapErrConnectionReset<Fut> {
unsafe_pinned!(future: Fut);
unsafe_unpinned!(finished: Option<()>);
fn new(future: Fut) -> MapErrConnectionReset<Fut> {
MapErrConnectionReset {
future,
finished: Some(()),
}
}
}
impl<Fut: Unpin> Unpin for MapErrConnectionReset<Fut> {}
impl<Fut> Future for MapErrConnectionReset<Fut>
where
Fut: TryFuture,
{
type Output = io::Result<Fut::Ok>;
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
match self.future().try_poll(lw) {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => {
self.finished().take().expect(
"MapErrConnectionReset must not be polled after it returned `Poll::Ready`",
);
Poll::Ready(result.map_err(|_| io::Error::from(io::ErrorKind::ConnectionReset)))
}
}
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
struct MapOkDispatchResponse<Fut, Resp> {
future: Fut,
response: Option<DispatchResponse<Resp>>,
}
impl<Fut, Resp> MapOkDispatchResponse<Fut, Resp> {
unsafe_pinned!(future: Fut);
unsafe_unpinned!(response: Option<DispatchResponse<Resp>>);
fn new(future: Fut, response: DispatchResponse<Resp>) -> MapOkDispatchResponse<Fut, Resp> {
MapOkDispatchResponse {
future,
response: Some(response),
}
}
}
impl<Fut: Unpin, Resp> Unpin for MapOkDispatchResponse<Fut, Resp> {}
impl<Fut, Resp> Future for MapOkDispatchResponse<Fut, Resp>
where
Fut: TryFuture,
{
type Output = Result<DispatchResponse<Resp>, Fut::Error>;
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
match self.future().try_poll(lw) {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => {
let response = self
.response()
.take()
.expect("MapOk must not be polled after it returned `Poll::Ready`");
Poll::Ready(result.map(|_| response))
}
}
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
struct AndThenIdent<Fut1, Fut2> {
try_chain: TryChain<Fut1, Fut2>,
}
impl<Fut1, Fut2> AndThenIdent<Fut1, Fut2>
where
Fut1: TryFuture<Ok = Fut2>,
Fut2: TryFuture,
{
unsafe_pinned!(try_chain: TryChain<Fut1, Fut2>);
/// Creates a new `Then`.
fn new(future: Fut1) -> AndThenIdent<Fut1, Fut2> {
AndThenIdent {
try_chain: TryChain::new(future),
}
}
}
impl<Fut1, Fut2> Future for AndThenIdent<Fut1, Fut2>
where
Fut1: TryFuture<Ok = Fut2>,
Fut2: TryFuture<Error = Fut1::Error>,
{
type Output = Result<Fut2::Ok, Fut2::Error>;
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
self.try_chain().poll(lw, |result| match result {
Ok(ok) => TryChainAction::Future(ok),
Err(err) => TryChainAction::Output(Err(err)),
})
}
}
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
enum TryChain<Fut1, Fut2> {
First(Fut1),
Second(Fut2),
Empty,
}
enum TryChainAction<Fut2>
where
Fut2: TryFuture,
{
Future(Fut2),
Output(Result<Fut2::Ok, Fut2::Error>),
}
impl<Fut1, Fut2> TryChain<Fut1, Fut2>
where
Fut1: TryFuture<Ok = Fut2>,
Fut2: TryFuture,
{
fn new(fut1: Fut1) -> TryChain<Fut1, Fut2> {
TryChain::First(fut1)
}
fn poll<F>(self: Pin<&mut Self>, lw: &LocalWaker, f: F) -> Poll<Result<Fut2::Ok, Fut2::Error>>
where
F: FnOnce(Result<Fut1::Ok, Fut1::Error>) -> TryChainAction<Fut2>,
{
let mut f = Some(f);
// Safe to call `get_mut_unchecked` because we won't move the futures.
let this = unsafe { Pin::get_mut_unchecked(self) };
loop {
let output = match this {
TryChain::First(fut1) => {
// Poll the first future
match unsafe { Pin::new_unchecked(fut1) }.try_poll(lw) {
Poll::Pending => return Poll::Pending,
Poll::Ready(output) => output,
}
}
TryChain::Second(fut2) => {
// Poll the second future
return unsafe { Pin::new_unchecked(fut2) }.try_poll(lw);
}
TryChain::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`");
}
};
*this = TryChain::Empty; // Drop fut1
let f = f.take().unwrap();
match f(output) {
TryChainAction::Future(fut2) => *this = TryChain::Second(fut2),
TryChainAction::Output(output) => return Poll::Ready(output),
}
}
}
}
#[cfg(test)]
mod tests {
use super::{CanceledRequests, Channel, RequestCancellation, RequestDispatch};
@@ -573,9 +801,10 @@ mod tests {
ClientMessage, Response,
};
use fnv::FnvHashMap;
use futures::{Poll, channel::mpsc, prelude::*};
use futures_test::task::{noop_local_waker_ref};
use futures::{channel::mpsc, prelude::*, Poll};
use futures_test::task::noop_local_waker_ref;
use std::{
marker,
net::{IpAddr, Ipv4Addr, SocketAddr},
pin::Pin,
sync::atomic::AtomicU64,
@@ -617,7 +846,8 @@ mod tests {
.send(context::current(), "hi".into())
.boxed()
.compat(),
).unwrap();
)
.unwrap();
drop(resp);
drop(channel);
@@ -640,7 +870,8 @@ mod tests {
.send(context::current(), "hi".into())
.boxed()
.compat(),
).unwrap();
)
.unwrap();
drop(resp);
drop(channel);
@@ -688,7 +919,7 @@ mod tests {
impl<T, E> PollTest for Poll<Option<Result<T, E>>>
where
E: ::std::fmt::Display + Send + 'static,
E: ::std::fmt::Display + marker::Send + 'static,
{
type T = Option<T>;

View File

@@ -6,27 +6,103 @@
//! Provides a client that connects to a server and sends multiplexed requests.
use crate::{context::Context, ClientMessage, Response, Transport};
use crate::{context, ClientMessage, Response, Transport};
use futures::prelude::*;
use log::warn;
use std::{
io,
net::{Ipv4Addr, SocketAddr},
};
mod dispatch;
/// Provides a [`Client`] backed by a transport.
pub mod channel;
pub use self::channel::Channel;
/// Sends multiplexed requests to, and receives responses from, a server.
#[derive(Debug)]
pub struct Client<Req, Resp> {
/// Channel to send requests to the dispatch task.
channel: dispatch::Channel<Req, Resp>,
pub trait Client<'a, Req> {
/// The response type.
type Response;
/// The future response.
type Future: Future<Output = io::Result<Self::Response>> + 'a;
/// Initiates a request, sending it to the dispatch task.
///
/// Returns a [`Future`] that resolves to this client and the future response
/// once the request is successfully enqueued.
///
/// [`Future`]: futures::Future
fn call(&'a mut self, ctx: context::Context, request: Req) -> Self::Future;
/// Returns a Client that applies a post-processing function to the returned response.
fn map_response<F, R>(self, f: F) -> MapResponse<Self, F>
where
F: FnMut(Self::Response) -> R,
Self: Sized,
{
MapResponse { inner: self, f }
}
/// Returns a Client that applies a pre-processing function to the request.
fn with_request<F, Req2>(self, f: F) -> WithRequest<Self, F>
where
F: FnMut(Req2) -> Req,
Self: Sized,
{
WithRequest { inner: self, f }
}
}
impl<Req, Resp> Clone for Client<Req, Resp> {
fn clone(&self) -> Self {
Client {
channel: self.channel.clone(),
}
/// A Client that applies a function to the returned response.
#[derive(Clone, Debug)]
pub struct MapResponse<C, F> {
inner: C,
f: F,
}
impl<'a, C, F, Req, Resp, Resp2> Client<'a, Req> for MapResponse<C, F>
where
C: Client<'a, Req, Response = Resp>,
F: FnMut(Resp) -> Resp2 + 'a,
{
type Response = Resp2;
type Future = futures::future::MapOk<<C as Client<'a, Req>>::Future, &'a mut F>;
fn call(&'a mut self, ctx: context::Context, request: Req) -> Self::Future {
self.inner.call(ctx, request).map_ok(&mut self.f)
}
}
/// A Client that applies a pre-processing function to the request.
#[derive(Clone, Debug)]
pub struct WithRequest<C, F> {
inner: C,
f: F,
}
impl<'a, C, F, Req, Req2, Resp> Client<'a, Req2> for WithRequest<C, F>
where
C: Client<'a, Req, Response = Resp>,
F: FnMut(Req2) -> Req,
{
type Response = Resp;
type Future = <C as Client<'a, Req>>::Future;
fn call(&'a mut self, ctx: context::Context, request: Req2) -> Self::Future {
self.inner.call(ctx, (self.f)(request))
}
}
impl<'a, Req, Resp> Client<'a, Req> for Channel<Req, Resp>
where
Req: 'a,
Resp: 'a,
{
type Response = Resp;
type Future = channel::Call<'a, Req, Resp>;
fn call(&'a mut self, ctx: context::Context, request: Req) -> channel::Call<'a, Req, Resp> {
self.call(ctx, request)
}
}
@@ -53,39 +129,23 @@ impl Default for Config {
}
}
impl<Req, Resp> Client<Req, Resp>
/// Creates a new Client by wrapping a [`Transport`] and spawning a dispatch task
/// that manages the lifecycle of requests.
///
/// Must only be called from on an executor.
pub async fn new<Req, Resp, T>(config: Config, transport: T) -> io::Result<Channel<Req, Resp>>
where
Req: Send,
Resp: Send,
Req: Send + 'static,
Resp: Send + 'static,
T: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>> + Send + 'static,
{
/// Creates a new Client by wrapping a [`Transport`] and spawning a dispatch task
/// that manages the lifecycle of requests.
///
/// Must only be called from on an executor.
pub async fn new<T>(config: Config, transport: T) -> io::Result<Self>
where
T: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>> + Send,
{
let server_addr = transport.peer_addr().unwrap_or_else(|e| {
warn!(
"Setting peer to unspecified because peer could not be determined: {}",
e
);
SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0)
});
let server_addr = transport.peer_addr().unwrap_or_else(|e| {
warn!(
"Setting peer to unspecified because peer could not be determined: {}",
e
);
SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0)
});
Ok(Client {
channel: await!(dispatch::spawn(config, transport, server_addr))?,
})
}
/// Initiates a request, sending it to the dispatch task.
///
/// Returns a [`Future`] that resolves to this client and the future response
/// once the request is successfully enqueued.
///
/// [`Future`]: futures::Future
pub async fn call(&mut self, ctx: Context, request: Req) -> io::Result<Resp> {
await!(self.channel.call(ctx, request))
}
Ok(await!(channel::spawn(config, transport, server_addr))?)
}

View File

@@ -5,21 +5,14 @@
// https://opensource.org/licenses/MIT.
#![feature(
const_fn,
non_exhaustive,
integer_atomics,
try_trait,
nll,
futures_api,
pin,
arbitrary_self_types,
await_macro,
async_await,
generators,
optin_builtin_traits,
generator_trait,
gen_future,
decl_macro,
)]
#![deny(missing_docs, missing_debug_implementations)]
@@ -49,7 +42,10 @@ pub(crate) mod util;
pub use crate::{client::Client, server::Server, transport::Transport};
use futures::{Future, task::{Spawn, SpawnExt, SpawnError}};
use futures::{
task::{Spawn, SpawnError, SpawnExt},
Future,
};
use std::{cell::RefCell, io, sync::Once, time::SystemTime};
/// A message from a client to a server.
@@ -193,9 +189,7 @@ pub fn init(spawn: impl Spawn + Clone + 'static) {
}
pub(crate) fn spawn(future: impl Future<Output = ()> + Send + 'static) -> Result<(), SpawnError> {
SPAWN.with(|spawn| {
spawn.borrow_mut().spawn(future)
})
SPAWN.with(|spawn| spawn.borrow_mut().spawn(future))
}
trait CloneSpawn: Spawn {

View File

@@ -10,7 +10,13 @@ use crate::{
ClientMessage, Response, Transport,
};
use fnv::FnvHashMap;
use futures::{channel::mpsc, prelude::*, ready, stream::Fuse, task::{LocalWaker, Poll}};
use futures::{
channel::mpsc,
prelude::*,
ready,
stream::Fuse,
task::{LocalWaker, Poll},
};
use log::{debug, error, info, trace, warn};
use pin_utils::unsafe_pinned;
use std::{
@@ -205,10 +211,7 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
}
}
fn poll_closed_connections(
self: &mut Pin<&mut Self>,
cx: &LocalWaker,
) -> Poll<io::Result<()>> {
fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
match ready!(self.closed_connections_rx().poll_next_unpin(cx)) {
Some(addr) => {
self.handle_closed_connection(&addr);

View File

@@ -43,6 +43,12 @@ pub struct Server<Req, Resp> {
ghost: PhantomData<(Req, Resp)>,
}
impl<Req, Resp> Default for Server<Req, Resp> {
fn default() -> Self {
new(Config::default())
}
}
/// Settings that control the behavior of the server.
#[non_exhaustive]
#[derive(Clone, Debug)]
@@ -75,15 +81,15 @@ impl Default for Config {
}
}
impl<Req, Resp> Server<Req, Resp> {
/// Returns a new server with configuration specified `config`.
pub fn new(config: Config) -> Self {
Server {
config,
ghost: PhantomData,
}
/// Returns a new server with configuration specified `config`.
pub fn new<Req, Resp>(config: Config) -> Server<Req, Resp> {
Server {
config,
ghost: PhantomData,
}
}
impl<Req, Resp> Server<Req, Resp> {
/// Returns the config for this server.
pub fn config(&self) -> &Config {
&self.config
@@ -122,7 +128,7 @@ where
Req: Send + 'static,
Resp: Send + 'static,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send + 'static,
F: FnMut(Context, Req) -> Fut + Send + 'static + Clone,
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{
type Output = ();
@@ -132,7 +138,8 @@ where
match channel {
Ok(channel) => {
let peer = channel.client_addr;
if let Err(e) = crate::spawn(channel.respond_with(self.request_handler().clone()))
if let Err(e) =
crate::spawn(channel.respond_with(self.request_handler().clone()))
{
warn!("[{}] Failed to spawn connection handler: {:?}", peer, e);
}
@@ -158,7 +165,7 @@ where
/// Responds to all requests with `request_handler`.
fn respond_with<F, Fut>(self, request_handler: F) -> Running<Self, F>
where
F: FnMut(Context, Req) -> Fut + Send + 'static + Clone,
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{
Running {
@@ -222,21 +229,18 @@ where
Req: Send,
Resp: Send,
{
pub(crate) fn start_send(self: &mut Pin<&mut Self>, response: Response<Resp>) -> io::Result<()> {
pub(crate) fn start_send(
self: &mut Pin<&mut Self>,
response: Response<Resp>,
) -> io::Result<()> {
self.transport().start_send(response)
}
pub(crate) fn poll_ready(
self: &mut Pin<&mut Self>,
cx: &LocalWaker,
) -> Poll<io::Result<()>> {
pub(crate) fn poll_ready(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
self.transport().poll_ready(cx)
}
pub(crate) fn poll_flush(
self: &mut Pin<&mut Self>,
cx: &LocalWaker,
) -> Poll<io::Result<()>> {
pub(crate) fn poll_flush(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
self.transport().poll_flush(cx)
}
@@ -256,7 +260,7 @@ where
/// responses and resolves when the connection is closed.
pub fn respond_with<F, Fut>(self, f: F) -> impl Future<Output = ()>
where
F: FnMut(Context, Req) -> Fut + Send + 'static,
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
Req: 'static,
Resp: 'static,
@@ -271,7 +275,8 @@ where
pending_responses: responses,
responses_tx,
in_flight_requests: FnvHashMap::default(),
}.unwrap_or_else(move |e| {
}
.unwrap_or_else(move |e| {
info!("[{}] ClientHandler errored out: {}", peer, e);
})
}
@@ -305,7 +310,7 @@ where
Req: Send + 'static,
Resp: Send + 'static,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
F: FnMut(Context, Req) -> Fut + Send + 'static,
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{
/// If at max in-flight requests, check that there's room to immediately write a throttled
@@ -462,7 +467,7 @@ where
let mut response_tx = self.responses_tx().clone();
let trace_id = *ctx.trace_id();
let response = self.f()(ctx.clone(), request);
let response = self.f().clone()(ctx.clone(), request);
let response = deadline_compat::Deadline::new(response, Instant::now() + timeout).then(
async move |result| {
let response = Response {
@@ -477,16 +482,15 @@ where
},
);
let (abortable_response, abort_handle) = abortable(response);
crate::spawn(abortable_response.map(|_| ()))
.map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!(
"Could not spawn response task. Is shutdown: {}",
e.is_shutdown()
),
)
})?;
crate::spawn(abortable_response.map(|_| ())).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!(
"Could not spawn response task. Is shutdown: {}",
e.is_shutdown()
),
)
})?;
self.in_flight_requests().insert(request_id, abort_handle);
Ok(())
}
@@ -521,7 +525,7 @@ where
Req: Send + 'static,
Resp: Send + 'static,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
F: FnMut(Context, Req) -> Fut + Send + 'static,
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{
type Output = io::Result<()>;

View File

@@ -7,7 +7,7 @@
//! Transports backed by in-memory channels.
use crate::Transport;
use futures::{channel::mpsc, task::{LocalWaker}, Poll, Sink, Stream};
use futures::{channel::mpsc, task::LocalWaker, Poll, Sink, Stream};
use pin_utils::unsafe_pinned;
use std::pin::Pin;
use std::{
@@ -66,10 +66,7 @@ impl<Item, SinkItem> Sink for UnboundedChannel<Item, SinkItem> {
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &LocalWaker,
) -> Poll<Result<(), Self::SinkError>> {
fn poll_flush(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<Result<(), Self::SinkError>> {
self.tx()
.poll_flush(cx)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
@@ -97,8 +94,12 @@ impl<Item, SinkItem> Transport for UnboundedChannel<Item, SinkItem> {
#[cfg(test)]
mod tests {
use crate::{client::{self, Client}, context, server::{self, Handler, Server}, transport};
use futures::{prelude::*, stream, compat::TokioDefaultSpawner};
use crate::{
client, context,
server::{Handler, Server},
transport,
};
use futures::{compat::TokioDefaultSpawner, prelude::*, stream};
use log::trace;
use std::io;
@@ -108,7 +109,7 @@ mod tests {
crate::init(TokioDefaultSpawner);
let (client_channel, server_channel) = transport::channel::unbounded();
let server = Server::<String, u64>::new(server::Config::default())
let server = Server::<String, u64>::default()
.incoming(stream::once(future::ready(Ok(server_channel))))
.respond_with(|_ctx, request| {
future::ready(request.parse::<u64>().map_err(|_| {
@@ -120,7 +121,7 @@ mod tests {
});
let responses = async {
let mut client = await!(Client::new(client::Config::default(), client_channel))?;
let mut client = await!(client::new(client::Config::default(), client_channel))?;
let response1 = await!(client.call(context::current(), "123".into()));
let response2 = await!(client.call(context::current(), "abc".into()));

View File

@@ -10,7 +10,12 @@
//! can be plugged in, using whatever protocol it wants.
use futures::prelude::*;
use std::{io, net::SocketAddr};
use std::{
io,
net::SocketAddr,
pin::Pin,
task::{LocalWaker, Poll},
};
pub mod channel;
@@ -30,3 +35,87 @@ where
/// The address of the local half of this transport.
fn local_addr(&self) -> io::Result<SocketAddr>;
}
/// Returns a new Transport backed by the given Stream + Sink and connecting addresses.
pub fn new<S, Item>(
inner: S,
peer_addr: SocketAddr,
local_addr: SocketAddr,
) -> impl Transport<Item = Item, SinkItem = S::SinkItem>
where
S: Stream<Item = io::Result<Item>>,
S: Sink<SinkError = io::Error>,
{
TransportShim {
inner,
peer_addr,
local_addr,
}
}
/// A transport created by adding peers to a Stream + Sink.
#[derive(Debug)]
struct TransportShim<S> {
peer_addr: SocketAddr,
local_addr: SocketAddr,
inner: S,
}
impl<S> TransportShim<S> {
pin_utils::unsafe_pinned!(inner: S);
}
impl<S> Stream for TransportShim<S>
where
S: Stream,
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<S::Item>> {
self.inner().poll_next(waker)
}
}
impl<S> Sink for TransportShim<S>
where
S: Sink,
{
type SinkItem = S::SinkItem;
type SinkError = S::SinkError;
fn start_send(mut self: Pin<&mut Self>, item: S::SinkItem) -> Result<(), S::SinkError> {
self.inner().start_send(item)
}
fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
self.inner().poll_ready(waker)
}
fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
self.inner().poll_flush(waker)
}
fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
self.inner().poll_close(waker)
}
}
impl<S, Item> Transport for TransportShim<S>
where
S: Stream + Sink,
Self: Stream<Item = io::Result<Item>>,
Self: Sink<SinkItem = S::SinkItem, SinkError = io::Error>,
{
type Item = Item;
type SinkItem = S::SinkItem;
/// The address of the remote peer this transport is in communication with.
fn peer_addr(&self) -> io::Result<SocketAddr> {
Ok(self.peer_addr)
}
/// The address of the local half of this transport.
fn local_addr(&self) -> io::Result<SocketAddr> {
Ok(self.local_addr)
}
}

View File

@@ -7,7 +7,8 @@
use futures::{
compat::{Compat01As03, Future01CompatExt},
prelude::*,
ready, task::{Poll, LocalWaker},
ready,
task::{LocalWaker, Poll},
};
use pin_utils::unsafe_pinned;
use std::pin::Pin;
@@ -50,7 +51,6 @@ where
type Output = Result<T::Ok, timeout::Error<T::Error>>;
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Self::Output> {
// First, try polling the future
match self.future().try_poll(waker) {
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),

View File

@@ -59,7 +59,8 @@ where
Other => 16,
UnexpectedEof => 17,
_ => 16,
}.serialize(serializer)
}
.serialize(serializer)
}
/// Deserializes [`io::ErrorKind`] from a `u32`.

View File

@@ -2,7 +2,7 @@ cargo-features = ["rename-dependency"]
[package]
name = "tarpc"
version = "0.13.0"
version = "0.14.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018"
license = "MIT"
@@ -24,15 +24,19 @@ travis-ci = { repository = "google/tarpc" }
log = "0.4"
serde = { optional = true, version = "1.0" }
tarpc-plugins = { path = "../plugins", version = "0.5.0" }
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.1" }
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.2" }
[target.'cfg(not(test))'.dependencies]
futures-preview = "0.3.0-alpha.8"
futures-preview = "0.3.0-alpha.9"
[dev-dependencies]
bincode = "1.0"
bytes = { version = "0.4", features = ["serde"] }
humantime = "1.0"
futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] }
bincode-transport = { package = "tarpc-bincode-transport", version = "0.1", path = "../bincode-transport" }
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" }
env_logger = "0.5"
tokio = "0.1"
tokio-executor = "0.1"
tokio-tcp = "0.1"
pin-utils = "0.1.0-alpha.3"

View File

@@ -11,7 +11,7 @@
await_macro,
async_await,
existential_type,
proc_macro_hygiene,
proc_macro_hygiene
)]
use futures::{
@@ -55,7 +55,7 @@ struct Subscriber {
impl subscriber::Service for Subscriber {
type ReceiveFut = Ready<()>;
fn receive(&self, _: context::Context, message: String) -> Self::ReceiveFut {
fn receive(self, _: context::Context, message: String) -> Self::ReceiveFut {
println!("{} received message: {}", self.id, message);
future::ready(())
}
@@ -66,13 +66,13 @@ impl Subscriber {
let incoming = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = incoming.local_addr();
tokio_executor::spawn(
Server::new(config)
server::new(config)
.incoming(incoming)
.take(1)
.respond_with(subscriber::serve(Subscriber { id }))
.unit_error()
.boxed()
.compat()
.compat(),
);
Ok(addr)
}
@@ -94,7 +94,7 @@ impl Publisher {
impl publisher::Service for Publisher {
existential type BroadcastFut: Future<Output = ()>;
fn broadcast(&self, _: context::Context, message: String) -> Self::BroadcastFut {
fn broadcast(self, _: context::Context, message: String) -> Self::BroadcastFut {
async fn broadcast(clients: Arc<Mutex<HashMap<u32, subscriber::Client>>>, message: String) {
let mut clients = clients.lock().unwrap().clone();
for client in clients.values_mut() {
@@ -110,7 +110,7 @@ impl publisher::Service for Publisher {
existential type SubscribeFut: Future<Output = Result<(), String>>;
fn subscribe(&self, _: context::Context, id: u32, addr: SocketAddr) -> Self::SubscribeFut {
fn subscribe(self, _: context::Context, id: u32, addr: SocketAddr) -> Self::SubscribeFut {
async fn subscribe(
clients: Arc<Mutex<HashMap<u32, subscriber::Client>>>,
id: u32,
@@ -128,7 +128,7 @@ impl publisher::Service for Publisher {
existential type UnsubscribeFut: Future<Output = ()>;
fn unsubscribe(&self, _: context::Context, id: u32) -> Self::UnsubscribeFut {
fn unsubscribe(self, _: context::Context, id: u32) -> Self::UnsubscribeFut {
println!("Unsubscribing {}", id);
let mut clients = self.clients.lock().unwrap();
if let None = clients.remove(&id) {
@@ -146,13 +146,13 @@ async fn run() -> io::Result<()> {
let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let publisher_addr = transport.local_addr();
tokio_executor::spawn(
Server::new(server::Config::default())
Server::default()
.incoming(transport)
.take(1)
.respond_with(publisher::serve(Publisher::new()))
.unit_error()
.boxed()
.compat()
.compat(),
);
let subscriber1 = await!(Subscriber::listen(0, server::Config::default()))?;
@@ -180,12 +180,6 @@ async fn run() -> io::Result<()> {
}
fn main() {
tokio::run(
run()
.boxed()
.map_err(|e| panic!(e))
.boxed()
.compat(),
);
tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
thread::sleep(Duration::from_millis(100));
}

View File

@@ -10,16 +10,17 @@
arbitrary_self_types,
await_macro,
async_await,
proc_macro_hygiene,
proc_macro_hygiene
)]
use futures::{
compat::TokioDefaultSpawner,
future::{self, Ready},
prelude::*,
};
use rpc::{
client, context,
server::{self, Handler, Server},
server::{Handler, Server},
};
use std::io;
@@ -41,7 +42,7 @@ impl Service for HelloServer {
type HelloFut = Ready<String>;
fn hello(&self, _: context::Context, name: String) -> Self::HelloFut {
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name))
}
}
@@ -53,7 +54,7 @@ async fn run() -> io::Result<()> {
let addr = transport.local_addr();
// The server is configured with the defaults.
let server = Server::new(server::Config::default())
let server = Server::default()
// Server can listen on any type that implements the Transport trait.
.incoming(transport)
// Close the stream after the client connects
@@ -82,6 +83,8 @@ async fn run() -> io::Result<()> {
}
fn main() {
tarpc::init(TokioDefaultSpawner);
tokio::run(
run()
.map_err(|e| eprintln!("Oh no: {}", e))

View File

@@ -11,17 +11,18 @@
futures_api,
await_macro,
async_await,
proc_macro_hygiene,
proc_macro_hygiene
)]
use crate::{add::Service as AddService, double::Service as DoubleService};
use futures::{
compat::TokioDefaultSpawner,
future::{self, Ready},
prelude::*,
};
use rpc::{
client, context,
server::{self, Handler, Server},
server::{Handler, Server},
};
use std::io;
@@ -45,7 +46,7 @@ struct AddServer;
impl AddService for AddServer {
type AddFut = Ready<i32>;
fn add(&self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
fn add(self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
future::ready(x + y)
}
}
@@ -58,7 +59,7 @@ struct DoubleServer {
impl DoubleService for DoubleServer {
existential type DoubleFut: Future<Output = Result<i32, String>> + Send;
fn double(&self, _: context::Context, x: i32) -> Self::DoubleFut {
fn double(self, _: context::Context, x: i32) -> Self::DoubleFut {
async fn double(mut client: add::Client, x: i32) -> Result<i32, String> {
let result = await!(client.add(context::current(), x, x));
result.map_err(|e| e.to_string())
@@ -71,7 +72,7 @@ impl DoubleService for DoubleServer {
async fn run() -> io::Result<()> {
let add_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = add_listener.local_addr();
let add_server = Server::new(server::Config::default())
let add_server = Server::default()
.incoming(add_listener)
.take(1)
.respond_with(add::serve(AddServer));
@@ -82,7 +83,7 @@ async fn run() -> io::Result<()> {
let double_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = double_listener.local_addr();
let double_server = rpc::Server::new(server::Config::default())
let double_server = rpc::Server::default()
.incoming(double_listener)
.take(1)
.respond_with(double::serve(DoubleServer { add_client }));
@@ -102,10 +103,6 @@ async fn run() -> io::Result<()> {
fn main() {
env_logger::init();
tokio::run(
run()
.map_err(|e| panic!(e))
.boxed()
.compat(),
);
tarpc::init(TokioDefaultSpawner);
tokio::run(run().map_err(|e| panic!(e)).boxed().compat());
}

View File

@@ -0,0 +1,411 @@
#![feature(
pin,
async_await,
await_macro,
futures_api,
arbitrary_self_types,
proc_macro_hygiene,
impl_trait_in_bindings
)]
mod registry {
use bytes::Bytes;
use futures::{
future::{ready, Ready},
prelude::*,
};
use serde::{Deserialize, Serialize};
use std::{
io,
pin::Pin,
sync::Arc,
task::{LocalWaker, Poll},
};
use tarpc::{
client::{self, Client},
context,
};
/// A request to a named service.
#[derive(Serialize, Deserialize)]
pub struct ServiceRequest {
service_name: String,
request: Bytes,
}
/// A response from a named service.
#[derive(Serialize, Deserialize)]
pub struct ServiceResponse {
response: Bytes,
}
/// A list of registered services.
pub struct Registry<Services> {
registrations: Services,
}
impl Default for Registry<Nil> {
fn default() -> Self {
Registry { registrations: Nil }
}
}
impl<Services: MaybeServe + Sync> Registry<Services> {
/// Returns a function that serves requests for the registered services.
pub fn serve(
self,
) -> impl FnOnce(context::Context, ServiceRequest)
-> Either<Services::Future, Ready<io::Result<ServiceResponse>>>
+ Clone {
let registrations = Arc::new(self.registrations);
move |cx, req: ServiceRequest| match registrations.serve(cx, &req) {
Some(serve) => Either::Left(serve),
None => Either::Right(ready(Err(io::Error::new(
io::ErrorKind::NotFound,
format!("Service '{}' not registered", req.service_name),
)))),
}
}
/// Registers `serve` with the given `name` using the given serialization scheme.
pub fn register<S, Req, Resp, RespFut, Ser, De>(
self,
name: String,
serve: S,
deserialize: De,
serialize: Ser,
) -> Registry<Registration<impl Serve + Send + 'static, Services>>
where
Req: Send,
S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone,
RespFut: Future<Output = io::Result<Resp>> + Send + 'static,
De: FnOnce(Bytes) -> io::Result<Req> + Send + 'static + Clone,
Ser: FnOnce(Resp) -> io::Result<Bytes> + Send + 'static + Clone,
{
let registrations = Registration {
name: name,
serve: move |cx, req: Bytes| {
async move {
let req = deserialize.clone()(req)?;
let response = await!(serve.clone()(cx, req))?;
let response = serialize.clone()(response)?;
Ok(ServiceResponse { response })
}
},
rest: self.registrations,
};
Registry { registrations }
}
}
/// Creates a client that sends requests to a service
/// named `service_name`, over the given channel, using
/// the specified serialization scheme.
pub fn new_client<Req, Resp, Ser, De>(
service_name: String,
channel: &client::Channel<ServiceRequest, ServiceResponse>,
mut serialize: Ser,
mut deserialize: De,
) -> client::MapResponse<
client::WithRequest<
client::Channel<ServiceRequest, ServiceResponse>,
impl FnMut(Req) -> ServiceRequest,
>,
impl FnMut(ServiceResponse) -> Resp,
>
where
Req: Send + 'static,
Resp: Send + 'static,
De: FnMut(Bytes) -> io::Result<Resp> + Clone + Send + 'static,
Ser: FnMut(Req) -> io::Result<Bytes> + Clone + Send + 'static,
{
channel
.clone()
.with_request(move |req| {
ServiceRequest {
service_name: service_name.clone(),
// TODO: shouldn't need to unwrap here. Maybe with_request should allow for
// returning Result.
request: serialize(req).unwrap(),
}
})
// TODO: same thing. Maybe this should be more like and_then rather than map.
.map_response(move |resp| deserialize(resp.response).unwrap())
}
/// Serves a request.
///
/// This trait is mostly an implementation detail that isn't used outside of the registry
/// internals.
pub trait Serve: Clone + Send + 'static {
type Response: Future<Output = io::Result<ServiceResponse>> + Send + 'static;
fn serve(self, cx: context::Context, request: Bytes) -> Self::Response;
}
/// Serves a request if the request is for a registered service.
///
/// This trait is mostly an implementation detail that isn't used outside of the registry
/// internals.
pub trait MaybeServe: Send + 'static {
type Future: Future<Output = io::Result<ServiceResponse>> + Send + 'static;
fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option<Self::Future>;
}
/// A registry starting with service S, followed by Rest.
///
/// This type is mostly an implementation detail that is not used directly
/// outside of the registry internals.
pub struct Registration<S, Rest> {
/// The registered service's name. Must be unique across all registered services.
name: String,
/// The registered service.
serve: S,
/// Any remaining registered services.
rest: Rest,
}
/// An empty registry.
///
/// This type is mostly an implementation detail that is not used directly
/// outside of the registry internals.
pub struct Nil;
impl MaybeServe for Nil {
type Future = futures::future::Ready<io::Result<ServiceResponse>>;
fn serve(&self, _: context::Context, _: &ServiceRequest) -> Option<Self::Future> {
None
}
}
impl<S, Rest> MaybeServe for Registration<S, Rest>
where
S: Serve,
Rest: MaybeServe,
{
type Future = Either<S::Response, Rest::Future>;
fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option<Self::Future> {
if self.name == request.service_name {
Some(Either::Left(
self.serve.clone().serve(cx, request.request.clone()),
))
} else {
self.rest.serve(cx, request).map(Either::Right)
}
}
}
/// Wraps either of two future types that both resolve to the same output type.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub enum Either<Left, Right> {
Left(Left),
Right(Right),
}
impl<Output, Left, Right> Future for Either<Left, Right>
where
Left: Future<Output = Output>,
Right: Future<Output = Output>,
{
type Output = Output;
fn poll(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Output> {
unsafe {
match Pin::get_mut_unchecked(self) {
Either::Left(car) => Pin::new_unchecked(car).poll(waker),
Either::Right(cdr) => Pin::new_unchecked(cdr).poll(waker),
}
}
}
}
impl<Resp, F> Serve for F
where
F: FnOnce(context::Context, Bytes) -> Resp + Clone + Send + 'static,
Resp: Future<Output = io::Result<ServiceResponse>> + Send + 'static,
{
type Response = Resp;
fn serve(self, cx: context::Context, request: Bytes) -> Resp {
self(cx, request)
}
}
}
// Example
use bytes::Bytes;
use futures::{
future::{ready, Ready},
prelude::*,
};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
io,
sync::{Arc, RwLock},
};
use tarpc::{client, context, server::Handler};
fn deserialize<Req>(req: Bytes) -> io::Result<Req>
where
Req: for<'a> Deserialize<'a> + Send,
{
bincode::deserialize(req.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
fn serialize<Resp>(resp: Resp) -> io::Result<Bytes>
where
Resp: Serialize,
{
Ok(bincode::serialize(&resp)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
.into())
}
mod write_service {
tarpc::service! {
rpc write(key: String, value: String);
}
}
mod read_service {
tarpc::service! {
rpc read(key: String) -> Option<String>;
}
}
#[derive(Debug, Default, Clone)]
struct Server {
data: Arc<RwLock<HashMap<String, String>>>,
}
impl write_service::Service for Server {
type WriteFut = Ready<()>;
fn write(self, _: context::Context, key: String, value: String) -> Self::WriteFut {
self.data.write().unwrap().insert(key, value);
ready(())
}
}
impl read_service::Service for Server {
type ReadFut = Ready<Option<String>>;
fn read(self, _: context::Context, key: String) -> Self::ReadFut {
ready(self.data.read().unwrap().get(&key).cloned())
}
}
trait DefaultSpawn {
fn spawn(self);
}
impl<F> DefaultSpawn for F
where
F: Future<Output = ()> + Send + 'static,
{
fn spawn(self) {
tokio_executor::spawn(self.unit_error().boxed().compat())
}
}
struct BincodeRegistry<Services> {
registry: registry::Registry<Services>,
}
impl Default for BincodeRegistry<registry::Nil> {
fn default() -> Self {
BincodeRegistry {
registry: registry::Registry::default(),
}
}
}
impl<Services: registry::MaybeServe + Sync> BincodeRegistry<Services> {
fn serve(
self,
) -> impl FnOnce(
context::Context, registry::ServiceRequest
) -> registry::Either<
Services::Future,
Ready<io::Result<registry::ServiceResponse>>,
> + Clone {
self.registry.serve()
}
fn register<S, Req, Resp, RespFut>(
self,
name: String,
serve: S,
) -> BincodeRegistry<registry::Registration<impl registry::Serve + Send + 'static, Services>>
where
Req: for<'a> Deserialize<'a> + Send + 'static,
Resp: Serialize + 'static,
S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone,
RespFut: Future<Output = io::Result<Resp>> + Send + 'static,
{
let registry = self.registry.register(name, serve, deserialize, serialize);
BincodeRegistry { registry }
}
}
pub fn new_client<Req, Resp>(
service_name: String,
channel: &client::Channel<registry::ServiceRequest, registry::ServiceResponse>,
) -> client::MapResponse<
client::WithRequest<
client::Channel<registry::ServiceRequest, registry::ServiceResponse>,
impl FnMut(Req) -> registry::ServiceRequest,
>,
impl FnMut(registry::ServiceResponse) -> Resp,
>
where
Req: Serialize + Send + 'static,
Resp: for<'a> Deserialize<'a> + Send + 'static,
{
registry::new_client(service_name, channel, serialize, deserialize)
}
async fn run() -> io::Result<()> {
let server = Server::default();
let registry = BincodeRegistry::default()
.register(
"WriteService".to_string(),
write_service::serve(server.clone()),
)
.register(
"ReadService".to_string(),
read_service::serve(server.clone()),
);
let listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let server_addr = listener.local_addr();
let server = tarpc::Server::default()
.incoming(listener)
.take(1)
.respond_with(registry.serve());
tokio_executor::spawn(server.unit_error().boxed().compat());
let transport = await!(bincode_transport::connect(&server_addr))?;
let channel = await!(client::new(client::Config::default(), transport))?;
let write_client = new_client("WriteService".to_string(), &channel);
let mut write_client = write_service::Client::from(write_client);
let read_client = new_client("ReadService".to_string(), &channel);
let mut read_client = read_service::Client::from(read_client);
await!(write_client.write(context::current(), "key".to_string(), "val".to_string()))?;
let val = await!(read_client.read(context::current(), "key".to_string()))?;
println!("{:?}", val);
Ok(())
}
fn main() {
tarpc::init(futures::compat::TokioDefaultSpawner);
tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
}

View File

@@ -1 +1 @@
edition = "Edition2018"
edition = "2018"

View File

@@ -4,10 +4,21 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
//! ## tarpc: Tim & Adam's RPC lib
//! [![Travis-CI Status](https://travis-ci.org/google/tarpc.png?branch=master)](https://travis-ci.org/google/tarpc)
//! [![Coverage Status](https://coveralls.io/repos/github/google/tarpc/badge.svg?branch=master)](https://coveralls.io/github/google/tarpc?branch=master)
//! [![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg)](LICENSE)
//! [![Latest Version](https://img.shields.io/crates/v/tarpc.svg)](https://crates.io/crates/tarpc)
//! [![Join the chat at https://gitter.im/tarpc/Lobby](https://badges.gitter.im/tarpc/Lobby.svg)](https://gitter.im/tarpc/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
//!
//! *Disclaimer*: This is not an official Google product.
//!
//! tarpc is an RPC framework for rust with a focus on ease of use. Defining a
//! service can be done in just a few lines of code, and most of the boilerplate of
//! writing a server is taken care of for you.
//!
//! [Documentation](https://docs.rs/crate/tarpc/)
//!
//! ## What is an RPC framework?
//! "RPC" stands for "Remote Procedure Call," a function call where the work of
//! producing the return value is being done somewhere else. When an rpc function is
@@ -25,6 +36,20 @@
//! works with the community-backed library serde: any serde-serializable type can be used as
//! arguments to tarpc fns.
//!
//! ## Usage
//! Add to your `Cargo.toml` dependencies:
//!
//! ```toml
//! tarpc = "0.14.0"
//! ```
//!
//! The `service!` macro expands to a collection of items that form an
//! rpc service. In the above example, the macro is called within the
//! `hello_service` module. This module will contain a `Client` stub and `Service` trait. There is
//! These generated types make it easy and ergonomic to write servers without dealing with serialization
//! directly. Simply implement one of the generated traits, and you're off to the
//! races!
//!
//! ## Example
//!
//! Here's a small service.
@@ -40,7 +65,7 @@
//! };
//! use tarpc::{
//! client, context,
//! server::{self, Handler, Server},
//! server::{self, Handler},
//! };
//! use std::io;
//!
@@ -62,7 +87,7 @@
//!
//! type HelloFut = Ready<String>;
//!
//! fn hello(&self, _: context::Context, name: String) -> Self::HelloFut {
//! fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
//! future::ready(format!("Hello, {}!", name))
//! }
//! }
@@ -74,7 +99,7 @@
//! let addr = transport.local_addr();
//!
//! // The server is configured with the defaults.
//! let server = Server::new(server::Config::default())
//! let server = server::new(server::Config::default())
//! // Server can listen on any type that implements the Transport trait.
//! .incoming(transport)
//! // Close the stream after the client connects
@@ -111,16 +136,24 @@
//! );
//! }
//! ```
//!
//! ## Service Documentation
//!
//! Use `cargo doc` as you normally would to see the documentation created for all
//! items expanded by a `service!` invocation.
#![deny(missing_docs, missing_debug_implementations)]
#![feature(
futures_api,
pin,
await_macro,
async_await,
decl_macro,
#![feature(async_await)]
#![cfg_attr(
test,
feature(
pin,
futures_api,
await_macro,
proc_macro_hygiene,
arbitrary_self_types
)
)]
#![cfg_attr(test, feature(proc_macro_hygiene, arbitrary_self_types))]
#[doc(hidden)]
pub use futures;

View File

@@ -51,6 +51,9 @@ macro_rules! add_serde_if_enabled {
///
#[macro_export]
macro_rules! service {
() => {
compile_error!("Must define at least one RPC method.");
};
// Entry point
(
$(
@@ -112,24 +115,26 @@ macro_rules! service {
)*
) => {
$crate::add_serde_if_enabled! {
/// The request sent over the wire from the client to the server.
#[derive(Debug)]
#[doc(hidden)]
#[allow(non_camel_case_types, unused)]
--
pub enum Request__ {
pub enum Request {
$(
$(#[$attr])*
$fn_name{ $($arg: $in_,)* }
),*
}
}
$crate::add_serde_if_enabled! {
/// The response sent over the wire from the server to the client.
#[derive(Debug)]
#[doc(hidden)]
#[allow(non_camel_case_types, unused)]
--
pub enum Response__ {
pub enum Response {
$(
$(#[$attr])*
$fn_name($out)
),*
}
@@ -149,37 +154,39 @@ macro_rules! service {
}
$(#[$attr])*
fn $fn_name(&self, ctx: $crate::context::Context, $($arg:$in_),*) -> $crate::ty_snake_to_camel!(Self::$fn_name);
fn $fn_name(self, ctx: $crate::context::Context, $($arg:$in_),*) -> $crate::ty_snake_to_camel!(Self::$fn_name);
)*
}
// TODO: use an existential type instead of this when existential types work.
/// A future resolving to a server [`Response`].
#[allow(non_camel_case_types)]
pub enum Response<S: Service> {
pub enum ResponseFut<S: Service> {
$(
$(#[$attr])*
$fn_name($crate::ty_snake_to_camel!(<S as Service>::$fn_name)),
)*
}
impl<S: Service> ::std::fmt::Debug for Response<S> {
impl<S: Service> ::std::fmt::Debug for ResponseFut<S> {
fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
fmt.debug_struct("Response").finish()
}
}
impl<S: Service> ::std::future::Future for Response<S> {
type Output = ::std::io::Result<Response__>;
impl<S: Service> ::std::future::Future for ResponseFut<S> {
type Output = ::std::io::Result<Response>;
fn poll(self: ::std::pin::Pin<&mut Self>, waker: &::std::task::LocalWaker)
-> ::std::task::Poll<::std::io::Result<Response__>>
-> ::std::task::Poll<::std::io::Result<Response>>
{
unsafe {
match ::std::pin::Pin::get_mut_unchecked(self) {
$(
Response::$fn_name(resp) =>
ResponseFut::$fn_name(resp) =>
::std::pin::Pin::new_unchecked(resp)
.poll(waker)
.map(Response__::$fn_name)
.map(Response::$fn_name)
.map(Ok),
)*
}
@@ -189,13 +196,13 @@ macro_rules! service {
/// Returns a serving function to use with rpc::server::Server.
pub fn serve<S: Service>(service: S)
-> impl FnMut($crate::context::Context, Request__) -> Response<S> + Send + 'static + Clone {
-> impl FnOnce($crate::context::Context, Request) -> ResponseFut<S> + Send + 'static + Clone {
move |ctx, req| {
match req {
$(
Request__::$fn_name{ $($arg,)* } => {
let resp = Service::$fn_name(&mut service.clone(), ctx, $($arg),*);
Response::$fn_name(resp)
Request::$fn_name{ $($arg,)* } => {
let resp = Service::$fn_name(service.clone(), ctx, $($arg),*);
ResponseFut::$fn_name(resp)
}
)*
}
@@ -205,30 +212,40 @@ macro_rules! service {
#[allow(unused)]
#[derive(Clone, Debug)]
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
pub struct Client($crate::client::Client<Request__, Response__>);
pub struct Client<C = $crate::client::Channel<Request, Response>>(C);
/// Returns a new client stub that sends requests over the given transport.
pub async fn new_stub<T>(config: $crate::client::Config, transport: T)
-> ::std::io::Result<Client>
where
T: $crate::Transport<
Item = $crate::Response<Response__>,
SinkItem = $crate::ClientMessage<Request__>> + Send,
Item = $crate::Response<Response>,
SinkItem = $crate::ClientMessage<Request>> + Send + 'static,
{
Ok(Client(await!($crate::client::Client::new(config, transport))?))
Ok(Client(await!($crate::client::new(config, transport))?))
}
impl Client {
impl<C> From<C> for Client<C>
where for <'a> C: $crate::Client<'a, Request, Response = Response>
{
fn from(client: C) -> Self {
Client(client)
}
}
impl<C> Client<C>
where for<'a> C: $crate::Client<'a, Request, Response = Response>
{
$(
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&mut self, ctx: $crate::context::Context, $($arg: $in_),*)
-> impl ::std::future::Future<Output = ::std::io::Result<$out>> + '_ {
let request__ = Request__::$fn_name { $($arg,)* };
let resp = self.0.call(ctx, request__);
let request__ = Request::$fn_name { $($arg,)* };
let resp = $crate::Client::call(&mut self.0, ctx, request__);
async move {
match await!(resp)? {
Response__::$fn_name(msg__) => ::std::result::Result::Ok(msg__),
Response::$fn_name(msg__) => ::std::result::Result::Ok(msg__),
_ => unreachable!(),
}
}
@@ -269,11 +286,7 @@ mod functional_test {
future::{ready, Ready},
prelude::*,
};
use rpc::{
client, context,
server::{self, Handler},
transport::channel,
};
use rpc::{client, context, server::Handler, transport::channel};
use std::io;
use tokio::runtime::current_thread;
@@ -288,13 +301,13 @@ mod functional_test {
impl Service for Server {
type AddFut = Ready<i32>;
fn add(&self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
fn add(self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
ready(x + y)
}
type HeyFut = Ready<String>;
fn hey(&self, _: context::Context, name: String) -> Self::HeyFut {
fn hey(self, _: context::Context, name: String) -> Self::HeyFut {
ready(format!("Hey, {}.", name))
}
}
@@ -307,12 +320,12 @@ mod functional_test {
let test = async {
let (tx, rx) = channel::unbounded();
tokio_executor::spawn(
rpc::Server::new(server::Config::default())
crate::Server::default()
.incoming(stream::once(ready(Ok(rx))))
.respond_with(serve(Server))
.unit_error()
.boxed()
.compat()
.compat(),
);
let mut client = await!(new_stub(client::Config::default(), tx))?;
@@ -336,12 +349,12 @@ mod functional_test {
let test = async {
let (tx, rx) = channel::unbounded();
tokio_executor::spawn(
rpc::Server::new(server::Config::default())
rpc::Server::default()
.incoming(stream::once(ready(Ok(rx))))
.respond_with(serve(Server))
.unit_error()
.boxed()
.compat()
.compat(),
);
let client = await!(new_stub(client::Config::default(), tx))?;

View File

@@ -13,7 +13,7 @@
generators,
await_macro,
async_await,
proc_macro_hygiene,
proc_macro_hygiene
)]
extern crate test;
@@ -22,7 +22,7 @@ use self::test::stats::Stats;
use futures::{compat::TokioDefaultSpawner, future, prelude::*};
use rpc::{
client, context,
server::{self, Handler, Server},
server::{Handler, Server},
};
use std::{
io,
@@ -41,7 +41,7 @@ struct Serve;
impl ack::Service for Serve {
type AckFut = future::Ready<()>;
fn ack(&self, _: context::Context) -> Self::AckFut {
fn ack(self, _: context::Context) -> Self::AckFut {
future::ready(())
}
}
@@ -51,13 +51,13 @@ async fn bench() -> io::Result<()> {
let addr = listener.local_addr();
tokio_executor::spawn(
Server::new(server::Config::default())
Server::default()
.incoming(listener)
.take(1)
.respond_with(ack::serve(Serve))
.unit_error()
.boxed()
.compat()
.compat(),
);
let conn = await!(bincode_transport::connect(&addr))?;
@@ -122,10 +122,5 @@ fn bench_small_packet() {
env_logger::init();
tarpc::init(TokioDefaultSpawner);
tokio::run(
bench()
.map_err(|e| panic!(e.to_string()))
.boxed()
.compat(),
)
tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat())
}

View File

@@ -1 +1 @@
edition = "Edition2018"
edition = "2018"