mirror of
https://github.com/OMGeeky/tarpc.git
synced 2025-12-29 23:55:59 +01:00
Replace tokio-serde-bincode with async-bincode
This commit is contained in:
@@ -2,7 +2,7 @@ cargo-features = ["rename-dependency"]
|
||||
|
||||
[package]
|
||||
name = "tarpc-bincode-transport"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||
edition = '2018'
|
||||
license = "MIT"
|
||||
@@ -16,16 +16,13 @@ description = "A bincode-based transport for tarpc services."
|
||||
|
||||
[dependencies]
|
||||
bincode = { version = "1.0", features = ["i128"] }
|
||||
bytes = "0.4"
|
||||
futures_legacy = { version = "0.1", package = "futures" }
|
||||
pin-utils = "0.1.0-alpha.2"
|
||||
rpc = { package = "tarpc-lib", version = "0.1", path = "../rpc", features = ["serde1"] }
|
||||
serde = "1.0"
|
||||
tokio = "0.1"
|
||||
tokio-io = "0.1"
|
||||
tokio-serde-bincode = "0.1"
|
||||
async-bincode = "0.4"
|
||||
tokio-tcp = "0.1"
|
||||
tokio-serde = "0.2"
|
||||
|
||||
[target.'cfg(not(test))'.dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.8", features = ["compat"] }
|
||||
|
||||
145
bincode-transport/src/compat.rs
Normal file
145
bincode-transport/src/compat.rs
Normal file
@@ -0,0 +1,145 @@
|
||||
use futures::{compat::Stream01CompatExt, prelude::*, ready};
|
||||
use futures_legacy::{
|
||||
executor::{
|
||||
self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01,
|
||||
UnsafeNotify as UnsafeNotify01,
|
||||
},
|
||||
Async as Async01, AsyncSink as AsyncSink01, Stream as Stream01, Sink as Sink01
|
||||
};
|
||||
use std::{pin::Pin, task::{self, LocalWaker, Poll}};
|
||||
|
||||
/// A shim to convert a 0.1 Sink + Stream to a 0.3 Sink + Stream.
|
||||
#[derive(Debug)]
|
||||
pub struct Compat<S, SinkItem> {
|
||||
staged_item: Option<SinkItem>,
|
||||
inner: S,
|
||||
}
|
||||
|
||||
impl<S, SinkItem> Compat<S, SinkItem> {
|
||||
/// Returns a new Compat.
|
||||
pub fn new(inner: S) -> Self {
|
||||
Compat { inner, staged_item: None }
|
||||
}
|
||||
|
||||
/// Unwraps Compat, returning the inner value.
|
||||
pub fn into_inner(self) -> S {
|
||||
self.inner
|
||||
}
|
||||
|
||||
/// Returns a reference to the value wrapped by Compat.
|
||||
pub fn get_ref(&self) -> &S {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, SinkItem> Stream for Compat<S, SinkItem>
|
||||
where
|
||||
S: Stream01
|
||||
{
|
||||
type Item = Result<S::Item, S::Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
|
||||
unsafe {
|
||||
let inner = &mut Pin::get_mut_unchecked(self).inner;
|
||||
let mut compat = inner.compat();
|
||||
let compat = Pin::new_unchecked(&mut compat);
|
||||
match ready!(compat.poll_next(waker)) {
|
||||
None => Poll::Ready(None),
|
||||
Some(Ok(next)) => Poll::Ready(Some(Ok(next))),
|
||||
Some(Err(e)) => Poll::Ready(Some(Err(e))),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, SinkItem> Sink for Compat<S, SinkItem>
|
||||
where
|
||||
S: Sink01<SinkItem = SinkItem>,
|
||||
{
|
||||
type SinkItem = SinkItem;
|
||||
type SinkError = S::SinkError;
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), S::SinkError> {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
assert!(me.staged_item.is_none());
|
||||
me.staged_item = Some(item);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.staged_item.take() {
|
||||
Some(staged_item) => match me.inner.start_send(staged_item) {
|
||||
Ok(AsyncSink01::Ready) => Poll::Ready(Ok(())),
|
||||
Ok(AsyncSink01::NotReady(item)) => {
|
||||
me.staged_item = Some(item);
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
},
|
||||
None => Poll::Ready(Ok(())),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.inner.poll_complete() {
|
||||
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
|
||||
Ok(Async01::NotReady) => Poll::Pending,
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.inner.close() {
|
||||
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
|
||||
Ok(Async01::NotReady) => Poll::Pending,
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct WakerToHandle<'a>(&'a LocalWaker);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NotifyWaker(task::Waker);
|
||||
|
||||
impl Notify01 for NotifyWaker {
|
||||
fn notify(&self, _: usize) {
|
||||
self.0.wake();
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl UnsafeNotify01 for NotifyWaker {
|
||||
unsafe fn clone_raw(&self) -> NotifyHandle01 {
|
||||
let ptr = Box::new(NotifyWaker(self.0.clone()));
|
||||
|
||||
NotifyHandle01::new(Box::into_raw(ptr))
|
||||
}
|
||||
|
||||
unsafe fn drop_raw(&self) {
|
||||
let ptr: *const dyn UnsafeNotify01 = self;
|
||||
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
|
||||
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
|
||||
unsafe { NotifyWaker(handle.0.clone().into_waker()).clone_raw() }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,75 +10,127 @@
|
||||
futures_api,
|
||||
pin,
|
||||
arbitrary_self_types,
|
||||
underscore_imports,
|
||||
await_macro,
|
||||
async_await,
|
||||
)]
|
||||
#![deny(missing_docs, missing_debug_implementations)]
|
||||
|
||||
mod vendored;
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use crate::vendored::tokio_serde_bincode::{IoErrorWrapper, ReadBincode, WriteBincode};
|
||||
use futures::{
|
||||
Poll,
|
||||
compat::{Compat01As03, Future01CompatExt, Stream01CompatExt},
|
||||
prelude::*,
|
||||
ready, task,
|
||||
};
|
||||
use futures_legacy::{
|
||||
executor::{
|
||||
self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01,
|
||||
UnsafeNotify as UnsafeNotify01,
|
||||
},
|
||||
sink::SinkMapErr as SinkMapErr01,
|
||||
sink::With as With01,
|
||||
stream::MapErr as MapErr01,
|
||||
Async as Async01, AsyncSink as AsyncSink01, Sink as Sink01, Stream as Stream01,
|
||||
};
|
||||
use async_bincode::{AsyncBincodeStream, AsyncDestination};
|
||||
use futures::{compat::{Compat01As03, Future01CompatExt, Stream01CompatExt}, prelude::*, ready};
|
||||
use pin_utils::unsafe_pinned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{fmt, io, marker::PhantomData, net::SocketAddr, pin::Pin, task::LocalWaker};
|
||||
use tokio::codec::{Framed, LengthDelimitedCodec, length_delimited};
|
||||
use tokio_tcp::{self, TcpListener, TcpStream};
|
||||
use serde::{Serialize, Deserialize};
|
||||
use self::compat::Compat;
|
||||
use std::{error::Error, io, marker::PhantomData, net::SocketAddr, pin::Pin, task::{LocalWaker, Poll}};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_tcp::{TcpListener, TcpStream};
|
||||
|
||||
/// Returns a new bincode transport that reads from and writes to `io`.
|
||||
pub fn new<Item, SinkItem>(io: TcpStream) -> Transport<Item, SinkItem>
|
||||
where
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
let peer_addr = io.peer_addr();
|
||||
let local_addr = io.local_addr();
|
||||
let inner = length_delimited::Builder::new()
|
||||
.max_frame_length(8_000_000)
|
||||
.new_framed(io)
|
||||
.map_err(IoErrorWrapper as _)
|
||||
.sink_map_err(IoErrorWrapper as _)
|
||||
.with(freeze as _);
|
||||
let inner = WriteBincode::new(inner);
|
||||
let inner = ReadBincode::new(inner);
|
||||
mod compat;
|
||||
|
||||
Transport {
|
||||
inner,
|
||||
staged_item: None,
|
||||
peer_addr,
|
||||
local_addr,
|
||||
/// A transport that serializes to, and deserializes from, a [`TcpStream`].
|
||||
#[derive(Debug)]
|
||||
pub struct Transport<S, Item, SinkItem> {
|
||||
inner: Compat<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>
|
||||
}
|
||||
|
||||
impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
|
||||
/// Returns the transport underlying the bincode transport.
|
||||
pub fn into_inner(self) -> S {
|
||||
self.inner.into_inner().into_inner()
|
||||
}
|
||||
}
|
||||
|
||||
fn freeze(bytes: BytesMut) -> Result<Bytes, IoErrorWrapper> {
|
||||
Ok(bytes.freeze())
|
||||
impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
|
||||
unsafe_pinned!(inner: Compat<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>);
|
||||
}
|
||||
|
||||
/// Connects to `addr`, wrapping the connection in a bincode transport.
|
||||
pub async fn connect<Item, SinkItem>(addr: &SocketAddr) -> io::Result<Transport<Item, SinkItem>>
|
||||
impl<S, Item, SinkItem> Stream for Transport<S, Item, SinkItem>
|
||||
where
|
||||
S: AsyncRead,
|
||||
Item: for<'a> Deserialize<'a>,
|
||||
{
|
||||
type Item = io::Result<Item>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<Item>>> {
|
||||
match self.inner().poll_next(waker) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Ready(Some(Ok(next))) => Poll::Ready(Some(Ok(next))),
|
||||
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, e)))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Item, SinkItem> Sink for Transport<S, Item, SinkItem>
|
||||
where
|
||||
S: AsyncWrite,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
type SinkItem = SinkItem;
|
||||
type SinkError = io::Error;
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
|
||||
self.inner()
|
||||
.start_send(item)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_ready(waker))
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_flush(waker))
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_close(waker))
|
||||
}
|
||||
}
|
||||
|
||||
fn convert<E: Into<Box<Error + Send + Sync>>>(poll: Poll<Result<(), E>>) -> Poll<io::Result<()>> {
|
||||
match poll {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
|
||||
}
|
||||
}
|
||||
|
||||
impl<Item, SinkItem> rpc::Transport for Transport<TcpStream, Item, SinkItem>
|
||||
where
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
let stream = await!(TcpStream::connect(addr).compat())?;
|
||||
Ok(new(stream))
|
||||
type Item = Item;
|
||||
type SinkItem = SinkItem;
|
||||
|
||||
fn peer_addr(&self) -> io::Result<SocketAddr> {
|
||||
self.inner.get_ref().get_ref().peer_addr()
|
||||
}
|
||||
|
||||
fn local_addr(&self) -> io::Result<SocketAddr> {
|
||||
self.inner.get_ref().get_ref().local_addr()
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a new bincode transport that reads from and writes to `io`.
|
||||
pub fn new<Item, SinkItem>(io: TcpStream) -> Transport<TcpStream, Item, SinkItem>
|
||||
where
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
Transport {
|
||||
inner: Compat::new(AsyncBincodeStream::from(io).for_async()),
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// Connects to `addr`, wrapping the connection in a bincode transport.
|
||||
pub async fn connect<Item, SinkItem>(addr: &SocketAddr) -> io::Result<Transport<TcpStream, Item, SinkItem>>
|
||||
where
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
Ok(new(await!(TcpStream::connect(addr).compat())?))
|
||||
}
|
||||
|
||||
/// Listens on `addr`, wrapping accepted connections in bincode transports.
|
||||
@@ -119,7 +171,7 @@ where
|
||||
Item: for<'a> Deserialize<'a>,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
type Item = io::Result<Transport<Item, SinkItem>>;
|
||||
type Item = io::Result<Transport<TcpStream, Item, SinkItem>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
|
||||
let next = ready!(self.incoming().poll_next(waker)?);
|
||||
@@ -127,160 +179,3 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// A transport that serializes to, and deserializes from, a [`TcpStream`].
|
||||
pub struct Transport<Item, SinkItem> {
|
||||
inner: ReadBincode<
|
||||
WriteBincode<
|
||||
With01<
|
||||
SinkMapErr01<
|
||||
MapErr01<
|
||||
Framed<tokio_tcp::TcpStream, LengthDelimitedCodec>,
|
||||
fn(std::io::Error) -> IoErrorWrapper,
|
||||
>,
|
||||
fn(std::io::Error) -> IoErrorWrapper,
|
||||
>,
|
||||
BytesMut,
|
||||
fn(BytesMut) -> Result<Bytes, IoErrorWrapper>,
|
||||
Result<Bytes, IoErrorWrapper>
|
||||
>,
|
||||
SinkItem,
|
||||
>,
|
||||
Item,
|
||||
>,
|
||||
staged_item: Option<SinkItem>,
|
||||
peer_addr: io::Result<SocketAddr>,
|
||||
local_addr: io::Result<SocketAddr>,
|
||||
}
|
||||
|
||||
impl<Item, SinkItem> fmt::Debug for Transport<Item, SinkItem> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "Transport")
|
||||
}
|
||||
}
|
||||
|
||||
impl<Item, SinkItem> Stream for Transport<Item, SinkItem>
|
||||
where
|
||||
Item: for<'a> Deserialize<'a>,
|
||||
{
|
||||
type Item = io::Result<Item>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<Item>>> {
|
||||
unsafe {
|
||||
let inner = &mut Pin::get_mut_unchecked(self).inner;
|
||||
let mut compat = inner.compat();
|
||||
let compat = Pin::new_unchecked(&mut compat);
|
||||
match ready!(compat.poll_next(waker)) {
|
||||
None => Poll::Ready(None),
|
||||
Some(Ok(next)) => Poll::Ready(Some(Ok(next))),
|
||||
Some(Err(e)) => Poll::Ready(Some(Err(e.0))),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Item, SinkItem> Sink for Transport<Item, SinkItem>
|
||||
where
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
type SinkItem = SinkItem;
|
||||
type SinkError = io::Error;
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
assert!(me.staged_item.is_none());
|
||||
me.staged_item = Some(item);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.staged_item.take() {
|
||||
Some(staged_item) => match me.inner.start_send(staged_item)? {
|
||||
AsyncSink01::Ready => Poll::Ready(Ok(())),
|
||||
AsyncSink01::NotReady(item) => {
|
||||
me.staged_item = Some(item);
|
||||
Poll::Pending
|
||||
}
|
||||
},
|
||||
None => Poll::Ready(Ok(())),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.inner.poll_complete()? {
|
||||
Async01::Ready(()) => Poll::Ready(Ok(())),
|
||||
Async01::NotReady => Poll::Pending,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
match me.inner.get_mut().close()? {
|
||||
Async01::Ready(()) => Poll::Ready(Ok(())),
|
||||
Async01::NotReady => Poll::Pending,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<Item, SinkItem> rpc::Transport for Transport<Item, SinkItem>
|
||||
where
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
type Item = Item;
|
||||
type SinkItem = SinkItem;
|
||||
|
||||
fn peer_addr(&self) -> io::Result<SocketAddr> {
|
||||
// TODO: should just access from the inner transport.
|
||||
// https://github.com/alexcrichton/tokio-serde-bincode/issues/4
|
||||
Ok(*self.peer_addr.as_ref().unwrap())
|
||||
}
|
||||
|
||||
fn local_addr(&self) -> io::Result<SocketAddr> {
|
||||
Ok(*self.local_addr.as_ref().unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct WakerToHandle<'a>(&'a LocalWaker);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NotifyWaker(task::Waker);
|
||||
|
||||
impl Notify01 for NotifyWaker {
|
||||
fn notify(&self, _: usize) {
|
||||
self.0.wake();
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl UnsafeNotify01 for NotifyWaker {
|
||||
unsafe fn clone_raw(&self) -> NotifyHandle01 {
|
||||
let ptr = Box::new(NotifyWaker(self.0.clone()));
|
||||
|
||||
NotifyHandle01::new(Box::into_raw(ptr))
|
||||
}
|
||||
|
||||
unsafe fn drop_raw(&self) {
|
||||
let ptr: *const dyn UnsafeNotify01 = self;
|
||||
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
|
||||
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
|
||||
unsafe { NotifyWaker(handle.0.clone().into_waker()).clone_raw() }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
// Copyright 2018 Google LLC
|
||||
//
|
||||
// Use of this source code is governed by an MIT-style
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
pub(crate) mod tokio_serde_bincode;
|
||||
@@ -1,224 +0,0 @@
|
||||
//! `Stream` and `Sink` adaptors for serializing and deserializing values using
|
||||
//! Bincode.
|
||||
//!
|
||||
//! This crate provides adaptors for going from a stream or sink of buffers
|
||||
//! ([`Bytes`]) to a stream or sink of values by performing Bincode encoding or
|
||||
//! decoding. It is expected that each yielded buffer contains a single
|
||||
//! serialized Bincode value. The specific strategy by which this is done is left
|
||||
//! up to the user. One option is to use using [`length_delimited`] from
|
||||
//! [tokio-io].
|
||||
//!
|
||||
//! [`Bytes`]: https://docs.rs/bytes/0.4/bytes/struct.Bytes.html
|
||||
//! [`length_delimited`]: http://alexcrichton.com/tokio-io/tokio_io/codec/length_delimited/index.html
|
||||
//! [tokio-io]: http://github.com/alexcrichton/tokio-io
|
||||
//! [examples]: https://github.com/carllerche/tokio-serde-json/tree/master/examples
|
||||
|
||||
#![allow(missing_debug_implementations)]
|
||||
|
||||
use bincode::Error;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures_legacy::{Poll, Sink, StartSend, Stream};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io;
|
||||
use tokio_serde::{Deserializer, FramedRead, FramedWrite, Serializer};
|
||||
|
||||
use std::marker::PhantomData;
|
||||
|
||||
/// Adapts a stream of Bincode encoded buffers to a stream of values by
|
||||
/// deserializing them.
|
||||
///
|
||||
/// `ReadBincode` implements `Stream` by polling the inner buffer stream and
|
||||
/// deserializing the buffer as Bincode. It expects that each yielded buffer
|
||||
/// represents a single Bincode value and does not contain any extra trailing
|
||||
/// bytes.
|
||||
pub(crate) struct ReadBincode<T, U> {
|
||||
inner: FramedRead<T, U, Bincode<U>>,
|
||||
}
|
||||
|
||||
/// Adapts a buffer sink to a value sink by serializing the values as Bincode.
|
||||
///
|
||||
/// `WriteBincode` implements `Sink` by serializing the submitted values to a
|
||||
/// buffer. The buffer is then sent to the inner stream, which is responsible
|
||||
/// for handling framing on the wire.
|
||||
pub(crate) struct WriteBincode<T: Sink, U> {
|
||||
inner: FramedWrite<T, U, Bincode<U>>,
|
||||
}
|
||||
|
||||
struct Bincode<T> {
|
||||
ghost: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T, U> ReadBincode<T, U>
|
||||
where
|
||||
T: Stream<Error = IoErrorWrapper>,
|
||||
U: for<'de> Deserialize<'de>,
|
||||
Bytes: From<T::Item>,
|
||||
{
|
||||
/// Creates a new `ReadBincode` with the given buffer stream.
|
||||
pub fn new(inner: T) -> ReadBincode<T, U> {
|
||||
let json = Bincode { ghost: PhantomData };
|
||||
ReadBincode {
|
||||
inner: FramedRead::new(inner, json),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> ReadBincode<T, U> {
|
||||
/// Returns a mutable reference to the underlying stream wrapped by
|
||||
/// `ReadBincode`.
|
||||
///
|
||||
/// Note that care should be taken to not tamper with the underlying stream
|
||||
/// of data coming in as it may corrupt the stream of frames otherwise
|
||||
/// being worked with.
|
||||
pub fn get_mut(&mut self) -> &mut T {
|
||||
self.inner.get_mut()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Stream for ReadBincode<T, U>
|
||||
where
|
||||
T: Stream<Error = IoErrorWrapper>,
|
||||
U: for<'de> Deserialize<'de>,
|
||||
Bytes: From<T::Item>,
|
||||
{
|
||||
type Item = U;
|
||||
type Error = <T as Stream>::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<U>, Self::Error> {
|
||||
self.inner.poll()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Sink for ReadBincode<T, U>
|
||||
where
|
||||
T: Sink,
|
||||
{
|
||||
type SinkItem = T::SinkItem;
|
||||
type SinkError = T::SinkError;
|
||||
|
||||
fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
|
||||
self.get_mut().start_send(item)
|
||||
}
|
||||
|
||||
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
|
||||
self.get_mut().poll_complete()
|
||||
}
|
||||
|
||||
fn close(&mut self) -> Poll<(), T::SinkError> {
|
||||
self.get_mut().close()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct IoErrorWrapper(pub io::Error);
|
||||
impl From<Box<bincode::ErrorKind>> for IoErrorWrapper {
|
||||
fn from(e: Box<bincode::ErrorKind>) -> Self {
|
||||
IoErrorWrapper(match *e {
|
||||
bincode::ErrorKind::Io(e) => e,
|
||||
bincode::ErrorKind::InvalidUtf8Encoding(e) => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, e)
|
||||
}
|
||||
bincode::ErrorKind::InvalidBoolEncoding(e) => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, e.to_string())
|
||||
}
|
||||
bincode::ErrorKind::InvalidTagEncoding(e) => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, e.to_string())
|
||||
}
|
||||
bincode::ErrorKind::InvalidCharEncoding => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, "Invalid char encoding")
|
||||
}
|
||||
bincode::ErrorKind::DeserializeAnyNotSupported => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, "Deserialize Any not supported")
|
||||
}
|
||||
bincode::ErrorKind::SizeLimit => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, "Size limit exceeded")
|
||||
}
|
||||
bincode::ErrorKind::SequenceMustHaveLength => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, "Sequence must have length")
|
||||
}
|
||||
bincode::ErrorKind::Custom(s) => io::Error::new(io::ErrorKind::Other, s),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<IoErrorWrapper> for io::Error {
|
||||
fn from(wrapper: IoErrorWrapper) -> io::Error {
|
||||
wrapper.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> WriteBincode<T, U>
|
||||
where
|
||||
T: Sink<SinkItem = BytesMut, SinkError = IoErrorWrapper>,
|
||||
U: Serialize,
|
||||
{
|
||||
/// Creates a new `WriteBincode` with the given buffer sink.
|
||||
pub fn new(inner: T) -> WriteBincode<T, U> {
|
||||
let json = Bincode { ghost: PhantomData };
|
||||
WriteBincode {
|
||||
inner: FramedWrite::new(inner, json),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Sink, U> WriteBincode<T, U> {
|
||||
/// Returns a mutable reference to the underlying sink wrapped by
|
||||
/// `WriteBincode`.
|
||||
///
|
||||
/// Note that care should be taken to not tamper with the underlying sink as
|
||||
/// it may corrupt the sequence of frames otherwise being worked with.
|
||||
pub fn get_mut(&mut self) -> &mut T {
|
||||
self.inner.get_mut()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Sink for WriteBincode<T, U>
|
||||
where
|
||||
T: Sink<SinkItem = BytesMut, SinkError = IoErrorWrapper>,
|
||||
U: Serialize,
|
||||
{
|
||||
type SinkItem = U;
|
||||
type SinkError = <T as Sink>::SinkError;
|
||||
|
||||
fn start_send(&mut self, item: U) -> StartSend<U, Self::SinkError> {
|
||||
self.inner.start_send(item)
|
||||
}
|
||||
|
||||
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
|
||||
self.inner.poll_complete()
|
||||
}
|
||||
|
||||
fn close(&mut self) -> Poll<(), Self::SinkError> {
|
||||
self.inner.poll_complete()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Stream for WriteBincode<T, U>
|
||||
where
|
||||
T: Stream + Sink,
|
||||
{
|
||||
type Item = T::Item;
|
||||
type Error = T::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
|
||||
self.get_mut().poll()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deserializer<T> for Bincode<T>
|
||||
where
|
||||
T: for<'de> Deserialize<'de>,
|
||||
{
|
||||
type Error = Error;
|
||||
|
||||
fn deserialize(&mut self, src: &Bytes) -> Result<T, Error> {
|
||||
bincode::deserialize(src)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Serialize> Serializer<T> for Bincode<T> {
|
||||
type Error = Error;
|
||||
|
||||
fn serialize(&mut self, item: &T) -> Result<BytesMut, Self::Error> {
|
||||
bincode::serialize(item).map(Into::into)
|
||||
}
|
||||
}
|
||||
@@ -15,7 +15,7 @@ readme = "../README.md"
|
||||
description = "An example server built on tarpc."
|
||||
|
||||
[dependencies]
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.1", path = "../bincode-transport" }
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.2", path = "../bincode-transport" }
|
||||
futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] }
|
||||
serde = { version = "1.0" }
|
||||
tarpc = { version = "0.13", path = "../tarpc", features = ["serde1"] }
|
||||
|
||||
@@ -32,7 +32,9 @@ futures-preview = "0.3.0-alpha.8"
|
||||
[dev-dependencies]
|
||||
humantime = "1.0"
|
||||
futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] }
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.1", path = "../bincode-transport" }
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.2", path = "../bincode-transport" }
|
||||
env_logger = "0.5"
|
||||
tokio = "0.1"
|
||||
tokio-executor = "0.1"
|
||||
tokio-tcp = "0.1"
|
||||
pin-utils = "0.1.0-alpha.3"
|
||||
|
||||
Reference in New Issue
Block a user