Merge pull request #70 from tikue/master

Make spawn_core private
This commit is contained in:
Adam Wright
2017-01-11 21:00:15 -08:00
committed by GitHub
3 changed files with 39 additions and 25 deletions

View File

@@ -3,7 +3,7 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
#![feature(inclusive_range_syntax, conservative_impl_trait, plugin)]
#![feature(inclusive_range_syntax, conservative_impl_trait, plugin, never_type)]
#![plugin(tarpc_plugins)]
extern crate chrono;
@@ -20,12 +20,12 @@ extern crate futures_cpupool;
use clap::{Arg, App};
use futures::{Future, Stream};
use futures_cpupool::{CpuFuture, CpuPool};
use std::cmp;
use std::sync::Arc;
use std::{cmp, thread};
use std::sync::{Arc, mpsc};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tarpc::future::{Connect};
use tarpc::util::{FirstSocketAddr, Never, spawn_core};
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
service! {
@@ -88,6 +88,19 @@ struct Stats {
max: Option<Duration>,
}
/// Spawns a `reactor::Core` running forever on a new thread.
fn spawn_core() -> reactor::Remote {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut core = reactor::Core::new().unwrap();
tx.send(core.handle().remote().clone()).unwrap();
// Run forever
core.run(futures::empty::<(), !>()).unwrap();
});
rx.recv().unwrap()
}
fn run_once(clients: Vec<FutureClient>, concurrency: u32) -> impl Future<Item=(), Error=()> + 'static {
let start = Instant::now();
let num_clients = clients.len();

View File

@@ -121,13 +121,29 @@ pub mod sync {
/// Utility specific to futures implementation.
pub mod future {
pub use client::future::*;
use tokio_core::reactor::Remote;
use util;
use futures;
use tokio_core::reactor;
use std::thread;
use std::sync::mpsc;
lazy_static! {
/// The `Remote` for the default reactor core.
pub static ref REMOTE: Remote = {
util::spawn_core()
pub static ref REMOTE: reactor::Remote = {
spawn_core()
};
}
/// Spawns a `reactor::Core` running forever on a new thread.
fn spawn_core() -> reactor::Remote {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut core = reactor::Core::new().unwrap();
tx.send(core.handle().remote().clone()).unwrap();
// Run forever
core.run(futures::empty::<(), !>()).unwrap();
});
rx.recv().unwrap()
}
}

View File

@@ -3,14 +3,12 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
use futures::{self, Future, Poll};
use futures::{Future, Poll};
use futures::stream::Stream;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::error::Error;
use std::{fmt, io, thread};
use std::{fmt, io};
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::mpsc;
use tokio_core::reactor;
/// A bottom type that impls `Error`, `Serialize`, and `Deserialize`. It is impossible to
/// instantiate this type.
@@ -119,19 +117,6 @@ pub trait FirstSocketAddr: ToSocketAddrs {
impl<A: ToSocketAddrs> FirstSocketAddr for A {}
/// Spawns a `reactor::Core` running forever on a new thread.
pub fn spawn_core() -> reactor::Remote {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut core = reactor::Core::new().unwrap();
tx.send(core.handle().remote().clone()).unwrap();
// Run forever
core.run(futures::empty::<(), !>()).unwrap();
});
rx.recv().unwrap()
}
/// A struct that will format as the contained type if the type impls Debug.
pub struct Debugger<'a, T: 'a>(pub &'a T);