From be156f4d6bcdf0d056625acc0940356afeba78e4 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 11 Dec 2016 17:23:43 -0800 Subject: [PATCH] Cut down size of readme_expanded. --- examples/readme_expanded.rs | 240 ++++-------------------------------- 1 file changed, 21 insertions(+), 219 deletions(-) diff --git a/examples/readme_expanded.rs b/examples/readme_expanded.rs index e01f16f..566e49b 100644 --- a/examples/readme_expanded.rs +++ b/examples/readme_expanded.rs @@ -19,256 +19,58 @@ extern crate tokio_core; extern crate tokio_service; use bincode::serde::DeserializeError; -use futures::Future; +use futures::{Future, IntoFuture}; use std::io; -use std::net::{SocketAddr, ToSocketAddrs}; -use std::thread; -use tarpc::WireError; +use std::net::SocketAddr; use tarpc::future::Connect; use tarpc::util::FirstSocketAddr; use tarpc::util::Never; -use tokio_core::reactor::{Handle, Remote}; use tokio_service::Service; -#[derive(Debug, Serialize, Deserialize)] -enum Request { - Hello(String), -} +#[derive(Clone, Copy)] +struct HelloServer; -#[derive(Debug, Serialize, Deserialize)] -enum Response { - Hello(String), -} - -#[derive(Debug, Serialize, Deserialize)] -enum Error { - Hello(Never), -} - -/// Defines the `Future` RPC service. Implementors must be `Clone`, `Send`, and `'static`, -/// as required by `tokio_proto::NewService`. This is required so that the service can be used -/// to respond to multiple requests concurrently. -pub trait FutureService: Send + Clone + 'static { - type HelloFut: Future; - fn hello(&self, name: String) -> Self::HelloFut; -} - -/// Provides a function for starting the service. This is a separate trait from -/// `FutureService` to prevent collisions with the names of RPCs. -pub trait FutureServiceExt: FutureService { - fn listen(self, addr: SocketAddr) -> tarpc::ListenFuture { - let (tx, rx) = futures::oneshot(); - tarpc::REMOTE.spawn(move |handle| Ok(tx.complete(Self::listen_with(self, addr, handle.clone())))); - tarpc::ListenFuture::from_oneshot(rx) - } - - /// Spawns the service, binding to the given address and running on the default tokio `Loop`. - fn listen_with(self, addr: SocketAddr, handle: Handle) -> io::Result { - return tarpc::listen_with(addr, move || Ok(AsyncServer(self.clone())), handle); - - #[derive(Clone, Debug)] - struct AsyncServer(S); - - type Fut = futures::Finished, io::Error>; - - enum FutureReply { - DeserializeError(Fut), - Hello(futures::Then) -> Fut>), - } - - impl Future for FutureReply { - type Item = tarpc::Response; - type Error = io::Error; - - fn poll(&mut self) -> futures::Poll { - match *self { - FutureReply::DeserializeError(ref mut future) => future.poll(), - FutureReply::Hello(ref mut future) => future.poll(), - } - } - } - - impl Service for AsyncServer - where S: FutureService - { - type Request = Result; - type Response = tarpc::Response; - type Error = io::Error; - type Future = FutureReply; - - fn call(&self, request: Self::Request) -> Self::Future { - let request = match request { - Ok(request) => request, - Err(deserialize_err) => { - let err = Err(WireError::ServerDeserialize(deserialize_err.to_string())); - return FutureReply::DeserializeError(futures::finished(err)); - } - }; - - match request { - Request::Hello(name) => { - fn wrap(response: Result) -> Fut { - let fut = response.map(Response::Hello) - .map_err(|error| WireError::App(Error::Hello(error))); - futures::finished(fut) - } - return FutureReply::Hello(self.0.hello(name).then(wrap)); - } - } - } - } - } -} - -/// Defines the blocking RPC service. Must be `Clone`, `Send`, and `'static`, -/// as required by `tokio_proto::NewService`. This is required so that the service can be used -/// to respond to multiple requests concurrently. -pub trait SyncService: Send + Clone + 'static { - fn hello(&self, name: String) -> Result; -} - -/// Provides a function for starting the service. This is a separate trait from -/// `SyncService` to prevent collisions with the names of RPCs. -pub trait SyncServiceExt: SyncService { - fn listen(self, addr: L) -> io::Result - where L: ToSocketAddrs - { - let addr = addr.try_first_socket_addr()?; +impl HelloServer { + fn listen(addr: SocketAddr) -> impl Future { let (tx, rx) = futures::oneshot(); tarpc::REMOTE.spawn(move |handle| { - Ok(tx.complete(Self::listen_with(self, addr, handle.clone()))) + Ok(tx.complete(tarpc::listen_with(addr, move || Ok(HelloServer), handle.clone()))) }); - tarpc::ListenFuture::from_oneshot(rx).wait() - } - - /// Spawns the service, binding to the given address and running on - /// the default tokio `Loop`. - fn listen_with(self, addr: L, handle: Handle) -> io::Result - where L: ToSocketAddrs - { - let service = SyncServer { service: self }; - return service.listen_with(addr.try_first_socket_addr()?, handle); - - #[derive(Clone)] - struct SyncServer { - service: S, - } - - impl FutureService for SyncServer - where S: SyncService - { - type HelloFut = futures::Flatten>, fn(futures::Canceled) -> Never>>; - - fn hello(&self, name: String) -> Self::HelloFut { - fn unimplemented(_: futures::Canceled) -> Never { - unimplemented!() - } - - let (complete, promise) = futures::oneshot(); - let service = self.clone(); - const UNIMPLEMENTED: fn(futures::Canceled) -> Never = unimplemented; - thread::spawn(move || { - let reply = SyncService::hello(&service.service, name); - complete.complete(futures::IntoFuture::into_future(reply)); - }); - promise.map_err(UNIMPLEMENTED).flatten() - } - } + rx.map_err(|e| panic!(e)).and_then(|result| result) } } -impl FutureServiceExt for A where A: FutureService {} -impl SyncServiceExt for S where S: SyncService {} -type Client = tarpc::Client; - -/// Implementation detail: Pending connection. -pub struct ConnectFuture { - inner: futures::Map, fn(Client) -> T>, -} - -impl Future for ConnectFuture { - type Item = T; +impl Service for HelloServer { + type Request = Result; + type Response = tarpc::Response; type Error = io::Error; + type Future = Box, Error = io::Error>>; - fn poll(&mut self) -> futures::Poll { - self.inner.poll() - } -} - -/// Implementation detail: Pending connection. -pub struct ConnectWithFuture<'a, T> { - inner: futures::Map, fn(Client) -> T>, -} - -impl<'a, T> Future for ConnectWithFuture<'a, T> { - type Item = T; - type Error = io::Error; - fn poll(&mut self) -> futures::Poll { - self.inner.poll() + fn call(&self, request: Self::Request) -> Self::Future { + Ok(Ok(format!("Hello, {}!", request.unwrap()))).into_future().boxed() } } /// The client stub that makes RPC calls to the server. Exposes a Future interface. #[derive(Debug)] -pub struct FutureClient(Client); - -impl<'a> tarpc::future::Connect<'a> for FutureClient { - type ConnectFut = ConnectFuture; - type ConnectWithFut = ConnectWithFuture<'a, Self>; - - fn connect_remotely(addr: &SocketAddr, remote: &Remote) -> Self::ConnectFut { - let client = Client::connect_remotely(addr, remote); - ConnectFuture { inner: client.map(FutureClient) } - } - - fn connect_with(addr: &SocketAddr, handle: &'a Handle) -> Self::ConnectWithFut { - let client = Client::connect_with(addr, handle); - ConnectWithFuture { inner: client.map(FutureClient) } - } -} +pub struct FutureClient(tarpc::Client); impl FutureClient { + fn connect(addr: &SocketAddr) -> impl Future { + tarpc::Client::connect_remotely(addr, &tarpc::REMOTE).map(FutureClient) + } + pub fn hello(&self, name: String) -> impl Future> + 'static { - let request = Request::Hello(name); - - self.0.call(request).then(move |msg| { - match msg? { - Ok(Response::Hello(msg)) => Ok(msg), - Err(err) => { - Err(match err { - tarpc::Error::App(Error::Hello(err)) => tarpc::Error::App(err), - tarpc::Error::ServerDeserialize(err) => { - tarpc::Error::ServerDeserialize(err) - } - tarpc::Error::ServerSerialize(err) => tarpc::Error::ServerSerialize(err), - tarpc::Error::ClientDeserialize(err) => { - tarpc::Error::ClientDeserialize(err) - } - tarpc::Error::ClientSerialize(err) => tarpc::Error::ClientSerialize(err), - tarpc::Error::Io(error) => tarpc::Error::Io(error), - }) - } - } - }) - } -} - -#[derive(Clone)] -struct HelloServer; - -impl SyncService for HelloServer { - fn hello(&self, name: String) -> Result { - info!("Got request: {}", name); - Ok(format!("Hello, {}!", name)) + self.0.call(name).then(|msg| msg.unwrap()) } } fn main() { let _ = env_logger::init(); let mut core = tokio_core::reactor::Core::new().unwrap(); - let addr = HelloServer.listen("localhost:10000").unwrap(); + let addr = HelloServer::listen("localhost:10000".first_socket_addr()).wait().unwrap(); let f = FutureClient::connect(&addr) .map_err(tarpc::Error::from) .and_then(|client| {