mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-01-22 19:15:22 +01:00
Change sync listen to return a handle which runs the server (#112)
This commit is contained in:
14
README.md
14
README.md
@@ -54,6 +54,8 @@ code. Here's how to use the sync api.
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use tarpc::{client, server};
|
||||
use tarpc::client::sync::ClientExt;
|
||||
use tarpc::util::{FirstSocketAddr, Never};
|
||||
@@ -72,10 +74,14 @@ impl SyncService for HelloServer {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let addr = HelloServer.listen("localhost:0".first_socket_addr(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut handle = HelloServer.listen("localhost:0", server::Options::default())
|
||||
.unwrap();
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
});
|
||||
let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
}
|
||||
```
|
||||
|
||||
@@ -15,6 +15,8 @@ extern crate tokio_core;
|
||||
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use tarpc::{client, server};
|
||||
use tarpc::client::sync::ClientExt;
|
||||
use tarpc::util::FirstSocketAddr;
|
||||
@@ -52,10 +54,14 @@ impl SyncService for HelloServer {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let addr = HelloServer.listen("localhost:10000".first_socket_addr(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut handle = HelloServer.listen("localhost:10000", server::Options::default())
|
||||
.unwrap();
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
});
|
||||
let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
println!("{}", client.hello("".to_string()).unwrap_err());
|
||||
}
|
||||
|
||||
@@ -12,6 +12,8 @@ extern crate futures;
|
||||
extern crate tarpc;
|
||||
extern crate tokio_core;
|
||||
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use tarpc::{client, server};
|
||||
use tarpc::client::sync::ClientExt;
|
||||
use tarpc::util::{FirstSocketAddr, Never};
|
||||
@@ -30,9 +32,13 @@ impl SyncService for HelloServer {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let addr = HelloServer.listen("localhost:0".first_socket_addr(),
|
||||
server::Options::default())
|
||||
.unwrap();
|
||||
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut handle = HelloServer.listen("localhost:0", server::Options::default())
|
||||
.unwrap();
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
});
|
||||
let mut client = SyncClient::connect(rx.recv().unwrap(), client::Options::default()).unwrap();
|
||||
println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
}
|
||||
|
||||
15
src/lib.rs
15
src/lib.rs
@@ -39,6 +39,8 @@
|
||||
//! use tarpc::client::sync::ClientExt;
|
||||
//! use tarpc::util::Never;
|
||||
//! use tokio_core::reactor;
|
||||
//! use std::sync::mpsc;
|
||||
//! use std::thread;
|
||||
//!
|
||||
//! service! {
|
||||
//! rpc hello(name: String) -> String;
|
||||
@@ -54,9 +56,14 @@
|
||||
//! }
|
||||
//!
|
||||
//! fn main() {
|
||||
//! let addr = "localhost:10000";
|
||||
//! let reactor = reactor::Core::new().unwrap();
|
||||
//! let _server = HelloServer.listen(addr, server::Options::default());
|
||||
//! let (tx, rx) = mpsc::channel();
|
||||
//! thread::spawn(move || {
|
||||
//! let mut handle = HelloServer.listen("localhost:10000",
|
||||
//! server::Options::default()).unwrap();
|
||||
//! tx.send(handle.addr()).unwrap();
|
||||
//! handle.run();
|
||||
//! });
|
||||
//! let addr = rx.recv().unwrap();
|
||||
//! let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
|
||||
//! println!("{}", client.hello("Mom".to_string()).unwrap());
|
||||
//! }
|
||||
@@ -109,7 +116,7 @@
|
||||
//! ```
|
||||
//!
|
||||
#![deny(missing_docs)]
|
||||
#![feature(plugin, never_type, struct_field_attributes)]
|
||||
#![feature(conservative_impl_trait, never_type, plugin, struct_field_attributes)]
|
||||
#![plugin(tarpc_plugins)]
|
||||
|
||||
extern crate byteorder;
|
||||
|
||||
275
src/macros.rs
275
src/macros.rs
@@ -339,6 +339,113 @@ macro_rules! service {
|
||||
)*
|
||||
}
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
#[derive(Clone)]
|
||||
struct tarpc_service_AsyncServer__<S>(S);
|
||||
|
||||
impl<S> ::std::fmt::Debug for tarpc_service_AsyncServer__<S> {
|
||||
fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
|
||||
write!(fmt, "tarpc_service_AsyncServer__ {{ .. }}")
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
type tarpc_service_Future__ =
|
||||
$crate::futures::Finished<$crate::server::Response<tarpc_service_Response__,
|
||||
tarpc_service_Error__>,
|
||||
::std::io::Error>;
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
enum tarpc_service_FutureReply__<tarpc_service_S__: FutureService> {
|
||||
DeserializeError(tarpc_service_Future__),
|
||||
$($fn_name(
|
||||
$crate::futures::Then<
|
||||
<ty_snake_to_camel!(tarpc_service_S__::$fn_name)
|
||||
as $crate::futures::IntoFuture>::Future,
|
||||
tarpc_service_Future__,
|
||||
fn(::std::result::Result<$out, $error>)
|
||||
-> tarpc_service_Future__>)),*
|
||||
}
|
||||
|
||||
impl<S: FutureService> $crate::futures::Future for tarpc_service_FutureReply__<S> {
|
||||
type Item = $crate::server::Response<tarpc_service_Response__,
|
||||
tarpc_service_Error__>;
|
||||
|
||||
type Error = ::std::io::Error;
|
||||
|
||||
fn poll(&mut self) -> $crate::futures::Poll<Self::Item, Self::Error> {
|
||||
match *self {
|
||||
tarpc_service_FutureReply__::DeserializeError(
|
||||
ref mut tarpc_service_future__) =>
|
||||
{
|
||||
$crate::futures::Future::poll(tarpc_service_future__)
|
||||
}
|
||||
$(
|
||||
tarpc_service_FutureReply__::$fn_name(
|
||||
ref mut tarpc_service_future__) =>
|
||||
{
|
||||
$crate::futures::Future::poll(tarpc_service_future__)
|
||||
}
|
||||
),*
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
impl<tarpc_service_S__> $crate::tokio_service::Service
|
||||
for tarpc_service_AsyncServer__<tarpc_service_S__>
|
||||
where tarpc_service_S__: FutureService
|
||||
{
|
||||
type Request = ::std::result::Result<tarpc_service_Request__,
|
||||
$crate::bincode::Error>;
|
||||
type Response = $crate::server::Response<tarpc_service_Response__,
|
||||
tarpc_service_Error__>;
|
||||
type Error = ::std::io::Error;
|
||||
type Future = tarpc_service_FutureReply__<tarpc_service_S__>;
|
||||
|
||||
fn call(&self, tarpc_service_request__: Self::Request) -> Self::Future {
|
||||
let tarpc_service_request__ = match tarpc_service_request__ {
|
||||
Ok(tarpc_service_request__) => tarpc_service_request__,
|
||||
Err(tarpc_service_deserialize_err__) => {
|
||||
return tarpc_service_FutureReply__::DeserializeError(
|
||||
$crate::futures::finished(
|
||||
::std::result::Result::Err(
|
||||
$crate::WireError::ServerSerialize(
|
||||
::std::string::ToString::to_string(
|
||||
&tarpc_service_deserialize_err__)))));
|
||||
}
|
||||
};
|
||||
match tarpc_service_request__ {
|
||||
tarpc_service_Request__::NotIrrefutable(()) => unreachable!(),
|
||||
$(
|
||||
tarpc_service_Request__::$fn_name(( $($arg,)* )) => {
|
||||
fn tarpc_service_wrap__(
|
||||
tarpc_service_response__:
|
||||
::std::result::Result<$out, $error>)
|
||||
-> tarpc_service_Future__
|
||||
{
|
||||
$crate::futures::finished(
|
||||
tarpc_service_response__
|
||||
.map(tarpc_service_Response__::$fn_name)
|
||||
.map_err(|tarpc_service_error__| {
|
||||
$crate::WireError::App(
|
||||
tarpc_service_Error__::$fn_name(
|
||||
tarpc_service_error__))
|
||||
})
|
||||
)
|
||||
}
|
||||
return tarpc_service_FutureReply__::$fn_name(
|
||||
$crate::futures::Future::then(
|
||||
$crate::futures::IntoFuture::into_future(
|
||||
FutureService::$fn_name(&self.0, $($arg),*)),
|
||||
tarpc_service_wrap__));
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Provides a function for starting the service. This is a separate trait from
|
||||
/// `FutureService` to prevent collisions with the names of RPCs.
|
||||
pub trait FutureServiceExt: FutureService {
|
||||
@@ -350,117 +457,14 @@ macro_rules! service {
|
||||
options: $crate::server::Options)
|
||||
-> ::std::io::Result<::std::net::SocketAddr>
|
||||
{
|
||||
return $crate::server::listen(move || Ok(tarpc_service_AsyncServer__(self.clone())),
|
||||
$crate::server::listen(move || Ok(tarpc_service_AsyncServer__(self.clone())),
|
||||
addr,
|
||||
handle,
|
||||
options);
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
#[derive(Clone)]
|
||||
struct tarpc_service_AsyncServer__<S>(S);
|
||||
|
||||
impl<S> ::std::fmt::Debug for tarpc_service_AsyncServer__<S> {
|
||||
fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
|
||||
write!(fmt, "tarpc_service_AsyncServer__ {{ .. }}")
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
type tarpc_service_Future__ =
|
||||
$crate::futures::Finished<$crate::server::Response<tarpc_service_Response__,
|
||||
tarpc_service_Error__>,
|
||||
::std::io::Error>;
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
enum tarpc_service_FutureReply__<tarpc_service_S__: FutureService> {
|
||||
DeserializeError(tarpc_service_Future__),
|
||||
$($fn_name(
|
||||
$crate::futures::Then<
|
||||
<ty_snake_to_camel!(tarpc_service_S__::$fn_name)
|
||||
as $crate::futures::IntoFuture>::Future,
|
||||
tarpc_service_Future__,
|
||||
fn(::std::result::Result<$out, $error>)
|
||||
-> tarpc_service_Future__>)),*
|
||||
}
|
||||
|
||||
impl<S: FutureService> $crate::futures::Future for tarpc_service_FutureReply__<S> {
|
||||
type Item = $crate::server::Response<tarpc_service_Response__,
|
||||
tarpc_service_Error__>;
|
||||
|
||||
type Error = ::std::io::Error;
|
||||
|
||||
fn poll(&mut self) -> $crate::futures::Poll<Self::Item, Self::Error> {
|
||||
match *self {
|
||||
tarpc_service_FutureReply__::DeserializeError(
|
||||
ref mut tarpc_service_future__) =>
|
||||
{
|
||||
$crate::futures::Future::poll(tarpc_service_future__)
|
||||
}
|
||||
$(
|
||||
tarpc_service_FutureReply__::$fn_name(
|
||||
ref mut tarpc_service_future__) =>
|
||||
{
|
||||
$crate::futures::Future::poll(tarpc_service_future__)
|
||||
}
|
||||
),*
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
impl<tarpc_service_S__> $crate::tokio_service::Service
|
||||
for tarpc_service_AsyncServer__<tarpc_service_S__>
|
||||
where tarpc_service_S__: FutureService
|
||||
{
|
||||
type Request = ::std::result::Result<tarpc_service_Request__,
|
||||
$crate::bincode::Error>;
|
||||
type Response = $crate::server::Response<tarpc_service_Response__,
|
||||
tarpc_service_Error__>;
|
||||
type Error = ::std::io::Error;
|
||||
type Future = tarpc_service_FutureReply__<tarpc_service_S__>;
|
||||
|
||||
fn call(&self, tarpc_service_request__: Self::Request) -> Self::Future {
|
||||
let tarpc_service_request__ = match tarpc_service_request__ {
|
||||
Ok(tarpc_service_request__) => tarpc_service_request__,
|
||||
Err(tarpc_service_deserialize_err__) => {
|
||||
return tarpc_service_FutureReply__::DeserializeError(
|
||||
$crate::futures::finished(
|
||||
::std::result::Result::Err(
|
||||
$crate::WireError::ServerSerialize(
|
||||
::std::string::ToString::to_string(
|
||||
&tarpc_service_deserialize_err__)))));
|
||||
}
|
||||
};
|
||||
match tarpc_service_request__ {
|
||||
tarpc_service_Request__::NotIrrefutable(()) => unreachable!(),
|
||||
$(
|
||||
tarpc_service_Request__::$fn_name(( $($arg,)* )) => {
|
||||
fn tarpc_service_wrap__(
|
||||
tarpc_service_response__:
|
||||
::std::result::Result<$out, $error>)
|
||||
-> tarpc_service_Future__
|
||||
{
|
||||
$crate::futures::finished(
|
||||
tarpc_service_response__
|
||||
.map(tarpc_service_Response__::$fn_name)
|
||||
.map_err(|tarpc_service_error__| {
|
||||
$crate::WireError::App(
|
||||
tarpc_service_Error__::$fn_name(
|
||||
tarpc_service_error__))
|
||||
})
|
||||
)
|
||||
}
|
||||
return tarpc_service_FutureReply__::$fn_name(
|
||||
$crate::futures::Future::then(
|
||||
$crate::futures::IntoFuture::into_future(
|
||||
FutureService::$fn_name(&self.0, $($arg),*)),
|
||||
tarpc_service_wrap__));
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
}
|
||||
options)
|
||||
.map(|(addr_, server_)| {
|
||||
handle.spawn(server_);
|
||||
addr_
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -484,36 +488,24 @@ macro_rules! service {
|
||||
/// Spawns the service, binding to the given address and running on
|
||||
/// the default tokio `Loop`.
|
||||
fn listen<A>(self, addr: A, options: $crate::server::Options)
|
||||
-> ::std::io::Result<::std::net::SocketAddr>
|
||||
-> ::std::io::Result<$crate::server::Handle>
|
||||
where A: ::std::net::ToSocketAddrs
|
||||
{
|
||||
let tarpc_service__ = SyncServer__ {
|
||||
let tarpc_service__ = tarpc_service_AsyncServer__(SyncServer__ {
|
||||
service: self,
|
||||
};
|
||||
});
|
||||
|
||||
let tarpc_service_addr__ =
|
||||
$crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
|
||||
|
||||
let (tx_, rx_) = ::std::sync::mpsc::channel();
|
||||
|
||||
::std::thread::spawn(move || {
|
||||
match $crate::tokio_core::reactor::Core::new() {
|
||||
::std::result::Result::Ok(mut reactor_) => {
|
||||
let addr_ = FutureServiceExt::listen(tarpc_service__,
|
||||
tarpc_service_addr__,
|
||||
&reactor_.handle(),
|
||||
options);
|
||||
tx_.send(addr_).unwrap();
|
||||
loop {
|
||||
reactor_.turn(::std::option::Option::None);
|
||||
}
|
||||
}
|
||||
::std::result::Result::Err(error_) => {
|
||||
tx_.send(Err(error_)).unwrap();
|
||||
}
|
||||
}
|
||||
});
|
||||
return rx_.recv().unwrap();
|
||||
let reactor_ = $crate::tokio_core::reactor::Core::new()?;
|
||||
let (addr_, server_) = $crate::server::listen(
|
||||
move || Ok(tarpc_service__.clone()),
|
||||
tarpc_service_addr__,
|
||||
&reactor_.handle(),
|
||||
options)?;
|
||||
reactor_.handle().spawn(server_);
|
||||
return Ok($crate::server::Handle::new(reactor_, addr_));
|
||||
|
||||
#[derive(Clone)]
|
||||
struct SyncServer__<S> {
|
||||
@@ -864,9 +856,14 @@ mod functional_test {
|
||||
fn start_server_with_sync_client<C, S>(server: S) -> io::Result<(SocketAddr, C)>
|
||||
where C: client::sync::ClientExt, S: SyncServiceExt
|
||||
{
|
||||
let server_options = get_tls_server_options();
|
||||
let addr = unwrap!(server.listen("localhost:0".first_socket_addr(),
|
||||
server_options));
|
||||
let options = get_tls_server_options();
|
||||
let (tx, rx) = ::std::sync::mpsc::channel();
|
||||
::std::thread::spawn(move || {
|
||||
let mut handle = unwrap!(server.listen("localhost:0".first_socket_addr(), options));
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
});
|
||||
let addr = rx.recv().unwrap();
|
||||
let client = unwrap!(C::connect(addr, get_tls_client_options()));
|
||||
Ok((addr, client))
|
||||
}
|
||||
@@ -917,7 +914,13 @@ mod functional_test {
|
||||
where C: client::sync::ClientExt, S: SyncServiceExt
|
||||
{
|
||||
let options = get_server_options();
|
||||
let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), options));
|
||||
let (tx, rx) = ::std::sync::mpsc::channel();
|
||||
::std::thread::spawn(move || {
|
||||
let mut handle = unwrap!(server.listen("localhost:0".first_socket_addr(), options));
|
||||
tx.send(handle.addr()).unwrap();
|
||||
handle.run();
|
||||
});
|
||||
let addr = rx.recv().unwrap();
|
||||
let client = unwrap!(get_sync_client(addr));
|
||||
Ok((addr, client))
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use tokio_core::net::TcpListener;
|
||||
use tokio_core::reactor::{self, Handle};
|
||||
use tokio_core::reactor;
|
||||
use tokio_proto::BindServer;
|
||||
use tokio_service::NewService;
|
||||
|
||||
@@ -56,7 +56,7 @@ pub fn listen<S, Req, Resp, E>(new_service: S,
|
||||
addr: SocketAddr,
|
||||
handle: &reactor::Handle,
|
||||
_options: Options)
|
||||
-> io::Result<SocketAddr>
|
||||
-> io::Result<(SocketAddr, impl Future<Item = (), Error = ()>)>
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
@@ -77,12 +77,40 @@ pub fn listen<S, Req, Resp, E>(new_service: S,
|
||||
listen_with(new_service, addr, handle, acceptor)
|
||||
}
|
||||
|
||||
/// A handle to a bound server. Must be run to start serving requests.
|
||||
pub struct Handle {
|
||||
reactor: reactor::Core,
|
||||
addr: SocketAddr,
|
||||
}
|
||||
|
||||
impl Handle {
|
||||
#[doc(hidden)]
|
||||
pub fn new(reactor: reactor::Core, addr: SocketAddr) -> Self {
|
||||
Handle {
|
||||
reactor: reactor,
|
||||
addr: addr,
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs the server on the current thread, blocking indefinitely.
|
||||
pub fn run(&mut self) -> ! {
|
||||
loop {
|
||||
self.reactor.turn(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// The socket address the server is bound to.
|
||||
pub fn addr(&self) -> SocketAddr {
|
||||
self.addr
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns a service that binds to the given address using the given handle.
|
||||
fn listen_with<S, Req, Resp, E>(new_service: S,
|
||||
addr: SocketAddr,
|
||||
handle: &Handle,
|
||||
handle: &reactor::Handle,
|
||||
_acceptor: Acceptor)
|
||||
-> io::Result<SocketAddr>
|
||||
-> io::Result<(SocketAddr, impl Future<Item = (), Error = ()>)>
|
||||
where S: NewService<Request = Result<Req, bincode::Error>,
|
||||
Response = Response<Resp, E>,
|
||||
Error = io::Error> + 'static,
|
||||
@@ -115,11 +143,10 @@ fn listen_with<S, Req, Resp, E>(new_service: S,
|
||||
Ok(())
|
||||
})
|
||||
.map_err(|e| error!("While processing incoming connections: {}", e));
|
||||
handle.spawn(server);
|
||||
Ok(addr)
|
||||
Ok((addr, server))
|
||||
}
|
||||
|
||||
fn listener(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
|
||||
fn listener(addr: &SocketAddr, handle: &reactor::Handle) -> io::Result<TcpListener> {
|
||||
const PENDING_CONNECTION_BACKLOG: i32 = 1024;
|
||||
#[cfg(unix)]
|
||||
use net2::unix::UnixTcpBuilderExt;
|
||||
|
||||
Reference in New Issue
Block a user