Port to runtime crate

This commit is contained in:
Artem Vorotnikov
2019-07-24 01:59:07 +03:00
committed by Tim
parent 650c60fe44
commit 49f2641e3c
14 changed files with 140 additions and 315 deletions

View File

@@ -124,7 +124,7 @@ impl Service for HelloServer {
}
```
Next let's write a function to start our server. While this example uses an
Lastly let's write our `main` that will start the server. While this example uses an
[in-process
channel](https://docs.rs/tarpc/0.18.0/tarpc/transport/channel/struct.UnboundedChannel.html),
tarpc also ships a
@@ -136,7 +136,6 @@ that uses bincode over TCP.
# extern crate futures;
#
# use futures::{
# compat::Executor01CompatExt,
# future::{self, Ready},
# prelude::*,
# };
@@ -169,7 +168,8 @@ that uses bincode over TCP.
# }
# }
#
async fn run() -> io::Result<()> {
#[runtime::main(runtime_tokio::Tokio)]
async fn main() -> io::Result<()> {
let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
let server = server::new(server::Config::default())
@@ -180,7 +180,7 @@ async fn run() -> io::Result<()> {
// the generated Service trait.
.respond_with(serve(HelloServer));
tokio::spawn(server.unit_error().boxed().compat());
let _ = runtime::spawn(server);
// new_stub is generated by the service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
@@ -198,86 +198,6 @@ async fn run() -> io::Result<()> {
}
```
Lastly, we'll call `run()` from `main`. Before running a tarpc server or client,
call `tarpc::init()` to initialize the executor tarpc uses internally to run
background tasks for the client and server.
```rust
# #![feature(async_await, proc_macro_hygiene)]
# extern crate futures;
#
# use futures::{
# compat::Executor01CompatExt,
# future::{self, Ready},
# prelude::*,
# };
# use tarpc::{
# client, context,
# server::{self, Handler},
# };
# use std::io;
#
# // This is the service definition. It looks a lot like a trait definition.
# // It defines one RPC, hello, which takes one arg, name, and returns a String.
# tarpc::service! {
# /// Returns a greeting for name.
# rpc hello(name: String) -> String;
# }
#
# // This is the type that implements the generated Service trait. It is the business logic
# // and is used to start the server.
# #[derive(Clone)]
# struct HelloServer;
#
# impl Service for HelloServer {
# // Each defined rpc generates two items in the trait, a fn that serves the RPC, and
# // an associated type representing the future output by the fn.
#
# type HelloFut = Ready<String>;
#
# fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
# future::ready(format!("Hello, {}!", name))
# }
# }
#
# async fn run() -> io::Result<()> {
# let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
#
# let server = server::new(server::Config::default())
# // incoming() takes a stream of transports such as would be returned by
# // TcpListener::incoming (but a stream instead of an iterator).
# .incoming(stream::once(future::ready(server_transport)))
# // serve is generated by the service! macro. It takes as input any type implementing
# // the generated Service trait.
# .respond_with(serve(HelloServer));
#
# tokio::spawn(server.unit_error().boxed().compat());
#
# // new_stub is generated by the service! macro. Like Server, it takes a config and any
# // Transport as input, and returns a Client, also generated by the macro.
# // by the service mcro.
# let mut client = new_stub(client::Config::default(), client_transport).await?;
#
# // The client has an RPC method for each RPC defined in service!. It takes the same args
# // as defined, with the addition of a Context, which is always the first arg. The Context
# // specifies a deadline and trace information which can be helpful in debugging requests.
# let hello = client.hello(context::current(), "Stim".to_string()).await?;
#
# println!("{}", hello);
#
# Ok(())
# }
#
fn main() {
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(run()
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat(),
);
}
```
## Service Documentation

View File

@@ -16,9 +16,10 @@ description = "An example server built on tarpc."
json-transport = { package = "tarpc-json-transport", version = "0.1", path = "../json-transport" }
clap = "2.0"
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
runtime = "0.3.0-alpha.6"
runtime-tokio = "0.3.0-alpha.5"
serde = { version = "1.0" }
tarpc = { version = "0.18", path = "../tarpc", features = ["serde1"] }
tokio = "0.1"
env_logger = "0.6"
[lib]

View File

@@ -7,29 +7,11 @@
#![feature(async_await)]
use clap::{App, Arg};
use futures::{compat::Executor01CompatExt, prelude::*};
use std::{io, net::SocketAddr};
use std::io;
use tarpc::{client, context};
async fn run(server_addr: SocketAddr, name: String) -> io::Result<()> {
let transport = json_transport::connect(&server_addr).await?;
// new_stub is generated by the service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service mcro.
let mut client = service::new_stub(client::Config::default(), transport).await?;
// The client has an RPC method for each RPC defined in service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), name).await?;
println!("{}", hello);
Ok(())
}
fn main() {
#[runtime::main(runtime_tokio::Tokio)]
async fn main() -> io::Result<()> {
let flags = App::new("Hello Client")
.version("0.1")
.author("Tim <tikue@google.com>")
@@ -53,21 +35,26 @@ fn main() {
)
.get_matches();
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
let server_addr = flags.value_of("server_addr").unwrap();
let server_addr = server_addr
.parse()
.unwrap_or_else(|e| panic!(r#"--server_addr value "{}" invalid: {}"#, server_addr, e));
let name = flags.value_of("name").unwrap();
let name = flags.value_of("name").unwrap().into();
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
let transport = json_transport::connect(&server_addr).await?;
tokio::run(
run(server_addr, name.into())
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat(),
);
// new_stub is generated by the service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service mcro.
let mut client = service::new_stub(client::Config::default(), transport).await?;
// The client has an RPC method for each RPC defined in service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), name).await?;
println!("{}", hello);
Ok(())
}

View File

@@ -8,7 +8,6 @@
use clap::{App, Arg};
use futures::{
compat::Executor01CompatExt,
future::{self, Ready},
prelude::*,
};
@@ -37,30 +36,8 @@ impl service::Service for HelloServer {
}
}
async fn run(server_addr: SocketAddr) -> io::Result<()> {
// bincode_transport is provided by the associated crate bincode-transport. It makes it easy
// to start up a serde-powered bincode serialization strategy over TCP.
json_transport::listen(&server_addr)?
// Ignore accept errors.
.filter_map(|r| future::ready(r.ok()))
.map(server::BaseChannel::with_defaults)
// Limit channels to 1 per IP.
.max_channels_per_key(1, |t| t.as_ref().peer_addr().unwrap().ip())
// serve is generated by the service! macro. It takes as input any type implementing
// the generated Service trait.
.map(|channel| {
let server = HelloServer(channel.as_ref().as_ref().peer_addr().unwrap());
channel.respond_with(service::serve(server))
})
// Max 10 channels.
.buffer_unordered(10)
.for_each(|_| futures::future::ready(()))
.await;
Ok(())
}
fn main() {
#[runtime::main(runtime_tokio::Tokio)]
async fn main() -> io::Result<()> {
env_logger::init();
let flags = App::new("Hello Server")
@@ -83,12 +60,26 @@ fn main() {
.parse()
.unwrap_or_else(|e| panic!(r#"--port value "{}" invalid: {}"#, port, e));
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
let server_addr = ([0, 0, 0, 0], port).into();
tokio::run(
run(([0, 0, 0, 0], port).into())
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat(),
);
// bincode_transport is provided by the associated crate bincode-transport. It makes it easy
// to start up a serde-powered bincode serialization strategy over TCP.
json_transport::listen(&server_addr)?
// Ignore accept errors.
.filter_map(|r| future::ready(r.ok()))
.map(server::BaseChannel::with_defaults)
// Limit channels to 1 per IP.
.max_channels_per_key(1, |t| t.as_ref().peer_addr().unwrap().ip())
// serve is generated by the service! macro. It takes as input any type implementing
// the generated Service trait.
.map(|channel| {
let server = HelloServer(channel.as_ref().as_ref().peer_addr().unwrap());
channel.respond_with(service::serve(server))
})
// Max 10 channels.
.buffer_unordered(10)
.for_each(|_| futures::future::ready(()))
.await;
Ok(())
}

View File

@@ -18,7 +18,7 @@ futures_legacy = { version = "0.1", package = "futures" }
pin-utils = "0.1.0-alpha.4"
serde = "1.0"
serde_json = "1.0"
tokio = "0.1"
tokio = { version = "0.1", default-features = false, features = ["codec"] }
tokio-io = "0.1"
tokio-serde-json = "0.2"
tokio-tcp = "0.1"

View File

@@ -24,6 +24,8 @@ log = "0.4"
once_cell = "0.2"
pin-utils = "0.1.0-alpha.4"
rand = "0.7"
runtime = "0.3.0-alpha.6"
runtime-raw = "0.3.0-alpha.4"
tokio-timer = "0.2"
trace = { package = "tarpc-trace", version = "0.2", path = "../trace" }
serde = { optional = true, version = "1.0" }
@@ -31,6 +33,7 @@ serde = { optional = true, version = "1.0" }
[dev-dependencies]
futures-test-preview = { version = "0.3.0-alpha.17" }
env_logger = "0.6"
runtime-tokio = "0.3.0-alpha.5"
tokio = "0.1"
tokio-executor = "0.1"
assert_matches = "1.0"

View File

@@ -35,7 +35,7 @@ pub use crate::{client::Client, server::Server, transport::Transport};
use futures::{
task::{Poll, Spawn, SpawnError, SpawnExt},
Future,
Future, FutureExt,
};
use once_cell::sync::OnceCell;
use std::{cell::RefCell, io, time::SystemTime};
@@ -141,7 +141,11 @@ pub fn init(spawn: impl Spawn + Clone + Send + Sync + 'static) {
}
pub(crate) fn spawn(future: impl Future<Output = ()> + Send + 'static) -> Result<(), SpawnError> {
SPAWN.with(|spawn| spawn.borrow_mut().spawn(future))
if SEED_SPAWN.get().is_some() {
SPAWN.with(|spawn| spawn.borrow_mut().spawn(future))
} else {
runtime_raw::current_runtime().spawn_boxed(future.boxed())
}
}
trait CloneSpawn: Spawn {

View File

@@ -82,64 +82,40 @@ mod tests {
server::{Handler, Server},
transport,
};
use futures::compat::Executor01CompatExt;
use assert_matches::assert_matches;
use futures::{prelude::*, stream};
use log::trace;
use std::io;
#[test]
fn integration() {
#[runtime::test(runtime_tokio::Tokio)]
async fn integration() -> io::Result<()> {
let _ = env_logger::try_init();
crate::init(tokio::executor::DefaultExecutor::current().compat());
let (client_channel, server_channel) = transport::channel::unbounded();
let server = Server::<String, u64>::default()
.incoming(stream::once(future::ready(server_channel)))
.respond_with(|_ctx, request| {
future::ready(request.parse::<u64>().map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("{:?} is not an int", request),
)
}))
});
crate::spawn(
Server::<String, u64>::default()
.incoming(stream::once(future::ready(server_channel)))
.respond_with(|_ctx, request| {
future::ready(request.parse::<u64>().map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("{:?} is not an int", request),
)
}))
}),
)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let responses = async {
let mut client = client::new(client::Config::default(), client_channel).await?;
let mut client = client::new(client::Config::default(), client_channel).await?;
let response1 = client.call(context::current(), "123".into()).await;
let response2 = client.call(context::current(), "abc".into()).await;
Ok::<_, io::Error>((response1, response2))
};
let (response1, response2) = run_future(future::join(
server,
responses.unwrap_or_else(|e| panic!(e)),
))
.1;
let response1 = client.call(context::current(), "123".into()).await;
let response2 = client.call(context::current(), "abc".into()).await;
trace!("response1: {:?}, response2: {:?}", response1, response2);
assert!(response1.is_ok());
assert_eq!(response1.ok().unwrap(), 123);
assert_matches!(response1, Ok(123));
assert_matches!(response2, Err(ref e) if e.kind() == io::ErrorKind::InvalidInput);
assert!(response2.is_err());
assert_eq!(response2.err().unwrap().kind(), io::ErrorKind::InvalidInput);
}
fn run_future<F>(f: F) -> F::Output
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let (tx, rx) = futures::channel::oneshot::channel();
tokio::run(
f.map(|result| tx.send(result).unwrap_or_else(|_| unreachable!()))
.boxed()
.unit_error()
.compat(),
);
futures::executor::block_on(rx).unwrap()
Ok(())
}
}

View File

@@ -21,6 +21,7 @@ travis-ci = { repository = "google/tarpc" }
[dependencies]
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
log = "0.4"
runtime = "0.3.0-alpha.6"
serde = { optional = true, version = "1.0" }
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.6" }
tarpc-plugins = { path = "../plugins", version = "0.5.0" }
@@ -31,7 +32,6 @@ bytes = { version = "0.4", features = ["serde"] }
humantime = "1.0"
bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" }
env_logger = "0.6"
tokio = "0.1"
tokio-executor = "0.1"
runtime-tokio = "0.3.0-alpha.5"
tokio-tcp = "0.1"
pin-utils = "0.1.0-alpha.4"

View File

@@ -58,14 +58,11 @@ impl Subscriber {
let incoming = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?
.filter_map(|r| future::ready(r.ok()));
let addr = incoming.get_ref().local_addr();
tokio_executor::spawn(
let _ = runtime::spawn(
server::new(config)
.incoming(incoming)
.take(1)
.respond_with(subscriber::serve(Subscriber { id }))
.unit_error()
.boxed()
.compat(),
.respond_with(subscriber::serve(Subscriber { id })),
);
Ok(addr)
}
@@ -134,19 +131,18 @@ impl publisher::Service for Publisher {
}
}
async fn run() -> io::Result<()> {
#[runtime::main(runtime_tokio::Tokio)]
async fn main() -> io::Result<()> {
env_logger::init();
let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?
.filter_map(|r| future::ready(r.ok()));
let publisher_addr = transport.get_ref().local_addr();
tokio_executor::spawn(
let _ = runtime::spawn(
transport
.take(1)
.map(server::BaseChannel::with_defaults)
.respond_with(publisher::serve(Publisher::new()))
.unit_error()
.boxed()
.compat(),
.respond_with(publisher::serve(Publisher::new())),
);
let subscriber1 = Subscriber::listen(0, server::Config::default()).await?;
@@ -177,10 +173,8 @@ async fn run() -> io::Result<()> {
publisher
.broadcast(context::current(), "hi again".to_string())
.await?;
thread::sleep(Duration::from_millis(100));
Ok(())
}
fn main() {
tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
thread::sleep(Duration::from_millis(100));
}

View File

@@ -7,7 +7,6 @@
#![feature(async_await, proc_macro_hygiene)]
use futures::{
compat::Executor01CompatExt,
future::{self, Ready},
prelude::*,
};
@@ -40,7 +39,8 @@ impl Service for HelloServer {
}
}
async fn run() -> io::Result<()> {
#[runtime::main(runtime_tokio::Tokio)]
async fn main() -> io::Result<()> {
// bincode_transport is provided by the associated crate bincode-transport. It makes it easy
// to start up a serde-powered bincode serialization strategy over TCP.
let mut transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
@@ -58,7 +58,7 @@ async fn run() -> io::Result<()> {
// the generated Service trait.
.respond_with(serve(HelloServer));
tokio::spawn(server.unit_error().boxed().compat());
let _ = runtime::spawn(server);
let transport = bincode_transport::connect(&addr).await?;
@@ -76,14 +76,3 @@ async fn run() -> io::Result<()> {
Ok(())
}
fn main() {
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(
run()
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat(),
);
}

View File

@@ -8,7 +8,6 @@
use crate::{add::Service as AddService, double::Service as DoubleService};
use futures::{
compat::Executor01CompatExt,
future::{self, Ready},
prelude::*,
};
@@ -63,7 +62,10 @@ impl DoubleService for DoubleServer {
}
}
async fn run() -> io::Result<()> {
#[runtime::main(runtime_tokio::Tokio)]
async fn main() -> io::Result<()> {
env_logger::init();
let add_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?
.filter_map(|r| future::ready(r.ok()));
let addr = add_listener.get_ref().local_addr();
@@ -71,7 +73,7 @@ async fn run() -> io::Result<()> {
.incoming(add_listener)
.take(1)
.respond_with(add::serve(AddServer));
tokio_executor::spawn(add_server.unit_error().boxed().compat());
let _ = runtime::spawn(add_server);
let to_add_server = bincode_transport::connect(&addr).await?;
let add_client = add::new_stub(client::Config::default(), to_add_server).await?;
@@ -83,7 +85,7 @@ async fn run() -> io::Result<()> {
.incoming(double_listener)
.take(1)
.respond_with(double::serve(DoubleServer { add_client }));
tokio_executor::spawn(double_server.unit_error().boxed().compat());
let _ = runtime::spawn(double_server);
let to_double_server = bincode_transport::connect(&addr).await?;
let mut double_client = double::new_stub(client::Config::default(), to_double_server).await?;
@@ -93,9 +95,3 @@ async fn run() -> io::Result<()> {
}
Ok(())
}
fn main() {
env_logger::init();
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(run().map_err(|e| panic!(e)).boxed().compat());
}

View File

@@ -232,7 +232,6 @@ mod registry {
// Example
use bytes::Bytes;
use futures::{
compat::Executor01CompatExt,
future::{ready, Ready},
prelude::*,
};
@@ -294,19 +293,6 @@ impl read_service::Service for Server {
}
}
trait DefaultSpawn {
fn spawn(self);
}
impl<F> DefaultSpawn for F
where
F: Future<Output = ()> + Send + 'static,
{
fn spawn(self) {
tokio_executor::spawn(self.unit_error().boxed().compat())
}
}
struct BincodeRegistry<Services> {
registry: registry::Registry<Services>,
}
@@ -365,7 +351,10 @@ where
registry::new_client(service_name, channel, serialize, deserialize)
}
async fn run() -> io::Result<()> {
#[runtime::main(runtime_tokio::Tokio)]
async fn main() -> io::Result<()> {
env_logger::init();
let server = Server::default();
let registry = BincodeRegistry::default()
.register(
@@ -384,7 +373,7 @@ async fn run() -> io::Result<()> {
.incoming(listener)
.take(1)
.respond_with(registry.serve());
tokio_executor::spawn(server.unit_error().boxed().compat());
let _ = runtime::spawn(server);
let transport = bincode_transport::connect(&server_addr).await?;
let channel = client::new(client::Config::default(), transport).await?;
@@ -405,8 +394,3 @@ async fn run() -> io::Result<()> {
Ok(())
}
fn main() {
tarpc::init(tokio::executor::DefaultExecutor::current().compat());
tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
}

View File

@@ -280,13 +280,10 @@ mod syntax_test {
#[cfg(test)]
mod functional_test {
use futures::{
compat::Executor01CompatExt,
future::{ready, Ready},
prelude::*,
};
use rpc::{client, context, server::Handler, transport::channel};
use std::io;
use tokio::runtime::current_thread;
service! {
rpc add(x: i32, y: i32) -> i32;
@@ -310,66 +307,49 @@ mod functional_test {
}
}
#[test]
fn sequential() {
#[runtime::test(runtime_tokio::TokioCurrentThread)]
async fn sequential() {
let _ = env_logger::try_init();
rpc::init(tokio::executor::DefaultExecutor::current().compat());
let test = async {
let (tx, rx) = channel::unbounded();
tokio_executor::spawn(
crate::Server::default()
.incoming(stream::once(ready(rx)))
.respond_with(serve(Server))
.unit_error()
.boxed()
.compat(),
);
let (tx, rx) = channel::unbounded();
let _ = runtime::spawn(
crate::Server::default()
.incoming(stream::once(ready(rx)))
.respond_with(serve(Server)),
);
let mut client = new_stub(client::Config::default(), tx).await?;
assert_eq!(3, client.add(context::current(), 1, 2).await?);
assert_eq!(
"Hey, Tim.",
client.hey(context::current(), "Tim".to_string()).await?
);
Ok::<_, io::Error>(())
}
.map_err(|e| panic!(e.to_string()));
current_thread::block_on_all(test.boxed().compat()).unwrap();
let mut client = new_stub(client::Config::default(), tx).await.unwrap();
assert_eq!(3, client.add(context::current(), 1, 2).await.unwrap());
assert_eq!(
"Hey, Tim.",
client
.hey(context::current(), "Tim".to_string())
.await
.unwrap()
);
}
#[test]
fn concurrent() {
#[runtime::test(runtime_tokio::TokioCurrentThread)]
async fn concurrent() {
let _ = env_logger::try_init();
rpc::init(tokio::executor::DefaultExecutor::current().compat());
let test = async {
let (tx, rx) = channel::unbounded();
tokio_executor::spawn(
rpc::Server::default()
.incoming(stream::once(ready(rx)))
.respond_with(serve(Server))
.unit_error()
.boxed()
.compat(),
);
let (tx, rx) = channel::unbounded();
let _ = runtime::spawn(
rpc::Server::default()
.incoming(stream::once(ready(rx)))
.respond_with(serve(Server)),
);
let client = new_stub(client::Config::default(), tx).await?;
let mut c = client.clone();
let req1 = c.add(context::current(), 1, 2);
let mut c = client.clone();
let req2 = c.add(context::current(), 3, 4);
let mut c = client.clone();
let req3 = c.hey(context::current(), "Tim".to_string());
let client = new_stub(client::Config::default(), tx).await.unwrap();
let mut c = client.clone();
let req1 = c.add(context::current(), 1, 2);
let mut c = client.clone();
let req2 = c.add(context::current(), 3, 4);
let mut c = client.clone();
let req3 = c.hey(context::current(), "Tim".to_string());
assert_eq!(3, req1.await?);
assert_eq!(7, req2.await?);
assert_eq!("Hey, Tim.", req3.await?);
Ok::<_, io::Error>(())
}
.map_err(|e| panic!("test failed: {}", e));
current_thread::block_on_all(test.boxed().compat()).unwrap();
assert_eq!(3, req1.await.unwrap());
assert_eq!(7, req2.await.unwrap());
assert_eq!("Hey, Tim.", req3.await.unwrap());
}
}