From 5bace01f2b9e64dd2a5a893f270d5d30cff5e8c9 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Mon, 26 Sep 2016 23:44:22 -0700 Subject: [PATCH] Finish the multiplex implementation --- examples/pubsub.rs | 22 +++++++------ examples/server_calling_server.rs | 17 ++++++++-- src/client.rs | 19 ++++------- src/errors.rs | 20 +++--------- src/framed.rs | 53 ++++++++++++++++++++----------- src/macros.rs | 27 +++------------- src/server.rs | 6 ++-- 7 files changed, 79 insertions(+), 85 deletions(-) diff --git a/examples/pubsub.rs b/examples/pubsub.rs index b7d9380..d9df6e3 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -44,7 +44,6 @@ pub mod publisher { #[derive(Clone, Debug)] struct Subscriber { id: u32, - publisher: publisher::SyncClient, } impl subscriber::FutureService for Subscriber { @@ -57,16 +56,13 @@ impl subscriber::FutureService for Subscriber { } impl Subscriber { - fn new(id: u32, publisher: publisher::SyncClient) -> tokio::server::ServerHandle { - let subscriber = Subscriber { + fn new(id: u32) -> tokio::server::ServerHandle { + Subscriber { id: id, - publisher: publisher.clone(), } .listen("localhost:0".first_socket_addr()) .wait() - .unwrap(); - publisher.subscribe(id, *subscriber.local_addr()).unwrap(); - subscriber + .unwrap() } } @@ -122,9 +118,15 @@ impl publisher::FutureService for Publisher { fn main() { let _ = env_logger::init(); let publisher = Publisher::new().listen("localhost:0".first_socket_addr()).wait().unwrap(); - let publisher = publisher::SyncClient::connect(publisher.local_addr()).unwrap(); - let _subscriber1 = Subscriber::new(0, publisher.clone()); - let _subscriber2 = Subscriber::new(1, publisher.clone()); + let publisher_addr = publisher.local_addr(); + let publisher = publisher::SyncClient::connect(publisher_addr).unwrap(); + + let subscriber1 = Subscriber::new(0); + publisher.subscribe(0, *subscriber1.local_addr()).unwrap(); + + let subscriber2 = Subscriber::new(1); + publisher.subscribe(1, *subscriber2.local_addr()).unwrap(); + println!("Broadcasting..."); publisher.broadcast("hello to all".to_string()).unwrap(); diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index c3a1a1b..788c366 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -10,9 +10,10 @@ extern crate tarpc; extern crate futures; -use futures::{BoxFuture, Future}; use add::{FutureService as AddFutureService, FutureServiceExt as AddExt}; use double::{FutureService as DoubleFutureService, FutureServiceExt as DoubleExt}; +use futures::{BoxFuture, Future}; +use std::sync::{Arc, Mutex}; use tarpc::util::{FirstSocketAddr, Message, Never}; use tarpc::future::Connect as Fc; use tarpc::sync::Connect as Sc; @@ -46,7 +47,15 @@ impl AddFutureService for AddServer { #[derive(Clone)] struct DoubleServer { - client: add::FutureClient, + client: Arc>, +} + +impl DoubleServer { + fn new(client: add::FutureClient) -> Self { + DoubleServer { + client: Arc::new(Mutex::new(client)) + } + } } impl DoubleFutureService for DoubleServer { @@ -54,6 +63,8 @@ impl DoubleFutureService for DoubleServer { fn double(&self, x: i32) -> Self::DoubleFut { self.client + .lock() + .unwrap() .add(x, x) .map_err(|e| e.to_string().into()) .boxed() @@ -63,7 +74,7 @@ impl DoubleFutureService for DoubleServer { fn main() { let add = AddServer.listen("localhost:0".first_socket_addr()).wait().unwrap(); let add_client = add::FutureClient::connect(add.local_addr()).wait().unwrap(); - let double = DoubleServer { client: add_client }; + let double = DoubleServer::new(add_client); let double = double.listen("localhost:0".first_socket_addr()).wait().unwrap(); let double_client = double::SyncClient::connect(double.local_addr()).unwrap(); diff --git a/src/client.rs b/src/client.rs index 7891a1c..1d69bfd 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,13 +3,12 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use WireError; +use {WireError, tokio_proto as proto}; use bincode::serde::DeserializeError; use futures::{Async, BoxFuture, Future}; use futures::stream::Empty; use std::fmt; use std::io; -use tokio_proto::pipeline; use tokio_service::Service; use util::Never; @@ -18,19 +17,13 @@ use util::Never; /// Typically, this would be combined with a serialization pre-processing step /// and a deserialization post-processing step. pub struct Client { - inner: pipeline::Client>, DeserializeError>, Empty, io::Error>, } -impl Clone for Client { - fn clone(&self) -> Self { - Client { inner: self.inner.clone() } - } -} - impl Service for Client where Req: Send + 'static, Resp: Send + 'static, @@ -46,7 +39,7 @@ impl Service for Client } fn call(&self, request: Self::Request) -> Self::Future { - self.inner.call(pipeline::Message::WithoutBody(request)) + self.inner.call(proto::Message::WithoutBody(request)) .map(|r| r.map(|r| r.map_err(::Error::from)) .map_err(::Error::ClientDeserialize) .and_then(|r| r)) @@ -71,7 +64,7 @@ pub mod future { use std::net::SocketAddr; use super::Client; use tokio_core::net::TcpStream; - use tokio_proto::pipeline; + use tokio_proto::multiplex; /// Types that can connect to a server asynchronously. @@ -118,9 +111,9 @@ pub mod future { TcpStream::connect(&addr, handle) .and_then(move |tcp| { let tcp = RefCell::new(Some(tcp)); - let c = try!(pipeline::connect(&handle2, move || { + let c = multiplex::connect(move || { Ok(Framed::new(tcp.borrow_mut().take().unwrap())) - })); + }, &handle2); Ok(Client { inner: c }) }) .then(|client| Ok(tx.complete(client))) diff --git a/src/errors.rs b/src/errors.rs index f0cab7e..1668ec9 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -3,11 +3,10 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use bincode; +use {bincode, tokio_proto as proto}; use serde::{Deserialize, Serialize}; use std::{fmt, io}; use std::error::Error as StdError; -use tokio_proto::{multiplex, pipeline}; /// All errors that can occur during the use of tarpc. #[derive(Debug)] @@ -76,20 +75,11 @@ impl StdError for Error { } } -impl From>> for Error { - fn from(err: pipeline::Error>) -> Self { +impl From>> for Error { + fn from(err: proto::Error>) -> Self { match err { - pipeline::Error::Transport(e) => e, - pipeline::Error::Io(e) => e.into(), - } - } -} - -impl From>> for Error { - fn from(err: multiplex::Error>) -> Self { - match err { - multiplex::Error::Transport(e) => e, - multiplex::Error::Io(e) => e.into(), + proto::Error::Transport(e) => e, + proto::Error::Io(e) => e.into(), } } } diff --git a/src/framed.rs b/src/framed.rs index af518ff..4e6522c 100644 --- a/src/framed.rs +++ b/src/framed.rs @@ -7,12 +7,13 @@ use serde; use futures::Async; use bincode::{SizeLimit, serde as bincode}; use byteorder::BigEndian; -use bytes::{BlockBuf, BlockBufCursor, Buf, MutBuf}; +use bytes::{Buf, MutBuf}; +use bytes::buf::{BlockBuf, BlockBufCursor}; use std::{cmp, io, mem}; use std::marker::PhantomData; use util::Never; use tokio_core::io::{FramedIo, Io}; -use tokio_proto::{self as proto, pipeline}; +use tokio_proto::{self as proto, multiplex}; /// Handles the IO of tarpc messages. pub struct Framed { @@ -38,7 +39,7 @@ impl Framed { } /// The type of message sent and received by the transport. -pub type Frame = pipeline::Frame; +pub type Frame = multiplex::Frame; impl FramedIo for Framed where I: Io, @@ -75,8 +76,12 @@ struct Parser { } enum ParserState { - Len, + Id, + Len { + id: u64, + }, Payload { + id: u64, len: u64, } } @@ -84,7 +89,7 @@ enum ParserState { impl Parser { fn new() -> Self { Parser { - state: ParserState::Len, + state: ParserState::Id, _phantom_data: PhantomData, } } @@ -100,26 +105,36 @@ impl proto::Parse for Parser loop { match self.state { - Len if buf.len() < mem::size_of::() => return None, - Len => { - self.state = Payload { len: buf.buf().read_u64::() }; + Id if buf.len() < mem::size_of::() => return None, + Id => { + self.state = Len { + id: buf.buf().read_u64::() + }; buf.shift(mem::size_of::()); } - Payload { len } if buf.len() < len as usize => return None, - Payload { len } => { + Len { .. } if buf.len() < mem::size_of::() => return None, + Len { id }=> { + self.state = Payload { + id: id, + len: buf.buf().read_u64::() + }; + buf.shift(mem::size_of::()); + } + Payload { len, .. } if buf.len() < len as usize => return None, + Payload { id, len } => { match bincode::deserialize_from(&mut BlockBufReader::new(buf), SizeLimit::Infinite) { Ok(msg) => { buf.shift(len as usize); - self.state = Len; - return Some(pipeline::Frame::Message(Ok(msg))); + self.state = Id; + return Some(multiplex::Frame::Message(id, Ok(msg))); } Err(err) => { // Clear any unread bytes so we don't read garbage on next request. let buf_len = buf.len(); buf.shift(buf_len); - return Some(pipeline::Frame::Message(Err(err))); + return Some(multiplex::Frame::Message(id, Err(err))); } } } @@ -142,10 +157,11 @@ impl proto::Serialize for Serializer type In = Frame; fn serialize(&mut self, msg: Self::In, buf: &mut BlockBuf) { - use tokio_proto::pipeline::Frame::*; + use tokio_proto::multiplex::Frame::*; match msg { - Message(msg) => { + Message(id, msg) => { + buf.write_u64::(id); buf.write_u64::(bincode::serialized_size(&msg)); bincode::serialize_into(&mut BlockBufWriter::new(buf), &msg, @@ -153,7 +169,7 @@ impl proto::Serialize for Serializer // TODO(tikue): handle err .expect("In bincode::serialize_into"); } - Error(e) => panic!("Unexpected error in Serializer::serialize: {}", e), + Error(id, e) => panic!("Unexpected error in Serializer::serialize, id={}: {}", id, e), MessageWithBody(..) | Body(..) | Done => unreachable!(), } @@ -207,7 +223,7 @@ impl<'a> io::Write for BlockBufWriter<'a> { fn serialize() { use tokio_proto::{Parse, Serialize}; - const MSG: Frame<(char, char, char)> = pipeline::Frame::Message(('a', 'b', 'c')); + const MSG: Frame<(char, char, char)> = multiplex::Frame::Message(4, ('a', 'b', 'c')); let mut buf = BlockBuf::default(); // Serialize twice to check for idempotence. @@ -216,7 +232,8 @@ fn serialize() { let actual: Option>> = Parser::new().parse(&mut buf); match actual { - Some(pipeline::Frame::Message(ref v)) if *v.as_ref().unwrap() == MSG.unwrap_msg() => {} // good, + Some(multiplex::Frame::Message(id, ref v)) if id == MSG.request_id().unwrap() && + *v.as_ref().unwrap() == MSG.unwrap_msg() => {} // good, bad => panic!("Expected {:?}, but got {:?}", Some(MSG), bad), } diff --git a/src/macros.rs b/src/macros.rs index 05055ad..29c0426 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -469,7 +469,7 @@ macro_rules! service { Err(__tarpc_service_deserialize_err) => { return __tarpc_service_FutureReply::DeserializeError( $crate::futures::finished( - $crate::tokio_proto::pipeline::Message::WithoutBody( + $crate::tokio_proto::Message::WithoutBody( ::std::result::Result::Err( $crate::WireError::ServerDeserialize( ::std::string::ToString::to_string(&__tarpc_service_deserialize_err)))))); @@ -483,7 +483,7 @@ macro_rules! service { __tarpc_service_response: ::std::result::Result<$out, $error>) -> __tarpc_service_Future { - $crate::futures::finished($crate::tokio_proto::pipeline::Message::WithoutBody( + $crate::futures::finished($crate::tokio_proto::Message::WithoutBody( __tarpc_service_response .map(__tarpc_service_Response::$fn_name) .map_err(|__tarpc_service_error| $crate::WireError::App(__tarpc_service_Error::$fn_name(__tarpc_service_error))) @@ -585,7 +585,7 @@ macro_rules! service { impl SyncServiceExt for S where S: SyncService {} #[allow(unused)] - #[derive(Clone, Debug)] + #[derive(Debug)] /// The client stub that makes RPC calls to the server. Exposes a blocking interface. pub struct SyncClient(FutureClient); @@ -643,7 +643,7 @@ macro_rules! service { } #[allow(unused)] - #[derive(Clone, Debug)] + #[derive(Debug)] /// The client stub that makes RPC calls to the server. Exposes a Future interface. pub struct FutureClient(__tarpc_service_Client); @@ -774,15 +774,6 @@ mod functional_test { assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); } - #[test] - fn clone() { - let handle = Server.listen("localhost:0".first_socket_addr()).unwrap(); - let client1 = SyncClient::connect(handle.local_addr()).unwrap(); - let client2 = client1.clone(); - assert_eq!(3, client1.add(1, 2).unwrap()); - assert_eq!(3, client2.add(1, 2).unwrap()); - } - #[test] fn other_service() { let _ = env_logger::init(); @@ -829,16 +820,6 @@ mod functional_test { assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap()); } - #[test] - fn clone() { - let _ = env_logger::init(); - let handle = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let client1 = FutureClient::connect(handle.local_addr()).wait().unwrap(); - let client2 = client1.clone(); - assert_eq!(3, client1.add(1, 2).wait().unwrap()); - assert_eq!(3, client2.add(1, 2).wait().unwrap()); - } - #[test] fn other_service() { let _ = env_logger::init(); diff --git a/src/server.rs b/src/server.rs index 9dbee23..58f6e18 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,13 +13,13 @@ use serde::{Deserialize, Serialize}; use std::io; use std::net::SocketAddr; use tokio_core::reactor::Handle; -use tokio_proto::pipeline; +use tokio_proto::{self as proto, multiplex}; use tokio_proto::server::{self, ServerHandle}; use tokio_service::NewService; use util::Never; /// A message from server to client. -pub type Response = pipeline::Message>, Empty>; +pub type Response = proto::Message>, Empty>; /// Spawns a service that binds to the given address and runs on the default reactor core. pub fn listen(addr: SocketAddr, new_service: S) -> ListenFuture @@ -48,7 +48,7 @@ pub fn listen_with(addr: SocketAddr, new_service: S, handle: &H E: Serialize, { server::listen(handle, addr, move |stream| { - pipeline::Server::new(new_service.new_service()?, Framed::new(stream)) + Ok(multiplex::Server::new(new_service.new_service()?, Framed::new(stream))) }) }