WIP multiplex Parse/Serialize/FramedIo impls

This commit is contained in:
Tim Kuehn
2016-09-15 02:38:07 -07:00
parent 8c0181633d
commit 20d1a019ae
16 changed files with 525 additions and 603 deletions

View File

@@ -337,39 +337,42 @@ macro_rules! service {
rpc $fn_name:ident ( $( $arg:ident : $in_:ty ),* ) -> $out:ty | $error:ty;
)*
) => {
service! {
{ }
#[allow(non_camel_case_types, unused)]
#[derive(Debug)]
enum __tarpc_service_Request {
NotIrrefutable(()),
$(
$(#[$attr])*
rpc $fn_name( $( $arg : $in_ ),* ) -> $out | $error;
)*
{
#[allow(non_camel_case_types, unused)]
#[derive(Debug)]
enum __ClientSideRequest<'a> {
$(
$fn_name(&'a ( $(&'a $in_,)* ))
),*
}
impl_serialize!(__ClientSideRequest, { <'__a> }, $($fn_name(($($in_),*)))*);
}
$fn_name(( $($in_,)* ))
),*
}
};
// Pattern for when all return types and the client request have been expanded
(
{ } // none left to expand
$(
$(#[$attr:meta])*
rpc $fn_name:ident ( $( $arg:ident : $in_:ty ),* ) -> $out:ty | $error:ty;
)*
{
$client_req:item
$client_serialize_impl:item
impl_deserialize!(__tarpc_service_Request, NotIrrefutable(()) $($fn_name(($($in_),*)))*);
impl_serialize!(__tarpc_service_Request, {}, NotIrrefutable(()) $($fn_name(($($in_),*)))*);
#[allow(non_camel_case_types, unused)]
#[derive(Debug)]
enum __tarpc_service_Response {
NotIrrefutable(()),
$(
$fn_name($out)
),*
}
) => {
impl_deserialize!(__tarpc_service_Response, NotIrrefutable(()) $($fn_name($out))*);
impl_serialize!(__tarpc_service_Response, {}, NotIrrefutable(()) $($fn_name($out))*);
#[allow(non_camel_case_types, unused)]
#[derive(Debug)]
enum __tarpc_service_Error {
NotIrrefutable(()),
$(
$fn_name($error)
),*
}
impl_deserialize!(__tarpc_service_Error, NotIrrefutable(()) $($fn_name($error))*);
impl_serialize!(__tarpc_service_Error, {}, NotIrrefutable(()) $($fn_name($error))*);
/// Defines the `Future` RPC service. Implementors must be `Clone`, `Send`, and `'static`,
/// as required by `tokio_proto::NewService`. This is required so that the service can be used
@@ -399,7 +402,7 @@ macro_rules! service {
fn listen<L>(self, addr: L) -> $crate::ListenFuture
where L: ::std::net::ToSocketAddrs
{
return $crate::listen(addr, __tarpc_service_AsyncServer(self));
return $crate::listen_pipeline(addr, __tarpc_service_AsyncServer(self));
#[allow(non_camel_case_types)]
#[derive(Clone)]
@@ -411,30 +414,35 @@ macro_rules! service {
}
}
#[allow(non_camel_case_types)]
enum __tarpc_service_Reply<__tarpc_service_S: FutureService> {
DeserializeError($crate::SerializeFuture),
$($fn_name($crate::futures::Then<
$crate::futures::MapErr<
ty_snake_to_camel!(__tarpc_service_S::$fn_name),
fn($error) -> $crate::WireError<$error>>,
$crate::SerializeFuture,
fn(::std::result::Result<$out, $crate::WireError<$error>>)
-> $crate::SerializeFuture>)),*
type __tarpc_service_Future =
$crate::futures::Finished<$crate::Response<__tarpc_service_Response,
__tarpc_service_Error>,
::std::io::Error>;
#[allow(non_camel_case_types)]
enum __tarpc_service_FutureReply<__tarpc_service_S: FutureService> {
DeserializeError(__tarpc_service_Future),
$($fn_name($crate::futures::Then<ty_snake_to_camel!(__tarpc_service_S::$fn_name),
__tarpc_service_Future,
fn(::std::result::Result<$out, $error>)
-> __tarpc_service_Future>)),*
}
impl<S: FutureService> $crate::futures::Future for __tarpc_service_Reply<S> {
type Item = $crate::SerializedReply;
impl<S: FutureService> $crate::futures::Future for __tarpc_service_FutureReply<S> {
type Item = $crate::Response<__tarpc_service_Response, __tarpc_service_Error>;
type Error = ::std::io::Error;
fn poll(&mut self) -> $crate::futures::Poll<Self::Item, Self::Error> {
match *self {
__tarpc_service_Reply::DeserializeError(ref mut f) => {
$crate::futures::Future::poll(f)
__tarpc_service_FutureReply::DeserializeError(ref mut __tarpc_service_future) => {
$crate::futures::Future::poll(__tarpc_service_future)
}
$(
__tarpc_service_Reply::$fn_name(ref mut f) => {
$crate::futures::Future::poll(f)
__tarpc_service_FutureReply::$fn_name(ref mut __tarpc_service_future) => {
$crate::futures::Future::poll(__tarpc_service_future)
}
),*
}
@@ -447,66 +455,48 @@ macro_rules! service {
for __tarpc_service_AsyncServer<__tarpc_service_S>
where __tarpc_service_S: FutureService
{
type Request = ::std::vec::Vec<u8>;
type Response = $crate::SerializedReply;
type Request = ::std::result::Result<__tarpc_service_Request,
$crate::bincode::serde::DeserializeError>;
type Response = $crate::Response<__tarpc_service_Response, __tarpc_service_Error>;
type Error = ::std::io::Error;
type Future = __tarpc_service_Reply<__tarpc_service_S>;
type Future = __tarpc_service_FutureReply<__tarpc_service_S>;
fn poll_ready(&self) -> $crate::futures::Async<()> {
$crate::futures::Async::Ready(())
}
fn call(&self, __tarpc_service_req: Self::Request) -> Self::Future {
#[allow(non_camel_case_types, unused)]
#[derive(Debug)]
enum __tarpc_service_ServerSideRequest {
$(
$fn_name(( $($in_,)* ))
),*
}
impl_deserialize!(__tarpc_service_ServerSideRequest,
$($fn_name(($($in_),*)))*);
let __tarpc_service_request = $crate::deserialize(&__tarpc_service_req);
let __tarpc_service_request: __tarpc_service_ServerSideRequest =
match __tarpc_service_request {
::std::result::Result::Ok(__tarpc_service_request) => {
__tarpc_service_request
}
::std::result::Result::Err(__tarpc_service_e) => {
return __tarpc_service_Reply::DeserializeError(
deserialize_error(__tarpc_service_e));
}
};
match __tarpc_service_request {$(
__tarpc_service_ServerSideRequest::$fn_name(( $($arg,)* )) => {
const SERIALIZE:
fn(::std::result::Result<$out, $crate::WireError<$error>>)
-> $crate::SerializeFuture
= $crate::serialize_reply;
const TO_APP: fn($error) -> $crate::WireError<$error> =
$crate::WireError::App;
return __tarpc_service_Reply::$fn_name(
$crate::futures::Future::then(
$crate::futures::Future::map_err(
FutureService::$fn_name(&self.0, $($arg),*),
TO_APP),
SERIALIZE));
fn call(&self, __tarpc_service_request: Self::Request) -> Self::Future {
let __tarpc_service_request = match __tarpc_service_request {
Ok(__tarpc_service_request) => __tarpc_service_request,
Err(__tarpc_service_deserialize_err) => {
return __tarpc_service_FutureReply::DeserializeError(
$crate::futures::finished(
$crate::tokio_proto::pipeline::Message::WithoutBody(
::std::result::Result::Err(
$crate::WireError::ServerDeserialize(
::std::string::ToString::to_string(&__tarpc_service_deserialize_err))))));
}
)*}
#[inline]
fn deserialize_error<E: ::std::error::Error>(__tarpc_service_e: E)
-> $crate::SerializeFuture
{
$crate::serialize_reply(
// The type param is only used in the Error::App variant, so it
// doesn't matter what we specify it as here.
::std::result::Result::Err::<(), _>(
$crate::WireError::ServerDeserialize::<$crate::util::Never>(
__tarpc_service_e.to_string())))
};
match __tarpc_service_request {
__tarpc_service_Request::NotIrrefutable(()) => unreachable!(),
$(
__tarpc_service_Request::$fn_name(( $($arg,)* )) => {
fn __tarpc_service_wrap(
__tarpc_service_response: ::std::result::Result<$out, $error>)
-> __tarpc_service_Future
{
$crate::futures::finished($crate::tokio_proto::pipeline::Message::WithoutBody(
__tarpc_service_response
.map(__tarpc_service_Response::$fn_name)
.map_err(|__tarpc_service_error| $crate::WireError::App(__tarpc_service_Error::$fn_name(__tarpc_service_error)))
))
}
return __tarpc_service_FutureReply::$fn_name(
$crate::futures::Future::then(
FutureService::$fn_name(&self.0, $($arg),*),
__tarpc_service_wrap));
}
)*
}
}
}
@@ -622,7 +612,7 @@ macro_rules! service {
#[allow(unused)]
$(#[$attr])*
#[inline]
pub fn $fn_name(&self, $($arg: &$in_),*)
pub fn $fn_name(&self, $($arg: $in_),*)
-> ::std::result::Result<$out, $crate::Error<$error>>
{
let rpc = (self.0).$fn_name($($arg),*);
@@ -631,17 +621,36 @@ macro_rules! service {
)*
}
#[allow(non_camel_case_types)]
type __tarpc_service_Client = $crate::Client<__tarpc_service_Request, __tarpc_service_Response, __tarpc_service_Error>;
#[allow(non_camel_case_types)]
pub struct __tarpc_service_ConnectFuture<T> {
inner: $crate::futures::Map<$crate::ClientFuture<__tarpc_service_Request, __tarpc_service_Response, __tarpc_service_Error>, fn(__tarpc_service_Client) -> T>,
}
impl<T> $crate::futures::Future for __tarpc_service_ConnectFuture<T> {
type Item = T;
type Error = ::std::io::Error;
fn poll(&mut self) -> $crate::futures::Poll<Self::Item, Self::Error> {
$crate::futures::Future::poll(&mut self.inner)
}
}
#[allow(unused)]
#[derive(Clone, Debug)]
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
pub struct FutureClient($crate::Client);
pub struct FutureClient(__tarpc_service_Client);
impl $crate::future::Connect for FutureClient {
type Fut = $crate::futures::Map<$crate::ClientFuture, fn($crate::Client) -> Self>;
type Fut = __tarpc_service_ConnectFuture<Self>;
fn connect(addr: &::std::net::SocketAddr) -> Self::Fut {
let client = <$crate::Client as $crate::future::Connect>::connect(addr);
$crate::futures::Future::map(client, FutureClient)
let client = <__tarpc_service_Client as $crate::future::Connect>::connect(addr);
__tarpc_service_ConnectFuture {
inner: $crate::futures::Future::map(client, FutureClient)
}
}
}
@@ -650,51 +659,46 @@ macro_rules! service {
#[allow(unused)]
$(#[$attr])*
#[inline]
pub fn $fn_name(&self, $($arg: &$in_),*)
pub fn $fn_name(&self, $($arg: $in_),*)
-> impl $crate::futures::Future<Item=$out, Error=$crate::Error<$error>>
+ 'static
{
$client_req
$client_serialize_impl
future_enum! {
enum Fut<C, F> {
Called(C),
Failed(F)
}
}
let __tarpc_service_args = ($($arg,)*);
let __tarpc_service_req = &__ClientSideRequest::$fn_name(&__tarpc_service_args);
let __tarpc_service_req =
match $crate::Packet::serialize(&__tarpc_service_req)
{
::std::result::Result::Err(__tarpc_service_e) => {
return Fut::Failed(
$crate::futures::failed(
$crate::Error::ClientSerialize(__tarpc_service_e)))
}
::std::result::Result::Ok(__tarpc_service_req) => __tarpc_service_req,
};
let __tarpc_service_req = __tarpc_service_Request::$fn_name(($($arg,)*));
let __tarpc_service_fut =
$crate::tokio_service::Service::call(&self.0, __tarpc_service_req);
Fut::Called($crate::futures::Future::then(__tarpc_service_fut,
$crate::futures::Future::then(__tarpc_service_fut,
move |__tarpc_service_msg| {
let __tarpc_service_msg: Vec<u8> = try!(__tarpc_service_msg);
let __tarpc_service_msg:
::std::result::Result<
::std::result::Result<$out, $crate::WireError<$error>>, _>
= $crate::deserialize(&__tarpc_service_msg);
let __tarpc_service_msg = try!(__tarpc_service_msg);
match __tarpc_service_msg {
::std::result::Result::Ok(__tarpc_service_msg) => {
::std::result::Result::Ok(try!(__tarpc_service_msg))
if let __tarpc_service_Response::$fn_name(__tarpc_service_msg) =
__tarpc_service_msg
{
::std::result::Result::Ok(__tarpc_service_msg)
} else {
unreachable!()
}
}
::std::result::Result::Err(__tarpc_service_e) => {
::std::result::Result::Err(
$crate::Error::ClientDeserialize(__tarpc_service_e))
::std::result::Result::Err(__tarpc_service_err) => {
::std::result::Result::Err(match __tarpc_service_err {
$crate::Error::App(__tarpc_service_err) => {
if let __tarpc_service_Error::$fn_name(__tarpc_service_err) =
__tarpc_service_err
{
$crate::Error::App(__tarpc_service_err)
} else {
unreachable!()
}
}
$crate::Error::ServerDeserialize(__tarpc_service_err) => $crate::Error::ServerDeserialize(__tarpc_service_err),
$crate::Error::ServerSerialize(__tarpc_service_err) => $crate::Error::ServerSerialize(__tarpc_service_err),
$crate::Error::ClientDeserialize(__tarpc_service_err) => $crate::Error::ClientDeserialize(__tarpc_service_err),
$crate::Error::ClientSerialize(__tarpc_service_err) => $crate::Error::ClientSerialize(__tarpc_service_err),
$crate::Error::Io(__tarpc_service_error) => $crate::Error::Io(__tarpc_service_error),
})
}
}
}))
})
}
)*
@@ -760,8 +764,8 @@ mod functional_test {
let _ = env_logger::init();
let handle = Server.listen("localhost:0");
let client = SyncClient::connect(handle.local_addr()).unwrap();
assert_eq!(3, client.add(&1, &2).unwrap());
assert_eq!("Hey, Tim.", client.hey(&"Tim".to_string()).unwrap());
assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
}
#[test]
@@ -769,8 +773,8 @@ mod functional_test {
let handle = Server.listen("localhost:0");
let client1 = SyncClient::connect(handle.local_addr()).unwrap();
let client2 = client1.clone();
assert_eq!(3, client1.add(&1, &2).unwrap());
assert_eq!(3, client2.add(&1, &2).unwrap());
assert_eq!(3, client1.add(1, 2).unwrap());
assert_eq!(3, client2.add(1, 2).unwrap());
}
#[test]
@@ -814,8 +818,8 @@ mod functional_test {
let _ = env_logger::init();
let handle = Server.listen("localhost:0").wait().unwrap();
let client = FutureClient::connect(handle.local_addr()).wait().unwrap();
assert_eq!(3, client.add(&1, &2).wait().unwrap());
assert_eq!("Hey, Tim.", client.hey(&"Tim".to_string()).wait().unwrap());
assert_eq!(3, client.add(1, 2).wait().unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap());
}
#[test]
@@ -824,8 +828,8 @@ mod functional_test {
let handle = Server.listen("localhost:0").wait().unwrap();
let client1 = FutureClient::connect(handle.local_addr()).wait().unwrap();
let client2 = client1.clone();
assert_eq!(3, client1.add(&1, &2).wait().unwrap());
assert_eq!(3, client2.add(&1, &2).wait().unwrap());
assert_eq!(3, client1.add(1, 2).wait().unwrap());
assert_eq!(3, client2.add(1, 2).wait().unwrap());
}
#[test]