Finish the multiplex implementation

This commit is contained in:
Tim Kuehn
2016-09-26 23:44:22 -07:00
parent 4a63064cbd
commit 5bace01f2b
7 changed files with 79 additions and 85 deletions

View File

@@ -44,7 +44,6 @@ pub mod publisher {
#[derive(Clone, Debug)]
struct Subscriber {
id: u32,
publisher: publisher::SyncClient,
}
impl subscriber::FutureService for Subscriber {
@@ -57,16 +56,13 @@ impl subscriber::FutureService for Subscriber {
}
impl Subscriber {
fn new(id: u32, publisher: publisher::SyncClient) -> tokio::server::ServerHandle {
let subscriber = Subscriber {
fn new(id: u32) -> tokio::server::ServerHandle {
Subscriber {
id: id,
publisher: publisher.clone(),
}
.listen("localhost:0".first_socket_addr())
.wait()
.unwrap();
publisher.subscribe(id, *subscriber.local_addr()).unwrap();
subscriber
.unwrap()
}
}
@@ -122,9 +118,15 @@ impl publisher::FutureService for Publisher {
fn main() {
let _ = env_logger::init();
let publisher = Publisher::new().listen("localhost:0".first_socket_addr()).wait().unwrap();
let publisher = publisher::SyncClient::connect(publisher.local_addr()).unwrap();
let _subscriber1 = Subscriber::new(0, publisher.clone());
let _subscriber2 = Subscriber::new(1, publisher.clone());
let publisher_addr = publisher.local_addr();
let publisher = publisher::SyncClient::connect(publisher_addr).unwrap();
let subscriber1 = Subscriber::new(0);
publisher.subscribe(0, *subscriber1.local_addr()).unwrap();
let subscriber2 = Subscriber::new(1);
publisher.subscribe(1, *subscriber2.local_addr()).unwrap();
println!("Broadcasting...");
publisher.broadcast("hello to all".to_string()).unwrap();

View File

@@ -10,9 +10,10 @@
extern crate tarpc;
extern crate futures;
use futures::{BoxFuture, Future};
use add::{FutureService as AddFutureService, FutureServiceExt as AddExt};
use double::{FutureService as DoubleFutureService, FutureServiceExt as DoubleExt};
use futures::{BoxFuture, Future};
use std::sync::{Arc, Mutex};
use tarpc::util::{FirstSocketAddr, Message, Never};
use tarpc::future::Connect as Fc;
use tarpc::sync::Connect as Sc;
@@ -46,7 +47,15 @@ impl AddFutureService for AddServer {
#[derive(Clone)]
struct DoubleServer {
client: add::FutureClient,
client: Arc<Mutex<add::FutureClient>>,
}
impl DoubleServer {
fn new(client: add::FutureClient) -> Self {
DoubleServer {
client: Arc::new(Mutex::new(client))
}
}
}
impl DoubleFutureService for DoubleServer {
@@ -54,6 +63,8 @@ impl DoubleFutureService for DoubleServer {
fn double(&self, x: i32) -> Self::DoubleFut {
self.client
.lock()
.unwrap()
.add(x, x)
.map_err(|e| e.to_string().into())
.boxed()
@@ -63,7 +74,7 @@ impl DoubleFutureService for DoubleServer {
fn main() {
let add = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap();
let add_client = add::FutureClient::connect(add.local_addr()).wait().unwrap();
let double = DoubleServer { client: add_client };
let double = DoubleServer::new(add_client);
let double = double.listen("localhost:0".first_socket_addr()).wait().unwrap();
let double_client = double::SyncClient::connect(double.local_addr()).unwrap();

View File

@@ -3,13 +3,12 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
use WireError;
use {WireError, tokio_proto as proto};
use bincode::serde::DeserializeError;
use futures::{Async, BoxFuture, Future};
use futures::stream::Empty;
use std::fmt;
use std::io;
use tokio_proto::pipeline;
use tokio_service::Service;
use util::Never;
@@ -18,19 +17,13 @@ use util::Never;
/// Typically, this would be combined with a serialization pre-processing step
/// and a deserialization post-processing step.
pub struct Client<Req, Resp, E> {
inner: pipeline::Client<Req,
inner: proto::Client<Req,
Result<Result<Resp, WireError<E>>,
DeserializeError>,
Empty<Never, io::Error>,
io::Error>,
}
impl<Req, Resp, E> Clone for Client<Req, Resp, E> {
fn clone(&self) -> Self {
Client { inner: self.inner.clone() }
}
}
impl<Req, Resp, E> Service for Client<Req, Resp, E>
where Req: Send + 'static,
Resp: Send + 'static,
@@ -46,7 +39,7 @@ impl<Req, Resp, E> Service for Client<Req, Resp, E>
}
fn call(&self, request: Self::Request) -> Self::Future {
self.inner.call(pipeline::Message::WithoutBody(request))
self.inner.call(proto::Message::WithoutBody(request))
.map(|r| r.map(|r| r.map_err(::Error::from))
.map_err(::Error::ClientDeserialize)
.and_then(|r| r))
@@ -71,7 +64,7 @@ pub mod future {
use std::net::SocketAddr;
use super::Client;
use tokio_core::net::TcpStream;
use tokio_proto::pipeline;
use tokio_proto::multiplex;
/// Types that can connect to a server asynchronously.
@@ -118,9 +111,9 @@ pub mod future {
TcpStream::connect(&addr, handle)
.and_then(move |tcp| {
let tcp = RefCell::new(Some(tcp));
let c = try!(pipeline::connect(&handle2, move || {
let c = multiplex::connect(move || {
Ok(Framed::new(tcp.borrow_mut().take().unwrap()))
}));
}, &handle2);
Ok(Client { inner: c })
})
.then(|client| Ok(tx.complete(client)))

View File

@@ -3,11 +3,10 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
use bincode;
use {bincode, tokio_proto as proto};
use serde::{Deserialize, Serialize};
use std::{fmt, io};
use std::error::Error as StdError;
use tokio_proto::{multiplex, pipeline};
/// All errors that can occur during the use of tarpc.
#[derive(Debug)]
@@ -76,20 +75,11 @@ impl<E: SerializableError> StdError for Error<E> {
}
}
impl<E> From<pipeline::Error<Error<E>>> for Error<E> {
fn from(err: pipeline::Error<Error<E>>) -> Self {
impl<E> From<proto::Error<Error<E>>> for Error<E> {
fn from(err: proto::Error<Error<E>>) -> Self {
match err {
pipeline::Error::Transport(e) => e,
pipeline::Error::Io(e) => e.into(),
}
}
}
impl<E> From<multiplex::Error<Error<E>>> for Error<E> {
fn from(err: multiplex::Error<Error<E>>) -> Self {
match err {
multiplex::Error::Transport(e) => e,
multiplex::Error::Io(e) => e.into(),
proto::Error::Transport(e) => e,
proto::Error::Io(e) => e.into(),
}
}
}

View File

@@ -7,12 +7,13 @@ use serde;
use futures::Async;
use bincode::{SizeLimit, serde as bincode};
use byteorder::BigEndian;
use bytes::{BlockBuf, BlockBufCursor, Buf, MutBuf};
use bytes::{Buf, MutBuf};
use bytes::buf::{BlockBuf, BlockBufCursor};
use std::{cmp, io, mem};
use std::marker::PhantomData;
use util::Never;
use tokio_core::io::{FramedIo, Io};
use tokio_proto::{self as proto, pipeline};
use tokio_proto::{self as proto, multiplex};
/// Handles the IO of tarpc messages.
pub struct Framed<I, In, Out> {
@@ -38,7 +39,7 @@ impl<I, In, Out> Framed<I, In, Out> {
}
/// The type of message sent and received by the transport.
pub type Frame<T> = pipeline::Frame<T, Never, io::Error>;
pub type Frame<T> = multiplex::Frame<T, Never, io::Error>;
impl<I, In, Out> FramedIo for Framed<I, In, Out>
where I: Io,
@@ -75,8 +76,12 @@ struct Parser<T> {
}
enum ParserState {
Len,
Id,
Len {
id: u64,
},
Payload {
id: u64,
len: u64,
}
}
@@ -84,7 +89,7 @@ enum ParserState {
impl<T> Parser<T> {
fn new() -> Self {
Parser {
state: ParserState::Len,
state: ParserState::Id,
_phantom_data: PhantomData,
}
}
@@ -100,26 +105,36 @@ impl<T> proto::Parse for Parser<T>
loop {
match self.state {
Len if buf.len() < mem::size_of::<u64>() => return None,
Len => {
self.state = Payload { len: buf.buf().read_u64::<BigEndian>() };
Id if buf.len() < mem::size_of::<u64>() => return None,
Id => {
self.state = Len {
id: buf.buf().read_u64::<BigEndian>()
};
buf.shift(mem::size_of::<u64>());
}
Payload { len } if buf.len() < len as usize => return None,
Payload { len } => {
Len { .. } if buf.len() < mem::size_of::<u64>() => return None,
Len { id }=> {
self.state = Payload {
id: id,
len: buf.buf().read_u64::<BigEndian>()
};
buf.shift(mem::size_of::<u64>());
}
Payload { len, .. } if buf.len() < len as usize => return None,
Payload { id, len } => {
match bincode::deserialize_from(&mut BlockBufReader::new(buf),
SizeLimit::Infinite)
{
Ok(msg) => {
buf.shift(len as usize);
self.state = Len;
return Some(pipeline::Frame::Message(Ok(msg)));
self.state = Id;
return Some(multiplex::Frame::Message(id, Ok(msg)));
}
Err(err) => {
// Clear any unread bytes so we don't read garbage on next request.
let buf_len = buf.len();
buf.shift(buf_len);
return Some(pipeline::Frame::Message(Err(err)));
return Some(multiplex::Frame::Message(id, Err(err)));
}
}
}
@@ -142,10 +157,11 @@ impl<T> proto::Serialize for Serializer<T>
type In = Frame<T>;
fn serialize(&mut self, msg: Self::In, buf: &mut BlockBuf) {
use tokio_proto::pipeline::Frame::*;
use tokio_proto::multiplex::Frame::*;
match msg {
Message(msg) => {
Message(id, msg) => {
buf.write_u64::<BigEndian>(id);
buf.write_u64::<BigEndian>(bincode::serialized_size(&msg));
bincode::serialize_into(&mut BlockBufWriter::new(buf),
&msg,
@@ -153,7 +169,7 @@ impl<T> proto::Serialize for Serializer<T>
// TODO(tikue): handle err
.expect("In bincode::serialize_into");
}
Error(e) => panic!("Unexpected error in Serializer::serialize: {}", e),
Error(id, e) => panic!("Unexpected error in Serializer::serialize, id={}: {}", id, e),
MessageWithBody(..) | Body(..) | Done => unreachable!(),
}
@@ -207,7 +223,7 @@ impl<'a> io::Write for BlockBufWriter<'a> {
fn serialize() {
use tokio_proto::{Parse, Serialize};
const MSG: Frame<(char, char, char)> = pipeline::Frame::Message(('a', 'b', 'c'));
const MSG: Frame<(char, char, char)> = multiplex::Frame::Message(4, ('a', 'b', 'c'));
let mut buf = BlockBuf::default();
// Serialize twice to check for idempotence.
@@ -216,7 +232,8 @@ fn serialize() {
let actual: Option<Frame<Result<(char, char, char), bincode::DeserializeError>>> = Parser::new().parse(&mut buf);
match actual {
Some(pipeline::Frame::Message(ref v)) if *v.as_ref().unwrap() == MSG.unwrap_msg() => {} // good,
Some(multiplex::Frame::Message(id, ref v)) if id == MSG.request_id().unwrap() &&
*v.as_ref().unwrap() == MSG.unwrap_msg() => {} // good,
bad => panic!("Expected {:?}, but got {:?}", Some(MSG), bad),
}

View File

@@ -469,7 +469,7 @@ macro_rules! service {
Err(__tarpc_service_deserialize_err) => {
return __tarpc_service_FutureReply::DeserializeError(
$crate::futures::finished(
$crate::tokio_proto::pipeline::Message::WithoutBody(
$crate::tokio_proto::Message::WithoutBody(
::std::result::Result::Err(
$crate::WireError::ServerDeserialize(
::std::string::ToString::to_string(&__tarpc_service_deserialize_err))))));
@@ -483,7 +483,7 @@ macro_rules! service {
__tarpc_service_response: ::std::result::Result<$out, $error>)
-> __tarpc_service_Future
{
$crate::futures::finished($crate::tokio_proto::pipeline::Message::WithoutBody(
$crate::futures::finished($crate::tokio_proto::Message::WithoutBody(
__tarpc_service_response
.map(__tarpc_service_Response::$fn_name)
.map_err(|__tarpc_service_error| $crate::WireError::App(__tarpc_service_Error::$fn_name(__tarpc_service_error)))
@@ -585,7 +585,7 @@ macro_rules! service {
impl<S> SyncServiceExt for S where S: SyncService {}
#[allow(unused)]
#[derive(Clone, Debug)]
#[derive(Debug)]
/// The client stub that makes RPC calls to the server. Exposes a blocking interface.
pub struct SyncClient(FutureClient);
@@ -643,7 +643,7 @@ macro_rules! service {
}
#[allow(unused)]
#[derive(Clone, Debug)]
#[derive(Debug)]
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
pub struct FutureClient(__tarpc_service_Client);
@@ -774,15 +774,6 @@ mod functional_test {
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
}
#[test]
fn clone() {
let handle = Server.listen("localhost:0".first_socket_addr()).unwrap();
let client1 = SyncClient::connect(handle.local_addr()).unwrap();
let client2 = client1.clone();
assert_eq!(3, client1.add(1, 2).unwrap());
assert_eq!(3, client2.add(1, 2).unwrap());
}
#[test]
fn other_service() {
let _ = env_logger::init();
@@ -829,16 +820,6 @@ mod functional_test {
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap());
}
#[test]
fn clone() {
let _ = env_logger::init();
let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
let client1 = FutureClient::connect(handle.local_addr()).wait().unwrap();
let client2 = client1.clone();
assert_eq!(3, client1.add(1, 2).wait().unwrap());
assert_eq!(3, client2.add(1, 2).wait().unwrap());
}
#[test]
fn other_service() {
let _ = env_logger::init();

View File

@@ -13,13 +13,13 @@ use serde::{Deserialize, Serialize};
use std::io;
use std::net::SocketAddr;
use tokio_core::reactor::Handle;
use tokio_proto::pipeline;
use tokio_proto::{self as proto, multiplex};
use tokio_proto::server::{self, ServerHandle};
use tokio_service::NewService;
use util::Never;
/// A message from server to client.
pub type Response<T, E> = pipeline::Message<Result<T, WireError<E>>, Empty<Never, io::Error>>;
pub type Response<T, E> = proto::Message<Result<T, WireError<E>>, Empty<Never, io::Error>>;
/// Spawns a service that binds to the given address and runs on the default reactor core.
pub fn listen<S, Req, Resp, E>(addr: SocketAddr, new_service: S) -> ListenFuture
@@ -48,7 +48,7 @@ pub fn listen_with<S, Req, Resp, E>(addr: SocketAddr, new_service: S, handle: &H
E: Serialize,
{
server::listen(handle, addr, move |stream| {
pipeline::Server::new(new_service.new_service()?, Framed::new(stream))
Ok(multiplex::Server::new(new_service.new_service()?, Framed::new(stream)))
})
}