diff --git a/Cargo.lock b/Cargo.lock index 519b3c4d1..f7cccfb18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -918,6 +918,7 @@ dependencies = [ "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-crypto 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-runtime 0.1.0", "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/parity/run.rs b/parity/run.rs index 7ee0250d6..3a7299df3 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -739,7 +739,7 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: account_provider: account_provider, accounts_passwords: &passwords, }; - let secretstore_key_server = secretstore::start(cmd.secretstore_conf.clone(), secretstore_deps)?; + let secretstore_key_server = secretstore::start(cmd.secretstore_conf.clone(), secretstore_deps, runtime.executor())?; // the ipfs server let ipfs_server = ipfs::start_server(cmd.ipfs_conf.clone(), client.clone())?; diff --git a/parity/secretstore.rs b/parity/secretstore.rs index 1e322e3ba..80a3c76f8 100644 --- a/parity/secretstore.rs +++ b/parity/secretstore.rs @@ -24,6 +24,7 @@ use ethcore::miner::Miner; use ethkey::{Secret, Public}; use sync::SyncProvider; use ethereum_types::Address; +use parity_runtime::Executor; /// This node secret key. #[derive(Debug, PartialEq, Clone)] @@ -100,14 +101,14 @@ pub struct Dependencies<'a> { #[cfg(not(feature = "secretstore"))] mod server { - use super::{Configuration, Dependencies}; + use super::{Configuration, Dependencies, Executor}; /// Noop key server implementation pub struct KeyServer; impl KeyServer { /// Create new noop key server - pub fn new(_conf: Configuration, _deps: Dependencies) -> Result { + pub fn new(_conf: Configuration, _deps: Dependencies, _executor: Executor) -> Result { Ok(KeyServer) } } @@ -120,7 +121,7 @@ mod server { use ethkey::KeyPair; use ansi_term::Colour::{Red, White}; use db; - use super::{Configuration, Dependencies, NodeSecretKey, ContractAddress}; + use super::{Configuration, Dependencies, NodeSecretKey, ContractAddress, Executor}; fn into_service_contract_address(address: ContractAddress) -> ethcore_secretstore::ContractAddress { match address { @@ -136,7 +137,7 @@ mod server { impl KeyServer { /// Create new key server - pub fn new(mut conf: Configuration, deps: Dependencies) -> Result { + pub fn new(mut conf: Configuration, deps: Dependencies, executor: Executor) -> Result { let self_secret: Arc = match conf.self_secret.take() { Some(NodeSecretKey::Plain(secret)) => Arc::new(ethcore_secretstore::PlainNodeKeyPair::new( KeyPair::from_secret(secret).map_err(|e| format!("invalid secret: {}", e))?)), @@ -179,7 +180,6 @@ mod server { service_contract_doc_sretr_address: conf.service_contract_doc_sretr_address.map(into_service_contract_address), acl_check_contract_address: conf.acl_check_contract_address.map(into_service_contract_address), cluster_config: ethcore_secretstore::ClusterConfiguration { - threads: 4, listener_address: ethcore_secretstore::NodeAddress { address: conf.interface.clone(), port: conf.port, @@ -198,7 +198,7 @@ mod server { cconf.cluster_config.nodes.insert(self_secret.public().clone(), cconf.cluster_config.listener_address.clone()); let db = db::open_secretstore_db(&conf.data_path)?; - let key_server = ethcore_secretstore::start(deps.client, deps.sync, deps.miner, self_secret, cconf, db) + let key_server = ethcore_secretstore::start(deps.client, deps.sync, deps.miner, self_secret, cconf, db, executor) .map_err(|e| format!("Error starting KeyServer {}: {}", key_server_name, e))?; Ok(KeyServer { @@ -238,11 +238,11 @@ impl Default for Configuration { } /// Start secret store-related functionality -pub fn start(conf: Configuration, deps: Dependencies) -> Result, String> { +pub fn start(conf: Configuration, deps: Dependencies, executor: Executor) -> Result, String> { if !conf.enabled { return Ok(None); } - KeyServer::new(conf, deps) + KeyServer::new(conf, deps, executor) .map(|s| Some(s)) } diff --git a/secret_store/Cargo.toml b/secret_store/Cargo.toml index 15ce83e53..ee379acc5 100644 --- a/secret_store/Cargo.toml +++ b/secret_store/Cargo.toml @@ -17,6 +17,7 @@ futures = "0.1" rustc-hex = "1.0" tiny-keccak = "1.4" tokio = "~0.1.11" +parity-runtime = { path = "../util/runtime" } tokio-io = "0.1" tokio-service = "0.1" url = "1.0" diff --git a/secret_store/src/key_server.rs b/secret_store/src/key_server.rs index ea88f46b9..93baba111 100644 --- a/secret_store/src/key_server.rs +++ b/secret_store/src/key_server.rs @@ -15,14 +15,11 @@ // along with Parity. If not, see . use std::collections::BTreeSet; -use std::thread; use std::sync::Arc; -use std::sync::mpsc; -use futures::{self, Future}; use parking_lot::Mutex; -use tokio::runtime; use crypto::DEFAULT_MAC; use ethkey::crypto; +use parity_runtime::Executor; use super::acl_storage::AclStorage; use super::key_storage::KeyStorage; use super::key_server_set::KeyServerSet; @@ -39,16 +36,16 @@ pub struct KeyServerImpl { /// Secret store key server data. pub struct KeyServerCore { - close: Option>, - handle: Option>, cluster: Arc, } impl KeyServerImpl { /// Create new key server instance - pub fn new(config: &ClusterConfiguration, key_server_set: Arc, self_key_pair: Arc, acl_storage: Arc, key_storage: Arc) -> Result { + pub fn new(config: &ClusterConfiguration, key_server_set: Arc, self_key_pair: Arc, + acl_storage: Arc, key_storage: Arc, executor: Executor) -> Result + { Ok(KeyServerImpl { - data: Arc::new(Mutex::new(KeyServerCore::new(config, key_server_set, self_key_pair, acl_storage, key_storage)?)), + data: Arc::new(Mutex::new(KeyServerCore::new(config, key_server_set, self_key_pair, acl_storage, key_storage, executor)?)), }) } @@ -175,9 +172,10 @@ impl MessageSigner for KeyServerImpl { } impl KeyServerCore { - pub fn new(config: &ClusterConfiguration, key_server_set: Arc, self_key_pair: Arc, acl_storage: Arc, key_storage: Arc) -> Result { + pub fn new(config: &ClusterConfiguration, key_server_set: Arc, self_key_pair: Arc, + acl_storage: Arc, key_storage: Arc, executor: Executor) -> Result + { let config = NetClusterConfiguration { - threads: config.threads, self_key_pair: self_key_pair.clone(), listen_address: (config.listener_address.address.clone(), config.listener_address.port), key_server_set: key_server_set, @@ -188,45 +186,16 @@ impl KeyServerCore { auto_migrate_enabled: config.auto_migrate_enabled, }; - let (stop, stopped) = futures::oneshot(); - let (tx, rx) = mpsc::channel(); - let handle = thread::Builder::new().name("KeyServerLoop".into()).spawn(move || { - let runtime_res = runtime::Builder::new() - .core_threads(config.threads) - .build(); - - let mut el = match runtime_res { - Ok(el) => el, - Err(e) => { - tx.send(Err(Error::Internal(format!("error initializing event loop: {}", e)))).expect("Rx is blocking upper thread."); - return; - }, - }; - - let cluster = ClusterCore::new(el.executor(), config); - let cluster_client = cluster.and_then(|c| c.run().map(|_| c.client())); - tx.send(cluster_client.map_err(Into::into)).expect("Rx is blocking upper thread."); - let _ = el.block_on(futures::empty().select(stopped)); - - trace!(target: "secretstore_net", "{}: KeyServerLoop thread stopped", self_key_pair.public()); - }).map_err(|e| Error::Internal(format!("{}", e)))?; - let cluster = rx.recv().map_err(|e| Error::Internal(format!("error initializing event loop: {}", e)))??; + let cluster = ClusterCore::new(executor, config) + .and_then(|c| c.run().map(|_| c.client())) + .map_err(|err| Error::from(err))?; Ok(KeyServerCore { - close: Some(stop), - handle: Some(handle), - cluster: cluster, + cluster, }) } } -impl Drop for KeyServerCore { - fn drop(&mut self) { - self.close.take().map(|v| v.send(())); - self.handle.take().map(|h| h.join()); - } -} - #[cfg(test)] pub mod tests { use std::collections::BTreeSet; @@ -243,6 +212,7 @@ pub mod tests { use key_server_set::tests::MapKeyServerSet; use key_server_cluster::math; use ethereum_types::{H256, H520}; + use parity_runtime::Runtime; use types::{Error, Public, ClusterConfiguration, NodeAddress, RequestSignature, ServerKeyId, EncryptedDocumentKey, EncryptedDocumentKeyShadow, MessageHash, EncryptedMessageSignature, Requester, NodeId}; @@ -294,10 +264,9 @@ pub mod tests { } } - fn make_key_servers(start_port: u16, num_nodes: usize) -> (Vec, Vec>) { + fn make_key_servers(start_port: u16, num_nodes: usize) -> (Vec, Vec>, Runtime) { let key_pairs: Vec<_> = (0..num_nodes).map(|_| Random.generate().unwrap()).collect(); let configs: Vec<_> = (0..num_nodes).map(|i| ClusterConfiguration { - threads: 1, listener_address: NodeAddress { address: "127.0.0.1".into(), port: start_port + (i as u16), @@ -316,11 +285,12 @@ pub mod tests { .map(|(k, a)| (k.clone(), format!("{}:{}", a.address, a.port).parse().unwrap())) .collect(); let key_storages = (0..num_nodes).map(|_| Arc::new(DummyKeyStorage::default())).collect::>(); + let runtime = Runtime::with_thread_count(4); let key_servers: Vec<_> = configs.into_iter().enumerate().map(|(i, cfg)| KeyServerImpl::new(&cfg, Arc::new(MapKeyServerSet::new(false, key_servers_set.clone())), Arc::new(PlainNodeKeyPair::new(key_pairs[i].clone())), Arc::new(DummyAclStorage::default()), - key_storages[i].clone()).unwrap() + key_storages[i].clone(), runtime.executor()).unwrap() ).collect(); // wait until connections are established. It is fast => do not bother with events here @@ -350,13 +320,13 @@ pub mod tests { } } - (key_servers, key_storages) + (key_servers, key_storages, runtime) } #[test] fn document_key_generation_and_retrievement_works_over_network_with_single_node() { //::logger::init_log(); - let (key_servers, _) = make_key_servers(6070, 1); + let (key_servers, _, runtime) = make_key_servers(6070, 1); // generate document key let threshold = 0; @@ -372,12 +342,13 @@ pub mod tests { let retrieved_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &retrieved_key).unwrap(); assert_eq!(retrieved_key, generated_key); } + drop(runtime); } #[test] fn document_key_generation_and_retrievement_works_over_network_with_3_nodes() { //::logger::init_log(); - let (key_servers, key_storages) = make_key_servers(6080, 3); + let (key_servers, key_storages, runtime) = make_key_servers(6080, 3); let test_cases = [0, 1, 2]; for threshold in &test_cases { @@ -399,12 +370,13 @@ pub mod tests { assert!(key_share.encrypted_point.is_some()); } } + drop(runtime); } #[test] fn server_key_generation_and_storing_document_key_works_over_network_with_3_nodes() { //::logger::init_log(); - let (key_servers, _) = make_key_servers(6090, 3); + let (key_servers, _, runtime) = make_key_servers(6090, 3); let test_cases = [0, 1, 2]; for threshold in &test_cases { @@ -430,12 +402,13 @@ pub mod tests { assert_eq!(retrieved_key, generated_key); } } + drop(runtime); } #[test] fn server_key_generation_and_message_signing_works_over_network_with_3_nodes() { //::logger::init_log(); - let (key_servers, _) = make_key_servers(6100, 3); + let (key_servers, _, runtime) = make_key_servers(6100, 3); let test_cases = [0, 1, 2]; for threshold in &test_cases { @@ -455,12 +428,13 @@ pub mod tests { // check signature assert_eq!(math::verify_schnorr_signature(&server_public, &(signature_c, signature_s), &message_hash), Ok(true)); } + drop(runtime); } #[test] fn decryption_session_is_delegated_when_node_does_not_have_key_share() { //::logger::init_log(); - let (key_servers, _) = make_key_servers(6110, 3); + let (key_servers, _, runtime) = make_key_servers(6110, 3); // generate document key let threshold = 0; @@ -477,12 +451,13 @@ pub mod tests { let retrieved_key = key_servers[0].restore_document_key(&document, &signature.into()).unwrap(); let retrieved_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &retrieved_key).unwrap(); assert_eq!(retrieved_key, generated_key); + drop(runtime); } #[test] fn schnorr_signing_session_is_delegated_when_node_does_not_have_key_share() { //::logger::init_log(); - let (key_servers, _) = make_key_servers(6114, 3); + let (key_servers, _, runtime) = make_key_servers(6114, 3); let threshold = 1; // generate server key @@ -503,12 +478,13 @@ pub mod tests { // check signature assert_eq!(math::verify_schnorr_signature(&server_public, &(signature_c, signature_s), &message_hash), Ok(true)); + drop(runtime); } #[test] fn ecdsa_signing_session_is_delegated_when_node_does_not_have_key_share() { //::logger::init_log(); - let (key_servers, _) = make_key_servers(6117, 4); + let (key_servers, _, runtime) = make_key_servers(6117, 4); let threshold = 1; // generate server key @@ -528,6 +504,7 @@ pub mod tests { // check signature assert!(verify_public(&server_public, &signature.into(), &message_hash).unwrap()); + drop(runtime); } #[test] diff --git a/secret_store/src/key_server_cluster/client_sessions/generation_session.rs b/secret_store/src/key_server_cluster/client_sessions/generation_session.rs index 2368635df..8984f61ff 100644 --- a/secret_store/src/key_server_cluster/client_sessions/generation_session.rs +++ b/secret_store/src/key_server_cluster/client_sessions/generation_session.rs @@ -1367,12 +1367,12 @@ pub mod tests { let clusters_clone = clusters.clone(); // establish connections - loop_until(&mut core, CONN_TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); + loop_until(&core.executor(), CONN_TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); // run session to completion let session_id = SessionId::default(); let session = clusters[0].client().new_generation_session(session_id, Default::default(), Default::default(), threshold).unwrap(); - loop_until(&mut core, SESSION_TIMEOUT, move || session.joint_public_and_secret().is_some()); + loop_until(&core.executor(), SESSION_TIMEOUT, move || session.joint_public_and_secret().is_some()); } } diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index 9607d9173..61fb2bcec 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -24,11 +24,11 @@ use std::net::{SocketAddr, IpAddr}; use futures::{future, Future, Stream}; use parking_lot::{Mutex, RwLock}; use tokio_io::IoFuture; -use tokio::runtime::TaskExecutor; use tokio::timer::{Interval, timeout::Error as TimeoutError}; use tokio::net::{TcpListener, TcpStream}; use ethkey::{Public, KeyPair, Signature, Random, Generator}; use ethereum_types::{Address, H256}; +use parity_runtime::Executor; use key_server_cluster::{Error, NodeId, SessionId, Requester, AclStorage, KeyStorage, KeyServerSet, NodeKeyPair}; use key_server_cluster::cluster_sessions::{ClusterSession, AdminSession, ClusterSessions, SessionIdWithSubSession, ClusterSessionsContainer, SERVERS_SET_CHANGE_SESSION_ID, create_cluster_view, AdminSessionCreationData, ClusterSessionsListener}; @@ -121,8 +121,6 @@ pub trait Cluster: Send + Sync { /// Cluster initialization parameters. #[derive(Clone)] pub struct ClusterConfiguration { - /// Number of threads reserved by cluster. - pub threads: usize, /// Allow connecting to 'higher' nodes. pub allow_connecting_to_higher_nodes: bool, /// KeyPair this node holds. @@ -175,14 +173,14 @@ pub struct ClusterData { /// Cluster configuration. pub config: ClusterConfiguration, /// Handle to the event loop. - pub executor: TaskExecutor, + pub executor: Executor, /// KeyPair this node holds. pub self_key_pair: Arc, /// Connections data. pub connections: ClusterConnections, /// Active sessions data. pub sessions: ClusterSessions, - /// Shutdown flag: + /// A shutdown flag. pub is_shutdown: Arc, } @@ -235,7 +233,7 @@ pub struct Connection { } impl ClusterCore { - pub fn new(executor: TaskExecutor, config: ClusterConfiguration) -> Result, Error> { + pub fn new(executor: Executor, config: ClusterConfiguration) -> Result, Error> { let listen_address = make_socket_address(&config.listen_address.0, config.listen_address.1)?; let connections = ClusterConnections::new(&config)?; let servers_set_change_creator_connector = connections.connector.clone(); @@ -790,7 +788,7 @@ impl ClusterConnections { } impl ClusterData { - pub fn new(executor: &TaskExecutor, config: ClusterConfiguration, connections: ClusterConnections, sessions: ClusterSessions) -> Arc { + pub fn new(executor: &Executor, config: ClusterConfiguration, connections: ClusterConnections, sessions: ClusterSessions) -> Arc { Arc::new(ClusterData { executor: executor.clone(), self_key_pair: config.self_key_pair.clone(), @@ -807,12 +805,6 @@ impl ClusterData { } /// Spawns a future on the runtime. - // - // TODO: Consider implementing a more graceful shutdown process using an - // `AtomicBool`, etc. which would prevent tasks from being spawned after a - // shutdown signal is given. (Recursive calls, in - // `process_connection_messages` for example, appear to continue - // indefinitely.) pub fn spawn(&self, f: F) where F: Future + Send + 'static { if self.is_shutdown.load(Ordering::Acquire) == false { if let Err(err) = future::Executor::execute(&self.executor, Box::new(f)) { @@ -1139,9 +1131,12 @@ pub mod tests { use std::collections::{BTreeSet, VecDeque}; use parking_lot::RwLock; use tokio::{ - runtime::{Runtime, Builder as RuntimeBuilder}, prelude::{future, Future}, }; + use parity_runtime::{ + futures::sync::oneshot, + Runtime, Executor, + }; use ethereum_types::{Address, H256}; use ethkey::{Random, Generator, Public, Signature, sign}; use key_server_cluster::{NodeId, SessionId, Requester, Error, DummyAclStorage, DummyKeyStorage, @@ -1263,16 +1258,18 @@ pub mod tests { } } - /// Loops until `predicate` returns `true` or `timeout` has elapsed. - pub fn loop_until(runtime: &mut Runtime, timeout: Duration, predicate: F) + /// Blocks the calling thread, looping until `predicate` returns `true` or + /// `timeout` has elapsed. + pub fn loop_until(executor: &Executor, timeout: Duration, predicate: F) where F: Send + 'static + Fn() -> bool { use futures::Stream; use tokio::timer::Interval; let start = Instant::now(); + let (complete_tx, complete_rx) = oneshot::channel(); - runtime.block_on(Interval::new_interval(Duration::from_millis(1)) + executor.spawn(Interval::new_interval(Duration::from_millis(1)) .and_then(move |_| { if Instant::now() - start > timeout { panic!("no result in {:?}", timeout); @@ -1282,8 +1279,13 @@ pub mod tests { }) .take_while(move |_| future::ok(!predicate())) .for_each(|_| Ok(())) - .then(|_| future::ok::<(), ()>(())) - ).unwrap(); + .then(|_| { + complete_tx.send(()).expect("receiver dropped"); + future::ok::<(), ()>(()) + }) + ); + + complete_rx.wait().unwrap(); } pub fn all_connections_established(cluster: &Arc) -> bool { @@ -1295,7 +1297,6 @@ pub mod tests { pub fn make_clusters(runtime: &Runtime, ports_begin: u16, num_nodes: usize) -> Vec> { let key_pairs: Vec<_> = (0..num_nodes).map(|_| Random.generate().unwrap()).collect(); let cluster_params: Vec<_> = (0..num_nodes).map(|i| ClusterConfiguration { - threads: 1, self_key_pair: Arc::new(PlainNodeKeyPair::new(key_pairs[i].clone())), listen_address: ("127.0.0.1".to_owned(), ports_begin + i as u16), key_server_set: Arc::new(MapKeyServerSet::new(false, key_pairs.iter().enumerate() @@ -1331,21 +1332,17 @@ pub mod tests { /// Returns a new runtime with a static number of threads. pub fn new_runtime() -> Runtime { - RuntimeBuilder::new() - .core_threads(4) - .build() - .expect("Unable to create tokio runtime") + Runtime::with_thread_count(4) } #[test] fn cluster_connects_to_other_nodes() { - let mut runtime = new_runtime(); + let runtime = new_runtime(); let clusters = make_clusters(&runtime, 6010, 3); run_clusters(&clusters); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); + loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); shutdown_clusters(&clusters); - runtime.shutdown_now().wait().unwrap(); } #[test] @@ -1359,17 +1356,16 @@ pub mod tests { _ => panic!("unexpected success"), } shutdown_clusters(&clusters); - runtime.shutdown_now().wait().unwrap(); } #[test] fn error_in_generation_session_broadcasted_to_all_other_nodes() { //::logger::init_log(); - let mut runtime = new_runtime(); + let runtime = new_runtime(); let clusters = make_clusters(&runtime, 6016, 3); run_clusters(&clusters); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); + loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); // ask one of nodes to produce faulty generation sessions clusters[1].client().make_faulty_generation_sessions(); @@ -1378,7 +1374,7 @@ pub mod tests { let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap(); let session_clone = session.clone(); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || session_clone.joint_public_and_secret().is_some() + loop_until(&runtime.executor(), TIMEOUT, move || session_clone.joint_public_and_secret().is_some() && clusters_clone[0].client().generation_session(&SessionId::default()).is_none()); assert!(session.joint_public_and_secret().unwrap().is_err()); @@ -1389,24 +1385,22 @@ pub mod tests { let clusters_clone = clusters.clone(); // wait for both session completion && session removal (session completion event is fired // before session is removed from its own container by cluster) - loop_until(&mut runtime, TIMEOUT, move || session_clone.joint_public_and_secret().is_some() + loop_until(&runtime.executor(), TIMEOUT, move || session_clone.joint_public_and_secret().is_some() && clusters_clone[i].client().generation_session(&SessionId::default()).is_none()); assert!(session.joint_public_and_secret().unwrap().is_err()); } } shutdown_clusters(&clusters); - runtime.shutdown_now().wait().unwrap(); } #[test] fn generation_session_completion_signalled_if_failed_on_master() { //::logger::init_log(); - let mut runtime = new_runtime(); - + let runtime = new_runtime(); let clusters = make_clusters(&runtime, 6025, 3); run_clusters(&clusters); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); + loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); // ask one of nodes to produce faulty generation sessions clusters[0].client().make_faulty_generation_sessions(); @@ -1415,7 +1409,7 @@ pub mod tests { let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap(); let session_clone = session.clone(); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || session_clone.joint_public_and_secret().is_some() + loop_until(&runtime.executor(), TIMEOUT, move || session_clone.joint_public_and_secret().is_some() && clusters_clone[0].client().generation_session(&SessionId::default()).is_none()); assert!(session.joint_public_and_secret().unwrap().is_err()); @@ -1426,29 +1420,28 @@ pub mod tests { let clusters_clone = clusters.clone(); // wait for both session completion && session removal (session completion event is fired // before session is removed from its own container by cluster) - loop_until(&mut runtime, TIMEOUT, move || session_clone.joint_public_and_secret().is_some() + loop_until(&runtime.executor(), TIMEOUT, move || session_clone.joint_public_and_secret().is_some() && clusters_clone[i].client().generation_session(&SessionId::default()).is_none()); assert!(session.joint_public_and_secret().unwrap().is_err()); } } shutdown_clusters(&clusters); - runtime.shutdown_now().wait().unwrap(); } #[test] fn generation_session_is_removed_when_succeeded() { //::logger::init_log(); - let mut runtime = new_runtime(); + let runtime = new_runtime(); let clusters = make_clusters(&runtime, 6019, 3); run_clusters(&clusters); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); + loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); // start && wait for generation session to complete let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap(); let session_clone = session.clone(); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished + loop_until(&runtime.executor(), TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished || session_clone.state() == GenerationSessionState::Failed) && clusters_clone[0].client().generation_session(&SessionId::default()).is_none()); assert!(session.joint_public_and_secret().unwrap().is_ok()); @@ -1462,22 +1455,21 @@ pub mod tests { // AND check that it is actually removed from cluster sessions let session_clone = session.clone(); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished + loop_until(&runtime.executor(), TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished || session_clone.state() == GenerationSessionState::Failed) && clusters_clone[i].client().generation_session(&SessionId::default()).is_none()); } } shutdown_clusters(&clusters); - runtime.shutdown_now().wait().unwrap(); } #[test] fn sessions_are_removed_when_initialization_fails() { - let mut runtime = new_runtime(); + let runtime = new_runtime(); let clusters = make_clusters(&runtime, 6022, 3); run_clusters(&clusters); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); + loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); // generation session { @@ -1506,7 +1498,6 @@ pub mod tests { assert!(clusters[0].data.sessions.negotiation_sessions.is_empty()); } shutdown_clusters(&clusters); - runtime.shutdown_now().wait().unwrap(); } // test ignored because of @@ -1516,17 +1507,17 @@ pub mod tests { #[ignore] fn schnorr_signing_session_completes_if_node_does_not_have_a_share() { //::logger::init_log(); - let mut runtime = new_runtime(); + let runtime = new_runtime(); let clusters = make_clusters(&runtime, 6028, 3); run_clusters(&clusters); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); + loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); // start && wait for generation session to complete let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap(); let session_clone = session.clone(); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished + loop_until(&runtime.executor(), TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished || session_clone.state() == GenerationSessionState::Failed) && clusters_clone[0].client().generation_session(&SessionId::default()).is_none()); assert!(session.joint_public_and_secret().unwrap().is_ok()); @@ -1542,7 +1533,7 @@ pub mod tests { let session_clone = session.clone(); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || session_clone.is_finished() && (0..3).all(|i| + loop_until(&runtime.executor(), TIMEOUT, move || session_clone.is_finished() && (0..3).all(|i| clusters_clone[i].data.sessions.schnorr_signing_sessions.is_empty())); session0.wait().unwrap(); @@ -1553,7 +1544,7 @@ pub mod tests { let session_clone = session.clone(); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || session_clone.is_finished() && (0..3).all(|i| + loop_until(&runtime.executor(), TIMEOUT, move || session_clone.is_finished() && (0..3).all(|i| clusters_clone[i].data.sessions.schnorr_signing_sessions.is_empty())); session2.wait().unwrap(); @@ -1566,10 +1557,9 @@ pub mod tests { let session = clusters[0].data.sessions.schnorr_signing_sessions.first().unwrap(); let session = session.clone(); - loop_until(&mut runtime, TIMEOUT, move || session.is_finished()); + loop_until(&runtime.executor(), TIMEOUT, move || session.is_finished()); session1.wait().unwrap_err(); shutdown_clusters(&clusters); - runtime.shutdown_now().wait().unwrap(); } // test ignored because of @@ -1579,17 +1569,17 @@ pub mod tests { #[ignore] fn ecdsa_signing_session_completes_if_node_does_not_have_a_share() { //::logger::init_log(); - let mut runtime = new_runtime(); + let runtime = new_runtime(); let clusters = make_clusters(&runtime, 6041, 4); run_clusters(&clusters); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); + loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established)); // start && wait for generation session to complete let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap(); let session_clone = session.clone(); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished + loop_until(&runtime.executor(), TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished || session_clone.state() == GenerationSessionState::Failed) && clusters_clone[0].client().generation_session(&SessionId::default()).is_none()); assert!(session.joint_public_and_secret().unwrap().is_ok()); @@ -1605,7 +1595,7 @@ pub mod tests { let session_clone = session.clone(); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, Duration::from_millis(1000), move || session_clone.is_finished() && (0..3).all(|i| + loop_until(&runtime.executor(), Duration::from_millis(1000), move || session_clone.is_finished() && (0..3).all(|i| clusters_clone[i].data.sessions.ecdsa_signing_sessions.is_empty())); session0.wait().unwrap(); @@ -1615,7 +1605,7 @@ pub mod tests { let session = clusters[2].data.sessions.ecdsa_signing_sessions.first().unwrap(); let session_clone = session.clone(); let clusters_clone = clusters.clone(); - loop_until(&mut runtime, Duration::from_millis(1000), move || session_clone.is_finished() && (0..3).all(|i| + loop_until(&runtime.executor(), Duration::from_millis(1000), move || session_clone.is_finished() && (0..3).all(|i| clusters_clone[i].data.sessions.ecdsa_signing_sessions.is_empty())); session2.wait().unwrap(); @@ -1626,9 +1616,8 @@ pub mod tests { let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap(); let session1 = clusters[0].client().new_ecdsa_signing_session(Default::default(), signature.into(), None, H256::random()).unwrap(); let session = clusters[0].data.sessions.ecdsa_signing_sessions.first().unwrap(); - loop_until(&mut runtime, Duration::from_millis(1000), move || session.is_finished()); + loop_until(&runtime.executor(), Duration::from_millis(1000), move || session.is_finished()); session1.wait().unwrap_err(); shutdown_clusters(&clusters); - runtime.shutdown_now().wait().unwrap(); } } diff --git a/secret_store/src/key_server_cluster/cluster_sessions.rs b/secret_store/src/key_server_cluster/cluster_sessions.rs index b34485638..d068c25b9 100644 --- a/secret_store/src/key_server_cluster/cluster_sessions.rs +++ b/secret_store/src/key_server_cluster/cluster_sessions.rs @@ -582,7 +582,6 @@ mod tests { pub fn make_cluster_sessions() -> ClusterSessions { let key_pair = Random.generate().unwrap(); let config = ClusterConfiguration { - threads: 1, self_key_pair: Arc::new(PlainNodeKeyPair::new(key_pair.clone())), listen_address: ("127.0.0.1".to_owned(), 100_u16), key_server_set: Arc::new(MapKeyServerSet::new(false, vec![(key_pair.public().clone(), format!("127.0.0.1:{}", 100).parse().unwrap())].into_iter().collect())), diff --git a/secret_store/src/lib.rs b/secret_store/src/lib.rs index ddd8f6d44..1acbde380 100644 --- a/secret_store/src/lib.rs +++ b/secret_store/src/lib.rs @@ -33,6 +33,7 @@ extern crate serde; extern crate serde_json; extern crate tiny_keccak; extern crate tokio; +extern crate parity_runtime; extern crate tokio_io; extern crate tokio_service; extern crate url; @@ -72,6 +73,7 @@ use kvdb::KeyValueDB; use ethcore::client::Client; use ethcore::miner::Miner; use sync::SyncProvider; +use parity_runtime::Executor; pub use types::{ServerKeyId, EncryptedDocumentKey, RequestSignature, Public, Error, NodeAddress, ContractAddress, ServiceConfiguration, ClusterConfiguration}; @@ -79,7 +81,9 @@ pub use traits::{NodeKeyPair, KeyServer}; pub use self::node_key_pair::{PlainNodeKeyPair, KeyStoreNodeKeyPair}; /// Start new key server instance -pub fn start(client: Arc, sync: Arc, miner: Arc, self_key_pair: Arc, mut config: ServiceConfiguration, db: Arc) -> Result, Error> { +pub fn start(client: Arc, sync: Arc, miner: Arc, self_key_pair: Arc, mut config: ServiceConfiguration, + db: Arc, executor: Executor) -> Result, Error> +{ let trusted_client = trusted_client::TrustedClient::new(self_key_pair.clone(), client.clone(), sync, miner); let acl_storage: Arc = match config.acl_check_contract_address.take() { Some(acl_check_contract_address) => acl_storage::OnChainAclStorage::new(trusted_client.clone(), acl_check_contract_address)?, @@ -89,13 +93,14 @@ pub fn start(client: Arc, sync: Arc, miner: Arc, se let key_server_set = key_server_set::OnChainKeyServerSet::new(trusted_client.clone(), config.cluster_config.key_server_set_contract_address.take(), self_key_pair.clone(), config.cluster_config.auto_migrate_enabled, config.cluster_config.nodes.clone())?; let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(db)?); - let key_server = Arc::new(key_server::KeyServerImpl::new(&config.cluster_config, key_server_set.clone(), self_key_pair.clone(), acl_storage.clone(), key_storage.clone())?); + let key_server = Arc::new(key_server::KeyServerImpl::new(&config.cluster_config, key_server_set.clone(), self_key_pair.clone(), + acl_storage.clone(), key_storage.clone(), executor.clone())?); let cluster = key_server.cluster(); let key_server: Arc = key_server; // prepare HTTP listener let http_listener = match config.listener_address { - Some(listener_address) => Some(listener::http_listener::KeyServerHttpListener::start(listener_address, Arc::downgrade(&key_server))?), + Some(listener_address) => Some(listener::http_listener::KeyServerHttpListener::start(listener_address, Arc::downgrade(&key_server), executor)?), None => None, }; diff --git a/secret_store/src/listener/http_listener.rs b/secret_store/src/listener/http_listener.rs index 9877a4241..fdaac861b 100644 --- a/secret_store/src/listener/http_listener.rs +++ b/secret_store/src/listener/http_listener.rs @@ -26,7 +26,7 @@ use serde::Serialize; use serde_json; use tokio; use tokio::net::TcpListener; -use tokio::runtime::{Runtime, Builder as RuntimeBuilder}; +use parity_runtime::Executor; use futures::{future, Future, Stream}; use url::percent_encoding::percent_decode; @@ -46,7 +46,7 @@ use types::{Error, Public, MessageHash, NodeAddress, RequestSignature, ServerKey /// To change servers set: POST /admin/servers_set_change/{old_signature}/{new_signature} + BODY: json array of hex-encoded nodes ids pub struct KeyServerHttpListener { - _runtime: Runtime, + _executor: Executor, _handler: Arc, } @@ -86,15 +86,11 @@ struct KeyServerSharedHttpHandler { impl KeyServerHttpListener { /// Start KeyServer http listener - pub fn start(listener_address: NodeAddress, key_server: Weak) -> Result { + pub fn start(listener_address: NodeAddress, key_server: Weak, executor: Executor) -> Result { let shared_handler = Arc::new(KeyServerSharedHttpHandler { key_server: key_server, }); - let mut runtime = RuntimeBuilder::new() - // TODO: Add config option/arg? - .core_threads(2) - .build()?; let listener_address = format!("{}:{}", listener_address.address, listener_address.port).parse()?; let listener = TcpListener::bind(&listener_address)?; @@ -113,10 +109,10 @@ impl KeyServerHttpListener { tokio::spawn(serve) }); - runtime.spawn(server); + executor.spawn(server); let listener = KeyServerHttpListener { - _runtime: runtime, + _executor: executor, _handler: shared_handler, }; @@ -419,13 +415,16 @@ mod tests { use traits::KeyServer; use key_server::tests::DummyKeyServer; use types::NodeAddress; + use parity_runtime::Runtime; use super::{parse_request, Request, KeyServerHttpListener}; #[test] fn http_listener_successfully_drops() { let key_server: Arc = Arc::new(DummyKeyServer::default()); let address = NodeAddress { address: "127.0.0.1".into(), port: 9000 }; - let listener = KeyServerHttpListener::start(address, Arc::downgrade(&key_server)).unwrap(); + let runtime = Runtime::with_thread_count(1); + let listener = KeyServerHttpListener::start(address, Arc::downgrade(&key_server), + runtime.executor()).unwrap(); drop(listener); } diff --git a/secret_store/src/types/all.rs b/secret_store/src/types/all.rs index feca4141f..bf05b538d 100644 --- a/secret_store/src/types/all.rs +++ b/secret_store/src/types/all.rs @@ -75,8 +75,6 @@ pub struct ServiceConfiguration { /// Key server cluster configuration #[derive(Debug)] pub struct ClusterConfiguration { - /// Number of threads reserved by cluster. - pub threads: usize, /// This node address. pub listener_address: NodeAddress, /// All cluster nodes addresses. diff --git a/util/runtime/src/lib.rs b/util/runtime/src/lib.rs index aa7b6e73c..97bed9b13 100644 --- a/util/runtime/src/lib.rs +++ b/util/runtime/src/lib.rs @@ -16,8 +16,8 @@ //! Tokio Runtime wrapper. -extern crate futures; -extern crate tokio; +pub extern crate futures; +pub extern crate tokio; use std::{fmt, thread}; use std::sync::mpsc; @@ -222,6 +222,24 @@ impl Executor { } } +impl + Send + 'static> future::Executor for Executor { + fn execute(&self, future: F) -> Result<(), future::ExecuteError> { + match self.inner { + Mode::Tokio(ref executor) => executor.execute(future), + Mode::Sync => { + let _= future.wait(); + Ok(()) + }, + Mode::ThreadPerFuture => { + thread::spawn(move || { + let _= future.wait(); + }); + Ok(()) + }, + } + } +} + /// A handle to a runtime. Dropping the handle will cause runtime to shutdown. pub struct RuntimeHandle { close: Option>,