Merge pull request #92 from tikue/sync-reactor

Change SyncClient to drive its own execution with an internal reactor::Core.
This commit is contained in:
Adam Wright
2017-02-13 21:37:49 -08:00
committed by GitHub
10 changed files with 176 additions and 277 deletions

View File

@@ -38,8 +38,13 @@ impl FutureService for Server {
#[bench]
fn latency(bencher: &mut Bencher) {
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr(), server::Options::default()).wait().unwrap();
let addr = Server.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
bencher.iter(|| { client.ack().unwrap(); });
bencher.iter(|| {
client.ack().unwrap();
});
}

View File

@@ -122,7 +122,7 @@ fn main() {
.wait()
.unwrap();
let publisher_client =
let mut publisher_client =
publisher::SyncClient::connect(publisher_addr, client::Options::default()).unwrap();
let subscriber1 = Subscriber::listen(0);

View File

@@ -51,7 +51,7 @@ impl SyncService for HelloServer {
fn main() {
let addr = HelloServer.listen("localhost:10000", server::Options::default()).unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
println!("{}", client.hello("".to_string()).unwrap_err());
}

View File

@@ -31,6 +31,6 @@ impl SyncService for HelloServer {
fn main() {
let addr = "localhost:10000";
HelloServer.listen(addr, server::Options::default()).unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
}

View File

@@ -86,7 +86,7 @@ fn main() {
.wait()
.unwrap();
let double_client = double::SyncClient::connect(double_addr, client::Options::default())
let mut double_client = double::SyncClient::connect(double_addr, client::Options::default())
.unwrap();
for i in 0..5 {
println!("{:?}", double_client.double(i).unwrap());

View File

@@ -57,7 +57,7 @@ fn bench_tarpc(target: u64) {
server::Options::default())
.wait()
.unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
let start = time::Instant::now();
let mut nread = 0;
while nread < target {

View File

@@ -68,8 +68,8 @@ fn main() {
.wait()
.unwrap();
let bar_client = bar::SyncClient::connect(bar_addr, client::Options::default()).unwrap();
let baz_client = baz::SyncClient::connect(baz_addr, client::Options::default()).unwrap();
let mut bar_client = bar::SyncClient::connect(bar_addr, client::Options::default()).unwrap();
let mut baz_client = baz::SyncClient::connect(baz_addr, client::Options::default()).unwrap();
info!("Result: {:?}", bar_client.bar(17));

View File

@@ -5,24 +5,11 @@
use {Reactor, WireError};
use bincode;
use futures::{self, Future};
use protocol::Proto;
#[cfg(feature = "tls")]
use self::tls::*;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::io;
use stream_type::StreamType;
use tokio_core::reactor;
use tokio_proto::BindClient as ProtoBindClient;
use tokio_proto::multiplex::Multiplex;
use tokio_service::Service;
type WireResponse<Resp, E> = Result<Result<Resp, WireError<E>>, bincode::Error>;
type ResponseFuture<Req, Resp, E> = futures::Map<<BindClient<Req, Resp, E> as Service>::Future,
fn(WireResponse<Resp, E>) -> Result<Resp, ::Error<E>>>;
type BindClient<Req, Resp, E> = <Proto<Req, Result<Resp, WireError<E>>> as
ProtoBindClient<Multiplex, StreamType>>::BindClient;
/// TLS-specific functionality
#[cfg(feature = "tls")]
@@ -64,74 +51,6 @@ pub mod tls {
}
}
/// A client that impls `tokio_service::Service` that writes and reads bytes.
///
/// Typically, this would be combined with a serialization pre-processing step
/// and a deserialization post-processing step.
#[doc(hidden)]
pub struct Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
inner: BindClient<Req, Resp, E>,
}
impl<Req, Resp, E> Clone for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn clone(&self) -> Self {
Client { inner: self.inner.clone() }
}
}
impl<Req, Resp, E> Service for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
type Request = Req;
type Response = Result<Resp, ::Error<E>>;
type Error = io::Error;
type Future = ResponseFuture<Req, Resp, E>;
fn call(&self, request: Self::Request) -> Self::Future {
self.inner.call(request).map(Self::map_err)
}
}
impl<Req, Resp, E> Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn new(inner: BindClient<Req, Resp, E>) -> Self
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
Client { inner: inner }
}
fn map_err(resp: WireResponse<Resp, E>) -> Result<Resp, ::Error<E>> {
resp.map(|r| r.map_err(::Error::from))
.map_err(::Error::ClientSerialize)
.and_then(|r| r)
}
}
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Client {{ .. }}")
}
}
/// Additional options to configure how the client connects and operates.
#[derive(Default)]
pub struct Options {
@@ -163,24 +82,92 @@ impl Options {
/// Exposes a trait for connecting asynchronously to servers.
pub mod future {
use {REMOTE, Reactor};
use futures::{self, Async, Future, future};
use {REMOTE, Reactor, WireError};
use futures::{self, Future, future};
use protocol::Proto;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::net::SocketAddr;
use stream_type::StreamType;
use super::{Client, Options};
use tokio_core::net::{TcpStream, TcpStreamNew};
use super::{Options, WireResponse};
use tokio_core::net::TcpStream;
use tokio_core::reactor;
use tokio_proto::BindClient;
cfg_if! {
if #[cfg(feature = "tls")] {
use tokio_tls::{ConnectAsync, TlsStream, TlsConnectorExt};
use super::tls::Context;
use errors::native_to_io;
} else {}
use tokio_proto::BindClient as ProtoBindClient;
use tokio_proto::multiplex::Multiplex;
use tokio_service::Service;
#[cfg(feature = "tls")]
use tokio_tls::TlsConnectorExt;
#[cfg(feature = "tls")]
use errors::native_to_io;
/// A client that impls `tokio_service::Service` that writes and reads bytes.
#[doc(hidden)]
pub struct Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
inner: BindClient<Req, Resp, E>,
}
impl<Req, Resp, E> Clone for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn clone(&self) -> Self {
Client { inner: self.inner.clone() }
}
}
impl<Req, Resp, E> Service for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
type Request = Req;
type Response = Resp;
type Error = ::Error<E>;
type Future = ResponseFuture<Req, Resp, E>;
fn call(&self, request: Self::Request) -> Self::Future {
fn identity<T>(t: T) -> T { t }
self.inner.call(request)
.map(Self::map_err as _)
.map_err(::Error::from as _)
.and_then(identity as _)
}
}
impl<Req, Resp, E> Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn new(inner: BindClient<Req, Resp, E>) -> Self
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
Client { inner: inner }
}
fn map_err(resp: WireResponse<Resp, E>) -> Result<Resp, ::Error<E>> {
resp.map(|r| r.map_err(::Error::from))
.map_err(::Error::ClientSerialize)
.and_then(|r| r)
}
}
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Client {{ .. }}")
}
}
/// Types that can connect to a server asynchronously.
@@ -192,98 +179,10 @@ pub mod future {
fn connect(addr: SocketAddr, options: Options) -> Self::ConnectFut;
}
type ConnectFutureInner<Req, Resp, E, T> = future::Either<futures::Map<futures::AndThen<
TcpStreamNew, T, ConnectFn>, MultiplexConnect<Req, Resp, E>>, futures::Flatten<
futures::MapErr<futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
fn(futures::Canceled) -> io::Error>>>;
/// A future that resolves to a `Client` or an `io::Error`.
#[doc(hidden)]
pub struct ConnectFuture<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
#[cfg(not(feature = "tls"))]
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
inner: ConnectFutureInner<Req, Resp, E, future::FutureResult<StreamType, io::Error>>,
#[cfg(feature = "tls")]
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
inner: ConnectFutureInner<Req, Resp, E, future::Either<future::FutureResult<
StreamType, io::Error>, futures::Map<futures::MapErr<ConnectAsync<TcpStream>,
fn(::native_tls::Error) -> io::Error>, fn(TlsStream<TcpStream>) -> StreamType>>>,
}
impl<Req, Resp, E> Future for ConnectFuture<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
type Item = Client<Req, Resp, E>;
type Error = io::Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
// Ok to unwrap because we ensure the oneshot is always completed.
match Future::poll(&mut self.inner)? {
Async::Ready(client) => Ok(Async::Ready(client)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
struct MultiplexConnect<Req, Resp, E>(reactor::Handle, PhantomData<(Req, Resp, E)>);
impl<Req, Resp, E> MultiplexConnect<Req, Resp, E> {
fn new(handle: reactor::Handle) -> Self {
MultiplexConnect(handle, PhantomData)
}
}
impl<Req, Resp, E, I> FnOnce<(I,)> for MultiplexConnect<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static,
I: Into<StreamType>
{
type Output = Client<Req, Resp, E>;
extern "rust-call" fn call_once(self, (stream,): (I,)) -> Self::Output {
Client::new(Proto::new().bind_client(&self.0, stream.into()))
}
}
/// Provides the connection Fn impl for Tls
struct ConnectFn {
#[cfg(feature = "tls")]
tls_ctx: Option<Context>,
}
impl FnOnce<(TcpStream,)> for ConnectFn {
#[cfg(feature = "tls")]
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
type Output = future::Either<future::FutureResult<StreamType, io::Error>,
futures::Map<futures::MapErr<ConnectAsync<TcpStream>,
fn(::native_tls::Error)
-> io::Error>,
fn(TlsStream<TcpStream>) -> StreamType>>;
#[cfg(not(feature = "tls"))]
type Output = future::FutureResult<StreamType, io::Error>;
extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Self::Output {
#[cfg(feature = "tls")]
match self.tls_ctx {
None => future::Either::A(future::ok(StreamType::from(tcp))),
Some(tls_ctx) => {
future::Either::B(tls_ctx.tls_connector
.connect_async(&tls_ctx.domain, tcp)
.map_err(native_to_io as fn(_) -> _)
.map(StreamType::from as fn(_) -> _))
}
}
#[cfg(not(feature = "tls"))]
future::ok(StreamType::from(tcp))
}
}
pub type ConnectFuture<Req, Resp, E> = futures::Flatten<
futures::MapErr<futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
fn(futures::Canceled) -> io::Error>>;
impl<Req, Resp, E> Connect for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
@@ -300,42 +199,39 @@ pub mod future {
#[cfg(feature = "tls")]
let tls_ctx = options.tls_ctx.take();
let connect = move |handle: &reactor::Handle| {
let handle2 = handle.clone();
TcpStream::connect(&addr, handle)
.and_then(move |socket| {
#[cfg(feature = "tls")]
match tls_ctx {
Some(tls_ctx) => {
future::Either::A(tls_ctx.tls_connector
.connect_async(&tls_ctx.domain, socket)
.map(StreamType::Tls)
.map_err(native_to_io))
}
None => future::Either::B(future::ok(StreamType::Tcp(socket))),
}
#[cfg(not(feature = "tls"))]
future::ok(StreamType::Tcp(socket))
})
.map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp)))
};
let setup = move |tx: futures::sync::oneshot::Sender<_>| {
move |handle: &reactor::Handle| {
let handle2 = handle.clone();
TcpStream::connect(&addr, handle)
.and_then(move |socket| {
#[cfg(feature = "tls")]
match tls_ctx {
Some(tls_ctx) => {
future::Either::A(tls_ctx.tls_connector
.connect_async(&tls_ctx.domain, socket)
.map(StreamType::Tls)
.map_err(native_to_io))
}
None => future::Either::B(future::ok(StreamType::Tcp(socket))),
}
#[cfg(not(feature = "tls"))]
future::ok(StreamType::Tcp(socket))
})
.map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp)))
.then(move |result| {
tx.complete(result);
Ok(())
})
connect(handle).then(move |result| {
tx.complete(result);
Ok(())
})
}
};
let rx = match options.reactor {
Some(Reactor::Handle(handle)) => {
#[cfg(feature = "tls")]
let connect_fn = ConnectFn { tls_ctx: options.tls_ctx };
#[cfg(not(feature = "tls"))]
let connect_fn = ConnectFn {};
let tcp = TcpStream::connect(&addr, &handle)
.and_then(connect_fn)
.map(MultiplexConnect::new(handle));
return ConnectFuture { inner: future::Either::A(tcp) };
let (tx, rx) = futures::oneshot();
handle.spawn(setup(tx)(&handle));
rx
}
Some(Reactor::Remote(remote)) => {
let (tx, rx) = futures::oneshot();
@@ -351,39 +247,31 @@ pub mod future {
fn panic(canceled: futures::Canceled) -> io::Error {
unreachable!(canceled)
}
ConnectFuture { inner: future::Either::B(rx.map_err(panic as fn(_) -> _).flatten()) }
rx.map_err(panic as _).flatten()
}
}
type ResponseFuture<Req, Resp, E> =
futures::AndThen<futures::MapErr<
futures::Map<<BindClient<Req, Resp, E> as Service>::Future,
fn(WireResponse<Resp, E>) -> Result<Resp, ::Error<E>>>,
fn(io::Error) -> ::Error<E>>,
Result<Resp, ::Error<E>>,
fn(Result<Resp, ::Error<E>>) -> Result<Resp, ::Error<E>>>;
type BindClient<Req, Resp, E> =
<Proto<Req, Result<Resp, WireError<E>>>
as ProtoBindClient<Multiplex, StreamType>>::BindClient;
}
/// Exposes a trait for connecting synchronously to servers.
pub mod sync {
use client::future::Connect as FutureConnect;
use futures::{Future, future};
use serde::{Deserialize, Serialize};
use std::io;
use std::net::ToSocketAddrs;
use super::{Client, Options};
use util::FirstSocketAddr;
use super::Options;
/// Types that can connect to a server synchronously.
pub trait Connect: Sized {
/// Connects to a server located at the given address.
fn connect<A>(addr: A, options: Options) -> Result<Self, io::Error> where A: ToSocketAddrs;
}
impl<Req, Resp, E> Connect for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
fn connect<A>(addr: A, options: Options) -> Result<Self, io::Error>
where A: ToSocketAddrs
{
let addr = addr.try_first_socket_addr()?;
// Wrapped in a lazy future to ensure execution occurs when a task is present.
future::lazy(move || <Self as FutureConnect>::connect(addr, options)).wait()
}
}
}

View File

@@ -54,7 +54,7 @@
//! fn main() {
//! let addr = "localhost:10000";
//! let _server = HelloServer.listen(addr, server::Options::default());
//! let client = SyncClient::connect(addr, client::Options::default()).unwrap();
//! let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
//! println!("{}", client.hello("Mom".to_string()).unwrap());
//! }
//! ```
@@ -97,7 +97,7 @@
//! let addr = "localhost:10000";
//! let acceptor = get_acceptor();
//! let _server = HelloServer.listen(addr, server::Options::default().tls(acceptor));
//! let client = SyncClient::connect(addr,
//! let mut client = SyncClient::connect(addr,
//! client::Options::default()
//! .tls(client::tls::Context::new("foobar.com").unwrap()))
//! .unwrap();
@@ -106,8 +106,7 @@
//! ```
//!
#![deny(missing_docs)]
#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, fn_traits,
specialization)]
#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, specialization)]
#![plugin(tarpc_plugins)]
extern crate byteorder;

View File

@@ -77,7 +77,9 @@ macro_rules! impl_deserialize {
impl $crate::serde::de::Visitor for impl_deserialize_FieldVisitor__ {
type Value = impl_deserialize_Field__;
fn expecting(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
fn expecting(&self, formatter: &mut ::std::fmt::Formatter)
-> ::std::fmt::Result
{
formatter.write_str("an unsigned integer")
}
@@ -541,22 +543,33 @@ macro_rules! service {
impl<S> SyncServiceExt for S where S: SyncService {}
#[allow(unused)]
#[derive(Clone, Debug)]
/// The client stub that makes RPC calls to the server. Exposes a blocking interface.
pub struct SyncClient(FutureClient);
pub struct SyncClient {
inner: FutureClient,
reactor: $crate::tokio_core::reactor::Core,
}
impl ::std::fmt::Debug for SyncClient {
fn fmt(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
write!(formatter, "SyncClient {{ inner: {:?}, .. }}", self.inner)
}
}
impl $crate::client::sync::Connect for SyncClient {
fn connect<A>(addr_: A, opts_: $crate::client::Options)
fn connect<A>(addr_: A, options_: $crate::client::Options)
-> ::std::result::Result<Self, ::std::io::Error>
where A: ::std::net::ToSocketAddrs,
{
let addr_ = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr_)?;
// Wrapped in a lazy future to ensure execution occurs when a task is present.
let client_ = $crate::futures::Future::wait($crate::futures::future::lazy(move || {
<FutureClient as $crate::client::future::Connect>::connect(addr_, opts_)
}))?;
let client_ = SyncClient(client_);
::std::result::Result::Ok(client_)
let mut reactor_ = $crate::tokio_core::reactor::Core::new()?;
let options_ = options_.handle(reactor_.handle());
let client_ = <FutureClient as $crate::client::future::Connect>::connect(addr_,
options_);
let client_ = reactor_.run(client_)?;
::std::result::Result::Ok(SyncClient {
inner: client_,
reactor: reactor_,
})
}
}
@@ -564,20 +577,17 @@ macro_rules! service {
$(
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*)
pub fn $fn_name(&mut self, $($arg: $in_),*)
-> ::std::result::Result<$out, $crate::Error<$error>>
{
// Wrapped in a lazy future to ensure execution occurs when a task is present.
$crate::futures::Future::wait($crate::futures::future::lazy(move || {
(self.0).$fn_name($($arg),*)
}))
self.reactor.run(self.inner.$fn_name($($arg),*))
}
)*
}
#[allow(non_camel_case_types)]
type tarpc_service_Client__ =
$crate::client::Client<tarpc_service_Request__,
$crate::client::future::Client<tarpc_service_Request__,
tarpc_service_Response__,
tarpc_service_Error__>;
@@ -629,10 +639,8 @@ macro_rules! service {
-> $crate::futures::future::Then<
<tarpc_service_Client__ as $crate::tokio_service::Service>::Future,
::std::result::Result<$out, $crate::Error<$error>>,
fn(::std::result::Result<
::std::result::Result<tarpc_service_Response__,
$crate::Error<tarpc_service_Error__>>,
::std::io::Error>)
fn(::std::result::Result<tarpc_service_Response__,
$crate::Error<tarpc_service_Error__>>)
-> ::std::result::Result<$out, $crate::Error<$error>>> {
let tarpc_service_req__ = tarpc_service_Request__::$fn_name(($($arg,)*));
@@ -641,12 +649,10 @@ macro_rules! service {
return $crate::futures::Future::then(tarpc_service_fut__, then__);
fn then__(tarpc_service_msg__:
::std::result::Result<
::std::result::Result<tarpc_service_Response__,
$crate::Error<tarpc_service_Error__>>,
::std::io::Error>)
$crate::Error<tarpc_service_Error__>>)
-> ::std::result::Result<$out, $crate::Error<$error>> {
match tarpc_service_msg__? {
match tarpc_service_msg__ {
::std::result::Result::Ok(tarpc_service_msg__) => {
if let tarpc_service_Response__::$fn_name(tarpc_service_msg__) =
tarpc_service_msg__
@@ -895,7 +901,7 @@ mod functional_test {
fn simple() {
let _ = env_logger::init();
let (_, client) = start_server_with_sync_client::<SyncClient, Server>(Server);
let client = unwrap!(client);
let mut client = unwrap!(client);
assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
}
@@ -905,7 +911,7 @@ mod functional_test {
let _ = env_logger::init();
let (_, client) = start_server_with_sync_client::<super::other_service::SyncClient,
Server>(Server);
let client = client.expect("Could not connect!");
let mut client = client.expect("Could not connect!");
match client.foo().err().expect("failed unwrap") {
::Error::ServerSerialize(_) => {} // good
bad => panic!("Expected Error::ServerSerialize but got {}", bad),
@@ -972,16 +978,17 @@ mod functional_test {
use util::FirstSocketAddr;
use server;
use super::FutureServiceExt;
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr(), server::Options::default())
let addr = Server.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
Server.listen(addr, server::Options::default())
.wait()
.unwrap();
}
#[cfg(feature = "tls")]
#[test]
fn tcp_and_tls() {
@@ -1044,7 +1051,7 @@ mod functional_test {
.wait()
.unwrap();
let client = get_sync_client::<SyncClient>(addr).unwrap();
let mut client = get_sync_client::<SyncClient>(addr).unwrap();
match client.bar().err().unwrap() {
::Error::App(e) => {
assert_eq!(e.description(), "lol jk");