Files
tarpc/src/server.rs
Tim 7aabfb3c14 Rewrite using tokio (#44)
* Rewrite tarpc on top of tokio.

* Add examples

* Move error types to their own module.

Also, cull unused error variants.

* Remove unused fn

* Remove CanonicalRpcError* types. They're 100% useless.

* Track tokio master (WIP)

* The great error revamp.

Removed the canonical rpc error type. Instead, the user declares
the error type for each rpc:

In the above example, the error type is Baz. Declaring an error is
optional; if none is specified, it defaults to Never, a convenience
struct that wraps the never type (exclamation mark) to impl Serialize, Deserialize,
Error, etc. Also adds the convenience type StringError for easily
using a String as an error type.

* Add missing license header

* Minor cleanup

* Rename StringError => Message

* Create a sync::Connect trait.

Along with this, the existing Connect trait moves to future::Connect. The future
and sync modules are reexported from the crate root.

Additionally, the utility errors Never and Message are no longer reexported from
the crate root.

* Update readme

* Track tokio/futures master. Add a Spawn utility trait to replace the removed forget.

* Fix pre-push hook

* Add doc comment to SyncServiceExt.

* Fix up some documentation

* Track tokio-proto master

* Don't set tcp nodelay

* Make future::Connect take an associated type for the future.

* Unbox FutureClient::connect return type

* Use type alias instead of newtype struct for ClientFuture

* Fix benches/latency.rs

* Write a plugin to convert lower_snake_case idents/types to UpperCamelCase.

Use it to add associated types to FutureService instead of boxing the return futures.

* Specify plugin = true in snake_to_camel/Cargo.toml. Weird things happen otherwise.

* Add clippy.toml
2016-09-04 16:09:50 -07:00

72 lines
2.4 KiB
Rust

// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
use errors::{SerializableError, WireError};
use futures::{self, Future};
use futures::stream::Empty;
use futures_cpupool::{CpuFuture, CpuPool};
use protocol::{LOOP_HANDLE, TarpcTransport};
use protocol::writer::Packet;
use serde::Serialize;
use std::io;
use std::net::ToSocketAddrs;
use tokio_proto::pipeline;
use tokio_proto::NewService;
use tokio_proto::server::{self, ServerHandle};
/// Start a Tarpc service listening on the given address.
pub fn listen<A, T>(addr: A, new_service: T) -> io::Result<ServerHandle>
where T: NewService<Req = Vec<u8>,
Resp = pipeline::Message<Packet, Empty<(), io::Error>>,
Error = io::Error> + Send + 'static,
A: ToSocketAddrs
{
let mut addrs = addr.to_socket_addrs()?;
let addr = if let Some(a) = addrs.next() {
a
} else {
return Err(io::Error::new(io::ErrorKind::AddrNotAvailable,
"`ToSocketAddrs::to_socket_addrs` returned an empty iterator."));
};
server::listen(LOOP_HANDLE.clone(), addr, move |stream| {
pipeline::Server::new(new_service.new_service()?, TarpcTransport::new(stream))
})
.wait()
}
/// Returns a future containing the serialized reply.
///
/// Because serialization can take a non-trivial
/// amount of cpu time, it is run on a thread pool.
#[doc(hidden)]
#[inline]
pub fn serialize_reply<T: Serialize + Send + 'static,
E: SerializableError>(result: Result<T, WireError<E>>)
-> SerializeFuture
{
POOL.spawn(futures::lazy(move || {
let packet = match Packet::serialize(&result) {
Ok(packet) => packet,
Err(e) => {
let err: Result<T, WireError<E>> =
Err(WireError::ServerSerialize(e.to_string()));
Packet::serialize(&err).unwrap()
}
};
futures::finished(pipeline::Message::WithoutBody(packet))
}))
}
#[doc(hidden)]
pub type SerializeFuture = CpuFuture<SerializedReply, io::Error>;
#[doc(hidden)]
pub type SerializedReply = pipeline::Message<Packet, Empty<(), io::Error>>;
lazy_static! {
static ref POOL: CpuPool = { CpuPool::new_num_cpus() };
}