diff --git a/src/client.rs b/src/client.rs index 4ec8be7..f4f4267 100644 --- a/src/client.rs +++ b/src/client.rs @@ -63,7 +63,7 @@ impl fmt::Debug for Client { /// Exposes a trait for connecting asynchronously to servers. pub mod future { use futures::{self, Async, Future}; - use protocol::{LOOP_HANDLE, new_transport}; + use framed::{REMOTE, Framed}; use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::io; @@ -112,13 +112,13 @@ pub mod future { fn connect(addr: &SocketAddr) -> ClientFuture { let addr = *addr; let (tx, rx) = futures::oneshot(); - LOOP_HANDLE.spawn(move |handle| { + REMOTE.spawn(move |handle| { let handle2 = handle.clone(); TcpStream::connect(&addr, handle) .and_then(move |tcp| { let tcp = RefCell::new(Some(tcp)); let c = try!(pipeline::connect(&handle2, move || { - Ok(new_transport(tcp.borrow_mut().take().unwrap())) + Ok(Framed::new(tcp.borrow_mut().take().unwrap())) })); Ok(Client { inner: c }) }) diff --git a/src/protocol.rs b/src/framed.rs similarity index 91% rename from src/protocol.rs rename to src/framed.rs index da18c9b..43d354c 100644 --- a/src/protocol.rs +++ b/src/framed.rs @@ -18,7 +18,7 @@ use tokio_proto::{self as proto, pipeline}; lazy_static! { #[doc(hidden)] - pub static ref LOOP_HANDLE: Remote = { + pub static ref REMOTE: Remote = { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let mut lupe = Core::new().unwrap(); @@ -35,6 +35,24 @@ pub struct Framed { inner: proto::Framed, Serializer>, } +impl Framed { + /// Constructs a new tarpc FramedIo + pub fn new(upstream: I) -> Framed + where I: Io, + In: serde::Serialize, + Out: serde::Deserialize, + { + Framed { + inner: proto::Framed::new(upstream, + Parser::new(), + Serializer::new(), + BlockBuf::new(128, 8_192), + BlockBuf::new(128, 8_192)) + } + } + +} + /// The type of message sent and received by the transport. pub type Frame = pipeline::Frame; @@ -67,21 +85,6 @@ impl FramedIo for Framed } } -/// Constructs a new tarpc FramedIo -pub fn new_transport(upstream: I) -> Framed - where I: Io, - In: serde::Serialize, - Out: serde::Deserialize, -{ - Framed { - inner: proto::Framed::new(upstream, - Parser::new(), - Serializer::new(), - BlockBuf::new(128, 8_192), - BlockBuf::new(128, 8_192)) - } -} - struct Parser { state: ParserState, _phantom_data: PhantomData diff --git a/src/lib.rs b/src/lib.rs index 7308e2b..7d13dc9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -95,7 +95,7 @@ pub use client::future::ClientFuture; #[doc(hidden)] pub use errors::{WireError}; #[doc(hidden)] -pub use protocol::{new_transport, Framed}; +pub use framed::Framed; #[doc(hidden)] pub use server::{ListenFuture, Response, listen_pipeline}; @@ -110,8 +110,8 @@ mod macros; mod client; /// Provides the base server boilerplate used by service implementations. mod server; -/// Provides the tarpc client and server, which implements the tarpc protocol. -/// The protocol is defined by the implementation. -mod protocol; +/// Provides an implementation of `FramedIo` that implements the tarpc protocol. +/// The tarpc protocol is defined by the `FramedIo` implementation. +mod framed; /// Provides a few different error types. mod errors; diff --git a/src/server.rs b/src/server.rs index 3df7e86..d53f46c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -7,7 +7,7 @@ use bincode::serde::DeserializeError; use errors::WireError; use futures::{self, Async, Future}; use futures::stream::Empty; -use protocol::{LOOP_HANDLE, new_transport}; +use framed::{REMOTE, Framed}; use serde::{Deserialize, Serialize}; use std::io; use std::net::ToSocketAddrs; @@ -33,9 +33,9 @@ pub fn listen_pipeline(addr: A, new_service: S) -> ListenFut let addr = addr.to_socket_addrs().unwrap().next().unwrap(); let (tx, rx) = futures::oneshot(); - LOOP_HANDLE.spawn(move |handle| { + REMOTE.spawn(move |handle| { Ok(tx.complete(server::listen(handle, addr, move |stream| { - pipeline::Server::new(new_service.new_service()?, new_transport(stream)) + pipeline::Server::new(new_service.new_service()?, Framed::new(stream)) }).unwrap())) }); ListenFuture { inner: rx }