diff --git a/examples/concurrency.rs b/examples/concurrency.rs index 5c6ddf9..ab21a7d 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -3,7 +3,7 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -#![feature(inclusive_range_syntax, conservative_impl_trait, plugin)] +#![feature(inclusive_range_syntax, conservative_impl_trait, plugin, never_type)] #![plugin(tarpc_plugins)] extern crate chrono; @@ -20,12 +20,12 @@ extern crate futures_cpupool; use clap::{Arg, App}; use futures::{Future, Stream}; use futures_cpupool::{CpuFuture, CpuPool}; -use std::cmp; -use std::sync::Arc; +use std::{cmp, thread}; +use std::sync::{Arc, mpsc}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant}; use tarpc::future::{Connect}; -use tarpc::util::{FirstSocketAddr, Never, spawn_core}; +use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; service! { @@ -88,6 +88,19 @@ struct Stats { max: Option, } +/// Spawns a `reactor::Core` running forever on a new thread. +fn spawn_core() -> reactor::Remote { + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let mut core = reactor::Core::new().unwrap(); + tx.send(core.handle().remote().clone()).unwrap(); + + // Run forever + core.run(futures::empty::<(), !>()).unwrap(); + }); + rx.recv().unwrap() +} + fn run_once(clients: Vec, concurrency: u32) -> impl Future + 'static { let start = Instant::now(); let num_clients = clients.len(); diff --git a/src/lib.rs b/src/lib.rs index 4c1d3a1..41a2992 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -121,13 +121,29 @@ pub mod sync { /// Utility specific to futures implementation. pub mod future { pub use client::future::*; - use tokio_core::reactor::Remote; - use util; + use futures; + use tokio_core::reactor; + use std::thread; + use std::sync::mpsc; lazy_static! { /// The `Remote` for the default reactor core. - pub static ref REMOTE: Remote = { - util::spawn_core() + pub static ref REMOTE: reactor::Remote = { + spawn_core() }; } + + /// Spawns a `reactor::Core` running forever on a new thread. + fn spawn_core() -> reactor::Remote { + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let mut core = reactor::Core::new().unwrap(); + tx.send(core.handle().remote().clone()).unwrap(); + + // Run forever + core.run(futures::empty::<(), !>()).unwrap(); + }); + rx.recv().unwrap() + } + } diff --git a/src/util.rs b/src/util.rs index e55c8f6..64a4113 100644 --- a/src/util.rs +++ b/src/util.rs @@ -3,14 +3,12 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use futures::{self, Future, Poll}; +use futures::{Future, Poll}; use futures::stream::Stream; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::error::Error; -use std::{fmt, io, thread}; +use std::{fmt, io}; use std::net::{SocketAddr, ToSocketAddrs}; -use std::sync::mpsc; -use tokio_core::reactor; /// A bottom type that impls `Error`, `Serialize`, and `Deserialize`. It is impossible to /// instantiate this type. @@ -119,19 +117,6 @@ pub trait FirstSocketAddr: ToSocketAddrs { impl FirstSocketAddr for A {} -/// Spawns a `reactor::Core` running forever on a new thread. -pub fn spawn_core() -> reactor::Remote { - let (tx, rx) = mpsc::channel(); - thread::spawn(move || { - let mut core = reactor::Core::new().unwrap(); - tx.send(core.handle().remote().clone()).unwrap(); - - // Run forever - core.run(futures::empty::<(), !>()).unwrap(); - }); - rx.recv().unwrap() -} - /// A struct that will format as the contained type if the type impls Debug. pub struct Debugger<'a, T: 'a>(pub &'a T);