Files
tarpc/tarpc/tests/service_functional.rs
Johann Hemmann 026083d653 Bump tokio from 0.2 to 0.3 (#319)
# Bump `tokio` from 0.2 to 0.3

* `Cargo.toml`:
    * bump `tokio` from 0.2 to 0.3
    * bump `tokio-util` from 0.3 to 0.4
    * remove feature `time` from `tokio`
    * fix alphabetical order of dependencies
* `tarpc::rpc`:
    * `client, server`: `tokio::time::Elapsed` -> `tokio::time::error::Elapsed`
    * `client, transport`, `::tests`: Fix `#[tokio::test]` macro usage
* `tarpc::serde_transport`:
    * `TcpListener.incoming().poll_next(...)` -> `TcpListener.poll_accept(...)`
      -> https://github.com/tokio-rs/tokio/discussions/2983
    * Adapt `AsyncRead`, `AsynWrite` implements in tests
* `README.md`, `tarpc::lib`: Adapt tokio version in docs

# Satisfy clippy

* replace `match`-statements with `matches!(...)`-macro
2020-10-17 17:33:08 -07:00

170 lines
4.4 KiB
Rust

use assert_matches::assert_matches;
use futures::{
future::{join_all, ready, Ready},
prelude::*,
};
use std::io;
use tarpc::{
client::{self},
context, serde_transport,
server::{self, BaseChannel, Channel, Handler},
transport::channel,
};
use tokio::join;
use tokio_serde::formats::Json;
#[tarpc_plugins::service]
trait Service {
async fn add(x: i32, y: i32) -> i32;
async fn hey(name: String) -> String;
}
#[derive(Clone)]
struct Server;
impl Service for Server {
type AddFut = Ready<i32>;
fn add(self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
ready(x + y)
}
type HeyFut = Ready<String>;
fn hey(self, _: context::Context, name: String) -> Self::HeyFut {
ready(format!("Hey, {}.", name))
}
}
#[tokio::test]
async fn sequential() -> io::Result<()> {
let _ = env_logger::try_init();
let (tx, rx) = channel::unbounded();
tokio::spawn(
BaseChannel::new(server::Config::default(), rx)
.respond_with(Server.serve())
.execute(),
);
let mut client = ServiceClient::new(client::Config::default(), tx).spawn()?;
assert_matches!(client.add(context::current(), 1, 2).await, Ok(3));
assert_matches!(
client.hey(context::current(), "Tim".into()).await,
Ok(ref s) if s == "Hey, Tim.");
Ok(())
}
#[cfg(feature = "serde1")]
#[tokio::test]
async fn serde() -> io::Result<()> {
let _ = env_logger::try_init();
let transport = serde_transport::tcp::listen("localhost:56789", Json::default).await?;
let addr = transport.local_addr();
tokio::spawn(
tarpc::Server::default()
.incoming(transport.take(1).filter_map(|r| async { r.ok() }))
.respond_with(Server.serve()),
);
let transport = serde_transport::tcp::connect(addr, Json::default).await?;
let mut client = ServiceClient::new(client::Config::default(), transport).spawn()?;
assert_matches!(client.add(context::current(), 1, 2).await, Ok(3));
assert_matches!(
client.hey(context::current(), "Tim".to_string()).await,
Ok(ref s) if s == "Hey, Tim."
);
Ok(())
}
#[tokio::test]
async fn concurrent() -> io::Result<()> {
let _ = env_logger::try_init();
let (tx, rx) = channel::unbounded();
tokio::spawn(
tarpc::Server::default()
.incoming(stream::once(ready(rx)))
.respond_with(Server.serve()),
);
let client = ServiceClient::new(client::Config::default(), tx).spawn()?;
let mut c = client.clone();
let req1 = c.add(context::current(), 1, 2);
let mut c = client.clone();
let req2 = c.add(context::current(), 3, 4);
let mut c = client.clone();
let req3 = c.hey(context::current(), "Tim".to_string());
assert_matches!(req1.await, Ok(3));
assert_matches!(req2.await, Ok(7));
assert_matches!(req3.await, Ok(ref s) if s == "Hey, Tim.");
Ok(())
}
#[tokio::test]
async fn concurrent_join() -> io::Result<()> {
let _ = env_logger::try_init();
let (tx, rx) = channel::unbounded();
tokio::spawn(
tarpc::Server::default()
.incoming(stream::once(ready(rx)))
.respond_with(Server.serve()),
);
let client = ServiceClient::new(client::Config::default(), tx).spawn()?;
let mut c = client.clone();
let req1 = c.add(context::current(), 1, 2);
let mut c = client.clone();
let req2 = c.add(context::current(), 3, 4);
let mut c = client.clone();
let req3 = c.hey(context::current(), "Tim".to_string());
let (resp1, resp2, resp3) = join!(req1, req2, req3);
assert_matches!(resp1, Ok(3));
assert_matches!(resp2, Ok(7));
assert_matches!(resp3, Ok(ref s) if s == "Hey, Tim.");
Ok(())
}
#[tokio::test]
async fn concurrent_join_all() -> io::Result<()> {
let _ = env_logger::try_init();
let (tx, rx) = channel::unbounded();
tokio::spawn(
tarpc::Server::default()
.incoming(stream::once(ready(rx)))
.respond_with(Server.serve()),
);
let client = ServiceClient::new(client::Config::default(), tx).spawn()?;
let mut c1 = client.clone();
let mut c2 = client.clone();
let req1 = c1.add(context::current(), 1, 2);
let req2 = c2.add(context::current(), 3, 4);
let responses = join_all(vec![req1, req2]).await;
assert_matches!(responses[0], Ok(3));
assert_matches!(responses[1], Ok(7));
Ok(())
}