mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-01-01 00:51:25 +01:00
@@ -15,7 +15,7 @@ description = "A bincode-based transport for tarpc services."
|
||||
[dependencies]
|
||||
bincode = { version = "1.0", features = ["i128"] }
|
||||
futures_legacy = { version = "0.1", package = "futures" }
|
||||
pin-utils = "0.1.0-alpha.3"
|
||||
pin-utils = "0.1.0-alpha.4"
|
||||
rpc = { package = "tarpc-lib", version = "0.2", path = "../rpc", features = ["serde1"] }
|
||||
serde = "1.0"
|
||||
tokio-io = "0.1"
|
||||
@@ -23,16 +23,16 @@ async-bincode = "0.4"
|
||||
tokio-tcp = "0.1"
|
||||
|
||||
[target.'cfg(not(test))'.dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.12", features = ["compat"] }
|
||||
|
||||
[dev-dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
|
||||
env_logger = "0.5"
|
||||
futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] }
|
||||
env_logger = "0.6"
|
||||
humantime = "1.0"
|
||||
log = "0.4"
|
||||
rand = "0.5"
|
||||
rand = "0.6"
|
||||
tokio = "0.1"
|
||||
tokio-executor = "0.1"
|
||||
tokio-reactor = "0.1"
|
||||
tokio-serde = "0.2"
|
||||
tokio-serde = "0.3"
|
||||
tokio-timer = "0.2"
|
||||
|
||||
@@ -46,7 +46,7 @@ where
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
|
||||
unsafe {
|
||||
let inner = &mut Pin::get_mut_unchecked(self).inner;
|
||||
let inner = &mut Pin::get_unchecked_mut(self).inner;
|
||||
let mut compat = inner.compat();
|
||||
let compat = Pin::new_unchecked(&mut compat);
|
||||
match ready!(compat.poll_next(waker)) {
|
||||
@@ -66,7 +66,7 @@ where
|
||||
type SinkError = S::SinkError;
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), S::SinkError> {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
let me = unsafe { Pin::get_unchecked_mut(self) };
|
||||
assert!(me.staged_item.is_none());
|
||||
me.staged_item = Some(item);
|
||||
Ok(())
|
||||
@@ -76,7 +76,7 @@ where
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
let me = unsafe { Pin::get_unchecked_mut(self) };
|
||||
match me.staged_item.take() {
|
||||
Some(staged_item) => match me.inner.start_send(staged_item) {
|
||||
Ok(AsyncSink01::Ready) => Poll::Ready(Ok(())),
|
||||
@@ -95,7 +95,7 @@ where
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
let me = unsafe { Pin::get_unchecked_mut(self) };
|
||||
match me.inner.poll_complete() {
|
||||
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
|
||||
Ok(Async01::NotReady) => Poll::Pending,
|
||||
@@ -108,7 +108,7 @@ where
|
||||
let notify = &WakerToHandle(waker);
|
||||
|
||||
executor01::with_notify(notify, 0, move || {
|
||||
let me = unsafe { Pin::get_mut_unchecked(self) };
|
||||
let me = unsafe { Pin::get_unchecked_mut(self) };
|
||||
match me.inner.close() {
|
||||
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
|
||||
Ok(Async01::NotReady) => Poll::Pending,
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
//! A TCP [`Transport`] that serializes as bincode.
|
||||
|
||||
#![feature(futures_api, pin, arbitrary_self_types, await_macro, async_await)]
|
||||
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
|
||||
#![deny(missing_docs, missing_debug_implementations)]
|
||||
|
||||
use self::compat::Compat;
|
||||
@@ -57,7 +57,7 @@ where
|
||||
{
|
||||
type Item = io::Result<Item>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<Item>>> {
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<io::Result<Item>>> {
|
||||
match self.inner().poll_next(waker) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
@@ -77,21 +77,21 @@ where
|
||||
type SinkItem = SinkItem;
|
||||
type SinkError = io::Error;
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
|
||||
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
|
||||
self.inner()
|
||||
.start_send(item)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_ready(waker))
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_flush(waker))
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
convert(self.inner().poll_close(waker))
|
||||
}
|
||||
}
|
||||
@@ -189,7 +189,7 @@ where
|
||||
{
|
||||
type Item = io::Result<Transport<TcpStream, Item, SinkItem>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<Self::Item>> {
|
||||
let next = ready!(self.incoming().poll_next(waker)?);
|
||||
Poll::Ready(next.map(|conn| Ok(new(conn))))
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ description = "An example server built on tarpc."
|
||||
[dependencies]
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" }
|
||||
clap = "2.0"
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] }
|
||||
serde = { version = "1.0" }
|
||||
tarpc = { version = "0.14", path = "../tarpc", features = ["serde1"] }
|
||||
tokio = "0.1"
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
#![feature(futures_api, pin, arbitrary_self_types, await_macro, async_await)]
|
||||
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
|
||||
|
||||
use clap::{App, Arg};
|
||||
use futures::{compat::TokioDefaultSpawner, prelude::*};
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
#![feature(
|
||||
futures_api,
|
||||
pin,
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await,
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
#![feature(futures_api, pin, arbitrary_self_types, await_macro, async_await)]
|
||||
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
|
||||
|
||||
use clap::{App, Arg};
|
||||
use futures::{
|
||||
|
||||
@@ -15,7 +15,7 @@ description = "Proc macros for tarpc."
|
||||
travis-ci = { repository = "google/tarpc" }
|
||||
|
||||
[dependencies]
|
||||
itertools = "0.7"
|
||||
itertools = "0.8"
|
||||
syn = { version = "0.15", features = ["full", "extra-traits"] }
|
||||
quote = "0.6"
|
||||
proc-macro2 = "0.4"
|
||||
|
||||
@@ -20,17 +20,17 @@ serde1 = ["trace/serde", "serde", "serde/derive"]
|
||||
fnv = "1.0"
|
||||
humantime = "1.0"
|
||||
log = "0.4"
|
||||
pin-utils = "0.1.0-alpha.3"
|
||||
rand = "0.5"
|
||||
pin-utils = "0.1.0-alpha.4"
|
||||
rand = "0.6"
|
||||
tokio-timer = "0.2"
|
||||
trace = { package = "tarpc-trace", version = "0.1", path = "../trace" }
|
||||
serde = { optional = true, version = "1.0" }
|
||||
|
||||
[target.'cfg(not(test))'.dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.12", features = ["compat"] }
|
||||
|
||||
[dev-dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
|
||||
futures-test-preview = { version = "0.3.0-alpha.9" }
|
||||
env_logger = "0.5"
|
||||
futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] }
|
||||
futures-test-preview = { version = "0.3.0-alpha.12" }
|
||||
env_logger = "0.6"
|
||||
tokio = "0.1"
|
||||
|
||||
@@ -83,7 +83,7 @@ impl<'a, Req, Resp> Future for Send<'a, Req, Resp> {
|
||||
type Output = io::Result<DispatchResponse<Resp>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
self.fut().poll(lw)
|
||||
self.as_mut().fut().poll(lw)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,7 +102,7 @@ impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
|
||||
type Output = io::Result<Resp>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
self.fut().poll(lw)
|
||||
self.as_mut().fut().poll(lw)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -185,8 +185,8 @@ impl<Resp> Future for DispatchResponse<Resp> {
|
||||
Poll::Ready(match resp {
|
||||
Ok(resp) => Ok(resp.message?),
|
||||
Err(e) => Err({
|
||||
let trace_id = *self.ctx().trace_id();
|
||||
let server_addr = *self.server_addr();
|
||||
let trace_id = *self.as_mut().ctx().trace_id();
|
||||
let server_addr = *self.as_mut().server_addr();
|
||||
|
||||
if e.is_elapsed() {
|
||||
io::Error::new(
|
||||
@@ -318,13 +318,13 @@ where
|
||||
unsafe_pinned!(transport: Fuse<C>);
|
||||
|
||||
fn pump_read(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> PollIo<()> {
|
||||
Poll::Ready(match ready!(self.transport().poll_next(waker)?) {
|
||||
Poll::Ready(match ready!(self.as_mut().transport().poll_next(waker)?) {
|
||||
Some(response) => {
|
||||
self.complete(response);
|
||||
Some(Ok(()))
|
||||
}
|
||||
None => {
|
||||
trace!("[{}] read half closed", self.server_addr());
|
||||
trace!("[{}] read half closed", self.as_mut().server_addr());
|
||||
None
|
||||
}
|
||||
})
|
||||
@@ -356,12 +356,12 @@ where
|
||||
|
||||
match (pending_requests_status, canceled_requests_status) {
|
||||
(ReceiverStatus::Closed, ReceiverStatus::Closed) => {
|
||||
ready!(self.transport().poll_flush(waker)?);
|
||||
ready!(self.as_mut().transport().poll_flush(waker)?);
|
||||
Poll::Ready(None)
|
||||
}
|
||||
(ReceiverStatus::NotReady, _) | (_, ReceiverStatus::NotReady) => {
|
||||
// No more messages to process, so flush any messages buffered in the transport.
|
||||
ready!(self.transport().poll_flush(waker)?);
|
||||
ready!(self.as_mut().transport().poll_flush(waker)?);
|
||||
|
||||
// Even if we fully-flush, we return Pending, because we have no more requests
|
||||
// or cancellations right now.
|
||||
@@ -375,10 +375,10 @@ where
|
||||
self: &mut Pin<&mut Self>,
|
||||
waker: &LocalWaker,
|
||||
) -> PollIo<DispatchRequest<Req, Resp>> {
|
||||
if self.in_flight_requests().len() >= self.config.max_in_flight_requests {
|
||||
if self.as_mut().in_flight_requests().len() >= self.config.max_in_flight_requests {
|
||||
info!(
|
||||
"At in-flight request capacity ({}/{}).",
|
||||
self.in_flight_requests().len(),
|
||||
self.as_mut().in_flight_requests().len(),
|
||||
self.config.max_in_flight_requests
|
||||
);
|
||||
|
||||
@@ -387,13 +387,13 @@ where
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
while let Poll::Pending = self.transport().poll_ready(waker)? {
|
||||
while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? {
|
||||
// We can't yield a request-to-be-sent before the transport is capable of buffering it.
|
||||
ready!(self.transport().poll_flush(waker)?);
|
||||
ready!(self.as_mut().transport().poll_flush(waker)?);
|
||||
}
|
||||
|
||||
loop {
|
||||
match ready!(self.pending_requests().poll_next_unpin(waker)) {
|
||||
match ready!(self.as_mut().pending_requests().poll_next_unpin(waker)) {
|
||||
Some(request) => {
|
||||
if request.response_completion.is_canceled() {
|
||||
trace!(
|
||||
@@ -406,7 +406,7 @@ where
|
||||
return Poll::Ready(Some(Ok(request)));
|
||||
}
|
||||
None => {
|
||||
trace!("[{}] pending_requests closed", self.server_addr());
|
||||
trace!("[{}] pending_requests closed", self.as_mut().server_addr());
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
}
|
||||
@@ -418,27 +418,27 @@ where
|
||||
self: &mut Pin<&mut Self>,
|
||||
waker: &LocalWaker,
|
||||
) -> PollIo<(context::Context, u64)> {
|
||||
while let Poll::Pending = self.transport().poll_ready(waker)? {
|
||||
ready!(self.transport().poll_flush(waker)?);
|
||||
while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? {
|
||||
ready!(self.as_mut().transport().poll_flush(waker)?);
|
||||
}
|
||||
|
||||
loop {
|
||||
match ready!(self.canceled_requests().poll_next_unpin(waker)) {
|
||||
match ready!(self.as_mut().canceled_requests().poll_next_unpin(waker)) {
|
||||
Some(request_id) => {
|
||||
if let Some(in_flight_data) = self.in_flight_requests().remove(&request_id) {
|
||||
self.in_flight_requests().compact(0.1);
|
||||
if let Some(in_flight_data) = self.as_mut().in_flight_requests().remove(&request_id) {
|
||||
self.as_mut().in_flight_requests().compact(0.1);
|
||||
|
||||
debug!(
|
||||
"[{}/{}] Removed request.",
|
||||
in_flight_data.ctx.trace_id(),
|
||||
self.server_addr()
|
||||
self.as_mut().server_addr()
|
||||
);
|
||||
|
||||
return Poll::Ready(Some(Ok((in_flight_data.ctx, request_id))));
|
||||
}
|
||||
}
|
||||
None => {
|
||||
trace!("[{}] canceled_requests closed.", self.server_addr());
|
||||
trace!("[{}] canceled_requests closed.", self.as_mut().server_addr());
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
}
|
||||
@@ -458,8 +458,8 @@ where
|
||||
deadline: dispatch_request.ctx.deadline,
|
||||
}),
|
||||
};
|
||||
self.transport().start_send(request)?;
|
||||
self.in_flight_requests().insert(
|
||||
self.as_mut().transport().start_send(request)?;
|
||||
self.as_mut().in_flight_requests().insert(
|
||||
request_id,
|
||||
InFlightData {
|
||||
ctx: dispatch_request.ctx,
|
||||
@@ -479,20 +479,20 @@ where
|
||||
trace_context: context.trace_context,
|
||||
message: ClientMessageKind::Cancel { request_id },
|
||||
};
|
||||
self.transport().start_send(cancel)?;
|
||||
trace!("[{}/{}] Cancel message sent.", trace_id, self.server_addr());
|
||||
self.as_mut().transport().start_send(cancel)?;
|
||||
trace!("[{}/{}] Cancel message sent.", trace_id, self.as_mut().server_addr());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sends a server response to the client task that initiated the associated request.
|
||||
fn complete(self: &mut Pin<&mut Self>, response: Response<Resp>) -> bool {
|
||||
if let Some(in_flight_data) = self.in_flight_requests().remove(&response.request_id) {
|
||||
self.in_flight_requests().compact(0.1);
|
||||
if let Some(in_flight_data) = self.as_mut().in_flight_requests().remove(&response.request_id) {
|
||||
self.as_mut().in_flight_requests().compact(0.1);
|
||||
|
||||
trace!(
|
||||
"[{}/{}] Received response.",
|
||||
in_flight_data.ctx.trace_id(),
|
||||
self.server_addr()
|
||||
self.as_mut().server_addr()
|
||||
);
|
||||
let _ = in_flight_data.response_completion.send(response);
|
||||
return true;
|
||||
@@ -500,7 +500,7 @@ where
|
||||
|
||||
debug!(
|
||||
"[{}] No in-flight request found for request_id = {}.",
|
||||
self.server_addr(),
|
||||
self.as_mut().server_addr(),
|
||||
response.request_id
|
||||
);
|
||||
|
||||
@@ -518,14 +518,14 @@ where
|
||||
type Output = io::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
trace!("[{}] RequestDispatch::poll", self.server_addr());
|
||||
trace!("[{}] RequestDispatch::poll", self.as_mut().server_addr());
|
||||
loop {
|
||||
match (self.pump_read(waker)?, self.pump_write(waker)?) {
|
||||
(read, write @ Poll::Ready(None)) => {
|
||||
if self.in_flight_requests().is_empty() {
|
||||
if self.as_mut().in_flight_requests().is_empty() {
|
||||
info!(
|
||||
"[{}] Shutdown: write half closed, and no requests in flight.",
|
||||
self.server_addr()
|
||||
self.as_mut().server_addr()
|
||||
);
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
@@ -534,7 +534,7 @@ where
|
||||
_ => {
|
||||
trace!(
|
||||
"[{}] read: {:?}, write: {:?}, (not ready)",
|
||||
self.server_addr(),
|
||||
self.as_mut().server_addr(),
|
||||
read,
|
||||
write,
|
||||
);
|
||||
@@ -545,7 +545,7 @@ where
|
||||
(read @ Poll::Ready(Some(())), write) | (read, write @ Poll::Ready(Some(()))) => {
|
||||
trace!(
|
||||
"[{}] read: {:?}, write: {:?}",
|
||||
self.server_addr(),
|
||||
self.as_mut().server_addr(),
|
||||
read,
|
||||
write,
|
||||
)
|
||||
@@ -553,7 +553,7 @@ where
|
||||
(read, write) => {
|
||||
trace!(
|
||||
"[{}] read: {:?}, write: {:?} (not ready)",
|
||||
self.server_addr(),
|
||||
self.as_mut().server_addr(),
|
||||
read,
|
||||
write,
|
||||
);
|
||||
@@ -640,7 +640,7 @@ where
|
||||
type Output = io::Result<Fut::Ok>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
match self.future().try_poll(lw) {
|
||||
match self.as_mut().future().try_poll(lw) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(result) => {
|
||||
self.finished().take().expect(
|
||||
@@ -680,10 +680,11 @@ where
|
||||
type Output = Result<DispatchResponse<Resp>, Fut::Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
match self.future().try_poll(lw) {
|
||||
match self.as_mut().future().try_poll(lw) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(result) => {
|
||||
let response = self
|
||||
.as_mut()
|
||||
.response()
|
||||
.take()
|
||||
.expect("MapOk must not be polled after it returned `Poll::Ready`");
|
||||
@@ -721,7 +722,7 @@ where
|
||||
{
|
||||
type Output = Result<Fut2::Ok, Fut2::Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
self.try_chain().poll(lw, |result| match result {
|
||||
Ok(ok) => TryChainAction::Future(ok),
|
||||
Err(err) => TryChainAction::Output(Err(err)),
|
||||
@@ -760,8 +761,8 @@ where
|
||||
{
|
||||
let mut f = Some(f);
|
||||
|
||||
// Safe to call `get_mut_unchecked` because we won't move the futures.
|
||||
let this = unsafe { Pin::get_mut_unchecked(self) };
|
||||
// Safe to call `get_unchecked_mut` because we won't move the futures.
|
||||
let this = unsafe { Pin::get_unchecked_mut(self) };
|
||||
|
||||
loop {
|
||||
let output = match this {
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
integer_atomics,
|
||||
try_trait,
|
||||
futures_api,
|
||||
pin,
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await
|
||||
|
||||
@@ -107,28 +107,28 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
|
||||
}
|
||||
};
|
||||
|
||||
let open_connections = *self.open_connections();
|
||||
if open_connections >= self.config().max_connections {
|
||||
let open_connections = *self.as_mut().open_connections();
|
||||
if open_connections >= self.as_mut().config().max_connections {
|
||||
warn!(
|
||||
"[{}] Shedding connection because the maximum open connections \
|
||||
limit is reached ({}/{}).",
|
||||
peer,
|
||||
open_connections,
|
||||
self.config().max_connections
|
||||
self.as_mut().config().max_connections
|
||||
);
|
||||
return NewConnection::Filtered;
|
||||
}
|
||||
|
||||
let config = self.config.clone();
|
||||
let open_connections_for_ip = self.increment_connections_for_ip(&peer)?;
|
||||
*self.open_connections() += 1;
|
||||
*self.as_mut().open_connections() += 1;
|
||||
|
||||
debug!(
|
||||
"[{}] Opening channel ({}/{} connections for IP, {} total).",
|
||||
peer,
|
||||
open_connections_for_ip,
|
||||
config.max_connections_per_ip,
|
||||
self.open_connections(),
|
||||
self.as_mut().open_connections(),
|
||||
);
|
||||
|
||||
NewConnection::Accepted(Channel {
|
||||
@@ -141,19 +141,19 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
|
||||
}
|
||||
|
||||
fn handle_closed_connection(self: &mut Pin<&mut Self>, addr: &SocketAddr) {
|
||||
*self.open_connections() -= 1;
|
||||
*self.as_mut().open_connections() -= 1;
|
||||
debug!(
|
||||
"[{}] Closing channel. {} open connections remaining.",
|
||||
addr, self.open_connections
|
||||
);
|
||||
self.decrement_connections_for_ip(&addr);
|
||||
self.connections_per_ip().compact(0.1);
|
||||
self.as_mut().connections_per_ip().compact(0.1);
|
||||
}
|
||||
|
||||
fn increment_connections_for_ip(self: &mut Pin<&mut Self>, peer: &SocketAddr) -> Option<usize> {
|
||||
let max_connections_per_ip = self.config().max_connections_per_ip;
|
||||
let max_connections_per_ip = self.as_mut().config().max_connections_per_ip;
|
||||
let mut occupied;
|
||||
let mut connections_per_ip = self.connections_per_ip();
|
||||
let mut connections_per_ip = self.as_mut().connections_per_ip();
|
||||
let occupied = match connections_per_ip.entry(peer.ip()) {
|
||||
Entry::Vacant(vacant) => vacant.insert(0),
|
||||
Entry::Occupied(o) => {
|
||||
@@ -177,7 +177,7 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
|
||||
}
|
||||
|
||||
fn decrement_connections_for_ip(self: &mut Pin<&mut Self>, addr: &SocketAddr) {
|
||||
let should_compact = match self.connections_per_ip().entry(addr.ip()) {
|
||||
let should_compact = match self.as_mut().connections_per_ip().entry(addr.ip()) {
|
||||
Entry::Vacant(_) => {
|
||||
error!("[{}] Got vacant entry when closing connection.", addr);
|
||||
return;
|
||||
@@ -193,26 +193,26 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
|
||||
}
|
||||
};
|
||||
if should_compact {
|
||||
self.connections_per_ip().compact(0.1);
|
||||
self.as_mut().connections_per_ip().compact(0.1);
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_listener<C>(
|
||||
self: &mut Pin<&mut Self>,
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> PollIo<NewConnection<Req, Resp, C>>
|
||||
where
|
||||
S: Stream<Item = Result<C, io::Error>>,
|
||||
C: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
|
||||
{
|
||||
match ready!(self.listener().poll_next_unpin(cx)?) {
|
||||
match ready!(self.as_mut().listener().poll_next_unpin(cx)?) {
|
||||
Some(codec) => Poll::Ready(Some(Ok(self.handle_new_connection(codec)))),
|
||||
None => Poll::Ready(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
match ready!(self.closed_connections_rx().poll_next_unpin(cx)) {
|
||||
match ready!(self.as_mut().closed_connections_rx().poll_next_unpin(cx)) {
|
||||
Some(addr) => {
|
||||
self.handle_closed_connection(&addr);
|
||||
Poll::Ready(Ok(()))
|
||||
@@ -231,20 +231,20 @@ where
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo<Channel<Req, Resp, T>> {
|
||||
loop {
|
||||
match (self.poll_listener(cx)?, self.poll_closed_connections(cx)?) {
|
||||
match (self.as_mut().poll_listener(cx)?, self.poll_closed_connections(cx)?) {
|
||||
(Poll::Ready(Some(NewConnection::Accepted(channel))), _) => {
|
||||
return Poll::Ready(Some(Ok(channel)))
|
||||
}
|
||||
(Poll::Ready(Some(NewConnection::Filtered)), _) | (_, Poll::Ready(())) => {
|
||||
trace!("Filtered a connection; {} open.", self.open_connections());
|
||||
trace!("Filtered a connection; {} open.", self.as_mut().open_connections());
|
||||
continue;
|
||||
}
|
||||
(Poll::Pending, Poll::Pending) => return Poll::Pending,
|
||||
(Poll::Ready(None), Poll::Pending) => {
|
||||
if *self.open_connections() > 0 {
|
||||
if *self.as_mut().open_connections() > 0 {
|
||||
trace!(
|
||||
"Listener closed; {} open connections.",
|
||||
self.open_connections()
|
||||
self.as_mut().open_connections()
|
||||
);
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
@@ -134,12 +134,12 @@ where
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<()> {
|
||||
while let Some(channel) = ready!(self.incoming().poll_next(cx)) {
|
||||
while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) {
|
||||
match channel {
|
||||
Ok(channel) => {
|
||||
let peer = channel.client_addr;
|
||||
if let Err(e) =
|
||||
crate::spawn(channel.respond_with(self.request_handler().clone()))
|
||||
crate::spawn(channel.respond_with(self.as_mut().request_handler().clone()))
|
||||
{
|
||||
warn!("[{}] Failed to spawn connection handler: {:?}", peer, e);
|
||||
}
|
||||
@@ -231,25 +231,25 @@ where
|
||||
Resp: Send,
|
||||
{
|
||||
pub(crate) fn start_send(
|
||||
self: &mut Pin<&mut Self>,
|
||||
mut self: Pin<&mut Self>,
|
||||
response: Response<Resp>,
|
||||
) -> io::Result<()> {
|
||||
self.transport().start_send(response)
|
||||
self.as_mut().transport().start_send(response)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_ready(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
self.transport().poll_ready(cx)
|
||||
pub(crate) fn poll_ready(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
self.as_mut().transport().poll_ready(cx)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_flush(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
self.transport().poll_flush(cx)
|
||||
pub(crate) fn poll_flush(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
self.as_mut().transport().poll_flush(cx)
|
||||
}
|
||||
|
||||
pub(crate) fn poll_next(
|
||||
self: &mut Pin<&mut Self>,
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> PollIo<ClientMessage<Req>> {
|
||||
self.transport().poll_next(cx)
|
||||
self.as_mut().transport().poll_next(cx)
|
||||
}
|
||||
|
||||
/// Returns the address of the client connected to the channel.
|
||||
@@ -317,30 +317,30 @@ where
|
||||
/// If at max in-flight requests, check that there's room to immediately write a throttled
|
||||
/// response.
|
||||
fn poll_ready_if_throttling(
|
||||
self: &mut Pin<&mut Self>,
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> Poll<io::Result<()>> {
|
||||
if self.in_flight_requests.len()
|
||||
>= self.channel.config.max_in_flight_requests_per_connection
|
||||
{
|
||||
let peer = self.channel().client_addr;
|
||||
let peer = self.as_mut().channel().client_addr;
|
||||
|
||||
while let Poll::Pending = self.channel().poll_ready(cx)? {
|
||||
while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? {
|
||||
info!(
|
||||
"[{}] In-flight requests at max ({}), and transport is not ready.",
|
||||
peer,
|
||||
self.in_flight_requests().len(),
|
||||
self.as_mut().in_flight_requests().len(),
|
||||
);
|
||||
try_ready!(self.channel().poll_flush(cx));
|
||||
try_ready!(self.as_mut().channel().poll_flush(cx));
|
||||
}
|
||||
}
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn pump_read(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> PollIo<()> {
|
||||
ready!(self.poll_ready_if_throttling(cx)?);
|
||||
fn pump_read(mut self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo<()> {
|
||||
ready!(self.as_mut().poll_ready_if_throttling(cx)?);
|
||||
|
||||
Poll::Ready(match ready!(self.channel().poll_next(cx)?) {
|
||||
Poll::Ready(match ready!(self.as_mut().channel().poll_next(cx)?) {
|
||||
Some(message) => {
|
||||
match message.message {
|
||||
ClientMessageKind::Request(request) => {
|
||||
@@ -360,28 +360,28 @@ where
|
||||
}
|
||||
|
||||
fn pump_write(
|
||||
self: &mut Pin<&mut Self>,
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
read_half_closed: bool,
|
||||
) -> PollIo<()> {
|
||||
match self.poll_next_response(cx)? {
|
||||
match self.as_mut().poll_next_response(cx)? {
|
||||
Poll::Ready(Some((_, response))) => {
|
||||
self.channel().start_send(response)?;
|
||||
self.as_mut().channel().start_send(response)?;
|
||||
Poll::Ready(Some(Ok(())))
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
// Shutdown can't be done before we finish pumping out remaining responses.
|
||||
ready!(self.channel().poll_flush(cx)?);
|
||||
ready!(self.as_mut().channel().poll_flush(cx)?);
|
||||
Poll::Ready(None)
|
||||
}
|
||||
Poll::Pending => {
|
||||
// No more requests to process, so flush any requests buffered in the transport.
|
||||
ready!(self.channel().poll_flush(cx)?);
|
||||
ready!(self.as_mut().channel().poll_flush(cx)?);
|
||||
|
||||
// Being here means there are no staged requests and all written responses are
|
||||
// fully flushed. So, if the read half is closed and there are no in-flight
|
||||
// requests, then we can close the write half.
|
||||
if read_half_closed && self.in_flight_requests().is_empty() {
|
||||
if read_half_closed && self.as_mut().in_flight_requests().is_empty() {
|
||||
Poll::Ready(None)
|
||||
} else {
|
||||
Poll::Pending
|
||||
@@ -391,30 +391,31 @@ where
|
||||
}
|
||||
|
||||
fn poll_next_response(
|
||||
self: &mut Pin<&mut Self>,
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &LocalWaker,
|
||||
) -> PollIo<(Context, Response<Resp>)> {
|
||||
// Ensure there's room to write a response.
|
||||
while let Poll::Pending = self.channel().poll_ready(cx)? {
|
||||
ready!(self.channel().poll_flush(cx)?);
|
||||
while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? {
|
||||
ready!(self.as_mut().channel().poll_flush(cx)?);
|
||||
}
|
||||
|
||||
let peer = self.channel().client_addr;
|
||||
let peer = self.as_mut().channel().client_addr;
|
||||
|
||||
match ready!(self.pending_responses().poll_next(cx)) {
|
||||
match ready!(self.as_mut().pending_responses().poll_next(cx)) {
|
||||
Some((ctx, response)) => {
|
||||
if self
|
||||
.as_mut()
|
||||
.in_flight_requests()
|
||||
.remove(&response.request_id)
|
||||
.is_some()
|
||||
{
|
||||
self.in_flight_requests().compact(0.1);
|
||||
self.as_mut().in_flight_requests().compact(0.1);
|
||||
}
|
||||
trace!(
|
||||
"[{}/{}] Staging response. In-flight requests = {}.",
|
||||
ctx.trace_id(),
|
||||
peer,
|
||||
self.in_flight_requests().len(),
|
||||
self.as_mut().in_flight_requests().len(),
|
||||
);
|
||||
Poll::Ready(Some(Ok((ctx, response))))
|
||||
}
|
||||
@@ -427,30 +428,30 @@ where
|
||||
}
|
||||
|
||||
fn handle_request(
|
||||
self: &mut Pin<&mut Self>,
|
||||
mut self: Pin<&mut Self>,
|
||||
trace_context: trace::Context,
|
||||
request: Request<Req>,
|
||||
) -> io::Result<()> {
|
||||
let request_id = request.id;
|
||||
let peer = self.channel().client_addr;
|
||||
let peer = self.as_mut().channel().client_addr;
|
||||
let ctx = Context {
|
||||
deadline: request.deadline,
|
||||
trace_context,
|
||||
};
|
||||
let request = request.message;
|
||||
|
||||
if self.in_flight_requests().len()
|
||||
>= self.channel().config.max_in_flight_requests_per_connection
|
||||
if self.as_mut().in_flight_requests().len()
|
||||
>= self.as_mut().channel().config.max_in_flight_requests_per_connection
|
||||
{
|
||||
debug!(
|
||||
"[{}/{}] Client has reached in-flight request limit ({}/{}).",
|
||||
ctx.trace_id(),
|
||||
peer,
|
||||
self.in_flight_requests().len(),
|
||||
self.channel().config.max_in_flight_requests_per_connection
|
||||
self.as_mut().in_flight_requests().len(),
|
||||
self.as_mut().channel().config.max_in_flight_requests_per_connection
|
||||
);
|
||||
|
||||
self.channel().start_send(Response {
|
||||
self.as_mut().channel().start_send(Response {
|
||||
request_id,
|
||||
message: Err(ServerError {
|
||||
kind: io::ErrorKind::WouldBlock,
|
||||
@@ -469,10 +470,10 @@ where
|
||||
format_rfc3339(deadline),
|
||||
timeout,
|
||||
);
|
||||
let mut response_tx = self.responses_tx().clone();
|
||||
let mut response_tx = self.as_mut().responses_tx().clone();
|
||||
|
||||
let trace_id = *ctx.trace_id();
|
||||
let response = self.f().clone()(ctx, request);
|
||||
let response = self.as_mut().f().clone()(ctx, request);
|
||||
let response = deadline_compat::Deadline::new(response, Instant::now() + timeout).then(
|
||||
async move |result| {
|
||||
let response = Response {
|
||||
@@ -496,18 +497,18 @@ where
|
||||
),
|
||||
)
|
||||
})?;
|
||||
self.in_flight_requests().insert(request_id, abort_handle);
|
||||
self.as_mut().in_flight_requests().insert(request_id, abort_handle);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cancel_request(self: &mut Pin<&mut Self>, trace_context: &trace::Context, request_id: u64) {
|
||||
fn cancel_request(mut self: Pin<&mut Self>, trace_context: &trace::Context, request_id: u64) {
|
||||
// It's possible the request was already completed, so it's fine
|
||||
// if this is None.
|
||||
if let Some(cancel_handle) = self.in_flight_requests().remove(&request_id) {
|
||||
self.in_flight_requests().compact(0.1);
|
||||
if let Some(cancel_handle) = self.as_mut().in_flight_requests().remove(&request_id) {
|
||||
self.as_mut().in_flight_requests().compact(0.1);
|
||||
|
||||
cancel_handle.abort();
|
||||
let remaining = self.in_flight_requests().len();
|
||||
let remaining = self.as_mut().in_flight_requests().len();
|
||||
trace!(
|
||||
"[{}/{}] Request canceled. In-flight requests = {}",
|
||||
trace_context.trace_id,
|
||||
@@ -538,8 +539,8 @@ where
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
trace!("[{}] ClientHandler::poll", self.channel.client_addr);
|
||||
loop {
|
||||
let read = self.pump_read(cx)?;
|
||||
match (read, self.pump_write(cx, read == Poll::Ready(None))?) {
|
||||
let read = self.as_mut().pump_read(cx)?;
|
||||
match (read, self.as_mut().pump_write(cx, read == Poll::Ready(None))?) {
|
||||
(Poll::Ready(None), Poll::Ready(None)) => {
|
||||
info!("[{}] Client disconnected.", self.channel.client_addr);
|
||||
return Poll::Ready(Ok(()));
|
||||
|
||||
@@ -45,7 +45,7 @@ impl<Item, SinkItem> UnboundedChannel<Item, SinkItem> {
|
||||
impl<Item, SinkItem> Stream for UnboundedChannel<Item, SinkItem> {
|
||||
type Item = Result<Item, io::Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo<Item> {
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo<Item> {
|
||||
self.rx().poll_next(cx).map(|option| option.map(Ok))
|
||||
}
|
||||
}
|
||||
@@ -54,25 +54,25 @@ impl<Item, SinkItem> Sink for UnboundedChannel<Item, SinkItem> {
|
||||
type SinkItem = SinkItem;
|
||||
type SinkError = io::Error;
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
self.tx()
|
||||
.poll_ready(cx)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
}
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
|
||||
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
|
||||
self.tx()
|
||||
.start_send(item)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<Result<(), Self::SinkError>> {
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<Result<(), Self::SinkError>> {
|
||||
self.tx()
|
||||
.poll_flush(cx)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<io::Result<()>> {
|
||||
self.tx()
|
||||
.poll_close(cx)
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
|
||||
|
||||
@@ -71,7 +71,7 @@ where
|
||||
{
|
||||
type Item = S::Item;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<S::Item>> {
|
||||
fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<S::Item>> {
|
||||
self.inner().poll_next(waker)
|
||||
}
|
||||
}
|
||||
@@ -83,19 +83,19 @@ where
|
||||
type SinkItem = S::SinkItem;
|
||||
type SinkError = S::SinkError;
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: S::SinkItem) -> Result<(), S::SinkError> {
|
||||
fn start_send(self: Pin<&mut Self>, item: S::SinkItem) -> Result<(), S::SinkError> {
|
||||
self.inner().start_send(item)
|
||||
}
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_ready(waker)
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_flush(waker)
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), S::SinkError>> {
|
||||
self.inner().poll_close(waker)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ where
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Self::Output> {
|
||||
// First, try polling the future
|
||||
match self.future().try_poll(waker) {
|
||||
match self.as_mut().future().try_poll(waker) {
|
||||
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
|
||||
Poll::Pending => {}
|
||||
Poll::Ready(Err(e)) => return Poll::Ready(Err(timeout::Error::inner(e))),
|
||||
|
||||
@@ -25,16 +25,16 @@ tarpc-plugins = { path = "../plugins", version = "0.5.0" }
|
||||
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.2" }
|
||||
|
||||
[target.'cfg(not(test))'.dependencies]
|
||||
futures-preview = "0.3.0-alpha.9"
|
||||
futures-preview = "0.3.0-alpha.12"
|
||||
|
||||
[dev-dependencies]
|
||||
bincode = "1.0"
|
||||
bytes = { version = "0.4", features = ["serde"] }
|
||||
humantime = "1.0"
|
||||
futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] }
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" }
|
||||
env_logger = "0.5"
|
||||
env_logger = "0.6"
|
||||
tokio = "0.1"
|
||||
tokio-executor = "0.1"
|
||||
tokio-tcp = "0.1"
|
||||
pin-utils = "0.1.0-alpha.3"
|
||||
pin-utils = "0.1.0-alpha.4"
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
#![feature(
|
||||
arbitrary_self_types,
|
||||
pin,
|
||||
futures_api,
|
||||
await_macro,
|
||||
async_await,
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
#![feature(
|
||||
futures_api,
|
||||
pin,
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await,
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
#![feature(
|
||||
existential_type,
|
||||
arbitrary_self_types,
|
||||
pin,
|
||||
futures_api,
|
||||
await_macro,
|
||||
async_await,
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
#![feature(
|
||||
pin,
|
||||
async_await,
|
||||
await_macro,
|
||||
futures_api,
|
||||
@@ -214,7 +213,7 @@ mod registry {
|
||||
|
||||
fn poll(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Output> {
|
||||
unsafe {
|
||||
match Pin::get_mut_unchecked(self) {
|
||||
match Pin::get_unchecked_mut(self) {
|
||||
Either::Left(car) => Pin::new_unchecked(car).poll(waker),
|
||||
Either::Right(cdr) => Pin::new_unchecked(cdr).poll(waker),
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
#![cfg_attr(
|
||||
test,
|
||||
feature(
|
||||
pin,
|
||||
futures_api,
|
||||
await_macro,
|
||||
proc_macro_hygiene,
|
||||
|
||||
@@ -181,7 +181,7 @@ macro_rules! service {
|
||||
-> ::std::task::Poll<::std::io::Result<Response>>
|
||||
{
|
||||
unsafe {
|
||||
match ::std::pin::Pin::get_mut_unchecked(self) {
|
||||
match ::std::pin::Pin::get_unchecked_mut(self) {
|
||||
$(
|
||||
ResponseFut::$fn_name(resp) =>
|
||||
::std::pin::Pin::new_unchecked(resp)
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
#![feature(
|
||||
test,
|
||||
arbitrary_self_types,
|
||||
pin,
|
||||
integer_atomics,
|
||||
futures_api,
|
||||
generators,
|
||||
|
||||
@@ -13,7 +13,7 @@ readme = "../README.md"
|
||||
description = "foundations for tracing in tarpc"
|
||||
|
||||
[dependencies]
|
||||
rand = "0.5"
|
||||
rand = "0.6"
|
||||
|
||||
[dependencies.serde]
|
||||
version = "1.0"
|
||||
|
||||
Reference in New Issue
Block a user