From be5f55c5f6309ea6de1e0758e4cd312b20b1914b Mon Sep 17 00:00:00 2001 From: Tim Date: Wed, 14 Sep 2016 01:19:24 -0700 Subject: [PATCH] Extend snake_to_camel plugin to replace {} in the doc string with the original snake-cased ident. (#50) * Extend snake_to_camel plugin to replace {} in the doc string with the origin snake-cased ident. Also, track tokio-rs master. This is really ad-hoc, undiscoverable, and unintuitive, but there's no way to programmatically create doc strings in regular code, and I want to produce better doc strings for the associated types. Given `fn foo_bar`: Before: `/// The type of future returned by the function of the same name.` After: ``/// The type of future returned by `{}`.`` => `/// The type of future returned by foo_bar.` * Fix some docs * Use a helper fn on pipeline::Frame instead of handrolled match. * Don't hide docs for ClientFuture. It's exposed in the Connect impl of FutureService -- the tradeoff for not generating *another* item -- and hiding it breaks doc links. * Formatting * Rename snake_to_camel plugin => tarpc-plugins * Update README * Mangle a lot of names in macro expansion. To lower the chance of any issues, prefix idents in service expansion with __tarpc_service. In future_enum, prefix with __future_enum. The pattern is basically __macro_name_ident. Any imported enum variant will conflict with a let binding or a function arg, so we basically can't use any generic idents at all. Example: enum Req { request(..) } use self::Req::request; fn make_request(request: Request) { ... } ^^^^^^^ conflict here Additionally, suffix generated associated types with Fut to avoid conflicts with camelcased rpcs. Why someone would do that, I don't know, but we shouldn't allow that wart. --- Cargo.toml | 4 +- README.md | 4 +- benches/latency.rs | 6 +- examples/concurrency.rs | 8 +- examples/pubsub.rs | 22 +- examples/readme.rs | 4 +- examples/readme2.rs | 4 +- examples/server_calling_server.rs | 22 +- examples/throughput.rs | 9 +- examples/two_clients.rs | 15 +- src/client.rs | 74 +++-- src/errors.rs | 3 +- src/lib.rs | 8 +- src/macros.rs | 305 +++++++++++---------- src/{snake_to_camel => plugins}/Cargo.toml | 2 +- src/{snake_to_camel => plugins}/src/lib.rs | 93 +++++-- src/protocol/mod.rs | 58 ++-- src/protocol/reader.rs | 47 +--- src/protocol/writer.rs | 44 +-- src/server.rs | 71 +++-- 20 files changed, 422 insertions(+), 381 deletions(-) rename src/{snake_to_camel => plugins}/Cargo.toml (84%) rename src/{snake_to_camel => plugins}/src/lib.rs (55%) diff --git a/Cargo.toml b/Cargo.toml index 3de3324..e036e91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ description = "An RPC framework for Rust with a focus on ease of use." [dependencies] bincode = "0.6" byteorder = "0.5" -bytes = "0.3" +bytes = { git = "https://github.com/carllerche/bytes" } futures = { git = "https://github.com/alexcrichton/futures-rs" } futures-cpupool = { git = "https://github.com/alexcrichton/futures-rs" } lazy_static = "0.2" @@ -21,7 +21,7 @@ log = "0.3" scoped-pool = "1.0" serde = "0.8" serde_derive = "0.8" -snake_to_camel = { path = "src/snake_to_camel" } +tarpc-plugins = { path = "src/plugins" } take = "0.1" tokio-service = { git = "https://github.com/tokio-rs/tokio-service" } tokio-proto = { git = "https://github.com/tokio-rs/tokio-proto" } diff --git a/README.md b/README.md index 25896a3..c65f0b5 100644 --- a/README.md +++ b/README.md @@ -38,14 +38,14 @@ Add to your `Cargo.toml` dependencies: ```toml tarpc = { git = "https://github.com/google/tarpc" } -snake_to_camel = { git = "https://github.com/google/tarpc" } +tarpc-plugins = { git = "https://github.com/google/tarpc" } ``` ## Example ```rust // required by `FutureClient` (not used in this example) #![feature(conservative_impl_trait, plugin)] -#![plugin(snake_to_camel)] +#![plugin(tarpc_plugins)] extern crate futures; #[macro_use] diff --git a/benches/latency.rs b/benches/latency.rs index 0a47203..46ba702 100644 --- a/benches/latency.rs +++ b/benches/latency.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. #![feature(plugin, conservative_impl_trait, test)] -#![plugin(snake_to_camel)] +#![plugin(tarpc_plugins)] #[macro_use] extern crate tarpc; @@ -26,8 +26,8 @@ service! { struct Server; impl FutureService for Server { - type Ack = futures::Finished<(), Never>; - fn ack(&self) -> Self::Ack { + type AckFut = futures::Finished<(), Never>; + fn ack(&self) -> Self::AckFut { futures::finished(()) } } diff --git a/examples/concurrency.rs b/examples/concurrency.rs index 17b85d5..fc26935 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. #![feature(inclusive_range_syntax, conservative_impl_trait, plugin)] -#![plugin(snake_to_camel)] +#![plugin(tarpc_plugins)] extern crate chrono; extern crate env_logger; @@ -36,9 +36,9 @@ impl Server { } impl FutureService for Server { - type Read = CpuFuture, Never>; + type ReadFut = CpuFuture, Never>; - fn read(&self, size: u32) -> Self::Read { + fn read(&self, size: u32) -> Self::ReadFut { self.0 .spawn(futures::lazy(move || { let mut vec: Vec = Vec::with_capacity(size as usize); @@ -105,7 +105,7 @@ const MAX_CONCURRENCY: u32 = 100; fn main() { let _ = env_logger::init(); - let server = Server::new().listen("localhost:0").unwrap(); + let server = Server::new().listen("localhost:0").wait().unwrap(); println!("Server listening on {}.", server.local_addr()); let clients: Vec<_> = (1...5) .map(|i| { diff --git a/examples/pubsub.rs b/examples/pubsub.rs index 3d7995f..dc1b2b4 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. #![feature(conservative_impl_trait, plugin)] -#![plugin(snake_to_camel)] +#![plugin(tarpc_plugins)] extern crate env_logger; extern crate futures; @@ -48,8 +48,9 @@ struct Subscriber { } impl subscriber::FutureService for Subscriber { - type Receive = futures::Finished<(), Never>; - fn receive(&self, message: String) -> Self::Receive { + type ReceiveFut = futures::Finished<(), Never>; + + fn receive(&self, message: String) -> Self::ReceiveFut { println!("{} received message: {}", self.id, message); futures::finished(()) } @@ -62,6 +63,7 @@ impl Subscriber { publisher: publisher.clone(), } .listen("localhost:0") + .wait() .unwrap(); publisher.subscribe(&id, &subscriber.local_addr()).unwrap(); subscriber @@ -80,9 +82,9 @@ impl Publisher { } impl publisher::FutureService for Publisher { - type Broadcast = BoxFuture<(), Never>; + type BroadcastFut = BoxFuture<(), Never>; - fn broadcast(&self, message: String) -> Self::Broadcast { + fn broadcast(&self, message: String) -> Self::BroadcastFut { futures::collect(self.clients .lock() .unwrap() @@ -94,9 +96,9 @@ impl publisher::FutureService for Publisher { .boxed() } - type Subscribe = BoxFuture<(), Message>; + type SubscribeFut = BoxFuture<(), Message>; - fn subscribe(&self, id: u32, address: SocketAddr) -> BoxFuture<(), Message> { + fn subscribe(&self, id: u32, address: SocketAddr) -> Self::SubscribeFut { let clients = self.clients.clone(); subscriber::FutureClient::connect(&address) .map(move |subscriber| { @@ -108,9 +110,9 @@ impl publisher::FutureService for Publisher { .boxed() } - type Unsubscribe = BoxFuture<(), Never>; + type UnsubscribeFut = BoxFuture<(), Never>; - fn unsubscribe(&self, id: u32) -> BoxFuture<(), Never> { + fn unsubscribe(&self, id: u32) -> Self::UnsubscribeFut { println!("Unsubscribing {}", id); self.clients.lock().unwrap().remove(&id).unwrap(); futures::finished(()).boxed() @@ -119,7 +121,7 @@ impl publisher::FutureService for Publisher { fn main() { let _ = env_logger::init(); - let publisher = Publisher::new().listen("localhost:0").unwrap(); + let publisher = Publisher::new().listen("localhost:0").wait().unwrap(); let publisher = publisher::SyncClient::connect(publisher.local_addr()).unwrap(); let _subscriber1 = Subscriber::new(0, publisher.clone()); let _subscriber2 = Subscriber::new(1, publisher.clone()); diff --git a/examples/readme.rs b/examples/readme.rs index 62a2511..04f9e98 100644 --- a/examples/readme.rs +++ b/examples/readme.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. #![feature(conservative_impl_trait, plugin)] -#![plugin(snake_to_camel)] +#![plugin(tarpc_plugins)] extern crate futures; #[macro_use] @@ -28,7 +28,7 @@ impl SyncService for HelloServer { fn main() { let addr = "localhost:10000"; - let _server = HelloServer.listen(addr).unwrap(); + let _server = HelloServer.listen(addr); let client = SyncClient::connect(addr).unwrap(); println!("{}", client.hello(&"Mom".to_string()).unwrap()); } diff --git a/examples/readme2.rs b/examples/readme2.rs index bfe9523..8810df9 100644 --- a/examples/readme2.rs +++ b/examples/readme2.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. #![feature(conservative_impl_trait, plugin, rustc_macro)] -#![plugin(snake_to_camel)] +#![plugin(tarpc_plugins)] extern crate futures; #[macro_use] @@ -50,7 +50,7 @@ impl SyncService for HelloServer { fn main() { let addr = "localhost:10000"; - let _server = HelloServer.listen(addr).unwrap(); + let _server = HelloServer.listen(addr); let client = SyncClient::connect(addr).unwrap(); println!("{}", client.hello(&"Mom".to_string()).unwrap()); println!("{}", client.hello(&"".to_string()).unwrap_err()); diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index a917db6..4e71e0c 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -4,15 +4,15 @@ // This file may not be copied, modified, or distributed except according to those terms. #![feature(conservative_impl_trait, plugin)] -#![plugin(snake_to_camel)] +#![plugin(tarpc_plugins)] #[macro_use] extern crate tarpc; extern crate futures; use futures::{BoxFuture, Future}; -use add::{FutureService as AddService, FutureServiceExt as AddExt}; -use double::{FutureService as DoubleService, FutureServiceExt as DoubleExt}; +use add::{FutureService as AddFutureService, FutureServiceExt as AddExt}; +use double::{FutureService as DoubleFutureService, FutureServiceExt as DoubleExt}; use tarpc::util::{Never, Message}; use tarpc::future::Connect as Fc; use tarpc::sync::Connect as Sc; @@ -36,10 +36,10 @@ pub mod double { #[derive(Clone)] struct AddServer; -impl AddService for AddServer { - type Add = futures::Finished; +impl AddFutureService for AddServer { + type AddFut = futures::Finished; - fn add(&self, x: i32, y: i32) -> Self::Add { + fn add(&self, x: i32, y: i32) -> Self::AddFut { futures::finished(x + y) } } @@ -49,10 +49,10 @@ struct DoubleServer { client: add::FutureClient, } -impl DoubleService for DoubleServer { - type Double = BoxFuture; +impl DoubleFutureService for DoubleServer { + type DoubleFut = BoxFuture; - fn double(&self, x: i32) -> Self::Double { + fn double(&self, x: i32) -> Self::DoubleFut { self.client .add(&x, &x) .map_err(|e| e.to_string().into()) @@ -61,10 +61,10 @@ impl DoubleService for DoubleServer { } fn main() { - let add = AddServer.listen("localhost:0").unwrap(); + let add = AddServer.listen("localhost:0").wait().unwrap(); let add_client = add::FutureClient::connect(add.local_addr()).wait().unwrap(); let double = DoubleServer { client: add_client }; - let double = double.listen("localhost:0").unwrap(); + let double = double.listen("localhost:0").wait().unwrap(); let double_client = double::SyncClient::connect(double.local_addr()).unwrap(); for i in 0..5 { diff --git a/examples/throughput.rs b/examples/throughput.rs index 36a0a78..af53875 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. #![feature(conservative_impl_trait, plugin)] -#![plugin(snake_to_camel)] +#![plugin(tarpc_plugins)] #[macro_use] extern crate lazy_static; @@ -18,6 +18,7 @@ use std::time; use std::net; use std::thread; use std::io::{Read, Write, stdout}; +use futures::Future; use tarpc::util::Never; use tarpc::sync::Connect; @@ -41,9 +42,9 @@ service! { struct Server; impl FutureService for Server { - type Read = futures::Finished>, Never>; + type ReadFut = futures::Finished>, Never>; - fn read(&self) -> Self::Read { + fn read(&self) -> Self::ReadFut { futures::finished(BUF.clone()) } } @@ -51,7 +52,7 @@ impl FutureService for Server { const CHUNK_SIZE: u32 = 1 << 19; fn bench_tarpc(target: u64) { - let handle = Server.listen("localhost:0").unwrap(); + let handle = Server.listen("localhost:0").wait().unwrap(); let client = SyncClient::connect(handle.local_addr()).unwrap(); let start = time::Instant::now(); let mut nread = 0; diff --git a/examples/two_clients.rs b/examples/two_clients.rs index 9888724..9fe3820 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. #![feature(conservative_impl_trait, plugin)] -#![plugin(snake_to_camel)] +#![plugin(tarpc_plugins)] #[macro_use] extern crate log; @@ -16,6 +16,7 @@ extern crate futures; use bar::FutureServiceExt as BarExt; use baz::FutureServiceExt as BazExt; +use futures::Future; use tarpc::util::Never; use tarpc::sync::Connect; @@ -28,9 +29,9 @@ mod bar { #[derive(Clone)] struct Bar; impl bar::FutureService for Bar { - type Bar = futures::Finished; + type BarFut = futures::Finished; - fn bar(&self, i: i32) -> Self::Bar { + fn bar(&self, i: i32) -> Self::BarFut { futures::finished(i) } } @@ -44,9 +45,9 @@ mod baz { #[derive(Clone)] struct Baz; impl baz::FutureService for Baz { - type Baz = futures::Finished; + type BazFut = futures::Finished; - fn baz(&self, s: String) -> Self::Baz { + fn baz(&self, s: String) -> Self::BazFut { futures::finished(format!("Hello, {}!", s)) } } @@ -57,8 +58,8 @@ macro_rules! pos { fn main() { let _ = env_logger::init(); - let bar = Bar.listen("localhost:0").unwrap(); - let baz = Baz.listen("localhost:0").unwrap(); + let bar = Bar.listen("localhost:0").wait().unwrap(); + let baz = Baz.listen("localhost:0").wait().unwrap(); let bar_client = bar::SyncClient::connect(bar.local_addr()).unwrap(); let baz_client = baz::SyncClient::connect(baz.local_addr()).unwrap(); diff --git a/src/client.rs b/src/client.rs index 488f71c..2e044b7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,26 +4,34 @@ // This file may not be copied, modified, or distributed except according to those terms. use Packet; -use futures::BoxFuture; +use futures::{Async, BoxFuture}; use futures::stream::Empty; use std::fmt; use std::io; -use tokio_service::Service; use tokio_proto::pipeline; +use tokio_service::Service; +use util::Never; -/// A thin wrapper around `pipeline::Client` that handles Serialization. +/// A client `Service` that writes and reads bytes. +/// +/// Typically, this would be combined with a serialization pre-processing step +/// and a deserialization post-processing step. #[derive(Clone)] pub struct Client { - inner: pipeline::Client, Empty<(), io::Error>, io::Error>, + inner: pipeline::Client, Empty, io::Error>, } impl Service for Client { - type Req = Packet; - type Resp = Vec; + type Request = Packet; + type Response = Vec; type Error = io::Error; - type Fut = BoxFuture, io::Error>; + type Future = BoxFuture, io::Error>; - fn call(&self, request: Packet) -> Self::Fut { + fn poll_ready(&self) -> Async<()> { + Async::Ready(()) + } + + fn call(&self, request: Packet) -> Self::Future { self.inner.call(pipeline::Message::WithoutBody(request)) } } @@ -36,28 +44,42 @@ impl fmt::Debug for Client { /// Exposes a trait for connecting asynchronously to servers. pub mod future { - use futures::{self, BoxFuture, Future}; + use futures::{self, Async, Future}; use protocol::{LOOP_HANDLE, TarpcTransport}; + use std::cell::RefCell; use std::io; use std::net::SocketAddr; use super::Client; - use take::Take; - use tokio_core::TcpStream; + use tokio_core::net::TcpStream; use tokio_proto::pipeline; /// Types that can connect to a server asynchronously. pub trait Connect: Sized { /// The type of the future returned when calling connect. - type Fut: Future; + type Fut: Future; /// Connects to a server located at the given address. fn connect(addr: &SocketAddr) -> Self::Fut; } /// A future that resolves to a `Client` or an `io::Error`. - #[doc(hidden)] - pub type ClientFuture = futures::Map, fn(TcpStream) -> Client>; + pub struct ClientFuture { + inner: futures::Oneshot>, + } + + impl Future for ClientFuture { + type Item = Client; + type Error = io::Error; + + fn poll(&mut self) -> futures::Poll { + match self.inner.poll().unwrap() { + Async::Ready(Ok(client)) => Ok(Async::Ready(client)), + Async::Ready(Err(err)) => Err(err), + Async::NotReady => Ok(Async::NotReady), + } + } + } impl Connect for Client { type Fut = ClientFuture; @@ -65,14 +87,21 @@ pub mod future { /// Starts an event loop on a thread and registers a new client /// connected to the given address. fn connect(addr: &SocketAddr) -> ClientFuture { - fn connect(stream: TcpStream) -> Client { - let loop_handle = LOOP_HANDLE.clone(); - let service = Take::new(move || Ok(TarpcTransport::new(stream))); - Client { inner: pipeline::connect(loop_handle, service) } - } - LOOP_HANDLE.clone() - .tcp_connect(addr) - .map(connect) + let addr = *addr; + let (tx, rx) = futures::oneshot(); + LOOP_HANDLE.spawn(move |handle| { + let handle2 = handle.clone(); + TcpStream::connect(&addr, handle) + .and_then(move |tcp| { + let tcp = RefCell::new(Some(tcp)); + let c = try!(pipeline::connect(&handle2, move || { + Ok(TarpcTransport::new(tcp.borrow_mut().take().unwrap())) + })); + Ok(Client { inner: c }) + }) + .then(|client| Ok(tx.complete(client))) + }); + ClientFuture { inner: rx } } } } @@ -105,4 +134,3 @@ pub mod sync { } } } - diff --git a/src/errors.rs b/src/errors.rs index 7bf943c..6c28d3f 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -4,10 +4,10 @@ // This file may not be copied, modified, or distributed except according to those terms. use bincode; +use serde::{Deserialize, Serialize}; use std::{fmt, io}; use std::error::Error as StdError; use tokio_proto::pipeline; -use serde::{Deserialize, Serialize}; /// All errors that can occur during the use of tarpc. #[derive(Debug)] @@ -121,4 +121,3 @@ pub enum WireError pub trait SerializableError: StdError + Deserialize + Serialize + Send + 'static {} impl SerializableError for E {} - diff --git a/src/lib.rs b/src/lib.rs index 871a526..c129f27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,7 +29,7 @@ //! ``` //! // required by `FutureClient` (not used in this example) //! #![feature(conservative_impl_trait, plugin)] -//! #![plugin(snake_to_camel)] +//! #![plugin(tarpc_plugins)] //! //! #[macro_use] //! extern crate tarpc; @@ -52,7 +52,7 @@ //! //! fn main() { //! let addr = "localhost:10000"; -//! let _server = HelloServer.listen(addr).unwrap(); +//! let _server = HelloServer.listen(addr); //! let client = SyncClient::connect(addr).unwrap(); //! println!("{}", client.hello(&"Mom".to_string()).unwrap()); //! } @@ -60,7 +60,7 @@ //! #![deny(missing_docs)] #![feature(plugin, question_mark, conservative_impl_trait, never_type, rustc_macro)] -#![plugin(snake_to_camel)] +#![plugin(tarpc_plugins)] extern crate bincode; extern crate byteorder; @@ -98,7 +98,7 @@ pub use errors::{WireError}; #[doc(hidden)] pub use protocol::{Packet, deserialize}; #[doc(hidden)] -pub use server::{SerializeFuture, SerializedReply, listen, serialize_reply}; +pub use server::{ListenFuture, SerializeFuture, SerializedReply, listen, serialize_reply}; /// Provides some utility error types, as well as a trait for spawning futures on the default event /// loop. diff --git a/src/macros.rs b/src/macros.rs index 94dcec0..5c63038 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -3,6 +3,12 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. +#[doc(hidden)] +#[macro_export] +macro_rules! as_item { + ($i:item) => {$i}; +} + /// Creates an enum where each variant contains a `Future`. The created enum impls `Future`. /// Useful when a fn needs to return possibly many different types of futures. #[macro_export] @@ -14,23 +20,11 @@ macro_rules! future_enum { $($variant:ident($inner:ty)),* } } => { - $(#[$attr])* - pub enum $name<$($tp),*> { - $(#[$attrv])* - $($variant($inner)),* - } - - impl<__T, __E, $($tp),*> $crate::futures::Future for $name<$($tp),*> - where __T: ::std::marker::Send + 'static, - $($inner: $crate::futures::Future),* - { - type Item = __T; - type Error = __E; - - fn poll(&mut self) -> $crate::futures::Poll { - match *self { - $($name::$variant(ref mut f) => $crate::futures::Future::poll(f)),* - } + future_enum! { + $(#[$attr:meta])* + (pub) enum $name<$($tp),*> { + $(#[$attrv])* + $($variant($inner)),* } } }; @@ -40,19 +34,37 @@ macro_rules! future_enum { $(#[$attrv:meta])* $($variant:ident($inner:ty)),* } + } => { + future_enum! { + $(#[$attr:meta])* + () enum $name<$($tp),*> { + $(#[$attrv])* + $($variant($inner)),* + } + } + }; + { + $(#[$attr:meta])* + ($($vis:tt)*) enum $name:ident<$($tp:ident),*> { + $(#[$attrv:meta])* + $($variant:ident($inner:ty)),* + } } => { $(#[$attr])* - enum $name<$($tp),*> { - $(#[$attrv])* - $($variant($inner)),* + as_item! { + $($vis)* enum $name<$($tp),*> { + $(#[$attrv])* + $($variant($inner)),* + } } - impl<__T, __E, $($tp),*> $crate::futures::Future for $name<$($tp),*> - where __T: ::std::marker::Send + 'static, - $($inner: $crate::futures::Future),* + #[allow(non_camel_case_types)] + impl<__future_enum_T, __future_enum_E, $($tp),*> $crate::futures::Future for $name<$($tp),*> + where __future_enum_T: Send + 'static, + $($inner: $crate::futures::Future),* { - type Item = __T; - type Error = __E; + type Item = __future_enum_T; + type Error = __future_enum_E; fn poll(&mut self) -> $crate::futures::Poll { match *self { @@ -63,12 +75,6 @@ macro_rules! future_enum { } } -#[doc(hidden)] -#[macro_export] -macro_rules! as_item { - ($i:item) => {$i}; -} - #[doc(hidden)] #[macro_export] macro_rules! impl_serialize { @@ -76,18 +82,18 @@ macro_rules! impl_serialize { as_item! { impl$($lifetime)* $crate::serde::Serialize for $impler$($lifetime)* { #[inline] - fn serialize(&self, serializer: &mut S) -> ::std::result::Result<(), S::Error> + fn serialize(&self, __impl_serialize_serializer: &mut S) -> ::std::result::Result<(), S::Error> where S: $crate::serde::Serializer { match *self { $( - $impler::$name(ref field) => + $impler::$name(ref __impl_serialize_field) => $crate::serde::Serializer::serialize_newtype_variant( - serializer, + __impl_serialize_serializer, stringify!($impler), $n, stringify!($name), - field, + __impl_serialize_field, ) ),* } @@ -117,42 +123,43 @@ macro_rules! impl_serialize { macro_rules! impl_deserialize { ($impler:ident, $(@($name:ident $n:expr))* -- #($_n:expr) ) => ( impl $crate::serde::Deserialize for $impler { - #[inline] - fn deserialize(deserializer: &mut D) - -> ::std::result::Result<$impler, D::Error> - where D: $crate::serde::Deserializer + #[allow(non_camel_case_types)] + fn deserialize<__impl_deserialize_D>(__impl_deserialize_deserializer: &mut __impl_deserialize_D) + -> ::std::result::Result<$impler, __impl_deserialize_D::Error> + where __impl_deserialize_D: $crate::serde::Deserializer { #[allow(non_camel_case_types, unused)] - enum Field { + enum __impl_deserialize_Field { $($name),* } - impl $crate::serde::Deserialize for Field { + + impl $crate::serde::Deserialize for __impl_deserialize_Field { #[inline] - fn deserialize(deserializer: &mut D) - -> ::std::result::Result + fn deserialize(__impl_deserialize_deserializer: &mut D) + -> ::std::result::Result<__impl_deserialize_Field, D::Error> where D: $crate::serde::Deserializer { - struct FieldVisitor; - impl $crate::serde::de::Visitor for FieldVisitor { - type Value = Field; + struct __impl_deserialize_FieldVisitor; + impl $crate::serde::de::Visitor for __impl_deserialize_FieldVisitor { + type Value = __impl_deserialize_Field; #[inline] - fn visit_usize(&mut self, value: usize) - -> ::std::result::Result + fn visit_usize(&mut self, __impl_deserialize_value: usize) + -> ::std::result::Result<__impl_deserialize_Field, E> where E: $crate::serde::de::Error, { $( - if value == $n { - return ::std::result::Result::Ok(Field::$name); + if __impl_deserialize_value == $n { + return ::std::result::Result::Ok(__impl_deserialize_Field::$name); } )* ::std::result::Result::Err( $crate::serde::de::Error::custom( - format!("No variants have a value of {}!", value)) + format!("No variants have a value of {}!", __impl_deserialize_value)) ) } } - deserializer.deserialize_struct_field(FieldVisitor) + __impl_deserialize_deserializer.deserialize_struct_field(__impl_deserialize_FieldVisitor) } } @@ -167,7 +174,7 @@ macro_rules! impl_deserialize { { match try!(visitor.visit_variant()) { $( - Field::$name => { + __impl_deserialize_Field::$name => { let val = try!(visitor.visit_newtype()); ::std::result::Result::Ok($impler::$name(val)) } @@ -180,7 +187,7 @@ macro_rules! impl_deserialize { stringify!($name) ),* ]; - deserializer.deserialize_enum(stringify!($impler), VARIANTS, Visitor) + __impl_deserialize_deserializer.deserialize_enum(stringify!($impler), VARIANTS, Visitor) } } ); @@ -198,7 +205,7 @@ macro_rules! impl_deserialize { /// /// ``` /// # #![feature(conservative_impl_trait, plugin)] -/// # #![plugin(snake_to_camel)] +/// # #![plugin(tarpc_plugins)] /// # #[macro_use] extern crate tarpc; /// # fn main() {} /// # service! { @@ -226,7 +233,7 @@ macro_rules! impl_deserialize { /// #[macro_export] macro_rules! service { -// Entry point + // Entry point ( $( $(#[$attr:meta])* @@ -240,7 +247,7 @@ macro_rules! service { )* }} }; -// Pattern for when the next rpc has an implicit unit return type and no error type. + // Pattern for when the next rpc has an implicit unit return type and no error type. ( { $(#[$attr:meta])* @@ -259,7 +266,7 @@ macro_rules! service { rpc $fn_name( $( $arg : $in_ ),* ) -> () | $crate::util::Never; } }; -// Pattern for when the next rpc has an explicit return type and no error type. + // Pattern for when the next rpc has an explicit return type and no error type. ( { $(#[$attr:meta])* @@ -278,7 +285,7 @@ macro_rules! service { rpc $fn_name( $( $arg : $in_ ),* ) -> $out | $crate::util::Never; } }; -// Pattern for when the next rpc has an implicit unit return type and an explicit error type. + // Pattern for when the next rpc has an implicit unit return type and an explicit error type. ( { $(#[$attr:meta])* @@ -297,7 +304,7 @@ macro_rules! service { rpc $fn_name( $( $arg : $in_ ),* ) -> () | $error; } }; -// Pattern for when the next rpc has an explicit return type and an explicit error type. + // Pattern for when the next rpc has an explicit return type and an explicit error type. ( { $(#[$attr:meta])* @@ -316,7 +323,7 @@ macro_rules! service { rpc $fn_name( $( $arg : $in_ ),* ) -> $out | $error; } }; -// Pattern for when all return types have been expanded + // Pattern for when all return types have been expanded ( { } // none left to expand $( @@ -358,9 +365,9 @@ macro_rules! service { } ) => { -/// Defines the `Future` RPC service. Implementors must be `Clone`, `Send`, and `'static`, -/// as required by `tokio_proto::NewService`. This is required so that the service can be used -/// to respond to multiple requests concurrently. + /// Defines the `Future` RPC service. Implementors must be `Clone`, `Send`, and `'static`, + /// as required by `tokio_proto::NewService`. This is required so that the service can be used + /// to respond to multiple requests concurrently. pub trait FutureService: ::std::marker::Send + ::std::clone::Clone + @@ -369,7 +376,7 @@ macro_rules! service { $( snake_to_camel! { - /// The type of future returned by the fn of the same name. + /// The type of future returned by `{}`. type $fn_name: $crate::futures::Future; } @@ -381,73 +388,79 @@ macro_rules! service { /// Provides a function for starting the service. This is a separate trait from /// `FutureService` to prevent collisions with the names of RPCs. pub trait FutureServiceExt: FutureService { - /// Registers the service with the given registry, listening on the given address. - fn listen(self, addr: L) - -> ::std::io::Result<$crate::tokio_proto::server::ServerHandle> + /// Spawns the service, binding to the given address and running on + /// the default tokio `Loop`. + fn listen(self, addr: L) -> $crate::ListenFuture where L: ::std::net::ToSocketAddrs { - return $crate::listen(addr, __AsyncServer(self)); + return $crate::listen(addr, __tarpc_service_AsyncServer(self)); + #[allow(non_camel_case_types)] #[derive(Clone)] - struct __AsyncServer(S); + struct __tarpc_service_AsyncServer(S); - impl ::std::fmt::Debug for __AsyncServer { + impl ::std::fmt::Debug for __tarpc_service_AsyncServer { fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - write!(fmt, "__AsyncServer {{ .. }}") + write!(fmt, "__tarpc_service_AsyncServer {{ .. }}") } } #[allow(non_camel_case_types)] - enum Reply { + enum __tarpc_service_Reply<__tarpc_service_S: FutureService> { DeserializeError($crate::SerializeFuture), - $($fn_name($crate::futures::Then<$crate::futures::MapErr $crate::WireError<$error>>, $crate::SerializeFuture, fn(::std::result::Result<$out, $crate::WireError<$error>>) -> $crate::SerializeFuture>)),* } - impl $crate::futures::Future for Reply { + impl $crate::futures::Future for __tarpc_service_Reply { type Item = $crate::SerializedReply; type Error = ::std::io::Error; fn poll(&mut self) -> $crate::futures::Poll { match *self { - Reply::DeserializeError(ref mut f) => $crate::futures::Future::poll(f), - $(Reply::$fn_name(ref mut f) => $crate::futures::Future::poll(f)),* + __tarpc_service_Reply::DeserializeError(ref mut f) => $crate::futures::Future::poll(f), + $(__tarpc_service_Reply::$fn_name(ref mut f) => $crate::futures::Future::poll(f)),* } } } - impl $crate::tokio_service::Service for __AsyncServer - where S: FutureService + #[allow(non_camel_case_types)] + impl<__tarpc_service_S> $crate::tokio_service::Service for __tarpc_service_AsyncServer<__tarpc_service_S> + where __tarpc_service_S: FutureService { - type Req = ::std::vec::Vec; - type Resp = $crate::SerializedReply; + type Request = ::std::vec::Vec; + type Response = $crate::SerializedReply; type Error = ::std::io::Error; - type Fut = Reply; + type Future = __tarpc_service_Reply<__tarpc_service_S>; - fn call(&self, req: Self::Req) -> Self::Fut { + fn poll_ready(&self) -> $crate::futures::Async<()> { + $crate::futures::Async::Ready(()) + } + + fn call(&self, req: Self::Request) -> Self::Future { #[allow(non_camel_case_types, unused)] #[derive(Debug)] - enum __ServerSideRequest { + enum __tarpc_service_ServerSideRequest { $( $fn_name(( $($in_,)* )) ),* } - impl_deserialize!(__ServerSideRequest, $($fn_name(($($in_),*)))*); + impl_deserialize!(__tarpc_service_ServerSideRequest, $($fn_name(($($in_),*)))*); - let request = $crate::deserialize(&req); - let request: __ServerSideRequest = match request { - ::std::result::Result::Ok(request) => request, - ::std::result::Result::Err(e) => { - return Reply::DeserializeError(deserialize_error(e)); + let __tarpc_service_request = $crate::deserialize(&req); + let __tarpc_service_request: __tarpc_service_ServerSideRequest = match __tarpc_service_request { + ::std::result::Result::Ok(__tarpc_service_request) => __tarpc_service_request, + ::std::result::Result::Err(__tarpc_service_e) => { + return __tarpc_service_Reply::DeserializeError(deserialize_error(__tarpc_service_e)); } }; - match request {$( - __ServerSideRequest::$fn_name(( $($arg,)* )) => { + match __tarpc_service_request {$( + __tarpc_service_ServerSideRequest::$fn_name(( $($arg,)* )) => { const SERIALIZE: fn(::std::result::Result<$out, $crate::WireError<$error>>) -> $crate::SerializeFuture = $crate::serialize_reply; const TO_APP: fn($error) -> $crate::WireError<$error> = $crate::WireError::App; @@ -455,23 +468,23 @@ macro_rules! service { let reply = FutureService::$fn_name(&self.0, $($arg),*); let reply = $crate::futures::Future::map_err(reply, TO_APP); let reply = $crate::futures::Future::then(reply, SERIALIZE); - return Reply::$fn_name(reply); + return __tarpc_service_Reply::$fn_name(reply); } )*} #[inline] - fn deserialize_error(e: E) -> $crate::SerializeFuture { - let err = $crate::WireError::ServerDeserialize::<$crate::util::Never>(e.to_string()); - $crate::serialize_reply(::std::result::Result::Err::<(), _>(err)) + fn deserialize_error(__tarpc_service_e: E) -> $crate::SerializeFuture { + let __tarpc_service_err = $crate::WireError::ServerDeserialize::<$crate::util::Never>(__tarpc_service_e.to_string()); + $crate::serialize_reply(::std::result::Result::Err::<(), _>(__tarpc_service_err)) } } } } } -/// Defines the blocking RPC service. Must be `Clone`, `Send`, and `'static`, -/// as required by `tokio_proto::NewService`. This is required so that the service can be used -/// to respond to multiple requests concurrently. + /// Defines the blocking RPC service. Must be `Clone`, `Send`, and `'static`, + /// as required by `tokio_proto::NewService`. This is required so that the service can be used + /// to respond to multiple requests concurrently. pub trait SyncService: ::std::marker::Send + ::std::clone::Clone + @@ -486,23 +499,27 @@ macro_rules! service { /// Provides a function for starting the service. This is a separate trait from /// `SyncService` to prevent collisions with the names of RPCs. pub trait SyncServiceExt: SyncService { - /// Registers the service with the given registry, listening on the given address. + /// Spawns the service, binding to the given address and running on + /// the default tokio `Loop`. fn listen(self, addr: L) - -> ::std::io::Result<$crate::tokio_proto::server::ServerHandle> + -> $crate::tokio_proto::server::ServerHandle where L: ::std::net::ToSocketAddrs { let service = __SyncServer { service: self, }; - return service.listen(addr); + return ::std::result::Result::unwrap($crate::futures::Future::wait(FutureServiceExt::listen(service, addr))); #[derive(Clone)] struct __SyncServer { service: S, } - impl FutureService for __SyncServer where S: SyncService { + #[allow(non_camel_case_types)] + impl<__tarpc_service_S> FutureService for __SyncServer<__tarpc_service_S> + where __tarpc_service_S: SyncService + { $( impl_snake_to_camel! { type $fn_name = @@ -517,13 +534,16 @@ macro_rules! service { // TODO(tikue): what do do if SyncService panics? unimplemented!() } + const UNIMPLEMENTED: fn($crate::futures::Canceled) -> $error = + unimplemented; + let (c, p) = $crate::futures::oneshot(); let service = self.clone(); ::std::thread::spawn(move || { let reply = SyncService::$fn_name(&service.service, $($arg),*); c.complete($crate::futures::IntoFuture::into_future(reply)); }); - let p = $crate::futures::Future::map_err(p, unimplemented as fn($crate::futures::Canceled) -> $error); + let p = $crate::futures::Future::map_err(p, UNIMPLEMENTED); $crate::futures::Future::flatten(p) } )* @@ -536,7 +556,7 @@ macro_rules! service { #[allow(unused)] #[derive(Clone, Debug)] -/// The client stub that makes RPC calls to the server. Exposes a blocking interface. + /// The client stub that makes RPC calls to the server. Exposes a blocking interface. pub struct SyncClient(FutureClient); impl $crate::sync::Connect for SyncClient { @@ -544,7 +564,9 @@ macro_rules! service { where A: ::std::net::ToSocketAddrs, { let mut addrs = try!(::std::net::ToSocketAddrs::to_socket_addrs(&addr)); - let addr = if let ::std::option::Option::Some(a) = ::std::iter::Iterator::next(&mut addrs) { + let addr = if let ::std::option::Option::Some(a) = + ::std::iter::Iterator::next(&mut addrs) + { a } else { return ::std::result::Result::Err( @@ -564,7 +586,9 @@ macro_rules! service { #[allow(unused)] $(#[$attr])* #[inline] - pub fn $fn_name(&self, $($arg: &$in_),*) -> ::std::result::Result<$out, $crate::Error<$error>> { + pub fn $fn_name(&self, $($arg: &$in_),*) + -> ::std::result::Result<$out, $crate::Error<$error>> + { let rpc = (self.0).$fn_name($($arg),*); $crate::futures::Future::wait(rpc) } @@ -573,7 +597,7 @@ macro_rules! service { #[allow(unused)] #[derive(Clone, Debug)] -/// The client stub that makes RPC calls to the server. Exposes a Future interface. + /// The client stub that makes RPC calls to the server. Exposes a Future interface. pub struct FutureClient($crate::Client); impl $crate::future::Connect for FutureClient { @@ -603,20 +627,20 @@ macro_rules! service { } } - let args = ($($arg,)*); - let req = &__ClientSideRequest::$fn_name(&args); - let req = match $crate::Packet::serialize(&req) { - ::std::result::Result::Err(e) => return Fut::Failed($crate::futures::failed($crate::Error::ClientSerialize(e))), - ::std::result::Result::Ok(req) => req, + let __tarpc_service_args = ($($arg,)*); + let __tarpc_service_req = &__ClientSideRequest::$fn_name(&__tarpc_service_args); + let __tarpc_service_req = match $crate::Packet::serialize(&__tarpc_service_req) { + ::std::result::Result::Err(__tarpc_service_e) => return Fut::Failed($crate::futures::failed($crate::Error::ClientSerialize(__tarpc_service_e))), + ::std::result::Result::Ok(__tarpc_service_req) => __tarpc_service_req, }; - let fut = $crate::tokio_service::Service::call(&self.0, req); - Fut::Called($crate::futures::Future::then(fut, move |msg| { - let msg: Vec = try!(msg); - let msg: ::std::result::Result<::std::result::Result<$out, $crate::WireError<$error>>, _> - = $crate::deserialize(&msg); - match msg { - ::std::result::Result::Ok(msg) => ::std::result::Result::Ok(try!(msg)), - ::std::result::Result::Err(e) => ::std::result::Result::Err($crate::Error::ClientDeserialize(e)), + let __tarpc_service_fut = $crate::tokio_service::Service::call(&self.0, __tarpc_service_req); + Fut::Called($crate::futures::Future::then(__tarpc_service_fut, move |__tarpc_service_msg| { + let __tarpc_service_msg: Vec = try!(__tarpc_service_msg); + let __tarpc_service_msg: ::std::result::Result<::std::result::Result<$out, $crate::WireError<$error>>, _> + = $crate::deserialize(&__tarpc_service_msg); + match __tarpc_service_msg { + ::std::result::Result::Ok(__tarpc_service_msg) => ::std::result::Result::Ok(try!(__tarpc_service_msg)), + ::std::result::Result::Err(__tarpc_service_e) => ::std::result::Result::Err($crate::Error::ClientDeserialize(__tarpc_service_e)), } })) } @@ -625,13 +649,16 @@ macro_rules! service { } } } - // allow dead code; we're just testing that the macro expansion compiles #[allow(dead_code)] #[cfg(test)] mod syntax_test { use util::Never; + service! { + #[deny(warnings)] + #[allow(non_snake_case)] + rpc TestCamelCaseDoesntConflict(); rpc hello() -> String; #[doc="attr"] rpc attr(s: String) -> String; @@ -659,10 +686,10 @@ mod functional_test { } mod sync { + use super::{SyncClient, SyncService, SyncServiceExt}; + use super::env_logger; use sync::Connect; use util::Never; - use super::env_logger; - use super::{SyncClient, SyncService, SyncServiceExt}; #[derive(Clone, Copy)] struct Server; @@ -679,7 +706,7 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0").unwrap(); + let handle = Server.listen("localhost:0"); let client = SyncClient::connect(handle.local_addr()).unwrap(); assert_eq!(3, client.add(&1, &2).unwrap()); assert_eq!("Hey, Tim.", client.hey(&"Tim".to_string()).unwrap()); @@ -687,7 +714,7 @@ mod functional_test { #[test] fn clone() { - let handle = Server.listen("localhost:0").unwrap(); + let handle = Server.listen("localhost:0"); let client1 = SyncClient::connect(handle.local_addr()).unwrap(); let client2 = client1.clone(); assert_eq!(3, client1.add(&1, &2).unwrap()); @@ -697,7 +724,7 @@ mod functional_test { #[test] fn other_service() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0").unwrap(); + let handle = Server.listen("localhost:0"); let client = super::other_service::SyncClient::connect(handle.local_addr()).unwrap(); match client.foo().err().unwrap() { ::Error::ServerDeserialize(_) => {} // good @@ -708,24 +735,24 @@ mod functional_test { mod future { use future::Connect; - use util::Never; use futures::{Finished, Future, finished}; - use super::env_logger; use super::{FutureClient, FutureService, FutureServiceExt}; + use super::env_logger; + use util::Never; #[derive(Clone)] struct Server; impl FutureService for Server { - type Add = Finished; + type AddFut = Finished; - fn add(&self, x: i32, y: i32) -> Self::Add { + fn add(&self, x: i32, y: i32) -> Self::AddFut { finished(x + y) } - type Hey = Finished; + type HeyFut = Finished; - fn hey(&self, name: String) -> Self::Hey { + fn hey(&self, name: String) -> Self::HeyFut { finished(format!("Hey, {}.", name)) } } @@ -733,7 +760,7 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0").unwrap(); + let handle = Server.listen("localhost:0").wait().unwrap(); let client = FutureClient::connect(handle.local_addr()).wait().unwrap(); assert_eq!(3, client.add(&1, &2).wait().unwrap()); assert_eq!("Hey, Tim.", client.hey(&"Tim".to_string()).wait().unwrap()); @@ -742,7 +769,7 @@ mod functional_test { #[test] fn clone() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0").unwrap(); + let handle = Server.listen("localhost:0").wait().unwrap(); let client1 = FutureClient::connect(handle.local_addr()).wait().unwrap(); let client2 = client1.clone(); assert_eq!(3, client1.add(&1, &2).wait().unwrap()); @@ -752,7 +779,7 @@ mod functional_test { #[test] fn other_service() { let _ = env_logger::init(); - let handle = Server.listen("localhost:0").unwrap(); + let handle = Server.listen("localhost:0").wait().unwrap(); let client = super::other_service::FutureClient::connect(handle.local_addr()).wait().unwrap(); match client.foo().wait().err().unwrap() { @@ -772,9 +799,9 @@ mod functional_test { struct ErrorServer; impl error_service::FutureService for ErrorServer { - type Bar = ::futures::Failed; + type BarFut = ::futures::Failed; - fn bar(&self) -> Self::Bar { + fn bar(&self) -> Self::BarFut { info!("Called bar"); failed("lol jk".into()) } @@ -788,7 +815,7 @@ mod functional_test { use self::error_service::*; let _ = env_logger::init(); - let handle = ErrorServer.listen("localhost:0").unwrap(); + let handle = ErrorServer.listen("localhost:0").wait().unwrap(); let client = FutureClient::connect(handle.local_addr()).wait().unwrap(); client.bar() .then(move |result| { diff --git a/src/snake_to_camel/Cargo.toml b/src/plugins/Cargo.toml similarity index 84% rename from src/snake_to_camel/Cargo.toml rename to src/plugins/Cargo.toml index d7f2d19..9d5726e 100644 --- a/src/snake_to_camel/Cargo.toml +++ b/src/plugins/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "snake_to_camel" +name = "tarpc-plugins" version = "0.1.0" authors = ["Tim Kuehn "] diff --git a/src/snake_to_camel/src/lib.rs b/src/plugins/src/lib.rs similarity index 55% rename from src/snake_to_camel/src/lib.rs rename to src/plugins/src/lib.rs index 113a605..f418425 100644 --- a/src/snake_to_camel/src/lib.rs +++ b/src/plugins/src/lib.rs @@ -6,15 +6,19 @@ extern crate rustc_plugin; extern crate syntax; use itertools::Itertools; +use rustc_plugin::Registry; use syntax::ast::{self, Ident, TraitRef, Ty, TyKind}; -use syntax::parse::{self, PResult, token}; +use syntax::ast::LitKind::Str; +use syntax::ast::MetaItemKind::NameValue; +use syntax::codemap::Spanned; +use syntax::ext::base::{ExtCtxt, MacResult, DummyResult, MacEager}; +use syntax::parse::{self, token, PResult}; use syntax::ptr::P; use syntax::parse::parser::{Parser, PathStyle}; +use syntax::parse::token::intern_and_get_ident; use syntax::tokenstream::TokenTree; -use syntax::ext::base::{DummyResult, ExtCtxt, MacEager, MacResult}; use syntax::ext::quote::rt::Span; use syntax::util::small_vector::SmallVector; -use rustc_plugin::Registry; fn snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box { let mut parser = parse::new_parser_from_tts(cx.parse_sess(), cx.cfg(), tts.into()); @@ -34,7 +38,33 @@ fn snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box { + let mut updated = (**meta_item).clone(); + if let NameValue(_, Spanned { node: Str(ref mut doc, _), .. }) = updated.node { + let updated_doc = doc.replace("{}", &old_ident); + *doc = intern_and_get_ident(&updated_doc); + } else { + unreachable!() + }; + Some(P(updated)) + } + _ => None, + }; + if let Some(updated) = updated { + *meta_item = updated; + } + } + MacEager::trait_items(SmallVector::one(item)) } @@ -78,11 +108,9 @@ fn ty_snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box Box String { let ident_str = ident.to_string(); let mut camel_ty = String::new(); - // Find the first non-underscore and add it capitalized. - let mut chars = ident_str.chars(); + { + // Find the first non-underscore and add it capitalized. + let mut chars = ident_str.chars(); - // Find the first non-underscore char, uppercase it, and append it. - // Guaranteed to succeed because all idents must have at least one non-underscore char. - camel_ty.extend(chars.find(|&c| c != '_').unwrap().to_uppercase()); + // Find the first non-underscore char, uppercase it, and append it. + // Guaranteed to succeed because all idents must have at least one non-underscore char. + camel_ty.extend(chars.find(|&c| c != '_').unwrap().to_uppercase()); - // When we find an underscore, we remove it and capitalize the next char. To do this, - // we need to ensure the next char is not another underscore. - let mut chars = chars.coalesce(|c1, c2| { - if c1 == '_' && c2 == '_' { - Ok(c1) - } else { - Err((c1, c2)) - } - }); + // When we find an underscore, we remove it and capitalize the next char. To do this, + // we need to ensure the next char is not another underscore. + let mut chars = chars.coalesce(|c1, c2| { + if c1 == '_' && c2 == '_' { + Ok(c1) + } else { + Err((c1, c2)) + } + }); - while let Some(c) = chars.next() { - if c != '_' { - camel_ty.push(c); - } else { - if let Some(c) = chars.next() { - camel_ty.extend(c.to_uppercase()); + while let Some(c) = chars.next() { + if c != '_' { + camel_ty.push(c); + } else { + if let Some(c) = chars.next() { + camel_ty.extend(c.to_uppercase()); + } } } } + // The Fut suffix is hardcoded right now; this macro isn't really meant to be general-purpose. + camel_ty.push_str("Fut"); + *ident = Ident::with_empty_ctxt(token::intern(&camel_ty)); + ident_str } trait ParseTraitRef { diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 8a30a2a..8ec8032 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -3,22 +3,24 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use {futures, serde}; +use serde; +use futures::{self, Async}; use bincode::{SizeLimit, serde as bincode}; use std::{io, thread}; use std::collections::VecDeque; use std::sync::mpsc; -use tokio_core::{Loop, LoopHandle}; -use tokio_proto::io::{Readiness, Transport}; +use util::Never; +use tokio_core::io::{FramedIo, Io}; +use tokio_core::reactor::{Core, Remote}; use tokio_proto::pipeline::Frame; lazy_static! { #[doc(hidden)] - pub static ref LOOP_HANDLE: LoopHandle = { + pub static ref LOOP_HANDLE: Remote = { let (tx, rx) = mpsc::channel(); thread::spawn(move || { - let mut lupe = Loop::new().unwrap(); - tx.send(lupe.handle()).unwrap(); + let mut lupe = Core::new().unwrap(); + tx.send(lupe.handle().remote().clone()).unwrap(); // Run forever lupe.run(futures::empty::<(), !>()).unwrap(); }); @@ -78,46 +80,30 @@ impl TarpcTransport { } } -impl Readiness for TarpcTransport - where T: Readiness +impl FramedIo for TarpcTransport + where T: Io { - fn is_readable(&self) -> bool { - self.stream.is_readable() + type In = Frame; + type Out = Frame, Never, io::Error>; + + fn poll_read(&mut self) -> Async<()> { + self.stream.poll_read() } - fn is_writable(&self) -> bool { - // Always allow writing... this isn't really the best strategy to do in - // practice, but it is the easiest to implement in this case. The - // number of in-flight requests can be controlled using the pipeline - // dispatcher. - true + fn poll_write(&mut self) -> Async<()> { + self.stream.poll_write() } -} -impl Transport for TarpcTransport - where T: io::Read + io::Write + Readiness, -{ - type In = Frame; - type Out = Frame, io::Error>; - - fn read(&mut self) -> io::Result, io::Error>>> { + fn read(&mut self) -> io::Result, Never, io::Error>>> { self.read_state.next(&mut self.stream) } - fn write(&mut self, req: Frame) -> io::Result> { - match req { - Frame::Message(msg) => { - self.outbound.push_back(msg); - self.flush() - } - Frame::MessageWithBody(..) => unreachable!(), - Frame::Body(_) => unreachable!(), - Frame::Error(_) => unreachable!(), - Frame::Done => unreachable!(), - } + fn write(&mut self, req: Self::In) -> io::Result> { + self.outbound.push_back(req.unwrap_msg()); + self.flush() } - fn flush(&mut self) -> io::Result> { + fn flush(&mut self) -> io::Result> { writer::NextWriteState::next(&mut self.head, &mut self.stream, &mut self.outbound) } } diff --git a/src/protocol/reader.rs b/src/protocol/reader.rs index ddc5527..1b78939 100644 --- a/src/protocol/reader.rs +++ b/src/protocol/reader.rs @@ -5,39 +5,12 @@ use byteorder::{BigEndian, ReadBytesExt}; use bytes::{MutBuf, Take}; -use std::io::{self, Read}; +use futures::Async; +use std::io; use std::mem; -use super::MapNonBlock; +use tokio_proto::TryRead; use tokio_proto::pipeline::Frame; - -pub trait TryRead { - fn try_read_buf(&mut self, buf: &mut B) -> io::Result> - where Self: Sized - { - // Reads the length of the slice supplied by buf.mut_bytes into the buffer - // This is not guaranteed to consume an entire datagram or segment. - // If your protocol is msg based (instead of continuous stream) you should - // ensure that your buffer is large enough to hold an entire segment - // (1532 bytes if not jumbo frames) - let res = self.try_read(unsafe { buf.mut_bytes() }); - - if let Ok(Some(cnt)) = res { - unsafe { - buf.advance(cnt); - } - } - - res - } - - fn try_read(&mut self, buf: &mut [u8]) -> io::Result>; -} - -impl TryRead for T { - fn try_read(&mut self, dst: &mut [u8]) -> io::Result> { - self.read(dst).map_non_block() - } -} +use util::Never; #[derive(Debug)] pub struct U64Reader { @@ -74,7 +47,7 @@ enum NextReadAction { Stop(Result), } -trait MutBufExt: MutBuf { +trait MutBufExt: MutBuf + Sized { type Inner; fn take(&mut self) -> Self::Inner; @@ -138,7 +111,7 @@ impl ReadState { pub fn next(&mut self, socket: &mut R) - -> io::Result, io::Error>>> { + -> io::Result, Never, io::Error>>> { loop { let next = match *self { ReadState::Len(ref mut len) => { @@ -151,7 +124,7 @@ impl ReadState { NextReadState::Next(ReadState::Data(Take::new(buf, len as usize))) } - Err(e) => return Ok(Some(Frame::Error(e))), + Err(e) => return Ok(Async::Ready(Frame::Error(e))), } } } @@ -162,18 +135,18 @@ impl ReadState { NextReadAction::Stop(result) => { match result { Ok(buf) => NextReadState::Reset(buf), - Err(e) => return Ok(Some(Frame::Error(e))), + Err(e) => return Ok(Async::Ready(Frame::Error(e))), } } } } }; match next { - NextReadState::Same => return Ok(None), + NextReadState::Same => return Ok(Async::NotReady), NextReadState::Next(next) => *self = next, NextReadState::Reset(packet) => { *self = ReadState::init(); - return Ok(Some(Frame::Message(packet))); + return Ok(Async::Ready(Frame::Message(packet))); } } } diff --git a/src/protocol/writer.rs b/src/protocol/writer.rs index c122389..3935654 100644 --- a/src/protocol/writer.rs +++ b/src/protocol/writer.rs @@ -7,38 +7,12 @@ use bincode::SizeLimit; use bincode::serde as bincode; use byteorder::{BigEndian, WriteBytesExt}; use bytes::Buf; +use futures::Async; use serde::Serialize; use std::collections::VecDeque; -use std::mem; use std::io::{self, Cursor}; - -mod try_write { - use bytes::Buf; - use protocol::MapNonBlock; - use std::io::{self, Write}; - - pub trait TryWrite { - fn try_write_buf(&mut self, buf: &mut B) -> io::Result> - where Self: Sized - { - let res = self.try_write(buf.bytes()); - - if let Ok(Some(cnt)) = res { - buf.advance(cnt); - } - - res - } - - fn try_write(&mut self, buf: &[u8]) -> io::Result>; - } - - impl TryWrite for T { - fn try_write(&mut self, src: &[u8]) -> io::Result> { - self.write(src).map_non_block() - } - } -} +use std::mem; +use tokio_proto::TryWrite; /// The means of communication between client and server. #[derive(Clone, Debug)] @@ -74,7 +48,7 @@ enum NextWriteAction { trait BufExt: Buf + Sized { /// Writes data to stream. Returns Ok(true) if all data has been written or Ok(false) if /// there's still data to write. - fn try_write(&mut self, stream: &mut W) -> io::Result { + fn try_write(&mut self, stream: &mut W) -> io::Result { while let Some(bytes_written) = stream.try_write_buf(self)? { debug!("Writer: wrote {} bytes; {} remaining.", bytes_written, @@ -100,10 +74,10 @@ pub enum NextWriteState { } impl NextWriteState { - pub fn next(state: &mut Option, + pub fn next(state: &mut Option, socket: &mut W, outbound: &mut VecDeque) - -> io::Result> { + -> io::Result> { loop { let update = match *state { None => { @@ -113,13 +87,13 @@ impl NextWriteState { debug_assert!(size >= mem::size_of::() as u64); NextWriteState::Next(packet) } - None => return Ok(Some(())), + None => return Ok(Async::Ready(())), } } Some(ref mut packet) => { - match packet.buf.try_write(socket)? { + match BufExt::try_write(&mut packet.buf, socket)? { NextWriteAction::Stop => NextWriteState::Nothing, - NextWriteAction::Continue => return Ok(None), + NextWriteAction::Continue => return Ok(Async::NotReady), } } }; diff --git a/src/server.rs b/src/server.rs index 5ba6afb..48ca3f8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. use errors::{SerializableError, WireError}; -use futures::{self, Future}; +use futures::{self, Async, Future}; use futures::stream::Empty; use futures_cpupool::{CpuFuture, CpuPool}; use protocol::{LOOP_HANDLE, TarpcTransport}; @@ -13,28 +13,44 @@ use serde::Serialize; use std::io; use std::net::ToSocketAddrs; use tokio_proto::pipeline; -use tokio_proto::NewService; use tokio_proto::server::{self, ServerHandle}; +use tokio_service::NewService; +use util::Never; -/// Start a Tarpc service listening on the given address. -pub fn listen(addr: A, new_service: T) -> io::Result - where T: NewService, - Resp = pipeline::Message>, +/// Spawns a service that binds to the given address and runs on the default tokio `Loop`. +pub fn listen(addr: A, new_service: T) -> ListenFuture + where T: NewService, + Response = pipeline::Message>, Error = io::Error> + Send + 'static, A: ToSocketAddrs { - let mut addrs = addr.to_socket_addrs()?; - let addr = if let Some(a) = addrs.next() { - a - } else { - return Err(io::Error::new(io::ErrorKind::AddrNotAvailable, - "`ToSocketAddrs::to_socket_addrs` returned an empty iterator.")); - }; + // TODO(tikue): don't use ToSocketAddrs, or don't unwrap. + let addr = addr.to_socket_addrs().unwrap().next().unwrap(); - server::listen(LOOP_HANDLE.clone(), addr, move |stream| { - pipeline::Server::new(new_service.new_service()?, TarpcTransport::new(stream)) - }) - .wait() + let (tx, rx) = futures::oneshot(); + LOOP_HANDLE.spawn(move |handle| { + Ok(tx.complete(server::listen(handle, addr, move |stream| { + pipeline::Server::new(new_service.new_service()?, TarpcTransport::new(stream)) + }).unwrap())) + }); + ListenFuture { inner: rx } +} + +/// A future that resolves to a `ServerHandle`. +pub struct ListenFuture { + inner: futures::Oneshot, +} + +impl Future for ListenFuture { + type Item = ServerHandle; + type Error = Never; + + fn poll(&mut self) -> futures::Poll { + match self.inner.poll().unwrap() { + Async::Ready(server_handle) => Ok(Async::Ready(server_handle)), + Async::NotReady => Ok(Async::NotReady), + } + } } /// Returns a future containing the serialized reply. @@ -48,23 +64,22 @@ pub fn serialize_reply SerializeFuture { POOL.spawn(futures::lazy(move || { - let packet = match Packet::serialize(&result) { - Ok(packet) => packet, - Err(e) => { - let err: Result> = - Err(WireError::ServerSerialize(e.to_string())); - Packet::serialize(&err).unwrap() - } - }; - futures::finished(pipeline::Message::WithoutBody(packet)) - })) + let packet = match Packet::serialize(&result) { + Ok(packet) => packet, + Err(e) => { + let err: Result> = Err(WireError::ServerSerialize(e.to_string())); + Packet::serialize(&err).unwrap() + } + }; + futures::finished(pipeline::Message::WithoutBody(packet)) + })) } #[doc(hidden)] pub type SerializeFuture = CpuFuture; #[doc(hidden)] -pub type SerializedReply = pipeline::Message>; +pub type SerializedReply = pipeline::Message>; lazy_static! { static ref POOL: CpuPool = { CpuPool::new_num_cpus() };