mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Add concurrent tests using join and join_all
These tests are essentially copies of the `concurrent` test, specifically using `join` and `join_all`. Note that for the `join_all` example to work, all of the `Client` clones must be created before *any* requests are added, otherwise there will be a lifetime problem with the second request, saying that second client, `c2`, is still borrowed when `req1` is dropped. It would require a larger redesign to fix this issue.
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
use assert_matches::assert_matches;
|
||||
use futures::{
|
||||
future::{ready, Ready},
|
||||
future::{join_all, ready, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
use std::io;
|
||||
@@ -10,6 +10,7 @@ use tarpc::{
|
||||
server::{self, BaseChannel, Channel, Handler},
|
||||
transport::channel,
|
||||
};
|
||||
use tokio::join;
|
||||
use tokio_serde::formats::Json;
|
||||
|
||||
#[tarpc_plugins::service]
|
||||
@@ -110,3 +111,59 @@ async fn concurrent() -> io::Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(threaded_scheduler)]
|
||||
async fn concurrent_join() -> io::Result<()> {
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
let (tx, rx) = channel::unbounded();
|
||||
tokio::spawn(
|
||||
tarpc::Server::default()
|
||||
.incoming(stream::once(ready(rx)))
|
||||
.respond_with(Server.serve()),
|
||||
);
|
||||
|
||||
let client = ServiceClient::new(client::Config::default(), tx).spawn()?;
|
||||
|
||||
let mut c = client.clone();
|
||||
let req1 = c.add(context::current(), 1, 2);
|
||||
|
||||
let mut c = client.clone();
|
||||
let req2 = c.add(context::current(), 3, 4);
|
||||
|
||||
let mut c = client.clone();
|
||||
let req3 = c.hey(context::current(), "Tim".to_string());
|
||||
|
||||
let (resp1, resp2, resp3) = join!(req1, req2, req3);
|
||||
assert_matches!(resp1, Ok(3));
|
||||
assert_matches!(resp2, Ok(7));
|
||||
assert_matches!(resp3, Ok(ref s) if s == "Hey, Tim.");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(threaded_scheduler)]
|
||||
async fn concurrent_join_all() -> io::Result<()> {
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
let (tx, rx) = channel::unbounded();
|
||||
tokio::spawn(
|
||||
tarpc::Server::default()
|
||||
.incoming(stream::once(ready(rx)))
|
||||
.respond_with(Server.serve()),
|
||||
);
|
||||
|
||||
let client = ServiceClient::new(client::Config::default(), tx).spawn()?;
|
||||
|
||||
let mut c1 = client.clone();
|
||||
let mut c2 = client.clone();
|
||||
|
||||
let req1 = c1.add(context::current(), 1, 2);
|
||||
let req2 = c2.add(context::current(), 3, 4);
|
||||
|
||||
let responses = join_all(vec![req1, req2]).await;
|
||||
assert_matches!(responses[0], Ok(3));
|
||||
assert_matches!(responses[1], Ok(7));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user