mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-01-05 02:55:38 +01:00
Update to futures 0.3.0-alpha.13 (#211)
This commit is contained in:
@@ -52,7 +52,7 @@ Here's a small service.
|
||||
|
||||
|
||||
use futures::{
|
||||
compat::TokioDefaultSpawner,
|
||||
compat::Executor01CompatExt,
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
@@ -121,7 +121,7 @@ async fn run() -> io::Result<()> {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
tokio::run(run()
|
||||
.map_err(|e| eprintln!("Oh no: {}", e))
|
||||
.boxed()
|
||||
|
||||
@@ -23,10 +23,10 @@ async-bincode = "0.4"
|
||||
tokio-tcp = "0.1"
|
||||
|
||||
[target.'cfg(not(test))'.dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.12", features = ["compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
|
||||
|
||||
[dev-dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
|
||||
env_logger = "0.6"
|
||||
humantime = "1.0"
|
||||
log = "0.4"
|
||||
|
||||
@@ -8,7 +8,7 @@ use futures_legacy::{
|
||||
};
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{self, LocalWaker, Poll},
|
||||
task::{self, Poll, Waker},
|
||||
};
|
||||
|
||||
/// A shim to convert a 0.1 Sink + Stream to a 0.3 Sink + Stream.
|
||||
@@ -44,7 +44,7 @@ where
|
||||
{
|
||||
type Item = Result<S::Item, S::Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<Self::Item>> {
|
||||
unsafe {
|
||||
let inner = &mut Pin::get_unchecked_mut(self).inner;
|
||||
let mut compat = inner.compat();
|
||||
@@ -72,7 +72,7 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
@@ -91,7 +91,7 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
@@ -104,7 +104,7 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
@@ -119,7 +119,7 @@ where
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct WakerToHandle<'a>(&'a LocalWaker);
|
||||
struct WakerToHandle<'a>(&'a Waker);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NotifyWaker(task::Waker);
|
||||
@@ -145,6 +145,6 @@ unsafe impl UnsafeNotify01 for NotifyWaker {
|
||||
|
||||
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
|
||||
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
|
||||
unsafe { NotifyWaker(handle.0.clone().into_waker()).clone_raw() }
|
||||
unsafe { NotifyWaker(handle.0.clone()).clone_raw() }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ use std::{
|
||||
marker::PhantomData,
|
||||
net::SocketAddr,
|
||||
pin::Pin,
|
||||
task::{LocalWaker, Poll},
|
||||
task::{Poll, Waker},
|
||||
};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_tcp::{TcpListener, TcpStream};
|
||||
@@ -57,7 +57,7 @@ where
|
||||
{
|
||||
type Item = io::Result<Item>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<Item>>> {
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<io::Result<Item>>> {
|
||||
match self.inner().poll_next(waker) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
@@ -83,15 +83,15 @@ where
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_ready(waker))
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_flush(waker))
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_close(waker))
|
||||
}
|
||||
}
|
||||
@@ -189,7 +189,7 @@ where
|
||||
{
|
||||
type Item = io::Result<Transport<TcpStream, Item, SinkItem>>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<Self::Item>> {
|
||||
let next = ready!(self.incoming().poll_next(waker)?);
|
||||
Poll::Ready(next.map(|conn| Ok(new(conn))))
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
extern crate test;
|
||||
|
||||
use self::test::stats::Stats;
|
||||
use futures::{compat::TokioDefaultSpawner, prelude::*};
|
||||
use futures::{compat::Executor01CompatExt, prelude::*};
|
||||
use rpc::{
|
||||
client, context,
|
||||
server::{Handler, Server},
|
||||
@@ -101,7 +101,7 @@ async fn bench() -> io::Result<()> {
|
||||
#[test]
|
||||
fn bench_small_packet() -> io::Result<()> {
|
||||
env_logger::init();
|
||||
rpc::init(TokioDefaultSpawner);
|
||||
rpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat());
|
||||
println!("done");
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
#![feature(generators, await_macro, async_await, futures_api)]
|
||||
|
||||
use futures::{
|
||||
compat::{Future01CompatExt, TokioDefaultSpawner},
|
||||
compat::{Executor01CompatExt, Future01CompatExt},
|
||||
prelude::*,
|
||||
stream,
|
||||
};
|
||||
@@ -136,7 +136,7 @@ async fn run() -> io::Result<()> {
|
||||
#[test]
|
||||
fn cancel_slower() -> io::Result<()> {
|
||||
env_logger::init();
|
||||
rpc::init(TokioDefaultSpawner);
|
||||
rpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
tokio::run(run().boxed().map_err(|e| panic!(e)).compat());
|
||||
Ok(())
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
#![feature(generators, await_macro, async_await, futures_api)]
|
||||
|
||||
use futures::{
|
||||
compat::{Future01CompatExt, TokioDefaultSpawner},
|
||||
compat::{Executor01CompatExt, Future01CompatExt},
|
||||
prelude::*,
|
||||
};
|
||||
use log::{error, info, trace};
|
||||
@@ -105,7 +105,7 @@ async fn run() -> io::Result<()> {
|
||||
#[test]
|
||||
fn ping_pong() -> io::Result<()> {
|
||||
env_logger::init();
|
||||
rpc::init(TokioDefaultSpawner);
|
||||
rpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
tokio::run(
|
||||
run()
|
||||
|
||||
@@ -15,7 +15,7 @@ description = "An example server built on tarpc."
|
||||
[dependencies]
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" }
|
||||
clap = "2.0"
|
||||
futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
|
||||
serde = { version = "1.0" }
|
||||
tarpc = { version = "0.14", path = "../tarpc", features = ["serde1"] }
|
||||
tokio = "0.1"
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
|
||||
|
||||
use clap::{App, Arg};
|
||||
use futures::{compat::TokioDefaultSpawner, prelude::*};
|
||||
use futures::{compat::Executor01CompatExt, prelude::*};
|
||||
use std::{io, net::SocketAddr};
|
||||
use tarpc::{client, context};
|
||||
|
||||
@@ -53,7 +53,7 @@ fn main() {
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
let server_addr = flags.value_of("server_addr").unwrap();
|
||||
let server_addr = server_addr
|
||||
@@ -62,7 +62,7 @@ fn main() {
|
||||
|
||||
let name = flags.value_of("name").unwrap();
|
||||
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
tokio::run(
|
||||
run(server_addr, name.into())
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
|
||||
use clap::{App, Arg};
|
||||
use futures::{
|
||||
compat::TokioDefaultSpawner,
|
||||
compat::Executor01CompatExt,
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
@@ -73,7 +73,7 @@ fn main() {
|
||||
.parse()
|
||||
.unwrap_or_else(|e| panic!(r#"--port value "{}" invalid: {}"#, port, e));
|
||||
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
tokio::run(
|
||||
run(([0, 0, 0, 0], port).into())
|
||||
|
||||
@@ -27,10 +27,10 @@ trace = { package = "tarpc-trace", version = "0.1", path = "../trace" }
|
||||
serde = { optional = true, version = "1.0" }
|
||||
|
||||
[target.'cfg(not(test))'.dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.12", features = ["compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
|
||||
|
||||
[dev-dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] }
|
||||
futures-test-preview = { version = "0.3.0-alpha.12" }
|
||||
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
|
||||
futures-test-preview = { version = "0.3.0-alpha.13" }
|
||||
env_logger = "0.6"
|
||||
tokio = "0.1"
|
||||
|
||||
@@ -15,7 +15,7 @@ use futures::{
|
||||
prelude::*,
|
||||
ready,
|
||||
stream::Fuse,
|
||||
task::LocalWaker,
|
||||
task::Waker,
|
||||
Poll,
|
||||
};
|
||||
use humantime::format_rfc3339;
|
||||
@@ -82,8 +82,8 @@ impl<'a, Req, Resp> Send<'a, Req, Resp> {
|
||||
impl<'a, Req, Resp> Future for Send<'a, Req, Resp> {
|
||||
type Output = io::Result<DispatchResponse<Resp>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
self.as_mut().fut().poll(lw)
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
self.as_mut().fut().poll(waker)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,8 +101,8 @@ impl<'a, Req, Resp> Call<'a, Req, Resp> {
|
||||
impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
|
||||
type Output = io::Result<Resp>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
self.as_mut().fut().poll(lw)
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
self.as_mut().fut().poll(waker)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -177,7 +177,7 @@ impl<Resp> DispatchResponse<Resp> {
|
||||
impl<Resp> Future for DispatchResponse<Resp> {
|
||||
type Output = io::Result<Resp>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<Resp>> {
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<Resp>> {
|
||||
let resp = ready!(self.response.poll_unpin(waker));
|
||||
|
||||
self.complete = true;
|
||||
@@ -317,7 +317,7 @@ where
|
||||
unsafe_pinned!(pending_requests: Fuse<mpsc::Receiver<DispatchRequest<Req, Resp>>>);
|
||||
unsafe_pinned!(transport: Fuse<C>);
|
||||
|
||||
fn pump_read(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> PollIo<()> {
|
||||
fn pump_read(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> {
|
||||
Poll::Ready(match ready!(self.as_mut().transport().poll_next(waker)?) {
|
||||
Some(response) => {
|
||||
self.complete(response);
|
||||
@@ -330,7 +330,7 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
fn pump_write(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> PollIo<()> {
|
||||
fn pump_write(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> {
|
||||
enum ReceiverStatus {
|
||||
NotReady,
|
||||
Closed,
|
||||
@@ -373,7 +373,7 @@ where
|
||||
/// Yields the next pending request, if one is ready to be sent.
|
||||
fn poll_next_request(
|
||||
self: &mut Pin<&mut Self>,
|
||||
waker: &LocalWaker,
|
||||
waker: &Waker,
|
||||
) -> PollIo<DispatchRequest<Req, Resp>> {
|
||||
if self.as_mut().in_flight_requests().len() >= self.config.max_in_flight_requests {
|
||||
info!(
|
||||
@@ -416,7 +416,7 @@ where
|
||||
/// Yields the next pending cancellation, and, if one is ready, cancels the associated request.
|
||||
fn poll_next_cancellation(
|
||||
self: &mut Pin<&mut Self>,
|
||||
waker: &LocalWaker,
|
||||
waker: &Waker,
|
||||
) -> PollIo<(context::Context, u64)> {
|
||||
while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? {
|
||||
ready!(self.as_mut().transport().poll_flush(waker)?);
|
||||
@@ -530,7 +530,7 @@ where
|
||||
{
|
||||
type Output = io::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> {
|
||||
trace!("[{}] RequestDispatch::poll", self.as_mut().server_addr());
|
||||
loop {
|
||||
match (self.pump_read(waker)?, self.pump_write(waker)?) {
|
||||
@@ -620,7 +620,7 @@ impl RequestCancellation {
|
||||
impl Stream for CanceledRequests {
|
||||
type Item = u64;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<u64>> {
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<u64>> {
|
||||
self.0.poll_next_unpin(waker)
|
||||
}
|
||||
}
|
||||
@@ -652,8 +652,8 @@ where
|
||||
{
|
||||
type Output = io::Result<Fut::Ok>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
match self.as_mut().future().try_poll(lw) {
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
match self.as_mut().future().try_poll(waker) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(result) => {
|
||||
self.finished().take().expect(
|
||||
@@ -692,8 +692,8 @@ where
|
||||
{
|
||||
type Output = Result<DispatchResponse<Resp>, Fut::Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
match self.as_mut().future().try_poll(lw) {
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
match self.as_mut().future().try_poll(waker) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(result) => {
|
||||
let response = self
|
||||
@@ -735,8 +735,8 @@ where
|
||||
{
|
||||
type Output = Result<Fut2::Ok, Fut2::Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
self.try_chain().poll(lw, |result| match result {
|
||||
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
self.try_chain().poll(waker, |result| match result {
|
||||
Ok(ok) => TryChainAction::Future(ok),
|
||||
Err(err) => TryChainAction::Output(Err(err)),
|
||||
})
|
||||
@@ -768,7 +768,7 @@ where
|
||||
TryChain::First(fut1)
|
||||
}
|
||||
|
||||
fn poll<F>(self: Pin<&mut Self>, lw: &LocalWaker, f: F) -> Poll<Result<Fut2::Ok, Fut2::Error>>
|
||||
fn poll<F>(self: Pin<&mut Self>, waker: &Waker, f: F) -> Poll<Result<Fut2::Ok, Fut2::Error>>
|
||||
where
|
||||
F: FnOnce(Result<Fut1::Ok, Fut1::Error>) -> TryChainAction<Fut2>,
|
||||
{
|
||||
@@ -781,14 +781,14 @@ where
|
||||
let output = match this {
|
||||
TryChain::First(fut1) => {
|
||||
// Poll the first future
|
||||
match unsafe { Pin::new_unchecked(fut1) }.try_poll(lw) {
|
||||
match unsafe { Pin::new_unchecked(fut1) }.try_poll(waker) {
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(output) => output,
|
||||
}
|
||||
}
|
||||
TryChain::Second(fut2) => {
|
||||
// Poll the second future
|
||||
return unsafe { Pin::new_unchecked(fut2) }.try_poll(lw);
|
||||
return unsafe { Pin::new_unchecked(fut2) }.try_poll(waker);
|
||||
}
|
||||
TryChain::Empty => {
|
||||
panic!("future must not be polled after it returned `Poll::Ready`");
|
||||
@@ -816,7 +816,7 @@ mod tests {
|
||||
};
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{channel::mpsc, prelude::*, Poll};
|
||||
use futures_test::task::noop_local_waker_ref;
|
||||
use futures_test::task::noop_waker_ref;
|
||||
use std::{
|
||||
marker,
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||
@@ -839,7 +839,7 @@ mod tests {
|
||||
);
|
||||
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
let waker = &noop_local_waker_ref();
|
||||
let waker = &noop_waker_ref();
|
||||
|
||||
let req = dispatch.poll_next_request(waker).ready();
|
||||
assert!(req.is_some());
|
||||
@@ -866,7 +866,7 @@ mod tests {
|
||||
drop(channel);
|
||||
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
let waker = &noop_local_waker_ref();
|
||||
let waker = &noop_waker_ref();
|
||||
|
||||
dispatch.poll_next_cancellation(waker).unwrap();
|
||||
assert!(dispatch.poll_next_request(waker).ready().is_none());
|
||||
@@ -890,7 +890,7 @@ mod tests {
|
||||
drop(channel);
|
||||
|
||||
let mut dispatch = Pin::new(&mut dispatch);
|
||||
let waker = &noop_local_waker_ref();
|
||||
let waker = &noop_waker_ref();
|
||||
assert!(dispatch.poll_next_request(waker).ready().is_none());
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ use futures::{
|
||||
prelude::*,
|
||||
ready,
|
||||
stream::Fuse,
|
||||
task::{LocalWaker, Poll},
|
||||
task::{Poll, Waker},
|
||||
};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use pin_utils::unsafe_pinned;
|
||||
@@ -197,10 +197,7 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_listener<C>(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> PollIo<NewConnection<Req, Resp, C>>
|
||||
fn poll_listener<C>(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<NewConnection<Req, Resp, C>>
|
||||
where
|
||||
S: Stream<Item = Result<C, io::Error>>,
|
||||
C: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
|
||||
@@ -211,7 +208,7 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
|
||||
match ready!(self.as_mut().closed_connections_rx().poll_next_unpin(cx)) {
|
||||
Some(addr) => {
|
||||
self.handle_closed_connection(&addr);
|
||||
@@ -229,7 +226,7 @@ where
|
||||
{
|
||||
type Item = io::Result<Channel<Req, Resp, T>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo<Channel<Req, Resp, T>> {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<Channel<Req, Resp, T>> {
|
||||
loop {
|
||||
match (
|
||||
self.as_mut().poll_listener(cx)?,
|
||||
|
||||
@@ -17,7 +17,7 @@ use futures::{
|
||||
prelude::*,
|
||||
ready,
|
||||
stream::Fuse,
|
||||
task::{LocalWaker, Poll},
|
||||
task::{Poll, Waker},
|
||||
try_ready,
|
||||
};
|
||||
use humantime::format_rfc3339;
|
||||
@@ -133,7 +133,7 @@ where
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<()> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<()> {
|
||||
while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) {
|
||||
match channel {
|
||||
Ok(channel) => {
|
||||
@@ -234,18 +234,15 @@ where
|
||||
self.as_mut().transport().start_send(response)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_ready(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
pub(crate) fn poll_ready(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
|
||||
self.as_mut().transport().poll_ready(cx)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_flush(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
pub(crate) fn poll_flush(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
|
||||
self.as_mut().transport().poll_flush(cx)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> PollIo<ClientMessage<Req>> {
|
||||
pub(crate) fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<ClientMessage<Req>> {
|
||||
self.as_mut().transport().poll_next(cx)
|
||||
}
|
||||
|
||||
@@ -313,7 +310,7 @@ where
|
||||
{
|
||||
/// If at max in-flight requests, check that there's room to immediately write a throttled
|
||||
/// response.
|
||||
fn poll_ready_if_throttling(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll_ready_if_throttling(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
|
||||
if self.in_flight_requests.len()
|
||||
>= self.channel.config.max_in_flight_requests_per_connection
|
||||
{
|
||||
@@ -331,7 +328,7 @@ where
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn pump_read(mut self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo<()> {
|
||||
fn pump_read(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<()> {
|
||||
ready!(self.as_mut().poll_ready_if_throttling(cx)?);
|
||||
|
||||
Poll::Ready(match ready!(self.as_mut().channel().poll_next(cx)?) {
|
||||
@@ -353,7 +350,7 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
fn pump_write(mut self: Pin<&mut Self>, cx: &LocalWaker, read_half_closed: bool) -> PollIo<()> {
|
||||
fn pump_write(mut self: Pin<&mut Self>, cx: &Waker, read_half_closed: bool) -> PollIo<()> {
|
||||
match self.as_mut().poll_next_response(cx)? {
|
||||
Poll::Ready(Some((_, response))) => {
|
||||
self.as_mut().channel().start_send(response)?;
|
||||
@@ -382,7 +379,7 @@ where
|
||||
|
||||
fn poll_next_response(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
cx: &Waker,
|
||||
) -> PollIo<(Context, Response<Resp>)> {
|
||||
// Ensure there's room to write a response.
|
||||
while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? {
|
||||
@@ -535,7 +532,7 @@ where
|
||||
{
|
||||
type Output = io::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
|
||||
trace!("[{}] ClientHandler::poll", self.channel.client_addr);
|
||||
loop {
|
||||
let read = self.as_mut().pump_read(cx)?;
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
//! Transports backed by in-memory channels.
|
||||
|
||||
use crate::{PollIo, Transport};
|
||||
use futures::{channel::mpsc, task::LocalWaker, Poll, Sink, Stream};
|
||||
use futures::{channel::mpsc, task::Waker, Poll, Sink, Stream};
|
||||
use pin_utils::unsafe_pinned;
|
||||
use std::pin::Pin;
|
||||
use std::{
|
||||
@@ -45,7 +45,7 @@ impl<Item, SinkItem> UnboundedChannel<Item, SinkItem> {
|
||||
impl<Item, SinkItem> Stream for UnboundedChannel<Item, SinkItem> {
|
||||
type Item = Result<Item, io::Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo<Item> {
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &Waker) -> PollIo<Item> {
|
||||
self.rx().poll_next(cx).map(|option| option.map(Ok))
|
||||
}
|
||||
}
|
||||
@@ -54,7 +54,7 @@ impl<Item, SinkItem> Sink for UnboundedChannel<Item, SinkItem> {
|
||||
type SinkItem = SinkItem;
|
||||
type SinkError = io::Error;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
|
||||
self.tx()
|
||||
.poll_ready(cx)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
@@ -66,13 +66,13 @@ impl<Item, SinkItem> Sink for UnboundedChannel<Item, SinkItem> {
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<Result<(), Self::SinkError>> {
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &Waker) -> Poll<Result<(), Self::SinkError>> {
|
||||
self.tx()
|
||||
.poll_flush(cx)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> {
|
||||
self.tx()
|
||||
.poll_close(cx)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
@@ -99,14 +99,15 @@ mod tests {
|
||||
server::{Handler, Server},
|
||||
transport,
|
||||
};
|
||||
use futures::{compat::TokioDefaultSpawner, prelude::*, stream};
|
||||
use futures::compat::Executor01CompatExt;
|
||||
use futures::{prelude::*, stream};
|
||||
use log::trace;
|
||||
use std::io;
|
||||
|
||||
#[test]
|
||||
fn integration() {
|
||||
let _ = env_logger::try_init();
|
||||
crate::init(TokioDefaultSpawner);
|
||||
crate::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
let (client_channel, server_channel) = transport::channel::unbounded();
|
||||
let server = Server::<String, u64>::default()
|
||||
|
||||
@@ -14,7 +14,7 @@ use std::{
|
||||
io,
|
||||
net::SocketAddr,
|
||||
pin::Pin,
|
||||
task::{LocalWaker, Poll},
|
||||
task::{Poll, Waker},
|
||||
};
|
||||
|
||||
pub mod channel;
|
||||
@@ -71,7 +71,7 @@ where
|
||||
{
|
||||
type Item = S::Item;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<S::Item>> {
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<S::Item>> {
|
||||
self.inner().poll_next(waker)
|
||||
}
|
||||
}
|
||||
@@ -87,15 +87,15 @@ where
|
||||
self.inner().start_send(item)
|
||||
}
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_ready(waker)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_flush(waker)
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_close(waker)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ use futures::{
|
||||
compat::{Compat01As03, Future01CompatExt},
|
||||
prelude::*,
|
||||
ready,
|
||||
task::{LocalWaker, Poll},
|
||||
task::{Poll, Waker},
|
||||
};
|
||||
use pin_utils::unsafe_pinned;
|
||||
use std::pin::Pin;
|
||||
@@ -50,7 +50,7 @@ where
|
||||
{
|
||||
type Output = Result<T::Ok, timeout::Error<T::Error>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Self::Output> {
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
// First, try polling the future
|
||||
match self.as_mut().future().try_poll(waker) {
|
||||
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
|
||||
|
||||
@@ -25,13 +25,13 @@ tarpc-plugins = { path = "../plugins", version = "0.5.0" }
|
||||
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.2" }
|
||||
|
||||
[target.'cfg(not(test))'.dependencies]
|
||||
futures-preview = "0.3.0-alpha.12"
|
||||
futures-preview = "0.3.0-alpha.13"
|
||||
|
||||
[dev-dependencies]
|
||||
bincode = "1.0"
|
||||
bytes = { version = "0.4", features = ["serde"] }
|
||||
humantime = "1.0"
|
||||
futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" }
|
||||
env_logger = "0.6"
|
||||
tokio = "0.1"
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
)]
|
||||
|
||||
use futures::{
|
||||
compat::TokioDefaultSpawner,
|
||||
compat::Executor01CompatExt,
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
@@ -82,7 +82,7 @@ async fn run() -> io::Result<()> {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
tokio::run(
|
||||
run()
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
|
||||
use crate::{add::Service as AddService, double::Service as DoubleService};
|
||||
use futures::{
|
||||
compat::TokioDefaultSpawner,
|
||||
compat::Executor01CompatExt,
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
@@ -102,6 +102,6 @@ async fn run() -> io::Result<()> {
|
||||
|
||||
fn main() {
|
||||
env_logger::init();
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
tokio::run(run().map_err(|e| panic!(e)).boxed().compat());
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ mod registry {
|
||||
io,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{LocalWaker, Poll},
|
||||
task::{Poll, Waker},
|
||||
};
|
||||
use tarpc::{
|
||||
client::{self, Client},
|
||||
@@ -213,7 +213,7 @@ mod registry {
|
||||
{
|
||||
type Output = Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Output> {
|
||||
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Output> {
|
||||
unsafe {
|
||||
match Pin::get_unchecked_mut(self) {
|
||||
Either::Left(car) => Pin::new_unchecked(car).poll(waker),
|
||||
@@ -239,6 +239,7 @@ mod registry {
|
||||
// Example
|
||||
use bytes::Bytes;
|
||||
use futures::{
|
||||
compat::Executor01CompatExt,
|
||||
future::{ready, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
@@ -408,6 +409,6 @@ async fn run() -> io::Result<()> {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
tarpc::init(futures::compat::TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
|
||||
}
|
||||
|
||||
@@ -177,7 +177,7 @@ macro_rules! service {
|
||||
impl<S: Service> ::std::future::Future for ResponseFut<S> {
|
||||
type Output = ::std::io::Result<Response>;
|
||||
|
||||
fn poll(self: ::std::pin::Pin<&mut Self>, waker: &::std::task::LocalWaker)
|
||||
fn poll(self: ::std::pin::Pin<&mut Self>, waker: &::std::task::Waker)
|
||||
-> ::std::task::Poll<::std::io::Result<Response>>
|
||||
{
|
||||
unsafe {
|
||||
@@ -282,7 +282,7 @@ mod syntax_test {
|
||||
#[cfg(test)]
|
||||
mod functional_test {
|
||||
use futures::{
|
||||
compat::TokioDefaultSpawner,
|
||||
compat::Executor01CompatExt,
|
||||
future::{ready, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
@@ -315,7 +315,7 @@ mod functional_test {
|
||||
#[test]
|
||||
fn sequential() {
|
||||
let _ = env_logger::try_init();
|
||||
rpc::init(TokioDefaultSpawner);
|
||||
rpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
let test = async {
|
||||
let (tx, rx) = channel::unbounded();
|
||||
@@ -344,7 +344,7 @@ mod functional_test {
|
||||
#[test]
|
||||
fn concurrent() {
|
||||
let _ = env_logger::try_init();
|
||||
rpc::init(TokioDefaultSpawner);
|
||||
rpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
let test = async {
|
||||
let (tx, rx) = channel::unbounded();
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
extern crate test;
|
||||
|
||||
use self::test::stats::Stats;
|
||||
use futures::{compat::TokioDefaultSpawner, future, prelude::*};
|
||||
use futures::{compat::Executor01CompatExt, future, prelude::*};
|
||||
use rpc::{
|
||||
client, context,
|
||||
server::{Handler, Server},
|
||||
@@ -119,7 +119,7 @@ async fn bench() -> io::Result<()> {
|
||||
#[test]
|
||||
fn bench_small_packet() {
|
||||
env_logger::init();
|
||||
tarpc::init(TokioDefaultSpawner);
|
||||
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
|
||||
|
||||
tokio::run(bench().map_err(|e| panic!(e.to_string())).boxed().compat())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user