mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2c7c64841f | ||
|
|
4ea142d0f3 | ||
|
|
00751d2518 | ||
|
|
4394a52b65 | ||
|
|
70938501d7 | ||
|
|
d5f5cf4300 | ||
|
|
e2c4164d8c | ||
|
|
78124ef7a8 | ||
|
|
096d354b7e | ||
|
|
7ad0e4b070 | ||
|
|
64755d5329 | ||
|
|
3071422132 | ||
|
|
8847330dbe | ||
|
|
6d396520f4 | ||
|
|
79a2f7fe2f | ||
|
|
af66841f68 | ||
|
|
1ab4cfdff9 | ||
|
|
f7e03eeeb7 |
@@ -9,4 +9,5 @@ os:
|
||||
- linux
|
||||
|
||||
script:
|
||||
- cargo test --all --all-features
|
||||
- cargo test --all-targets --all-features
|
||||
- cargo test --doc --all-features
|
||||
|
||||
41
README.md
41
README.md
@@ -1,6 +1,5 @@
|
||||
## tarpc: Tim & Adam's RPC lib
|
||||
[](https://travis-ci.org/google/tarpc)
|
||||
[](https://coveralls.io/github/google/tarpc?branch=master)
|
||||
[](LICENSE)
|
||||
[](https://crates.io/crates/tarpc)
|
||||
[](https://gitter.im/tarpc/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
|
||||
@@ -31,17 +30,12 @@ works with the community-backed library serde: any serde-serializable type can b
|
||||
arguments to tarpc fns.
|
||||
|
||||
## Usage
|
||||
**NB**: *this example is for master. Are you looking for other
|
||||
[versions](https://docs.rs/tarpc)?*
|
||||
|
||||
Add to your `Cargo.toml` dependencies:
|
||||
|
||||
```toml
|
||||
tarpc = "0.12.0"
|
||||
tarpc-plugins = "0.4.0"
|
||||
tarpc = "0.14.0"
|
||||
```
|
||||
|
||||
|
||||
The `service!` macro expands to a collection of items that form an
|
||||
rpc service. In the above example, the macro is called within the
|
||||
`hello_service` module. This module will contain a `Client` stub and `Service` trait. There is
|
||||
@@ -49,29 +43,29 @@ These generated types make it easy and ergonomic to write servers without dealin
|
||||
directly. Simply implement one of the generated traits, and you're off to the
|
||||
races!
|
||||
|
||||
## Example:
|
||||
## Example
|
||||
|
||||
Here's a small service.
|
||||
|
||||
```rust
|
||||
#![feature(plugin, futures_api, pin, arbitrary_self_types, await_macro, async_await)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
#![feature(futures_api, pin, arbitrary_self_types, await_macro, async_await, proc_macro_hygiene)]
|
||||
|
||||
|
||||
use futures::{
|
||||
compat::TokioDefaultSpawner,
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
spawn,
|
||||
};
|
||||
use tarpc::rpc::{
|
||||
use tarpc::{
|
||||
client, context,
|
||||
server::{self, Handler, Server},
|
||||
server::{self, Handler},
|
||||
};
|
||||
use std::io;
|
||||
|
||||
// This is the service definition. It looks a lot like a trait definition.
|
||||
// It defines one RPC, hello, which takes one arg, name, and returns a String.
|
||||
tarpc::service! {
|
||||
/// Returns a greeting for name.
|
||||
rpc hello(name: String) -> String;
|
||||
}
|
||||
|
||||
@@ -86,7 +80,7 @@ impl Service for HelloServer {
|
||||
|
||||
type HelloFut = Ready<String>;
|
||||
|
||||
fn hello(&self, _: context::Context, name: String) -> Self::HelloFut {
|
||||
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
|
||||
future::ready(format!("Hello, {}!", name))
|
||||
}
|
||||
}
|
||||
@@ -98,7 +92,7 @@ async fn run() -> io::Result<()> {
|
||||
let addr = transport.local_addr();
|
||||
|
||||
// The server is configured with the defaults.
|
||||
let server = Server::new(server::Config::default())
|
||||
let server = server::new(server::Config::default())
|
||||
// Server can listen on any type that implements the Transport trait.
|
||||
.incoming(transport)
|
||||
// Close the stream after the client connects
|
||||
@@ -107,14 +101,14 @@ async fn run() -> io::Result<()> {
|
||||
// the generated Service trait.
|
||||
.respond_with(serve(HelloServer));
|
||||
|
||||
spawn!(server).unwrap();
|
||||
tokio_executor::spawn(server.unit_error().boxed().compat());
|
||||
|
||||
let transport = await!(bincode_transport::connect(&addr))?;
|
||||
|
||||
// new_stub is generated by the service! macro. Like Server, it takes a config and any
|
||||
// Transport as input, and returns a Client, also generated by the macro.
|
||||
// by the service mcro.
|
||||
let mut client = await!(new_stub(client::Config::default(), transport));
|
||||
let mut client = await!(new_stub(client::Config::default(), transport))?;
|
||||
|
||||
// The client has an RPC method for each RPC defined in service!. It takes the same args
|
||||
// as defined, with the addition of a Context, which is always the first arg. The Context
|
||||
@@ -127,10 +121,11 @@ async fn run() -> io::Result<()> {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tokio::run(run()
|
||||
.map_err(|e| eprintln!("Oh no: {}", e))
|
||||
.boxed()
|
||||
.compat(TokioDefaultSpawner),
|
||||
.compat(),
|
||||
);
|
||||
}
|
||||
```
|
||||
@@ -139,13 +134,3 @@ fn main() {
|
||||
|
||||
Use `cargo doc` as you normally would to see the documentation created for all
|
||||
items expanded by a `service!` invocation.
|
||||
|
||||
## Contributing
|
||||
|
||||
To contribute to tarpc, please see [CONTRIBUTING](CONTRIBUTING.md).
|
||||
|
||||
## License
|
||||
|
||||
tarpc is distributed under the terms of the MIT license.
|
||||
|
||||
See [LICENSE](LICENSE) for details.
|
||||
|
||||
@@ -2,7 +2,7 @@ cargo-features = ["rename-dependency"]
|
||||
|
||||
[package]
|
||||
name = "tarpc-bincode-transport"
|
||||
version = "0.1.0"
|
||||
version = "0.3.0"
|
||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||
edition = '2018'
|
||||
license = "MIT"
|
||||
@@ -16,22 +16,19 @@ description = "A bincode-based transport for tarpc services."
|
||||
|
||||
[dependencies]
|
||||
bincode = { version = "1.0", features = ["i128"] }
|
||||
bytes = "0.4"
|
||||
futures_legacy = { version = "0.1", package = "futures" }
|
||||
pin-utils = "0.1.0-alpha.2"
|
||||
rpc = { package = "tarpc-lib", version = "0.1", path = "../rpc", features = ["serde1"] }
|
||||
pin-utils = "0.1.0-alpha.3"
|
||||
rpc = { package = "tarpc-lib", version = "0.2", path = "../rpc", features = ["serde1"] }
|
||||
serde = "1.0"
|
||||
tokio = "0.1"
|
||||
tokio-io = "0.1"
|
||||
tokio-serde-bincode = "0.1"
|
||||
async-bincode = "0.4"
|
||||
tokio-tcp = "0.1"
|
||||
tokio-serde = "0.2"
|
||||
|
||||
[target.'cfg(not(test))'.dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.8", features = ["compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat"] }
|
||||
|
||||
[dev-dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
|
||||
env_logger = "0.5"
|
||||
humantime = "1.0"
|
||||
log = "0.4"
|
||||
|
||||
@@ -1 +1 @@
|
||||
edition = "Edition2018"
|
||||
edition = "2018"
|
||||
|
||||
150
bincode-transport/src/compat.rs
Normal file
150
bincode-transport/src/compat.rs
Normal file
@@ -0,0 +1,150 @@
|
||||
use futures::{compat::Stream01CompatExt, prelude::*, ready};
|
||||
use futures_legacy::{
|
||||
executor::{
|
||||
self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01,
|
||||
UnsafeNotify as UnsafeNotify01,
|
||||
},
|
||||
Async as Async01, AsyncSink as AsyncSink01, Sink as Sink01, Stream as Stream01,
|
||||
};
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{self, LocalWaker, Poll},
|
||||
};
|
||||
|
||||
/// A shim to convert a 0.1 Sink + Stream to a 0.3 Sink + Stream.
|
||||
#[derive(Debug)]
|
||||
pub struct Compat<S, SinkItem> {
|
||||
staged_item: Option<SinkItem>,
|
||||
inner: S,
|
||||
}
|
||||
|
||||
impl<S, SinkItem> Compat<S, SinkItem> {
|
||||
/// Returns a new Compat.
|
||||
pub fn new(inner: S) -> Self {
|
||||
Compat {
|
||||
inner,
|
||||
staged_item: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Unwraps Compat, returning the inner value.
|
||||
pub fn into_inner(self) -> S {
|
||||
self.inner
|
||||
}
|
||||
|
||||
/// Returns a reference to the value wrapped by Compat.
|
||||
pub fn get_ref(&self) -> &S {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, SinkItem> Stream for Compat<S, SinkItem>
|
||||
where
|
||||
S: Stream01,
|
||||
{
|
||||
type Item = Result<S::Item, S::Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
|
||||
unsafe {
|
||||
let inner = &mut Pin::get_mut_unchecked(self).inner;
|
||||
let mut compat = inner.compat();
|
||||
let compat = Pin::new_unchecked(&mut compat);
|
||||
match ready!(compat.poll_next(waker)) {
|
||||
None => Poll::Ready(None),
|
||||
Some(Ok(next)) => Poll::Ready(Some(Ok(next))),
|
||||
Some(Err(e)) => Poll::Ready(Some(Err(e))),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, SinkItem> Sink for Compat<S, SinkItem>
|
||||
where
|
||||
S: Sink01<SinkItem = SinkItem>,
|
||||
{
|
||||
type SinkItem = SinkItem;
|
||||
type SinkError = S::SinkError;
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), S::SinkError> {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
assert!(me.staged_item.is_none());
|
||||
me.staged_item = Some(item);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.staged_item.take() {
|
||||
Some(staged_item) => match me.inner.start_send(staged_item) {
|
||||
Ok(AsyncSink01::Ready) => Poll::Ready(Ok(())),
|
||||
Ok(AsyncSink01::NotReady(item)) => {
|
||||
me.staged_item = Some(item);
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
},
|
||||
None => Poll::Ready(Ok(())),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.inner.poll_complete() {
|
||||
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
|
||||
Ok(Async01::NotReady) => Poll::Pending,
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.inner.close() {
|
||||
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
|
||||
Ok(Async01::NotReady) => Poll::Pending,
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct WakerToHandle<'a>(&'a LocalWaker);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NotifyWaker(task::Waker);
|
||||
|
||||
impl Notify01 for NotifyWaker {
|
||||
fn notify(&self, _: usize) {
|
||||
self.0.wake();
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl UnsafeNotify01 for NotifyWaker {
|
||||
unsafe fn clone_raw(&self) -> NotifyHandle01 {
|
||||
let ptr = Box::new(NotifyWaker(self.0.clone()));
|
||||
|
||||
NotifyHandle01::new(Box::into_raw(ptr))
|
||||
}
|
||||
|
||||
unsafe fn drop_raw(&self) {
|
||||
let ptr: *const dyn UnsafeNotify01 = self;
|
||||
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
|
||||
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
|
||||
unsafe { NotifyWaker(handle.0.clone().into_waker()).clone_raw() }
|
||||
}
|
||||
}
|
||||
@@ -10,75 +10,149 @@
|
||||
futures_api,
|
||||
pin,
|
||||
arbitrary_self_types,
|
||||
underscore_imports,
|
||||
await_macro,
|
||||
async_await,
|
||||
async_await
|
||||
)]
|
||||
#![deny(missing_docs, missing_debug_implementations)]
|
||||
|
||||
mod vendored;
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use crate::vendored::tokio_serde_bincode::{IoErrorWrapper, ReadBincode, WriteBincode};
|
||||
use self::compat::Compat;
|
||||
use async_bincode::{AsyncBincodeStream, AsyncDestination};
|
||||
use futures::{
|
||||
Poll,
|
||||
compat::{Compat01As03, Future01CompatExt, Stream01CompatExt},
|
||||
prelude::*,
|
||||
ready, task,
|
||||
};
|
||||
use futures_legacy::{
|
||||
executor::{
|
||||
self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01,
|
||||
UnsafeNotify as UnsafeNotify01,
|
||||
},
|
||||
sink::SinkMapErr as SinkMapErr01,
|
||||
sink::With as With01,
|
||||
stream::MapErr as MapErr01,
|
||||
Async as Async01, AsyncSink as AsyncSink01, Sink as Sink01, Stream as Stream01,
|
||||
ready,
|
||||
};
|
||||
use pin_utils::unsafe_pinned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{fmt, io, marker::PhantomData, net::SocketAddr, pin::Pin, task::LocalWaker};
|
||||
use tokio::codec::{Framed, LengthDelimitedCodec, length_delimited};
|
||||
use tokio_tcp::{self, TcpListener, TcpStream};
|
||||
use std::{
|
||||
error::Error,
|
||||
io,
|
||||
marker::PhantomData,
|
||||
net::SocketAddr,
|
||||
pin::Pin,
|
||||
task::{LocalWaker, Poll},
|
||||
};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_tcp::{TcpListener, TcpStream};
|
||||
|
||||
/// Returns a new bincode transport that reads from and writes to `io`.
|
||||
pub fn new<Item, SinkItem>(io: TcpStream) -> Transport<Item, SinkItem>
|
||||
where
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
let peer_addr = io.peer_addr();
|
||||
let local_addr = io.local_addr();
|
||||
let inner = length_delimited::Builder::new()
|
||||
.max_frame_length(8_000_000)
|
||||
.new_framed(io)
|
||||
.map_err(IoErrorWrapper as _)
|
||||
.sink_map_err(IoErrorWrapper as _)
|
||||
.with(freeze as _);
|
||||
let inner = WriteBincode::new(inner);
|
||||
let inner = ReadBincode::new(inner);
|
||||
mod compat;
|
||||
|
||||
Transport {
|
||||
inner,
|
||||
staged_item: None,
|
||||
peer_addr,
|
||||
local_addr,
|
||||
/// A transport that serializes to, and deserializes from, a [`TcpStream`].
|
||||
#[derive(Debug)]
|
||||
pub struct Transport<S, Item, SinkItem> {
|
||||
inner: Compat<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>,
|
||||
}
|
||||
|
||||
impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
|
||||
/// Returns the transport underlying the bincode transport.
|
||||
pub fn into_inner(self) -> S {
|
||||
self.inner.into_inner().into_inner()
|
||||
}
|
||||
}
|
||||
|
||||
fn freeze(bytes: BytesMut) -> Result<Bytes, IoErrorWrapper> {
|
||||
Ok(bytes.freeze())
|
||||
impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
|
||||
unsafe_pinned!(
|
||||
inner: Compat<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>
|
||||
);
|
||||
}
|
||||
|
||||
/// Connects to `addr`, wrapping the connection in a bincode transport.
|
||||
pub async fn connect<Item, SinkItem>(addr: &SocketAddr) -> io::Result<Transport<Item, SinkItem>>
|
||||
impl<S, Item, SinkItem> Stream for Transport<S, Item, SinkItem>
|
||||
where
|
||||
S: AsyncRead,
|
||||
Item: for<'a> Deserialize<'a>,
|
||||
{
|
||||
type Item = io::Result<Item>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<Item>>> {
|
||||
match self.inner().poll_next(waker) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Ready(Some(Ok(next))) => Poll::Ready(Some(Ok(next))),
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, e))))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Item, SinkItem> Sink for Transport<S, Item, SinkItem>
|
||||
where
|
||||
S: AsyncWrite,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
type SinkItem = SinkItem;
|
||||
type SinkError = io::Error;
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
|
||||
self.inner()
|
||||
.start_send(item)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_ready(waker))
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_flush(waker))
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_close(waker))
|
||||
}
|
||||
}
|
||||
|
||||
fn convert<E: Into<Box<Error + Send + Sync>>>(poll: Poll<Result<(), E>>) -> Poll<io::Result<()>> {
|
||||
match poll {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
|
||||
}
|
||||
}
|
||||
|
||||
impl<Item, SinkItem> rpc::Transport for Transport<TcpStream, Item, SinkItem>
|
||||
where
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
let stream = await!(TcpStream::connect(addr).compat())?;
|
||||
Ok(new(stream))
|
||||
type Item = Item;
|
||||
type SinkItem = SinkItem;
|
||||
|
||||
fn peer_addr(&self) -> io::Result<SocketAddr> {
|
||||
self.inner.get_ref().get_ref().peer_addr()
|
||||
}
|
||||
|
||||
fn local_addr(&self) -> io::Result<SocketAddr> {
|
||||
self.inner.get_ref().get_ref().local_addr()
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a new bincode transport that reads from and writes to `io`.
|
||||
pub fn new<Item, SinkItem>(io: TcpStream) -> Transport<TcpStream, Item, SinkItem>
|
||||
where
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
Transport::from(io)
|
||||
}
|
||||
|
||||
impl<S, Item, SinkItem> From<S> for Transport<S, Item, SinkItem> {
|
||||
fn from(inner: S) -> Self {
|
||||
Transport {
|
||||
inner: Compat::new(AsyncBincodeStream::from(inner).for_async()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Connects to `addr`, wrapping the connection in a bincode transport.
|
||||
pub async fn connect<Item, SinkItem>(
|
||||
addr: &SocketAddr,
|
||||
) -> io::Result<Transport<TcpStream, Item, SinkItem>>
|
||||
where
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
Ok(new(await!(TcpStream::connect(addr).compat())?))
|
||||
}
|
||||
|
||||
/// Listens on `addr`, wrapping accepted connections in bincode transports.
|
||||
@@ -119,168 +193,10 @@ where
|
||||
Item: for<'a> Deserialize<'a>,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
type Item = io::Result<Transport<Item, SinkItem>>;
|
||||
type Item = io::Result<Transport<TcpStream, Item, SinkItem>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
|
||||
let next = ready!(self.incoming().poll_next(waker)?);
|
||||
Poll::Ready(next.map(|conn| Ok(new(conn))))
|
||||
}
|
||||
}
|
||||
|
||||
/// A transport that serializes to, and deserializes from, a [`TcpStream`].
|
||||
pub struct Transport<Item, SinkItem> {
|
||||
inner: ReadBincode<
|
||||
WriteBincode<
|
||||
With01<
|
||||
SinkMapErr01<
|
||||
MapErr01<
|
||||
Framed<tokio_tcp::TcpStream, LengthDelimitedCodec>,
|
||||
fn(std::io::Error) -> IoErrorWrapper,
|
||||
>,
|
||||
fn(std::io::Error) -> IoErrorWrapper,
|
||||
>,
|
||||
BytesMut,
|
||||
fn(BytesMut) -> Result<Bytes, IoErrorWrapper>,
|
||||
Result<Bytes, IoErrorWrapper>
|
||||
>,
|
||||
SinkItem,
|
||||
>,
|
||||
Item,
|
||||
>,
|
||||
staged_item: Option<SinkItem>,
|
||||
peer_addr: io::Result<SocketAddr>,
|
||||
local_addr: io::Result<SocketAddr>,
|
||||
}
|
||||
|
||||
impl<Item, SinkItem> fmt::Debug for Transport<Item, SinkItem> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "Transport")
|
||||
}
|
||||
}
|
||||
|
||||
impl<Item, SinkItem> Stream for Transport<Item, SinkItem>
|
||||
where
|
||||
Item: for<'a> Deserialize<'a>,
|
||||
{
|
||||
type Item = io::Result<Item>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<Item>>> {
|
||||
unsafe {
|
||||
let inner = &mut Pin::get_mut_unchecked(self).inner;
|
||||
let mut compat = inner.compat();
|
||||
let compat = Pin::new_unchecked(&mut compat);
|
||||
match ready!(compat.poll_next(waker)) {
|
||||
None => Poll::Ready(None),
|
||||
Some(Ok(next)) => Poll::Ready(Some(Ok(next))),
|
||||
Some(Err(e)) => Poll::Ready(Some(Err(e.0))),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Item, SinkItem> Sink for Transport<Item, SinkItem>
|
||||
where
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
type SinkItem = SinkItem;
|
||||
type SinkError = io::Error;
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
assert!(me.staged_item.is_none());
|
||||
me.staged_item = Some(item);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.staged_item.take() {
|
||||
Some(staged_item) => match me.inner.start_send(staged_item)? {
|
||||
AsyncSink01::Ready => Poll::Ready(Ok(())),
|
||||
AsyncSink01::NotReady(item) => {
|
||||
me.staged_item = Some(item);
|
||||
Poll::Pending
|
||||
}
|
||||
},
|
||||
None => Poll::Ready(Ok(())),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.inner.poll_complete()? {
|
||||
Async01::Ready(()) => Poll::Ready(Ok(())),
|
||||
Async01::NotReady => Poll::Pending,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.inner.get_mut().close()? {
|
||||
Async01::Ready(()) => Poll::Ready(Ok(())),
|
||||
Async01::NotReady => Poll::Pending,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<Item, SinkItem> rpc::Transport for Transport<Item, SinkItem>
|
||||
where
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
type Item = Item;
|
||||
type SinkItem = SinkItem;
|
||||
|
||||
fn peer_addr(&self) -> io::Result<SocketAddr> {
|
||||
// TODO: should just access from the inner transport.
|
||||
// https://github.com/alexcrichton/tokio-serde-bincode/issues/4
|
||||
Ok(*self.peer_addr.as_ref().unwrap())
|
||||
}
|
||||
|
||||
fn local_addr(&self) -> io::Result<SocketAddr> {
|
||||
Ok(*self.local_addr.as_ref().unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct WakerToHandle<'a>(&'a LocalWaker);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NotifyWaker(task::Waker);
|
||||
|
||||
impl Notify01 for NotifyWaker {
|
||||
fn notify(&self, _: usize) {
|
||||
self.0.wake();
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl UnsafeNotify01 for NotifyWaker {
|
||||
unsafe fn clone_raw(&self) -> NotifyHandle01 {
|
||||
let ptr = Box::new(NotifyWaker(self.0.clone()));
|
||||
|
||||
NotifyHandle01::new(Box::into_raw(ptr))
|
||||
}
|
||||
|
||||
unsafe fn drop_raw(&self) {
|
||||
let ptr: *const dyn UnsafeNotify01 = self;
|
||||
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
|
||||
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
|
||||
unsafe { NotifyWaker(handle.0.clone().into_waker()).clone_raw() }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
// Copyright 2018 Google LLC
|
||||
//
|
||||
// Use of this source code is governed by an MIT-style
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
pub(crate) mod tokio_serde_bincode;
|
||||
@@ -1,224 +0,0 @@
|
||||
//! `Stream` and `Sink` adaptors for serializing and deserializing values using
|
||||
//! Bincode.
|
||||
//!
|
||||
//! This crate provides adaptors for going from a stream or sink of buffers
|
||||
//! ([`Bytes`]) to a stream or sink of values by performing Bincode encoding or
|
||||
//! decoding. It is expected that each yielded buffer contains a single
|
||||
//! serialized Bincode value. The specific strategy by which this is done is left
|
||||
//! up to the user. One option is to use using [`length_delimited`] from
|
||||
//! [tokio-io].
|
||||
//!
|
||||
//! [`Bytes`]: https://docs.rs/bytes/0.4/bytes/struct.Bytes.html
|
||||
//! [`length_delimited`]: http://alexcrichton.com/tokio-io/tokio_io/codec/length_delimited/index.html
|
||||
//! [tokio-io]: http://github.com/alexcrichton/tokio-io
|
||||
//! [examples]: https://github.com/carllerche/tokio-serde-json/tree/master/examples
|
||||
|
||||
#![allow(missing_debug_implementations)]
|
||||
|
||||
use bincode::Error;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures_legacy::{Poll, Sink, StartSend, Stream};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io;
|
||||
use tokio_serde::{Deserializer, FramedRead, FramedWrite, Serializer};
|
||||
|
||||
use std::marker::PhantomData;
|
||||
|
||||
/// Adapts a stream of Bincode encoded buffers to a stream of values by
|
||||
/// deserializing them.
|
||||
///
|
||||
/// `ReadBincode` implements `Stream` by polling the inner buffer stream and
|
||||
/// deserializing the buffer as Bincode. It expects that each yielded buffer
|
||||
/// represents a single Bincode value and does not contain any extra trailing
|
||||
/// bytes.
|
||||
pub(crate) struct ReadBincode<T, U> {
|
||||
inner: FramedRead<T, U, Bincode<U>>,
|
||||
}
|
||||
|
||||
/// Adapts a buffer sink to a value sink by serializing the values as Bincode.
|
||||
///
|
||||
/// `WriteBincode` implements `Sink` by serializing the submitted values to a
|
||||
/// buffer. The buffer is then sent to the inner stream, which is responsible
|
||||
/// for handling framing on the wire.
|
||||
pub(crate) struct WriteBincode<T: Sink, U> {
|
||||
inner: FramedWrite<T, U, Bincode<U>>,
|
||||
}
|
||||
|
||||
struct Bincode<T> {
|
||||
ghost: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T, U> ReadBincode<T, U>
|
||||
where
|
||||
T: Stream<Error = IoErrorWrapper>,
|
||||
U: for<'de> Deserialize<'de>,
|
||||
Bytes: From<T::Item>,
|
||||
{
|
||||
/// Creates a new `ReadBincode` with the given buffer stream.
|
||||
pub fn new(inner: T) -> ReadBincode<T, U> {
|
||||
let json = Bincode { ghost: PhantomData };
|
||||
ReadBincode {
|
||||
inner: FramedRead::new(inner, json),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> ReadBincode<T, U> {
|
||||
/// Returns a mutable reference to the underlying stream wrapped by
|
||||
/// `ReadBincode`.
|
||||
///
|
||||
/// Note that care should be taken to not tamper with the underlying stream
|
||||
/// of data coming in as it may corrupt the stream of frames otherwise
|
||||
/// being worked with.
|
||||
pub fn get_mut(&mut self) -> &mut T {
|
||||
self.inner.get_mut()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Stream for ReadBincode<T, U>
|
||||
where
|
||||
T: Stream<Error = IoErrorWrapper>,
|
||||
U: for<'de> Deserialize<'de>,
|
||||
Bytes: From<T::Item>,
|
||||
{
|
||||
type Item = U;
|
||||
type Error = <T as Stream>::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<U>, Self::Error> {
|
||||
self.inner.poll()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Sink for ReadBincode<T, U>
|
||||
where
|
||||
T: Sink,
|
||||
{
|
||||
type SinkItem = T::SinkItem;
|
||||
type SinkError = T::SinkError;
|
||||
|
||||
fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
|
||||
self.get_mut().start_send(item)
|
||||
}
|
||||
|
||||
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
|
||||
self.get_mut().poll_complete()
|
||||
}
|
||||
|
||||
fn close(&mut self) -> Poll<(), T::SinkError> {
|
||||
self.get_mut().close()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct IoErrorWrapper(pub io::Error);
|
||||
impl From<Box<bincode::ErrorKind>> for IoErrorWrapper {
|
||||
fn from(e: Box<bincode::ErrorKind>) -> Self {
|
||||
IoErrorWrapper(match *e {
|
||||
bincode::ErrorKind::Io(e) => e,
|
||||
bincode::ErrorKind::InvalidUtf8Encoding(e) => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, e)
|
||||
}
|
||||
bincode::ErrorKind::InvalidBoolEncoding(e) => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, e.to_string())
|
||||
}
|
||||
bincode::ErrorKind::InvalidTagEncoding(e) => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, e.to_string())
|
||||
}
|
||||
bincode::ErrorKind::InvalidCharEncoding => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, "Invalid char encoding")
|
||||
}
|
||||
bincode::ErrorKind::DeserializeAnyNotSupported => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, "Deserialize Any not supported")
|
||||
}
|
||||
bincode::ErrorKind::SizeLimit => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, "Size limit exceeded")
|
||||
}
|
||||
bincode::ErrorKind::SequenceMustHaveLength => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, "Sequence must have length")
|
||||
}
|
||||
bincode::ErrorKind::Custom(s) => io::Error::new(io::ErrorKind::Other, s),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<IoErrorWrapper> for io::Error {
|
||||
fn from(wrapper: IoErrorWrapper) -> io::Error {
|
||||
wrapper.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> WriteBincode<T, U>
|
||||
where
|
||||
T: Sink<SinkItem = BytesMut, SinkError = IoErrorWrapper>,
|
||||
U: Serialize,
|
||||
{
|
||||
/// Creates a new `WriteBincode` with the given buffer sink.
|
||||
pub fn new(inner: T) -> WriteBincode<T, U> {
|
||||
let json = Bincode { ghost: PhantomData };
|
||||
WriteBincode {
|
||||
inner: FramedWrite::new(inner, json),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Sink, U> WriteBincode<T, U> {
|
||||
/// Returns a mutable reference to the underlying sink wrapped by
|
||||
/// `WriteBincode`.
|
||||
///
|
||||
/// Note that care should be taken to not tamper with the underlying sink as
|
||||
/// it may corrupt the sequence of frames otherwise being worked with.
|
||||
pub fn get_mut(&mut self) -> &mut T {
|
||||
self.inner.get_mut()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Sink for WriteBincode<T, U>
|
||||
where
|
||||
T: Sink<SinkItem = BytesMut, SinkError = IoErrorWrapper>,
|
||||
U: Serialize,
|
||||
{
|
||||
type SinkItem = U;
|
||||
type SinkError = <T as Sink>::SinkError;
|
||||
|
||||
fn start_send(&mut self, item: U) -> StartSend<U, Self::SinkError> {
|
||||
self.inner.start_send(item)
|
||||
}
|
||||
|
||||
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
|
||||
self.inner.poll_complete()
|
||||
}
|
||||
|
||||
fn close(&mut self) -> Poll<(), Self::SinkError> {
|
||||
self.inner.poll_complete()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Stream for WriteBincode<T, U>
|
||||
where
|
||||
T: Stream + Sink,
|
||||
{
|
||||
type Item = T::Item;
|
||||
type Error = T::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
|
||||
self.get_mut().poll()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deserializer<T> for Bincode<T>
|
||||
where
|
||||
T: for<'de> Deserialize<'de>,
|
||||
{
|
||||
type Error = Error;
|
||||
|
||||
fn deserialize(&mut self, src: &Bytes) -> Result<T, Error> {
|
||||
bincode::deserialize(src)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Serialize> Serializer<T> for Bincode<T> {
|
||||
type Error = Error;
|
||||
|
||||
fn serialize(&mut self, item: &T) -> Result<BytesMut, Self::Error> {
|
||||
bincode::serialize(item).map(Into::into)
|
||||
}
|
||||
}
|
||||
@@ -20,9 +20,8 @@ extern crate test;
|
||||
use self::test::stats::Stats;
|
||||
use futures::{compat::TokioDefaultSpawner, prelude::*};
|
||||
use rpc::{
|
||||
client::{self, Client},
|
||||
context,
|
||||
server::{self, Handler, Server},
|
||||
client, context,
|
||||
server::{Handler, Server},
|
||||
};
|
||||
use std::{
|
||||
io,
|
||||
@@ -34,17 +33,17 @@ async fn bench() -> io::Result<()> {
|
||||
let addr = listener.local_addr();
|
||||
|
||||
tokio_executor::spawn(
|
||||
Server::<u32, u32>::new(server::Config::default())
|
||||
Server::<u32, u32>::default()
|
||||
.incoming(listener)
|
||||
.take(1)
|
||||
.respond_with(|_ctx, request| futures::future::ready(Ok(request)))
|
||||
.unit_error()
|
||||
.boxed()
|
||||
.compat()
|
||||
.compat(),
|
||||
);
|
||||
|
||||
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
|
||||
let client = &mut await!(Client::<u32, u32>::new(client::Config::default(), conn))?;
|
||||
let client = &mut await!(client::new::<u32, u32, _>(client::Config::default(), conn))?;
|
||||
|
||||
let total = 10_000usize;
|
||||
let mut successful = 0u32;
|
||||
@@ -104,12 +103,7 @@ fn bench_small_packet() -> io::Result<()> {
|
||||
env_logger::init();
|
||||
rpc::init(TokioDefaultSpawner);
|
||||
|
||||
tokio::run(
|
||||
bench()
|
||||
.map_err(|e| panic!(e.to_string()))
|
||||
.boxed()
|
||||
.compat(),
|
||||
);
|
||||
tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat());
|
||||
println!("done");
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
//! Tests client/server control flow.
|
||||
|
||||
#![feature(generators, await_macro, async_await, futures_api,)]
|
||||
#![feature(generators, await_macro, async_await, futures_api)]
|
||||
|
||||
use futures::{
|
||||
compat::{Future01CompatExt, TokioDefaultSpawner},
|
||||
@@ -15,11 +15,7 @@ use futures::{
|
||||
};
|
||||
use log::{info, trace};
|
||||
use rand::distributions::{Distribution, Normal};
|
||||
use rpc::{
|
||||
client::{self, Client},
|
||||
context,
|
||||
server::{self, Server},
|
||||
};
|
||||
use rpc::{client, context, server::Server};
|
||||
use std::{
|
||||
io,
|
||||
time::{Duration, Instant, SystemTime},
|
||||
@@ -40,7 +36,7 @@ impl AsDuration for SystemTime {
|
||||
async fn run() -> io::Result<()> {
|
||||
let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
|
||||
let addr = listener.local_addr();
|
||||
let server = Server::<String, String>::new(server::Config::default())
|
||||
let server = Server::<String, String>::default()
|
||||
.incoming(listener)
|
||||
.take(1)
|
||||
.for_each(async move |channel| {
|
||||
@@ -80,7 +76,7 @@ async fn run() -> io::Result<()> {
|
||||
tokio_executor::spawn(server.unit_error().boxed().compat());
|
||||
|
||||
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
|
||||
let client = await!(Client::<String, String>::new(
|
||||
let client = await!(client::new::<String, String, _>(
|
||||
client::Config::default(),
|
||||
conn
|
||||
))?;
|
||||
@@ -88,7 +84,7 @@ async fn run() -> io::Result<()> {
|
||||
// Proxy service
|
||||
let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
|
||||
let addr = listener.local_addr();
|
||||
let proxy_server = Server::<String, String>::new(server::Config::default())
|
||||
let proxy_server = Server::<String, String>::default()
|
||||
.incoming(listener)
|
||||
.take(1)
|
||||
.for_each(move |channel| {
|
||||
@@ -115,7 +111,7 @@ async fn run() -> io::Result<()> {
|
||||
config.max_in_flight_requests = 10;
|
||||
config.pending_request_buffer = 10;
|
||||
|
||||
let client = await!(Client::<String, String>::new(
|
||||
let client = await!(client::new::<String, String, _>(
|
||||
config,
|
||||
await!(tarpc_bincode_transport::connect(&addr))?
|
||||
))?;
|
||||
@@ -142,11 +138,6 @@ fn cancel_slower() -> io::Result<()> {
|
||||
env_logger::init();
|
||||
rpc::init(TokioDefaultSpawner);
|
||||
|
||||
tokio::run(
|
||||
run()
|
||||
.boxed()
|
||||
.map_err(|e| panic!(e))
|
||||
.compat(),
|
||||
);
|
||||
tokio::run(run().boxed().map_err(|e| panic!(e)).compat());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
//! Tests client/server control flow.
|
||||
|
||||
#![feature(generators, await_macro, async_await, futures_api,)]
|
||||
#![feature(generators, await_macro, async_await, futures_api)]
|
||||
|
||||
use futures::{
|
||||
compat::{Future01CompatExt, TokioDefaultSpawner},
|
||||
@@ -14,11 +14,7 @@ use futures::{
|
||||
};
|
||||
use log::{error, info, trace};
|
||||
use rand::distributions::{Distribution, Normal};
|
||||
use rpc::{
|
||||
client::{self, Client},
|
||||
context,
|
||||
server::{self, Server},
|
||||
};
|
||||
use rpc::{client, context, server::Server};
|
||||
use std::{
|
||||
io,
|
||||
time::{Duration, Instant, SystemTime},
|
||||
@@ -39,7 +35,7 @@ impl AsDuration for SystemTime {
|
||||
async fn run() -> io::Result<()> {
|
||||
let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
|
||||
let addr = listener.local_addr();
|
||||
let server = Server::<String, String>::new(server::Config::default())
|
||||
let server = Server::<String, String>::default()
|
||||
.incoming(listener)
|
||||
.take(1)
|
||||
.for_each(async move |channel| {
|
||||
@@ -83,7 +79,7 @@ async fn run() -> io::Result<()> {
|
||||
config.pending_request_buffer = 10;
|
||||
|
||||
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
|
||||
let client = await!(Client::<String, String>::new(config, conn))?;
|
||||
let client = await!(client::new::<String, String, _>(config, conn))?;
|
||||
|
||||
let clients = (1..=100u32).map(|_| client.clone()).collect::<Vec<_>>();
|
||||
for mut client in clients {
|
||||
@@ -96,7 +92,10 @@ async fn run() -> io::Result<()> {
|
||||
Ok(response) => info!("[{}] response: {}", trace_id, response),
|
||||
Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e),
|
||||
}
|
||||
}.unit_error().boxed().compat()
|
||||
}
|
||||
.unit_error()
|
||||
.boxed()
|
||||
.compat(),
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -2,23 +2,24 @@ cargo-features = ["rename-dependency"]
|
||||
|
||||
[package]
|
||||
name = "tarpc-example-service"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||
edition = "2018"
|
||||
license = "MIT"
|
||||
documentation = "https://docs.rs/tarpc-example-service"
|
||||
homepage = "https://github.com/google/tarpc"
|
||||
repository = "https://github.com/google/tarpc"
|
||||
keywords = ["rpc", "network", "server", "api", "microservices", "example"]
|
||||
keywords = ["rpc", "network", "server", "microservices", "example"]
|
||||
categories = ["asynchronous", "network-programming"]
|
||||
readme = "../README.md"
|
||||
description = "An example server built on tarpc."
|
||||
|
||||
[dependencies]
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.1", path = "../bincode-transport" }
|
||||
futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] }
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" }
|
||||
clap = "2.0"
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
|
||||
serde = { version = "1.0" }
|
||||
tarpc = { version = "0.13", path = "../tarpc", features = ["serde1"] }
|
||||
tarpc = { version = "0.14", path = "../tarpc", features = ["serde1"] }
|
||||
tokio = "0.1"
|
||||
tokio-executor = "0.1"
|
||||
|
||||
@@ -28,4 +29,8 @@ path = "src/lib.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "server"
|
||||
path = "src/main.rs"
|
||||
path = "src/server.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "client"
|
||||
path = "src/client.rs"
|
||||
|
||||
79
example-service/src/client.rs
Normal file
79
example-service/src/client.rs
Normal file
@@ -0,0 +1,79 @@
|
||||
// Copyright 2018 Google LLC
|
||||
//
|
||||
// Use of this source code is governed by an MIT-style
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
#![feature(
|
||||
futures_api,
|
||||
pin,
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await
|
||||
)]
|
||||
|
||||
use clap::{App, Arg};
|
||||
use futures::{compat::TokioDefaultSpawner, prelude::*};
|
||||
use std::{io, net::SocketAddr};
|
||||
use tarpc::{client, context};
|
||||
|
||||
async fn run(server_addr: SocketAddr, name: String) -> io::Result<()> {
|
||||
let transport = await!(bincode_transport::connect(&server_addr))?;
|
||||
|
||||
// new_stub is generated by the service! macro. Like Server, it takes a config and any
|
||||
// Transport as input, and returns a Client, also generated by the macro.
|
||||
// by the service mcro.
|
||||
let mut client = await!(service::new_stub(client::Config::default(), transport))?;
|
||||
|
||||
// The client has an RPC method for each RPC defined in service!. It takes the same args
|
||||
// as defined, with the addition of a Context, which is always the first arg. The Context
|
||||
// specifies a deadline and trace information which can be helpful in debugging requests.
|
||||
let hello = await!(client.hello(context::current(), name))?;
|
||||
|
||||
println!("{}", hello);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let flags = App::new("Hello Client")
|
||||
.version("0.1")
|
||||
.author("Tim <tikue@google.com>")
|
||||
.about("Say hello!")
|
||||
.arg(
|
||||
Arg::with_name("server_addr")
|
||||
.long("server_addr")
|
||||
.value_name("ADDRESS")
|
||||
.help("Sets the server address to connect to.")
|
||||
.required(true)
|
||||
.takes_value(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("name")
|
||||
.short("n")
|
||||
.long("name")
|
||||
.value_name("STRING")
|
||||
.help("Sets the name to say hello to.")
|
||||
.required(true)
|
||||
.takes_value(true),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
|
||||
let server_addr = flags.value_of("server_addr").unwrap();
|
||||
let server_addr = server_addr
|
||||
.parse()
|
||||
.unwrap_or_else(|e| panic!(r#"--server_addr value "{}" invalid: {}"#, server_addr, e));
|
||||
|
||||
let name = flags.value_of("name").unwrap();
|
||||
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
|
||||
tokio::run(
|
||||
run(server_addr, name.into())
|
||||
.map_err(|e| eprintln!("Oh no: {}", e))
|
||||
.boxed()
|
||||
.compat(),
|
||||
);
|
||||
}
|
||||
@@ -10,7 +10,7 @@
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await,
|
||||
proc_macro_hygiene,
|
||||
proc_macro_hygiene
|
||||
)]
|
||||
|
||||
// This is the service definition. It looks a lot like a trait definition.
|
||||
|
||||
@@ -9,19 +9,20 @@
|
||||
pin,
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await,
|
||||
async_await
|
||||
)]
|
||||
|
||||
use clap::{App, Arg};
|
||||
use futures::{
|
||||
compat::TokioDefaultSpawner,
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
use std::{io, net::SocketAddr};
|
||||
use tarpc::{
|
||||
client, context,
|
||||
server::{self, Handler, Server},
|
||||
context,
|
||||
server::{Handler, Server},
|
||||
};
|
||||
use std::io;
|
||||
|
||||
// This is the type that implements the generated Service trait. It is the business logic
|
||||
// and is used to start the server.
|
||||
@@ -34,52 +35,56 @@ impl service::Service for HelloServer {
|
||||
|
||||
type HelloFut = Ready<String>;
|
||||
|
||||
fn hello(&self, _: context::Context, name: String) -> Self::HelloFut {
|
||||
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
|
||||
future::ready(format!("Hello, {}!", name))
|
||||
}
|
||||
}
|
||||
|
||||
async fn run() -> io::Result<()> {
|
||||
async fn run(server_addr: SocketAddr) -> io::Result<()> {
|
||||
// bincode_transport is provided by the associated crate bincode-transport. It makes it easy
|
||||
// to start up a serde-powered bincode serialization strategy over TCP.
|
||||
let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
|
||||
let addr = transport.local_addr();
|
||||
let transport = bincode_transport::listen(&server_addr)?;
|
||||
|
||||
// The server is configured with the defaults.
|
||||
let server = Server::new(server::Config::default())
|
||||
let server = Server::default()
|
||||
// Server can listen on any type that implements the Transport trait.
|
||||
.incoming(transport)
|
||||
// Close the stream after the client connects
|
||||
.take(1)
|
||||
// serve is generated by the service! macro. It takes as input any type implementing
|
||||
// the generated Service trait.
|
||||
.respond_with(service::serve(HelloServer));
|
||||
|
||||
tokio_executor::spawn(server.unit_error().boxed().compat());
|
||||
|
||||
let transport = await!(bincode_transport::connect(&addr))?;
|
||||
|
||||
// new_stub is generated by the service! macro. Like Server, it takes a config and any
|
||||
// Transport as input, and returns a Client, also generated by the macro.
|
||||
// by the service mcro.
|
||||
let mut client = await!(service::new_stub(client::Config::default(), transport))?;
|
||||
|
||||
// The client has an RPC method for each RPC defined in service!. It takes the same args
|
||||
// as defined, with the addition of a Context, which is always the first arg. The Context
|
||||
// specifies a deadline and trace information which can be helpful in debugging requests.
|
||||
let hello = await!(client.hello(context::current(), "Stim".to_string()))?;
|
||||
|
||||
println!("{}", hello);
|
||||
await!(server);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let flags = App::new("Hello Server")
|
||||
.version("0.1")
|
||||
.author("Tim <tikue@google.com>")
|
||||
.about("Say hello!")
|
||||
.arg(
|
||||
Arg::with_name("port")
|
||||
.short("p")
|
||||
.long("port")
|
||||
.value_name("NUMBER")
|
||||
.help("Sets the port number to listen on")
|
||||
.required(true)
|
||||
.takes_value(true),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
let port = flags.value_of("port").unwrap();
|
||||
let port = port
|
||||
.parse()
|
||||
.unwrap_or_else(|e| panic!(r#"--port value "{}" invalid: {}"#, port, e));
|
||||
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
|
||||
tokio::run(run()
|
||||
tokio::run(
|
||||
run(([0, 0, 0, 0], port).into())
|
||||
.map_err(|e| eprintln!("Oh no: {}", e))
|
||||
.boxed()
|
||||
.compat()
|
||||
.compat(),
|
||||
);
|
||||
}
|
||||
@@ -1 +1 @@
|
||||
edition = "Edition2018"
|
||||
edition = "2018"
|
||||
|
||||
@@ -4,31 +4,35 @@
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
extern crate itertools;
|
||||
extern crate proc_macro;
|
||||
extern crate proc_macro2;
|
||||
extern crate syn;
|
||||
extern crate itertools;
|
||||
extern crate quote;
|
||||
extern crate syn;
|
||||
|
||||
use proc_macro::TokenStream;
|
||||
|
||||
use itertools::Itertools;
|
||||
use quote::ToTokens;
|
||||
use syn::{Ident, TraitItemType, TypePath, parse};
|
||||
use proc_macro2::Span;
|
||||
use quote::ToTokens;
|
||||
use std::str::FromStr;
|
||||
use syn::{parse, Ident, TraitItemType, TypePath};
|
||||
|
||||
#[proc_macro]
|
||||
pub fn snake_to_camel(input: TokenStream) -> TokenStream {
|
||||
let i = input.clone();
|
||||
let mut assoc_type = parse::<TraitItemType>(input).unwrap_or_else(|_| panic!("Could not parse trait item from:\n{}", i));
|
||||
let mut assoc_type = parse::<TraitItemType>(input)
|
||||
.unwrap_or_else(|_| panic!("Could not parse trait item from:\n{}", i));
|
||||
|
||||
let old_ident = convert(&mut assoc_type.ident);
|
||||
|
||||
for mut attr in &mut assoc_type.attrs {
|
||||
if let Some(pair) = attr.path.segments.first() {
|
||||
if pair.value().ident == "doc" {
|
||||
attr.tts = proc_macro2::TokenStream::from_str(&attr.tts.to_string().replace("{}", &old_ident)).unwrap();
|
||||
attr.tts = proc_macro2::TokenStream::from_str(
|
||||
&attr.tts.to_string().replace("{}", &old_ident),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -41,12 +45,7 @@ pub fn ty_snake_to_camel(input: TokenStream) -> TokenStream {
|
||||
let mut path = parse::<TypePath>(input).unwrap();
|
||||
|
||||
// Only capitalize the final segment
|
||||
convert(&mut path.path
|
||||
.segments
|
||||
.last_mut()
|
||||
.unwrap()
|
||||
.into_value()
|
||||
.ident);
|
||||
convert(&mut path.path.segments.last_mut().unwrap().into_value().ident);
|
||||
|
||||
path.into_token_stream().into()
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ cargo-features = ["rename-dependency"]
|
||||
|
||||
[package]
|
||||
name = "tarpc-lib"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||
edition = '2018'
|
||||
license = "MIT"
|
||||
@@ -22,17 +22,17 @@ serde1 = ["trace/serde", "serde", "serde/derive"]
|
||||
fnv = "1.0"
|
||||
humantime = "1.0"
|
||||
log = "0.4"
|
||||
pin-utils = "0.1.0-alpha.2"
|
||||
pin-utils = "0.1.0-alpha.3"
|
||||
rand = "0.5"
|
||||
tokio-timer = "0.2"
|
||||
trace = { package = "tarpc-trace", version = "0.1", path = "../trace" }
|
||||
serde = { optional = true, version = "1.0" }
|
||||
|
||||
[target.'cfg(not(test))'.dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.8", features = ["compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat"] }
|
||||
|
||||
[dev-dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] }
|
||||
futures-test-preview = { version = "0.3.0-alpha.8" }
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
|
||||
futures-test-preview = { version = "0.3.0-alpha.9" }
|
||||
env_logger = "0.5"
|
||||
tokio = "0.1"
|
||||
|
||||
@@ -1 +1 @@
|
||||
edition = "Edition2018"
|
||||
edition = "2018"
|
||||
|
||||
@@ -11,18 +11,19 @@ use crate::{
|
||||
};
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{
|
||||
Poll,
|
||||
channel::{mpsc, oneshot},
|
||||
prelude::*,
|
||||
ready,
|
||||
stream::Fuse,
|
||||
task::LocalWaker,
|
||||
Poll,
|
||||
};
|
||||
use humantime::format_rfc3339;
|
||||
use log::{debug, error, info, trace};
|
||||
use pin_utils::unsafe_pinned;
|
||||
use pin_utils::{unsafe_pinned, unsafe_unpinned};
|
||||
use std::{
|
||||
io,
|
||||
marker::{self, Unpin},
|
||||
net::SocketAddr,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
@@ -37,7 +38,7 @@ use super::Config;
|
||||
|
||||
/// Handles communication from the client to request dispatch.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Channel<Req, Resp> {
|
||||
pub struct Channel<Req, Resp> {
|
||||
to_dispatch: mpsc::Sender<DispatchRequest<Req, Resp>>,
|
||||
/// Channel to send a cancel message to the dispatcher.
|
||||
cancellation: RequestCancellation,
|
||||
@@ -57,14 +58,58 @@ impl<Req, Resp> Clone for Channel<Req, Resp> {
|
||||
}
|
||||
}
|
||||
|
||||
/// A future returned by [`Channel::send`] that resolves to a server response.
|
||||
#[derive(Debug)]
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
struct Send<'a, Req, Resp> {
|
||||
fut: MapOkDispatchResponse<
|
||||
MapErrConnectionReset<futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>>>,
|
||||
Resp,
|
||||
>,
|
||||
}
|
||||
|
||||
impl<'a, Req, Resp> Send<'a, Req, Resp> {
|
||||
unsafe_pinned!(
|
||||
fut: MapOkDispatchResponse<
|
||||
MapErrConnectionReset<
|
||||
futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>>,
|
||||
>,
|
||||
Resp,
|
||||
>
|
||||
);
|
||||
}
|
||||
|
||||
impl<'a, Req, Resp> Future for Send<'a, Req, Resp> {
|
||||
type Output = io::Result<DispatchResponse<Resp>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
self.fut().poll(lw)
|
||||
}
|
||||
}
|
||||
|
||||
/// A future returned by [`Channel::call`] that resolves to a server response.
|
||||
#[derive(Debug)]
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct Call<'a, Req, Resp> {
|
||||
fut: AndThenIdent<Send<'a, Req, Resp>, DispatchResponse<Resp>>,
|
||||
}
|
||||
|
||||
impl<'a, Req, Resp> Call<'a, Req, Resp> {
|
||||
unsafe_pinned!(fut: AndThenIdent<Send<'a, Req, Resp>, DispatchResponse<Resp>>);
|
||||
}
|
||||
|
||||
impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
|
||||
type Output = io::Result<Resp>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
self.fut().poll(lw)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp> Channel<Req, Resp> {
|
||||
/// Sends a request to the dispatch task to forward to the server, returning a [`Future`] that
|
||||
/// resolves when the request is sent (not when the response is received).
|
||||
pub(crate) async fn send(
|
||||
&mut self,
|
||||
mut ctx: context::Context,
|
||||
request: Req,
|
||||
) -> io::Result<DispatchResponse<Resp>> {
|
||||
fn send<'a>(&'a mut self, mut ctx: context::Context, request: Req) -> Send<'a, Req, Resp> {
|
||||
// Convert the context to the call context.
|
||||
ctx.trace_context.parent_id = Some(ctx.trace_context.span_id);
|
||||
ctx.trace_context.span_id = SpanId::random(&mut rand::thread_rng());
|
||||
@@ -82,38 +127,40 @@ impl<Req, Resp> Channel<Req, Resp> {
|
||||
let (response_completion, response) = oneshot::channel();
|
||||
let cancellation = self.cancellation.clone();
|
||||
let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
|
||||
await!(self.to_dispatch.send(DispatchRequest {
|
||||
ctx,
|
||||
request_id,
|
||||
request,
|
||||
response_completion,
|
||||
})).map_err(|_| io::Error::from(io::ErrorKind::ConnectionReset))?;
|
||||
Ok(DispatchResponse {
|
||||
response: deadline_compat::Deadline::new(response, deadline),
|
||||
complete: false,
|
||||
request_id,
|
||||
cancellation,
|
||||
ctx,
|
||||
server_addr: self.server_addr,
|
||||
})
|
||||
let server_addr = self.server_addr;
|
||||
Send {
|
||||
fut: MapOkDispatchResponse::new(
|
||||
MapErrConnectionReset::new(self.to_dispatch.send(DispatchRequest {
|
||||
ctx,
|
||||
request_id,
|
||||
request,
|
||||
response_completion,
|
||||
})),
|
||||
DispatchResponse {
|
||||
response: deadline_compat::Deadline::new(response, deadline),
|
||||
complete: false,
|
||||
request_id,
|
||||
cancellation,
|
||||
ctx,
|
||||
server_addr,
|
||||
},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a request to the dispatch task to forward to the server, returning a [`Future`] that
|
||||
/// resolves to the response.
|
||||
pub(crate) async fn call(
|
||||
&mut self,
|
||||
context: context::Context,
|
||||
request: Req,
|
||||
) -> io::Result<Resp> {
|
||||
let response_future = await!(self.send(context, request))?;
|
||||
await!(response_future)
|
||||
pub fn call<'a>(&'a mut self, context: context::Context, request: Req) -> Call<'a, Req, Resp> {
|
||||
Call {
|
||||
fut: AndThenIdent::new(self.send(context, request)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A server response that is completed by request dispatch when the corresponding response
|
||||
/// arrives off the wire.
|
||||
#[derive(Debug)]
|
||||
pub struct DispatchResponse<Resp> {
|
||||
struct DispatchResponse<Resp> {
|
||||
response: deadline_compat::Deadline<oneshot::Receiver<Response<Resp>>>,
|
||||
ctx: context::Context,
|
||||
complete: bool,
|
||||
@@ -205,9 +252,9 @@ pub async fn spawn<Req, Resp, C>(
|
||||
server_addr: SocketAddr,
|
||||
) -> io::Result<Channel<Req, Resp>>
|
||||
where
|
||||
Req: Send,
|
||||
Resp: Send,
|
||||
C: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>> + Send,
|
||||
Req: marker::Send + 'static,
|
||||
Resp: marker::Send + 'static,
|
||||
C: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>> + marker::Send + 'static,
|
||||
{
|
||||
let (to_dispatch, pending_requests) = mpsc::channel(config.pending_request_buffer);
|
||||
let (cancellation, canceled_requests) = cancellations();
|
||||
@@ -220,16 +267,18 @@ where
|
||||
transport: transport.fuse(),
|
||||
in_flight_requests: FnvHashMap::default(),
|
||||
pending_requests: pending_requests.fuse(),
|
||||
}.unwrap_or_else(move |e| error!("[{}] Connection broken: {}", server_addr, e))
|
||||
).map_err(|e| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!(
|
||||
"Could not spawn client dispatch task. Is shutdown: {}",
|
||||
e.is_shutdown()
|
||||
),
|
||||
)
|
||||
})?;
|
||||
}
|
||||
.unwrap_or_else(move |e| error!("[{}] Connection broken: {}", server_addr, e)),
|
||||
)
|
||||
.map_err(|e| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!(
|
||||
"Could not spawn client dispatch task. Is shutdown: {}",
|
||||
e.is_shutdown()
|
||||
),
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(Channel {
|
||||
to_dispatch,
|
||||
@@ -258,8 +307,8 @@ struct RequestDispatch<Req, Resp, C> {
|
||||
|
||||
impl<Req, Resp, C> RequestDispatch<Req, Resp, C>
|
||||
where
|
||||
Req: Send,
|
||||
Resp: Send,
|
||||
Req: marker::Send,
|
||||
Resp: marker::Send,
|
||||
C: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>>,
|
||||
{
|
||||
unsafe_pinned!(server_addr: SocketAddr);
|
||||
@@ -462,8 +511,8 @@ where
|
||||
|
||||
impl<Req, Resp, C> Future for RequestDispatch<Req, Resp, C>
|
||||
where
|
||||
Req: Send,
|
||||
Resp: Send,
|
||||
Req: marker::Send,
|
||||
Resp: marker::Send,
|
||||
C: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>>,
|
||||
{
|
||||
type Output = io::Result<()>;
|
||||
@@ -563,6 +612,185 @@ impl Stream for CanceledRequests {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
struct MapErrConnectionReset<Fut> {
|
||||
future: Fut,
|
||||
finished: Option<()>,
|
||||
}
|
||||
|
||||
impl<Fut> MapErrConnectionReset<Fut> {
|
||||
unsafe_pinned!(future: Fut);
|
||||
unsafe_unpinned!(finished: Option<()>);
|
||||
|
||||
fn new(future: Fut) -> MapErrConnectionReset<Fut> {
|
||||
MapErrConnectionReset {
|
||||
future,
|
||||
finished: Some(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Fut: Unpin> Unpin for MapErrConnectionReset<Fut> {}
|
||||
|
||||
impl<Fut> Future for MapErrConnectionReset<Fut>
|
||||
where
|
||||
Fut: TryFuture,
|
||||
{
|
||||
type Output = io::Result<Fut::Ok>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
match self.future().try_poll(lw) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(result) => {
|
||||
self.finished().take().expect(
|
||||
"MapErrConnectionReset must not be polled after it returned `Poll::Ready`",
|
||||
);
|
||||
Poll::Ready(result.map_err(|_| io::Error::from(io::ErrorKind::ConnectionReset)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
struct MapOkDispatchResponse<Fut, Resp> {
|
||||
future: Fut,
|
||||
response: Option<DispatchResponse<Resp>>,
|
||||
}
|
||||
|
||||
impl<Fut, Resp> MapOkDispatchResponse<Fut, Resp> {
|
||||
unsafe_pinned!(future: Fut);
|
||||
unsafe_unpinned!(response: Option<DispatchResponse<Resp>>);
|
||||
|
||||
fn new(future: Fut, response: DispatchResponse<Resp>) -> MapOkDispatchResponse<Fut, Resp> {
|
||||
MapOkDispatchResponse {
|
||||
future,
|
||||
response: Some(response),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Fut: Unpin, Resp> Unpin for MapOkDispatchResponse<Fut, Resp> {}
|
||||
|
||||
impl<Fut, Resp> Future for MapOkDispatchResponse<Fut, Resp>
|
||||
where
|
||||
Fut: TryFuture,
|
||||
{
|
||||
type Output = Result<DispatchResponse<Resp>, Fut::Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
match self.future().try_poll(lw) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(result) => {
|
||||
let response = self
|
||||
.response()
|
||||
.take()
|
||||
.expect("MapOk must not be polled after it returned `Poll::Ready`");
|
||||
Poll::Ready(result.map(|_| response))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
struct AndThenIdent<Fut1, Fut2> {
|
||||
try_chain: TryChain<Fut1, Fut2>,
|
||||
}
|
||||
|
||||
impl<Fut1, Fut2> AndThenIdent<Fut1, Fut2>
|
||||
where
|
||||
Fut1: TryFuture<Ok = Fut2>,
|
||||
Fut2: TryFuture,
|
||||
{
|
||||
unsafe_pinned!(try_chain: TryChain<Fut1, Fut2>);
|
||||
|
||||
/// Creates a new `Then`.
|
||||
fn new(future: Fut1) -> AndThenIdent<Fut1, Fut2> {
|
||||
AndThenIdent {
|
||||
try_chain: TryChain::new(future),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Fut1, Fut2> Future for AndThenIdent<Fut1, Fut2>
|
||||
where
|
||||
Fut1: TryFuture<Ok = Fut2>,
|
||||
Fut2: TryFuture<Error = Fut1::Error>,
|
||||
{
|
||||
type Output = Result<Fut2::Ok, Fut2::Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
self.try_chain().poll(lw, |result| match result {
|
||||
Ok(ok) => TryChainAction::Future(ok),
|
||||
Err(err) => TryChainAction::Output(Err(err)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
#[derive(Debug)]
|
||||
enum TryChain<Fut1, Fut2> {
|
||||
First(Fut1),
|
||||
Second(Fut2),
|
||||
Empty,
|
||||
}
|
||||
|
||||
enum TryChainAction<Fut2>
|
||||
where
|
||||
Fut2: TryFuture,
|
||||
{
|
||||
Future(Fut2),
|
||||
Output(Result<Fut2::Ok, Fut2::Error>),
|
||||
}
|
||||
|
||||
impl<Fut1, Fut2> TryChain<Fut1, Fut2>
|
||||
where
|
||||
Fut1: TryFuture<Ok = Fut2>,
|
||||
Fut2: TryFuture,
|
||||
{
|
||||
fn new(fut1: Fut1) -> TryChain<Fut1, Fut2> {
|
||||
TryChain::First(fut1)
|
||||
}
|
||||
|
||||
fn poll<F>(self: Pin<&mut Self>, lw: &LocalWaker, f: F) -> Poll<Result<Fut2::Ok, Fut2::Error>>
|
||||
where
|
||||
F: FnOnce(Result<Fut1::Ok, Fut1::Error>) -> TryChainAction<Fut2>,
|
||||
{
|
||||
let mut f = Some(f);
|
||||
|
||||
// Safe to call `get_mut_unchecked` because we won't move the futures.
|
||||
let this = unsafe { Pin::get_mut_unchecked(self) };
|
||||
|
||||
loop {
|
||||
let output = match this {
|
||||
TryChain::First(fut1) => {
|
||||
// Poll the first future
|
||||
match unsafe { Pin::new_unchecked(fut1) }.try_poll(lw) {
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(output) => output,
|
||||
}
|
||||
}
|
||||
TryChain::Second(fut2) => {
|
||||
// Poll the second future
|
||||
return unsafe { Pin::new_unchecked(fut2) }.try_poll(lw);
|
||||
}
|
||||
TryChain::Empty => {
|
||||
panic!("future must not be polled after it returned `Poll::Ready`");
|
||||
}
|
||||
};
|
||||
|
||||
*this = TryChain::Empty; // Drop fut1
|
||||
let f = f.take().unwrap();
|
||||
match f(output) {
|
||||
TryChainAction::Future(fut2) => *this = TryChain::Second(fut2),
|
||||
TryChainAction::Output(output) => return Poll::Ready(output),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{CanceledRequests, Channel, RequestCancellation, RequestDispatch};
|
||||
@@ -573,9 +801,10 @@ mod tests {
|
||||
ClientMessage, Response,
|
||||
};
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{Poll, channel::mpsc, prelude::*};
|
||||
use futures_test::task::{noop_local_waker_ref};
|
||||
use futures::{channel::mpsc, prelude::*, Poll};
|
||||
use futures_test::task::noop_local_waker_ref;
|
||||
use std::{
|
||||
marker,
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||
pin::Pin,
|
||||
sync::atomic::AtomicU64,
|
||||
@@ -617,7 +846,8 @@ mod tests {
|
||||
.send(context::current(), "hi".into())
|
||||
.boxed()
|
||||
.compat(),
|
||||
).unwrap();
|
||||
)
|
||||
.unwrap();
|
||||
drop(resp);
|
||||
drop(channel);
|
||||
|
||||
@@ -640,7 +870,8 @@ mod tests {
|
||||
.send(context::current(), "hi".into())
|
||||
.boxed()
|
||||
.compat(),
|
||||
).unwrap();
|
||||
)
|
||||
.unwrap();
|
||||
drop(resp);
|
||||
drop(channel);
|
||||
|
||||
@@ -688,7 +919,7 @@ mod tests {
|
||||
|
||||
impl<T, E> PollTest for Poll<Option<Result<T, E>>>
|
||||
where
|
||||
E: ::std::fmt::Display + Send + 'static,
|
||||
E: ::std::fmt::Display + marker::Send + 'static,
|
||||
{
|
||||
type T = Option<T>;
|
||||
|
||||
@@ -6,27 +6,103 @@
|
||||
|
||||
//! Provides a client that connects to a server and sends multiplexed requests.
|
||||
|
||||
use crate::{context::Context, ClientMessage, Response, Transport};
|
||||
use crate::{context, ClientMessage, Response, Transport};
|
||||
use futures::prelude::*;
|
||||
use log::warn;
|
||||
use std::{
|
||||
io,
|
||||
net::{Ipv4Addr, SocketAddr},
|
||||
};
|
||||
|
||||
mod dispatch;
|
||||
/// Provides a [`Client`] backed by a transport.
|
||||
pub mod channel;
|
||||
pub use self::channel::Channel;
|
||||
|
||||
/// Sends multiplexed requests to, and receives responses from, a server.
|
||||
#[derive(Debug)]
|
||||
pub struct Client<Req, Resp> {
|
||||
/// Channel to send requests to the dispatch task.
|
||||
channel: dispatch::Channel<Req, Resp>,
|
||||
pub trait Client<'a, Req> {
|
||||
/// The response type.
|
||||
type Response;
|
||||
|
||||
/// The future response.
|
||||
type Future: Future<Output = io::Result<Self::Response>> + 'a;
|
||||
|
||||
/// Initiates a request, sending it to the dispatch task.
|
||||
///
|
||||
/// Returns a [`Future`] that resolves to this client and the future response
|
||||
/// once the request is successfully enqueued.
|
||||
///
|
||||
/// [`Future`]: futures::Future
|
||||
fn call(&'a mut self, ctx: context::Context, request: Req) -> Self::Future;
|
||||
|
||||
/// Returns a Client that applies a post-processing function to the returned response.
|
||||
fn map_response<F, R>(self, f: F) -> MapResponse<Self, F>
|
||||
where
|
||||
F: FnMut(Self::Response) -> R,
|
||||
Self: Sized,
|
||||
{
|
||||
MapResponse { inner: self, f }
|
||||
}
|
||||
|
||||
/// Returns a Client that applies a pre-processing function to the request.
|
||||
fn with_request<F, Req2>(self, f: F) -> WithRequest<Self, F>
|
||||
where
|
||||
F: FnMut(Req2) -> Req,
|
||||
Self: Sized,
|
||||
{
|
||||
WithRequest { inner: self, f }
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp> Clone for Client<Req, Resp> {
|
||||
fn clone(&self) -> Self {
|
||||
Client {
|
||||
channel: self.channel.clone(),
|
||||
}
|
||||
/// A Client that applies a function to the returned response.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MapResponse<C, F> {
|
||||
inner: C,
|
||||
f: F,
|
||||
}
|
||||
|
||||
impl<'a, C, F, Req, Resp, Resp2> Client<'a, Req> for MapResponse<C, F>
|
||||
where
|
||||
C: Client<'a, Req, Response = Resp>,
|
||||
F: FnMut(Resp) -> Resp2 + 'a,
|
||||
{
|
||||
type Response = Resp2;
|
||||
type Future = futures::future::MapOk<<C as Client<'a, Req>>::Future, &'a mut F>;
|
||||
|
||||
fn call(&'a mut self, ctx: context::Context, request: Req) -> Self::Future {
|
||||
self.inner.call(ctx, request).map_ok(&mut self.f)
|
||||
}
|
||||
}
|
||||
|
||||
/// A Client that applies a pre-processing function to the request.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct WithRequest<C, F> {
|
||||
inner: C,
|
||||
f: F,
|
||||
}
|
||||
|
||||
impl<'a, C, F, Req, Req2, Resp> Client<'a, Req2> for WithRequest<C, F>
|
||||
where
|
||||
C: Client<'a, Req, Response = Resp>,
|
||||
F: FnMut(Req2) -> Req,
|
||||
{
|
||||
type Response = Resp;
|
||||
type Future = <C as Client<'a, Req>>::Future;
|
||||
|
||||
fn call(&'a mut self, ctx: context::Context, request: Req2) -> Self::Future {
|
||||
self.inner.call(ctx, (self.f)(request))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, Req, Resp> Client<'a, Req> for Channel<Req, Resp>
|
||||
where
|
||||
Req: 'a,
|
||||
Resp: 'a,
|
||||
{
|
||||
type Response = Resp;
|
||||
type Future = channel::Call<'a, Req, Resp>;
|
||||
|
||||
fn call(&'a mut self, ctx: context::Context, request: Req) -> channel::Call<'a, Req, Resp> {
|
||||
self.call(ctx, request)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,39 +129,23 @@ impl Default for Config {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp> Client<Req, Resp>
|
||||
/// Creates a new Client by wrapping a [`Transport`] and spawning a dispatch task
|
||||
/// that manages the lifecycle of requests.
|
||||
///
|
||||
/// Must only be called from on an executor.
|
||||
pub async fn new<Req, Resp, T>(config: Config, transport: T) -> io::Result<Channel<Req, Resp>>
|
||||
where
|
||||
Req: Send,
|
||||
Resp: Send,
|
||||
Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
T: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>> + Send + 'static,
|
||||
{
|
||||
/// Creates a new Client by wrapping a [`Transport`] and spawning a dispatch task
|
||||
/// that manages the lifecycle of requests.
|
||||
///
|
||||
/// Must only be called from on an executor.
|
||||
pub async fn new<T>(config: Config, transport: T) -> io::Result<Self>
|
||||
where
|
||||
T: Transport<Item = Response<Resp>, SinkItem = ClientMessage<Req>> + Send,
|
||||
{
|
||||
let server_addr = transport.peer_addr().unwrap_or_else(|e| {
|
||||
warn!(
|
||||
"Setting peer to unspecified because peer could not be determined: {}",
|
||||
e
|
||||
);
|
||||
SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0)
|
||||
});
|
||||
let server_addr = transport.peer_addr().unwrap_or_else(|e| {
|
||||
warn!(
|
||||
"Setting peer to unspecified because peer could not be determined: {}",
|
||||
e
|
||||
);
|
||||
SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0)
|
||||
});
|
||||
|
||||
Ok(Client {
|
||||
channel: await!(dispatch::spawn(config, transport, server_addr))?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Initiates a request, sending it to the dispatch task.
|
||||
///
|
||||
/// Returns a [`Future`] that resolves to this client and the future response
|
||||
/// once the request is successfully enqueued.
|
||||
///
|
||||
/// [`Future`]: futures::Future
|
||||
pub async fn call(&mut self, ctx: Context, request: Req) -> io::Result<Resp> {
|
||||
await!(self.channel.call(ctx, request))
|
||||
}
|
||||
Ok(await!(channel::spawn(config, transport, server_addr))?)
|
||||
}
|
||||
|
||||
@@ -5,21 +5,14 @@
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
#![feature(
|
||||
const_fn,
|
||||
non_exhaustive,
|
||||
integer_atomics,
|
||||
try_trait,
|
||||
nll,
|
||||
futures_api,
|
||||
pin,
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await,
|
||||
generators,
|
||||
optin_builtin_traits,
|
||||
generator_trait,
|
||||
gen_future,
|
||||
decl_macro,
|
||||
)]
|
||||
#![deny(missing_docs, missing_debug_implementations)]
|
||||
|
||||
@@ -49,7 +42,10 @@ pub(crate) mod util;
|
||||
|
||||
pub use crate::{client::Client, server::Server, transport::Transport};
|
||||
|
||||
use futures::{Future, task::{Spawn, SpawnExt, SpawnError}};
|
||||
use futures::{
|
||||
task::{Spawn, SpawnError, SpawnExt},
|
||||
Future,
|
||||
};
|
||||
use std::{cell::RefCell, io, sync::Once, time::SystemTime};
|
||||
|
||||
/// A message from a client to a server.
|
||||
@@ -193,9 +189,7 @@ pub fn init(spawn: impl Spawn + Clone + 'static) {
|
||||
}
|
||||
|
||||
pub(crate) fn spawn(future: impl Future<Output = ()> + Send + 'static) -> Result<(), SpawnError> {
|
||||
SPAWN.with(|spawn| {
|
||||
spawn.borrow_mut().spawn(future)
|
||||
})
|
||||
SPAWN.with(|spawn| spawn.borrow_mut().spawn(future))
|
||||
}
|
||||
|
||||
trait CloneSpawn: Spawn {
|
||||
|
||||
@@ -10,7 +10,13 @@ use crate::{
|
||||
ClientMessage, Response, Transport,
|
||||
};
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{channel::mpsc, prelude::*, ready, stream::Fuse, task::{LocalWaker, Poll}};
|
||||
use futures::{
|
||||
channel::mpsc,
|
||||
prelude::*,
|
||||
ready,
|
||||
stream::Fuse,
|
||||
task::{LocalWaker, Poll},
|
||||
};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use pin_utils::unsafe_pinned;
|
||||
use std::{
|
||||
@@ -205,10 +211,7 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_closed_connections(
|
||||
self: &mut Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> Poll<io::Result<()>> {
|
||||
fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
match ready!(self.closed_connections_rx().poll_next_unpin(cx)) {
|
||||
Some(addr) => {
|
||||
self.handle_closed_connection(&addr);
|
||||
|
||||
@@ -43,6 +43,12 @@ pub struct Server<Req, Resp> {
|
||||
ghost: PhantomData<(Req, Resp)>,
|
||||
}
|
||||
|
||||
impl<Req, Resp> Default for Server<Req, Resp> {
|
||||
fn default() -> Self {
|
||||
new(Config::default())
|
||||
}
|
||||
}
|
||||
|
||||
/// Settings that control the behavior of the server.
|
||||
#[non_exhaustive]
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -75,15 +81,15 @@ impl Default for Config {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp> Server<Req, Resp> {
|
||||
/// Returns a new server with configuration specified `config`.
|
||||
pub fn new(config: Config) -> Self {
|
||||
Server {
|
||||
config,
|
||||
ghost: PhantomData,
|
||||
}
|
||||
/// Returns a new server with configuration specified `config`.
|
||||
pub fn new<Req, Resp>(config: Config) -> Server<Req, Resp> {
|
||||
Server {
|
||||
config,
|
||||
ghost: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp> Server<Req, Resp> {
|
||||
/// Returns the config for this server.
|
||||
pub fn config(&self) -> &Config {
|
||||
&self.config
|
||||
@@ -122,7 +128,7 @@ where
|
||||
Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send + 'static,
|
||||
F: FnMut(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
type Output = ();
|
||||
@@ -132,7 +138,8 @@ where
|
||||
match channel {
|
||||
Ok(channel) => {
|
||||
let peer = channel.client_addr;
|
||||
if let Err(e) = crate::spawn(channel.respond_with(self.request_handler().clone()))
|
||||
if let Err(e) =
|
||||
crate::spawn(channel.respond_with(self.request_handler().clone()))
|
||||
{
|
||||
warn!("[{}] Failed to spawn connection handler: {:?}", peer, e);
|
||||
}
|
||||
@@ -158,7 +165,7 @@ where
|
||||
/// Responds to all requests with `request_handler`.
|
||||
fn respond_with<F, Fut>(self, request_handler: F) -> Running<Self, F>
|
||||
where
|
||||
F: FnMut(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
Running {
|
||||
@@ -222,21 +229,18 @@ where
|
||||
Req: Send,
|
||||
Resp: Send,
|
||||
{
|
||||
pub(crate) fn start_send(self: &mut Pin<&mut Self>, response: Response<Resp>) -> io::Result<()> {
|
||||
pub(crate) fn start_send(
|
||||
self: &mut Pin<&mut Self>,
|
||||
response: Response<Resp>,
|
||||
) -> io::Result<()> {
|
||||
self.transport().start_send(response)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_ready(
|
||||
self: &mut Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> Poll<io::Result<()>> {
|
||||
pub(crate) fn poll_ready(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
self.transport().poll_ready(cx)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_flush(
|
||||
self: &mut Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> Poll<io::Result<()>> {
|
||||
pub(crate) fn poll_flush(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
self.transport().poll_flush(cx)
|
||||
}
|
||||
|
||||
@@ -256,7 +260,7 @@ where
|
||||
/// responses and resolves when the connection is closed.
|
||||
pub fn respond_with<F, Fut>(self, f: F) -> impl Future<Output = ()>
|
||||
where
|
||||
F: FnMut(Context, Req) -> Fut + Send + 'static,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
Req: 'static,
|
||||
Resp: 'static,
|
||||
@@ -271,7 +275,8 @@ where
|
||||
pending_responses: responses,
|
||||
responses_tx,
|
||||
in_flight_requests: FnvHashMap::default(),
|
||||
}.unwrap_or_else(move |e| {
|
||||
}
|
||||
.unwrap_or_else(move |e| {
|
||||
info!("[{}] ClientHandler errored out: {}", peer, e);
|
||||
})
|
||||
}
|
||||
@@ -305,7 +310,7 @@ where
|
||||
Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
|
||||
F: FnMut(Context, Req) -> Fut + Send + 'static,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
/// If at max in-flight requests, check that there's room to immediately write a throttled
|
||||
@@ -462,7 +467,7 @@ where
|
||||
let mut response_tx = self.responses_tx().clone();
|
||||
|
||||
let trace_id = *ctx.trace_id();
|
||||
let response = self.f()(ctx.clone(), request);
|
||||
let response = self.f().clone()(ctx.clone(), request);
|
||||
let response = deadline_compat::Deadline::new(response, Instant::now() + timeout).then(
|
||||
async move |result| {
|
||||
let response = Response {
|
||||
@@ -477,16 +482,15 @@ where
|
||||
},
|
||||
);
|
||||
let (abortable_response, abort_handle) = abortable(response);
|
||||
crate::spawn(abortable_response.map(|_| ()))
|
||||
.map_err(|e| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!(
|
||||
"Could not spawn response task. Is shutdown: {}",
|
||||
e.is_shutdown()
|
||||
),
|
||||
)
|
||||
})?;
|
||||
crate::spawn(abortable_response.map(|_| ())).map_err(|e| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!(
|
||||
"Could not spawn response task. Is shutdown: {}",
|
||||
e.is_shutdown()
|
||||
),
|
||||
)
|
||||
})?;
|
||||
self.in_flight_requests().insert(request_id, abort_handle);
|
||||
Ok(())
|
||||
}
|
||||
@@ -521,7 +525,7 @@ where
|
||||
Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
|
||||
F: FnMut(Context, Req) -> Fut + Send + 'static,
|
||||
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone,
|
||||
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
type Output = io::Result<()>;
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
//! Transports backed by in-memory channels.
|
||||
|
||||
use crate::Transport;
|
||||
use futures::{channel::mpsc, task::{LocalWaker}, Poll, Sink, Stream};
|
||||
use futures::{channel::mpsc, task::LocalWaker, Poll, Sink, Stream};
|
||||
use pin_utils::unsafe_pinned;
|
||||
use std::pin::Pin;
|
||||
use std::{
|
||||
@@ -66,10 +66,7 @@ impl<Item, SinkItem> Sink for UnboundedChannel<Item, SinkItem> {
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> Poll<Result<(), Self::SinkError>> {
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<Result<(), Self::SinkError>> {
|
||||
self.tx()
|
||||
.poll_flush(cx)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
@@ -97,8 +94,12 @@ impl<Item, SinkItem> Transport for UnboundedChannel<Item, SinkItem> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{client::{self, Client}, context, server::{self, Handler, Server}, transport};
|
||||
use futures::{prelude::*, stream, compat::TokioDefaultSpawner};
|
||||
use crate::{
|
||||
client, context,
|
||||
server::{Handler, Server},
|
||||
transport,
|
||||
};
|
||||
use futures::{compat::TokioDefaultSpawner, prelude::*, stream};
|
||||
use log::trace;
|
||||
use std::io;
|
||||
|
||||
@@ -108,7 +109,7 @@ mod tests {
|
||||
crate::init(TokioDefaultSpawner);
|
||||
|
||||
let (client_channel, server_channel) = transport::channel::unbounded();
|
||||
let server = Server::<String, u64>::new(server::Config::default())
|
||||
let server = Server::<String, u64>::default()
|
||||
.incoming(stream::once(future::ready(Ok(server_channel))))
|
||||
.respond_with(|_ctx, request| {
|
||||
future::ready(request.parse::<u64>().map_err(|_| {
|
||||
@@ -120,7 +121,7 @@ mod tests {
|
||||
});
|
||||
|
||||
let responses = async {
|
||||
let mut client = await!(Client::new(client::Config::default(), client_channel))?;
|
||||
let mut client = await!(client::new(client::Config::default(), client_channel))?;
|
||||
|
||||
let response1 = await!(client.call(context::current(), "123".into()));
|
||||
let response2 = await!(client.call(context::current(), "abc".into()));
|
||||
|
||||
@@ -10,7 +10,12 @@
|
||||
//! can be plugged in, using whatever protocol it wants.
|
||||
|
||||
use futures::prelude::*;
|
||||
use std::{io, net::SocketAddr};
|
||||
use std::{
|
||||
io,
|
||||
net::SocketAddr,
|
||||
pin::Pin,
|
||||
task::{LocalWaker, Poll},
|
||||
};
|
||||
|
||||
pub mod channel;
|
||||
|
||||
@@ -30,3 +35,87 @@ where
|
||||
/// The address of the local half of this transport.
|
||||
fn local_addr(&self) -> io::Result<SocketAddr>;
|
||||
}
|
||||
|
||||
/// Returns a new Transport backed by the given Stream + Sink and connecting addresses.
|
||||
pub fn new<S, Item>(
|
||||
inner: S,
|
||||
peer_addr: SocketAddr,
|
||||
local_addr: SocketAddr,
|
||||
) -> impl Transport<Item = Item, SinkItem = S::SinkItem>
|
||||
where
|
||||
S: Stream<Item = io::Result<Item>>,
|
||||
S: Sink<SinkError = io::Error>,
|
||||
{
|
||||
TransportShim {
|
||||
inner,
|
||||
peer_addr,
|
||||
local_addr,
|
||||
}
|
||||
}
|
||||
|
||||
/// A transport created by adding peers to a Stream + Sink.
|
||||
#[derive(Debug)]
|
||||
struct TransportShim<S> {
|
||||
peer_addr: SocketAddr,
|
||||
local_addr: SocketAddr,
|
||||
inner: S,
|
||||
}
|
||||
|
||||
impl<S> TransportShim<S> {
|
||||
pin_utils::unsafe_pinned!(inner: S);
|
||||
}
|
||||
|
||||
impl<S> Stream for TransportShim<S>
|
||||
where
|
||||
S: Stream,
|
||||
{
|
||||
type Item = S::Item;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<S::Item>> {
|
||||
self.inner().poll_next(waker)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Sink for TransportShim<S>
|
||||
where
|
||||
S: Sink,
|
||||
{
|
||||
type SinkItem = S::SinkItem;
|
||||
type SinkError = S::SinkError;
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: S::SinkItem) -> Result<(), S::SinkError> {
|
||||
self.inner().start_send(item)
|
||||
}
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_ready(waker)
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_flush(waker)
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_close(waker)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Item> Transport for TransportShim<S>
|
||||
where
|
||||
S: Stream + Sink,
|
||||
Self: Stream<Item = io::Result<Item>>,
|
||||
Self: Sink<SinkItem = S::SinkItem, SinkError = io::Error>,
|
||||
{
|
||||
type Item = Item;
|
||||
type SinkItem = S::SinkItem;
|
||||
|
||||
/// The address of the remote peer this transport is in communication with.
|
||||
fn peer_addr(&self) -> io::Result<SocketAddr> {
|
||||
Ok(self.peer_addr)
|
||||
}
|
||||
|
||||
/// The address of the local half of this transport.
|
||||
fn local_addr(&self) -> io::Result<SocketAddr> {
|
||||
Ok(self.local_addr)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,8 @@
|
||||
use futures::{
|
||||
compat::{Compat01As03, Future01CompatExt},
|
||||
prelude::*,
|
||||
ready, task::{Poll, LocalWaker},
|
||||
ready,
|
||||
task::{LocalWaker, Poll},
|
||||
};
|
||||
use pin_utils::unsafe_pinned;
|
||||
use std::pin::Pin;
|
||||
@@ -50,7 +51,6 @@ where
|
||||
type Output = Result<T::Ok, timeout::Error<T::Error>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Self::Output> {
|
||||
|
||||
// First, try polling the future
|
||||
match self.future().try_poll(waker) {
|
||||
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
|
||||
|
||||
@@ -59,7 +59,8 @@ where
|
||||
Other => 16,
|
||||
UnexpectedEof => 17,
|
||||
_ => 16,
|
||||
}.serialize(serializer)
|
||||
}
|
||||
.serialize(serializer)
|
||||
}
|
||||
|
||||
/// Deserializes [`io::ErrorKind`] from a `u32`.
|
||||
|
||||
@@ -2,7 +2,7 @@ cargo-features = ["rename-dependency"]
|
||||
|
||||
[package]
|
||||
name = "tarpc"
|
||||
version = "0.13.0"
|
||||
version = "0.14.1"
|
||||
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
||||
edition = "2018"
|
||||
license = "MIT"
|
||||
@@ -24,15 +24,19 @@ travis-ci = { repository = "google/tarpc" }
|
||||
log = "0.4"
|
||||
serde = { optional = true, version = "1.0" }
|
||||
tarpc-plugins = { path = "../plugins", version = "0.5.0" }
|
||||
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.1" }
|
||||
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.2" }
|
||||
|
||||
[target.'cfg(not(test))'.dependencies]
|
||||
futures-preview = "0.3.0-alpha.8"
|
||||
futures-preview = "0.3.0-alpha.9"
|
||||
|
||||
[dev-dependencies]
|
||||
bincode = "1.0"
|
||||
bytes = { version = "0.4", features = ["serde"] }
|
||||
humantime = "1.0"
|
||||
futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] }
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.1", path = "../bincode-transport" }
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" }
|
||||
env_logger = "0.5"
|
||||
tokio = "0.1"
|
||||
tokio-executor = "0.1"
|
||||
tokio-tcp = "0.1"
|
||||
pin-utils = "0.1.0-alpha.3"
|
||||
|
||||
1
tarpc/README.md
Symbolic link
1
tarpc/README.md
Symbolic link
@@ -0,0 +1 @@
|
||||
../README.md
|
||||
@@ -11,7 +11,7 @@
|
||||
await_macro,
|
||||
async_await,
|
||||
existential_type,
|
||||
proc_macro_hygiene,
|
||||
proc_macro_hygiene
|
||||
)]
|
||||
|
||||
use futures::{
|
||||
@@ -55,7 +55,7 @@ struct Subscriber {
|
||||
impl subscriber::Service for Subscriber {
|
||||
type ReceiveFut = Ready<()>;
|
||||
|
||||
fn receive(&self, _: context::Context, message: String) -> Self::ReceiveFut {
|
||||
fn receive(self, _: context::Context, message: String) -> Self::ReceiveFut {
|
||||
println!("{} received message: {}", self.id, message);
|
||||
future::ready(())
|
||||
}
|
||||
@@ -66,13 +66,13 @@ impl Subscriber {
|
||||
let incoming = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
|
||||
let addr = incoming.local_addr();
|
||||
tokio_executor::spawn(
|
||||
Server::new(config)
|
||||
server::new(config)
|
||||
.incoming(incoming)
|
||||
.take(1)
|
||||
.respond_with(subscriber::serve(Subscriber { id }))
|
||||
.unit_error()
|
||||
.boxed()
|
||||
.compat()
|
||||
.compat(),
|
||||
);
|
||||
Ok(addr)
|
||||
}
|
||||
@@ -94,7 +94,7 @@ impl Publisher {
|
||||
impl publisher::Service for Publisher {
|
||||
existential type BroadcastFut: Future<Output = ()>;
|
||||
|
||||
fn broadcast(&self, _: context::Context, message: String) -> Self::BroadcastFut {
|
||||
fn broadcast(self, _: context::Context, message: String) -> Self::BroadcastFut {
|
||||
async fn broadcast(clients: Arc<Mutex<HashMap<u32, subscriber::Client>>>, message: String) {
|
||||
let mut clients = clients.lock().unwrap().clone();
|
||||
for client in clients.values_mut() {
|
||||
@@ -110,7 +110,7 @@ impl publisher::Service for Publisher {
|
||||
|
||||
existential type SubscribeFut: Future<Output = Result<(), String>>;
|
||||
|
||||
fn subscribe(&self, _: context::Context, id: u32, addr: SocketAddr) -> Self::SubscribeFut {
|
||||
fn subscribe(self, _: context::Context, id: u32, addr: SocketAddr) -> Self::SubscribeFut {
|
||||
async fn subscribe(
|
||||
clients: Arc<Mutex<HashMap<u32, subscriber::Client>>>,
|
||||
id: u32,
|
||||
@@ -128,7 +128,7 @@ impl publisher::Service for Publisher {
|
||||
|
||||
existential type UnsubscribeFut: Future<Output = ()>;
|
||||
|
||||
fn unsubscribe(&self, _: context::Context, id: u32) -> Self::UnsubscribeFut {
|
||||
fn unsubscribe(self, _: context::Context, id: u32) -> Self::UnsubscribeFut {
|
||||
println!("Unsubscribing {}", id);
|
||||
let mut clients = self.clients.lock().unwrap();
|
||||
if let None = clients.remove(&id) {
|
||||
@@ -146,13 +146,13 @@ async fn run() -> io::Result<()> {
|
||||
let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
|
||||
let publisher_addr = transport.local_addr();
|
||||
tokio_executor::spawn(
|
||||
Server::new(server::Config::default())
|
||||
Server::default()
|
||||
.incoming(transport)
|
||||
.take(1)
|
||||
.respond_with(publisher::serve(Publisher::new()))
|
||||
.unit_error()
|
||||
.boxed()
|
||||
.compat()
|
||||
.compat(),
|
||||
);
|
||||
|
||||
let subscriber1 = await!(Subscriber::listen(0, server::Config::default()))?;
|
||||
@@ -180,12 +180,6 @@ async fn run() -> io::Result<()> {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
tokio::run(
|
||||
run()
|
||||
.boxed()
|
||||
.map_err(|e| panic!(e))
|
||||
.boxed()
|
||||
.compat(),
|
||||
);
|
||||
tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
}
|
||||
|
||||
@@ -10,16 +10,17 @@
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await,
|
||||
proc_macro_hygiene,
|
||||
proc_macro_hygiene
|
||||
)]
|
||||
|
||||
use futures::{
|
||||
compat::TokioDefaultSpawner,
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
use rpc::{
|
||||
client, context,
|
||||
server::{self, Handler, Server},
|
||||
server::{Handler, Server},
|
||||
};
|
||||
use std::io;
|
||||
|
||||
@@ -41,7 +42,7 @@ impl Service for HelloServer {
|
||||
|
||||
type HelloFut = Ready<String>;
|
||||
|
||||
fn hello(&self, _: context::Context, name: String) -> Self::HelloFut {
|
||||
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
|
||||
future::ready(format!("Hello, {}!", name))
|
||||
}
|
||||
}
|
||||
@@ -53,7 +54,7 @@ async fn run() -> io::Result<()> {
|
||||
let addr = transport.local_addr();
|
||||
|
||||
// The server is configured with the defaults.
|
||||
let server = Server::new(server::Config::default())
|
||||
let server = Server::default()
|
||||
// Server can listen on any type that implements the Transport trait.
|
||||
.incoming(transport)
|
||||
// Close the stream after the client connects
|
||||
@@ -82,6 +83,8 @@ async fn run() -> io::Result<()> {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
|
||||
tokio::run(
|
||||
run()
|
||||
.map_err(|e| eprintln!("Oh no: {}", e))
|
||||
|
||||
@@ -11,17 +11,18 @@
|
||||
futures_api,
|
||||
await_macro,
|
||||
async_await,
|
||||
proc_macro_hygiene,
|
||||
proc_macro_hygiene
|
||||
)]
|
||||
|
||||
use crate::{add::Service as AddService, double::Service as DoubleService};
|
||||
use futures::{
|
||||
compat::TokioDefaultSpawner,
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
use rpc::{
|
||||
client, context,
|
||||
server::{self, Handler, Server},
|
||||
server::{Handler, Server},
|
||||
};
|
||||
use std::io;
|
||||
|
||||
@@ -45,7 +46,7 @@ struct AddServer;
|
||||
impl AddService for AddServer {
|
||||
type AddFut = Ready<i32>;
|
||||
|
||||
fn add(&self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
|
||||
fn add(self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
|
||||
future::ready(x + y)
|
||||
}
|
||||
}
|
||||
@@ -58,7 +59,7 @@ struct DoubleServer {
|
||||
impl DoubleService for DoubleServer {
|
||||
existential type DoubleFut: Future<Output = Result<i32, String>> + Send;
|
||||
|
||||
fn double(&self, _: context::Context, x: i32) -> Self::DoubleFut {
|
||||
fn double(self, _: context::Context, x: i32) -> Self::DoubleFut {
|
||||
async fn double(mut client: add::Client, x: i32) -> Result<i32, String> {
|
||||
let result = await!(client.add(context::current(), x, x));
|
||||
result.map_err(|e| e.to_string())
|
||||
@@ -71,7 +72,7 @@ impl DoubleService for DoubleServer {
|
||||
async fn run() -> io::Result<()> {
|
||||
let add_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
|
||||
let addr = add_listener.local_addr();
|
||||
let add_server = Server::new(server::Config::default())
|
||||
let add_server = Server::default()
|
||||
.incoming(add_listener)
|
||||
.take(1)
|
||||
.respond_with(add::serve(AddServer));
|
||||
@@ -82,7 +83,7 @@ async fn run() -> io::Result<()> {
|
||||
|
||||
let double_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
|
||||
let addr = double_listener.local_addr();
|
||||
let double_server = rpc::Server::new(server::Config::default())
|
||||
let double_server = rpc::Server::default()
|
||||
.incoming(double_listener)
|
||||
.take(1)
|
||||
.respond_with(double::serve(DoubleServer { add_client }));
|
||||
@@ -102,10 +103,6 @@ async fn run() -> io::Result<()> {
|
||||
|
||||
fn main() {
|
||||
env_logger::init();
|
||||
tokio::run(
|
||||
run()
|
||||
.map_err(|e| panic!(e))
|
||||
.boxed()
|
||||
.compat(),
|
||||
);
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tokio::run(run().map_err(|e| panic!(e)).boxed().compat());
|
||||
}
|
||||
|
||||
411
tarpc/examples/service_registry.rs
Normal file
411
tarpc/examples/service_registry.rs
Normal file
@@ -0,0 +1,411 @@
|
||||
#![feature(
|
||||
pin,
|
||||
async_await,
|
||||
await_macro,
|
||||
futures_api,
|
||||
arbitrary_self_types,
|
||||
proc_macro_hygiene,
|
||||
impl_trait_in_bindings
|
||||
)]
|
||||
|
||||
mod registry {
|
||||
use bytes::Bytes;
|
||||
use futures::{
|
||||
future::{ready, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
io,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{LocalWaker, Poll},
|
||||
};
|
||||
use tarpc::{
|
||||
client::{self, Client},
|
||||
context,
|
||||
};
|
||||
|
||||
/// A request to a named service.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ServiceRequest {
|
||||
service_name: String,
|
||||
request: Bytes,
|
||||
}
|
||||
|
||||
/// A response from a named service.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ServiceResponse {
|
||||
response: Bytes,
|
||||
}
|
||||
|
||||
/// A list of registered services.
|
||||
pub struct Registry<Services> {
|
||||
registrations: Services,
|
||||
}
|
||||
|
||||
impl Default for Registry<Nil> {
|
||||
fn default() -> Self {
|
||||
Registry { registrations: Nil }
|
||||
}
|
||||
}
|
||||
|
||||
impl<Services: MaybeServe + Sync> Registry<Services> {
|
||||
/// Returns a function that serves requests for the registered services.
|
||||
pub fn serve(
|
||||
self,
|
||||
) -> impl FnOnce(context::Context, ServiceRequest)
|
||||
-> Either<Services::Future, Ready<io::Result<ServiceResponse>>>
|
||||
+ Clone {
|
||||
let registrations = Arc::new(self.registrations);
|
||||
move |cx, req: ServiceRequest| match registrations.serve(cx, &req) {
|
||||
Some(serve) => Either::Left(serve),
|
||||
None => Either::Right(ready(Err(io::Error::new(
|
||||
io::ErrorKind::NotFound,
|
||||
format!("Service '{}' not registered", req.service_name),
|
||||
)))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Registers `serve` with the given `name` using the given serialization scheme.
|
||||
pub fn register<S, Req, Resp, RespFut, Ser, De>(
|
||||
self,
|
||||
name: String,
|
||||
serve: S,
|
||||
deserialize: De,
|
||||
serialize: Ser,
|
||||
) -> Registry<Registration<impl Serve + Send + 'static, Services>>
|
||||
where
|
||||
Req: Send,
|
||||
S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone,
|
||||
RespFut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
De: FnOnce(Bytes) -> io::Result<Req> + Send + 'static + Clone,
|
||||
Ser: FnOnce(Resp) -> io::Result<Bytes> + Send + 'static + Clone,
|
||||
{
|
||||
let registrations = Registration {
|
||||
name: name,
|
||||
serve: move |cx, req: Bytes| {
|
||||
async move {
|
||||
let req = deserialize.clone()(req)?;
|
||||
let response = await!(serve.clone()(cx, req))?;
|
||||
let response = serialize.clone()(response)?;
|
||||
Ok(ServiceResponse { response })
|
||||
}
|
||||
},
|
||||
rest: self.registrations,
|
||||
};
|
||||
Registry { registrations }
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a client that sends requests to a service
|
||||
/// named `service_name`, over the given channel, using
|
||||
/// the specified serialization scheme.
|
||||
pub fn new_client<Req, Resp, Ser, De>(
|
||||
service_name: String,
|
||||
channel: &client::Channel<ServiceRequest, ServiceResponse>,
|
||||
mut serialize: Ser,
|
||||
mut deserialize: De,
|
||||
) -> client::MapResponse<
|
||||
client::WithRequest<
|
||||
client::Channel<ServiceRequest, ServiceResponse>,
|
||||
impl FnMut(Req) -> ServiceRequest,
|
||||
>,
|
||||
impl FnMut(ServiceResponse) -> Resp,
|
||||
>
|
||||
where
|
||||
Req: Send + 'static,
|
||||
Resp: Send + 'static,
|
||||
De: FnMut(Bytes) -> io::Result<Resp> + Clone + Send + 'static,
|
||||
Ser: FnMut(Req) -> io::Result<Bytes> + Clone + Send + 'static,
|
||||
{
|
||||
channel
|
||||
.clone()
|
||||
.with_request(move |req| {
|
||||
ServiceRequest {
|
||||
service_name: service_name.clone(),
|
||||
// TODO: shouldn't need to unwrap here. Maybe with_request should allow for
|
||||
// returning Result.
|
||||
request: serialize(req).unwrap(),
|
||||
}
|
||||
})
|
||||
// TODO: same thing. Maybe this should be more like and_then rather than map.
|
||||
.map_response(move |resp| deserialize(resp.response).unwrap())
|
||||
}
|
||||
|
||||
/// Serves a request.
|
||||
///
|
||||
/// This trait is mostly an implementation detail that isn't used outside of the registry
|
||||
/// internals.
|
||||
pub trait Serve: Clone + Send + 'static {
|
||||
type Response: Future<Output = io::Result<ServiceResponse>> + Send + 'static;
|
||||
fn serve(self, cx: context::Context, request: Bytes) -> Self::Response;
|
||||
}
|
||||
|
||||
/// Serves a request if the request is for a registered service.
|
||||
///
|
||||
/// This trait is mostly an implementation detail that isn't used outside of the registry
|
||||
/// internals.
|
||||
pub trait MaybeServe: Send + 'static {
|
||||
type Future: Future<Output = io::Result<ServiceResponse>> + Send + 'static;
|
||||
|
||||
fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option<Self::Future>;
|
||||
}
|
||||
|
||||
/// A registry starting with service S, followed by Rest.
|
||||
///
|
||||
/// This type is mostly an implementation detail that is not used directly
|
||||
/// outside of the registry internals.
|
||||
pub struct Registration<S, Rest> {
|
||||
/// The registered service's name. Must be unique across all registered services.
|
||||
name: String,
|
||||
/// The registered service.
|
||||
serve: S,
|
||||
/// Any remaining registered services.
|
||||
rest: Rest,
|
||||
}
|
||||
|
||||
/// An empty registry.
|
||||
///
|
||||
/// This type is mostly an implementation detail that is not used directly
|
||||
/// outside of the registry internals.
|
||||
pub struct Nil;
|
||||
|
||||
impl MaybeServe for Nil {
|
||||
type Future = futures::future::Ready<io::Result<ServiceResponse>>;
|
||||
|
||||
fn serve(&self, _: context::Context, _: &ServiceRequest) -> Option<Self::Future> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Rest> MaybeServe for Registration<S, Rest>
|
||||
where
|
||||
S: Serve,
|
||||
Rest: MaybeServe,
|
||||
{
|
||||
type Future = Either<S::Response, Rest::Future>;
|
||||
|
||||
fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option<Self::Future> {
|
||||
if self.name == request.service_name {
|
||||
Some(Either::Left(
|
||||
self.serve.clone().serve(cx, request.request.clone()),
|
||||
))
|
||||
} else {
|
||||
self.rest.serve(cx, request).map(Either::Right)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps either of two future types that both resolve to the same output type.
|
||||
#[derive(Debug)]
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub enum Either<Left, Right> {
|
||||
Left(Left),
|
||||
Right(Right),
|
||||
}
|
||||
|
||||
impl<Output, Left, Right> Future for Either<Left, Right>
|
||||
where
|
||||
Left: Future<Output = Output>,
|
||||
Right: Future<Output = Output>,
|
||||
{
|
||||
type Output = Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Output> {
|
||||
unsafe {
|
||||
match Pin::get_mut_unchecked(self) {
|
||||
Either::Left(car) => Pin::new_unchecked(car).poll(waker),
|
||||
Either::Right(cdr) => Pin::new_unchecked(cdr).poll(waker),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Resp, F> Serve for F
|
||||
where
|
||||
F: FnOnce(context::Context, Bytes) -> Resp + Clone + Send + 'static,
|
||||
Resp: Future<Output = io::Result<ServiceResponse>> + Send + 'static,
|
||||
{
|
||||
type Response = Resp;
|
||||
|
||||
fn serve(self, cx: context::Context, request: Bytes) -> Resp {
|
||||
self(cx, request)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Example
|
||||
use bytes::Bytes;
|
||||
use futures::{
|
||||
future::{ready, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
io,
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
use tarpc::{client, context, server::Handler};
|
||||
|
||||
fn deserialize<Req>(req: Bytes) -> io::Result<Req>
|
||||
where
|
||||
Req: for<'a> Deserialize<'a> + Send,
|
||||
{
|
||||
bincode::deserialize(req.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
fn serialize<Resp>(resp: Resp) -> io::Result<Bytes>
|
||||
where
|
||||
Resp: Serialize,
|
||||
{
|
||||
Ok(bincode::serialize(&resp)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
|
||||
.into())
|
||||
}
|
||||
|
||||
mod write_service {
|
||||
tarpc::service! {
|
||||
rpc write(key: String, value: String);
|
||||
}
|
||||
}
|
||||
|
||||
mod read_service {
|
||||
tarpc::service! {
|
||||
rpc read(key: String) -> Option<String>;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
struct Server {
|
||||
data: Arc<RwLock<HashMap<String, String>>>,
|
||||
}
|
||||
|
||||
impl write_service::Service for Server {
|
||||
type WriteFut = Ready<()>;
|
||||
|
||||
fn write(self, _: context::Context, key: String, value: String) -> Self::WriteFut {
|
||||
self.data.write().unwrap().insert(key, value);
|
||||
ready(())
|
||||
}
|
||||
}
|
||||
|
||||
impl read_service::Service for Server {
|
||||
type ReadFut = Ready<Option<String>>;
|
||||
|
||||
fn read(self, _: context::Context, key: String) -> Self::ReadFut {
|
||||
ready(self.data.read().unwrap().get(&key).cloned())
|
||||
}
|
||||
}
|
||||
|
||||
trait DefaultSpawn {
|
||||
fn spawn(self);
|
||||
}
|
||||
|
||||
impl<F> DefaultSpawn for F
|
||||
where
|
||||
F: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
fn spawn(self) {
|
||||
tokio_executor::spawn(self.unit_error().boxed().compat())
|
||||
}
|
||||
}
|
||||
|
||||
struct BincodeRegistry<Services> {
|
||||
registry: registry::Registry<Services>,
|
||||
}
|
||||
|
||||
impl Default for BincodeRegistry<registry::Nil> {
|
||||
fn default() -> Self {
|
||||
BincodeRegistry {
|
||||
registry: registry::Registry::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Services: registry::MaybeServe + Sync> BincodeRegistry<Services> {
|
||||
fn serve(
|
||||
self,
|
||||
) -> impl FnOnce(
|
||||
context::Context, registry::ServiceRequest
|
||||
) -> registry::Either<
|
||||
Services::Future,
|
||||
Ready<io::Result<registry::ServiceResponse>>,
|
||||
> + Clone {
|
||||
self.registry.serve()
|
||||
}
|
||||
|
||||
fn register<S, Req, Resp, RespFut>(
|
||||
self,
|
||||
name: String,
|
||||
serve: S,
|
||||
) -> BincodeRegistry<registry::Registration<impl registry::Serve + Send + 'static, Services>>
|
||||
where
|
||||
Req: for<'a> Deserialize<'a> + Send + 'static,
|
||||
Resp: Serialize + 'static,
|
||||
S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone,
|
||||
RespFut: Future<Output = io::Result<Resp>> + Send + 'static,
|
||||
{
|
||||
let registry = self.registry.register(name, serve, deserialize, serialize);
|
||||
BincodeRegistry { registry }
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_client<Req, Resp>(
|
||||
service_name: String,
|
||||
channel: &client::Channel<registry::ServiceRequest, registry::ServiceResponse>,
|
||||
) -> client::MapResponse<
|
||||
client::WithRequest<
|
||||
client::Channel<registry::ServiceRequest, registry::ServiceResponse>,
|
||||
impl FnMut(Req) -> registry::ServiceRequest,
|
||||
>,
|
||||
impl FnMut(registry::ServiceResponse) -> Resp,
|
||||
>
|
||||
where
|
||||
Req: Serialize + Send + 'static,
|
||||
Resp: for<'a> Deserialize<'a> + Send + 'static,
|
||||
{
|
||||
registry::new_client(service_name, channel, serialize, deserialize)
|
||||
}
|
||||
|
||||
async fn run() -> io::Result<()> {
|
||||
let server = Server::default();
|
||||
let registry = BincodeRegistry::default()
|
||||
.register(
|
||||
"WriteService".to_string(),
|
||||
write_service::serve(server.clone()),
|
||||
)
|
||||
.register(
|
||||
"ReadService".to_string(),
|
||||
read_service::serve(server.clone()),
|
||||
);
|
||||
|
||||
let listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
|
||||
let server_addr = listener.local_addr();
|
||||
let server = tarpc::Server::default()
|
||||
.incoming(listener)
|
||||
.take(1)
|
||||
.respond_with(registry.serve());
|
||||
tokio_executor::spawn(server.unit_error().boxed().compat());
|
||||
|
||||
let transport = await!(bincode_transport::connect(&server_addr))?;
|
||||
let channel = await!(client::new(client::Config::default(), transport))?;
|
||||
|
||||
let write_client = new_client("WriteService".to_string(), &channel);
|
||||
let mut write_client = write_service::Client::from(write_client);
|
||||
|
||||
let read_client = new_client("ReadService".to_string(), &channel);
|
||||
let mut read_client = read_service::Client::from(read_client);
|
||||
|
||||
await!(write_client.write(context::current(), "key".to_string(), "val".to_string()))?;
|
||||
let val = await!(read_client.read(context::current(), "key".to_string()))?;
|
||||
println!("{:?}", val);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() {
|
||||
tarpc::init(futures::compat::TokioDefaultSpawner);
|
||||
tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
|
||||
}
|
||||
@@ -1 +1 @@
|
||||
edition = "Edition2018"
|
||||
edition = "2018"
|
||||
|
||||
126
tarpc/src/lib.rs
126
tarpc/src/lib.rs
@@ -4,123 +4,19 @@
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
//! tarpc is an RPC framework for rust with a focus on ease of use. Defining a
|
||||
//! service can be done in just a few lines of code, and most of the boilerplate of
|
||||
//! writing a server is taken care of for you.
|
||||
//!
|
||||
//! ## What is an RPC framework?
|
||||
//! "RPC" stands for "Remote Procedure Call," a function call where the work of
|
||||
//! producing the return value is being done somewhere else. When an rpc function is
|
||||
//! invoked, behind the scenes the function contacts some other process somewhere
|
||||
//! and asks them to evaluate the function instead. The original function then
|
||||
//! returns the value produced by the other process.
|
||||
//!
|
||||
//! RPC frameworks are a fundamental building block of most microservices-oriented
|
||||
//! architectures. Two well-known ones are [gRPC](http://www.grpc.io) and
|
||||
//! [Cap'n Proto](https://capnproto.org/).
|
||||
//!
|
||||
//! tarpc differentiates itself from other RPC frameworks by defining the schema in code,
|
||||
//! rather than in a separate language such as .proto. This means there's no separate compilation
|
||||
//! process, and no cognitive context switching between different languages. Additionally, it
|
||||
//! works with the community-backed library serde: any serde-serializable type can be used as
|
||||
//! arguments to tarpc fns.
|
||||
//!
|
||||
//! ## Example
|
||||
//!
|
||||
//! Here's a small service.
|
||||
//!
|
||||
//! ```rust
|
||||
//! #![feature(futures_api, pin, arbitrary_self_types, await_macro, async_await, proc_macro_hygiene)]
|
||||
//!
|
||||
//!
|
||||
//! use futures::{
|
||||
//! compat::TokioDefaultSpawner,
|
||||
//! future::{self, Ready},
|
||||
//! prelude::*,
|
||||
//! };
|
||||
//! use tarpc::{
|
||||
//! client, context,
|
||||
//! server::{self, Handler, Server},
|
||||
//! };
|
||||
//! use std::io;
|
||||
//!
|
||||
//! // This is the service definition. It looks a lot like a trait definition.
|
||||
//! // It defines one RPC, hello, which takes one arg, name, and returns a String.
|
||||
//! tarpc::service! {
|
||||
//! /// Returns a greeting for name.
|
||||
//! rpc hello(name: String) -> String;
|
||||
//! }
|
||||
//!
|
||||
//! // This is the type that implements the generated Service trait. It is the business logic
|
||||
//! // and is used to start the server.
|
||||
//! #[derive(Clone)]
|
||||
//! struct HelloServer;
|
||||
//!
|
||||
//! impl Service for HelloServer {
|
||||
//! // Each defined rpc generates two items in the trait, a fn that serves the RPC, and
|
||||
//! // an associated type representing the future output by the fn.
|
||||
//!
|
||||
//! type HelloFut = Ready<String>;
|
||||
//!
|
||||
//! fn hello(&self, _: context::Context, name: String) -> Self::HelloFut {
|
||||
//! future::ready(format!("Hello, {}!", name))
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! async fn run() -> io::Result<()> {
|
||||
//! // bincode_transport is provided by the associated crate bincode-transport. It makes it easy
|
||||
//! // to start up a serde-powered bincode serialization strategy over TCP.
|
||||
//! let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
|
||||
//! let addr = transport.local_addr();
|
||||
//!
|
||||
//! // The server is configured with the defaults.
|
||||
//! let server = Server::new(server::Config::default())
|
||||
//! // Server can listen on any type that implements the Transport trait.
|
||||
//! .incoming(transport)
|
||||
//! // Close the stream after the client connects
|
||||
//! .take(1)
|
||||
//! // serve is generated by the service! macro. It takes as input any type implementing
|
||||
//! // the generated Service trait.
|
||||
//! .respond_with(serve(HelloServer));
|
||||
//!
|
||||
//! tokio_executor::spawn(server.unit_error().boxed().compat());
|
||||
//!
|
||||
//! let transport = await!(bincode_transport::connect(&addr))?;
|
||||
//!
|
||||
//! // new_stub is generated by the service! macro. Like Server, it takes a config and any
|
||||
//! // Transport as input, and returns a Client, also generated by the macro.
|
||||
//! // by the service mcro.
|
||||
//! let mut client = await!(new_stub(client::Config::default(), transport))?;
|
||||
//!
|
||||
//! // The client has an RPC method for each RPC defined in service!. It takes the same args
|
||||
//! // as defined, with the addition of a Context, which is always the first arg. The Context
|
||||
//! // specifies a deadline and trace information which can be helpful in debugging requests.
|
||||
//! let hello = await!(client.hello(context::current(), "Stim".to_string()))?;
|
||||
//!
|
||||
//! println!("{}", hello);
|
||||
//!
|
||||
//! Ok(())
|
||||
//! }
|
||||
//!
|
||||
//! fn main() {
|
||||
//! tarpc::init(TokioDefaultSpawner);
|
||||
//! tokio::run(run()
|
||||
//! .map_err(|e| eprintln!("Oh no: {}", e))
|
||||
//! .boxed()
|
||||
//! .compat(),
|
||||
//! );
|
||||
//! }
|
||||
//! ```
|
||||
|
||||
#![doc(include = "../README.md")]
|
||||
#![deny(missing_docs, missing_debug_implementations)]
|
||||
#![feature(
|
||||
futures_api,
|
||||
pin,
|
||||
await_macro,
|
||||
async_await,
|
||||
decl_macro,
|
||||
#![feature(async_await, external_doc)]
|
||||
#![cfg_attr(
|
||||
test,
|
||||
feature(
|
||||
pin,
|
||||
futures_api,
|
||||
await_macro,
|
||||
proc_macro_hygiene,
|
||||
arbitrary_self_types
|
||||
)
|
||||
)]
|
||||
#![cfg_attr(test, feature(proc_macro_hygiene, arbitrary_self_types))]
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use futures;
|
||||
|
||||
@@ -51,6 +51,9 @@ macro_rules! add_serde_if_enabled {
|
||||
///
|
||||
#[macro_export]
|
||||
macro_rules! service {
|
||||
() => {
|
||||
compile_error!("Must define at least one RPC method.");
|
||||
};
|
||||
// Entry point
|
||||
(
|
||||
$(
|
||||
@@ -112,24 +115,26 @@ macro_rules! service {
|
||||
)*
|
||||
) => {
|
||||
$crate::add_serde_if_enabled! {
|
||||
/// The request sent over the wire from the client to the server.
|
||||
#[derive(Debug)]
|
||||
#[doc(hidden)]
|
||||
#[allow(non_camel_case_types, unused)]
|
||||
--
|
||||
pub enum Request__ {
|
||||
pub enum Request {
|
||||
$(
|
||||
$(#[$attr])*
|
||||
$fn_name{ $($arg: $in_,)* }
|
||||
),*
|
||||
}
|
||||
}
|
||||
|
||||
$crate::add_serde_if_enabled! {
|
||||
/// The response sent over the wire from the server to the client.
|
||||
#[derive(Debug)]
|
||||
#[doc(hidden)]
|
||||
#[allow(non_camel_case_types, unused)]
|
||||
--
|
||||
pub enum Response__ {
|
||||
pub enum Response {
|
||||
$(
|
||||
$(#[$attr])*
|
||||
$fn_name($out)
|
||||
),*
|
||||
}
|
||||
@@ -149,37 +154,39 @@ macro_rules! service {
|
||||
}
|
||||
|
||||
$(#[$attr])*
|
||||
fn $fn_name(&self, ctx: $crate::context::Context, $($arg:$in_),*) -> $crate::ty_snake_to_camel!(Self::$fn_name);
|
||||
fn $fn_name(self, ctx: $crate::context::Context, $($arg:$in_),*) -> $crate::ty_snake_to_camel!(Self::$fn_name);
|
||||
)*
|
||||
}
|
||||
|
||||
// TODO: use an existential type instead of this when existential types work.
|
||||
/// A future resolving to a server [`Response`].
|
||||
#[allow(non_camel_case_types)]
|
||||
pub enum Response<S: Service> {
|
||||
pub enum ResponseFut<S: Service> {
|
||||
$(
|
||||
$(#[$attr])*
|
||||
$fn_name($crate::ty_snake_to_camel!(<S as Service>::$fn_name)),
|
||||
)*
|
||||
}
|
||||
|
||||
impl<S: Service> ::std::fmt::Debug for Response<S> {
|
||||
impl<S: Service> ::std::fmt::Debug for ResponseFut<S> {
|
||||
fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
|
||||
fmt.debug_struct("Response").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Service> ::std::future::Future for Response<S> {
|
||||
type Output = ::std::io::Result<Response__>;
|
||||
impl<S: Service> ::std::future::Future for ResponseFut<S> {
|
||||
type Output = ::std::io::Result<Response>;
|
||||
|
||||
fn poll(self: ::std::pin::Pin<&mut Self>, waker: &::std::task::LocalWaker)
|
||||
-> ::std::task::Poll<::std::io::Result<Response__>>
|
||||
-> ::std::task::Poll<::std::io::Result<Response>>
|
||||
{
|
||||
unsafe {
|
||||
match ::std::pin::Pin::get_mut_unchecked(self) {
|
||||
$(
|
||||
Response::$fn_name(resp) =>
|
||||
ResponseFut::$fn_name(resp) =>
|
||||
::std::pin::Pin::new_unchecked(resp)
|
||||
.poll(waker)
|
||||
.map(Response__::$fn_name)
|
||||
.map(Response::$fn_name)
|
||||
.map(Ok),
|
||||
)*
|
||||
}
|
||||
@@ -189,13 +196,13 @@ macro_rules! service {
|
||||
|
||||
/// Returns a serving function to use with rpc::server::Server.
|
||||
pub fn serve<S: Service>(service: S)
|
||||
-> impl FnMut($crate::context::Context, Request__) -> Response<S> + Send + 'static + Clone {
|
||||
-> impl FnOnce($crate::context::Context, Request) -> ResponseFut<S> + Send + 'static + Clone {
|
||||
move |ctx, req| {
|
||||
match req {
|
||||
$(
|
||||
Request__::$fn_name{ $($arg,)* } => {
|
||||
let resp = Service::$fn_name(&mut service.clone(), ctx, $($arg),*);
|
||||
Response::$fn_name(resp)
|
||||
Request::$fn_name{ $($arg,)* } => {
|
||||
let resp = Service::$fn_name(service.clone(), ctx, $($arg),*);
|
||||
ResponseFut::$fn_name(resp)
|
||||
}
|
||||
)*
|
||||
}
|
||||
@@ -205,30 +212,40 @@ macro_rules! service {
|
||||
#[allow(unused)]
|
||||
#[derive(Clone, Debug)]
|
||||
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
|
||||
pub struct Client($crate::client::Client<Request__, Response__>);
|
||||
pub struct Client<C = $crate::client::Channel<Request, Response>>(C);
|
||||
|
||||
/// Returns a new client stub that sends requests over the given transport.
|
||||
pub async fn new_stub<T>(config: $crate::client::Config, transport: T)
|
||||
-> ::std::io::Result<Client>
|
||||
where
|
||||
T: $crate::Transport<
|
||||
Item = $crate::Response<Response__>,
|
||||
SinkItem = $crate::ClientMessage<Request__>> + Send,
|
||||
Item = $crate::Response<Response>,
|
||||
SinkItem = $crate::ClientMessage<Request>> + Send + 'static,
|
||||
{
|
||||
Ok(Client(await!($crate::client::Client::new(config, transport))?))
|
||||
Ok(Client(await!($crate::client::new(config, transport))?))
|
||||
}
|
||||
|
||||
impl Client {
|
||||
impl<C> From<C> for Client<C>
|
||||
where for <'a> C: $crate::Client<'a, Request, Response = Response>
|
||||
{
|
||||
fn from(client: C) -> Self {
|
||||
Client(client)
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> Client<C>
|
||||
where for<'a> C: $crate::Client<'a, Request, Response = Response>
|
||||
{
|
||||
$(
|
||||
#[allow(unused)]
|
||||
$(#[$attr])*
|
||||
pub fn $fn_name(&mut self, ctx: $crate::context::Context, $($arg: $in_),*)
|
||||
-> impl ::std::future::Future<Output = ::std::io::Result<$out>> + '_ {
|
||||
let request__ = Request__::$fn_name { $($arg,)* };
|
||||
let resp = self.0.call(ctx, request__);
|
||||
let request__ = Request::$fn_name { $($arg,)* };
|
||||
let resp = $crate::Client::call(&mut self.0, ctx, request__);
|
||||
async move {
|
||||
match await!(resp)? {
|
||||
Response__::$fn_name(msg__) => ::std::result::Result::Ok(msg__),
|
||||
Response::$fn_name(msg__) => ::std::result::Result::Ok(msg__),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
@@ -269,11 +286,7 @@ mod functional_test {
|
||||
future::{ready, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
use rpc::{
|
||||
client, context,
|
||||
server::{self, Handler},
|
||||
transport::channel,
|
||||
};
|
||||
use rpc::{client, context, server::Handler, transport::channel};
|
||||
use std::io;
|
||||
use tokio::runtime::current_thread;
|
||||
|
||||
@@ -288,13 +301,13 @@ mod functional_test {
|
||||
impl Service for Server {
|
||||
type AddFut = Ready<i32>;
|
||||
|
||||
fn add(&self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
|
||||
fn add(self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
|
||||
ready(x + y)
|
||||
}
|
||||
|
||||
type HeyFut = Ready<String>;
|
||||
|
||||
fn hey(&self, _: context::Context, name: String) -> Self::HeyFut {
|
||||
fn hey(self, _: context::Context, name: String) -> Self::HeyFut {
|
||||
ready(format!("Hey, {}.", name))
|
||||
}
|
||||
}
|
||||
@@ -307,12 +320,12 @@ mod functional_test {
|
||||
let test = async {
|
||||
let (tx, rx) = channel::unbounded();
|
||||
tokio_executor::spawn(
|
||||
rpc::Server::new(server::Config::default())
|
||||
crate::Server::default()
|
||||
.incoming(stream::once(ready(Ok(rx))))
|
||||
.respond_with(serve(Server))
|
||||
.unit_error()
|
||||
.boxed()
|
||||
.compat()
|
||||
.compat(),
|
||||
);
|
||||
|
||||
let mut client = await!(new_stub(client::Config::default(), tx))?;
|
||||
@@ -336,12 +349,12 @@ mod functional_test {
|
||||
let test = async {
|
||||
let (tx, rx) = channel::unbounded();
|
||||
tokio_executor::spawn(
|
||||
rpc::Server::new(server::Config::default())
|
||||
rpc::Server::default()
|
||||
.incoming(stream::once(ready(Ok(rx))))
|
||||
.respond_with(serve(Server))
|
||||
.unit_error()
|
||||
.boxed()
|
||||
.compat()
|
||||
.compat(),
|
||||
);
|
||||
|
||||
let client = await!(new_stub(client::Config::default(), tx))?;
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
generators,
|
||||
await_macro,
|
||||
async_await,
|
||||
proc_macro_hygiene,
|
||||
proc_macro_hygiene
|
||||
)]
|
||||
|
||||
extern crate test;
|
||||
@@ -22,7 +22,7 @@ use self::test::stats::Stats;
|
||||
use futures::{compat::TokioDefaultSpawner, future, prelude::*};
|
||||
use rpc::{
|
||||
client, context,
|
||||
server::{self, Handler, Server},
|
||||
server::{Handler, Server},
|
||||
};
|
||||
use std::{
|
||||
io,
|
||||
@@ -41,7 +41,7 @@ struct Serve;
|
||||
impl ack::Service for Serve {
|
||||
type AckFut = future::Ready<()>;
|
||||
|
||||
fn ack(&self, _: context::Context) -> Self::AckFut {
|
||||
fn ack(self, _: context::Context) -> Self::AckFut {
|
||||
future::ready(())
|
||||
}
|
||||
}
|
||||
@@ -51,13 +51,13 @@ async fn bench() -> io::Result<()> {
|
||||
let addr = listener.local_addr();
|
||||
|
||||
tokio_executor::spawn(
|
||||
Server::new(server::Config::default())
|
||||
Server::default()
|
||||
.incoming(listener)
|
||||
.take(1)
|
||||
.respond_with(ack::serve(Serve))
|
||||
.unit_error()
|
||||
.boxed()
|
||||
.compat()
|
||||
.compat(),
|
||||
);
|
||||
|
||||
let conn = await!(bincode_transport::connect(&addr))?;
|
||||
@@ -122,10 +122,5 @@ fn bench_small_packet() {
|
||||
env_logger::init();
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
|
||||
tokio::run(
|
||||
bench()
|
||||
.map_err(|e| panic!(e.to_string()))
|
||||
.boxed()
|
||||
.compat(),
|
||||
)
|
||||
tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat())
|
||||
}
|
||||
|
||||
@@ -1 +1 @@
|
||||
edition = "Edition2018"
|
||||
edition = "2018"
|
||||
|
||||
Reference in New Issue
Block a user