Remove secret_store runtimes. (#9888)

* Remove the independent runtimes from `KeyServerHttpListener` and
  `KeyServerCore` and instead require a `parity_runtime::Executor`
  to be passed upon creation of each.

* Remove the `threads` parameter from both `ClusterConfiguration` structs.

* Implement the `future::Executor` trait for `parity_runtime::Executor`.

* Update tests.
  - Update the `loop_until` function to instead use a oneshot to signal
    completion.
  - Modify the `make_key_servers` function to create and return a runtime.
This commit is contained in:
Nick Sanders 2018-11-25 09:36:43 -08:00 committed by Wei Tang
parent f20f4c74d2
commit c880716f16
12 changed files with 131 additions and 144 deletions

1
Cargo.lock generated
View File

@ -918,6 +918,7 @@ dependencies = [
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-crypto 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-runtime 0.1.0",
"parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -739,7 +739,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
account_provider: account_provider,
accounts_passwords: &passwords,
};
let secretstore_key_server = secretstore::start(cmd.secretstore_conf.clone(), secretstore_deps)?;
let secretstore_key_server = secretstore::start(cmd.secretstore_conf.clone(), secretstore_deps, runtime.executor())?;
// the ipfs server
let ipfs_server = ipfs::start_server(cmd.ipfs_conf.clone(), client.clone())?;

View File

@ -24,6 +24,7 @@ use ethcore::miner::Miner;
use ethkey::{Secret, Public};
use sync::SyncProvider;
use ethereum_types::Address;
use parity_runtime::Executor;
/// This node secret key.
#[derive(Debug, PartialEq, Clone)]
@ -100,14 +101,14 @@ pub struct Dependencies<'a> {
#[cfg(not(feature = "secretstore"))]
mod server {
use super::{Configuration, Dependencies};
use super::{Configuration, Dependencies, Executor};
/// Noop key server implementation
pub struct KeyServer;
impl KeyServer {
/// Create new noop key server
pub fn new(_conf: Configuration, _deps: Dependencies) -> Result<Self, String> {
pub fn new(_conf: Configuration, _deps: Dependencies, _executor: Executor) -> Result<Self, String> {
Ok(KeyServer)
}
}
@ -120,7 +121,7 @@ mod server {
use ethkey::KeyPair;
use ansi_term::Colour::{Red, White};
use db;
use super::{Configuration, Dependencies, NodeSecretKey, ContractAddress};
use super::{Configuration, Dependencies, NodeSecretKey, ContractAddress, Executor};
fn into_service_contract_address(address: ContractAddress) -> ethcore_secretstore::ContractAddress {
match address {
@ -136,7 +137,7 @@ mod server {
impl KeyServer {
/// Create new key server
pub fn new(mut conf: Configuration, deps: Dependencies) -> Result<Self, String> {
pub fn new(mut conf: Configuration, deps: Dependencies, executor: Executor) -> Result<Self, String> {
let self_secret: Arc<ethcore_secretstore::NodeKeyPair> = match conf.self_secret.take() {
Some(NodeSecretKey::Plain(secret)) => Arc::new(ethcore_secretstore::PlainNodeKeyPair::new(
KeyPair::from_secret(secret).map_err(|e| format!("invalid secret: {}", e))?)),
@ -179,7 +180,6 @@ mod server {
service_contract_doc_sretr_address: conf.service_contract_doc_sretr_address.map(into_service_contract_address),
acl_check_contract_address: conf.acl_check_contract_address.map(into_service_contract_address),
cluster_config: ethcore_secretstore::ClusterConfiguration {
threads: 4,
listener_address: ethcore_secretstore::NodeAddress {
address: conf.interface.clone(),
port: conf.port,
@ -198,7 +198,7 @@ mod server {
cconf.cluster_config.nodes.insert(self_secret.public().clone(), cconf.cluster_config.listener_address.clone());
let db = db::open_secretstore_db(&conf.data_path)?;
let key_server = ethcore_secretstore::start(deps.client, deps.sync, deps.miner, self_secret, cconf, db)
let key_server = ethcore_secretstore::start(deps.client, deps.sync, deps.miner, self_secret, cconf, db, executor)
.map_err(|e| format!("Error starting KeyServer {}: {}", key_server_name, e))?;
Ok(KeyServer {
@ -238,11 +238,11 @@ impl Default for Configuration {
}
/// Start secret store-related functionality
pub fn start(conf: Configuration, deps: Dependencies) -> Result<Option<KeyServer>, String> {
pub fn start(conf: Configuration, deps: Dependencies, executor: Executor) -> Result<Option<KeyServer>, String> {
if !conf.enabled {
return Ok(None);
}
KeyServer::new(conf, deps)
KeyServer::new(conf, deps, executor)
.map(|s| Some(s))
}

View File

@ -17,6 +17,7 @@ futures = "0.1"
rustc-hex = "1.0"
tiny-keccak = "1.4"
tokio = "~0.1.11"
parity-runtime = { path = "../util/runtime" }
tokio-io = "0.1"
tokio-service = "0.1"
url = "1.0"

View File

@ -15,14 +15,11 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::BTreeSet;
use std::thread;
use std::sync::Arc;
use std::sync::mpsc;
use futures::{self, Future};
use parking_lot::Mutex;
use tokio::runtime;
use crypto::DEFAULT_MAC;
use ethkey::crypto;
use parity_runtime::Executor;
use super::acl_storage::AclStorage;
use super::key_storage::KeyStorage;
use super::key_server_set::KeyServerSet;
@ -39,16 +36,16 @@ pub struct KeyServerImpl {
/// Secret store key server data.
pub struct KeyServerCore {
close: Option<futures::Complete<()>>,
handle: Option<thread::JoinHandle<()>>,
cluster: Arc<ClusterClient>,
}
impl KeyServerImpl {
/// Create new key server instance
pub fn new(config: &ClusterConfiguration, key_server_set: Arc<KeyServerSet>, self_key_pair: Arc<NodeKeyPair>, acl_storage: Arc<AclStorage>, key_storage: Arc<KeyStorage>) -> Result<Self, Error> {
pub fn new(config: &ClusterConfiguration, key_server_set: Arc<KeyServerSet>, self_key_pair: Arc<NodeKeyPair>,
acl_storage: Arc<AclStorage>, key_storage: Arc<KeyStorage>, executor: Executor) -> Result<Self, Error>
{
Ok(KeyServerImpl {
data: Arc::new(Mutex::new(KeyServerCore::new(config, key_server_set, self_key_pair, acl_storage, key_storage)?)),
data: Arc::new(Mutex::new(KeyServerCore::new(config, key_server_set, self_key_pair, acl_storage, key_storage, executor)?)),
})
}
@ -175,9 +172,10 @@ impl MessageSigner for KeyServerImpl {
}
impl KeyServerCore {
pub fn new(config: &ClusterConfiguration, key_server_set: Arc<KeyServerSet>, self_key_pair: Arc<NodeKeyPair>, acl_storage: Arc<AclStorage>, key_storage: Arc<KeyStorage>) -> Result<Self, Error> {
pub fn new(config: &ClusterConfiguration, key_server_set: Arc<KeyServerSet>, self_key_pair: Arc<NodeKeyPair>,
acl_storage: Arc<AclStorage>, key_storage: Arc<KeyStorage>, executor: Executor) -> Result<Self, Error>
{
let config = NetClusterConfiguration {
threads: config.threads,
self_key_pair: self_key_pair.clone(),
listen_address: (config.listener_address.address.clone(), config.listener_address.port),
key_server_set: key_server_set,
@ -188,45 +186,16 @@ impl KeyServerCore {
auto_migrate_enabled: config.auto_migrate_enabled,
};
let (stop, stopped) = futures::oneshot();
let (tx, rx) = mpsc::channel();
let handle = thread::Builder::new().name("KeyServerLoop".into()).spawn(move || {
let runtime_res = runtime::Builder::new()
.core_threads(config.threads)
.build();
let mut el = match runtime_res {
Ok(el) => el,
Err(e) => {
tx.send(Err(Error::Internal(format!("error initializing event loop: {}", e)))).expect("Rx is blocking upper thread.");
return;
},
};
let cluster = ClusterCore::new(el.executor(), config);
let cluster_client = cluster.and_then(|c| c.run().map(|_| c.client()));
tx.send(cluster_client.map_err(Into::into)).expect("Rx is blocking upper thread.");
let _ = el.block_on(futures::empty().select(stopped));
trace!(target: "secretstore_net", "{}: KeyServerLoop thread stopped", self_key_pair.public());
}).map_err(|e| Error::Internal(format!("{}", e)))?;
let cluster = rx.recv().map_err(|e| Error::Internal(format!("error initializing event loop: {}", e)))??;
let cluster = ClusterCore::new(executor, config)
.and_then(|c| c.run().map(|_| c.client()))
.map_err(|err| Error::from(err))?;
Ok(KeyServerCore {
close: Some(stop),
handle: Some(handle),
cluster: cluster,
cluster,
})
}
}
impl Drop for KeyServerCore {
fn drop(&mut self) {
self.close.take().map(|v| v.send(()));
self.handle.take().map(|h| h.join());
}
}
#[cfg(test)]
pub mod tests {
use std::collections::BTreeSet;
@ -243,6 +212,7 @@ pub mod tests {
use key_server_set::tests::MapKeyServerSet;
use key_server_cluster::math;
use ethereum_types::{H256, H520};
use parity_runtime::Runtime;
use types::{Error, Public, ClusterConfiguration, NodeAddress, RequestSignature, ServerKeyId,
EncryptedDocumentKey, EncryptedDocumentKeyShadow, MessageHash, EncryptedMessageSignature,
Requester, NodeId};
@ -294,10 +264,9 @@ pub mod tests {
}
}
fn make_key_servers(start_port: u16, num_nodes: usize) -> (Vec<KeyServerImpl>, Vec<Arc<DummyKeyStorage>>) {
fn make_key_servers(start_port: u16, num_nodes: usize) -> (Vec<KeyServerImpl>, Vec<Arc<DummyKeyStorage>>, Runtime) {
let key_pairs: Vec<_> = (0..num_nodes).map(|_| Random.generate().unwrap()).collect();
let configs: Vec<_> = (0..num_nodes).map(|i| ClusterConfiguration {
threads: 1,
listener_address: NodeAddress {
address: "127.0.0.1".into(),
port: start_port + (i as u16),
@ -316,11 +285,12 @@ pub mod tests {
.map(|(k, a)| (k.clone(), format!("{}:{}", a.address, a.port).parse().unwrap()))
.collect();
let key_storages = (0..num_nodes).map(|_| Arc::new(DummyKeyStorage::default())).collect::<Vec<_>>();
let runtime = Runtime::with_thread_count(4);
let key_servers: Vec<_> = configs.into_iter().enumerate().map(|(i, cfg)|
KeyServerImpl::new(&cfg, Arc::new(MapKeyServerSet::new(false, key_servers_set.clone())),
Arc::new(PlainNodeKeyPair::new(key_pairs[i].clone())),
Arc::new(DummyAclStorage::default()),
key_storages[i].clone()).unwrap()
key_storages[i].clone(), runtime.executor()).unwrap()
).collect();
// wait until connections are established. It is fast => do not bother with events here
@ -350,13 +320,13 @@ pub mod tests {
}
}
(key_servers, key_storages)
(key_servers, key_storages, runtime)
}
#[test]
fn document_key_generation_and_retrievement_works_over_network_with_single_node() {
//::logger::init_log();
let (key_servers, _) = make_key_servers(6070, 1);
let (key_servers, _, runtime) = make_key_servers(6070, 1);
// generate document key
let threshold = 0;
@ -372,12 +342,13 @@ pub mod tests {
let retrieved_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &retrieved_key).unwrap();
assert_eq!(retrieved_key, generated_key);
}
drop(runtime);
}
#[test]
fn document_key_generation_and_retrievement_works_over_network_with_3_nodes() {
//::logger::init_log();
let (key_servers, key_storages) = make_key_servers(6080, 3);
let (key_servers, key_storages, runtime) = make_key_servers(6080, 3);
let test_cases = [0, 1, 2];
for threshold in &test_cases {
@ -399,12 +370,13 @@ pub mod tests {
assert!(key_share.encrypted_point.is_some());
}
}
drop(runtime);
}
#[test]
fn server_key_generation_and_storing_document_key_works_over_network_with_3_nodes() {
//::logger::init_log();
let (key_servers, _) = make_key_servers(6090, 3);
let (key_servers, _, runtime) = make_key_servers(6090, 3);
let test_cases = [0, 1, 2];
for threshold in &test_cases {
@ -430,12 +402,13 @@ pub mod tests {
assert_eq!(retrieved_key, generated_key);
}
}
drop(runtime);
}
#[test]
fn server_key_generation_and_message_signing_works_over_network_with_3_nodes() {
//::logger::init_log();
let (key_servers, _) = make_key_servers(6100, 3);
let (key_servers, _, runtime) = make_key_servers(6100, 3);
let test_cases = [0, 1, 2];
for threshold in &test_cases {
@ -455,12 +428,13 @@ pub mod tests {
// check signature
assert_eq!(math::verify_schnorr_signature(&server_public, &(signature_c, signature_s), &message_hash), Ok(true));
}
drop(runtime);
}
#[test]
fn decryption_session_is_delegated_when_node_does_not_have_key_share() {
//::logger::init_log();
let (key_servers, _) = make_key_servers(6110, 3);
let (key_servers, _, runtime) = make_key_servers(6110, 3);
// generate document key
let threshold = 0;
@ -477,12 +451,13 @@ pub mod tests {
let retrieved_key = key_servers[0].restore_document_key(&document, &signature.into()).unwrap();
let retrieved_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &retrieved_key).unwrap();
assert_eq!(retrieved_key, generated_key);
drop(runtime);
}
#[test]
fn schnorr_signing_session_is_delegated_when_node_does_not_have_key_share() {
//::logger::init_log();
let (key_servers, _) = make_key_servers(6114, 3);
let (key_servers, _, runtime) = make_key_servers(6114, 3);
let threshold = 1;
// generate server key
@ -503,12 +478,13 @@ pub mod tests {
// check signature
assert_eq!(math::verify_schnorr_signature(&server_public, &(signature_c, signature_s), &message_hash), Ok(true));
drop(runtime);
}
#[test]
fn ecdsa_signing_session_is_delegated_when_node_does_not_have_key_share() {
//::logger::init_log();
let (key_servers, _) = make_key_servers(6117, 4);
let (key_servers, _, runtime) = make_key_servers(6117, 4);
let threshold = 1;
// generate server key
@ -528,6 +504,7 @@ pub mod tests {
// check signature
assert!(verify_public(&server_public, &signature.into(), &message_hash).unwrap());
drop(runtime);
}
#[test]

View File

@ -1367,12 +1367,12 @@ pub mod tests {
let clusters_clone = clusters.clone();
// establish connections
loop_until(&mut core, CONN_TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
loop_until(&core.executor(), CONN_TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
// run session to completion
let session_id = SessionId::default();
let session = clusters[0].client().new_generation_session(session_id, Default::default(), Default::default(), threshold).unwrap();
loop_until(&mut core, SESSION_TIMEOUT, move || session.joint_public_and_secret().is_some());
loop_until(&core.executor(), SESSION_TIMEOUT, move || session.joint_public_and_secret().is_some());
}
}

View File

@ -24,11 +24,11 @@ use std::net::{SocketAddr, IpAddr};
use futures::{future, Future, Stream};
use parking_lot::{Mutex, RwLock};
use tokio_io::IoFuture;
use tokio::runtime::TaskExecutor;
use tokio::timer::{Interval, timeout::Error as TimeoutError};
use tokio::net::{TcpListener, TcpStream};
use ethkey::{Public, KeyPair, Signature, Random, Generator};
use ethereum_types::{Address, H256};
use parity_runtime::Executor;
use key_server_cluster::{Error, NodeId, SessionId, Requester, AclStorage, KeyStorage, KeyServerSet, NodeKeyPair};
use key_server_cluster::cluster_sessions::{ClusterSession, AdminSession, ClusterSessions, SessionIdWithSubSession,
ClusterSessionsContainer, SERVERS_SET_CHANGE_SESSION_ID, create_cluster_view, AdminSessionCreationData, ClusterSessionsListener};
@ -121,8 +121,6 @@ pub trait Cluster: Send + Sync {
/// Cluster initialization parameters.
#[derive(Clone)]
pub struct ClusterConfiguration {
/// Number of threads reserved by cluster.
pub threads: usize,
/// Allow connecting to 'higher' nodes.
pub allow_connecting_to_higher_nodes: bool,
/// KeyPair this node holds.
@ -175,14 +173,14 @@ pub struct ClusterData {
/// Cluster configuration.
pub config: ClusterConfiguration,
/// Handle to the event loop.
pub executor: TaskExecutor,
pub executor: Executor,
/// KeyPair this node holds.
pub self_key_pair: Arc<NodeKeyPair>,
/// Connections data.
pub connections: ClusterConnections,
/// Active sessions data.
pub sessions: ClusterSessions,
/// Shutdown flag:
/// A shutdown flag.
pub is_shutdown: Arc<AtomicBool>,
}
@ -235,7 +233,7 @@ pub struct Connection {
}
impl ClusterCore {
pub fn new(executor: TaskExecutor, config: ClusterConfiguration) -> Result<Arc<Self>, Error> {
pub fn new(executor: Executor, config: ClusterConfiguration) -> Result<Arc<Self>, Error> {
let listen_address = make_socket_address(&config.listen_address.0, config.listen_address.1)?;
let connections = ClusterConnections::new(&config)?;
let servers_set_change_creator_connector = connections.connector.clone();
@ -790,7 +788,7 @@ impl ClusterConnections {
}
impl ClusterData {
pub fn new(executor: &TaskExecutor, config: ClusterConfiguration, connections: ClusterConnections, sessions: ClusterSessions) -> Arc<Self> {
pub fn new(executor: &Executor, config: ClusterConfiguration, connections: ClusterConnections, sessions: ClusterSessions) -> Arc<Self> {
Arc::new(ClusterData {
executor: executor.clone(),
self_key_pair: config.self_key_pair.clone(),
@ -807,12 +805,6 @@ impl ClusterData {
}
/// Spawns a future on the runtime.
//
// TODO: Consider implementing a more graceful shutdown process using an
// `AtomicBool`, etc. which would prevent tasks from being spawned after a
// shutdown signal is given. (Recursive calls, in
// `process_connection_messages` for example, appear to continue
// indefinitely.)
pub fn spawn<F>(&self, f: F) where F: Future<Item = (), Error = ()> + Send + 'static {
if self.is_shutdown.load(Ordering::Acquire) == false {
if let Err(err) = future::Executor::execute(&self.executor, Box::new(f)) {
@ -1139,9 +1131,12 @@ pub mod tests {
use std::collections::{BTreeSet, VecDeque};
use parking_lot::RwLock;
use tokio::{
runtime::{Runtime, Builder as RuntimeBuilder},
prelude::{future, Future},
};
use parity_runtime::{
futures::sync::oneshot,
Runtime, Executor,
};
use ethereum_types::{Address, H256};
use ethkey::{Random, Generator, Public, Signature, sign};
use key_server_cluster::{NodeId, SessionId, Requester, Error, DummyAclStorage, DummyKeyStorage,
@ -1263,16 +1258,18 @@ pub mod tests {
}
}
/// Loops until `predicate` returns `true` or `timeout` has elapsed.
pub fn loop_until<F>(runtime: &mut Runtime, timeout: Duration, predicate: F)
/// Blocks the calling thread, looping until `predicate` returns `true` or
/// `timeout` has elapsed.
pub fn loop_until<F>(executor: &Executor, timeout: Duration, predicate: F)
where F: Send + 'static + Fn() -> bool
{
use futures::Stream;
use tokio::timer::Interval;
let start = Instant::now();
let (complete_tx, complete_rx) = oneshot::channel();
runtime.block_on(Interval::new_interval(Duration::from_millis(1))
executor.spawn(Interval::new_interval(Duration::from_millis(1))
.and_then(move |_| {
if Instant::now() - start > timeout {
panic!("no result in {:?}", timeout);
@ -1282,8 +1279,13 @@ pub mod tests {
})
.take_while(move |_| future::ok(!predicate()))
.for_each(|_| Ok(()))
.then(|_| future::ok::<(), ()>(()))
).unwrap();
.then(|_| {
complete_tx.send(()).expect("receiver dropped");
future::ok::<(), ()>(())
})
);
complete_rx.wait().unwrap();
}
pub fn all_connections_established(cluster: &Arc<ClusterCore>) -> bool {
@ -1295,7 +1297,6 @@ pub mod tests {
pub fn make_clusters(runtime: &Runtime, ports_begin: u16, num_nodes: usize) -> Vec<Arc<ClusterCore>> {
let key_pairs: Vec<_> = (0..num_nodes).map(|_| Random.generate().unwrap()).collect();
let cluster_params: Vec<_> = (0..num_nodes).map(|i| ClusterConfiguration {
threads: 1,
self_key_pair: Arc::new(PlainNodeKeyPair::new(key_pairs[i].clone())),
listen_address: ("127.0.0.1".to_owned(), ports_begin + i as u16),
key_server_set: Arc::new(MapKeyServerSet::new(false, key_pairs.iter().enumerate()
@ -1331,21 +1332,17 @@ pub mod tests {
/// Returns a new runtime with a static number of threads.
pub fn new_runtime() -> Runtime {
RuntimeBuilder::new()
.core_threads(4)
.build()
.expect("Unable to create tokio runtime")
Runtime::with_thread_count(4)
}
#[test]
fn cluster_connects_to_other_nodes() {
let mut runtime = new_runtime();
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6010, 3);
run_clusters(&clusters);
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
shutdown_clusters(&clusters);
runtime.shutdown_now().wait().unwrap();
}
#[test]
@ -1359,17 +1356,16 @@ pub mod tests {
_ => panic!("unexpected success"),
}
shutdown_clusters(&clusters);
runtime.shutdown_now().wait().unwrap();
}
#[test]
fn error_in_generation_session_broadcasted_to_all_other_nodes() {
//::logger::init_log();
let mut runtime = new_runtime();
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6016, 3);
run_clusters(&clusters);
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
// ask one of nodes to produce faulty generation sessions
clusters[1].client().make_faulty_generation_sessions();
@ -1378,7 +1374,7 @@ pub mod tests {
let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || session_clone.joint_public_and_secret().is_some()
loop_until(&runtime.executor(), TIMEOUT, move || session_clone.joint_public_and_secret().is_some()
&& clusters_clone[0].client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_err());
@ -1389,24 +1385,22 @@ pub mod tests {
let clusters_clone = clusters.clone();
// wait for both session completion && session removal (session completion event is fired
// before session is removed from its own container by cluster)
loop_until(&mut runtime, TIMEOUT, move || session_clone.joint_public_and_secret().is_some()
loop_until(&runtime.executor(), TIMEOUT, move || session_clone.joint_public_and_secret().is_some()
&& clusters_clone[i].client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_err());
}
}
shutdown_clusters(&clusters);
runtime.shutdown_now().wait().unwrap();
}
#[test]
fn generation_session_completion_signalled_if_failed_on_master() {
//::logger::init_log();
let mut runtime = new_runtime();
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6025, 3);
run_clusters(&clusters);
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
// ask one of nodes to produce faulty generation sessions
clusters[0].client().make_faulty_generation_sessions();
@ -1415,7 +1409,7 @@ pub mod tests {
let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || session_clone.joint_public_and_secret().is_some()
loop_until(&runtime.executor(), TIMEOUT, move || session_clone.joint_public_and_secret().is_some()
&& clusters_clone[0].client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_err());
@ -1426,29 +1420,28 @@ pub mod tests {
let clusters_clone = clusters.clone();
// wait for both session completion && session removal (session completion event is fired
// before session is removed from its own container by cluster)
loop_until(&mut runtime, TIMEOUT, move || session_clone.joint_public_and_secret().is_some()
loop_until(&runtime.executor(), TIMEOUT, move || session_clone.joint_public_and_secret().is_some()
&& clusters_clone[i].client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_err());
}
}
shutdown_clusters(&clusters);
runtime.shutdown_now().wait().unwrap();
}
#[test]
fn generation_session_is_removed_when_succeeded() {
//::logger::init_log();
let mut runtime = new_runtime();
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6019, 3);
run_clusters(&clusters);
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
// start && wait for generation session to complete
let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished
loop_until(&runtime.executor(), TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished
|| session_clone.state() == GenerationSessionState::Failed)
&& clusters_clone[0].client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_ok());
@ -1462,22 +1455,21 @@ pub mod tests {
// AND check that it is actually removed from cluster sessions
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished
loop_until(&runtime.executor(), TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished
|| session_clone.state() == GenerationSessionState::Failed)
&& clusters_clone[i].client().generation_session(&SessionId::default()).is_none());
}
}
shutdown_clusters(&clusters);
runtime.shutdown_now().wait().unwrap();
}
#[test]
fn sessions_are_removed_when_initialization_fails() {
let mut runtime = new_runtime();
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6022, 3);
run_clusters(&clusters);
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
// generation session
{
@ -1506,7 +1498,6 @@ pub mod tests {
assert!(clusters[0].data.sessions.negotiation_sessions.is_empty());
}
shutdown_clusters(&clusters);
runtime.shutdown_now().wait().unwrap();
}
// test ignored because of
@ -1516,17 +1507,17 @@ pub mod tests {
#[ignore]
fn schnorr_signing_session_completes_if_node_does_not_have_a_share() {
//::logger::init_log();
let mut runtime = new_runtime();
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6028, 3);
run_clusters(&clusters);
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
// start && wait for generation session to complete
let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished
loop_until(&runtime.executor(), TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished
|| session_clone.state() == GenerationSessionState::Failed)
&& clusters_clone[0].client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_ok());
@ -1542,7 +1533,7 @@ pub mod tests {
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || session_clone.is_finished() && (0..3).all(|i|
loop_until(&runtime.executor(), TIMEOUT, move || session_clone.is_finished() && (0..3).all(|i|
clusters_clone[i].data.sessions.schnorr_signing_sessions.is_empty()));
session0.wait().unwrap();
@ -1553,7 +1544,7 @@ pub mod tests {
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || session_clone.is_finished() && (0..3).all(|i|
loop_until(&runtime.executor(), TIMEOUT, move || session_clone.is_finished() && (0..3).all(|i|
clusters_clone[i].data.sessions.schnorr_signing_sessions.is_empty()));
session2.wait().unwrap();
@ -1566,10 +1557,9 @@ pub mod tests {
let session = clusters[0].data.sessions.schnorr_signing_sessions.first().unwrap();
let session = session.clone();
loop_until(&mut runtime, TIMEOUT, move || session.is_finished());
loop_until(&runtime.executor(), TIMEOUT, move || session.is_finished());
session1.wait().unwrap_err();
shutdown_clusters(&clusters);
runtime.shutdown_now().wait().unwrap();
}
// test ignored because of
@ -1579,17 +1569,17 @@ pub mod tests {
#[ignore]
fn ecdsa_signing_session_completes_if_node_does_not_have_a_share() {
//::logger::init_log();
let mut runtime = new_runtime();
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6041, 4);
run_clusters(&clusters);
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
// start && wait for generation session to complete
let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&mut runtime, TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished
loop_until(&runtime.executor(), TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished
|| session_clone.state() == GenerationSessionState::Failed)
&& clusters_clone[0].client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_ok());
@ -1605,7 +1595,7 @@ pub mod tests {
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&mut runtime, Duration::from_millis(1000), move || session_clone.is_finished() && (0..3).all(|i|
loop_until(&runtime.executor(), Duration::from_millis(1000), move || session_clone.is_finished() && (0..3).all(|i|
clusters_clone[i].data.sessions.ecdsa_signing_sessions.is_empty()));
session0.wait().unwrap();
@ -1615,7 +1605,7 @@ pub mod tests {
let session = clusters[2].data.sessions.ecdsa_signing_sessions.first().unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&mut runtime, Duration::from_millis(1000), move || session_clone.is_finished() && (0..3).all(|i|
loop_until(&runtime.executor(), Duration::from_millis(1000), move || session_clone.is_finished() && (0..3).all(|i|
clusters_clone[i].data.sessions.ecdsa_signing_sessions.is_empty()));
session2.wait().unwrap();
@ -1626,9 +1616,8 @@ pub mod tests {
let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap();
let session1 = clusters[0].client().new_ecdsa_signing_session(Default::default(), signature.into(), None, H256::random()).unwrap();
let session = clusters[0].data.sessions.ecdsa_signing_sessions.first().unwrap();
loop_until(&mut runtime, Duration::from_millis(1000), move || session.is_finished());
loop_until(&runtime.executor(), Duration::from_millis(1000), move || session.is_finished());
session1.wait().unwrap_err();
shutdown_clusters(&clusters);
runtime.shutdown_now().wait().unwrap();
}
}

View File

@ -582,7 +582,6 @@ mod tests {
pub fn make_cluster_sessions() -> ClusterSessions {
let key_pair = Random.generate().unwrap();
let config = ClusterConfiguration {
threads: 1,
self_key_pair: Arc::new(PlainNodeKeyPair::new(key_pair.clone())),
listen_address: ("127.0.0.1".to_owned(), 100_u16),
key_server_set: Arc::new(MapKeyServerSet::new(false, vec![(key_pair.public().clone(), format!("127.0.0.1:{}", 100).parse().unwrap())].into_iter().collect())),

View File

@ -33,6 +33,7 @@ extern crate serde;
extern crate serde_json;
extern crate tiny_keccak;
extern crate tokio;
extern crate parity_runtime;
extern crate tokio_io;
extern crate tokio_service;
extern crate url;
@ -72,6 +73,7 @@ use kvdb::KeyValueDB;
use ethcore::client::Client;
use ethcore::miner::Miner;
use sync::SyncProvider;
use parity_runtime::Executor;
pub use types::{ServerKeyId, EncryptedDocumentKey, RequestSignature, Public,
Error, NodeAddress, ContractAddress, ServiceConfiguration, ClusterConfiguration};
@ -79,7 +81,9 @@ pub use traits::{NodeKeyPair, KeyServer};
pub use self::node_key_pair::{PlainNodeKeyPair, KeyStoreNodeKeyPair};
/// Start new key server instance
pub fn start(client: Arc<Client>, sync: Arc<SyncProvider>, miner: Arc<Miner>, self_key_pair: Arc<NodeKeyPair>, mut config: ServiceConfiguration, db: Arc<KeyValueDB>) -> Result<Box<KeyServer>, Error> {
pub fn start(client: Arc<Client>, sync: Arc<SyncProvider>, miner: Arc<Miner>, self_key_pair: Arc<NodeKeyPair>, mut config: ServiceConfiguration,
db: Arc<KeyValueDB>, executor: Executor) -> Result<Box<KeyServer>, Error>
{
let trusted_client = trusted_client::TrustedClient::new(self_key_pair.clone(), client.clone(), sync, miner);
let acl_storage: Arc<acl_storage::AclStorage> = match config.acl_check_contract_address.take() {
Some(acl_check_contract_address) => acl_storage::OnChainAclStorage::new(trusted_client.clone(), acl_check_contract_address)?,
@ -89,13 +93,14 @@ pub fn start(client: Arc<Client>, sync: Arc<SyncProvider>, miner: Arc<Miner>, se
let key_server_set = key_server_set::OnChainKeyServerSet::new(trusted_client.clone(), config.cluster_config.key_server_set_contract_address.take(),
self_key_pair.clone(), config.cluster_config.auto_migrate_enabled, config.cluster_config.nodes.clone())?;
let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(db)?);
let key_server = Arc::new(key_server::KeyServerImpl::new(&config.cluster_config, key_server_set.clone(), self_key_pair.clone(), acl_storage.clone(), key_storage.clone())?);
let key_server = Arc::new(key_server::KeyServerImpl::new(&config.cluster_config, key_server_set.clone(), self_key_pair.clone(),
acl_storage.clone(), key_storage.clone(), executor.clone())?);
let cluster = key_server.cluster();
let key_server: Arc<KeyServer> = key_server;
// prepare HTTP listener
let http_listener = match config.listener_address {
Some(listener_address) => Some(listener::http_listener::KeyServerHttpListener::start(listener_address, Arc::downgrade(&key_server))?),
Some(listener_address) => Some(listener::http_listener::KeyServerHttpListener::start(listener_address, Arc::downgrade(&key_server), executor)?),
None => None,
};

View File

@ -26,7 +26,7 @@ use serde::Serialize;
use serde_json;
use tokio;
use tokio::net::TcpListener;
use tokio::runtime::{Runtime, Builder as RuntimeBuilder};
use parity_runtime::Executor;
use futures::{future, Future, Stream};
use url::percent_encoding::percent_decode;
@ -46,7 +46,7 @@ use types::{Error, Public, MessageHash, NodeAddress, RequestSignature, ServerKey
/// To change servers set: POST /admin/servers_set_change/{old_signature}/{new_signature} + BODY: json array of hex-encoded nodes ids
pub struct KeyServerHttpListener {
_runtime: Runtime,
_executor: Executor,
_handler: Arc<KeyServerSharedHttpHandler>,
}
@ -86,15 +86,11 @@ struct KeyServerSharedHttpHandler {
impl KeyServerHttpListener {
/// Start KeyServer http listener
pub fn start(listener_address: NodeAddress, key_server: Weak<KeyServer>) -> Result<Self, Error> {
pub fn start(listener_address: NodeAddress, key_server: Weak<KeyServer>, executor: Executor) -> Result<Self, Error> {
let shared_handler = Arc::new(KeyServerSharedHttpHandler {
key_server: key_server,
});
let mut runtime = RuntimeBuilder::new()
// TODO: Add config option/arg?
.core_threads(2)
.build()?;
let listener_address = format!("{}:{}", listener_address.address, listener_address.port).parse()?;
let listener = TcpListener::bind(&listener_address)?;
@ -113,10 +109,10 @@ impl KeyServerHttpListener {
tokio::spawn(serve)
});
runtime.spawn(server);
executor.spawn(server);
let listener = KeyServerHttpListener {
_runtime: runtime,
_executor: executor,
_handler: shared_handler,
};
@ -419,13 +415,16 @@ mod tests {
use traits::KeyServer;
use key_server::tests::DummyKeyServer;
use types::NodeAddress;
use parity_runtime::Runtime;
use super::{parse_request, Request, KeyServerHttpListener};
#[test]
fn http_listener_successfully_drops() {
let key_server: Arc<KeyServer> = Arc::new(DummyKeyServer::default());
let address = NodeAddress { address: "127.0.0.1".into(), port: 9000 };
let listener = KeyServerHttpListener::start(address, Arc::downgrade(&key_server)).unwrap();
let runtime = Runtime::with_thread_count(1);
let listener = KeyServerHttpListener::start(address, Arc::downgrade(&key_server),
runtime.executor()).unwrap();
drop(listener);
}

View File

@ -75,8 +75,6 @@ pub struct ServiceConfiguration {
/// Key server cluster configuration
#[derive(Debug)]
pub struct ClusterConfiguration {
/// Number of threads reserved by cluster.
pub threads: usize,
/// This node address.
pub listener_address: NodeAddress,
/// All cluster nodes addresses.

View File

@ -16,8 +16,8 @@
//! Tokio Runtime wrapper.
extern crate futures;
extern crate tokio;
pub extern crate futures;
pub extern crate tokio;
use std::{fmt, thread};
use std::sync::mpsc;
@ -222,6 +222,24 @@ impl Executor {
}
}
impl<F: Future<Item = (), Error = ()> + Send + 'static> future::Executor<F> for Executor {
fn execute(&self, future: F) -> Result<(), future::ExecuteError<F>> {
match self.inner {
Mode::Tokio(ref executor) => executor.execute(future),
Mode::Sync => {
let _= future.wait();
Ok(())
},
Mode::ThreadPerFuture => {
thread::spawn(move || {
let _= future.wait();
});
Ok(())
},
}
}
}
/// A handle to a runtime. Dropping the handle will cause runtime to shutdown.
pub struct RuntimeHandle {
close: Option<futures::Complete<()>>,