From a4d95818881dc56c0dbb74110a96fce4a58849d5 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Mon, 29 Jul 2019 23:16:51 -0700 Subject: [PATCH] Remove service_registry example --- tarpc/Cargo.toml | 4 - tarpc/examples/service_registry.rs | 402 ----------------------------- 2 files changed, 406 deletions(-) delete mode 100644 tarpc/examples/service_registry.rs diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 933de98..6a4b11a 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -36,10 +36,6 @@ runtime-tokio = "0.3.0-alpha.5" tokio-tcp = "0.1" pin-utils = "0.1.0-alpha.4" -[[example]] -name = "service_registry" -required-features = ["serde1"] - [[example]] name = "server_calling_server" required-features = ["serde1"] diff --git a/tarpc/examples/service_registry.rs b/tarpc/examples/service_registry.rs deleted file mode 100644 index 59f68c4..0000000 --- a/tarpc/examples/service_registry.rs +++ /dev/null @@ -1,402 +0,0 @@ -#![feature(async_await)] - -mod registry { - use bytes::Bytes; - use futures::{ - future::{ready, Ready}, - prelude::*, - }; - use serde::{Deserialize, Serialize}; - use std::{ - io, - pin::Pin, - sync::Arc, - task::{Context, Poll}, - }; - use tarpc::{ - client::{self, Client}, - context, - }; - - /// A request to a named service. - #[derive(Serialize, Deserialize)] - pub struct ServiceRequest { - service_name: String, - request: Bytes, - } - - /// A response from a named service. - #[derive(Serialize, Deserialize)] - pub struct ServiceResponse { - response: Bytes, - } - - /// A list of registered services. - pub struct Registry { - registrations: Services, - } - - impl Default for Registry { - fn default() -> Self { - Registry { registrations: Nil } - } - } - - impl Registry { - /// Returns a function that serves requests for the registered services. - pub fn serve( - self, - ) -> impl FnOnce( - context::Context, - ServiceRequest, - ) -> Either>> - + Clone { - let registrations = Arc::new(self.registrations); - move |cx, req: ServiceRequest| match registrations.serve(cx, &req) { - Some(serve) => Either::Left(serve), - None => Either::Right(ready(Err(io::Error::new( - io::ErrorKind::NotFound, - format!("Service '{}' not registered", req.service_name), - )))), - } - } - - /// Registers `serve` with the given `name` using the given serialization scheme. - pub fn register( - self, - name: String, - serve: S, - deserialize: De, - serialize: Ser, - ) -> Registry> - where - Req: Send, - S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone, - RespFut: Future> + Send + 'static, - De: FnOnce(Bytes) -> io::Result + Send + 'static + Clone, - Ser: FnOnce(Resp) -> io::Result + Send + 'static + Clone, - { - let registrations = Registration { - name: name, - serve: move |cx, req: Bytes| { - async move { - let req = deserialize.clone()(req)?; - let response = serve.clone()(cx, req).await?; - let response = serialize.clone()(response)?; - Ok(ServiceResponse { response }) - } - }, - rest: self.registrations, - }; - Registry { registrations } - } - } - - /// Creates a client that sends requests to a service - /// named `service_name`, over the given channel, using - /// the specified serialization scheme. - pub fn new_client( - service_name: String, - channel: &client::Channel, - mut serialize: Ser, - mut deserialize: De, - ) -> client::MapResponse< - client::WithRequest< - client::Channel, - impl FnMut(Req) -> ServiceRequest, - >, - impl FnMut(ServiceResponse) -> Resp, - > - where - Req: Send + 'static, - Resp: Send + 'static, - De: FnMut(Bytes) -> io::Result + Clone + Send + 'static, - Ser: FnMut(Req) -> io::Result + Clone + Send + 'static, - { - channel - .clone() - .with_request(move |req| { - ServiceRequest { - service_name: service_name.clone(), - // TODO: shouldn't need to unwrap here. Maybe with_request should allow for - // returning Result. - request: serialize(req).unwrap(), - } - }) - // TODO: same thing. Maybe this should be more like and_then rather than map. - .map_response(move |resp| deserialize(resp.response).unwrap()) - } - - /// Serves a request. - /// - /// This trait is mostly an implementation detail that isn't used outside of the registry - /// internals. - pub trait Serve: Clone + Send + 'static { - type Response: Future> + Send + 'static; - fn serve(self, cx: context::Context, request: Bytes) -> Self::Response; - } - - /// Serves a request if the request is for a registered service. - /// - /// This trait is mostly an implementation detail that isn't used outside of the registry - /// internals. - pub trait MaybeServe: Send + 'static { - type Future: Future> + Send + 'static; - - fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option; - } - - /// A registry starting with service S, followed by Rest. - /// - /// This type is mostly an implementation detail that is not used directly - /// outside of the registry internals. - pub struct Registration { - /// The registered service's name. Must be unique across all registered services. - name: String, - /// The registered service. - serve: S, - /// Any remaining registered services. - rest: Rest, - } - - /// An empty registry. - /// - /// This type is mostly an implementation detail that is not used directly - /// outside of the registry internals. - pub struct Nil; - - impl MaybeServe for Nil { - type Future = futures::future::Ready>; - - fn serve(&self, _: context::Context, _: &ServiceRequest) -> Option { - None - } - } - - impl MaybeServe for Registration - where - S: Serve, - Rest: MaybeServe, - { - type Future = Either; - - fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option { - if self.name == request.service_name { - Some(Either::Left( - self.serve.clone().serve(cx, request.request.clone()), - )) - } else { - self.rest.serve(cx, request).map(Either::Right) - } - } - } - - /// Wraps either of two future types that both resolve to the same output type. - #[derive(Debug)] - #[must_use = "futures do nothing unless polled"] - pub enum Either { - Left(Left), - Right(Right), - } - - impl Future for Either - where - Left: Future, - Right: Future, - { - type Output = Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - unsafe { - match Pin::get_unchecked_mut(self) { - Either::Left(car) => Pin::new_unchecked(car).poll(cx), - Either::Right(cdr) => Pin::new_unchecked(cdr).poll(cx), - } - } - } - } - - impl Serve for F - where - F: FnOnce(context::Context, Bytes) -> Resp + Clone + Send + 'static, - Resp: Future> + Send + 'static, - { - type Response = Resp; - - fn serve(self, cx: context::Context, request: Bytes) -> Resp { - self(cx, request) - } - } -} - -// Example -use bytes::Bytes; -use futures::{ - future::{ready, Ready}, - prelude::*, -}; -use serde::{Deserialize, Serialize}; -use std::{ - collections::HashMap, - io, - sync::{Arc, RwLock}, -}; -use tarpc::{client, context, server::Handler}; - -fn deserialize(req: Bytes) -> io::Result -where - Req: for<'a> Deserialize<'a> + Send, -{ - bincode::deserialize(req.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::Other, e)) -} - -fn serialize(resp: Resp) -> io::Result -where - Resp: Serialize, -{ - Ok(bincode::serialize(&resp) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? - .into()) -} - -mod write_service { - pub use ServiceClient as Client; - - #[tarpc::service] - pub trait Service { - async fn write(key: String, value: String); - } -} - -mod read_service { - pub use ServiceClient as Client; - - #[tarpc::service] - pub trait Service { - async fn read(key: String) -> Option; - } -} - -#[derive(Debug, Default, Clone)] -struct Server { - data: Arc>>, -} - -impl write_service::Service for Server { - type WriteFut = Ready<()>; - - fn write(self, _: context::Context, key: String, value: String) -> Self::WriteFut { - self.data.write().unwrap().insert(key, value); - ready(()) - } -} - -impl read_service::Service for Server { - type ReadFut = Ready>; - - fn read(self, _: context::Context, key: String) -> Self::ReadFut { - ready(self.data.read().unwrap().get(&key).cloned()) - } -} - -struct BincodeRegistry { - registry: registry::Registry, -} - -impl Default for BincodeRegistry { - fn default() -> Self { - BincodeRegistry { - registry: registry::Registry::default(), - } - } -} - -impl BincodeRegistry { - fn serve( - self, - ) -> impl FnOnce( - context::Context, - registry::ServiceRequest, - ) -> registry::Either< - Services::Future, - Ready>, - > + Clone { - self.registry.serve() - } - - fn register( - self, - name: String, - serve: S, - ) -> BincodeRegistry> - where - Req: for<'a> Deserialize<'a> + Send + 'static, - Resp: Serialize + 'static, - S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone, - RespFut: Future> + Send + 'static, - { - let registry = self.registry.register(name, serve, deserialize, serialize); - BincodeRegistry { registry } - } -} - -pub fn new_client( - service_name: String, - channel: &client::Channel, -) -> client::MapResponse< - client::WithRequest< - client::Channel, - impl FnMut(Req) -> registry::ServiceRequest, - >, - impl FnMut(registry::ServiceResponse) -> Resp, -> -where - Req: Serialize + Send + 'static, - Resp: for<'a> Deserialize<'a> + Send + 'static, -{ - registry::new_client(service_name, channel, serialize, deserialize) -} - -#[runtime::main(runtime_tokio::Tokio)] -async fn main() -> io::Result<()> { - env_logger::init(); - - let server = Server::default(); - let registry = BincodeRegistry::default() - .register( - "WriteService".to_string(), - write_service::serve(server.clone()), - ) - .register( - "ReadService".to_string(), - read_service::serve(server.clone()), - ); - - let listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? - .filter_map(|r| future::ready(r.ok())); - let server_addr = listener.get_ref().local_addr(); - let server = tarpc::Server::default() - .incoming(listener) - .take(1) - .respond_with(registry.serve()); - let _ = runtime::spawn(server); - - let transport = bincode_transport::connect(&server_addr).await?; - let channel = client::new(client::Config::default(), transport).await?; - - let write_client = new_client("WriteService".to_string(), &channel); - let mut write_client = write_service::Client::from(write_client); - - let read_client = new_client("ReadService".to_string(), &channel); - let mut read_client = read_service::Client::from(read_client); - - write_client - .write(context::current(), "key".to_string(), "val".to_string()) - .await?; - let val = read_client - .read(context::current(), "key".to_string()) - .await?; - eprintln!("{:?}", val); - - Ok(()) -}