diff --git a/README.md b/README.md index 5864a03..c71409e 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ For a more real-world example, see [example-service](example-service). First, let's set up the dependencies and service definition. ```rust -#![feature(arbitrary_self_types, async_await, proc_macro_hygiene)] +#![feature(async_await, proc_macro_hygiene)] # extern crate futures; use futures::{ @@ -86,7 +86,7 @@ This service definition generates a trait called `Service`. Next we need to implement it for our Server struct. ```rust -# #![feature(arbitrary_self_types, async_await, proc_macro_hygiene)] +# #![feature(async_await, proc_macro_hygiene)] # extern crate futures; # # use futures::{ @@ -132,7 +132,7 @@ tarpc also ships a that uses bincode over TCP. ```rust -# #![feature(arbitrary_self_types, async_await, proc_macro_hygiene)] +# #![feature(async_await, proc_macro_hygiene)] # extern crate futures; # # use futures::{ @@ -203,7 +203,7 @@ call `tarpc::init()` to initialize the executor tarpc uses internally to run background tasks for the client and server. ```rust -# #![feature(arbitrary_self_types, async_await, proc_macro_hygiene)] +# #![feature(async_await, proc_macro_hygiene)] # extern crate futures; # # use futures::{ diff --git a/bincode-transport/src/lib.rs b/bincode-transport/src/lib.rs index 80b8ff1..e6582fa 100644 --- a/bincode-transport/src/lib.rs +++ b/bincode-transport/src/lib.rs @@ -6,7 +6,7 @@ //! A TCP [`Transport`] that serializes as bincode. -#![feature(arbitrary_self_types, async_await)] +#![feature(async_await)] #![deny(missing_docs, missing_debug_implementations)] use async_bincode::{AsyncBincodeStream, AsyncDestination}; diff --git a/example-service/src/client.rs b/example-service/src/client.rs index cbaa04d..4e69e61 100644 --- a/example-service/src/client.rs +++ b/example-service/src/client.rs @@ -4,7 +4,7 @@ // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -#![feature(arbitrary_self_types, async_await)] +#![feature(async_await)] use clap::{App, Arg}; use futures::{compat::Executor01CompatExt, prelude::*}; diff --git a/example-service/src/lib.rs b/example-service/src/lib.rs index eb0246e..01c43ca 100644 --- a/example-service/src/lib.rs +++ b/example-service/src/lib.rs @@ -4,7 +4,7 @@ // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -#![feature(arbitrary_self_types, async_await, proc_macro_hygiene)] +#![feature(async_await, proc_macro_hygiene)] // This is the service definition. It looks a lot like a trait definition. // It defines one RPC, hello, which takes one arg, name, and returns a String. diff --git a/example-service/src/server.rs b/example-service/src/server.rs index c0312f1..d5b30a2 100644 --- a/example-service/src/server.rs +++ b/example-service/src/server.rs @@ -4,7 +4,7 @@ // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -#![feature(arbitrary_self_types, async_await)] +#![feature(async_await)] use clap::{App, Arg}; use futures::{ diff --git a/json-transport/src/lib.rs b/json-transport/src/lib.rs index c12f059..26a0086 100644 --- a/json-transport/src/lib.rs +++ b/json-transport/src/lib.rs @@ -6,7 +6,7 @@ //! A TCP [`Transport`] that serializes as JSON. -#![feature(arbitrary_self_types, async_await)] +#![feature(async_await)] #![deny(missing_docs)] use futures::{compat::*, prelude::*, ready}; diff --git a/rpc/src/client/channel.rs b/rpc/src/client/channel.rs index ecd6913..0eaa126 100644 --- a/rpc/src/client/channel.rs +++ b/rpc/src/client/channel.rs @@ -303,7 +303,7 @@ where unsafe_pinned!(pending_requests: Fuse>>); unsafe_pinned!(transport: Fuse); - fn pump_read(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> { + fn pump_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> { Poll::Ready(match ready!(self.as_mut().transport().poll_next(cx)?) { Some(response) => { self.complete(response); @@ -313,24 +313,24 @@ where }) } - fn pump_write(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> { + fn pump_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> { enum ReceiverStatus { NotReady, Closed, } - let pending_requests_status = match self.poll_next_request(cx)? { + let pending_requests_status = match self.as_mut().poll_next_request(cx)? { Poll::Ready(Some(dispatch_request)) => { - self.write_request(dispatch_request)?; + self.as_mut().write_request(dispatch_request)?; return Poll::Ready(Some(Ok(()))); } Poll::Ready(None) => ReceiverStatus::Closed, Poll::Pending => ReceiverStatus::NotReady, }; - let canceled_requests_status = match self.poll_next_cancellation(cx)? { + let canceled_requests_status = match self.as_mut().poll_next_cancellation(cx)? { Poll::Ready(Some((context, request_id))) => { - self.write_cancel(context, request_id)?; + self.as_mut().write_cancel(context, request_id)?; return Poll::Ready(Some(Ok(()))); } Poll::Ready(None) => ReceiverStatus::Closed, @@ -355,7 +355,7 @@ where /// Yields the next pending request, if one is ready to be sent. fn poll_next_request( - self: &mut Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> PollIo> { if self.as_mut().in_flight_requests().len() >= self.config.max_in_flight_requests { @@ -395,7 +395,7 @@ where /// Yields the next pending cancellation, and, if one is ready, cancels the associated request. fn poll_next_cancellation( - self: &mut Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> PollIo<(context::Context, u64)> { while let Poll::Pending = self.as_mut().transport().poll_ready(cx)? { @@ -420,7 +420,7 @@ where } fn write_request( - self: &mut Pin<&mut Self>, + mut self: Pin<&mut Self>, dispatch_request: DispatchRequest, ) -> io::Result<()> { let request_id = dispatch_request.request_id; @@ -444,7 +444,7 @@ where } fn write_cancel( - self: &mut Pin<&mut Self>, + mut self: Pin<&mut Self>, context: context::Context, request_id: u64, ) -> io::Result<()> { @@ -459,7 +459,7 @@ where } /// Sends a server response to the client task that initiated the associated request. - fn complete(self: &mut Pin<&mut Self>, response: Response) -> bool { + fn complete(mut self: Pin<&mut Self>, response: Response) -> bool { if let Some(in_flight_data) = self .as_mut() .in_flight_requests() @@ -492,7 +492,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - match (self.pump_read(cx)?, self.pump_write(cx)?) { + match (self.as_mut().pump_read(cx)?, self.as_mut().pump_write(cx)?) { (read, Poll::Ready(None)) => { if self.as_mut().in_flight_requests().is_empty() { info!("Shutdown: write half closed, and no requests in flight."); @@ -803,7 +803,7 @@ mod tests { #[test] fn stage_request() { let (mut dispatch, mut channel, _server_channel) = set_up(); - let mut dispatch = Pin::new(&mut dispatch); + let dispatch = Pin::new(&mut dispatch); let cx = &mut Context::from_waker(&noop_waker_ref()); let _resp = send_request(&mut channel, "hi"); @@ -840,7 +840,7 @@ mod tests { #[test] fn stage_request_response_future_dropped_is_canceled_before_sending() { let (mut dispatch, mut channel, _server_channel) = set_up(); - let mut dispatch = Pin::new(&mut dispatch); + let dispatch = Pin::new(&mut dispatch); let cx = &mut Context::from_waker(&noop_waker_ref()); let _ = send_request(&mut channel, "hi"); @@ -877,7 +877,7 @@ mod tests { #[test] fn stage_request_response_closed_skipped() { let (mut dispatch, mut channel, _server_channel) = set_up(); - let mut dispatch = Pin::new(&mut dispatch); + let dispatch = Pin::new(&mut dispatch); let cx = &mut Context::from_waker(&noop_waker_ref()); // Test that a request future that's closed its receiver but not yet canceled its request -- diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 7823558..ff23207 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -7,7 +7,6 @@ #![feature( weak_counts, non_exhaustive, - arbitrary_self_types, async_await, trait_alias, )] diff --git a/rpc/src/server/filter.rs b/rpc/src/server/filter.rs index 78419bf..c081eda 100644 --- a/rpc/src/server/filter.rs +++ b/rpc/src/server/filter.rs @@ -173,11 +173,11 @@ where F: Fn(&S::Item) -> K, { fn handle_new_channel( - self: &mut Pin<&mut Self>, + mut self: Pin<&mut Self>, stream: S::Item, ) -> Result, K> { let key = self.as_mut().keymaker()(&stream); - let tracker = self.increment_channels_for_key(key.clone())?; + let tracker = self.as_mut().increment_channels_for_key(key.clone())?; trace!( "[{}] Opening channel ({}/{}) channels for key.", @@ -192,7 +192,7 @@ where }) } - fn increment_channels_for_key(self: &mut Pin<&mut Self>, key: K) -> Result>, K> { + fn increment_channels_for_key(mut self: Pin<&mut Self>, key: K) -> Result>, K> { let channels_per_key = self.channels_per_key; let dropped_keys = self.dropped_keys_tx.clone(); let key_counts = &mut self.as_mut().key_counts(); @@ -239,7 +239,7 @@ where } } - fn poll_closed_channels(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + fn poll_closed_channels(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { match ready!(self.as_mut().dropped_keys().poll_next_unpin(cx)) { Some(key) => { debug!("All channels dropped for key [{}]", key); @@ -267,7 +267,7 @@ where loop { match ( self.as_mut().poll_listener(cx), - self.poll_closed_channels(cx), + self.as_mut().poll_closed_channels(cx), ) { (Poll::Ready(Some(Ok(channel))), _) => { return Poll::Ready(Some(channel)); @@ -358,9 +358,9 @@ fn channel_filter_increment_channels_for_key() { let (_, listener) = mpsc::unbounded(); let filter = ChannelFilter::new(listener, 2, |chan: &TestChannel| chan.key); pin_mut!(filter); - let tracker1 = filter.increment_channels_for_key("key").unwrap(); + let tracker1 = filter.as_mut().increment_channels_for_key("key").unwrap(); assert_eq!(Arc::strong_count(&tracker1), 1); - let tracker2 = filter.increment_channels_for_key("key").unwrap(); + let tracker2 = filter.as_mut().increment_channels_for_key("key").unwrap(); assert_eq!(Arc::strong_count(&tracker1), 2); assert_matches!(filter.increment_channels_for_key("key"), Err("key")); drop(tracker2); @@ -380,11 +380,13 @@ fn channel_filter_handle_new_channel() { let filter = ChannelFilter::new(listener, 2, |chan: &TestChannel| chan.key); pin_mut!(filter); let channel1 = filter + .as_mut() .handle_new_channel(TestChannel { key: "key" }) .unwrap(); assert_eq!(Arc::strong_count(&channel1.tracker), 1); let channel2 = filter + .as_mut() .handle_new_channel(TestChannel { key: "key" }) .unwrap(); assert_eq!(Arc::strong_count(&channel1.tracker), 2); @@ -454,7 +456,7 @@ fn channel_filter_poll_closed_channels() { assert_eq!(filter.key_counts.len(), 1); drop(channel); - assert_matches!(filter.poll_closed_channels(&mut ctx()), Poll::Ready(())); + assert_matches!(filter.as_mut().poll_closed_channels(&mut ctx()), Poll::Ready(())); assert!(filter.key_counts.is_empty()); } diff --git a/tarpc/examples/pubsub.rs b/tarpc/examples/pubsub.rs index 83a53f2..7ec0347 100644 --- a/tarpc/examples/pubsub.rs +++ b/tarpc/examples/pubsub.rs @@ -5,7 +5,6 @@ // https://opensource.org/licenses/MIT. #![feature( - arbitrary_self_types, async_await, existential_type, proc_macro_hygiene diff --git a/tarpc/examples/readme.rs b/tarpc/examples/readme.rs index adf95dc..2996430 100644 --- a/tarpc/examples/readme.rs +++ b/tarpc/examples/readme.rs @@ -4,7 +4,7 @@ // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -#![feature(arbitrary_self_types, async_await, proc_macro_hygiene)] +#![feature(async_await, proc_macro_hygiene)] use futures::{ compat::Executor01CompatExt, diff --git a/tarpc/examples/server_calling_server.rs b/tarpc/examples/server_calling_server.rs index 2dc532d..a8cd11e 100644 --- a/tarpc/examples/server_calling_server.rs +++ b/tarpc/examples/server_calling_server.rs @@ -6,7 +6,6 @@ #![feature( existential_type, - arbitrary_self_types, async_await, proc_macro_hygiene )] diff --git a/tarpc/examples/service_registry.rs b/tarpc/examples/service_registry.rs index df732e8..c151fd7 100644 --- a/tarpc/examples/service_registry.rs +++ b/tarpc/examples/service_registry.rs @@ -1,4 +1,4 @@ -#![feature(async_await, arbitrary_self_types, proc_macro_hygiene)] +#![feature(async_await, proc_macro_hygiene)] mod registry { use bytes::Bytes; diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 6af5a11..35e1be6 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -7,7 +7,7 @@ #![doc(include = "../README.md")] #![deny(missing_docs, missing_debug_implementations)] #![feature(async_await, external_doc)] -#![cfg_attr(test, feature(proc_macro_hygiene, arbitrary_self_types))] +#![cfg_attr(test, feature(proc_macro_hygiene))] #[doc(hidden)] pub use futures; diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index 21f83ec..1fde331 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -30,7 +30,7 @@ macro_rules! add_serde_if_enabled { /// Rpc methods are specified, mirroring trait syntax: /// /// ``` -/// # #![feature(arbitrary_self_types, async_await, proc_macro_hygiene)] +/// # #![feature(async_await, proc_macro_hygiene)] /// # fn main() {} /// # tarpc::service! { /// /// Say hello