From 7aabfb3c141945d3efa15f71651d8251b920ad63 Mon Sep 17 00:00:00 2001 From: Tim Date: Sun, 4 Sep 2016 16:09:50 -0700 Subject: [PATCH] Rewrite using tokio (#44) * Rewrite tarpc on top of tokio. * Add examples * Move error types to their own module. Also, cull unused error variants. * Remove unused fn * Remove CanonicalRpcError* types. They're 100% useless. * Track tokio master (WIP) * The great error revamp. Removed the canonical rpc error type. Instead, the user declares the error type for each rpc: In the above example, the error type is Baz. Declaring an error is optional; if none is specified, it defaults to Never, a convenience struct that wraps the never type (exclamation mark) to impl Serialize, Deserialize, Error, etc. Also adds the convenience type StringError for easily using a String as an error type. * Add missing license header * Minor cleanup * Rename StringError => Message * Create a sync::Connect trait. Along with this, the existing Connect trait moves to future::Connect. The future and sync modules are reexported from the crate root. Additionally, the utility errors Never and Message are no longer reexported from the crate root. * Update readme * Track tokio/futures master. Add a Spawn utility trait to replace the removed forget. * Fix pre-push hook * Add doc comment to SyncServiceExt. * Fix up some documentation * Track tokio-proto master * Don't set tcp nodelay * Make future::Connect take an associated type for the future. * Unbox FutureClient::connect return type * Use type alias instead of newtype struct for ClientFuture * Fix benches/latency.rs * Write a plugin to convert lower_snake_case idents/types to UpperCamelCase. Use it to add associated types to FutureService instead of boxing the return futures. * Specify plugin = true in snake_to_camel/Cargo.toml. Weird things happen otherwise. * Add clippy.toml --- .gitignore | 2 + .travis.yml | 10 +- Cargo.toml | 37 ++ README.md | 95 ++-- benches/latency.rs | 45 ++ clippy.toml | 1 + examples/concurrency.rs | 122 +++++ examples/pubsub.rs | 132 +++++ examples/readme.rs | 34 ++ examples/readme2.rs | 56 ++ examples/server_calling_server.rs | 73 +++ examples/throughput.rs | 100 ++++ examples/two_clients.rs | 78 +++ hooks/pre-commit | 11 +- hooks/pre-push | 45 +- tarpc/rustfmt.toml => rustfmt.toml | 0 src/client.rs | 108 ++++ src/errors.rs | 144 ++++++ src/lib.rs | 116 +++++ src/macros.rs | 795 +++++++++++++++++++++++++++++ src/protocol/mod.rs | 123 +++++ src/protocol/reader.rs | 181 +++++++ src/protocol/writer.rs | 134 +++++ src/server.rs | 71 +++ src/snake_to_camel/Cargo.toml | 11 + src/snake_to_camel/src/lib.rs | 149 ++++++ src/util.rs | 83 +++ tarpc/Cargo.toml | 23 - tarpc/src/lib.rs | 66 --- tarpc/src/macros.rs | 632 ----------------------- tarpc/src/protocol/client.rs | 269 ---------- tarpc/src/protocol/mod.rs | 249 --------- tarpc/src/protocol/packet.rs | 78 --- tarpc/src/protocol/server.rs | 280 ---------- tarpc/src/transport/mod.rs | 91 ---- tarpc/src/transport/tcp.rs | 77 --- tarpc/src/transport/unix.rs | 72 --- tarpc_examples/Cargo.toml | 9 - tarpc_examples/src/lib.rs | 74 --- 39 files changed, 2688 insertions(+), 1988 deletions(-) create mode 100644 Cargo.toml create mode 100644 benches/latency.rs create mode 100644 clippy.toml create mode 100644 examples/concurrency.rs create mode 100644 examples/pubsub.rs create mode 100644 examples/readme.rs create mode 100644 examples/readme2.rs create mode 100644 examples/server_calling_server.rs create mode 100644 examples/throughput.rs create mode 100644 examples/two_clients.rs rename tarpc/rustfmt.toml => rustfmt.toml (100%) create mode 100644 src/client.rs create mode 100644 src/errors.rs create mode 100644 src/lib.rs create mode 100644 src/macros.rs create mode 100644 src/protocol/mod.rs create mode 100644 src/protocol/reader.rs create mode 100644 src/protocol/writer.rs create mode 100644 src/server.rs create mode 100644 src/snake_to_camel/Cargo.toml create mode 100644 src/snake_to_camel/src/lib.rs create mode 100644 src/util.rs delete mode 100644 tarpc/Cargo.toml delete mode 100644 tarpc/src/lib.rs delete mode 100644 tarpc/src/macros.rs delete mode 100644 tarpc/src/protocol/client.rs delete mode 100644 tarpc/src/protocol/mod.rs delete mode 100644 tarpc/src/protocol/packet.rs delete mode 100644 tarpc/src/protocol/server.rs delete mode 100644 tarpc/src/transport/mod.rs delete mode 100644 tarpc/src/transport/tcp.rs delete mode 100644 tarpc/src/transport/unix.rs delete mode 100644 tarpc_examples/Cargo.toml delete mode 100644 tarpc_examples/src/lib.rs diff --git a/.gitignore b/.gitignore index d4f917d..cd2dea5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ target Cargo.lock +.cargo *.swp +*.bk diff --git a/.travis.yml b/.travis.yml index a400d11..9205d1d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,10 +2,11 @@ language: rust sudo: false rust: - - stable - - beta - nightly +os: + - linux + addons: apt: packages: @@ -20,11 +21,10 @@ before_script: script: - | - (cd tarpc && travis-cargo build) && - (cd tarpc && travis-cargo test) + travis-cargo build && travis-cargo test after_success: -- (cd tarpc && travis-cargo coveralls --no-sudo) +- travis-cargo coveralls --no-sudo env: global: diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..4dcf96c --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "tarpc" +version = "0.6.0" +authors = ["Adam Wright ", "Tim Kuehn "] +license = "MIT" +documentation = "https://docs.rs/tarpc" +homepage = "https://github.com/google/tarpc" +repository = "https://github.com/google/tarpc" +keywords = ["rpc", "protocol", "remote", "procedure", "serialize"] +readme = "README.md" +description = "An RPC framework for Rust with a focus on ease of use." + +[dependencies] +bincode = "0.6" +byteorder = "0.5" +bytes = "0.3" +futures = { git = "https://github.com/alexcrichton/futures-rs" } +futures-cpupool = { git = "https://github.com/alexcrichton/futures-rs" } +lazy_static = "0.2" +log = "0.3" +scoped-pool = "1.0" +serde = "0.8" +serde_macros = "0.8" +snake_to_camel = { path = "src/snake_to_camel" } +take = "0.1" +tokio-service = { git = "https://github.com/tokio-rs/tokio-service" } +tokio-proto = { git = "https://github.com/tokio-rs/tokio-proto" } +tokio-core = { git = "https://github.com/tokio-rs/tokio-core" } + +[dev-dependencies] +chrono = "0.2" +env_logger = "0.3" + +[features] +unstable = ["serde/unstable"] + +[workspace] diff --git a/README.md b/README.md index 273559d..fca5e47 100644 --- a/README.md +++ b/README.md @@ -7,18 +7,28 @@ *Disclaimer*: This is not an official Google product. -tarpc is an RPC framework for rust with a focus on ease of use. Defining a service can be done in -just a few lines of code, and most of the boilerplate of writing a server is taken care of for you. +tarpc is an RPC framework for rust with a focus on ease of use. Defining a +service can be done in just a few lines of code, and most of the boilerplate of +writing a server is taken care of for you. [Documentation](https://docs.rs/tarpc) ## What is an RPC framework? -"RPC" stands for "Remote Procedure Call," a function call where the work of producing the return -value is being done somewhere else. When an rpc function is invoked, behind the scenes the function -contacts some other process somewhere and asks them to compute the function instead. The original -function then returns the value produced by that other server. +"RPC" stands for "Remote Procedure Call," a function call where the work of +producing the return value is being done somewhere else. When an rpc function is +invoked, behind the scenes the function contacts some other process somewhere +and asks them to evaluate the function instead. The original function then +returns the value produced by the other process. -[More information](https://www.cs.cf.ac.uk/Dave/C/node33.html) +RPC frameworks are a fundamental building block of most microservices-oriented +architectures. Two well-known ones are [gRPC](http://www.grpc.io) and +[Cap'n Proto](https://capnproto.org/). + +tarpc differentiates itself from other RPC frameworks by defining the schema in code, +rather than in a separate language such as .proto. This means there's no separate compilation +process, and no cognitive context switching between different languages. Additionally, it +works with the community-backed library serde: any serde-serializable type can be used as +arguments to tarpc fns. ## Usage Add to your `Cargo.toml` dependencies: @@ -29,57 +39,70 @@ tarpc = "0.6" ## Example ```rust +#![feature(conservative_impl_trait)] // required by `FutureClient` (not used in this example) + +extern crate futures; #[macro_use] extern crate tarpc; -mod hello_service { - service! { - rpc hello(name: String) -> String; - } -} -use hello_service::Service as HelloService; +use tarpc::util::Never; +use tarpc::sync::Connect; +service! { + rpc hello(name: String) -> String; +} + +#[derive(Clone)] struct HelloServer; -impl HelloService for HelloServer { - fn hello(&self, name: String) -> String { - format!("Hello, {}!", name) + +impl SyncService for HelloServer { + fn hello(&self, name: String) -> Result { + Ok(format!("Hello, {}!", name)) } } fn main() { let addr = "localhost:10000"; - let server_handle = HelloServer.spawn(addr).unwrap(); - let client = hello_service::Client::new(addr).unwrap(); - assert_eq!("Hello, Mom!", client.hello("Mom".into()).unwrap()); - drop(client); - server_handle.shutdown(); + let _server = HelloServer.listen(addr).unwrap(); + let client = SyncClient::connect(addr).unwrap(); + println!("{}", client.hello(&"Mom".to_string()).unwrap()); } ``` -The `service!` macro expands to a collection of items that collectively form an rpc service. In the -above example, the macro is called within the `hello_service` module. This module will contain a -`Client` (and `AsyncClient`) type, and a `Service` trait. The trait provides default `fn`s for -starting the service: `spawn` and `spawn_with_config`, which start the service listening over an -arbitrary transport. A `Client` (or `AsyncClient`) can connect to such a service. These generated -types make it easy and ergonomic to write servers without dealing with sockets or serialization -directly. See the tarpc_examples package for more sophisticated examples. +The `service!` macro expands to a collection of items that form an +rpc service. In the above example, the macro is called within the +`hello_service` module. This module will contain `SyncClient`, `AsyncClient`, +and `FutureClient` types, and `SyncService` and `AsyncService` traits. There is +also a `ServiceExt` trait that provides starter `fn`s for services, with an +umbrella impl for all services. These generated types make it easy and +ergonomic to write servers without dealing with sockets or serialization +directly. Simply implement one of the generated traits, and you're off to the +races! See the tarpc_examples package for more examples. ## Documentation Use `cargo doc` as you normally would to see the documentation created for all items expanded by a `service!` invocation. ## Additional Features -- Connect over any transport that `impl`s the `Transport` trait. +- Configurable server rate limiting. +- Automatic client retries with exponential backoff when server is busy. - Concurrent requests from a single client. -- Any type that `impl`s `serde`'s `Serialize` and `Deserialize` can be used in the rpc signatures. -- Attributes can be specified on rpc methods. These will be included on both the `Service` trait - methods as well as on the `Client`'s stub methods. -- Just like regular fns, the return type can be left off when it's `-> ()`. -- Arg-less rpc's are also allowed. +- Backed by an mio `EventLoop`, protecting services (including `SyncService`s) + from slowloris attacks. +- Run any number of clients on a single client event loop. +- Run any number of services on a single service event loop. +- Configure clients and services to run on a custom event loop, defaulting to + the global event loop. +- Any type that `impl`s `serde`'s `Serialize` and `Deserialize` can be used in + rpc signatures. +- Attributes can be specified on rpc methods. These will be included on both the + services' trait methods as well as on the clients' stub methods. -## Planned Improvements (actively being worked on) +## Gaps/Potential Improvements (not necessarily actively being worked on) +- Multithreaded support. +- Load balancing +- Service discovery - Automatically reconnect on the client side when the connection cuts out. -- Support asynchronous server implementations (currently thread per connection). - Support generic serialization protocols. ## Contributing diff --git a/benches/latency.rs b/benches/latency.rs new file mode 100644 index 0000000..0a47203 --- /dev/null +++ b/benches/latency.rs @@ -0,0 +1,45 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +#![feature(plugin, conservative_impl_trait, test)] +#![plugin(snake_to_camel)] + +#[macro_use] +extern crate tarpc; +#[cfg(test)] +extern crate test; +extern crate env_logger; +extern crate futures; + +#[cfg(test)] +use test::Bencher; +use tarpc::sync::Connect; +use tarpc::util::Never; + +service! { + rpc ack(); +} + +#[derive(Clone)] +struct Server; + +impl FutureService for Server { + type Ack = futures::Finished<(), Never>; + fn ack(&self) -> Self::Ack { + futures::finished(()) + } +} + +#[cfg(test)] +#[bench] +fn latency(bencher: &mut Bencher) { + let _ = env_logger::init(); + let server = Server.listen("localhost:0").unwrap(); + let client = SyncClient::connect(server.local_addr()).unwrap(); + + bencher.iter(|| { + client.ack().unwrap(); + }); +} diff --git a/clippy.toml b/clippy.toml new file mode 100644 index 0000000..2d90603 --- /dev/null +++ b/clippy.toml @@ -0,0 +1 @@ +doc-valid-idents = ["gRPC"] diff --git a/examples/concurrency.rs b/examples/concurrency.rs new file mode 100644 index 0000000..17b85d5 --- /dev/null +++ b/examples/concurrency.rs @@ -0,0 +1,122 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +#![feature(inclusive_range_syntax, conservative_impl_trait, plugin)] +#![plugin(snake_to_camel)] + +extern crate chrono; +extern crate env_logger; +extern crate futures; +#[macro_use] +extern crate log; +#[macro_use] +extern crate tarpc; +extern crate futures_cpupool; + +use futures::Future; +use futures_cpupool::{CpuFuture, CpuPool}; +use std::thread; +use std::time::{Duration, Instant, SystemTime}; +use tarpc::future::{Connect}; +use tarpc::util::Never; + +service! { + rpc read(size: u32) -> Vec; +} + +#[derive(Clone)] +struct Server(CpuPool); + +impl Server { + fn new() -> Self { + Server(CpuPool::new_num_cpus()) + } +} + +impl FutureService for Server { + type Read = CpuFuture, Never>; + + fn read(&self, size: u32) -> Self::Read { + self.0 + .spawn(futures::lazy(move || { + let mut vec: Vec = Vec::with_capacity(size as usize); + for i in 0..size { + vec.push((i % 1 << 8) as u8); + } + futures::finished::<_, Never>(vec) + })) + } +} + +fn run_once(clients: &[FutureClient], concurrency: u32, print: bool) { + let _ = env_logger::init(); + + let start = Instant::now(); + let futures: Vec<_> = clients.iter() + .cycle() + .take(concurrency as usize) + .map(|client| { + let start = SystemTime::now(); + let future = client.read(&CHUNK_SIZE).map(move |_| start.elapsed().unwrap()); + thread::yield_now(); + future + }) + .collect(); + + let latencies: Vec<_> = futures.into_iter() + .map(|future| { + future.wait().unwrap() + }) + .collect(); + let total_time = start.elapsed(); + + let sum_latencies = latencies.iter().fold(Duration::new(0, 0), |sum, &dur| sum + dur); + let mean = sum_latencies / latencies.len() as u32; + let min_latency = *latencies.iter().min().unwrap(); + let max_latency = *latencies.iter().max().unwrap(); + + if print { + println!("{} requests => Mean={}µs, Min={}µs, Max={}µs, Total={}µs", + latencies.len(), + mean.microseconds(), + min_latency.microseconds(), + max_latency.microseconds(), + total_time.microseconds()); + } +} + +trait Microseconds { + fn microseconds(&self) -> i64; +} + +impl Microseconds for Duration { + fn microseconds(&self) -> i64 { + chrono::Duration::from_std(*self) + .unwrap() + .num_microseconds() + .unwrap() + } +} + +const CHUNK_SIZE: u32 = 1 << 10; +const MAX_CONCURRENCY: u32 = 100; + +fn main() { + let _ = env_logger::init(); + let server = Server::new().listen("localhost:0").unwrap(); + println!("Server listening on {}.", server.local_addr()); + let clients: Vec<_> = (1...5) + .map(|i| { + println!("Client {} connecting...", i); + FutureClient::connect(server.local_addr()).wait().unwrap() + }) + .collect(); + println!("Starting..."); + + run_once(&clients, MAX_CONCURRENCY, false); + for concurrency in 1...MAX_CONCURRENCY { + run_once(&clients, concurrency, true); + } +} diff --git a/examples/pubsub.rs b/examples/pubsub.rs new file mode 100644 index 0000000..3d7995f --- /dev/null +++ b/examples/pubsub.rs @@ -0,0 +1,132 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +#![feature(conservative_impl_trait, plugin)] +#![plugin(snake_to_camel)] + +extern crate env_logger; +extern crate futures; +#[macro_use] +extern crate tarpc; +extern crate tokio_proto as tokio; + +use futures::{BoxFuture, Future}; +use publisher::FutureServiceExt as PublisherExt; +use subscriber::FutureServiceExt as SubscriberExt; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Duration; +use tarpc::util::{Never, Message}; +use tarpc::future::Connect as Fc; +use tarpc::sync::Connect as Sc; + +pub mod subscriber { + service! { + rpc receive(message: String); + } +} + +pub mod publisher { + use std::net::SocketAddr; + use tarpc::util::Message; + + service! { + rpc broadcast(message: String); + rpc subscribe(id: u32, address: SocketAddr) | Message; + rpc unsubscribe(id: u32); + } +} + +#[derive(Clone, Debug)] +struct Subscriber { + id: u32, + publisher: publisher::SyncClient, +} + +impl subscriber::FutureService for Subscriber { + type Receive = futures::Finished<(), Never>; + fn receive(&self, message: String) -> Self::Receive { + println!("{} received message: {}", self.id, message); + futures::finished(()) + } +} + +impl Subscriber { + fn new(id: u32, publisher: publisher::SyncClient) -> tokio::server::ServerHandle { + let subscriber = Subscriber { + id: id, + publisher: publisher.clone(), + } + .listen("localhost:0") + .unwrap(); + publisher.subscribe(&id, &subscriber.local_addr()).unwrap(); + subscriber + } +} + +#[derive(Clone, Debug)] +struct Publisher { + clients: Arc>>, +} + +impl Publisher { + fn new() -> Publisher { + Publisher { clients: Arc::new(Mutex::new(HashMap::new())) } + } +} + +impl publisher::FutureService for Publisher { + type Broadcast = BoxFuture<(), Never>; + + fn broadcast(&self, message: String) -> Self::Broadcast { + futures::collect(self.clients + .lock() + .unwrap() + .values() + // Ignore failing subscribers. + .map(move |client| client.receive(&message).then(|_| Ok(()))) + .collect::>()) + .map(|_| ()) + .boxed() + } + + type Subscribe = BoxFuture<(), Message>; + + fn subscribe(&self, id: u32, address: SocketAddr) -> BoxFuture<(), Message> { + let clients = self.clients.clone(); + subscriber::FutureClient::connect(&address) + .map(move |subscriber| { + println!("Subscribing {}.", id); + clients.lock().unwrap().insert(id, subscriber); + () + }) + .map_err(|e| e.to_string().into()) + .boxed() + } + + type Unsubscribe = BoxFuture<(), Never>; + + fn unsubscribe(&self, id: u32) -> BoxFuture<(), Never> { + println!("Unsubscribing {}", id); + self.clients.lock().unwrap().remove(&id).unwrap(); + futures::finished(()).boxed() + } +} + +fn main() { + let _ = env_logger::init(); + let publisher = Publisher::new().listen("localhost:0").unwrap(); + let publisher = publisher::SyncClient::connect(publisher.local_addr()).unwrap(); + let _subscriber1 = Subscriber::new(0, publisher.clone()); + let _subscriber2 = Subscriber::new(1, publisher.clone()); + + println!("Broadcasting..."); + publisher.broadcast(&"hello to all".to_string()).unwrap(); + publisher.unsubscribe(&1).unwrap(); + publisher.broadcast(&"hello again".to_string()).unwrap(); + thread::sleep(Duration::from_millis(300)); +} diff --git a/examples/readme.rs b/examples/readme.rs new file mode 100644 index 0000000..62a2511 --- /dev/null +++ b/examples/readme.rs @@ -0,0 +1,34 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +#![feature(conservative_impl_trait, plugin)] +#![plugin(snake_to_camel)] + +extern crate futures; +#[macro_use] +extern crate tarpc; + +use tarpc::util::Never; +use tarpc::sync::Connect; + +service! { + rpc hello(name: String) -> String; +} + +#[derive(Clone)] +struct HelloServer; + +impl SyncService for HelloServer { + fn hello(&self, name: String) -> Result { + Ok(format!("Hello, {}!", name)) + } +} + +fn main() { + let addr = "localhost:10000"; + let _server = HelloServer.listen(addr).unwrap(); + let client = SyncClient::connect(addr).unwrap(); + println!("{}", client.hello(&"Mom".to_string()).unwrap()); +} diff --git a/examples/readme2.rs b/examples/readme2.rs new file mode 100644 index 0000000..a71cef4 --- /dev/null +++ b/examples/readme2.rs @@ -0,0 +1,56 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +#![feature(conservative_impl_trait, custom_derive, custom_derive, plugin)] +#![plugin(serde_macros, snake_to_camel)] + +extern crate futures; +#[macro_use] +extern crate tarpc; +extern crate serde; + +use std::error::Error; +use std::fmt; +use tarpc::sync::Connect; + +service! { + rpc hello(name: String) -> String | NoNameGiven; +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct NoNameGiven; + +impl fmt::Display for NoNameGiven { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.description()) + } +} + +impl Error for NoNameGiven { + fn description(&self) -> &str { + r#"The empty String, "", is not a valid argument to rpc `hello`."# + } +} + +#[derive(Clone)] +struct HelloServer; + +impl SyncService for HelloServer { + fn hello(&self, name: String) -> Result { + if name == "" { + Err(NoNameGiven) + } else { + Ok(format!("Hello, {}!", name)) + } + } +} + +fn main() { + let addr = "localhost:10000"; + let _server = HelloServer.listen(addr).unwrap(); + let client = SyncClient::connect(addr).unwrap(); + println!("{}", client.hello(&"Mom".to_string()).unwrap()); + println!("{}", client.hello(&"".to_string()).unwrap_err()); +} diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs new file mode 100644 index 0000000..a917db6 --- /dev/null +++ b/examples/server_calling_server.rs @@ -0,0 +1,73 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +#![feature(conservative_impl_trait, plugin)] +#![plugin(snake_to_camel)] + +#[macro_use] +extern crate tarpc; +extern crate futures; + +use futures::{BoxFuture, Future}; +use add::{FutureService as AddService, FutureServiceExt as AddExt}; +use double::{FutureService as DoubleService, FutureServiceExt as DoubleExt}; +use tarpc::util::{Never, Message}; +use tarpc::future::Connect as Fc; +use tarpc::sync::Connect as Sc; + +pub mod add { + service! { + /// Add two ints together. + rpc add(x: i32, y: i32) -> i32; + } +} + +pub mod double { + use tarpc::util::Message; + + service! { + /// 2 * x + rpc double(x: i32) -> i32 | Message; + } +} + +#[derive(Clone)] +struct AddServer; + +impl AddService for AddServer { + type Add = futures::Finished; + + fn add(&self, x: i32, y: i32) -> Self::Add { + futures::finished(x + y) + } +} + +#[derive(Clone)] +struct DoubleServer { + client: add::FutureClient, +} + +impl DoubleService for DoubleServer { + type Double = BoxFuture; + + fn double(&self, x: i32) -> Self::Double { + self.client + .add(&x, &x) + .map_err(|e| e.to_string().into()) + .boxed() + } +} + +fn main() { + let add = AddServer.listen("localhost:0").unwrap(); + let add_client = add::FutureClient::connect(add.local_addr()).wait().unwrap(); + let double = DoubleServer { client: add_client }; + let double = double.listen("localhost:0").unwrap(); + + let double_client = double::SyncClient::connect(double.local_addr()).unwrap(); + for i in 0..5 { + println!("{:?}", double_client.double(&i).unwrap()); + } +} diff --git a/examples/throughput.rs b/examples/throughput.rs new file mode 100644 index 0000000..36a0a78 --- /dev/null +++ b/examples/throughput.rs @@ -0,0 +1,100 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +#![feature(conservative_impl_trait, plugin)] +#![plugin(snake_to_camel)] + +#[macro_use] +extern crate lazy_static; +#[macro_use] +extern crate tarpc; +extern crate env_logger; +extern crate futures; + +use std::sync::Arc; +use std::time; +use std::net; +use std::thread; +use std::io::{Read, Write, stdout}; +use tarpc::util::Never; +use tarpc::sync::Connect; + +lazy_static! { + static ref BUF: Arc> = Arc::new(gen_vec(CHUNK_SIZE as usize)); +} + +fn gen_vec(size: usize) -> Vec { + let mut vec: Vec = Vec::with_capacity(size); + for i in 0..size { + vec.push((i % 1 << 8) as u8); + } + vec +} + +service! { + rpc read() -> Arc>; +} + +#[derive(Clone)] +struct Server; + +impl FutureService for Server { + type Read = futures::Finished>, Never>; + + fn read(&self) -> Self::Read { + futures::finished(BUF.clone()) + } +} + +const CHUNK_SIZE: u32 = 1 << 19; + +fn bench_tarpc(target: u64) { + let handle = Server.listen("localhost:0").unwrap(); + let client = SyncClient::connect(handle.local_addr()).unwrap(); + let start = time::Instant::now(); + let mut nread = 0; + while nread < target { + nread += client.read().unwrap().len() as u64; + print!("."); + stdout().flush().unwrap(); + } + println!("done"); + let duration = time::Instant::now() - start; + println!("TARPC: {}MB/s", + (target as f64 / (1024f64 * 1024f64)) / + (duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9)); +} + +fn bench_tcp(target: u64) { + let l = net::TcpListener::bind("localhost:0").unwrap(); + let addr = l.local_addr().unwrap(); + thread::spawn(move || { + let (mut stream, _) = l.accept().unwrap(); + while let Ok(_) = stream.write_all(&*BUF) { + } + }); + let mut stream = net::TcpStream::connect(&addr).unwrap(); + let mut buf = vec![0; CHUNK_SIZE as usize]; + let start = time::Instant::now(); + let mut nread = 0; + while nread < target { + stream.read_exact(&mut buf[..]).unwrap(); + nread += CHUNK_SIZE as u64; + print!("."); + stdout().flush().unwrap(); + } + println!("done"); + let duration = time::Instant::now() - start; + println!("TCP: {}MB/s", + (target as f64 / (1024f64 * 1024f64)) / + (duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9)); +} + +fn main() { + let _ = env_logger::init(); + &*BUF; // to non-lazily initialize it. + bench_tcp(256 << 20); + bench_tarpc(256 << 20); +} diff --git a/examples/two_clients.rs b/examples/two_clients.rs new file mode 100644 index 0000000..52766e0 --- /dev/null +++ b/examples/two_clients.rs @@ -0,0 +1,78 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +#![feature(conservative_impl_trait, plugin)] +#![plugin(snake_to_camel)] + +#[macro_use] +extern crate log; +#[macro_use] +extern crate tarpc; +extern crate serde; +extern crate bincode; +extern crate env_logger; +extern crate futures; + +use bar::FutureServiceExt as BarExt; +use baz::FutureServiceExt as BazExt; +use tarpc::util::Never; +use tarpc::sync::Connect; + +mod bar { + service! { + rpc bar(i: i32) -> i32; + } +} + +#[derive(Clone)] +struct Bar; +impl bar::FutureService for Bar { + type Bar = futures::Finished; + + fn bar(&self, i: i32) -> Self::Bar { + futures::finished(i) + } +} + +mod baz { + service! { + rpc baz(s: String) -> String; + } +} + +#[derive(Clone)] +struct Baz; +impl baz::FutureService for Baz { + type Baz = futures::Finished; + + fn baz(&self, s: String) -> Self::Baz { + futures::finished(format!("Hello, {}!", s)) + } +} + +macro_rules! pos { + () => (concat!(file!(), ":", line!())) +} + +fn main() { + let _ = env_logger::init(); + let bar = Bar.listen("localhost:0").unwrap(); + let baz = Baz.listen("localhost:0").unwrap(); + let bar_client = bar::SyncClient::connect(bar.local_addr()).unwrap(); + let baz_client = baz::SyncClient::connect(baz.local_addr()).unwrap(); + + info!("Result: {:?}", bar_client.bar(&17)); + + let total = 20; + for i in 1..(total + 1) { + if i % 2 == 0 { + info!("Result 1: {:?}", bar_client.bar(&i)); + } else { + info!("Result 2: {:?}", baz_client.baz(&i.to_string())); + } + } + + info!("Done."); +} diff --git a/hooks/pre-commit b/hooks/pre-commit index 2db031a..c41a546 100755 --- a/hooks/pre-commit +++ b/hooks/pre-commit @@ -89,22 +89,23 @@ fi # not only contain discrete files. printf "${PREFIX} Checking formatting ... " FMTRESULT=0 +diff="" for file in $(git diff --name-only --cached); do if [ ${file: -3} == ".rs" ]; then - diff=$(rustfmt --skip-children --write-mode=diff $file) - if grep --quiet "^Diff at line" <<< "$diff"; then - FMTRESULT=1 - fi + diff="$diff$(rustfmt --skip-children --write-mode=diff $file)" fi done +if grep --quiet "^Diff at line" <<< "$diff"; then + FMTRESULT=1 +fi if [ "${TARPC_SKIP_RUSTFMT}" == 1 ]; then printf "${SKIPPED}\n"$? elif [ ${FMTRESULT} != 0 ]; then FAILED=1 printf "${FAILURE}\n" - echo "$diff" | sed '/Using rustfmt.*$/d' + echo "$diff" | sed 's/Using rustfmt config file.*$/d/' else printf "${SUCCESS}\n" fi diff --git a/hooks/pre-push b/hooks/pre-push index b57b17b..6d63c15 100755 --- a/hooks/pre-push +++ b/hooks/pre-push @@ -57,20 +57,23 @@ PREPUSH_RESULT=0 # args: # 1 - cargo command to run (build/test) -# 2 - directory name of crate to build -# 3 - rust toolchain (nightly/stable/beta) +# 2 - rust toolchain (nightly/stable/beta) run_cargo() { if [ "$1" == "build" ]; then VERB=Building else VERB=Testing fi - if [ "$3" != "" ]; then - printf "${PREFIX} $VERB $2 on $3 ... " - rustup run $3 cargo $1 --manifest-path $2/Cargo.toml &>/dev/null + if [ "$2" != "" ]; then + printf "${PREFIX} $VERB on $2... " + if [ "$2" != "nightly" ]; then + rustup run $2 cargo $1 &>/dev/null + else + rustup run nightly cargo $1 --features unstable &>/dev/null + fi else - printf "${PREFIX} $VERB $2 ... " - cargo $1 --manifest-path $2/Cargo.toml &>/dev/null + printf "${PREFIX} $VERB... " + cargo $1 &>/dev/null fi if [ "$?" != "0" ]; then printf "${FAILURE}\n" @@ -92,7 +95,7 @@ check_toolchain() { fi } -printf "${PREFIX} Checking for rustup ... " +printf "${PREFIX} Checking for rustup or current toolchain directive... " command -v rustup &>/dev/null if [ "$?" == 0 ] && [ "${TARPC_USE_CURRENT_TOOLCHAIN}" == "" ]; then printf "${SUCCESS}\n" @@ -104,24 +107,22 @@ if [ "$?" == 0 ] && [ "${TARPC_USE_CURRENT_TOOLCHAIN}" == "" ]; then exit 1 fi - run_cargo build tarpc stable - run_cargo build tarpc_examples stable - - run_cargo build tarpc beta - run_cargo build tarpc_examples beta - - run_cargo build tarpc nightly - run_cargo build tarpc_examples nightly + run_cargo build stable + run_cargo build beta + run_cargo build nightly # We still rely on some nightly stuff for tests - run_cargo test tarpc nightly - run_cargo test tarpc_examples nightly + run_cargo test nightly else - printf "${YELLOW}NOT FOUND${NC}\n" - printf "${WARNING} Falling back to current toolchain: $(rustc -V)\n" + if [ "${TARPC_USE_CURRENT_TOOLCHAIN}" == "" ]; then + printf "${YELLOW}NOT FOUND${NC}\n" + printf "${WARNING} Falling back to current toolchain: $(rustc -V)\n" + else + printf "${SUCCESS}\n" + fi - run_cargo test tarpc - run_cargo test tarpc_examples + run_cargo build + run_cargo test fi exit $PREPUSH_RESULT diff --git a/tarpc/rustfmt.toml b/rustfmt.toml similarity index 100% rename from tarpc/rustfmt.toml rename to rustfmt.toml diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..488f71c --- /dev/null +++ b/src/client.rs @@ -0,0 +1,108 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +use Packet; +use futures::BoxFuture; +use futures::stream::Empty; +use std::fmt; +use std::io; +use tokio_service::Service; +use tokio_proto::pipeline; + +/// A thin wrapper around `pipeline::Client` that handles Serialization. +#[derive(Clone)] +pub struct Client { + inner: pipeline::Client, Empty<(), io::Error>, io::Error>, +} + +impl Service for Client { + type Req = Packet; + type Resp = Vec; + type Error = io::Error; + type Fut = BoxFuture, io::Error>; + + fn call(&self, request: Packet) -> Self::Fut { + self.inner.call(pipeline::Message::WithoutBody(request)) + } +} + +impl fmt::Debug for Client { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "Client {{ .. }}") + } +} + +/// Exposes a trait for connecting asynchronously to servers. +pub mod future { + use futures::{self, BoxFuture, Future}; + use protocol::{LOOP_HANDLE, TarpcTransport}; + use std::io; + use std::net::SocketAddr; + use super::Client; + use take::Take; + use tokio_core::TcpStream; + use tokio_proto::pipeline; + + + /// Types that can connect to a server asynchronously. + pub trait Connect: Sized { + /// The type of the future returned when calling connect. + type Fut: Future; + + /// Connects to a server located at the given address. + fn connect(addr: &SocketAddr) -> Self::Fut; + } + + /// A future that resolves to a `Client` or an `io::Error`. + #[doc(hidden)] + pub type ClientFuture = futures::Map, fn(TcpStream) -> Client>; + + impl Connect for Client { + type Fut = ClientFuture; + + /// Starts an event loop on a thread and registers a new client + /// connected to the given address. + fn connect(addr: &SocketAddr) -> ClientFuture { + fn connect(stream: TcpStream) -> Client { + let loop_handle = LOOP_HANDLE.clone(); + let service = Take::new(move || Ok(TarpcTransport::new(stream))); + Client { inner: pipeline::connect(loop_handle, service) } + } + LOOP_HANDLE.clone() + .tcp_connect(addr) + .map(connect) + } + } +} + +/// Exposes a trait for connecting synchronously to servers. +pub mod sync { + use futures::Future; + use std::io; + use std::net::ToSocketAddrs; + use super::Client; + + /// Types that can connect to a server synchronously. + pub trait Connect: Sized { + /// Connects to a server located at the given address. + fn connect(addr: A) -> Result where A: ToSocketAddrs; + } + + impl Connect for Client { + fn connect(addr: A) -> Result + where A: ToSocketAddrs + { + let addr = if let Some(a) = try!(addr.to_socket_addrs()).next() { + a + } else { + return Err(io::Error::new(io::ErrorKind::AddrNotAvailable, + "`ToSocketAddrs::to_socket_addrs` returned an empty \ + iterator.")); + }; + ::connect(&addr).wait() + } + } +} + diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 0000000..27db58b --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,144 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +use {bincode, futures}; +use std::{fmt, io}; +use std::error::Error as StdError; +use tokio_proto::pipeline; +use serde::{Deserialize, Serialize}; + +/// All errors that can occur during the use of tarpc. +#[derive(Debug)] +pub enum Error + where E: SerializableError +{ + /// No address found for the specified address. + /// + /// Depending on the outcome of address resolution, `ToSocketAddrs` may not yield any + /// values, which will propagate as this variant. + NoAddressFound, + /// Any IO error. + Io(io::Error), + /// Error in deserializing a server response. + /// + /// Typically this indicates a faulty implementation of `serde::Serialize` or + /// `serde::Deserialize`. + ClientDeserialize(bincode::serde::DeserializeError), + /// Error in serializing a client request. + /// + /// Typically this indicates a faulty implementation of `serde::Serialize`. + ClientSerialize(bincode::serde::SerializeError), + /// Error in deserializing a client request. + /// + /// Typically this indicates a faulty implementation of `serde::Serialize` or + /// `serde::Deserialize`. + ServerDeserialize(String), + /// Error in serializing a server response. + /// + /// Typically this indicates a faulty implementation of `serde::Serialize`. + ServerSerialize(String), + /// The server canceled the response before it was completed. + ReplyCanceled, + /// The server was unable to reply to the rpc for some reason. + App(E), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Error::NoAddressFound | Error::ReplyCanceled => write!(f, "{}", self.description()), + Error::ClientDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), + Error::ClientSerialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), + Error::ServerDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), + Error::ServerSerialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), + Error::App(ref e) => fmt::Display::fmt(e, f), + Error::Io(ref e) => fmt::Display::fmt(e, f), + } + } +} + +impl StdError for Error { + fn description(&self) -> &str { + match *self { + Error::NoAddressFound => "No addresses were returned by `ToSocketAddrs::to_socket_addrs`.", + Error::ClientDeserialize(_) => "The client failed to deserialize the server response.", + Error::ClientSerialize(_) => "The client failed to serialize the request.", + Error::ServerDeserialize(_) => "The server failed to deserialize the request.", + Error::ServerSerialize(_) => "The server failed to serialize the response.", + Error::ReplyCanceled => "The server canceled sending a response.", + Error::App(ref e) => e.description(), + Error::Io(ref e) => e.description(), + } + } + + fn cause(&self) -> Option<&StdError> { + match *self { + Error::ClientDeserialize(ref e) => e.cause(), + Error::ClientSerialize(ref e) => e.cause(), + Error::NoAddressFound | + Error::ServerDeserialize(_) | + Error::ServerSerialize(_) | + Error::ReplyCanceled | + Error::App(_) => None, + Error::Io(ref e) => e.cause(), + } + } +} + +impl From>> for Error { + fn from(err: pipeline::Error>) -> Self { + match err { + pipeline::Error::Transport(e) => e, + pipeline::Error::Io(e) => e.into(), + } + } +} + +impl From for Error { + fn from(err: io::Error) -> Self { + Error::Io(err) + } +} + +impl From> for Error { + fn from(err: WireError) -> Self { + match err { + WireError::ReplyCanceled => Error::ReplyCanceled, + WireError::ServerDeserialize(s) => Error::ServerDeserialize(s), + WireError::ServerSerialize(s) => Error::ServerSerialize(s), + WireError::App(e) => Error::App(e), + } + } +} + +/// A serializable, server-supplied error. +#[doc(hidden)] +#[derive(Deserialize, Serialize, Clone, Debug)] +pub enum WireError + where E: SerializableError +{ + /// The server canceled the response before it was completed. + ReplyCanceled, + /// Error in deserializing a client request. + ServerDeserialize(String), + /// Error in serializing server response. + ServerSerialize(String), + /// The server was unable to reply to the rpc for some reason. + App(E), +} + +impl From for WireError + where E: SerializableError +{ + fn from(_: futures::Canceled) -> Self { + WireError::ReplyCanceled + } +} + +/// A serializable error. +pub trait SerializableError: StdError + Deserialize + Serialize + Send + 'static {} + +impl SerializableError for E {} + diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..318f7f4 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,116 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +//! tarpc is an RPC framework for rust with a focus on ease of use. Defining a +//! service can be done in just a few lines of code, and most of the boilerplate of +//! writing a server is taken care of for you. +//! +//! ## What is an RPC framework? +//! "RPC" stands for "Remote Procedure Call," a function call where the work of +//! producing the return value is being done somewhere else. When an rpc function is +//! invoked, behind the scenes the function contacts some other process somewhere +//! and asks them to evaluate the function instead. The original function then +//! returns the value produced by the other process. +//! +//! RPC frameworks are a fundamental building block of most microservices-oriented +//! architectures. Two well-known ones are [gRPC](http://www.grpc.io) and +//! [Cap'n Proto](https://capnproto.org/). +//! +//! tarpc differentiates itself from other RPC frameworks by defining the schema in code, +//! rather than in a separate language such as .proto. This means there's no separate compilation +//! process, and no cognitive context switching between different languages. Additionally, it +//! works with the community-backed library serde: any serde-serializable type can be used as +//! arguments to tarpc fns. +//! +//! Example usage: +//! +//! ``` +//! // required by `FutureClient` (not used in this example) +//! #![feature(conservative_impl_trait, plugin)] +//! #![plugin(snake_to_camel)] +//! +//! #[macro_use] +//! extern crate tarpc; +//! +//! use tarpc::sync::Connect; +//! use tarpc::util::Never; +//! +//! service! { +//! rpc hello(name: String) -> String; +//! } +//! +//! #[derive(Clone)] +//! struct HelloServer; +//! +//! impl SyncService for HelloServer { +//! fn hello(&self, name: String) -> Result { +//! Ok(format!("Hello, {}!", name)) +//! } +//! } +//! +//! fn main() { +//! let addr = "localhost:10000"; +//! let _server = HelloServer.listen(addr).unwrap(); +//! let client = SyncClient::connect(addr).unwrap(); +//! println!("{}", client.hello(&"Mom".to_string()).unwrap()); +//! } +//! ``` +//! +#![deny(missing_docs)] +#![feature(custom_derive, plugin, question_mark, conservative_impl_trait, never_type)] +#![plugin(serde_macros, snake_to_camel)] + +extern crate bincode; +extern crate byteorder; +extern crate bytes; +#[macro_use] +extern crate lazy_static; +#[macro_use] +extern crate log; +extern crate take; + +#[doc(hidden)] +pub extern crate futures; +#[doc(hidden)] +pub extern crate futures_cpupool; +#[doc(hidden)] +pub extern crate serde; +#[doc(hidden)] +pub extern crate tokio_core; +#[doc(hidden)] +pub extern crate tokio_proto; +#[doc(hidden)] +pub extern crate tokio_service; + +pub use client::{sync, future}; +pub use errors::{Error, SerializableError}; + +#[doc(hidden)] +pub use client::Client; +#[doc(hidden)] +pub use client::future::ClientFuture; +#[doc(hidden)] +pub use errors::{WireError}; +#[doc(hidden)] +pub use protocol::{Packet, deserialize}; +#[doc(hidden)] +pub use server::{SerializeFuture, SerializedReply, listen, serialize_reply}; + +/// Provides some utility error types, as well as a trait for spawning futures on the default event +/// loop. +pub mod util; + +/// Provides the macro used for constructing rpc services and client stubs. +#[macro_use] +mod macros; +/// Provides the base client stubs used by the service macro. +mod client; +/// Provides the base server boilerplate used by service implementations. +mod server; +/// Provides the tarpc client and server, which implements the tarpc protocol. +/// The protocol is defined by the implementation. +mod protocol; +/// Provides a few different error types. +mod errors; diff --git a/src/macros.rs b/src/macros.rs new file mode 100644 index 0000000..5367f69 --- /dev/null +++ b/src/macros.rs @@ -0,0 +1,795 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +/// Creates an enum where each variant contains a `Future`. The created enum impls `Future`. +/// Useful when a fn needs to return possibly many different types of futures. +#[macro_export] +macro_rules! future_enum { + { + $(#[$attr:meta])* + pub enum $name:ident<$($tp:ident),*> { + $(#[$attrv:meta])* + $($variant:ident($inner:ty)),* + } + } => { + $(#[$attr])* + pub enum $name<$($tp),*> { + $(#[$attrv])* + $($variant($inner)),* + } + + impl<__T, __E, $($tp),*> $crate::futures::Future for $name<$($tp),*> + where __T: Send + 'static, + $($inner: $crate::futures::Future),* + { + type Item = __T; + type Error = __E; + + fn poll(&mut self) -> $crate::futures::Poll { + match *self { + $($name::$variant(ref mut f) => $crate::futures::Future::poll(f)),* + } + } + } + }; + { + $(#[$attr:meta])* + enum $name:ident<$($tp:ident),*> { + $(#[$attrv:meta])* + $($variant:ident($inner:ty)),* + } + } => { + $(#[$attr])* + enum $name<$($tp),*> { + $(#[$attrv])* + $($variant($inner)),* + } + + impl<__T, __E, $($tp),*> $crate::futures::Future for $name<$($tp),*> + where __T: Send + 'static, + $($inner: $crate::futures::Future),* + { + type Item = __T; + type Error = __E; + + fn poll(&mut self) -> $crate::futures::Poll { + match *self { + $($name::$variant(ref mut f) => $crate::futures::Future::poll(f)),* + } + } + } + } +} + +#[doc(hidden)] +#[macro_export] +macro_rules! as_item { + ($i:item) => {$i}; +} + +#[doc(hidden)] +#[macro_export] +macro_rules! impl_serialize { + ($impler:ident, { $($lifetime:tt)* }, $(@($name:ident $n:expr))* -- #($_n:expr) ) => { + as_item! { + impl$($lifetime)* $crate::serde::Serialize for $impler$($lifetime)* { + #[inline] + fn serialize(&self, serializer: &mut S) -> ::std::result::Result<(), S::Error> + where S: $crate::serde::Serializer + { + match *self { + $( + $impler::$name(ref field) => + $crate::serde::Serializer::serialize_newtype_variant( + serializer, + stringify!($impler), + $n, + stringify!($name), + field, + ) + ),* + } + } + } + } + }; + // All args are wrapped in a tuple so we can use the newtype variant for each one. + ($impler:ident, + { $($lifetime:tt)* }, + $(@$finished:tt)* + -- #($n:expr) $name:ident($field:ty) $($req:tt)*) => + ( + impl_serialize!($impler, + { $($lifetime)* }, + $(@$finished)* @($name $n) + -- #($n + 1) $($req)*); + ); + // Entry + ($impler:ident, + { $($lifetime:tt)* }, + $($started:tt)*) => (impl_serialize!($impler, { $($lifetime)* }, -- #(0) $($started)*);); +} + +#[doc(hidden)] +#[macro_export] +macro_rules! impl_deserialize { + ($impler:ident, $(@($name:ident $n:expr))* -- #($_n:expr) ) => ( + impl $crate::serde::Deserialize for $impler { + #[inline] + fn deserialize(deserializer: &mut D) + -> ::std::result::Result<$impler, D::Error> + where D: $crate::serde::Deserializer + { + #[allow(non_camel_case_types, unused)] + enum Field { + $($name),* + } + impl $crate::serde::Deserialize for Field { + #[inline] + fn deserialize(deserializer: &mut D) + -> ::std::result::Result + where D: $crate::serde::Deserializer + { + struct FieldVisitor; + impl $crate::serde::de::Visitor for FieldVisitor { + type Value = Field; + + #[inline] + fn visit_usize(&mut self, value: usize) + -> ::std::result::Result + where E: $crate::serde::de::Error, + { + $( + if value == $n { + return ::std::result::Result::Ok(Field::$name); + } + )* + ::std::result::Result::Err( + $crate::serde::de::Error::custom( + format!("No variants have a value of {}!", value)) + ) + } + } + deserializer.deserialize_struct_field(FieldVisitor) + } + } + + struct Visitor; + impl $crate::serde::de::EnumVisitor for Visitor { + type Value = $impler; + + #[inline] + fn visit(&mut self, mut visitor: V) + -> ::std::result::Result<$impler, V::Error> + where V: $crate::serde::de::VariantVisitor + { + match try!(visitor.visit_variant()) { + $( + Field::$name => { + let val = try!(visitor.visit_newtype()); + ::std::result::Result::Ok($impler::$name(val)) + } + ),* + } + } + } + const VARIANTS: &'static [&'static str] = &[ + $( + stringify!($name) + ),* + ]; + deserializer.deserialize_enum(stringify!($impler), VARIANTS, Visitor) + } + } + ); + // All args are wrapped in a tuple so we can use the newtype variant for each one. + ($impler:ident, $(@$finished:tt)* -- #($n:expr) $name:ident($field:ty) $($req:tt)*) => ( + impl_deserialize!($impler, $(@$finished)* @($name $n) -- #($n + 1) $($req)*); + ); + // Entry + ($impler:ident, $($started:tt)*) => (impl_deserialize!($impler, -- #(0) $($started)*);); +} + +/// The main macro that creates RPC services. +/// +/// Rpc methods are specified, mirroring trait syntax: +/// +/// ``` +/// # #![feature(conservative_impl_trait, plugin)] +/// # #![plugin(snake_to_camel)] +/// # #[macro_use] extern crate tarpc; +/// # fn main() {} +/// # service! { +/// /// Say hello +/// rpc hello(name: String) -> String; +/// # } +/// ``` +/// +/// Attributes can be attached to each rpc. These attributes +/// will then be attached to the generated service traits' +/// corresponding `fn`s, as well as to the client stubs' RPCs. +/// +/// The following items are expanded in the enclosing module: +/// +/// * `FutureService` -- the trait defining the RPC service via a `Future` API. +/// * `SyncService` -- a service trait that provides a synchronous API for when +/// spawning a thread per request is acceptable. +/// * `FutureServiceExt` -- provides the methods for starting a service. There is an umbrella impl +/// for all implers of `FutureService`. It's a separate trait to prevent +/// name collisions with RPCs. +/// * `SyncServiceExt` -- same as `FutureServiceExt` but for `SyncService`. +/// * `FutureClient` -- a client whose RPCs return `Future`s. +/// * `SyncClient` -- a client whose RPCs block until the reply is available. Easiest +/// interface to use, as it looks the same as a regular function call. +/// +#[macro_export] +macro_rules! service { +// Entry point + ( + $( + $(#[$attr:meta])* + rpc $fn_name:ident( $( $arg:ident : $in_:ty ),* ) $(-> $out:ty)* $(| $error:ty)*; + )* + ) => { + service! {{ + $( + $(#[$attr])* + rpc $fn_name( $( $arg : $in_ ),* ) $(-> $out)* $(| $error)*; + )* + }} + }; +// Pattern for when the next rpc has an implicit unit return type and no error type. + ( + { + $(#[$attr:meta])* + rpc $fn_name:ident( $( $arg:ident : $in_:ty ),* ); + + $( $unexpanded:tt )* + } + $( $expanded:tt )* + ) => { + service! { + { $( $unexpanded )* } + + $( $expanded )* + + $(#[$attr])* + rpc $fn_name( $( $arg : $in_ ),* ) -> () | $crate::util::Never; + } + }; +// Pattern for when the next rpc has an explicit return type and no error type. + ( + { + $(#[$attr:meta])* + rpc $fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty; + + $( $unexpanded:tt )* + } + $( $expanded:tt )* + ) => { + service! { + { $( $unexpanded )* } + + $( $expanded )* + + $(#[$attr])* + rpc $fn_name( $( $arg : $in_ ),* ) -> $out | $crate::util::Never; + } + }; +// Pattern for when the next rpc has an implicit unit return type and an explicit error type. + ( + { + $(#[$attr:meta])* + rpc $fn_name:ident( $( $arg:ident : $in_:ty ),* ) | $error:ty; + + $( $unexpanded:tt )* + } + $( $expanded:tt )* + ) => { + service! { + { $( $unexpanded )* } + + $( $expanded )* + + $(#[$attr])* + rpc $fn_name( $( $arg : $in_ ),* ) -> () | $error; + } + }; +// Pattern for when the next rpc has an explicit return type and an explicit error type. + ( + { + $(#[$attr:meta])* + rpc $fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty | $error:ty; + + $( $unexpanded:tt )* + } + $( $expanded:tt )* + ) => { + service! { + { $( $unexpanded )* } + + $( $expanded )* + + $(#[$attr])* + rpc $fn_name( $( $arg : $in_ ),* ) -> $out | $error; + } + }; +// Pattern for when all return types have been expanded + ( + { } // none left to expand + $( + $(#[$attr:meta])* + rpc $fn_name:ident ( $( $arg:ident : $in_:ty ),* ) -> $out:ty | $error:ty; + )* + ) => { + +/// Defines the `Future` RPC service. Implementors must be `Clone`, `Send`, and `'static`, +/// as required by `tokio_proto::NewService`. This is required so that the service can be used +/// to respond to multiple requests concurrently. + pub trait FutureService: + ::std::marker::Send + + ::std::clone::Clone + + 'static + { + $( + + snake_to_camel! { + /// The type of future returned by the fn of the same name. + type $fn_name: $crate::futures::Future + Send; + } + + $(#[$attr])* + fn $fn_name(&self, $($arg:$in_),*) -> ty_snake_to_camel!(Self::$fn_name); + )* + } + + /// Provides a function for starting the service. This is a separate trait from + /// `FutureService` to prevent collisions with the names of RPCs. + pub trait FutureServiceExt: FutureService { + /// Registers the service with the given registry, listening on the given address. + fn listen(self, addr: L) + -> ::std::io::Result<$crate::tokio_proto::server::ServerHandle> + where L: ::std::net::ToSocketAddrs + { + return $crate::listen(addr, __AsyncServer(self)); + + #[derive(Clone)] + struct __AsyncServer(S); + + impl ::std::fmt::Debug for __AsyncServer { + fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(fmt, "__AsyncServer {{ .. }}") + } + } + + #[allow(non_camel_case_types)] + enum Reply { + DeserializeError($crate::SerializeFuture), + $($fn_name($crate::futures::Then<$crate::futures::MapErr $crate::WireError<$error>>, + $crate::SerializeFuture, + fn(::std::result::Result<$out, $crate::WireError<$error>>) + -> $crate::SerializeFuture>)),* + } + + impl $crate::futures::Future for Reply { + type Item = $crate::SerializedReply; + type Error = ::std::io::Error; + + fn poll(&mut self) -> $crate::futures::Poll { + match *self { + Reply::DeserializeError(ref mut f) => $crate::futures::Future::poll(f), + $(Reply::$fn_name(ref mut f) => $crate::futures::Future::poll(f)),* + } + } + } + + + impl $crate::tokio_service::Service for __AsyncServer + where S: FutureService + { + type Req = ::std::vec::Vec; + type Resp = $crate::SerializedReply; + type Error = ::std::io::Error; + type Fut = Reply; + + fn call(&self, req: Self::Req) -> Self::Fut { + #[allow(non_camel_case_types, unused)] + #[derive(Debug)] + enum __ServerSideRequest { + $( + $fn_name(( $($in_,)* )) + ),* + } + + impl_deserialize!(__ServerSideRequest, $($fn_name(($($in_),*)))*); + + let request = $crate::deserialize(&req); + let request: __ServerSideRequest = match request { + ::std::result::Result::Ok(request) => request, + ::std::result::Result::Err(e) => { + return Reply::DeserializeError(deserialize_error(e)); + } + }; + match request {$( + __ServerSideRequest::$fn_name(( $($arg,)* )) => { + const SERIALIZE: fn(::std::result::Result<$out, $crate::WireError<$error>>) + -> $crate::SerializeFuture = $crate::serialize_reply; + const TO_APP: fn($error) -> $crate::WireError<$error> = $crate::WireError::App; + + let reply = FutureService::$fn_name(&self.0, $($arg),*); + let reply = $crate::futures::Future::map_err(reply, TO_APP); + let reply = $crate::futures::Future::then(reply, SERIALIZE); + return Reply::$fn_name(reply); + } + )*} + + #[inline] + fn deserialize_error(e: E) -> $crate::SerializeFuture { + let err = $crate::WireError::ServerDeserialize::<$crate::util::Never>(e.to_string()); + $crate::serialize_reply(::std::result::Result::Err::<(), _>(err)) + } + } + } + } + } + +/// Defines the blocking RPC service. Must be `Clone`, `Send`, and `'static`, +/// as required by `tokio_proto::NewService`. This is required so that the service can be used +/// to respond to multiple requests concurrently. + pub trait SyncService: + ::std::marker::Send + + ::std::clone::Clone + + 'static + { + $( + $(#[$attr])* + fn $fn_name(&self, $($arg:$in_),*) -> ::std::result::Result<$out, $error>; + )* + } + + /// Provides a function for starting the service. This is a separate trait from + /// `SyncService` to prevent collisions with the names of RPCs. + pub trait SyncServiceExt: SyncService { + /// Registers the service with the given registry, listening on the given address. + fn listen(self, addr: L) + -> ::std::io::Result<$crate::tokio_proto::server::ServerHandle> + where L: ::std::net::ToSocketAddrs + { + + let service = __SyncServer { + service: self, + }; + return service.listen(addr); + + #[derive(Clone)] + struct __SyncServer { + service: S, + } + + impl FutureService for __SyncServer where S: SyncService { + $( + impl_snake_to_camel! { + type $fn_name = + $crate::futures::Flatten< + $crate::futures::MapErr< + $crate::futures::Oneshot< + $crate::futures::Done<$out, $error>>, + fn($crate::futures::Canceled) -> $error>>; + } + fn $fn_name(&self, $($arg:$in_),*) -> ty_snake_to_camel!(Self::$fn_name) { + fn unimplemented(_: $crate::futures::Canceled) -> $error { + // TODO(tikue): what do do if SyncService panics? + unimplemented!() + } + let (c, p) = $crate::futures::oneshot(); + let service = self.clone(); + ::std::thread::spawn(move || { + let reply = SyncService::$fn_name(&service.service, $($arg),*); + c.complete($crate::futures::IntoFuture::into_future(reply)); + }); + let p = $crate::futures::Future::map_err(p, unimplemented as fn($crate::futures::Canceled) -> $error); + $crate::futures::Future::flatten(p) + } + )* + } + } + } + + impl FutureServiceExt for A where A: FutureService {} + impl SyncServiceExt for S where S: SyncService {} + + #[allow(unused)] + #[derive(Clone, Debug)] +/// The client stub that makes RPC calls to the server. Exposes a blocking interface. + pub struct SyncClient(FutureClient); + + impl $crate::sync::Connect for SyncClient { + fn connect(addr: A) -> ::std::result::Result + where A: ::std::net::ToSocketAddrs, + { + let mut addrs = try!(::std::net::ToSocketAddrs::to_socket_addrs(&addr)); + let addr = if let ::std::option::Option::Some(a) = ::std::iter::Iterator::next(&mut addrs) { + a + } else { + return ::std::result::Result::Err( + ::std::io::Error::new( + ::std::io::ErrorKind::AddrNotAvailable, + "`ToSocketAddrs::to_socket_addrs` returned an empty iterator.")); + }; + let client = ::connect(&addr); + let client = $crate::futures::Future::wait(client); + let client = SyncClient(try!(client)); + ::std::result::Result::Ok(client) + } + } + + impl SyncClient { + $( + #[allow(unused)] + $(#[$attr])* + #[inline] + pub fn $fn_name(&self, $($arg: &$in_),*) -> ::std::result::Result<$out, $crate::Error<$error>> { + let rpc = (self.0).$fn_name($($arg),*); + $crate::futures::Future::wait(rpc) + } + )* + } + + #[allow(unused)] + #[derive(Clone, Debug)] +/// The client stub that makes RPC calls to the server. Exposes a Future interface. + pub struct FutureClient($crate::Client); + + impl $crate::future::Connect for FutureClient { + type Fut = $crate::futures::Map<$crate::ClientFuture, fn($crate::Client) -> Self>; + + fn connect(addr: &::std::net::SocketAddr) -> Self::Fut { + let client = <$crate::Client as $crate::future::Connect>::connect(addr); + $crate::futures::Future::map(client, FutureClient) + } + } + + #[allow(non_camel_case_types, unused)] + #[derive(Debug)] + enum __ClientSideRequest<'a> { + $( + $fn_name(&'a ( $(&'a $in_,)* )) + ),* + } + + impl_serialize!(__ClientSideRequest, { <'__a> }, $($fn_name(($($in_),*)))*); + + impl FutureClient { + $( + #[allow(unused)] + $(#[$attr])* + #[inline] + pub fn $fn_name(&self, $($arg: &$in_),*) + -> impl $crate::futures::Future> + Send + 'static + { + future_enum! { + enum Fut { + Called(C), + Failed(F) + } + } + + let args = ($($arg,)*); + let req = &__ClientSideRequest::$fn_name(&args); + let req = match $crate::Packet::serialize(&req) { + ::std::result::Result::Err(e) => return Fut::Failed($crate::futures::failed($crate::Error::ClientSerialize(e))), + ::std::result::Result::Ok(req) => req, + }; + let fut = $crate::tokio_service::Service::call(&self.0, req); + Fut::Called($crate::futures::Future::then(fut, move |msg| { + let msg: Vec = try!(msg); + let msg: ::std::result::Result<::std::result::Result<$out, $crate::WireError<$error>>, _> + = $crate::deserialize(&msg); + match msg { + ::std::result::Result::Ok(msg) => ::std::result::Result::Ok(try!(msg)), + ::std::result::Result::Err(e) => ::std::result::Result::Err($crate::Error::ClientDeserialize(e)), + } + })) + } + )* + + } + } +} + +// allow dead code; we're just testing that the macro expansion compiles +#[allow(dead_code)] +#[cfg(test)] +mod syntax_test { + use util::Never; + service! { + rpc hello() -> String; + #[doc="attr"] + rpc attr(s: String) -> String; + rpc no_args_no_return(); + rpc no_args() -> (); + rpc one_arg(foo: String) -> i32; + rpc two_args_no_return(bar: String, baz: u64); + rpc two_args(bar: String, baz: u64) -> String; + rpc no_args_ret_error() -> i32 | Never; + rpc one_arg_ret_error(foo: String) -> String | Never; + rpc no_arg_implicit_return_error() | Never; + #[doc="attr"] + rpc one_arg_implicit_return_error(foo: String) | Never; + } +} + +#[cfg(test)] +mod functional_test { + use futures::{Future, failed}; + extern crate env_logger; + + service! { + rpc add(x: i32, y: i32) -> i32; + rpc hey(name: String) -> String; + } + + mod sync { + use sync::Connect; + use util::Never; + use super::env_logger; + use super::{SyncClient, SyncService, SyncServiceExt}; + + #[derive(Clone, Copy)] + struct Server; + + impl SyncService for Server { + fn add(&self, x: i32, y: i32) -> Result { + Ok(x + y) + } + fn hey(&self, name: String) -> Result { + Ok(format!("Hey, {}.", name)) + } + } + + #[test] + fn simple() { + let _ = env_logger::init(); + let handle = Server.listen("localhost:0").unwrap(); + let client = SyncClient::connect(handle.local_addr()).unwrap(); + assert_eq!(3, client.add(&1, &2).unwrap()); + assert_eq!("Hey, Tim.", client.hey(&"Tim".to_string()).unwrap()); + } + + #[test] + fn clone() { + let handle = Server.listen("localhost:0").unwrap(); + let client1 = SyncClient::connect(handle.local_addr()).unwrap(); + let client2 = client1.clone(); + assert_eq!(3, client1.add(&1, &2).unwrap()); + assert_eq!(3, client2.add(&1, &2).unwrap()); + } + + #[test] + fn other_service() { + let _ = env_logger::init(); + let handle = Server.listen("localhost:0").unwrap(); + let client = + super::other_service::SyncClient::connect(handle.local_addr()).unwrap(); + match client.foo().err().unwrap() { + ::Error::ServerDeserialize(_) => {} // good + bad => panic!("Expected Error::ServerDeserialize but got {}", bad), + } + } + } + + mod future { + use future::Connect; + use util::Never; + use futures::{Finished, Future, finished}; + use super::env_logger; + use super::{FutureClient, FutureService, FutureServiceExt}; + + #[derive(Clone)] + struct Server; + + impl FutureService for Server { + type Add = Finished; + + fn add(&self, x: i32, y: i32) -> Self::Add { + finished(x + y) + } + + type Hey = Finished; + + fn hey(&self, name: String) -> Self::Hey { + finished(format!("Hey, {}.", name)) + } + } + + #[test] + fn simple() { + let _ = env_logger::init(); + let handle = Server.listen("localhost:0").unwrap(); + let client = FutureClient::connect(handle.local_addr()).wait().unwrap(); + assert_eq!(3, client.add(&1, &2).wait().unwrap()); + assert_eq!("Hey, Tim.", client.hey(&"Tim".to_string()).wait().unwrap()); + } + + #[test] + fn clone() { + let _ = env_logger::init(); + let handle = Server.listen("localhost:0").unwrap(); + let client1 = FutureClient::connect(handle.local_addr()).wait().unwrap(); + let client2 = client1.clone(); + assert_eq!(3, client1.add(&1, &2).wait().unwrap()); + assert_eq!(3, client2.add(&1, &2).wait().unwrap()); + } + + #[test] + fn other_service() { + let _ = env_logger::init(); + let handle = Server.listen("localhost:0").unwrap(); + let client = + super::other_service::FutureClient::connect(handle.local_addr()).wait().unwrap(); + match client.foo().wait().err().unwrap() { + ::Error::ServerDeserialize(_) => {} // good + bad => panic!(r#"Expected Error::ServerDeserialize but got "{}""#, bad), + } + } + } + + pub mod error_service { + service! { + rpc bar() -> u32 | ::util::Message; + } + } + + #[derive(Clone)] + struct ErrorServer; + + impl error_service::FutureService for ErrorServer { + type Bar = ::futures::Failed; + + fn bar(&self) -> Self::Bar { + info!("Called bar"); + failed("lol jk".into()) + } + } + + #[test] + fn error() { + use future::Connect as Fc; + use sync::Connect as Sc; + use std::error::Error as E; + use self::error_service::*; + let _ = env_logger::init(); + + let handle = ErrorServer.listen("localhost:0").unwrap(); + let client = FutureClient::connect(handle.local_addr()).wait().unwrap(); + client.bar() + .then(move |result| { + match result.err().unwrap() { + ::Error::App(e) => { + assert_eq!(e.description(), "lol jk"); + Ok::<_, ()>(()) + } // good + bad => panic!("Expected Error::App but got {:?}", bad), + } + }) + .wait() + .unwrap(); + + let client = SyncClient::connect(handle.local_addr()).unwrap(); + match client.bar().err().unwrap() { + ::Error::App(e) => { + assert_eq!(e.description(), "lol jk"); + } // good + bad => panic!("Expected Error::App but got {:?}", bad), + } + } + + pub mod other_service { + service! { + rpc foo(); + } + } +} diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs new file mode 100644 index 0000000..8a30a2a --- /dev/null +++ b/src/protocol/mod.rs @@ -0,0 +1,123 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +use {futures, serde}; +use bincode::{SizeLimit, serde as bincode}; +use std::{io, thread}; +use std::collections::VecDeque; +use std::sync::mpsc; +use tokio_core::{Loop, LoopHandle}; +use tokio_proto::io::{Readiness, Transport}; +use tokio_proto::pipeline::Frame; + +lazy_static! { + #[doc(hidden)] + pub static ref LOOP_HANDLE: LoopHandle = { + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let mut lupe = Loop::new().unwrap(); + tx.send(lupe.handle()).unwrap(); + // Run forever + lupe.run(futures::empty::<(), !>()).unwrap(); + }); + rx.recv().unwrap() + }; +} + +pub use self::writer::Packet; + +pub mod reader; +pub mod writer; + +/// A helper trait to provide the `map_non_block` function on Results. +trait MapNonBlock { + /// Maps a `Result` to a `Result>` by converting + /// operation-would-block errors into `Ok(None)`. + fn map_non_block(self) -> io::Result>; +} + +impl MapNonBlock for io::Result { + fn map_non_block(self) -> io::Result> { + use std::io::ErrorKind::WouldBlock; + + match self { + Ok(value) => Ok(Some(value)), + Err(err) => { + if let WouldBlock = err.kind() { + Ok(None) + } else { + Err(err) + } + } + } + } +} + +/// Deserialize a buffer into a `D` and its ID. On error, returns `tarpc::Error`. +pub fn deserialize(mut buf: &[u8]) -> Result { + bincode::deserialize_from(&mut buf, SizeLimit::Infinite) +} + +pub struct TarpcTransport { + stream: T, + read_state: reader::ReadState, + outbound: VecDeque, + head: Option, +} + +impl TarpcTransport { + pub fn new(stream: T) -> Self { + TarpcTransport { + stream: stream, + read_state: reader::ReadState::init(), + outbound: VecDeque::new(), + head: None, + } + } +} + +impl Readiness for TarpcTransport + where T: Readiness +{ + fn is_readable(&self) -> bool { + self.stream.is_readable() + } + + fn is_writable(&self) -> bool { + // Always allow writing... this isn't really the best strategy to do in + // practice, but it is the easiest to implement in this case. The + // number of in-flight requests can be controlled using the pipeline + // dispatcher. + true + } +} + +impl Transport for TarpcTransport + where T: io::Read + io::Write + Readiness, +{ + type In = Frame; + type Out = Frame, io::Error>; + + fn read(&mut self) -> io::Result, io::Error>>> { + self.read_state.next(&mut self.stream) + } + + fn write(&mut self, req: Frame) -> io::Result> { + match req { + Frame::Message(msg) => { + self.outbound.push_back(msg); + self.flush() + } + Frame::MessageWithBody(..) => unreachable!(), + Frame::Body(_) => unreachable!(), + Frame::Error(_) => unreachable!(), + Frame::Done => unreachable!(), + } + } + + fn flush(&mut self) -> io::Result> { + writer::NextWriteState::next(&mut self.head, &mut self.stream, &mut self.outbound) + } +} diff --git a/src/protocol/reader.rs b/src/protocol/reader.rs new file mode 100644 index 0000000..ddc5527 --- /dev/null +++ b/src/protocol/reader.rs @@ -0,0 +1,181 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +use byteorder::{BigEndian, ReadBytesExt}; +use bytes::{MutBuf, Take}; +use std::io::{self, Read}; +use std::mem; +use super::MapNonBlock; +use tokio_proto::pipeline::Frame; + +pub trait TryRead { + fn try_read_buf(&mut self, buf: &mut B) -> io::Result> + where Self: Sized + { + // Reads the length of the slice supplied by buf.mut_bytes into the buffer + // This is not guaranteed to consume an entire datagram or segment. + // If your protocol is msg based (instead of continuous stream) you should + // ensure that your buffer is large enough to hold an entire segment + // (1532 bytes if not jumbo frames) + let res = self.try_read(unsafe { buf.mut_bytes() }); + + if let Ok(Some(cnt)) = res { + unsafe { + buf.advance(cnt); + } + } + + res + } + + fn try_read(&mut self, buf: &mut [u8]) -> io::Result>; +} + +impl TryRead for T { + fn try_read(&mut self, dst: &mut [u8]) -> io::Result> { + self.read(dst).map_non_block() + } +} + +#[derive(Debug)] +pub struct U64Reader { + read: usize, + data: [u8; 8], +} + +impl U64Reader { + fn new() -> Self { + U64Reader { + read: 0, + data: [0; 8], + } + } +} + +impl MutBuf for U64Reader { + fn remaining(&self) -> usize { + 8 - self.read + } + + unsafe fn advance(&mut self, count: usize) { + self.read += count; + } + + unsafe fn mut_bytes(&mut self) -> &mut [u8] { + &mut self.data[self.read..] + } +} + +#[derive(Debug)] +enum NextReadAction { + Continue, + Stop(Result), +} + +trait MutBufExt: MutBuf { + type Inner; + + fn take(&mut self) -> Self::Inner; + + fn try_read(&mut self, stream: &mut R) -> io::Result> { + while let Some(bytes_read) = stream.try_read_buf(self)? { + debug!("Reader: read {} bytes, {} remaining.", + bytes_read, + self.remaining()); + if bytes_read == 0 { + debug!("Reader: connection broken."); + let err = io::Error::new(io::ErrorKind::BrokenPipe, "The connection was closed."); + return Ok(NextReadAction::Stop(Err(err))); + } + + if !self.has_remaining() { + trace!("Reader: finished."); + return Ok(NextReadAction::Stop(Ok(self.take()))); + } + } + Ok(NextReadAction::Continue) + } +} + +impl MutBufExt for U64Reader { + type Inner = u64; + + fn take(&mut self) -> u64 { + (&self.data as &[u8]).read_u64::().unwrap() + } +} + +impl MutBufExt for Take> { + type Inner = Vec; + + fn take(&mut self) -> Vec { + mem::replace(self.get_mut(), vec![]) + } +} + +/// A state machine that reads packets in non-blocking fashion. +#[derive(Debug)] +pub enum ReadState { + /// Tracks how many bytes of the message size have been read. + Len(U64Reader), + /// Tracks read progress. + Data(Take>), +} + +#[derive(Debug)] +enum NextReadState { + Same, + Next(ReadState), + Reset(Vec), +} + +impl ReadState { + pub fn init() -> ReadState { + ReadState::Len(U64Reader::new()) + } + + pub fn next(&mut self, + socket: &mut R) + -> io::Result, io::Error>>> { + loop { + let next = match *self { + ReadState::Len(ref mut len) => { + match len.try_read(socket)? { + NextReadAction::Continue => NextReadState::Same, + NextReadAction::Stop(result) => { + match result { + Ok(len) => { + let buf = Vec::with_capacity(len as usize); + NextReadState::Next(ReadState::Data(Take::new(buf, + len as usize))) + } + Err(e) => return Ok(Some(Frame::Error(e))), + } + } + } + } + ReadState::Data(ref mut buf) => { + match buf.try_read(socket)? { + NextReadAction::Continue => NextReadState::Same, + NextReadAction::Stop(result) => { + match result { + Ok(buf) => NextReadState::Reset(buf), + Err(e) => return Ok(Some(Frame::Error(e))), + } + } + } + } + }; + match next { + NextReadState::Same => return Ok(None), + NextReadState::Next(next) => *self = next, + NextReadState::Reset(packet) => { + *self = ReadState::init(); + return Ok(Some(Frame::Message(packet))); + } + } + } + } +} diff --git a/src/protocol/writer.rs b/src/protocol/writer.rs new file mode 100644 index 0000000..c122389 --- /dev/null +++ b/src/protocol/writer.rs @@ -0,0 +1,134 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +use bincode::SizeLimit; +use bincode::serde as bincode; +use byteorder::{BigEndian, WriteBytesExt}; +use bytes::Buf; +use serde::Serialize; +use std::collections::VecDeque; +use std::mem; +use std::io::{self, Cursor}; + +mod try_write { + use bytes::Buf; + use protocol::MapNonBlock; + use std::io::{self, Write}; + + pub trait TryWrite { + fn try_write_buf(&mut self, buf: &mut B) -> io::Result> + where Self: Sized + { + let res = self.try_write(buf.bytes()); + + if let Ok(Some(cnt)) = res { + buf.advance(cnt); + } + + res + } + + fn try_write(&mut self, buf: &[u8]) -> io::Result>; + } + + impl TryWrite for T { + fn try_write(&mut self, src: &[u8]) -> io::Result> { + self.write(src).map_non_block() + } + } +} + +/// The means of communication between client and server. +#[derive(Clone, Debug)] +pub struct Packet { + /// (payload_len: u64, payload) + /// + /// The payload is typically a serialized message. + pub buf: Cursor>, +} + +impl Packet { + /// Creates a new packet, (len, payload) + pub fn serialize(message: &S) -> Result + where S: Serialize + { + let payload_len = bincode::serialized_size(message); + + // (len, message) + let mut buf = Vec::with_capacity(mem::size_of::() + payload_len as usize); + + buf.write_u64::(payload_len).unwrap(); + bincode::serialize_into(&mut buf, message, SizeLimit::Infinite)?; + Ok(Packet { buf: Cursor::new(buf) }) + } +} + +#[derive(Debug)] +enum NextWriteAction { + Stop, + Continue, +} + +trait BufExt: Buf + Sized { + /// Writes data to stream. Returns Ok(true) if all data has been written or Ok(false) if + /// there's still data to write. + fn try_write(&mut self, stream: &mut W) -> io::Result { + while let Some(bytes_written) = stream.try_write_buf(self)? { + debug!("Writer: wrote {} bytes; {} remaining.", + bytes_written, + self.remaining()); + if bytes_written == 0 { + trace!("Writer: would block."); + return Ok(NextWriteAction::Continue); + } + if !self.has_remaining() { + return Ok(NextWriteAction::Stop); + } + } + Ok(NextWriteAction::Continue) + } +} + +impl BufExt for B {} + +#[derive(Debug)] +pub enum NextWriteState { + Nothing, + Next(Packet), +} + +impl NextWriteState { + pub fn next(state: &mut Option, + socket: &mut W, + outbound: &mut VecDeque) + -> io::Result> { + loop { + let update = match *state { + None => { + match outbound.pop_front() { + Some(packet) => { + let size = packet.buf.remaining() as u64; + debug_assert!(size >= mem::size_of::() as u64); + NextWriteState::Next(packet) + } + None => return Ok(Some(())), + } + } + Some(ref mut packet) => { + match packet.buf.try_write(socket)? { + NextWriteAction::Stop => NextWriteState::Nothing, + NextWriteAction::Continue => return Ok(None), + } + } + }; + match update { + NextWriteState::Next(next) => *state = Some(next), + NextWriteState::Nothing => { + *state = None; + } + } + } + } +} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..5ba6afb --- /dev/null +++ b/src/server.rs @@ -0,0 +1,71 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +use errors::{SerializableError, WireError}; +use futures::{self, Future}; +use futures::stream::Empty; +use futures_cpupool::{CpuFuture, CpuPool}; +use protocol::{LOOP_HANDLE, TarpcTransport}; +use protocol::writer::Packet; +use serde::Serialize; +use std::io; +use std::net::ToSocketAddrs; +use tokio_proto::pipeline; +use tokio_proto::NewService; +use tokio_proto::server::{self, ServerHandle}; + +/// Start a Tarpc service listening on the given address. +pub fn listen(addr: A, new_service: T) -> io::Result + where T: NewService, + Resp = pipeline::Message>, + Error = io::Error> + Send + 'static, + A: ToSocketAddrs +{ + let mut addrs = addr.to_socket_addrs()?; + let addr = if let Some(a) = addrs.next() { + a + } else { + return Err(io::Error::new(io::ErrorKind::AddrNotAvailable, + "`ToSocketAddrs::to_socket_addrs` returned an empty iterator.")); + }; + + server::listen(LOOP_HANDLE.clone(), addr, move |stream| { + pipeline::Server::new(new_service.new_service()?, TarpcTransport::new(stream)) + }) + .wait() +} + +/// Returns a future containing the serialized reply. +/// +/// Because serialization can take a non-trivial +/// amount of cpu time, it is run on a thread pool. +#[doc(hidden)] +#[inline] +pub fn serialize_reply(result: Result>) + -> SerializeFuture +{ + POOL.spawn(futures::lazy(move || { + let packet = match Packet::serialize(&result) { + Ok(packet) => packet, + Err(e) => { + let err: Result> = + Err(WireError::ServerSerialize(e.to_string())); + Packet::serialize(&err).unwrap() + } + }; + futures::finished(pipeline::Message::WithoutBody(packet)) + })) +} + +#[doc(hidden)] +pub type SerializeFuture = CpuFuture; + +#[doc(hidden)] +pub type SerializedReply = pipeline::Message>; + +lazy_static! { + static ref POOL: CpuPool = { CpuPool::new_num_cpus() }; +} diff --git a/src/snake_to_camel/Cargo.toml b/src/snake_to_camel/Cargo.toml new file mode 100644 index 0000000..5ec4723 --- /dev/null +++ b/src/snake_to_camel/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "snake_to_camel" +version = "0.1.0" +authors = ["Tim Kuehn "] + +[dependencies] +itertools = "0.4" +aster = "0.26" + +[lib] +plugin = true diff --git a/src/snake_to_camel/src/lib.rs b/src/snake_to_camel/src/lib.rs new file mode 100644 index 0000000..fde2372 --- /dev/null +++ b/src/snake_to_camel/src/lib.rs @@ -0,0 +1,149 @@ +#![feature(plugin_registrar, rustc_private)] + +extern crate aster; +extern crate itertools; +extern crate rustc; +extern crate rustc_plugin; +extern crate syntax; + +use aster::ident::ToIdent; +use itertools::Itertools; +use syntax::ast::{self, Ident, TraitRef, Ty, TyKind}; +use syntax::parse::{self, token, PResult}; +use syntax::ptr::P; +use syntax::parse::parser::{Parser, PathStyle}; +use syntax::tokenstream::TokenTree; +use syntax::ext::base::{ExtCtxt, MacResult, DummyResult, MacEager}; +use syntax::ext::quote::rt::Span; +use syntax::util::small_vector::SmallVector; +use rustc_plugin::Registry; + +fn snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box { + let mut parser = parse::new_parser_from_tts(cx.parse_sess(), cx.cfg(), tts.into()); + // The `expand_expr` method is called so that any macro calls in the + // parsed expression are expanded. + + let mut item = match parser.parse_trait_item() { + Ok(s) => s, + Err(mut diagnostic) => { + diagnostic.emit(); + return DummyResult::any(sp); + } + }; + + if let Err(mut diagnostic) = parser.expect(&token::Eof) { + diagnostic.emit(); + return DummyResult::any(sp); + } + + convert(&mut item.ident); + MacEager::trait_items(SmallVector::one(item)) +} + +fn impl_snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box { + let mut parser = parse::new_parser_from_tts(cx.parse_sess(), cx.cfg(), tts.into()); + // The `expand_expr` method is called so that any macro calls in the + // parsed expression are expanded. + + let mut item = match parser.parse_impl_item() { + Ok(s) => s, + Err(mut diagnostic) => { + diagnostic.emit(); + return DummyResult::any(sp); + } + }; + + if let Err(mut diagnostic) = parser.expect(&token::Eof) { + diagnostic.emit(); + return DummyResult::any(sp); + } + + convert(&mut item.ident); + MacEager::impl_items(SmallVector::one(item)) +} + +fn ty_snake_to_camel(cx: &mut ExtCtxt, sp: Span, tts: &[TokenTree]) -> Box { + let mut parser = parse::new_parser_from_tts(cx.parse_sess(), cx.cfg(), tts.into()); + // The `expand_expr` method is called so that any macro calls in the + // parsed expression are expanded. + + let mut ty = match parser.parse_ty_path() { + Ok(s) => s, + Err(mut diagnostic) => { + diagnostic.emit(); + return DummyResult::any(sp); + } + }; + + if let Err(mut diagnostic) = parser.expect(&token::Eof) { + diagnostic.emit(); + return DummyResult::any(sp); + } + + // Find the first non-underscore and add it capitalized. + if let TyKind::Path(_, ref mut path) = ty { + for segment in &mut path.segments { + convert(&mut segment.identifier); + } + } else { + unreachable!() + } + MacEager::ty(P(Ty {id: ast::DUMMY_NODE_ID, node: ty, span: sp})) +} + +fn convert(ident: &mut Ident) { + let ident_str = ident.to_string(); + let mut camel_ty = String::new(); + + // Find the first non-underscore and add it capitalized. + let mut chars = ident_str.chars(); + + // Find the first non-underscore char, uppercase it, and append it. + // Guaranteed to succeed because all idents must have at least one non-underscore char. + camel_ty.extend(chars.find(|&c| c != '_').unwrap().to_uppercase()); + + // When we find an underscore, we remove it and capitalize the next char. To do this, + // we need to ensure the next char is not another underscore. + let mut chars = chars.coalesce(|c1, c2| { + if c1 == '_' && c2 == '_' { + Ok(c1) + } else { + Err((c1, c2)) + } + }); + + while let Some(c) = chars.next() { + if c != '_' { + camel_ty.push(c); + } else { + if let Some(c) = chars.next() { + camel_ty.extend(c.to_uppercase()); + } + } + } + + *ident = camel_ty.to_ident(); +} + +trait ParseTraitRef { + fn parse_trait_ref(&mut self) -> PResult; +} + +impl<'a> ParseTraitRef for Parser<'a> { + /// Parse a::B + fn parse_trait_ref(&mut self) -> PResult { + Ok(TraitRef { + path: try!(self.parse_path(PathStyle::Type)), + ref_id: ast::DUMMY_NODE_ID, + }) + } +} + +#[plugin_registrar] +#[doc(hidden)] +pub fn plugin_registrar(reg: &mut Registry) { + reg.register_macro("snake_to_camel", snake_to_camel); + reg.register_macro("impl_snake_to_camel", impl_snake_to_camel); + reg.register_macro("ty_snake_to_camel", ty_snake_to_camel); +} + diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..1066b5c --- /dev/null +++ b/src/util.rs @@ -0,0 +1,83 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the MIT License, . +// This file may not be copied, modified, or distributed except according to those terms. + +use futures; +use std::fmt; +use std::error::Error; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +/// A trait for easy spawning of futures. +/// +/// `Future`s don't actually perform their computations until spawned via an `Executor`. Typically, +/// an executor will be associated with an event loop. The `fn` provided by `Spawn` handles +/// spawning the future on the static event loop on which tarpc clients and servers run by default. +pub trait Spawn { + /// Spawns a future on the default event loop. + fn spawn(self); +} + +impl Spawn for F + where F: futures::Future + Send + 'static +{ + fn spawn(self) { + ::protocol::LOOP_HANDLE.spawn(move |_| self.then(|_| Ok::<(), ()>(()))) + } +} + +/// A bottom type that impls `Error`, `Serialize`, and `Deserialize`. It is impossible to +/// instantiate this type. +#[derive(Debug)] +pub struct Never(!); + +impl Error for Never { + fn description(&self) -> &str { + unreachable!() + } +} + +impl fmt::Display for Never { + fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { + unreachable!() + } +} + +impl Serialize for Never { + fn serialize(&self, _: &mut S) -> Result<(), S::Error> + where S: Serializer + { + unreachable!() + } +} + +// Please don't try to deserialize this. :( +impl Deserialize for Never { + fn deserialize(_: &mut D) -> Result + where D: Deserializer + { + panic!("Never cannot be instantiated!"); + } +} + +/// A `String` that impls `std::error::Error`. Useful for quick-and-dirty error propagation. +#[derive(Debug, Serialize, Deserialize)] +pub struct Message(pub String); + +impl Error for Message { + fn description(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for Message { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&self.0, f) + } +} + +impl> From for Message { + fn from(s: S) -> Self { + Message(s.into()) + } +} diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml deleted file mode 100644 index f8f8470..0000000 --- a/tarpc/Cargo.toml +++ /dev/null @@ -1,23 +0,0 @@ -[package] -name = "tarpc" -version = "0.6.0" -authors = ["Adam Wright ", "Tim Kuehn "] -license = "MIT" -documentation = "https://google.github.io/tarpc" -homepage = "https://github.com/google/tarpc" -repository = "https://github.com/google/tarpc" -keywords = ["rpc", "protocol", "remote", "procedure", "serialize"] -readme = "../README.md" -description = "An RPC framework for Rust with a focus on ease of use." - -[dependencies] -bincode = "0.6" -log = "0.3" -scoped-pool = "1.0" -serde = "0.8" -unix_socket = "0.5" - -[dev-dependencies] -lazy_static = "0.2" -env_logger = "0.3" -tempdir = "0.3" diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs deleted file mode 100644 index dafa622..0000000 --- a/tarpc/src/lib.rs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2016 Google Inc. All Rights Reserved. -// -// Licensed under the MIT License, . -// This file may not be copied, modified, or distributed except according to those terms. - -//! An RPC library for Rust. -//! -//! Example usage: -//! -//! ``` -//! #[macro_use] extern crate tarpc; -//! mod my_server { -//! service! { -//! rpc hello(name: String) -> String; -//! rpc add(x: i32, y: i32) -> i32; -//! } -//! } -//! -//! use self::my_server::*; -//! use std::time::Duration; -//! -//! struct Server; -//! impl my_server::Service for Server { -//! fn hello(&self, s: String) -> String { -//! format!("Hello, {}!", s) -//! } -//! fn add(&self, x: i32, y: i32) -> i32 { -//! x + y -//! } -//! } -//! -//! fn main() { -//! let serve_handle = Server.spawn("localhost:0").unwrap(); -//! let client = Client::new(serve_handle.dialer()).unwrap(); -//! assert_eq!(3, client.add(1, 2).unwrap()); -//! assert_eq!("Hello, Mom!".to_string(), -//! client.hello("Mom".to_string()).unwrap()); -//! drop(client); -//! serve_handle.shutdown(); -//! } -//! ``` - -#![deny(missing_docs)] - -extern crate serde; -extern crate bincode; -#[macro_use] -extern crate log; -extern crate scoped_pool; -extern crate unix_socket; - -macro_rules! pos { - () => (concat!(file!(), ":", line!())) -} - -/// Provides the tarpc client and server, which implements the tarpc protocol. -/// The protocol is defined by the implementation. -pub mod protocol; - -/// Provides the macro used for constructing rpc services and client stubs. -pub mod macros; - -/// Provides transport traits and implementations. -pub mod transport; - -pub use protocol::{Config, Error, Result, ServeHandle}; diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs deleted file mode 100644 index f1daa9b..0000000 --- a/tarpc/src/macros.rs +++ /dev/null @@ -1,632 +0,0 @@ -// Copyright 2016 Google Inc. All Rights Reserved. -// -// Licensed under the MIT License, . -// This file may not be copied, modified, or distributed except according to those terms. - -/// Serde re-exports required by macros. Not for general use. -pub mod serde { - pub use serde::{Deserialize, Deserializer, Serialize, Serializer}; - /// Deserialization re-exports required by macros. Not for general use. - pub mod de { - pub use serde::de::{EnumVisitor, Error, VariantVisitor, Visitor}; - } -} - -// Required because if-let can't be used with irrefutable patterns, so it needs -// to be special cased. -#[doc(hidden)] -#[macro_export] -macro_rules! client_methods { - ( - { $(#[$attr:meta])* } - $fn_name:ident( ($($arg:ident,)*) : ($($in_:ty,)*) ) -> $out:ty - ) => ( - #[allow(unused)] - $(#[$attr])* - pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::Result<$out> { - let reply = try!((self.0).rpc(__Request::$fn_name(($($arg,)*)))); - let __Reply::$fn_name(reply) = reply; - ::std::result::Result::Ok(reply) - } - ); - ($( - { $(#[$attr:meta])* } - $fn_name:ident( ($( $arg:ident,)*) : ($($in_:ty, )*) ) -> $out:ty - )*) => ( $( - #[allow(unused)] - $(#[$attr])* - pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::Result<$out> { - let reply = try!((self.0).rpc(__Request::$fn_name(($($arg,)*)))); - if let __Reply::$fn_name(reply) = reply { - ::std::result::Result::Ok(reply) - } else { - panic!("Incorrect reply variant returned from rpc; expected `{}`, \ - but got {:?}", - stringify!($fn_name), - reply); - } - } - )*); -} - -// Required because if-let can't be used with irrefutable patterns, so it needs -// to be special cased. -#[doc(hidden)] -#[macro_export] -macro_rules! async_client_methods { - ( - { $(#[$attr:meta])* } - $fn_name:ident( ($( $arg:ident, )*) : ($( $in_:ty, )*) ) -> $out:ty - ) => ( - #[allow(unused)] - $(#[$attr])* - pub fn $fn_name(&self, $($arg: $in_),*) -> Future<$out> { - fn mapper(reply: __Reply) -> $out { - let __Reply::$fn_name(reply) = reply; - reply - } - let reply = (self.0).rpc_async(__Request::$fn_name(($($arg,)*))); - Future { - future: reply, - mapper: mapper, - } - } - ); - ($( - { $(#[$attr:meta])* } - $fn_name:ident( ($( $arg:ident, )*) : ($( $in_:ty, )*) ) -> $out:ty - )*) => ( $( - #[allow(unused)] - $(#[$attr])* - pub fn $fn_name(&self, $($arg: $in_),*) -> Future<$out> { - fn mapper(reply: __Reply) -> $out { - if let __Reply::$fn_name(reply) = reply { - reply - } else { - panic!("Incorrect reply variant returned from rpc; expected `{}`, but got \ - {:?}", - stringify!($fn_name), - reply); - } - } - let reply = (self.0).rpc_async(__Request::$fn_name(($($arg,)*))); - Future { - future: reply, - mapper: mapper, - } - } - )*); -} - -#[doc(hidden)] -#[macro_export] -macro_rules! impl_serialize { - ($impler:ident, $(@($name:ident $n:expr))* -- #($_n:expr) ) => ( - impl $crate::macros::serde::Serialize for $impler { - #[inline] - fn serialize(&self, serializer: &mut S) -> ::std::result::Result<(), S::Error> - where S: $crate::macros::serde::Serializer - { - match *self { - $( - $impler::$name(ref field) => - $crate::macros::serde::Serializer::serialize_newtype_variant( - serializer, - stringify!($impler), - $n, - stringify!($name), - field, - ) - ),* - } - } - } - ); - // All args are wrapped in a tuple so we can use the newtype variant for each one. - ($impler:ident, $(@$finished:tt)* -- #($n:expr) $name:ident($field:ty) $($req:tt)*) => ( - impl_serialize!($impler, $(@$finished)* @($name $n) -- #($n + 1) $($req)*); - ); - // Entry - ($impler:ident, $($started:tt)*) => (impl_serialize!($impler, -- #(0) $($started)*);); -} - -#[doc(hidden)] -#[macro_export] -macro_rules! impl_deserialize { - ($impler:ident, $(@($name:ident $n:expr))* -- #($_n:expr) ) => ( - impl $crate::macros::serde::Deserialize for $impler { - #[inline] - fn deserialize(deserializer: &mut D) - -> ::std::result::Result<$impler, D::Error> - where D: $crate::macros::serde::Deserializer - { - #[allow(non_camel_case_types, unused)] - enum __Field { - $($name),* - } - impl $crate::macros::serde::Deserialize for __Field { - #[inline] - fn deserialize(deserializer: &mut D) - -> ::std::result::Result<__Field, D::Error> - where D: $crate::macros::serde::Deserializer - { - struct __FieldVisitor; - impl $crate::macros::serde::de::Visitor for __FieldVisitor { - type Value = __Field; - - #[inline] - fn visit_usize(&mut self, value: usize) - -> ::std::result::Result<__Field, E> - where E: $crate::macros::serde::de::Error, - { - $( - if value == $n { - return ::std::result::Result::Ok(__Field::$name); - } - )* - return ::std::result::Result::Err( - $crate::macros::serde::de::Error::custom( - format!("No variants have a value of {}!", value)) - ); - } - } - deserializer.deserialize_struct_field(__FieldVisitor) - } - } - - struct __Visitor; - impl $crate::macros::serde::de::EnumVisitor for __Visitor { - type Value = $impler; - - #[inline] - fn visit<__V>(&mut self, mut visitor: __V) - -> ::std::result::Result<$impler, __V::Error> - where __V: $crate::macros::serde::de::VariantVisitor - { - match try!(visitor.visit_variant()) { - $( - __Field::$name => { - let val = try!(visitor.visit_newtype()); - Ok($impler::$name(val)) - } - ),* - } - } - } - const VARIANTS: &'static [&'static str] = &[ - $( - stringify!($name) - ),* - ]; - deserializer.deserialize_enum(stringify!($impler), VARIANTS, __Visitor) - } - } - ); - // All args are wrapped in a tuple so we can use the newtype variant for each one. - ($impler:ident, $(@$finished:tt)* -- #($n:expr) $name:ident($field:ty) $($req:tt)*) => ( - impl_deserialize!($impler, $(@$finished)* @($name $n) -- #($n + 1) $($req)*); - ); - // Entry - ($impler:ident, $($started:tt)*) => (impl_deserialize!($impler, -- #(0) $($started)*);); -} - -/// The main macro that creates RPC services. -/// -/// Rpc methods are specified, mirroring trait syntax: -/// -/// ``` -/// # #[macro_use] extern crate tarpc; -/// # fn main() {} -/// # service! { -/// #[doc="Say hello"] -/// rpc hello(name: String) -> String; -/// # } -/// ``` -/// -/// There are two rpc names reserved for the default fns `spawn` and `spawn_with_config`. -/// -/// Attributes can be attached to each rpc. These attributes -/// will then be attached to the generated `Service` trait's -/// corresponding method, as well as to the `Client` stub's rpcs methods. -/// -/// The following items are expanded in the enclosing module: -/// -/// * `Service` -- the trait defining the RPC service. It comes with two default methods for -/// starting the server: -/// 1. `spawn` starts the service in another thread using default configuration. -/// 2. `spawn_with_config` starts the service in another thread using the specified -/// `Config`. -/// * `Client` -- a client that makes synchronous requests to the RPC server -/// * `AsyncClient` -- a client that makes asynchronous requests to the RPC server -/// * `Future` -- a handle for asynchronously retrieving the result of an RPC -/// -/// **Warning**: In addition to the above items, there are a few expanded items that -/// are considered implementation details. As with the above items, shadowing -/// these item names in the enclosing module is likely to break things in confusing -/// ways: -/// -/// * `__Server` -- an implementation detail -/// * `__Request` -- an implementation detail -/// * `__Reply` -- an implementation detail -#[macro_export] -macro_rules! service { -// Entry point - ( - $( - $(#[$attr:meta])* - rpc $fn_name:ident( $( $arg:ident : $in_:ty ),* ) $(-> $out:ty)*; - )* - ) => { - service! {{ - $( - $(#[$attr])* - rpc $fn_name( $( $arg : $in_ ),* ) $(-> $out)*; - )* - }} - }; -// Pattern for when the next rpc has an implicit unit return type - ( - { - $(#[$attr:meta])* - rpc $fn_name:ident( $( $arg:ident : $in_:ty ),* ); - - $( $unexpanded:tt )* - } - $( $expanded:tt )* - ) => { - service! { - { $( $unexpanded )* } - - $( $expanded )* - - $(#[$attr])* - rpc $fn_name( $( $arg : $in_ ),* ) -> (); - } - }; -// Pattern for when the next rpc has an explicit return type - ( - { - $(#[$attr:meta])* - rpc $fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty; - - $( $unexpanded:tt )* - } - $( $expanded:tt )* - ) => { - service! { - { $( $unexpanded )* } - - $( $expanded )* - - $(#[$attr])* - rpc $fn_name( $( $arg : $in_ ),* ) -> $out; - } - }; -// Pattern for when all return types have been expanded - ( - { } // none left to expand - $( - $(#[$attr:meta])* - rpc $fn_name:ident ( $( $arg:ident : $in_:ty ),* ) -> $out:ty; - )* - ) => { - #[doc="Defines the RPC service"] - pub trait Service: Send + Sync + Sized { - $( - $(#[$attr])* - fn $fn_name(&self, $($arg:$in_),*) -> $out; - )* - - #[doc="Spawn a running service."] - fn spawn(self, - transport: T) - -> $crate::Result< - $crate::protocol::ServeHandle< - ::Dialer>> - where T: $crate::transport::Transport, - Self: 'static, - { - self.spawn_with_config(transport, $crate::Config::default()) - } - - #[doc="Spawn a running service."] - fn spawn_with_config(self, - transport: T, - config: $crate::Config) - -> $crate::Result< - $crate::protocol::ServeHandle< - ::Dialer>> - where T: $crate::transport::Transport, - Self: 'static, - { - let server = __Server(self); - let result = $crate::protocol::Serve::spawn_with_config(server, transport, config); - let handle = try!(result); - ::std::result::Result::Ok(handle) - } - } - - impl Service for P - where P: Send + Sync + Sized + 'static + ::std::ops::Deref, - S: Service - { - $( - $(#[$attr])* - fn $fn_name(&self, $($arg:$in_),*) -> $out { - Service::$fn_name(&**self, $($arg),*) - } - )* - } - - #[allow(non_camel_case_types, unused)] - #[derive(Debug)] - enum __Request { - $( - $fn_name(( $($in_,)* )) - ),* - } - - impl_serialize!(__Request, $($fn_name(($($in_),*)))*); - impl_deserialize!(__Request, $($fn_name(($($in_),*)))*); - - #[allow(non_camel_case_types, unused)] - #[derive(Debug)] - enum __Reply { - $( - $fn_name($out), - )* - } - - impl_serialize!(__Reply, $($fn_name($out))*); - impl_deserialize!(__Reply, $($fn_name($out))*); - - #[allow(unused)] - #[doc="An asynchronous RPC call"] - pub struct Future { - future: $crate::protocol::Future<__Reply>, - mapper: fn(__Reply) -> T, - } - - impl Future { - #[allow(unused)] - #[doc="Block until the result of the RPC call is available"] - pub fn get(self) -> $crate::Result { - self.future.get().map(self.mapper) - } - } - - #[allow(unused)] - #[doc="The client stub that makes RPC calls to the server."] - pub struct Client( - $crate::protocol::Client<__Request, __Reply, S> - ) where S: $crate::transport::Stream; - - impl Client - where S: $crate::transport::Stream - { - #[allow(unused)] - #[doc="Create a new client with default configuration that connects to the given \ - address."] - pub fn new(dialer: D) -> $crate::Result - where D: $crate::transport::Dialer, - { - Self::with_config(dialer, $crate::Config::default()) - } - - #[allow(unused)] - #[doc="Create a new client with the specified configuration that connects to the \ - given address."] - pub fn with_config(dialer: D, config: $crate::Config) -> $crate::Result - where D: $crate::transport::Dialer, - { - let inner = try!($crate::protocol::Client::with_config(dialer, config)); - ::std::result::Result::Ok(Client(inner)) - } - - client_methods!( - $( - { $(#[$attr])* } - $fn_name(($($arg,)*) : ($($in_,)*)) -> $out - )* - ); - - #[allow(unused)] - #[doc="Attempt to clone the client object. This might fail if the underlying TcpStream \ - clone fails."] - pub fn try_clone(&self) -> ::std::io::Result { - ::std::result::Result::Ok(Client(try!(self.0.try_clone()))) - } - } - - #[allow(unused)] - #[doc="The client stub that makes asynchronous RPC calls to the server."] - pub struct AsyncClient( - $crate::protocol::Client<__Request, __Reply, S> - ) where S: $crate::transport::Stream; - - impl AsyncClient - where S: $crate::transport::Stream { - #[allow(unused)] - #[doc="Create a new asynchronous client with default configuration that connects to \ - the given address."] - pub fn new(dialer: D) -> $crate::Result - where D: $crate::transport::Dialer, - { - Self::with_config(dialer, $crate::Config::default()) - } - - #[allow(unused)] - #[doc="Create a new asynchronous client that connects to the given address."] - pub fn with_config(dialer: D, config: $crate::Config) -> $crate::Result - where D: $crate::transport::Dialer, - { - let inner = try!($crate::protocol::Client::with_config(dialer, config)); - ::std::result::Result::Ok(AsyncClient(inner)) - } - - async_client_methods!( - $( - { $(#[$attr])* } - $fn_name(($($arg,)*): ($($in_,)*)) -> $out - )* - ); - - #[allow(unused)] - #[doc="Attempt to clone the client object. This might fail if the underlying TcpStream \ - clone fails."] - pub fn try_clone(&self) -> ::std::io::Result { - ::std::result::Result::Ok(AsyncClient(try!(self.0.try_clone()))) - } - } - - #[allow(unused)] - struct __Server(S) - where S: 'static + Service; - - impl $crate::protocol::Serve for __Server - where S: 'static + Service - { - type Request = __Request; - type Reply = __Reply; - fn serve(&self, request: __Request) -> __Reply { - match request { - $( - __Request::$fn_name(( $($arg,)* )) => - __Reply::$fn_name((self.0).$fn_name($($arg),*)), - )* - } - } - } - } -} - -#[allow(dead_code)] -// because we're just testing that the macro expansion compiles -#[cfg(test)] -mod syntax_test { - // Tests a service definition with a fn that takes no args - mod qux { - service! { - rpc hello() -> String; - } - } - // Tests a service definition with an attribute. - mod bar { - service! { - #[doc="Hello bob"] - rpc baz(s: String) -> String; - } - } - - // Tests a service with implicit return types. - mod no_return { - service! { - rpc ack(); - rpc apply(foo: String) -> i32; - rpc bi_consume(bar: String, baz: u64); - rpc bi_fn(bar: String, baz: u64) -> String; - } - } -} - -#[cfg(test)] -mod functional_test { - extern crate env_logger; - extern crate tempdir; - use transport::unix::UnixTransport; - - service! { - rpc add(x: i32, y: i32) -> i32; - rpc hey(name: String) -> String; - } - - struct Server; - - impl Service for Server { - fn add(&self, x: i32, y: i32) -> i32 { - x + y - } - fn hey(&self, name: String) -> String { - format!("Hey, {}.", name) - } - } - - #[test] - fn simple() { - let _ = env_logger::init(); - let handle = Server.spawn("localhost:0").unwrap(); - let client = Client::new(handle.dialer()).unwrap(); - assert_eq!(3, client.add(1, 2).unwrap()); - assert_eq!("Hey, Tim.", client.hey("Tim".into()).unwrap()); - drop(client); - handle.shutdown(); - } - - #[test] - fn simple_async() { - let _ = env_logger::init(); - let handle = Server.spawn("localhost:0").unwrap(); - let client = AsyncClient::new(handle.dialer()).unwrap(); - assert_eq!(3, client.add(1, 2).get().unwrap()); - assert_eq!("Hey, Adam.", client.hey("Adam".into()).get().unwrap()); - drop(client); - handle.shutdown(); - } - - #[test] - fn try_clone() { - let handle = Server.spawn("localhost:0").unwrap(); - let client1 = Client::new(handle.dialer()).unwrap(); - let client2 = client1.try_clone().unwrap(); - assert_eq!(3, client1.add(1, 2).unwrap()); - assert_eq!(3, client2.add(1, 2).unwrap()); - } - - #[test] - fn async_try_clone() { - let handle = Server.spawn("localhost:0").unwrap(); - let client1 = AsyncClient::new(handle.dialer()).unwrap(); - let client2 = client1.try_clone().unwrap(); - assert_eq!(3, client1.add(1, 2).get().unwrap()); - assert_eq!(3, client2.add(1, 2).get().unwrap()); - } - - #[test] - fn async_try_clone_unix() { - let temp_dir = tempdir::TempDir::new("tarpc").unwrap(); - let temp_file = temp_dir.path() - .join("async_try_clone_unix.tmp"); - let handle = Server.spawn(UnixTransport(temp_file)).unwrap(); - let client1 = AsyncClient::new(handle.dialer()).unwrap(); - let client2 = client1.try_clone().unwrap(); - assert_eq!(3, client1.add(1, 2).get().unwrap()); - assert_eq!(3, client2.add(1, 2).get().unwrap()); - } - - // Tests that a server can be wrapped in an Arc; no need to run, just compile - #[allow(dead_code)] - fn serve_arc_server() { - let _ = ::std::sync::Arc::new(Server).spawn("localhost:0"); - } - - // Tests that a tcp client can be created from &str - #[allow(dead_code)] - fn test_client_str() { - let _ = Client::new("localhost:0"); - } - - #[test] - fn serde() { - use bincode; - let _ = env_logger::init(); - - let request = __Request::add((1, 2)); - let ser = bincode::serde::serialize(&request, bincode::SizeLimit::Infinite).unwrap(); - let de = bincode::serde::deserialize(&ser).unwrap(); - if let __Request::add((1, 2)) = de { - // success - } else { - panic!("Expected __Request::add, got {:?}", de); - } - } -} diff --git a/tarpc/src/protocol/client.rs b/tarpc/src/protocol/client.rs deleted file mode 100644 index b61e076..0000000 --- a/tarpc/src/protocol/client.rs +++ /dev/null @@ -1,269 +0,0 @@ -// Copyright 2016 Google Inc. All Rights Reserved. -// -// Licensed under the MIT License, . -// This file may not be copied, modified, or distributed except according to those terms. - -use serde; -use std::fmt; -use std::io::{self, BufReader, BufWriter}; -use std::collections::HashMap; -use std::mem; -use std::sync::{Arc, Mutex}; -use std::sync::mpsc::{Receiver, Sender, channel}; -use std::thread; - -use super::{Config, Deserialize, Error, Packet, Result, Serialize}; -use transport::{Dialer, Stream}; - -/// A client stub that connects to a server to run rpcs. -pub struct Client - where Request: serde::ser::Serialize, - S: Stream -{ - // The guard is in an option so it can be joined in the drop fn - reader_guard: Arc>>, - outbound: Sender<(Request, Sender>)>, - requests: Arc>>, - shutdown: S, -} - -impl Client - where Request: serde::ser::Serialize + Send + 'static, - Reply: serde::de::Deserialize + Send + 'static, - S: Stream -{ - /// Create a new client that connects to `addr`. The client uses the given timeout - /// for both reads and writes. - pub fn new(dialer: D) -> io::Result - where D: Dialer - { - Self::with_config(dialer, Config::default()) - } - - /// Create a new client that connects to `addr`. The client uses the given timeout - /// for both reads and writes. - pub fn with_config(dialer: D, config: Config) -> io::Result - where D: Dialer - { - let stream = try!(dialer.dial()); - try!(stream.set_read_timeout(config.timeout)); - try!(stream.set_write_timeout(config.timeout)); - let reader_stream = try!(stream.try_clone()); - let writer_stream = try!(stream.try_clone()); - let requests = Arc::new(Mutex::new(RpcFutures::new())); - let reader_requests = requests.clone(); - let writer_requests = requests.clone(); - let (tx, rx) = channel(); - let reader_guard = thread::spawn(move || read(reader_requests, reader_stream)); - thread::spawn(move || write(rx, writer_requests, writer_stream)); - Ok(Client { - reader_guard: Arc::new(Some(reader_guard)), - outbound: tx, - requests: requests, - shutdown: stream, - }) - } - - /// Clones the Client so that it can be shared across threads. - pub fn try_clone(&self) -> io::Result { - Ok(Client { - reader_guard: self.reader_guard.clone(), - outbound: self.outbound.clone(), - requests: self.requests.clone(), - shutdown: try!(self.shutdown.try_clone()), - }) - } - - fn rpc_internal(&self, request: Request) -> Receiver> - where Request: serde::ser::Serialize + fmt::Debug + Send + 'static - { - let (tx, rx) = channel(); - self.outbound.send((request, tx)).expect(pos!()); - rx - } - - /// Run the specified rpc method on the server this client is connected to - pub fn rpc(&self, request: Request) -> Result - where Request: serde::ser::Serialize + fmt::Debug + Send + 'static - { - self.rpc_internal(request) - .recv() - .map_err(|_| self.requests.lock().expect(pos!()).get_error()) - .and_then(|reply| reply) - } - - /// Asynchronously run the specified rpc method on the server this client is connected to - pub fn rpc_async(&self, request: Request) -> Future - where Request: serde::ser::Serialize + fmt::Debug + Send + 'static - { - Future { - rx: self.rpc_internal(request), - requests: self.requests.clone(), - } - } -} - -impl Drop for Client - where Request: serde::ser::Serialize, - S: Stream -{ - fn drop(&mut self) { - debug!("Dropping Client."); - if let Some(reader_guard) = Arc::get_mut(&mut self.reader_guard) { - debug!("Attempting to shut down writer and reader threads."); - if let Err(e) = self.shutdown.shutdown() { - warn!("Client: couldn't shutdown writer and reader threads: {:?}", - e); - } else { - // We only join if we know the TcpStream was shut down. Otherwise we might never - // finish. - debug!("Joining writer and reader."); - reader_guard.take() - .expect(pos!()) - .join() - .expect(pos!()); - debug!("Successfully joined writer and reader."); - } - } - } -} - -/// An asynchronous RPC call -pub struct Future { - rx: Receiver>, - requests: Arc>>, -} - -impl Future { - /// Block until the result of the RPC call is available - pub fn get(self) -> Result { - let requests = self.requests; - self.rx - .recv() - .map_err(|_| requests.lock().expect(pos!()).get_error()) - .and_then(|reply| reply) - } -} - -struct RpcFutures(Result>>>); - -impl RpcFutures { - fn new() -> RpcFutures { - RpcFutures(Ok(HashMap::new())) - } - - fn insert_tx(&mut self, id: u64, tx: Sender>) -> Result<()> { - match self.0 { - Ok(ref mut requests) => { - requests.insert(id, tx); - Ok(()) - } - Err(ref e) => Err(e.clone()), - } - } - - fn remove_tx(&mut self, id: u64) -> Result<()> { - match self.0 { - Ok(ref mut requests) => { - requests.remove(&id); - Ok(()) - } - Err(ref e) => Err(e.clone()), - } - } - - fn complete_reply(&mut self, packet: Packet) { - if let Some(tx) = self.0.as_mut().expect(pos!()).remove(&packet.rpc_id) { - if let Err(e) = tx.send(Ok(packet.message)) { - info!("Reader: could not complete reply: {:?}", e); - } - } else { - warn!("RpcFutures: expected sender for id {} but got None!", - packet.rpc_id); - } - } - - fn set_error(&mut self, err: Error) { - let _ = mem::replace(&mut self.0, Err(err)); - } - - fn get_error(&self) -> Error { - self.0.as_ref().err().expect(pos!()).clone() - } -} - -fn write(outbound: Receiver<(Request, Sender>)>, - requests: Arc>>, - stream: S) - where Request: serde::Serialize, - Reply: serde::Deserialize, - S: Stream -{ - let mut next_id = 0; - let mut stream = BufWriter::new(stream); - loop { - let (request, tx) = match outbound.recv() { - Err(e) => { - debug!("Writer: all senders have exited ({:?}). Returning.", e); - return; - } - Ok(request) => request, - }; - if let Err(e) = requests.lock().expect(pos!()).insert_tx(next_id, tx.clone()) { - report_error(&tx, e); - // Once insert_tx returns Err, it will continue to do so. However, continue here so - // that any other clients who sent requests will also recv the Err. - continue; - } - let id = next_id; - next_id += 1; - let packet = Packet { - rpc_id: id, - message: request, - }; - debug!("Writer: writing rpc, id={:?}", id); - if let Err(e) = stream.serialize(&packet) { - report_error(&tx, e.into()); - // Typically we'd want to notify the client of any Err returned by remove_tx, but in - // this case the client already hit an Err, and doesn't need to know about this one, as - // well. - let _ = requests.lock().expect(pos!()).remove_tx(id); - continue; - } - } - - fn report_error(tx: &Sender>, e: Error) - where Reply: serde::Deserialize - { - // Clone the err so we can log it if sending fails - if let Err(e2) = tx.send(Err(e.clone())) { - debug!("Error encountered while trying to send an error. Initial error: {:?}; Send \ - error: {:?}", - e, - e2); - } - } - -} - -fn read(requests: Arc>>, stream: S) - where Reply: serde::Deserialize, - S: Stream -{ - let mut stream = BufReader::new(stream); - loop { - match stream.deserialize::>() { - Ok(packet) => { - debug!("Client: received message, id={}", packet.rpc_id); - requests.lock().expect(pos!()).complete_reply(packet); - } - Err(err) => { - warn!("Client: reader thread encountered an unexpected error while parsing; \ - returning now. Error: {:?}", - err); - requests.lock().expect(pos!()).set_error(err); - break; - } - } - } -} diff --git a/tarpc/src/protocol/mod.rs b/tarpc/src/protocol/mod.rs deleted file mode 100644 index 5cefea1..0000000 --- a/tarpc/src/protocol/mod.rs +++ /dev/null @@ -1,249 +0,0 @@ -// Copyright 2016 Google Inc. All Rights Reserved. -// -// Licensed under the MIT License, . -// This file may not be copied, modified, or distributed except according to those terms. - -use bincode::{self, SizeLimit}; -use bincode::serde::{deserialize_from, serialize_into}; -use serde; -use serde::de::value::Error::EndOfStream; -use std::io::{self, Read, Write}; -use std::convert; -use std::sync::Arc; -use std::time::Duration; - -mod client; -mod server; -mod packet; - -pub use self::packet::Packet; -pub use self::client::{Client, Future}; -pub use self::server::{Serve, ServeHandle}; - -/// Client errors that can occur during rpc calls -#[derive(Debug, Clone)] -pub enum Error { - /// An IO-related error - Io(Arc), - /// The server hung up. - ConnectionBroken, -} - -impl convert::From for Error { - fn from(err: bincode::serde::SerializeError) -> Error { - match err { - bincode::serde::SerializeError::IoError(err) => Error::Io(Arc::new(err)), - err => panic!("Unexpected error during serialization: {:?}", err), - } - } -} - -impl convert::From for Error { - fn from(err: bincode::serde::DeserializeError) -> Error { - match err { - bincode::serde::DeserializeError::Serde(EndOfStream) => Error::ConnectionBroken, - bincode::serde::DeserializeError::IoError(err) => { - match err.kind() { - io::ErrorKind::ConnectionReset | - io::ErrorKind::UnexpectedEof => Error::ConnectionBroken, - _ => Error::Io(Arc::new(err)), - } - } - err => panic!("Unexpected error during deserialization: {:?}", err), - } - } -} - -impl convert::From for Error { - fn from(err: io::Error) -> Error { - Error::Io(Arc::new(err)) - } -} - -/// Configuration for client and server. -#[derive(Debug, Default)] -pub struct Config { - /// Request/Response timeout between packet delivery. - pub timeout: Option, -} - -/// Return type of rpc calls: either the successful return value, or a client error. -pub type Result = ::std::result::Result; - -trait Deserialize: Read + Sized { - fn deserialize(&mut self) -> Result { - deserialize_from(self, SizeLimit::Infinite).map_err(Error::from) - } -} - -impl Deserialize for R {} - -trait Serialize: Write + Sized { - fn serialize(&mut self, value: &T) -> Result<()> { - try!(serialize_into(self, value, SizeLimit::Infinite)); - try!(self.flush()); - Ok(()) - } -} - -impl Serialize for W {} - -#[cfg(test)] -mod test { - extern crate env_logger; - use super::{Client, Config, Serve}; - use scoped_pool::Pool; - use std::net::TcpStream; - use std::sync::{Arc, Barrier, Mutex}; - use std::thread; - use std::time::Duration; - - fn test_timeout() -> Option { - Some(Duration::from_secs(1)) - } - - struct Server { - counter: Mutex, - } - - impl Serve for Server { - type Request = (); - type Reply = u64; - - fn serve(&self, _: ()) -> u64 { - let mut counter = self.counter.lock().unwrap(); - let reply = *counter; - *counter += 1; - reply - } - } - - impl Server { - fn new() -> Server { - Server { counter: Mutex::new(0) } - } - - fn count(&self) -> u64 { - *self.counter.lock().unwrap() - } - } - - #[test] - fn handle() { - let _ = env_logger::init(); - let server = Arc::new(Server::new()); - let serve_handle = server.spawn("localhost:0").unwrap(); - let client: Client<(), u64, TcpStream> = Client::new(serve_handle.dialer()).unwrap(); - drop(client); - serve_handle.shutdown(); - } - - #[test] - fn simple() { - let _ = env_logger::init(); - let server = Arc::new(Server::new()); - let serve_handle = server.clone().spawn("localhost:0").unwrap(); - // The explicit type is required so that it doesn't deserialize a u32 instead of u64 - let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap(); - assert_eq!(0, client.rpc(()).unwrap()); - assert_eq!(1, server.count()); - assert_eq!(1, client.rpc(()).unwrap()); - assert_eq!(2, server.count()); - drop(client); - serve_handle.shutdown(); - } - - struct BarrierServer { - barrier: Barrier, - inner: Server, - } - - impl Serve for BarrierServer { - type Request = (); - type Reply = u64; - fn serve(&self, request: ()) -> u64 { - self.barrier.wait(); - self.inner.serve(request) - } - } - - impl BarrierServer { - fn new(n: usize) -> BarrierServer { - BarrierServer { - barrier: Barrier::new(n), - inner: Server::new(), - } - } - - fn count(&self) -> u64 { - self.inner.count() - } - } - - #[test] - fn force_shutdown() { - let _ = env_logger::init(); - let server = Arc::new(Server::new()); - let serve_handle = server.spawn_with_config("localhost:0", - Config { timeout: Some(Duration::new(0, 10)) }) - .unwrap(); - let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap(); - let thread = thread::spawn(move || serve_handle.shutdown()); - info!("force_shutdown:: rpc1: {:?}", client.rpc(())); - thread.join().unwrap(); - } - - #[test] - fn client_failed_rpc() { - let _ = env_logger::init(); - let server = Arc::new(Server::new()); - let serve_handle = - server.spawn_with_config("localhost:0", Config { timeout: test_timeout() }) - .unwrap(); - let client: Arc> = Arc::new(Client::new(serve_handle.dialer()).unwrap()); - client.rpc(()).unwrap(); - serve_handle.shutdown(); - match client.rpc(()) { - Err(super::Error::ConnectionBroken) => {} // success - otherwise => panic!("Expected Err(ConnectionBroken), got {:?}", otherwise), - } - let _ = client.rpc(()); // Test whether second failure hangs - } - - #[test] - fn concurrent() { - let _ = env_logger::init(); - let concurrency = 10; - let pool = Pool::new(concurrency); - let server = Arc::new(BarrierServer::new(concurrency)); - let serve_handle = server.clone().spawn("localhost:0").unwrap(); - let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap(); - pool.scoped(|scope| { - for _ in 0..concurrency { - let client = client.try_clone().unwrap(); - scope.execute(move || { - client.rpc(()).unwrap(); - }); - } - }); - assert_eq!(concurrency as u64, server.count()); - drop(client); - serve_handle.shutdown(); - } - - #[test] - fn async() { - let _ = env_logger::init(); - let server = Arc::new(Server::new()); - let serve_handle = server.spawn("localhost:0").unwrap(); - let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap(); - - // Drop future immediately; does the reader channel panic when sending? - client.rpc_async(()); - // If the reader panicked, this won't succeed - client.rpc_async(()); - - drop(client); - serve_handle.shutdown(); - } -} diff --git a/tarpc/src/protocol/packet.rs b/tarpc/src/protocol/packet.rs deleted file mode 100644 index ccffdea..0000000 --- a/tarpc/src/protocol/packet.rs +++ /dev/null @@ -1,78 +0,0 @@ -use serde::{Deserialize, Deserializer, Serialize, Serializer, de}; -use std::marker::PhantomData; - -/// Packet shared between client and server. -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct Packet { - /// Packet id to map response to request. - pub rpc_id: u64, - /// Packet payload. - pub message: T, -} - -const PACKET: &'static str = "Packet"; -const RPC_ID: &'static str = "rpc_id"; -const MESSAGE: &'static str = "message"; - -impl Serialize for Packet { - #[inline] - fn serialize(&self, serializer: &mut S) -> Result<(), S::Error> - where S: Serializer - { - let mut state = try!(serializer.serialize_struct(PACKET, 2)); - try!(serializer.serialize_struct_elt(&mut state, RPC_ID, &self.rpc_id)); - try!(serializer.serialize_struct_elt(&mut state, MESSAGE, &self.message)); - serializer.serialize_struct_end(state) - } -} - -impl Deserialize for Packet { - #[inline] - fn deserialize(deserializer: &mut D) -> Result - where D: Deserializer - { - const FIELDS: &'static [&'static str] = &[RPC_ID, MESSAGE]; - deserializer.deserialize_struct(PACKET, FIELDS, Visitor(PhantomData)) - } -} - -struct Visitor(PhantomData); - -impl de::Visitor for Visitor { - type Value = Packet; - - #[inline] - fn visit_seq(&mut self, mut visitor: V) -> Result, V::Error> - where V: de::SeqVisitor - { - let packet = Packet { - rpc_id: match try!(visitor.visit()) { - Some(rpc_id) => rpc_id, - None => return Err(de::Error::end_of_stream()), - }, - message: match try!(visitor.visit()) { - Some(message) => message, - None => return Err(de::Error::end_of_stream()), - }, - }; - try!(visitor.end()); - Ok(packet) - } -} - -#[cfg(test)] -extern crate env_logger; - -#[test] -fn serde() { - use bincode; - let _ = env_logger::init(); - - let packet = Packet { - rpc_id: 1, - message: (), - }; - let ser = bincode::serde::serialize(&packet, bincode::SizeLimit::Infinite).unwrap(); - let de = bincode::serde::deserialize(&ser); - assert_eq!(packet, de.unwrap()); -} diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs deleted file mode 100644 index abeb2fc..0000000 --- a/tarpc/src/protocol/server.rs +++ /dev/null @@ -1,280 +0,0 @@ -// Copyright 2016 Google Inc. All Rights Reserved. -// -// Licensed under the MIT License, . -// This file may not be copied, modified, or distributed except according to those terms. - -use serde; -use scoped_pool::{Pool, Scope}; -use std::fmt; -use std::io::{self, BufReader, BufWriter}; -use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::Duration; -use std::thread::{self, JoinHandle}; -use super::{Config, Deserialize, Error, Packet, Result, Serialize}; -use transport::{Dialer, Listener, Stream, Transport}; -use transport::tcp::TcpDialer; - -struct ConnectionHandler<'a, S, St> - where S: Serve, - St: Stream -{ - read_stream: BufReader, - write_stream: BufWriter, - server: S, - shutdown: &'a AtomicBool, -} - -impl<'a, S, St> ConnectionHandler<'a, S, St> - where S: Serve, - St: Stream -{ - fn handle_conn<'b>(&'b mut self, scope: &Scope<'b>) -> Result<()> { - let ConnectionHandler { - ref mut read_stream, - ref mut write_stream, - ref server, - shutdown, - } = *self; - trace!("ConnectionHandler: serving client..."); - let (tx, rx) = channel(); - scope.execute(move || Self::write(rx, write_stream)); - loop { - match read_stream.deserialize() { - Ok(Packet { rpc_id, message }) => { - let tx = tx.clone(); - scope.execute(move || { - let reply = server.serve(message); - let reply_packet = Packet { - rpc_id: rpc_id, - message: reply, - }; - tx.send(reply_packet).expect(pos!()); - }); - if shutdown.load(Ordering::SeqCst) { - info!("ConnectionHandler: server shutdown, so closing connection."); - break; - } - } - Err(Error::Io(ref err)) if Self::timed_out(err.kind()) => { - if !shutdown.load(Ordering::SeqCst) { - info!("ConnectionHandler: read timed out ({:?}). Server not shutdown, so \ - retrying read.", - err); - continue; - } else { - info!("ConnectionHandler: read timed out ({:?}). Server shutdown, so \ - closing connection.", - err); - break; - } - } - Err(e) => { - warn!("ConnectionHandler: closing client connection due to {:?}", - e); - return Err(e.into()); - } - } - } - Ok(()) - } - - fn timed_out(error_kind: io::ErrorKind) -> bool { - match error_kind { - io::ErrorKind::TimedOut | - io::ErrorKind::WouldBlock => true, - _ => false, - } - } - - fn write(rx: Receiver::Reply>>, stream: &mut BufWriter) { - loop { - match rx.recv() { - Err(e) => { - debug!("Write thread: returning due to {:?}", e); - return; - } - Ok(reply_packet) => { - if let Err(e) = stream.serialize(&reply_packet) { - warn!("Writer: failed to write reply to Client: {:?}", e); - } - } - } - } - } -} - -/// Provides methods for blocking until the server completes, -pub struct ServeHandle - where D: Dialer -{ - tx: Sender<()>, - join_handle: JoinHandle<()>, - dialer: D, -} - -impl ServeHandle - where D: Dialer -{ - /// Block until the server completes - pub fn wait(self) { - self.join_handle.join().expect(pos!()); - } - - /// Returns the dialer to the server. - pub fn dialer(&self) -> &D { - &self.dialer - } - - /// Shutdown the server. Gracefully shuts down the serve thread but currently does not - /// gracefully close open connections. - pub fn shutdown(self) { - info!("ServeHandle: attempting to shut down the server."); - self.tx.send(()).expect(pos!()); - if let Ok(_) = self.dialer.dial() { - self.join_handle.join().expect(pos!()); - } else { - warn!("ServeHandle: best effort shutdown of serve thread failed"); - } - } -} - -struct Server<'a, S: 'a, L> - where L: Listener -{ - server: &'a S, - listener: L, - read_timeout: Option, - die_rx: Receiver<()>, - shutdown: &'a AtomicBool, -} - -impl<'a, S, L> Server<'a, S, L> - where S: Serve + 'static, - L: Listener -{ - fn serve<'b>(self, scope: &Scope<'b>) - where 'a: 'b - { - for conn in self.listener.incoming() { - match self.die_rx.try_recv() { - Ok(_) => { - info!("serve: shutdown received."); - return; - } - Err(TryRecvError::Disconnected) => { - info!("serve: shutdown sender disconnected."); - return; - } - _ => (), - } - let conn = match conn { - Err(err) => { - error!("serve: failed to accept connection: {:?}", err); - return; - } - Ok(c) => c, - }; - if let Err(err) = conn.set_read_timeout(self.read_timeout) { - info!("serve: could not set read timeout: {:?}", err); - continue; - } - let read_conn = match conn.try_clone() { - Err(err) => { - error!("serve: could not clone tcp stream; possibly out of file descriptors? \ - Err: {:?}", - err); - continue; - } - Ok(conn) => conn, - }; - let mut handler = ConnectionHandler { - read_stream: BufReader::new(read_conn), - write_stream: BufWriter::new(conn), - server: self.server, - shutdown: self.shutdown, - }; - scope.recurse(move |scope| { - scope.zoom(|scope| { - if let Err(err) = handler.handle_conn(scope) { - info!("ConnectionHandler: err in connection handling: {:?}", err); - } - }); - }); - } - } -} - -impl<'a, S, L> Drop for Server<'a, S, L> - where L: Listener -{ - fn drop(&mut self) { - debug!("Shutting down connection handlers."); - self.shutdown.store(true, Ordering::SeqCst); - } -} - -/// A service provided by a server -pub trait Serve: Send + Sync + Sized { - /// The type of request received by the server - type Request: 'static + fmt::Debug + serde::ser::Serialize + serde::de::Deserialize + Send; - /// The type of reply sent by the server - type Reply: 'static + fmt::Debug + serde::ser::Serialize + serde::de::Deserialize + Send; - - /// Return a reply for a given request - fn serve(&self, request: Self::Request) -> Self::Reply; - - /// spawn - fn spawn(self, transport: T) -> io::Result::Dialer>> - where T: Transport, - Self: 'static - { - self.spawn_with_config(transport, Config::default()) - } - - /// spawn - fn spawn_with_config(self, - transport: T, - config: Config) - -> io::Result::Dialer>> - where T: Transport, - Self: 'static - { - let listener = try!(transport.bind()); - let dialer = try!(listener.dialer()); - info!("spawn_with_config: spinning up server."); - let (die_tx, die_rx) = channel(); - let timeout = config.timeout; - let join_handle = thread::spawn(move || { - let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads - let shutdown = AtomicBool::new(false); - let server = Server { - server: &self, - listener: listener, - read_timeout: timeout, - die_rx: die_rx, - shutdown: &shutdown, - }; - pool.scoped(|scope| { - server.serve(scope); - }); - }); - Ok(ServeHandle { - tx: die_tx, - join_handle: join_handle, - dialer: dialer, - }) - } -} - -impl Serve for P - where P: Send + Sync + ::std::ops::Deref, - S: Serve -{ - type Request = S::Request; - type Reply = S::Reply; - - fn serve(&self, request: S::Request) -> S::Reply { - S::serve(self, request) - } -} diff --git a/tarpc/src/transport/mod.rs b/tarpc/src/transport/mod.rs deleted file mode 100644 index f70b2bb..0000000 --- a/tarpc/src/transport/mod.rs +++ /dev/null @@ -1,91 +0,0 @@ -use std::io::{self, Read, Write}; -use std::time::Duration; - -/// A factory for creating a listener on a given address. -/// For TCP, an address might be an IPv4 address; for Unix sockets, it -/// is just a file name. -pub trait Transport { - /// The type of listener that binds to the given address. - type Listener: Listener; - /// Return a listener on the given address, and a dialer to that address. - fn bind(&self) -> io::Result; -} - -/// Accepts incoming connections from dialers. -pub trait Listener: Send + 'static { - /// The type of address being listened on. - type Dialer: Dialer; - /// The type of stream this listener accepts. - type Stream: Stream; - /// Accept an incoming stream. - fn accept(&self) -> io::Result; - /// Returns the local address being listened on. - fn dialer(&self) -> io::Result; - /// Iterate over incoming connections. - fn incoming(&self) -> Incoming { - Incoming { listener: self } - } -} - -/// A cloneable Reader/Writer. -pub trait Stream: Read + Write + Send + Sized + 'static { - /// Creates a new independently owned handle to the Stream. - /// - /// The returned Stream should reference the same stream that this - /// object references. Both handles should read and write the same - /// stream of data, and options set on one stream should be propagated - /// to the other stream. - fn try_clone(&self) -> io::Result; - /// Sets a read timeout. - /// - /// If the value specified is `None`, then read calls will block indefinitely. - /// It is an error to pass the zero `Duration` to this method. - fn set_read_timeout(&self, dur: Option) -> io::Result<()>; - /// Sets a write timeout. - /// - /// If the value specified is `None`, then write calls will block indefinitely. - /// It is an error to pass the zero `Duration` to this method. - fn set_write_timeout(&self, dur: Option) -> io::Result<()>; - /// Shuts down both ends of the stream. - /// - /// Implementations should cause all pending and future I/O on the specified - /// portions to return immediately with an appropriate value. - fn shutdown(&self) -> io::Result<()>; -} - -/// A `Stream` factory. -pub trait Dialer { - /// The type of `Stream` this can create. - type Stream: Stream; - /// Open a stream. - fn dial(&self) -> io::Result; -} - -impl Dialer for P - where P: ::std::ops::Deref, - D: Dialer + 'static -{ - type Stream = D::Stream; - - fn dial(&self) -> io::Result { - (**self).dial() - } -} - -/// Iterates over incoming connections. -pub struct Incoming<'a, L: Listener + ?Sized + 'a> { - listener: &'a L, -} - -impl<'a, L: Listener> Iterator for Incoming<'a, L> { - type Item = io::Result; - - fn next(&mut self) -> Option { - Some(self.listener.accept()) - } -} - -/// Provides a TCP transport. -pub mod tcp; -/// Provides a unix socket transport. -pub mod unix; diff --git a/tarpc/src/transport/tcp.rs b/tarpc/src/transport/tcp.rs deleted file mode 100644 index 09ddf94..0000000 --- a/tarpc/src/transport/tcp.rs +++ /dev/null @@ -1,77 +0,0 @@ -use std::io; -use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}; -use std::time::Duration; - -/// A transport for TCP. -#[derive(Debug)] -pub struct TcpTransport(pub A); - -impl super::Transport for TcpTransport { - type Listener = TcpListener; - - fn bind(&self) -> io::Result { - TcpListener::bind(&self.0) - } -} - -impl super::Transport for A { - type Listener = TcpListener; - - fn bind(&self) -> io::Result { - TcpListener::bind(self) - } -} - -impl super::Listener for TcpListener { - type Dialer = TcpDialer; - - type Stream = TcpStream; - - fn accept(&self) -> io::Result { - self.accept().map(|(stream, _)| stream) - } - - fn dialer(&self) -> io::Result> { - self.local_addr().map(|addr| TcpDialer(addr)) - } -} - -impl super::Stream for TcpStream { - fn try_clone(&self) -> io::Result { - self.try_clone() - } - - fn set_read_timeout(&self, dur: Option) -> io::Result<()> { - self.set_read_timeout(dur) - } - - fn set_write_timeout(&self, dur: Option) -> io::Result<()> { - self.set_write_timeout(dur) - } - - fn shutdown(&self) -> io::Result<()> { - self.shutdown(::std::net::Shutdown::Both) - } -} - -/// Connects to a socket address. -#[derive(Debug)] -pub struct TcpDialer(pub A) where A: ToSocketAddrs; - -impl super::Dialer for TcpDialer - where A: ToSocketAddrs -{ - type Stream = TcpStream; - - fn dial(&self) -> io::Result { - TcpStream::connect(&self.0) - } -} - -impl super::Dialer for str { - type Stream = TcpStream; - - fn dial(&self) -> io::Result { - TcpStream::connect(self) - } -} diff --git a/tarpc/src/transport/unix.rs b/tarpc/src/transport/unix.rs deleted file mode 100644 index c83dd70..0000000 --- a/tarpc/src/transport/unix.rs +++ /dev/null @@ -1,72 +0,0 @@ -use std::io; -use std::path::{Path, PathBuf}; -use std::time::Duration; -use unix_socket::{UnixListener, UnixStream}; - -/// A transport for unix sockets. -#[derive(Debug)] -pub struct UnixTransport

(pub P) where P: AsRef; - -impl

super::Transport for UnixTransport

- where P: AsRef -{ - type Listener = UnixListener; - - fn bind(&self) -> io::Result { - UnixListener::bind(&self.0) - } -} - -/// Connects to a unix socket address. -#[derive(Debug)] -pub struct UnixDialer

(pub P) where P: AsRef; - -impl

super::Dialer for UnixDialer

- where P: AsRef -{ - type Stream = UnixStream; - - fn dial(&self) -> io::Result { - UnixStream::connect(&self.0) - } -} - -impl super::Listener for UnixListener { - type Stream = UnixStream; - - type Dialer = UnixDialer; - - fn accept(&self) -> io::Result { - self.accept().map(|(stream, _)| stream) - } - - fn dialer(&self) -> io::Result> { - self.local_addr().and_then(|addr| { - match addr.as_pathname() { - Some(path) => Ok(UnixDialer(path.to_owned())), - None => { - Err(io::Error::new(io::ErrorKind::AddrNotAvailable, - "Couldn't get a path to bound unix socket")) - } - } - }) - } -} - -impl super::Stream for UnixStream { - fn try_clone(&self) -> io::Result { - self.try_clone() - } - - fn set_read_timeout(&self, timeout: Option) -> io::Result<()> { - self.set_read_timeout(timeout) - } - - fn set_write_timeout(&self, timeout: Option) -> io::Result<()> { - self.set_write_timeout(timeout) - } - - fn shutdown(&self) -> io::Result<()> { - self.shutdown(::std::net::Shutdown::Both) - } -} diff --git a/tarpc_examples/Cargo.toml b/tarpc_examples/Cargo.toml deleted file mode 100644 index c67b49a..0000000 --- a/tarpc_examples/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "tarpc_examples" -version = "0.1.0" -authors = ["Adam Wright ", "Tim Kuehn "] - -[dev-dependencies] -tarpc = { path = "../tarpc" } -lazy_static = "0.2" -env_logger = "0.3" diff --git a/tarpc_examples/src/lib.rs b/tarpc_examples/src/lib.rs deleted file mode 100644 index 72bf45a..0000000 --- a/tarpc_examples/src/lib.rs +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2016 Google Inc. All Rights Reserved. -// -// Licensed under the MIT License, . -// This file may not be copied, modified, or distributed except according to those terms. - -#![cfg_attr(test, feature(test))] - -#[cfg(test)] -#[macro_use] -extern crate lazy_static; - -#[cfg(test)] -#[macro_use] -extern crate tarpc; - -#[cfg(test)] -#[allow(dead_code)] // generated Client isn't used in this benchmark -mod benchmark { - extern crate env_logger; - extern crate test; - - use tarpc::ServeHandle; - use self::test::Bencher; - use std::sync::{Arc, Mutex}; - - service! { - rpc hello(s: String) -> String; - } - - struct HelloServer; - impl Service for HelloServer { - fn hello(&self, s: String) -> String { - format!("Hello, {}!", s) - } - } - - // Prevents resource exhaustion when benching - lazy_static! { - static ref HANDLE: Arc> = { - let handle = HelloServer.spawn("localhost:0").unwrap(); - Arc::new(Mutex::new(handle)) - }; - static ref CLIENT: Arc> = { - let lock = HANDLE.lock().unwrap(); - let dialer = lock.dialer(); - let client = AsyncClient::new(dialer).unwrap(); - Arc::new(Mutex::new(client)) - }; - } - - #[bench] - fn hello(bencher: &mut Bencher) { - let _ = env_logger::init(); - let client = CLIENT.lock().unwrap(); - let concurrency = 100; - let mut futures = Vec::with_capacity(concurrency); - let mut count = 0; - bencher.iter(|| { - futures.push(client.hello("Bob".into())); - count += 1; - if count % concurrency == 0 { - // We can't block on each rpc call, otherwise we'd be - // benchmarking latency instead of throughput. It's also - // not ideal to call more than one rpc per iteration, because - // it makes the output of the bencher harder to parse (you have - // to mentally divide the number by `concurrency` to get - // the ns / iter for one rpc - for f in futures.drain(..) { - f.get().unwrap(); - } - } - }); - } -}