diff --git a/README.md b/README.md index 3c1ee36..aa73824 100644 --- a/README.md +++ b/README.md @@ -60,8 +60,8 @@ For this example, in addition to tarpc, also add two other dependencies to your `Cargo.toml`: ```toml -futures-preview = "0.3.0-alpha.18" -tokio = "0.2.0-alpha.3" +futures = "0.3" +tokio = "0.2" ``` In the following example, we use an in-process channel for communication between diff --git a/example-service/Cargo.toml b/example-service/Cargo.toml index 6b6fed3..47bfa16 100644 --- a/example-service/Cargo.toml +++ b/example-service/Cargo.toml @@ -14,10 +14,10 @@ description = "An example server built on tarpc." [dependencies] clap = "2.0" -futures-preview = { version = "0.3.0-alpha.18" } +futures = "0.3" serde = { version = "1.0" } -tarpc = { version = "0.18", path = "../tarpc", features = ["json-transport", "serde1"] } -tokio = "0.2.0-alpha.3" +tarpc = { version = "0.18", path = "../tarpc", features = ["full"] } +tokio = "0.2" env_logger = "0.6" [lib] diff --git a/plugins/Cargo.toml b/plugins/Cargo.toml index c1b892e..53d1fbf 100644 --- a/plugins/Cargo.toml +++ b/plugins/Cargo.toml @@ -27,6 +27,6 @@ proc-macro2 = "0.4" proc-macro = true [dev-dependencies] -futures-preview = { version = "0.3.0-alpha.18" } +futures = "0.3" serde = { version = "1.0", features = ["derive"] } tarpc = { path = "../tarpc" } diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 72014f5..73493b4 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -17,8 +17,8 @@ default = [] serde1 = ["tarpc-plugins/serde1", "serde", "serde/derive"] tokio1 = ["tokio"] -bincode-transport = ["async-bincode", "futures-legacy", "futures-test-preview", "futures-preview/compat", "tokio-io", "tokio-tcp"] -json-transport = ["tokio/codec", "tokio/io", "tokio/net", "tokio-net", "tokio-serde-json"] +bincode-transport = ["async-bincode", "futures-legacy", "futures-test", "futures/compat", "tokio-io", "tokio-tcp"] +json-transport = ["tokio/net", "tokio-serde/json", "tokio-util/codec"] full = ["serde1", "tokio1", "bincode-transport", "json-transport"] @@ -27,35 +27,34 @@ travis-ci = { repository = "google/tarpc" } [dependencies] fnv = "1.0" -futures-preview = { version = "0.3.0-alpha.18" } +futures = "0.3" humantime = "1.0" log = "0.4" pin-project = "0.4" raii-counter = "0.2" rand = "0.7" -tokio-timer = "0.3.0-alpha" serde = { optional = true, version = "1.0", features = ["derive"] } -tokio = { optional = true, version = "0.2.0-alpha" } +tokio = { optional = true, version = "0.2", features = ["time"] } +tokio-util = { optional = true, version = "0.2" } tarpc-plugins = { path = "../plugins" } async-bincode = { optional = true, version = "0.4" } futures-legacy = { optional = true, version = "0.1", package = "futures" } -futures-test-preview = { optional = true, version = "0.3.0-alpha" } +futures-test = { optional = true, version = "0.3" } tokio-io = { optional = true, version = "0.1" } tokio-tcp = { optional = true, version = "0.1" } -tokio-net = { optional = true, version = "0.2.0-alpha" } -tokio-serde-json = { optional = true, version = "0.3" } +tokio-serde = { optional = true, version = "0.5" } [dev-dependencies] assert_matches = "1.0" -bytes = { version = "0.4", features = ["serde"] } +bytes = { version = "0.5", features = ["serde"] } env_logger = "0.6" -futures-preview = { version = "0.3.0-alpha" } +futures = "0.3" humantime = "1.0" log = "0.4" pin-utils = "0.1.0-alpha" -tokio = "0.2.0-alpha" +tokio = { version = "0.2", features = ["full"] } [[example]] name = "server_calling_server" @@ -63,7 +62,7 @@ required-features = ["serde1"] [[example]] name = "readme" -required-features = ["serde1"] +required-features = ["serde1", "tokio1"] [[example]] name = "pubsub" diff --git a/tarpc/src/bincode_transport/mod.rs b/tarpc/src/bincode_transport/mod.rs index 6b2fa26..d81e94e 100644 --- a/tarpc/src/bincode_transport/mod.rs +++ b/tarpc/src/bincode_transport/mod.rs @@ -9,17 +9,10 @@ #![deny(missing_docs, missing_debug_implementations)] use async_bincode::{AsyncBincodeStream, AsyncDestination}; -use futures::{compat::*, prelude::*, ready}; +use futures::{compat::*, prelude::*, ready, task::*}; use pin_project::pin_project; use serde::{Deserialize, Serialize}; -use std::{ - error::Error, - io, - marker::PhantomData, - net::SocketAddr, - pin::Pin, - task::{Context, Poll}, -}; +use std::{error::Error, io, marker::PhantomData, net::SocketAddr, pin::Pin}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tcp::{TcpListener, TcpStream}; @@ -183,13 +176,9 @@ where mod tests { use super::Transport; use assert_matches::assert_matches; - use futures::{Sink, Stream}; - use futures_test::task::noop_waker_ref; + use futures::{task::*, Sink, Stream}; use pin_utils::pin_mut; - use std::{ - io::Cursor, - task::{Context, Poll}, - }; + use std::io::Cursor; fn ctx() -> Context<'static> { Context::from_waker(&noop_waker_ref()) diff --git a/tarpc/src/json_transport/mod.rs b/tarpc/src/json_transport/mod.rs index 78dd5e9..210b513 100644 --- a/tarpc/src/json_transport/mod.rs +++ b/tarpc/src/json_transport/mod.rs @@ -8,28 +8,24 @@ #![deny(missing_docs)] -use futures::{prelude::*, ready}; +use futures::{prelude::*, ready, task::*}; use pin_project::pin_project; use serde::{Deserialize, Serialize}; -use std::{ - error::Error, - io, - marker::PhantomData, - net::SocketAddr, - pin::Pin, - task::{Context, Poll}, -}; -use tokio::codec::{length_delimited::LengthDelimitedCodec, Framed}; +use std::{error::Error, io, marker::PhantomData, net::SocketAddr, pin::Pin}; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::net::{TcpListener, TcpStream}; -use tokio_net::ToSocketAddrs; -use tokio_serde_json::*; +use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; +use tokio_serde::{formats::*, *}; +use tokio_util::codec::{length_delimited::LengthDelimitedCodec, Framed}; /// A transport that serializes to, and deserializes from, a [`TcpStream`]. #[pin_project] pub struct Transport { #[pin] - inner: ReadJson, SinkItem>, Item>, + inner: FramedRead< + FramedWrite, SinkItem, Json>, + Item, + Json, + >, } impl Stream for Transport @@ -115,10 +111,13 @@ impl Self { Transport { - inner: ReadJson::new(WriteJson::new(Framed::new( - inner, - LengthDelimitedCodec::new(), - ))), + inner: FramedRead::new( + FramedWrite::new( + Framed::new(inner, LengthDelimitedCodec::new()), + Json::default(), + ), + Json::default(), + ), } } } @@ -142,23 +141,18 @@ where { let listener = TcpListener::bind(addr).await?; let local_addr = listener.local_addr()?; - let incoming = Box::pin(listener.incoming()); Ok(Incoming { - incoming, + listener, local_addr, ghost: PhantomData, }) } -trait IncomingTrait: Stream> + std::fmt::Debug + Send {} -impl> + std::fmt::Debug + Send> IncomingTrait for T {} - /// A [`TcpListener`] that wraps connections in JSON transports. #[pin_project] #[derive(Debug)] pub struct Incoming { - #[pin] - incoming: Pin>, + listener: TcpListener, local_addr: SocketAddr, ghost: PhantomData<(Item, SinkItem)>, } @@ -178,7 +172,7 @@ where type Item = io::Result>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let next = ready!(self.project().incoming.poll_next(cx)?); + let next = ready!(Pin::new(&mut self.project().listener.incoming()).poll_next(cx)?); Poll::Ready(next.map(|conn| Ok(new(conn)))) } } @@ -187,13 +181,11 @@ where mod tests { use super::Transport; use assert_matches::assert_matches; - use futures::task::noop_waker_ref; - use futures::{Sink, Stream}; + use futures::{task::*, Sink, Stream}; use pin_utils::pin_mut; use std::{ io::{self, Cursor}, pin::Pin, - task::{Context, Poll}, }; use tokio::io::{AsyncRead, AsyncWrite}; diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index e2eaf25..9422895 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -63,8 +63,8 @@ //! your `Cargo.toml`: //! //! ```toml -//! futures-preview = "0.3.0-alpha.18" -//! tokio = "0.2.0-alpha.3" +//! futures = "0.3" +//! tokio = "0.2" //! ``` //! //! In the following example, we use an in-process channel for communication between diff --git a/tarpc/src/rpc/client/channel.rs b/tarpc/src/rpc/client/channel.rs index 3464e0a..389b714 100644 --- a/tarpc/src/rpc/client/channel.rs +++ b/tarpc/src/rpc/client/channel.rs @@ -16,8 +16,7 @@ use futures::{ prelude::*, ready, stream::Fuse, - task::Context, - Poll, + task::*, }; use log::{debug, info, trace}; use pin_project::{pin_project, pinned_drop}; @@ -29,7 +28,6 @@ use std::{ Arc, }, }; -use tokio_timer::{timeout, Timeout}; use super::{Config, NewClient}; @@ -118,7 +116,7 @@ impl Channel { response_completion, })), DispatchResponse { - response: Timeout::new(response, timeout), + response: tokio::time::timeout(timeout, response), complete: false, request_id, cancellation, @@ -142,7 +140,7 @@ impl Channel { #[pin_project(PinnedDrop)] #[derive(Debug)] struct DispatchResponse { - response: Timeout>>, + response: tokio::time::Timeout>>, ctx: context::Context, complete: bool, cancellation: RequestCancellation, @@ -168,7 +166,7 @@ impl Future for DispatchResponse { } } } - Err(timeout::Elapsed { .. }) => Err(io::Error::new( + Err(tokio::time::Elapsed { .. }) => Err(io::Error::new( io::ErrorKind::TimedOut, "Client dropped expired request.".to_string(), )), @@ -716,48 +714,35 @@ mod tests { use futures::{ channel::{mpsc, oneshot}, prelude::*, - task::Context, - Poll, + task::*, }; - use futures_test::task::noop_waker_ref; use std::time::Duration; use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc}; - use tokio::runtime::current_thread; - use tokio_timer::Timeout; - #[test] - fn dispatch_response_cancels_on_timeout() { + #[tokio::test(threaded_scheduler)] + async fn dispatch_response_cancels_on_timeout() { let (_response_completion, response) = oneshot::channel(); let (cancellation, mut canceled_requests) = cancellations(); let resp = DispatchResponse:: { // Timeout in the past should cause resp to error out when polled. - response: Timeout::new(response, Duration::from_secs(0)), + response: tokio::time::timeout(Duration::from_secs(0), response), complete: false, request_id: 3, cancellation, ctx: context::current(), }; - { - pin_utils::pin_mut!(resp); - let timer = tokio_timer::Timer::default(); - let handle = timer.handle(); - let _guard = tokio_timer::set_default(&handle); - - let _ = resp - .as_mut() - .poll(&mut Context::from_waker(&noop_waker_ref())); - // End of block should cause resp.drop() to run, which should send a cancel message. - } + let _ = futures::poll!(resp); + // resp's drop() is run, which should send a cancel message. assert!(canceled_requests.0.try_next().unwrap() == Some(3)); } - #[test] - fn stage_request() { + #[tokio::test(threaded_scheduler)] + async fn stage_request() { let (mut dispatch, mut channel, _server_channel) = set_up(); let dispatch = Pin::new(&mut dispatch); let cx = &mut Context::from_waker(&noop_waker_ref()); - let _resp = send_request(&mut channel, "hi"); + let _resp = send_request(&mut channel, "hi").await; let req = dispatch.poll_next_request(cx).ready(); assert!(req.is_some()); @@ -767,18 +752,14 @@ mod tests { assert_eq!(req.request, "hi".to_string()); } - fn block_on(f: F) -> F::Output { - current_thread::Runtime::new().unwrap().block_on(f) - } - // Regression test for https://github.com/google/tarpc/issues/220 - #[test] - fn stage_request_channel_dropped_doesnt_panic() { + #[tokio::test(threaded_scheduler)] + async fn stage_request_channel_dropped_doesnt_panic() { let (mut dispatch, mut channel, mut server_channel) = set_up(); let mut dispatch = Pin::new(&mut dispatch); let cx = &mut Context::from_waker(&noop_waker_ref()); - let _ = send_request(&mut channel, "hi"); + let _ = send_request(&mut channel, "hi").await; drop(channel); assert!(dispatch.as_mut().poll(cx).is_ready()); @@ -789,17 +770,18 @@ mod tests { message: Ok("hello".into()), _non_exhaustive: (), }, - ); - block_on(dispatch).unwrap(); + ) + .await; + dispatch.await.unwrap(); } - #[test] - fn stage_request_response_future_dropped_is_canceled_before_sending() { + #[tokio::test(threaded_scheduler)] + async fn stage_request_response_future_dropped_is_canceled_before_sending() { let (mut dispatch, mut channel, _server_channel) = set_up(); let dispatch = Pin::new(&mut dispatch); let cx = &mut Context::from_waker(&noop_waker_ref()); - let _ = send_request(&mut channel, "hi"); + let _ = send_request(&mut channel, "hi").await; // Drop the channel so polling returns none if no requests are currently ready. drop(channel); @@ -808,13 +790,13 @@ mod tests { assert!(dispatch.poll_next_request(cx).ready().is_none()); } - #[test] - fn stage_request_response_future_dropped_is_canceled_after_sending() { + #[tokio::test(threaded_scheduler)] + async fn stage_request_response_future_dropped_is_canceled_after_sending() { let (mut dispatch, mut channel, _server_channel) = set_up(); let cx = &mut Context::from_waker(&noop_waker_ref()); let mut dispatch = Pin::new(&mut dispatch); - let req = send_request(&mut channel, "hi"); + let req = send_request(&mut channel, "hi").await; assert!(dispatch.as_mut().pump_write(cx).ready().is_some()); assert!(!dispatch.as_mut().project().in_flight_requests.is_empty()); @@ -830,8 +812,8 @@ mod tests { assert!(dispatch.project().in_flight_requests.is_empty()); } - #[test] - fn stage_request_response_closed_skipped() { + #[tokio::test(threaded_scheduler)] + async fn stage_request_response_closed_skipped() { let (mut dispatch, mut channel, _server_channel) = set_up(); let dispatch = Pin::new(&mut dispatch); let cx = &mut Context::from_waker(&noop_waker_ref()); @@ -839,7 +821,7 @@ mod tests { // Test that a request future that's closed its receiver but not yet canceled its request -- // i.e. still in `drop fn` -- will cause the request to not be added to the in-flight request // map. - let mut resp = send_request(&mut channel, "hi"); + let mut resp = send_request(&mut channel, "hi").await; resp.response.get_mut().close(); assert!(dispatch.poll_next_request(cx).is_pending()); @@ -874,18 +856,21 @@ mod tests { (dispatch, channel, server_channel) } - fn send_request( + async fn send_request( channel: &mut Channel, request: &str, ) -> DispatchResponse { - block_on(channel.send(context::current(), request.to_string())).unwrap() + channel + .send(context::current(), request.to_string()) + .await + .unwrap() } - fn send_response( + async fn send_response( channel: &mut UnboundedChannel, Response>, response: Response, ) { - block_on(channel.send(response)).unwrap(); + channel.send(response).await.unwrap(); } trait PollTest { diff --git a/tarpc/src/rpc/mod.rs b/tarpc/src/rpc/mod.rs index 9b39df9..1843f0a 100644 --- a/tarpc/src/rpc/mod.rs +++ b/tarpc/src/rpc/mod.rs @@ -32,7 +32,7 @@ pub(crate) mod util; pub use crate::{client::Client, server::Server, trace, transport::sealed::Transport}; -use futures::task::Poll; +use futures::task::*; use std::{io, time::SystemTime}; /// A message from a client to a server. diff --git a/tarpc/src/rpc/server/filter.rs b/tarpc/src/rpc/server/filter.rs index 024ea46..d4a53af 100644 --- a/tarpc/src/rpc/server/filter.rs +++ b/tarpc/src/rpc/server/filter.rs @@ -9,14 +9,7 @@ use crate::{ util::Compact, }; use fnv::FnvHashMap; -use futures::{ - channel::mpsc, - future::AbortRegistration, - prelude::*, - ready, - stream::Fuse, - task::{Context, Poll}, -}; +use futures::{channel::mpsc, future::AbortRegistration, prelude::*, ready, stream::Fuse, task::*}; use log::{debug, info, trace}; use pin_project::pin_project; use raii_counter::{Counter, WeakCounter}; @@ -301,7 +294,7 @@ where #[cfg(test)] fn ctx() -> Context<'static> { - use futures_test::task::noop_waker_ref; + use futures::task::*; Context::from_waker(&noop_waker_ref()) } diff --git a/tarpc/src/rpc/server/mod.rs b/tarpc/src/rpc/server/mod.rs index 5a22052..880e5e3 100644 --- a/tarpc/src/rpc/server/mod.rs +++ b/tarpc/src/rpc/server/mod.rs @@ -17,13 +17,13 @@ use futures::{ prelude::*, ready, stream::Fuse, - task::{Context, Poll}, + task::*, }; use humantime::format_rfc3339; use log::{debug, trace}; use pin_project::pin_project; use std::{fmt, hash::Hash, io, marker::PhantomData, pin::Pin, time::SystemTime}; -use tokio_timer::{timeout, Timeout}; +use tokio::time::Timeout; mod filter; #[cfg(test)] @@ -487,7 +487,7 @@ where request_id, ctx, deadline, - f: Timeout::new(response, timeout), + f: tokio::time::timeout(timeout, response), response: None, response_tx: self.as_mut().project().responses_tx.clone(), }; @@ -554,7 +554,7 @@ where request_id: self.request_id, message: match result { Ok(message) => Ok(message), - Err(timeout::Elapsed { .. }) => { + Err(tokio::time::Elapsed { .. }) => { debug!( "[{}] Response did not complete before deadline of {}s.", self.ctx.trace_id(), diff --git a/tarpc/src/rpc/server/testing.rs b/tarpc/src/rpc/server/testing.rs index 26bc1df..c155fef 100644 --- a/tarpc/src/rpc/server/testing.rs +++ b/tarpc/src/rpc/server/testing.rs @@ -1,14 +1,15 @@ use crate::server::{Channel, Config}; use crate::{context, Request, Response}; use fnv::FnvHashSet; -use futures::future::{AbortHandle, AbortRegistration}; -use futures::{Sink, Stream}; -use futures_test::task::noop_waker_ref; +use futures::{ + future::{AbortHandle, AbortRegistration}, + task::*, + Sink, Stream, +}; use pin_project::pin_project; use std::collections::VecDeque; use std::io; use std::pin::Pin; -use std::task::{Context, Poll}; use std::time::SystemTime; #[pin_project] @@ -27,8 +28,8 @@ where { type Item = In; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.project().stream.poll_next(cx) + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + Poll::Ready(self.project().stream.pop_front()) } } @@ -98,10 +99,10 @@ impl FakeChannel>, Response> { impl FakeChannel<(), ()> { pub fn default() -> FakeChannel>, Response> { FakeChannel { - stream: VecDeque::default(), - sink: VecDeque::default(), - config: Config::default(), - in_flight_requests: FnvHashSet::default(), + stream: Default::default(), + sink: Default::default(), + config: Default::default(), + in_flight_requests: Default::default(), } } } diff --git a/tarpc/src/rpc/server/throttle.rs b/tarpc/src/rpc/server/throttle.rs index 7da7080..e3977c9 100644 --- a/tarpc/src/rpc/server/throttle.rs +++ b/tarpc/src/rpc/server/throttle.rs @@ -1,11 +1,6 @@ use super::{Channel, Config}; use crate::{Response, ServerError}; -use futures::{ - future::AbortRegistration, - prelude::*, - ready, - task::{Context, Poll}, -}; +use futures::{future::AbortRegistration, prelude::*, ready, task::*}; use log::debug; use pin_project::pin_project; use std::{io, pin::Pin}; diff --git a/tarpc/src/rpc/transport/channel.rs b/tarpc/src/rpc/transport/channel.rs index 45cb0fc..fb9b591 100644 --- a/tarpc/src/rpc/transport/channel.rs +++ b/tarpc/src/rpc/transport/channel.rs @@ -7,7 +7,7 @@ //! Transports backed by in-memory channels. use crate::PollIo; -use futures::{channel::mpsc, task::Context, Poll, Sink, Stream}; +use futures::{channel::mpsc, task::*, Sink, Stream}; use pin_project::pin_project; use std::io; use std::pin::Pin; @@ -89,7 +89,8 @@ mod tests { use log::trace; use std::io; - #[tokio::test] + #[cfg(feature = "tokio1")] + #[tokio::test(threaded_scheduler)] async fn integration() -> io::Result<()> { let _ = env_logger::try_init(); diff --git a/tarpc/tests/service_functional.rs b/tarpc/tests/service_functional.rs index a1ee8d6..6d9d2a6 100644 --- a/tarpc/tests/service_functional.rs +++ b/tarpc/tests/service_functional.rs @@ -3,9 +3,9 @@ use futures::{ future::{ready, Ready}, prelude::*, }; -use std::{io, rc::Rc}; +use std::io; use tarpc::{ - client::{self, NewClient}, + client::{self}, context, json_transport, server::{self, BaseChannel, Channel, Handler}, transport::channel, @@ -34,7 +34,7 @@ impl Service for Server { } } -#[tokio::test] +#[tokio::test(threaded_scheduler)] async fn sequential() -> io::Result<()> { let _ = env_logger::try_init(); @@ -57,7 +57,7 @@ async fn sequential() -> io::Result<()> { } #[cfg(feature = "serde1")] -#[tokio::test] +#[tokio::test(threaded_scheduler)] async fn serde() -> io::Result<()> { let _ = env_logger::try_init(); @@ -81,7 +81,7 @@ async fn serde() -> io::Result<()> { Ok(()) } -#[tokio::test] +#[tokio::test(threaded_scheduler)] async fn concurrent() -> io::Result<()> { let _ = env_logger::try_init(); @@ -109,64 +109,3 @@ async fn concurrent() -> io::Result<()> { Ok(()) } - -#[tarpc::service(derive_serde = false)] -trait InMemory { - async fn strong_count(rc: Rc<()>) -> usize; - async fn weak_count(rc: Rc<()>) -> usize; -} - -impl InMemory for () { - type StrongCountFut = Ready; - fn strong_count(self, _: context::Context, rc: Rc<()>) -> Self::StrongCountFut { - ready(Rc::strong_count(&rc)) - } - - type WeakCountFut = Ready; - fn weak_count(self, _: context::Context, rc: Rc<()>) -> Self::WeakCountFut { - ready(Rc::weak_count(&rc)) - } -} - -#[test] -fn in_memory_single_threaded() -> io::Result<()> { - use log::warn; - - let _ = env_logger::try_init(); - let mut runtime = tokio::runtime::current_thread::Runtime::new()?; - - let (tx, rx) = channel::unbounded(); - - let server = BaseChannel::new(server::Config::default(), rx) - .respond_with(().serve()) - .try_for_each(|r| async move { Ok(r.await) }); - runtime.spawn(async { - if let Err(e) = server.await { - warn!("Error while running server: {}", e); - } - }); - - let NewClient { - mut client, - dispatch, - } = InMemoryClient::new(client::Config::default(), tx); - runtime.spawn(async move { - if let Err(e) = dispatch.await { - warn!("Error while running client dispatch: {}", e) - } - }); - - let rc = Rc::new(()); - assert_matches!( - runtime.block_on(client.strong_count(context::current(), rc.clone())), - Ok(2) - ); - - let _weak = Rc::downgrade(&rc); - assert_matches!( - runtime.block_on(client.weak_count(context::current(), rc)), - Ok(1) - ); - - Ok(()) -}