From 79a2f7fe2f35a60b71da1ff1339e18a30628575a Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 17 Oct 2018 20:19:37 -0700 Subject: [PATCH] Replace tokio-serde-bincode with async-bincode --- bincode-transport/Cargo.toml | 7 +- bincode-transport/src/compat.rs | 145 ++++++++ bincode-transport/src/lib.rs | 315 ++++++------------ bincode-transport/src/vendored/mod.rs | 7 - .../src/vendored/tokio_serde_bincode.rs | 224 ------------- example-service/Cargo.toml | 2 +- tarpc/Cargo.toml | 4 +- 7 files changed, 256 insertions(+), 448 deletions(-) create mode 100644 bincode-transport/src/compat.rs delete mode 100644 bincode-transport/src/vendored/mod.rs delete mode 100644 bincode-transport/src/vendored/tokio_serde_bincode.rs diff --git a/bincode-transport/Cargo.toml b/bincode-transport/Cargo.toml index 9839b60..bfdc596 100644 --- a/bincode-transport/Cargo.toml +++ b/bincode-transport/Cargo.toml @@ -2,7 +2,7 @@ cargo-features = ["rename-dependency"] [package] name = "tarpc-bincode-transport" -version = "0.1.0" +version = "0.2.0" authors = ["Tim Kuehn "] edition = '2018' license = "MIT" @@ -16,16 +16,13 @@ description = "A bincode-based transport for tarpc services." [dependencies] bincode = { version = "1.0", features = ["i128"] } -bytes = "0.4" futures_legacy = { version = "0.1", package = "futures" } pin-utils = "0.1.0-alpha.2" rpc = { package = "tarpc-lib", version = "0.1", path = "../rpc", features = ["serde1"] } serde = "1.0" -tokio = "0.1" tokio-io = "0.1" -tokio-serde-bincode = "0.1" +async-bincode = "0.4" tokio-tcp = "0.1" -tokio-serde = "0.2" [target.'cfg(not(test))'.dependencies] futures-preview = { version = "0.3.0-alpha.8", features = ["compat"] } diff --git a/bincode-transport/src/compat.rs b/bincode-transport/src/compat.rs new file mode 100644 index 0000000..a9aa7c4 --- /dev/null +++ b/bincode-transport/src/compat.rs @@ -0,0 +1,145 @@ +use futures::{compat::Stream01CompatExt, prelude::*, ready}; +use futures_legacy::{ + executor::{ + self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01, + UnsafeNotify as UnsafeNotify01, + }, + Async as Async01, AsyncSink as AsyncSink01, Stream as Stream01, Sink as Sink01 +}; +use std::{pin::Pin, task::{self, LocalWaker, Poll}}; + +/// A shim to convert a 0.1 Sink + Stream to a 0.3 Sink + Stream. +#[derive(Debug)] +pub struct Compat { + staged_item: Option, + inner: S, +} + +impl Compat { + /// Returns a new Compat. + pub fn new(inner: S) -> Self { + Compat { inner, staged_item: None } + } + + /// Unwraps Compat, returning the inner value. + pub fn into_inner(self) -> S { + self.inner + } + + /// Returns a reference to the value wrapped by Compat. + pub fn get_ref(&self) -> &S { + &self.inner + } +} + +impl Stream for Compat +where + S: Stream01 +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + unsafe { + let inner = &mut Pin::get_mut_unchecked(self).inner; + let mut compat = inner.compat(); + let compat = Pin::new_unchecked(&mut compat); + match ready!(compat.poll_next(waker)) { + None => Poll::Ready(None), + Some(Ok(next)) => Poll::Ready(Some(Ok(next))), + Some(Err(e)) => Poll::Ready(Some(Err(e))), + } + } + } +} + +impl Sink for Compat +where + S: Sink01, +{ + type SinkItem = SinkItem; + type SinkError = S::SinkError; + + fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), S::SinkError> { + let me = unsafe { Pin::get_mut_unchecked(self) }; + assert!(me.staged_item.is_none()); + me.staged_item = Some(item); + Ok(()) + } + + fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + let notify = &WakerToHandle(waker); + + executor01::with_notify(notify, 0, move || { + let me = unsafe { Pin::get_mut_unchecked(self) }; + match me.staged_item.take() { + Some(staged_item) => match me.inner.start_send(staged_item) { + Ok(AsyncSink01::Ready) => Poll::Ready(Ok(())), + Ok(AsyncSink01::NotReady(item)) => { + me.staged_item = Some(item); + Poll::Pending + } + Err(e) => Poll::Ready(Err(e)), + }, + None => Poll::Ready(Ok(())), + } + }) + } + + fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + let notify = &WakerToHandle(waker); + + executor01::with_notify(notify, 0, move || { + let me = unsafe { Pin::get_mut_unchecked(self) }; + match me.inner.poll_complete() { + Ok(Async01::Ready(())) => Poll::Ready(Ok(())), + Ok(Async01::NotReady) => Poll::Pending, + Err(e) => Poll::Ready(Err(e)), + } + }) + } + + fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + let notify = &WakerToHandle(waker); + + executor01::with_notify(notify, 0, move || { + let me = unsafe { Pin::get_mut_unchecked(self) }; + match me.inner.close() { + Ok(Async01::Ready(())) => Poll::Ready(Ok(())), + Ok(Async01::NotReady) => Poll::Pending, + Err(e) => Poll::Ready(Err(e)), + } + }) + } +} + +#[derive(Clone, Debug)] +struct WakerToHandle<'a>(&'a LocalWaker); + +#[derive(Debug)] +struct NotifyWaker(task::Waker); + +impl Notify01 for NotifyWaker { + fn notify(&self, _: usize) { + self.0.wake(); + } +} + +unsafe impl UnsafeNotify01 for NotifyWaker { + unsafe fn clone_raw(&self) -> NotifyHandle01 { + let ptr = Box::new(NotifyWaker(self.0.clone())); + + NotifyHandle01::new(Box::into_raw(ptr)) + } + + unsafe fn drop_raw(&self) { + let ptr: *const dyn UnsafeNotify01 = self; + drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01)); + } +} + +impl<'a> From> for NotifyHandle01 { + fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 { + unsafe { NotifyWaker(handle.0.clone().into_waker()).clone_raw() } + } +} + diff --git a/bincode-transport/src/lib.rs b/bincode-transport/src/lib.rs index fc85538..48348a7 100644 --- a/bincode-transport/src/lib.rs +++ b/bincode-transport/src/lib.rs @@ -10,75 +10,127 @@ futures_api, pin, arbitrary_self_types, - underscore_imports, await_macro, async_await, )] #![deny(missing_docs, missing_debug_implementations)] -mod vendored; - -use bytes::{Bytes, BytesMut}; -use crate::vendored::tokio_serde_bincode::{IoErrorWrapper, ReadBincode, WriteBincode}; -use futures::{ - Poll, - compat::{Compat01As03, Future01CompatExt, Stream01CompatExt}, - prelude::*, - ready, task, -}; -use futures_legacy::{ - executor::{ - self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01, - UnsafeNotify as UnsafeNotify01, - }, - sink::SinkMapErr as SinkMapErr01, - sink::With as With01, - stream::MapErr as MapErr01, - Async as Async01, AsyncSink as AsyncSink01, Sink as Sink01, Stream as Stream01, -}; +use async_bincode::{AsyncBincodeStream, AsyncDestination}; +use futures::{compat::{Compat01As03, Future01CompatExt, Stream01CompatExt}, prelude::*, ready}; use pin_utils::unsafe_pinned; -use serde::{Deserialize, Serialize}; -use std::{fmt, io, marker::PhantomData, net::SocketAddr, pin::Pin, task::LocalWaker}; -use tokio::codec::{Framed, LengthDelimitedCodec, length_delimited}; -use tokio_tcp::{self, TcpListener, TcpStream}; +use serde::{Serialize, Deserialize}; +use self::compat::Compat; +use std::{error::Error, io, marker::PhantomData, net::SocketAddr, pin::Pin, task::{LocalWaker, Poll}}; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_tcp::{TcpListener, TcpStream}; -/// Returns a new bincode transport that reads from and writes to `io`. -pub fn new(io: TcpStream) -> Transport -where - Item: for<'de> Deserialize<'de>, - SinkItem: Serialize, -{ - let peer_addr = io.peer_addr(); - let local_addr = io.local_addr(); - let inner = length_delimited::Builder::new() - .max_frame_length(8_000_000) - .new_framed(io) - .map_err(IoErrorWrapper as _) - .sink_map_err(IoErrorWrapper as _) - .with(freeze as _); - let inner = WriteBincode::new(inner); - let inner = ReadBincode::new(inner); +mod compat; - Transport { - inner, - staged_item: None, - peer_addr, - local_addr, +/// A transport that serializes to, and deserializes from, a [`TcpStream`]. +#[derive(Debug)] +pub struct Transport { + inner: Compat, SinkItem> +} + +impl Transport { + /// Returns the transport underlying the bincode transport. + pub fn into_inner(self) -> S { + self.inner.into_inner().into_inner() } } -fn freeze(bytes: BytesMut) -> Result { - Ok(bytes.freeze()) +impl Transport { + unsafe_pinned!(inner: Compat, SinkItem>); } -/// Connects to `addr`, wrapping the connection in a bincode transport. -pub async fn connect(addr: &SocketAddr) -> io::Result> +impl Stream for Transport +where + S: AsyncRead, + Item: for<'a> Deserialize<'a>, +{ + type Item = io::Result; + + fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll>> { + match self.inner().poll_next(waker) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(Ok(next))) => Poll::Ready(Some(Ok(next))), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, e)))), + } + } +} + +impl Sink for Transport +where + S: AsyncWrite, + SinkItem: Serialize, +{ + type SinkItem = SinkItem; + type SinkError = io::Error; + + fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> { + self.inner() + .start_send(item) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + } + + fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + convert(self.inner().poll_ready(waker)) + } + + fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + convert(self.inner().poll_flush(waker)) + } + + fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + convert(self.inner().poll_close(waker)) + } +} + +fn convert>>(poll: Poll>) -> Poll> { + match poll { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))), + } +} + +impl rpc::Transport for Transport where Item: for<'de> Deserialize<'de>, SinkItem: Serialize, { - let stream = await!(TcpStream::connect(addr).compat())?; - Ok(new(stream)) + type Item = Item; + type SinkItem = SinkItem; + + fn peer_addr(&self) -> io::Result { + self.inner.get_ref().get_ref().peer_addr() + } + + fn local_addr(&self) -> io::Result { + self.inner.get_ref().get_ref().local_addr() + } +} + +/// Returns a new bincode transport that reads from and writes to `io`. +pub fn new(io: TcpStream) -> Transport +where + Item: for<'de> Deserialize<'de>, + SinkItem: Serialize, +{ + Transport { + inner: Compat::new(AsyncBincodeStream::from(io).for_async()), + } + +} + +/// Connects to `addr`, wrapping the connection in a bincode transport. +pub async fn connect(addr: &SocketAddr) -> io::Result> +where + Item: for<'de> Deserialize<'de>, + SinkItem: Serialize, +{ + Ok(new(await!(TcpStream::connect(addr).compat())?)) } /// Listens on `addr`, wrapping accepted connections in bincode transports. @@ -119,7 +171,7 @@ where Item: for<'a> Deserialize<'a>, SinkItem: Serialize, { - type Item = io::Result>; + type Item = io::Result>; fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { let next = ready!(self.incoming().poll_next(waker)?); @@ -127,160 +179,3 @@ where } } -/// A transport that serializes to, and deserializes from, a [`TcpStream`]. -pub struct Transport { - inner: ReadBincode< - WriteBincode< - With01< - SinkMapErr01< - MapErr01< - Framed, - fn(std::io::Error) -> IoErrorWrapper, - >, - fn(std::io::Error) -> IoErrorWrapper, - >, - BytesMut, - fn(BytesMut) -> Result, - Result - >, - SinkItem, - >, - Item, - >, - staged_item: Option, - peer_addr: io::Result, - local_addr: io::Result, -} - -impl fmt::Debug for Transport { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Transport") - } -} - -impl Stream for Transport -where - Item: for<'a> Deserialize<'a>, -{ - type Item = io::Result; - - fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll>> { - unsafe { - let inner = &mut Pin::get_mut_unchecked(self).inner; - let mut compat = inner.compat(); - let compat = Pin::new_unchecked(&mut compat); - match ready!(compat.poll_next(waker)) { - None => Poll::Ready(None), - Some(Ok(next)) => Poll::Ready(Some(Ok(next))), - Some(Err(e)) => Poll::Ready(Some(Err(e.0))), - } - } - } -} - -impl Sink for Transport -where - SinkItem: Serialize, -{ - type SinkItem = SinkItem; - type SinkError = io::Error; - - fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> { - let me = unsafe { Pin::get_mut_unchecked(self) }; - assert!(me.staged_item.is_none()); - me.staged_item = Some(item); - Ok(()) - } - - fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { - let notify = &WakerToHandle(waker); - - executor01::with_notify(notify, 0, move || { - let me = unsafe { Pin::get_mut_unchecked(self) }; - match me.staged_item.take() { - Some(staged_item) => match me.inner.start_send(staged_item)? { - AsyncSink01::Ready => Poll::Ready(Ok(())), - AsyncSink01::NotReady(item) => { - me.staged_item = Some(item); - Poll::Pending - } - }, - None => Poll::Ready(Ok(())), - } - }) - } - - fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { - let notify = &WakerToHandle(waker); - - executor01::with_notify(notify, 0, move || { - let me = unsafe { Pin::get_mut_unchecked(self) }; - match me.inner.poll_complete()? { - Async01::Ready(()) => Poll::Ready(Ok(())), - Async01::NotReady => Poll::Pending, - } - }) - } - - fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { - let notify = &WakerToHandle(waker); - - executor01::with_notify(notify, 0, move || { - let me = unsafe { Pin::get_mut_unchecked(self) }; - match me.inner.get_mut().close()? { - Async01::Ready(()) => Poll::Ready(Ok(())), - Async01::NotReady => Poll::Pending, - } - }) - } -} - -impl rpc::Transport for Transport -where - Item: for<'de> Deserialize<'de>, - SinkItem: Serialize, -{ - type Item = Item; - type SinkItem = SinkItem; - - fn peer_addr(&self) -> io::Result { - // TODO: should just access from the inner transport. - // https://github.com/alexcrichton/tokio-serde-bincode/issues/4 - Ok(*self.peer_addr.as_ref().unwrap()) - } - - fn local_addr(&self) -> io::Result { - Ok(*self.local_addr.as_ref().unwrap()) - } -} - -#[derive(Clone, Debug)] -struct WakerToHandle<'a>(&'a LocalWaker); - -#[derive(Debug)] -struct NotifyWaker(task::Waker); - -impl Notify01 for NotifyWaker { - fn notify(&self, _: usize) { - self.0.wake(); - } -} - -unsafe impl UnsafeNotify01 for NotifyWaker { - unsafe fn clone_raw(&self) -> NotifyHandle01 { - let ptr = Box::new(NotifyWaker(self.0.clone())); - - NotifyHandle01::new(Box::into_raw(ptr)) - } - - unsafe fn drop_raw(&self) { - let ptr: *const dyn UnsafeNotify01 = self; - drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01)); - } -} - -impl<'a> From> for NotifyHandle01 { - fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 { - unsafe { NotifyWaker(handle.0.clone().into_waker()).clone_raw() } - } -} diff --git a/bincode-transport/src/vendored/mod.rs b/bincode-transport/src/vendored/mod.rs deleted file mode 100644 index 5ad3496..0000000 --- a/bincode-transport/src/vendored/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright 2018 Google LLC -// -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -pub(crate) mod tokio_serde_bincode; diff --git a/bincode-transport/src/vendored/tokio_serde_bincode.rs b/bincode-transport/src/vendored/tokio_serde_bincode.rs deleted file mode 100644 index abdc63a..0000000 --- a/bincode-transport/src/vendored/tokio_serde_bincode.rs +++ /dev/null @@ -1,224 +0,0 @@ -//! `Stream` and `Sink` adaptors for serializing and deserializing values using -//! Bincode. -//! -//! This crate provides adaptors for going from a stream or sink of buffers -//! ([`Bytes`]) to a stream or sink of values by performing Bincode encoding or -//! decoding. It is expected that each yielded buffer contains a single -//! serialized Bincode value. The specific strategy by which this is done is left -//! up to the user. One option is to use using [`length_delimited`] from -//! [tokio-io]. -//! -//! [`Bytes`]: https://docs.rs/bytes/0.4/bytes/struct.Bytes.html -//! [`length_delimited`]: http://alexcrichton.com/tokio-io/tokio_io/codec/length_delimited/index.html -//! [tokio-io]: http://github.com/alexcrichton/tokio-io -//! [examples]: https://github.com/carllerche/tokio-serde-json/tree/master/examples - -#![allow(missing_debug_implementations)] - -use bincode::Error; -use bytes::{Bytes, BytesMut}; -use futures_legacy::{Poll, Sink, StartSend, Stream}; -use serde::{Deserialize, Serialize}; -use std::io; -use tokio_serde::{Deserializer, FramedRead, FramedWrite, Serializer}; - -use std::marker::PhantomData; - -/// Adapts a stream of Bincode encoded buffers to a stream of values by -/// deserializing them. -/// -/// `ReadBincode` implements `Stream` by polling the inner buffer stream and -/// deserializing the buffer as Bincode. It expects that each yielded buffer -/// represents a single Bincode value and does not contain any extra trailing -/// bytes. -pub(crate) struct ReadBincode { - inner: FramedRead>, -} - -/// Adapts a buffer sink to a value sink by serializing the values as Bincode. -/// -/// `WriteBincode` implements `Sink` by serializing the submitted values to a -/// buffer. The buffer is then sent to the inner stream, which is responsible -/// for handling framing on the wire. -pub(crate) struct WriteBincode { - inner: FramedWrite>, -} - -struct Bincode { - ghost: PhantomData, -} - -impl ReadBincode -where - T: Stream, - U: for<'de> Deserialize<'de>, - Bytes: From, -{ - /// Creates a new `ReadBincode` with the given buffer stream. - pub fn new(inner: T) -> ReadBincode { - let json = Bincode { ghost: PhantomData }; - ReadBincode { - inner: FramedRead::new(inner, json), - } - } -} - -impl ReadBincode { - /// Returns a mutable reference to the underlying stream wrapped by - /// `ReadBincode`. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn get_mut(&mut self) -> &mut T { - self.inner.get_mut() - } -} - -impl Stream for ReadBincode -where - T: Stream, - U: for<'de> Deserialize<'de>, - Bytes: From, -{ - type Item = U; - type Error = ::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() - } -} - -impl Sink for ReadBincode -where - T: Sink, -{ - type SinkItem = T::SinkItem; - type SinkError = T::SinkError; - - fn start_send(&mut self, item: T::SinkItem) -> StartSend { - self.get_mut().start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - self.get_mut().poll_complete() - } - - fn close(&mut self) -> Poll<(), T::SinkError> { - self.get_mut().close() - } -} - -pub(crate) struct IoErrorWrapper(pub io::Error); -impl From> for IoErrorWrapper { - fn from(e: Box) -> Self { - IoErrorWrapper(match *e { - bincode::ErrorKind::Io(e) => e, - bincode::ErrorKind::InvalidUtf8Encoding(e) => { - io::Error::new(io::ErrorKind::InvalidInput, e) - } - bincode::ErrorKind::InvalidBoolEncoding(e) => { - io::Error::new(io::ErrorKind::InvalidInput, e.to_string()) - } - bincode::ErrorKind::InvalidTagEncoding(e) => { - io::Error::new(io::ErrorKind::InvalidInput, e.to_string()) - } - bincode::ErrorKind::InvalidCharEncoding => { - io::Error::new(io::ErrorKind::InvalidInput, "Invalid char encoding") - } - bincode::ErrorKind::DeserializeAnyNotSupported => { - io::Error::new(io::ErrorKind::InvalidInput, "Deserialize Any not supported") - } - bincode::ErrorKind::SizeLimit => { - io::Error::new(io::ErrorKind::InvalidInput, "Size limit exceeded") - } - bincode::ErrorKind::SequenceMustHaveLength => { - io::Error::new(io::ErrorKind::InvalidInput, "Sequence must have length") - } - bincode::ErrorKind::Custom(s) => io::Error::new(io::ErrorKind::Other, s), - }) - } -} - -impl From for io::Error { - fn from(wrapper: IoErrorWrapper) -> io::Error { - wrapper.0 - } -} - -impl WriteBincode -where - T: Sink, - U: Serialize, -{ - /// Creates a new `WriteBincode` with the given buffer sink. - pub fn new(inner: T) -> WriteBincode { - let json = Bincode { ghost: PhantomData }; - WriteBincode { - inner: FramedWrite::new(inner, json), - } - } -} - -impl WriteBincode { - /// Returns a mutable reference to the underlying sink wrapped by - /// `WriteBincode`. - /// - /// Note that care should be taken to not tamper with the underlying sink as - /// it may corrupt the sequence of frames otherwise being worked with. - pub fn get_mut(&mut self) -> &mut T { - self.inner.get_mut() - } -} - -impl Sink for WriteBincode -where - T: Sink, - U: Serialize, -{ - type SinkItem = U; - type SinkError = ::SinkError; - - fn start_send(&mut self, item: U) -> StartSend { - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.poll_complete() - } - - fn close(&mut self) -> Poll<(), Self::SinkError> { - self.inner.poll_complete() - } -} - -impl Stream for WriteBincode -where - T: Stream + Sink, -{ - type Item = T::Item; - type Error = T::Error; - - fn poll(&mut self) -> Poll, T::Error> { - self.get_mut().poll() - } -} - -impl Deserializer for Bincode -where - T: for<'de> Deserialize<'de>, -{ - type Error = Error; - - fn deserialize(&mut self, src: &Bytes) -> Result { - bincode::deserialize(src) - } -} - -impl Serializer for Bincode { - type Error = Error; - - fn serialize(&mut self, item: &T) -> Result { - bincode::serialize(item).map(Into::into) - } -} diff --git a/example-service/Cargo.toml b/example-service/Cargo.toml index d943b68..249ce1c 100644 --- a/example-service/Cargo.toml +++ b/example-service/Cargo.toml @@ -15,7 +15,7 @@ readme = "../README.md" description = "An example server built on tarpc." [dependencies] -bincode-transport = { package = "tarpc-bincode-transport", version = "0.1", path = "../bincode-transport" } +bincode-transport = { package = "tarpc-bincode-transport", version = "0.2", path = "../bincode-transport" } futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] } serde = { version = "1.0" } tarpc = { version = "0.13", path = "../tarpc", features = ["serde1"] } diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 0ea5d3c..42d86c4 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -32,7 +32,9 @@ futures-preview = "0.3.0-alpha.8" [dev-dependencies] humantime = "1.0" futures-preview = { version = "0.3.0-alpha.8", features = ["compat", "tokio-compat"] } -bincode-transport = { package = "tarpc-bincode-transport", version = "0.1", path = "../bincode-transport" } +bincode-transport = { package = "tarpc-bincode-transport", version = "0.2", path = "../bincode-transport" } env_logger = "0.5" tokio = "0.1" tokio-executor = "0.1" +tokio-tcp = "0.1" +pin-utils = "0.1.0-alpha.3"