diff --git a/bincode-transport/Cargo.toml b/bincode-transport/Cargo.toml index 2babb2a..c3430e3 100644 --- a/bincode-transport/Cargo.toml +++ b/bincode-transport/Cargo.toml @@ -15,7 +15,7 @@ description = "A bincode-based transport for tarpc services." [dependencies] bincode = { version = "1.0", features = ["i128"] } futures_legacy = { version = "0.1", package = "futures" } -pin-utils = "0.1.0-alpha.3" +pin-utils = "0.1.0-alpha.4" rpc = { package = "tarpc-lib", version = "0.2", path = "../rpc", features = ["serde1"] } serde = "1.0" tokio-io = "0.1" @@ -23,16 +23,16 @@ async-bincode = "0.4" tokio-tcp = "0.1" [target.'cfg(not(test))'.dependencies] -futures-preview = { version = "0.3.0-alpha.9", features = ["compat"] } +futures-preview = { version = "0.3.0-alpha.12", features = ["compat"] } [dev-dependencies] -futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] } -env_logger = "0.5" +futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] } +env_logger = "0.6" humantime = "1.0" log = "0.4" -rand = "0.5" +rand = "0.6" tokio = "0.1" tokio-executor = "0.1" tokio-reactor = "0.1" -tokio-serde = "0.2" +tokio-serde = "0.3" tokio-timer = "0.2" diff --git a/bincode-transport/src/compat.rs b/bincode-transport/src/compat.rs index 8b37d49..9156f9b 100644 --- a/bincode-transport/src/compat.rs +++ b/bincode-transport/src/compat.rs @@ -46,7 +46,7 @@ where fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { unsafe { - let inner = &mut Pin::get_mut_unchecked(self).inner; + let inner = &mut Pin::get_unchecked_mut(self).inner; let mut compat = inner.compat(); let compat = Pin::new_unchecked(&mut compat); match ready!(compat.poll_next(waker)) { @@ -66,7 +66,7 @@ where type SinkError = S::SinkError; fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), S::SinkError> { - let me = unsafe { Pin::get_mut_unchecked(self) }; + let me = unsafe { Pin::get_unchecked_mut(self) }; assert!(me.staged_item.is_none()); me.staged_item = Some(item); Ok(()) @@ -76,7 +76,7 @@ where let notify = &WakerToHandle(waker); executor01::with_notify(notify, 0, move || { - let me = unsafe { Pin::get_mut_unchecked(self) }; + let me = unsafe { Pin::get_unchecked_mut(self) }; match me.staged_item.take() { Some(staged_item) => match me.inner.start_send(staged_item) { Ok(AsyncSink01::Ready) => Poll::Ready(Ok(())), @@ -95,7 +95,7 @@ where let notify = &WakerToHandle(waker); executor01::with_notify(notify, 0, move || { - let me = unsafe { Pin::get_mut_unchecked(self) }; + let me = unsafe { Pin::get_unchecked_mut(self) }; match me.inner.poll_complete() { Ok(Async01::Ready(())) => Poll::Ready(Ok(())), Ok(Async01::NotReady) => Poll::Pending, @@ -108,7 +108,7 @@ where let notify = &WakerToHandle(waker); executor01::with_notify(notify, 0, move || { - let me = unsafe { Pin::get_mut_unchecked(self) }; + let me = unsafe { Pin::get_unchecked_mut(self) }; match me.inner.close() { Ok(Async01::Ready(())) => Poll::Ready(Ok(())), Ok(Async01::NotReady) => Poll::Pending, diff --git a/bincode-transport/src/lib.rs b/bincode-transport/src/lib.rs index a44bf0d..cb8e981 100644 --- a/bincode-transport/src/lib.rs +++ b/bincode-transport/src/lib.rs @@ -6,7 +6,7 @@ //! A TCP [`Transport`] that serializes as bincode. -#![feature(futures_api, pin, arbitrary_self_types, await_macro, async_await)] +#![feature(futures_api, arbitrary_self_types, await_macro, async_await)] #![deny(missing_docs, missing_debug_implementations)] use self::compat::Compat; @@ -57,7 +57,7 @@ where { type Item = io::Result; - fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll>> { + fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll>> { match self.inner().poll_next(waker) { Poll::Pending => Poll::Pending, Poll::Ready(None) => Poll::Ready(None), @@ -77,21 +77,21 @@ where type SinkItem = SinkItem; type SinkError = io::Error; - fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> { + fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> { self.inner() .start_send(item) .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } - fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { convert(self.inner().poll_ready(waker)) } - fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { convert(self.inner().poll_flush(waker)) } - fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { convert(self.inner().poll_close(waker)) } } @@ -189,7 +189,7 @@ where { type Item = io::Result>; - fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { let next = ready!(self.incoming().poll_next(waker)?); Poll::Ready(next.map(|conn| Ok(new(conn)))) } diff --git a/example-service/Cargo.toml b/example-service/Cargo.toml index 668da22..0941874 100644 --- a/example-service/Cargo.toml +++ b/example-service/Cargo.toml @@ -15,7 +15,7 @@ description = "An example server built on tarpc." [dependencies] bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" } clap = "2.0" -futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] } +futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] } serde = { version = "1.0" } tarpc = { version = "0.14", path = "../tarpc", features = ["serde1"] } tokio = "0.1" diff --git a/example-service/src/client.rs b/example-service/src/client.rs index c2b3512..31b1c39 100644 --- a/example-service/src/client.rs +++ b/example-service/src/client.rs @@ -4,7 +4,7 @@ // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -#![feature(futures_api, pin, arbitrary_self_types, await_macro, async_await)] +#![feature(futures_api, arbitrary_self_types, await_macro, async_await)] use clap::{App, Arg}; use futures::{compat::TokioDefaultSpawner, prelude::*}; diff --git a/example-service/src/lib.rs b/example-service/src/lib.rs index 9bc8b7a..f710d46 100644 --- a/example-service/src/lib.rs +++ b/example-service/src/lib.rs @@ -6,7 +6,6 @@ #![feature( futures_api, - pin, arbitrary_self_types, await_macro, async_await, diff --git a/example-service/src/server.rs b/example-service/src/server.rs index 0aeff40..7d1c2ef 100644 --- a/example-service/src/server.rs +++ b/example-service/src/server.rs @@ -4,7 +4,7 @@ // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -#![feature(futures_api, pin, arbitrary_self_types, await_macro, async_await)] +#![feature(futures_api, arbitrary_self_types, await_macro, async_await)] use clap::{App, Arg}; use futures::{ diff --git a/plugins/Cargo.toml b/plugins/Cargo.toml index fb79084..2289a0a 100644 --- a/plugins/Cargo.toml +++ b/plugins/Cargo.toml @@ -15,7 +15,7 @@ description = "Proc macros for tarpc." travis-ci = { repository = "google/tarpc" } [dependencies] -itertools = "0.7" +itertools = "0.8" syn = { version = "0.15", features = ["full", "extra-traits"] } quote = "0.6" proc-macro2 = "0.4" diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 2941637..7ec91cb 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -20,17 +20,17 @@ serde1 = ["trace/serde", "serde", "serde/derive"] fnv = "1.0" humantime = "1.0" log = "0.4" -pin-utils = "0.1.0-alpha.3" -rand = "0.5" +pin-utils = "0.1.0-alpha.4" +rand = "0.6" tokio-timer = "0.2" trace = { package = "tarpc-trace", version = "0.1", path = "../trace" } serde = { optional = true, version = "1.0" } [target.'cfg(not(test))'.dependencies] -futures-preview = { version = "0.3.0-alpha.9", features = ["compat"] } +futures-preview = { version = "0.3.0-alpha.12", features = ["compat"] } [dev-dependencies] -futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] } -futures-test-preview = { version = "0.3.0-alpha.9" } -env_logger = "0.5" +futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] } +futures-test-preview = { version = "0.3.0-alpha.12" } +env_logger = "0.6" tokio = "0.1" diff --git a/rpc/src/client/channel.rs b/rpc/src/client/channel.rs index 405c4fc..4312364 100644 --- a/rpc/src/client/channel.rs +++ b/rpc/src/client/channel.rs @@ -83,7 +83,7 @@ impl<'a, Req, Resp> Future for Send<'a, Req, Resp> { type Output = io::Result>; fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { - self.fut().poll(lw) + self.as_mut().fut().poll(lw) } } @@ -102,7 +102,7 @@ impl<'a, Req, Resp> Future for Call<'a, Req, Resp> { type Output = io::Result; fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { - self.fut().poll(lw) + self.as_mut().fut().poll(lw) } } @@ -185,8 +185,8 @@ impl Future for DispatchResponse { Poll::Ready(match resp { Ok(resp) => Ok(resp.message?), Err(e) => Err({ - let trace_id = *self.ctx().trace_id(); - let server_addr = *self.server_addr(); + let trace_id = *self.as_mut().ctx().trace_id(); + let server_addr = *self.as_mut().server_addr(); if e.is_elapsed() { io::Error::new( @@ -318,13 +318,13 @@ where unsafe_pinned!(transport: Fuse); fn pump_read(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> PollIo<()> { - Poll::Ready(match ready!(self.transport().poll_next(waker)?) { + Poll::Ready(match ready!(self.as_mut().transport().poll_next(waker)?) { Some(response) => { self.complete(response); Some(Ok(())) } None => { - trace!("[{}] read half closed", self.server_addr()); + trace!("[{}] read half closed", self.as_mut().server_addr()); None } }) @@ -356,12 +356,12 @@ where match (pending_requests_status, canceled_requests_status) { (ReceiverStatus::Closed, ReceiverStatus::Closed) => { - ready!(self.transport().poll_flush(waker)?); + ready!(self.as_mut().transport().poll_flush(waker)?); Poll::Ready(None) } (ReceiverStatus::NotReady, _) | (_, ReceiverStatus::NotReady) => { // No more messages to process, so flush any messages buffered in the transport. - ready!(self.transport().poll_flush(waker)?); + ready!(self.as_mut().transport().poll_flush(waker)?); // Even if we fully-flush, we return Pending, because we have no more requests // or cancellations right now. @@ -375,10 +375,10 @@ where self: &mut Pin<&mut Self>, waker: &LocalWaker, ) -> PollIo> { - if self.in_flight_requests().len() >= self.config.max_in_flight_requests { + if self.as_mut().in_flight_requests().len() >= self.config.max_in_flight_requests { info!( "At in-flight request capacity ({}/{}).", - self.in_flight_requests().len(), + self.as_mut().in_flight_requests().len(), self.config.max_in_flight_requests ); @@ -387,13 +387,13 @@ where return Poll::Pending; } - while let Poll::Pending = self.transport().poll_ready(waker)? { + while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? { // We can't yield a request-to-be-sent before the transport is capable of buffering it. - ready!(self.transport().poll_flush(waker)?); + ready!(self.as_mut().transport().poll_flush(waker)?); } loop { - match ready!(self.pending_requests().poll_next_unpin(waker)) { + match ready!(self.as_mut().pending_requests().poll_next_unpin(waker)) { Some(request) => { if request.response_completion.is_canceled() { trace!( @@ -406,7 +406,7 @@ where return Poll::Ready(Some(Ok(request))); } None => { - trace!("[{}] pending_requests closed", self.server_addr()); + trace!("[{}] pending_requests closed", self.as_mut().server_addr()); return Poll::Ready(None); } } @@ -418,27 +418,27 @@ where self: &mut Pin<&mut Self>, waker: &LocalWaker, ) -> PollIo<(context::Context, u64)> { - while let Poll::Pending = self.transport().poll_ready(waker)? { - ready!(self.transport().poll_flush(waker)?); + while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? { + ready!(self.as_mut().transport().poll_flush(waker)?); } loop { - match ready!(self.canceled_requests().poll_next_unpin(waker)) { + match ready!(self.as_mut().canceled_requests().poll_next_unpin(waker)) { Some(request_id) => { - if let Some(in_flight_data) = self.in_flight_requests().remove(&request_id) { - self.in_flight_requests().compact(0.1); + if let Some(in_flight_data) = self.as_mut().in_flight_requests().remove(&request_id) { + self.as_mut().in_flight_requests().compact(0.1); debug!( "[{}/{}] Removed request.", in_flight_data.ctx.trace_id(), - self.server_addr() + self.as_mut().server_addr() ); return Poll::Ready(Some(Ok((in_flight_data.ctx, request_id)))); } } None => { - trace!("[{}] canceled_requests closed.", self.server_addr()); + trace!("[{}] canceled_requests closed.", self.as_mut().server_addr()); return Poll::Ready(None); } } @@ -458,8 +458,8 @@ where deadline: dispatch_request.ctx.deadline, }), }; - self.transport().start_send(request)?; - self.in_flight_requests().insert( + self.as_mut().transport().start_send(request)?; + self.as_mut().in_flight_requests().insert( request_id, InFlightData { ctx: dispatch_request.ctx, @@ -479,20 +479,20 @@ where trace_context: context.trace_context, message: ClientMessageKind::Cancel { request_id }, }; - self.transport().start_send(cancel)?; - trace!("[{}/{}] Cancel message sent.", trace_id, self.server_addr()); + self.as_mut().transport().start_send(cancel)?; + trace!("[{}/{}] Cancel message sent.", trace_id, self.as_mut().server_addr()); Ok(()) } /// Sends a server response to the client task that initiated the associated request. fn complete(self: &mut Pin<&mut Self>, response: Response) -> bool { - if let Some(in_flight_data) = self.in_flight_requests().remove(&response.request_id) { - self.in_flight_requests().compact(0.1); + if let Some(in_flight_data) = self.as_mut().in_flight_requests().remove(&response.request_id) { + self.as_mut().in_flight_requests().compact(0.1); trace!( "[{}/{}] Received response.", in_flight_data.ctx.trace_id(), - self.server_addr() + self.as_mut().server_addr() ); let _ = in_flight_data.response_completion.send(response); return true; @@ -500,7 +500,7 @@ where debug!( "[{}] No in-flight request found for request_id = {}.", - self.server_addr(), + self.as_mut().server_addr(), response.request_id ); @@ -518,14 +518,14 @@ where type Output = io::Result<()>; fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { - trace!("[{}] RequestDispatch::poll", self.server_addr()); + trace!("[{}] RequestDispatch::poll", self.as_mut().server_addr()); loop { match (self.pump_read(waker)?, self.pump_write(waker)?) { (read, write @ Poll::Ready(None)) => { - if self.in_flight_requests().is_empty() { + if self.as_mut().in_flight_requests().is_empty() { info!( "[{}] Shutdown: write half closed, and no requests in flight.", - self.server_addr() + self.as_mut().server_addr() ); return Poll::Ready(Ok(())); } @@ -534,7 +534,7 @@ where _ => { trace!( "[{}] read: {:?}, write: {:?}, (not ready)", - self.server_addr(), + self.as_mut().server_addr(), read, write, ); @@ -545,7 +545,7 @@ where (read @ Poll::Ready(Some(())), write) | (read, write @ Poll::Ready(Some(()))) => { trace!( "[{}] read: {:?}, write: {:?}", - self.server_addr(), + self.as_mut().server_addr(), read, write, ) @@ -553,7 +553,7 @@ where (read, write) => { trace!( "[{}] read: {:?}, write: {:?} (not ready)", - self.server_addr(), + self.as_mut().server_addr(), read, write, ); @@ -640,7 +640,7 @@ where type Output = io::Result; fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { - match self.future().try_poll(lw) { + match self.as_mut().future().try_poll(lw) { Poll::Pending => Poll::Pending, Poll::Ready(result) => { self.finished().take().expect( @@ -680,10 +680,11 @@ where type Output = Result, Fut::Error>; fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { - match self.future().try_poll(lw) { + match self.as_mut().future().try_poll(lw) { Poll::Pending => Poll::Pending, Poll::Ready(result) => { let response = self + .as_mut() .response() .take() .expect("MapOk must not be polled after it returned `Poll::Ready`"); @@ -721,7 +722,7 @@ where { type Output = Result; - fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { + fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { self.try_chain().poll(lw, |result| match result { Ok(ok) => TryChainAction::Future(ok), Err(err) => TryChainAction::Output(Err(err)), @@ -760,8 +761,8 @@ where { let mut f = Some(f); - // Safe to call `get_mut_unchecked` because we won't move the futures. - let this = unsafe { Pin::get_mut_unchecked(self) }; + // Safe to call `get_unchecked_mut` because we won't move the futures. + let this = unsafe { Pin::get_unchecked_mut(self) }; loop { let output = match this { diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 23c6511..dc87d8c 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -9,7 +9,6 @@ integer_atomics, try_trait, futures_api, - pin, arbitrary_self_types, await_macro, async_await diff --git a/rpc/src/server/filter.rs b/rpc/src/server/filter.rs index 9412f0e..dacec81 100644 --- a/rpc/src/server/filter.rs +++ b/rpc/src/server/filter.rs @@ -107,28 +107,28 @@ impl ConnectionFilter { } }; - let open_connections = *self.open_connections(); - if open_connections >= self.config().max_connections { + let open_connections = *self.as_mut().open_connections(); + if open_connections >= self.as_mut().config().max_connections { warn!( "[{}] Shedding connection because the maximum open connections \ limit is reached ({}/{}).", peer, open_connections, - self.config().max_connections + self.as_mut().config().max_connections ); return NewConnection::Filtered; } let config = self.config.clone(); let open_connections_for_ip = self.increment_connections_for_ip(&peer)?; - *self.open_connections() += 1; + *self.as_mut().open_connections() += 1; debug!( "[{}] Opening channel ({}/{} connections for IP, {} total).", peer, open_connections_for_ip, config.max_connections_per_ip, - self.open_connections(), + self.as_mut().open_connections(), ); NewConnection::Accepted(Channel { @@ -141,19 +141,19 @@ impl ConnectionFilter { } fn handle_closed_connection(self: &mut Pin<&mut Self>, addr: &SocketAddr) { - *self.open_connections() -= 1; + *self.as_mut().open_connections() -= 1; debug!( "[{}] Closing channel. {} open connections remaining.", addr, self.open_connections ); self.decrement_connections_for_ip(&addr); - self.connections_per_ip().compact(0.1); + self.as_mut().connections_per_ip().compact(0.1); } fn increment_connections_for_ip(self: &mut Pin<&mut Self>, peer: &SocketAddr) -> Option { - let max_connections_per_ip = self.config().max_connections_per_ip; + let max_connections_per_ip = self.as_mut().config().max_connections_per_ip; let mut occupied; - let mut connections_per_ip = self.connections_per_ip(); + let mut connections_per_ip = self.as_mut().connections_per_ip(); let occupied = match connections_per_ip.entry(peer.ip()) { Entry::Vacant(vacant) => vacant.insert(0), Entry::Occupied(o) => { @@ -177,7 +177,7 @@ impl ConnectionFilter { } fn decrement_connections_for_ip(self: &mut Pin<&mut Self>, addr: &SocketAddr) { - let should_compact = match self.connections_per_ip().entry(addr.ip()) { + let should_compact = match self.as_mut().connections_per_ip().entry(addr.ip()) { Entry::Vacant(_) => { error!("[{}] Got vacant entry when closing connection.", addr); return; @@ -193,26 +193,26 @@ impl ConnectionFilter { } }; if should_compact { - self.connections_per_ip().compact(0.1); + self.as_mut().connections_per_ip().compact(0.1); } } fn poll_listener( - self: &mut Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &LocalWaker, ) -> PollIo> where S: Stream>, C: Transport, SinkItem = Response> + Send, { - match ready!(self.listener().poll_next_unpin(cx)?) { + match ready!(self.as_mut().listener().poll_next_unpin(cx)?) { Some(codec) => Poll::Ready(Some(Ok(self.handle_new_connection(codec)))), None => Poll::Ready(None), } } fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll> { - match ready!(self.closed_connections_rx().poll_next_unpin(cx)) { + match ready!(self.as_mut().closed_connections_rx().poll_next_unpin(cx)) { Some(addr) => { self.handle_closed_connection(&addr); Poll::Ready(Ok(())) @@ -231,20 +231,20 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo> { loop { - match (self.poll_listener(cx)?, self.poll_closed_connections(cx)?) { + match (self.as_mut().poll_listener(cx)?, self.poll_closed_connections(cx)?) { (Poll::Ready(Some(NewConnection::Accepted(channel))), _) => { return Poll::Ready(Some(Ok(channel))) } (Poll::Ready(Some(NewConnection::Filtered)), _) | (_, Poll::Ready(())) => { - trace!("Filtered a connection; {} open.", self.open_connections()); + trace!("Filtered a connection; {} open.", self.as_mut().open_connections()); continue; } (Poll::Pending, Poll::Pending) => return Poll::Pending, (Poll::Ready(None), Poll::Pending) => { - if *self.open_connections() > 0 { + if *self.as_mut().open_connections() > 0 { trace!( "Listener closed; {} open connections.", - self.open_connections() + self.as_mut().open_connections() ); return Poll::Pending; } diff --git a/rpc/src/server/mod.rs b/rpc/src/server/mod.rs index 9c3f637..63bf209 100644 --- a/rpc/src/server/mod.rs +++ b/rpc/src/server/mod.rs @@ -134,12 +134,12 @@ where type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll<()> { - while let Some(channel) = ready!(self.incoming().poll_next(cx)) { + while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) { match channel { Ok(channel) => { let peer = channel.client_addr; if let Err(e) = - crate::spawn(channel.respond_with(self.request_handler().clone())) + crate::spawn(channel.respond_with(self.as_mut().request_handler().clone())) { warn!("[{}] Failed to spawn connection handler: {:?}", peer, e); } @@ -231,25 +231,25 @@ where Resp: Send, { pub(crate) fn start_send( - self: &mut Pin<&mut Self>, + mut self: Pin<&mut Self>, response: Response, ) -> io::Result<()> { - self.transport().start_send(response) + self.as_mut().transport().start_send(response) } - pub(crate) fn poll_ready(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll> { - self.transport().poll_ready(cx) + pub(crate) fn poll_ready(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { + self.as_mut().transport().poll_ready(cx) } - pub(crate) fn poll_flush(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll> { - self.transport().poll_flush(cx) + pub(crate) fn poll_flush(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { + self.as_mut().transport().poll_flush(cx) } pub(crate) fn poll_next( - self: &mut Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &LocalWaker, ) -> PollIo> { - self.transport().poll_next(cx) + self.as_mut().transport().poll_next(cx) } /// Returns the address of the client connected to the channel. @@ -317,30 +317,30 @@ where /// If at max in-flight requests, check that there's room to immediately write a throttled /// response. fn poll_ready_if_throttling( - self: &mut Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &LocalWaker, ) -> Poll> { if self.in_flight_requests.len() >= self.channel.config.max_in_flight_requests_per_connection { - let peer = self.channel().client_addr; + let peer = self.as_mut().channel().client_addr; - while let Poll::Pending = self.channel().poll_ready(cx)? { + while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? { info!( "[{}] In-flight requests at max ({}), and transport is not ready.", peer, - self.in_flight_requests().len(), + self.as_mut().in_flight_requests().len(), ); - try_ready!(self.channel().poll_flush(cx)); + try_ready!(self.as_mut().channel().poll_flush(cx)); } } Poll::Ready(Ok(())) } - fn pump_read(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> PollIo<()> { - ready!(self.poll_ready_if_throttling(cx)?); + fn pump_read(mut self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo<()> { + ready!(self.as_mut().poll_ready_if_throttling(cx)?); - Poll::Ready(match ready!(self.channel().poll_next(cx)?) { + Poll::Ready(match ready!(self.as_mut().channel().poll_next(cx)?) { Some(message) => { match message.message { ClientMessageKind::Request(request) => { @@ -360,28 +360,28 @@ where } fn pump_write( - self: &mut Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &LocalWaker, read_half_closed: bool, ) -> PollIo<()> { - match self.poll_next_response(cx)? { + match self.as_mut().poll_next_response(cx)? { Poll::Ready(Some((_, response))) => { - self.channel().start_send(response)?; + self.as_mut().channel().start_send(response)?; Poll::Ready(Some(Ok(()))) } Poll::Ready(None) => { // Shutdown can't be done before we finish pumping out remaining responses. - ready!(self.channel().poll_flush(cx)?); + ready!(self.as_mut().channel().poll_flush(cx)?); Poll::Ready(None) } Poll::Pending => { // No more requests to process, so flush any requests buffered in the transport. - ready!(self.channel().poll_flush(cx)?); + ready!(self.as_mut().channel().poll_flush(cx)?); // Being here means there are no staged requests and all written responses are // fully flushed. So, if the read half is closed and there are no in-flight // requests, then we can close the write half. - if read_half_closed && self.in_flight_requests().is_empty() { + if read_half_closed && self.as_mut().in_flight_requests().is_empty() { Poll::Ready(None) } else { Poll::Pending @@ -391,30 +391,31 @@ where } fn poll_next_response( - self: &mut Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &LocalWaker, ) -> PollIo<(Context, Response)> { // Ensure there's room to write a response. - while let Poll::Pending = self.channel().poll_ready(cx)? { - ready!(self.channel().poll_flush(cx)?); + while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? { + ready!(self.as_mut().channel().poll_flush(cx)?); } - let peer = self.channel().client_addr; + let peer = self.as_mut().channel().client_addr; - match ready!(self.pending_responses().poll_next(cx)) { + match ready!(self.as_mut().pending_responses().poll_next(cx)) { Some((ctx, response)) => { if self + .as_mut() .in_flight_requests() .remove(&response.request_id) .is_some() { - self.in_flight_requests().compact(0.1); + self.as_mut().in_flight_requests().compact(0.1); } trace!( "[{}/{}] Staging response. In-flight requests = {}.", ctx.trace_id(), peer, - self.in_flight_requests().len(), + self.as_mut().in_flight_requests().len(), ); Poll::Ready(Some(Ok((ctx, response)))) } @@ -427,30 +428,30 @@ where } fn handle_request( - self: &mut Pin<&mut Self>, + mut self: Pin<&mut Self>, trace_context: trace::Context, request: Request, ) -> io::Result<()> { let request_id = request.id; - let peer = self.channel().client_addr; + let peer = self.as_mut().channel().client_addr; let ctx = Context { deadline: request.deadline, trace_context, }; let request = request.message; - if self.in_flight_requests().len() - >= self.channel().config.max_in_flight_requests_per_connection + if self.as_mut().in_flight_requests().len() + >= self.as_mut().channel().config.max_in_flight_requests_per_connection { debug!( "[{}/{}] Client has reached in-flight request limit ({}/{}).", ctx.trace_id(), peer, - self.in_flight_requests().len(), - self.channel().config.max_in_flight_requests_per_connection + self.as_mut().in_flight_requests().len(), + self.as_mut().channel().config.max_in_flight_requests_per_connection ); - self.channel().start_send(Response { + self.as_mut().channel().start_send(Response { request_id, message: Err(ServerError { kind: io::ErrorKind::WouldBlock, @@ -469,10 +470,10 @@ where format_rfc3339(deadline), timeout, ); - let mut response_tx = self.responses_tx().clone(); + let mut response_tx = self.as_mut().responses_tx().clone(); let trace_id = *ctx.trace_id(); - let response = self.f().clone()(ctx, request); + let response = self.as_mut().f().clone()(ctx, request); let response = deadline_compat::Deadline::new(response, Instant::now() + timeout).then( async move |result| { let response = Response { @@ -496,18 +497,18 @@ where ), ) })?; - self.in_flight_requests().insert(request_id, abort_handle); + self.as_mut().in_flight_requests().insert(request_id, abort_handle); Ok(()) } - fn cancel_request(self: &mut Pin<&mut Self>, trace_context: &trace::Context, request_id: u64) { + fn cancel_request(mut self: Pin<&mut Self>, trace_context: &trace::Context, request_id: u64) { // It's possible the request was already completed, so it's fine // if this is None. - if let Some(cancel_handle) = self.in_flight_requests().remove(&request_id) { - self.in_flight_requests().compact(0.1); + if let Some(cancel_handle) = self.as_mut().in_flight_requests().remove(&request_id) { + self.as_mut().in_flight_requests().compact(0.1); cancel_handle.abort(); - let remaining = self.in_flight_requests().len(); + let remaining = self.as_mut().in_flight_requests().len(); trace!( "[{}/{}] Request canceled. In-flight requests = {}", trace_context.trace_id, @@ -538,8 +539,8 @@ where fn poll(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { trace!("[{}] ClientHandler::poll", self.channel.client_addr); loop { - let read = self.pump_read(cx)?; - match (read, self.pump_write(cx, read == Poll::Ready(None))?) { + let read = self.as_mut().pump_read(cx)?; + match (read, self.as_mut().pump_write(cx, read == Poll::Ready(None))?) { (Poll::Ready(None), Poll::Ready(None)) => { info!("[{}] Client disconnected.", self.channel.client_addr); return Poll::Ready(Ok(())); diff --git a/rpc/src/transport/channel.rs b/rpc/src/transport/channel.rs index 97cf7f0..92471f7 100644 --- a/rpc/src/transport/channel.rs +++ b/rpc/src/transport/channel.rs @@ -45,7 +45,7 @@ impl UnboundedChannel { impl Stream for UnboundedChannel { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo { + fn poll_next(self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo { self.rx().poll_next(cx).map(|option| option.map(Ok)) } } @@ -54,25 +54,25 @@ impl Sink for UnboundedChannel { type SinkItem = SinkItem; type SinkError = io::Error; - fn poll_ready(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { self.tx() .poll_ready(cx) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) } - fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> { + fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> { self.tx() .start_send(item) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { self.tx() .poll_flush(cx) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) } - fn poll_close(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { self.tx() .poll_close(cx) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) diff --git a/rpc/src/transport/mod.rs b/rpc/src/transport/mod.rs index 212babf..998ee69 100644 --- a/rpc/src/transport/mod.rs +++ b/rpc/src/transport/mod.rs @@ -71,7 +71,7 @@ where { type Item = S::Item; - fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_next(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { self.inner().poll_next(waker) } } @@ -83,19 +83,19 @@ where type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(mut self: Pin<&mut Self>, item: S::SinkItem) -> Result<(), S::SinkError> { + fn start_send(self: Pin<&mut Self>, item: S::SinkItem) -> Result<(), S::SinkError> { self.inner().start_send(item) } - fn poll_ready(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { self.inner().poll_ready(waker) } - fn poll_flush(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { self.inner().poll_flush(waker) } - fn poll_close(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn poll_close(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { self.inner().poll_close(waker) } } diff --git a/rpc/src/util/deadline_compat.rs b/rpc/src/util/deadline_compat.rs index 7d155d1..d175614 100644 --- a/rpc/src/util/deadline_compat.rs +++ b/rpc/src/util/deadline_compat.rs @@ -52,7 +52,7 @@ where fn poll(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll { // First, try polling the future - match self.future().try_poll(waker) { + match self.as_mut().future().try_poll(waker) { Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), Poll::Pending => {} Poll::Ready(Err(e)) => return Poll::Ready(Err(timeout::Error::inner(e))), diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 2439854..7d084ae 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -25,16 +25,16 @@ tarpc-plugins = { path = "../plugins", version = "0.5.0" } rpc = { package = "tarpc-lib", path = "../rpc", version = "0.2" } [target.'cfg(not(test))'.dependencies] -futures-preview = "0.3.0-alpha.9" +futures-preview = "0.3.0-alpha.12" [dev-dependencies] bincode = "1.0" bytes = { version = "0.4", features = ["serde"] } humantime = "1.0" -futures-preview = { version = "0.3.0-alpha.9", features = ["compat", "tokio-compat"] } +futures-preview = { version = "0.3.0-alpha.12", features = ["compat", "tokio-compat"] } bincode-transport = { package = "tarpc-bincode-transport", version = "0.3", path = "../bincode-transport" } -env_logger = "0.5" +env_logger = "0.6" tokio = "0.1" tokio-executor = "0.1" tokio-tcp = "0.1" -pin-utils = "0.1.0-alpha.3" +pin-utils = "0.1.0-alpha.4" diff --git a/tarpc/examples/pubsub.rs b/tarpc/examples/pubsub.rs index 1b90eb5..9575b28 100644 --- a/tarpc/examples/pubsub.rs +++ b/tarpc/examples/pubsub.rs @@ -6,7 +6,6 @@ #![feature( arbitrary_self_types, - pin, futures_api, await_macro, async_await, diff --git a/tarpc/examples/readme.rs b/tarpc/examples/readme.rs index 2c2cf0c..2675b78 100644 --- a/tarpc/examples/readme.rs +++ b/tarpc/examples/readme.rs @@ -6,7 +6,6 @@ #![feature( futures_api, - pin, arbitrary_self_types, await_macro, async_await, diff --git a/tarpc/examples/server_calling_server.rs b/tarpc/examples/server_calling_server.rs index 101e75d..00ee4ab 100644 --- a/tarpc/examples/server_calling_server.rs +++ b/tarpc/examples/server_calling_server.rs @@ -7,7 +7,6 @@ #![feature( existential_type, arbitrary_self_types, - pin, futures_api, await_macro, async_await, diff --git a/tarpc/examples/service_registry.rs b/tarpc/examples/service_registry.rs index b1f8460..2396c7f 100644 --- a/tarpc/examples/service_registry.rs +++ b/tarpc/examples/service_registry.rs @@ -1,5 +1,4 @@ #![feature( - pin, async_await, await_macro, futures_api, @@ -214,7 +213,7 @@ mod registry { fn poll(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll { unsafe { - match Pin::get_mut_unchecked(self) { + match Pin::get_unchecked_mut(self) { Either::Left(car) => Pin::new_unchecked(car).poll(waker), Either::Right(cdr) => Pin::new_unchecked(cdr).poll(waker), } diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 0bb9bd8..fde50a0 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -10,7 +10,6 @@ #![cfg_attr( test, feature( - pin, futures_api, await_macro, proc_macro_hygiene, diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index d25f94e..7551ed2 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -181,7 +181,7 @@ macro_rules! service { -> ::std::task::Poll<::std::io::Result> { unsafe { - match ::std::pin::Pin::get_mut_unchecked(self) { + match ::std::pin::Pin::get_unchecked_mut(self) { $( ResponseFut::$fn_name(resp) => ::std::pin::Pin::new_unchecked(resp) diff --git a/tarpc/tests/latency.rs b/tarpc/tests/latency.rs index ec30845..a2b9c54 100644 --- a/tarpc/tests/latency.rs +++ b/tarpc/tests/latency.rs @@ -7,7 +7,6 @@ #![feature( test, arbitrary_self_types, - pin, integer_atomics, futures_api, generators, diff --git a/trace/Cargo.toml b/trace/Cargo.toml index 244a2b9..3cabde7 100644 --- a/trace/Cargo.toml +++ b/trace/Cargo.toml @@ -13,7 +13,7 @@ readme = "../README.md" description = "foundations for tracing in tarpc" [dependencies] -rand = "0.5" +rand = "0.6" [dependencies.serde] version = "1.0"