From 6f758bc7b16637f20d70e97e4fce023a50808761 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Thu, 14 Jun 2018 10:01:52 +0300 Subject: [PATCH] SecretStore: service pack 1 (#8435) * SecretStore: error unify initial commit SecretStore: pass real error in error messages SecretStore: is_internal_error -> Error::is_non_fatal warnings SecretStore: ConsensusTemporaryUnreachable fix after merge removed comments removed comments SecretStore: updated HTTP error responses SecretStore: more ConsensusTemporaryUnreachable tests fix after rebase * SecretStore: unified SS contract config options && read * SecretStore: service pack SecretStore: service pack (continue) * fixed grumbles --- parity/cli/mod.rs | 51 ++-- parity/cli/tests/config.full.toml | 3 +- parity/configuration.rs | 25 +- parity/secretstore.rs | 14 +- secret_store/src/acl_storage.rs | 47 ++-- secret_store/src/key_server.rs | 3 +- .../key_version_negotiation_session.rs | 168 ++++++++++-- .../servers_set_change_session.rs | 97 +++++-- .../admin_sessions/share_add_session.rs | 67 +++-- .../admin_sessions/share_change_session.rs | 13 +- .../client_sessions/decryption_session.rs | 22 ++ .../src/key_server_cluster/cluster.rs | 50 ++-- .../key_server_cluster/cluster_sessions.rs | 61 ++++- .../cluster_sessions_creator.rs | 3 + .../key_server_cluster/connection_trigger.rs | 38 ++- .../key_server_cluster/jobs/key_access_job.rs | 1 - .../src/key_server_cluster/message.rs | 15 ++ secret_store/src/key_server_set.rs | 165 ++++++------ secret_store/src/key_storage.rs | 1 - secret_store/src/lib.rs | 15 +- secret_store/src/listener/service_contract.rs | 155 +++++------ .../src/listener/service_contract_listener.rs | 248 +++++++++++------- secret_store/src/trusted_client.rs | 19 +- secret_store/src/types/all.rs | 6 +- 24 files changed, 853 insertions(+), 434 deletions(-) diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index a3f3d4887..b9c2ad7df 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -563,38 +563,42 @@ usage! { "--no-secretstore-http", "Disable Secret Store HTTP API.", - FLAG flag_no_secretstore_acl_check: (bool) = false, or |c: &Config| c.secretstore.as_ref()?.disable_acl_check.clone(), - "--no-acl-check", - "Disable ACL check (useful for test environments).", - FLAG flag_no_secretstore_auto_migrate: (bool) = false, or |c: &Config| c.secretstore.as_ref()?.disable_auto_migrate.clone(), "--no-secretstore-auto-migrate", "Do not run servers set change session automatically when servers set changes. This option has no effect when servers set is read from configuration file.", - ARG arg_secretstore_contract: (String) = "none", or |c: &Config| c.secretstore.as_ref()?.service_contract.clone(), + ARG arg_secretstore_acl_contract: (Option) = Some("registry".into()), or |c: &Config| c.secretstore.as_ref()?.acl_contract.clone(), + "--secretstore-acl-contract=[SOURCE]", + "Secret Store permissioning contract address source: none, registry (contract address is read from 'secretstore_acl_checker' entry in registry) or address.", + + ARG arg_secretstore_contract: (Option) = None, or |c: &Config| c.secretstore.as_ref()?.service_contract.clone(), "--secretstore-contract=[SOURCE]", - "Secret Store Service contract address source: none, registry (contract address is read from secretstore_service entry in registry) or address.", + "Secret Store Service contract address source: none, registry (contract address is read from 'secretstore_service' entry in registry) or address.", - ARG arg_secretstore_srv_gen_contract: (String) = "none", or |c: &Config| c.secretstore.as_ref()?.service_contract_srv_gen.clone(), + ARG arg_secretstore_srv_gen_contract: (Option) = None, or |c: &Config| c.secretstore.as_ref()?.service_contract_srv_gen.clone(), "--secretstore-srv-gen-contract=[SOURCE]", - "Secret Store Service server key generation contract address source: none, registry (contract address is read from secretstore_service_srv_gen entry in registry) or address.", + "Secret Store Service server key generation contract address source: none, registry (contract address is read from 'secretstore_service_srv_gen' entry in registry) or address.", - ARG arg_secretstore_srv_retr_contract: (String) = "none", or |c: &Config| c.secretstore.as_ref()?.service_contract_srv_retr.clone(), + ARG arg_secretstore_srv_retr_contract: (Option) = None, or |c: &Config| c.secretstore.as_ref()?.service_contract_srv_retr.clone(), "--secretstore-srv-retr-contract=[SOURCE]", - "Secret Store Service server key retrieval contract address source: none, registry (contract address is read from secretstore_service_srv_retr entry in registry) or address.", + "Secret Store Service server key retrieval contract address source: none, registry (contract address is read from 'secretstore_service_srv_retr' entry in registry) or address.", - ARG arg_secretstore_doc_store_contract: (String) = "none", or |c: &Config| c.secretstore.as_ref()?.service_contract_doc_store.clone(), + ARG arg_secretstore_doc_store_contract: (Option) = None, or |c: &Config| c.secretstore.as_ref()?.service_contract_doc_store.clone(), "--secretstore-doc-store-contract=[SOURCE]", - "Secret Store Service document key store contract address source: none, registry (contract address is read from secretstore_service_doc_store entry in registry) or address.", + "Secret Store Service document key store contract address source: none, registry (contract address is read from 'secretstore_service_doc_store' entry in registry) or address.", - ARG arg_secretstore_doc_sretr_contract: (String) = "none", or |c: &Config| c.secretstore.as_ref()?.service_contract_doc_sretr.clone(), + ARG arg_secretstore_doc_sretr_contract: (Option) = None, or |c: &Config| c.secretstore.as_ref()?.service_contract_doc_sretr.clone(), "--secretstore-doc-sretr-contract=[SOURCE]", - "Secret Store Service document key shadow retrieval contract address source: none, registry (contract address is read from secretstore_service_doc_sretr entry in registry) or address.", + "Secret Store Service document key shadow retrieval contract address source: none, registry (contract address is read from 'secretstore_service_doc_sretr' entry in registry) or address.", ARG arg_secretstore_nodes: (String) = "", or |c: &Config| c.secretstore.as_ref()?.nodes.as_ref().map(|vec| vec.join(",")), "--secretstore-nodes=[NODES]", "Comma-separated list of other secret store cluster nodes in form NODE_PUBLIC_KEY_IN_HEX@NODE_IP_ADDR:NODE_PORT.", + ARG arg_secretstore_server_set_contract: (Option) = Some("registry".into()), or |c: &Config| c.secretstore.as_ref()?.server_set_contract.clone(), + "--secretstore-server-set-contract=[SOURCE]", + "Secret Store server set contract address source: none, registry (contract address is read from 'secretstore_server_set' entry in registry) or address.", + ARG arg_secretstore_interface: (String) = "local", or |c: &Config| c.secretstore.as_ref()?.interface.clone(), "--secretstore-interface=[IP]", "Specify the hostname portion for listening to Secret Store Key Server internal requests, IP should be an interface's IP address, or local.", @@ -1193,8 +1197,8 @@ struct Dapps { struct SecretStore { disable: Option, disable_http: Option, - disable_acl_check: Option, disable_auto_migrate: Option, + acl_contract: Option, service_contract: Option, service_contract_srv_gen: Option, service_contract_srv_retr: Option, @@ -1203,6 +1207,7 @@ struct SecretStore { self_secret: Option, admin_public: Option, nodes: Option>, + server_set_contract: Option, interface: Option, port: Option, http_interface: Option, @@ -1620,16 +1625,17 @@ mod tests { // SECRETSTORE flag_no_secretstore: false, flag_no_secretstore_http: false, - flag_no_secretstore_acl_check: false, flag_no_secretstore_auto_migrate: false, - arg_secretstore_contract: "none".into(), - arg_secretstore_srv_gen_contract: "none".into(), - arg_secretstore_srv_retr_contract: "none".into(), - arg_secretstore_doc_store_contract: "none".into(), - arg_secretstore_doc_sretr_contract: "none".into(), + arg_secretstore_acl_contract: Some("registry".into()), + arg_secretstore_contract: Some("none".into()), + arg_secretstore_srv_gen_contract: Some("none".into()), + arg_secretstore_srv_retr_contract: Some("none".into()), + arg_secretstore_doc_store_contract: Some("none".into()), + arg_secretstore_doc_sretr_contract: Some("none".into()), arg_secretstore_secret: None, arg_secretstore_admin_public: None, arg_secretstore_nodes: "".into(), + arg_secretstore_server_set_contract: Some("registry".into()), arg_secretstore_interface: "local".into(), arg_secretstore_port: 8083u16, arg_secretstore_http_interface: "local".into(), @@ -1881,8 +1887,8 @@ mod tests { secretstore: Some(SecretStore { disable: None, disable_http: None, - disable_acl_check: None, disable_auto_migrate: None, + acl_contract: None, service_contract: None, service_contract_srv_gen: None, service_contract_srv_retr: None, @@ -1891,6 +1897,7 @@ mod tests { self_secret: None, admin_public: None, nodes: None, + server_set_contract: None, interface: None, port: Some(8083), http_interface: None, diff --git a/parity/cli/tests/config.full.toml b/parity/cli/tests/config.full.toml index fb3614aa9..2bc8778d0 100644 --- a/parity/cli/tests/config.full.toml +++ b/parity/cli/tests/config.full.toml @@ -91,12 +91,13 @@ pass = "test_pass" [secretstore] disable = false disable_http = false -disable_acl_check = false +acl_contract = "registry" service_contract = "none" service_contract_srv_gen = "none" service_contract_srv_retr = "none" service_contract_doc_store = "none" service_contract_doc_sretr = "none" +server_set_contract = "registry" nodes = [] http_interface = "local" http_port = 8082 diff --git a/parity/configuration.rs b/parity/configuration.rs index ec73046a4..2b8bd820f 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -604,8 +604,8 @@ impl Configuration { Ok(SecretStoreConfiguration { enabled: self.secretstore_enabled(), http_enabled: self.secretstore_http_enabled(), - acl_check_enabled: self.secretstore_acl_check_enabled(), auto_migrate_enabled: self.secretstore_auto_migrate_enabled(), + acl_check_contract_address: self.secretstore_acl_check_contract_address()?, service_contract_address: self.secretstore_service_contract_address()?, service_contract_srv_gen_address: self.secretstore_service_contract_srv_gen_address()?, service_contract_srv_retr_address: self.secretstore_service_contract_srv_retr_address()?, @@ -613,6 +613,7 @@ impl Configuration { service_contract_doc_sretr_address: self.secretstore_service_contract_doc_sretr_address()?, self_secret: self.secretstore_self_secret()?, nodes: self.secretstore_nodes()?, + key_server_set_contract_address: self.secretstore_key_server_set_contract_address()?, interface: self.secretstore_interface(), port: self.args.arg_ports_shift + self.args.arg_secretstore_port, http_interface: self.secretstore_http_interface(), @@ -1099,14 +1100,14 @@ impl Configuration { !self.args.flag_no_secretstore_http && cfg!(feature = "secretstore") } - fn secretstore_acl_check_enabled(&self) -> bool { - !self.args.flag_no_secretstore_acl_check - } - fn secretstore_auto_migrate_enabled(&self) -> bool { !self.args.flag_no_secretstore_auto_migrate } + fn secretstore_acl_check_contract_address(&self) -> Result, String> { + into_secretstore_service_contract_address(self.args.arg_secretstore_acl_contract.as_ref()) + } + fn secretstore_service_contract_address(&self) -> Result, String> { into_secretstore_service_contract_address(self.args.arg_secretstore_contract.as_ref()) } @@ -1127,6 +1128,10 @@ impl Configuration { into_secretstore_service_contract_address(self.args.arg_secretstore_doc_sretr_contract.as_ref()) } + fn secretstore_key_server_set_contract_address(&self) -> Result, String> { + into_secretstore_service_contract_address(self.args.arg_secretstore_server_set_contract.as_ref()) + } + fn verifier_settings(&self) -> VerifierSettings { let mut settings = VerifierSettings::default(); settings.scale_verifiers = self.args.flag_scale_verifiers; @@ -1145,11 +1150,11 @@ impl Configuration { } } -fn into_secretstore_service_contract_address(s: &str) -> Result, String> { - match s { - "none" => Ok(None), - "registry" => Ok(Some(SecretStoreContractAddress::Registry)), - a => Ok(Some(SecretStoreContractAddress::Address(a.parse().map_err(|e| format!("{}", e))?))), +fn into_secretstore_service_contract_address(s: Option<&String>) -> Result, String> { + match s.map(String::as_str) { + None | Some("none") => Ok(None), + Some("registry") => Ok(Some(SecretStoreContractAddress::Registry)), + Some(a) => Ok(Some(SecretStoreContractAddress::Address(a.parse().map_err(|e| format!("{}", e))?))), } } diff --git a/parity/secretstore.rs b/parity/secretstore.rs index 677c1c4aa..e018897bd 100644 --- a/parity/secretstore.rs +++ b/parity/secretstore.rs @@ -50,10 +50,10 @@ pub struct Configuration { pub enabled: bool, /// Is HTTP API enabled? pub http_enabled: bool, - /// Is ACL check enabled. - pub acl_check_enabled: bool, /// Is auto migrate enabled. pub auto_migrate_enabled: bool, + /// ACL check contract address. + pub acl_check_contract_address: Option, /// Service contract address. pub service_contract_address: Option, /// Server key generation service contract address. @@ -68,6 +68,8 @@ pub struct Configuration { pub self_secret: Option, /// Other nodes IDs + addresses. pub nodes: BTreeMap, + /// Key Server Set contract address. If None, 'nodes' map is used. + pub key_server_set_contract_address: Option, /// Interface to listen to pub interface: String, /// Port to listen to @@ -135,7 +137,7 @@ mod server { impl KeyServer { /// Create new key server pub fn new(mut conf: Configuration, deps: Dependencies) -> Result { - if !conf.acl_check_enabled { + if conf.acl_check_contract_address.is_none() { warn!("Running SecretStore with disabled ACL check: {}", Red.bold().paint("everyone has access to stored keys")); } @@ -174,7 +176,7 @@ mod server { service_contract_srv_retr_address: conf.service_contract_srv_retr_address.map(into_service_contract_address), service_contract_doc_store_address: conf.service_contract_doc_store_address.map(into_service_contract_address), service_contract_doc_sretr_address: conf.service_contract_doc_sretr_address.map(into_service_contract_address), - acl_check_enabled: conf.acl_check_enabled, + acl_check_contract_address: conf.acl_check_contract_address.map(into_service_contract_address), cluster_config: ethcore_secretstore::ClusterConfiguration { threads: 4, listener_address: ethcore_secretstore::NodeAddress { @@ -185,6 +187,7 @@ mod server { address: ip, port: port, })).collect(), + key_server_set_contract_address: conf.key_server_set_contract_address.map(into_service_contract_address), allow_connecting_to_higher_nodes: true, admin_public: conf.admin_public, auto_migrate_enabled: conf.auto_migrate_enabled, @@ -212,8 +215,8 @@ impl Default for Configuration { Configuration { enabled: true, http_enabled: true, - acl_check_enabled: true, auto_migrate_enabled: true, + acl_check_contract_address: Some(ContractAddress::Registry), service_contract_address: None, service_contract_srv_gen_address: None, service_contract_srv_retr_address: None, @@ -222,6 +225,7 @@ impl Default for Configuration { self_secret: None, admin_public: None, nodes: BTreeMap::new(), + key_server_set_contract_address: Some(ContractAddress::Registry), interface: "127.0.0.1".to_owned(), port: 8083, http_interface: "127.0.0.1".to_owned(), diff --git a/secret_store/src/acl_storage.rs b/secret_store/src/acl_storage.rs index 10b58a9c7..efc01f030 100644 --- a/secret_store/src/acl_storage.rs +++ b/secret_store/src/acl_storage.rs @@ -18,11 +18,11 @@ use std::sync::Arc; use std::collections::{HashMap, HashSet}; use std::time::Duration; use parking_lot::{Mutex, RwLock}; -use ethcore::client::{BlockId, ChainNotify, ChainRoute, CallContract, RegistryInfo}; +use ethcore::client::{BlockId, ChainNotify, ChainRoute, CallContract}; use ethereum_types::{H256, Address}; use bytes::Bytes; use trusted_client::TrustedClient; -use types::{Error, ServerKeyId}; +use types::{Error, ServerKeyId, ContractAddress}; use_contract!(acl_storage, "AclStorage", "res/acl_storage.json"); @@ -44,8 +44,10 @@ pub struct OnChainAclStorage { struct CachedContract { /// Blockchain client. client: TrustedClient, - /// Contract address. - contract_addr: Option
, + /// Contract address source. + address_source: ContractAddress, + /// Current contract address. + contract_address: Option
, /// Contract at given address. contract: acl_storage::AclStorage, } @@ -57,10 +59,10 @@ pub struct DummyAclStorage { } impl OnChainAclStorage { - pub fn new(trusted_client: TrustedClient) -> Result, Error> { + pub fn new(trusted_client: TrustedClient, address_source: ContractAddress) -> Result, Error> { let client = trusted_client.get_untrusted(); let acl_storage = Arc::new(OnChainAclStorage { - contract: Mutex::new(CachedContract::new(trusted_client)), + contract: Mutex::new(CachedContract::new(trusted_client, address_source)), }); client .ok_or_else(|| Error::Internal("Constructing OnChainAclStorage without active Client".into()))? @@ -78,36 +80,37 @@ impl AclStorage for OnChainAclStorage { impl ChainNotify for OnChainAclStorage { fn new_blocks(&self, _imported: Vec, _invalid: Vec, route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { if !route.enacted().is_empty() || !route.retracted().is_empty() { - self.contract.lock().update() + self.contract.lock().update_contract_address() } } } impl CachedContract { - pub fn new(client: TrustedClient) -> Self { - CachedContract { + pub fn new(client: TrustedClient, address_source: ContractAddress) -> Self { + let mut contract = CachedContract { client, - contract_addr: None, + address_source, + contract_address: None, contract: acl_storage::AclStorage::default(), - } + }; + contract.update_contract_address(); + contract } - pub fn update(&mut self) { - if let Some(client) = self.client.get() { - match client.registry_address(ACL_CHECKER_CONTRACT_REGISTRY_NAME.to_owned(), BlockId::Latest) { - Some(new_contract_addr) if Some(new_contract_addr).as_ref() != self.contract_addr.as_ref() => { - trace!(target: "secretstore", "Configuring for ACL checker contract from {}", new_contract_addr); - self.contract_addr = Some(new_contract_addr); - }, - Some(_) | None => () - } + pub fn update_contract_address(&mut self) { + let contract_address = self.client.read_contract_address(ACL_CHECKER_CONTRACT_REGISTRY_NAME.into(), &self.address_source); + if contract_address != self.contract_address { + trace!(target: "secretstore", "Configuring for ACL checker contract from address {:?}", + contract_address); + + self.contract_address = contract_address; } } pub fn check(&mut self, requester: Address, document: &ServerKeyId) -> Result { if let Some(client) = self.client.get() { - // call contract to check access - match self.contract_addr { + // call contract to check accesss + match self.contract_address { Some(contract_address) => { let do_call = |data| client.call_contract(BlockId::Latest, contract_address, data); self.contract.functions() diff --git a/secret_store/src/key_server.rs b/secret_store/src/key_server.rs index 099d8aa45..3b2419977 100644 --- a/secret_store/src/key_server.rs +++ b/secret_store/src/key_server.rs @@ -303,6 +303,7 @@ pub mod tests { address: "127.0.0.1".into(), port: start_port + (j as u16), })).collect(), + key_server_set_contract_address: None, allow_connecting_to_higher_nodes: false, admin_public: None, auto_migrate_enabled: false, @@ -312,7 +313,7 @@ pub mod tests { .collect(); let key_storages = (0..num_nodes).map(|_| Arc::new(DummyKeyStorage::default())).collect::>(); let key_servers: Vec<_> = configs.into_iter().enumerate().map(|(i, cfg)| - KeyServerImpl::new(&cfg, Arc::new(MapKeyServerSet::new(key_servers_set.clone())), + KeyServerImpl::new(&cfg, Arc::new(MapKeyServerSet::new(false, key_servers_set.clone())), Arc::new(PlainNodeKeyPair::new(key_pairs[i].clone())), Arc::new(DummyAclStorage::default()), key_storages[i].clone()).unwrap() diff --git a/secret_store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs b/secret_store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs index 9aeb5ca34..4839ac41e 100644 --- a/secret_store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs +++ b/secret_store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs @@ -25,7 +25,8 @@ use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSessi use key_server_cluster::decryption_session::SessionImpl as DecryptionSession; use key_server_cluster::signing_session_ecdsa::SessionImpl as EcdsaSigningSession; use key_server_cluster::signing_session_schnorr::SessionImpl as SchnorrSigningSession; -use key_server_cluster::message::{Message, KeyVersionNegotiationMessage, RequestKeyVersions, KeyVersions}; +use key_server_cluster::message::{Message, KeyVersionNegotiationMessage, RequestKeyVersions, + KeyVersions, KeyVersionsError, FailedKeyVersionContinueAction}; use key_server_cluster::admin_sessions::ShareChangeSessionMeta; // TODO [Opt]: change sessions so that versions are sent by chunks. @@ -34,6 +35,8 @@ const VERSIONS_PER_MESSAGE: usize = 32; /// Key version negotiation transport. pub trait SessionTransport { + /// Broadcast message to all nodes. + fn broadcast(&self, message: KeyVersionNegotiationMessage) -> Result<(), Error>; /// Send message to given node. fn send(&self, node: &NodeId, message: KeyVersionNegotiationMessage) -> Result<(), Error>; } @@ -63,6 +66,13 @@ pub enum ContinueAction { EcdsaSign(Arc, H256), } +/// Failed action after key version is negotiated. +#[derive(Clone, Debug, PartialEq)] +pub enum FailedContinueAction { + /// Decryption origin + requester. + Decrypt(Option
, Address), +} + /// Immutable session data. struct SessionCore { /// Session meta. @@ -92,9 +102,11 @@ struct SessionData { /// { Version => Nodes } pub versions: Option>>, /// Session result. - pub result: Option>, + pub result: Option, Error>>, /// Continue action. pub continue_with: Option, + /// Failed continue action (reported in error message by master node). + pub failed_continue_with: Option, } /// SessionImpl creation parameters @@ -155,6 +167,7 @@ pub struct LargestSupportResultComputer; impl SessionImpl where T: SessionTransport { /// Create new session. pub fn new(params: SessionParams) -> Self { + let threshold = params.key_share.as_ref().map(|key_share| key_share.threshold); SessionImpl { core: SessionCore { meta: params.meta, @@ -168,10 +181,11 @@ impl SessionImpl where T: SessionTransport { data: Mutex::new(SessionData { state: SessionState::WaitingForInitialization, confirmations: None, - threshold: None, + threshold: threshold, versions: None, result: None, continue_with: None, + failed_continue_with: None, }) } } @@ -183,7 +197,8 @@ impl SessionImpl where T: SessionTransport { /// Return key threshold. pub fn key_threshold(&self) -> Result { - Ok(self.data.lock().threshold.clone().ok_or(Error::InvalidStateForRequest)?) + self.data.lock().threshold.clone() + .ok_or(Error::InvalidStateForRequest) } /// Return result computer reference. @@ -203,8 +218,13 @@ impl SessionImpl where T: SessionTransport { self.data.lock().continue_with.take() } + /// Take failed continue action. + pub fn take_failed_continue_action(&self) -> Option { + self.data.lock().failed_continue_with.take() + } + /// Wait for session completion. - pub fn wait(&self) -> Result<(H256, NodeId), Error> { + pub fn wait(&self) -> Result, Error> { Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone()) .expect("wait_session returns Some if called without timeout; qed") } @@ -270,6 +290,12 @@ impl SessionImpl where T: SessionTransport { &KeyVersionNegotiationMessage::KeyVersions(ref message) => self.on_key_versions(sender, message), &KeyVersionNegotiationMessage::KeyVersionsError(ref message) => { + // remember failed continue action + if let Some(FailedKeyVersionContinueAction::Decrypt(Some(ref origin), ref requester)) = message.continue_with { + self.data.lock().failed_continue_with = + Some(FailedContinueAction::Decrypt(Some(origin.clone().into()), requester.clone().into())); + } + self.on_session_error(sender, message.error.clone()); Ok(()) }, @@ -309,6 +335,8 @@ impl SessionImpl where T: SessionTransport { // update state data.state = SessionState::Finished; + data.result = Some(Ok(None)); + self.core.completed.notify_all(); Ok(()) } @@ -361,8 +389,47 @@ impl SessionImpl where T: SessionTransport { let confirmations = data.confirmations.as_ref().expect(reason); let versions = data.versions.as_ref().expect(reason); if let Some(result) = core.result_computer.compute_result(data.threshold.clone(), confirmations, versions) { + // when the master node processing decryption service request, it starts with a key version negotiation session + // if the negotiation fails, only master node knows about it + // => if the error is fatal, only the master will know about it and report it to the contract && the request will never be rejected + // => let's broadcast fatal error so that every other node know about it, and, if it trusts to master node + // will report error to the contract + if let (Some(continue_with), Err(error)) = (data.continue_with.as_ref(), result.as_ref()) { + let origin = match *continue_with { + ContinueAction::Decrypt(_, origin, _, _) => origin.clone(), + _ => None, + }; + + let requester = match *continue_with { + ContinueAction::Decrypt(ref session, _, _, _) => session.requester().and_then(|r| r.address(&core.meta.id).ok()), + _ => None, + }; + + if origin.is_some() && requester.is_some() && !error.is_non_fatal() { + let requester = requester.expect("checked in above condition; qed"); + data.failed_continue_with = + Some(FailedContinueAction::Decrypt(origin.clone(), requester.clone())); + + let send_result = core.transport.broadcast(KeyVersionNegotiationMessage::KeyVersionsError(KeyVersionsError { + session: core.meta.id.clone().into(), + sub_session: core.sub_session.clone().into(), + session_nonce: core.nonce, + error: error.clone(), + continue_with: Some(FailedKeyVersionContinueAction::Decrypt( + origin.map(Into::into), + requester.into(), + )), + })); + + if let Err(send_error) = send_result { + warn!(target: "secretstore_net", "{}: failed to broadcast key version negotiation error {}: {}", + core.meta.self_node_id, error, send_error); + } + } + } + data.state = SessionState::Finished; - data.result = Some(result); + data.result = Some(result.map(Some)); core.completed.notify_all(); } } @@ -390,7 +457,7 @@ impl ClusterSession for SessionImpl where T: SessionTransport { data.confirmations.as_mut().expect("checked a line above; qed").clear(); Self::try_complete(&self.core, &mut *data); if data.state != SessionState::Finished { - warn!("{}: key version negotiation session failed with timeout", self.core.meta.self_node_id); + warn!(target: "secretstore_net", "{}: key version negotiation session failed with timeout", self.core.meta.self_node_id); data.result = Some(Err(Error::ConsensusTemporaryUnreachable)); self.core.completed.notify_all(); @@ -407,17 +474,22 @@ impl ClusterSession for SessionImpl where T: SessionTransport { if data.confirmations.is_some() { let is_waiting_for_confirmation = data.confirmations.as_mut().expect("checked a line above; qed").remove(node); - if is_waiting_for_confirmation { - Self::try_complete(&self.core, &mut *data); - if data.state != SessionState::Finished { - warn!("{}: key version negotiation session failed because {} connection has timeouted", self.core.meta.self_node_id, node); + if !is_waiting_for_confirmation { + return; + } - data.state = SessionState::Finished; - data.result = Some(Err(error)); - self.core.completed.notify_all(); - } + Self::try_complete(&self.core, &mut *data); + if data.state == SessionState::Finished { + return; } } + + warn!(target: "secretstore_net", "{}: key version negotiation session failed because of {} from {}", + self.core.meta.self_node_id, error, node); + + data.state = SessionState::Finished; + data.result = Some(Err(error)); + self.core.completed.notify_all(); } fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> { @@ -429,6 +501,10 @@ impl ClusterSession for SessionImpl where T: SessionTransport { } impl SessionTransport for IsolatedSessionTransport { + fn broadcast(&self, message: KeyVersionNegotiationMessage) -> Result<(), Error> { + self.cluster.broadcast(Message::KeyVersionNegotiation(message)) + } + fn send(&self, node: &NodeId, message: KeyVersionNegotiationMessage) -> Result<(), Error> { self.cluster.send(node, Message::KeyVersionNegotiation(message)) } @@ -514,20 +590,28 @@ impl SessionResultComputer for LargestSupportResultComputer { mod tests { use std::sync::Arc; use std::collections::{VecDeque, BTreeMap, BTreeSet}; - use key_server_cluster::{NodeId, SessionId, Error, KeyStorage, DummyKeyStorage, DocumentKeyShare, DocumentKeyShareVersion}; + use ethkey::public_to_address; + use key_server_cluster::{NodeId, SessionId, Error, KeyStorage, DummyKeyStorage, + DocumentKeyShare, DocumentKeyShareVersion}; use key_server_cluster::math; use key_server_cluster::cluster::Cluster; use key_server_cluster::cluster::tests::DummyCluster; + use key_server_cluster::cluster_sessions::ClusterSession; use key_server_cluster::admin_sessions::ShareChangeSessionMeta; + use key_server_cluster::decryption_session::create_default_decryption_session; use key_server_cluster::message::{Message, KeyVersionNegotiationMessage, RequestKeyVersions, KeyVersions}; use super::{SessionImpl, SessionTransport, SessionParams, FastestResultComputer, LargestSupportResultComputer, - SessionResultComputer, SessionState}; + SessionResultComputer, SessionState, ContinueAction, FailedContinueAction}; struct DummyTransport { cluster: Arc, } impl SessionTransport for DummyTransport { + fn broadcast(&self, message: KeyVersionNegotiationMessage) -> Result<(), Error> { + self.cluster.broadcast(Message::KeyVersionNegotiation(message)) + } + fn send(&self, node: &NodeId, message: KeyVersionNegotiationMessage) -> Result<(), Error> { self.cluster.send(node, Message::KeyVersionNegotiation(message)) } @@ -600,6 +684,27 @@ mod tests { pub fn session(&self, idx: usize) -> &SessionImpl { &self.nodes.values().nth(idx).unwrap().session } + + pub fn take_message(&mut self) -> Option<(NodeId, NodeId, Message)> { + self.nodes.values() + .filter_map(|n| n.cluster.take_message().map(|m| (n.session.meta().self_node_id.clone(), m.0, m.1))) + .nth(0) + .or_else(|| self.queue.pop_front()) + } + + pub fn process_message(&mut self, msg: (NodeId, NodeId, Message)) -> Result<(), Error> { + match msg.2 { + Message::KeyVersionNegotiation(message) => + self.nodes[&msg.1].session.process_message(&msg.0, &message), + _ => panic!("unexpected"), + } + } + + pub fn run(&mut self) { + while let Some((from, to, message)) = self.take_message() { + self.process_message((from, to, message)).unwrap(); + } + } } #[test] @@ -739,6 +844,9 @@ mod tests { ml.session(0).initialize(ml.nodes.keys().cloned().collect()).unwrap(); // we can't be sure that node has given key version because previous ShareAdd session could fail assert!(ml.session(0).data.lock().state != SessionState::Finished); + + // check that upon completion, threshold is known + assert_eq!(ml.session(0).key_threshold(), Ok(1)); } #[test] @@ -757,4 +865,30 @@ mod tests { let computer = LargestSupportResultComputer; assert_eq!(computer.compute_result(Some(10), &Default::default(), &Default::default()), Some(Err(Error::ServerKeyIsNotFound))); } + + #[test] + fn fatal_error_is_not_broadcasted_if_started_without_origin() { + let mut ml = MessageLoop::empty(3); + ml.session(0).set_continue_action(ContinueAction::Decrypt(create_default_decryption_session(), None, false, false)); + ml.session(0).initialize(ml.nodes.keys().cloned().collect()).unwrap(); + ml.run(); + + assert!(ml.nodes.values().all(|n| n.session.is_finished() && + n.session.take_failed_continue_action().is_none())); + } + + #[test] + fn fatal_error_is_broadcasted_if_started_with_origin() { + let mut ml = MessageLoop::empty(3); + ml.session(0).set_continue_action(ContinueAction::Decrypt(create_default_decryption_session(), Some(1.into()), true, true)); + ml.session(0).initialize(ml.nodes.keys().cloned().collect()).unwrap(); + ml.run(); + + // on all nodes session is completed + assert!(ml.nodes.values().all(|n| n.session.is_finished())); + + // slave nodes have non-empty failed continue action + assert!(ml.nodes.values().skip(1).all(|n| n.session.take_failed_continue_action() + == Some(FailedContinueAction::Decrypt(Some(1.into()), public_to_address(&2.into()))))); + } } diff --git a/secret_store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs b/secret_store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs index 01cb03131..af612c3d9 100644 --- a/secret_store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs +++ b/secret_store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs @@ -249,10 +249,16 @@ impl SessionImpl { })?; consensus_session.initialize(self.core.all_nodes_set.clone())?; - debug_assert!(consensus_session.state() != ConsensusSessionState::ConsensusEstablished); + + let is_finished = consensus_session.state() == ConsensusSessionState::ConsensusEstablished; data.consensus_session = Some(consensus_session); data.new_nodes_set = Some(new_nodes_set); + // this is the case when all other nodes are isolated + if is_finished { + Self::complete_session(&self.core, &mut *data)?; + } + Ok(()) } @@ -483,6 +489,7 @@ impl SessionImpl { false => { let master_plan = ShareChangeSessionPlan { key_version: message.version.clone().into(), + version_holders: message.version_holders.iter().cloned().map(Into::into).collect(), consensus_group: message.consensus_group.iter().cloned().map(Into::into).collect(), new_nodes_map: message.new_nodes_map.iter().map(|(k, v)| (k.clone().into(), v.clone().map(Into::into))).collect(), }; @@ -496,22 +503,20 @@ impl SessionImpl { let master_node_id = message.master_node_id.clone().into(); if let Some(key_share) = self.core.key_storage.get(&key_id)? { let version = message.version.clone().into(); - if let Ok(key_version) = key_share.version(&version) { - let key_share_owners = key_version.id_numbers.keys().cloned().collect(); - let new_nodes_set = data.new_nodes_set.as_ref() - .expect("new_nodes_set is filled during consensus establishing; change sessions are running after this; qed"); - let local_plan = prepare_share_change_session_plan( - &self.core.all_nodes_set, - key_share.threshold, - &key_id, - version, - &master_node_id, - &key_share_owners, - new_nodes_set)?; + let key_share_owners = message.version_holders.iter().cloned().map(Into::into).collect(); + let new_nodes_set = data.new_nodes_set.as_ref() + .expect("new_nodes_set is filled during consensus establishing; change sessions are running after this; qed"); + let local_plan = prepare_share_change_session_plan( + &self.core.all_nodes_set, + key_share.threshold, + &key_id, + version, + &master_node_id, + &key_share_owners, + new_nodes_set)?; - if local_plan.new_nodes_map.keys().collect::>() != master_plan.new_nodes_map.keys().collect::>() { - return Err(Error::InvalidMessage); - } + if local_plan.new_nodes_map.keys().collect::>() != master_plan.new_nodes_map.keys().collect::>() { + return Err(Error::InvalidMessage); } } @@ -791,7 +796,9 @@ impl SessionImpl { // get selected version && old nodes set from key negotiation session let negotiation_session = data.negotiation_sessions.remove(&key_id) .expect("share change session is only initialized when negotiation is completed; qed"); - let (selected_version, selected_master) = negotiation_session.wait()?; + let (selected_version, selected_master) = negotiation_session + .wait()? + .expect("initialize_share_change_session is only called on share change master; negotiation session completes with some on master; qed"); let selected_version_holders = negotiation_session.version_holders(&selected_version)?; let selected_version_threshold = negotiation_session.key_threshold()?; @@ -818,6 +825,7 @@ impl SessionImpl { session_nonce: core.nonce, key_id: key_id.clone().into(), version: selected_version.into(), + version_holders: old_nodes_set.iter().cloned().map(Into::into).collect(), master_node_id: selected_master.clone().into(), consensus_group: session_plan.consensus_group.iter().cloned().map(Into::into).collect(), new_nodes_map: session_plan.new_nodes_map.iter() @@ -942,7 +950,8 @@ impl ClusterSession for SessionImpl { let mut data = self.data.lock(); - warn!("{}: servers set change session failed: {} on {}", self.core.meta.self_node_id, error, node); + warn!(target: "secretstore_net", "{}: servers set change session failed: {} on {}", + self.core.meta.self_node_id, error, node); data.state = SessionState::Finished; data.result = Some(Err(error)); @@ -1007,6 +1016,14 @@ impl JobTransport for UnknownSessionsJobTransport { } impl KeyVersionNegotiationTransport for ServersSetChangeKeyVersionNegotiationTransport { + fn broadcast(&self, message: KeyVersionNegotiationMessage) -> Result<(), Error> { + self.cluster.broadcast(Message::ServersSetChange(ServersSetChangeMessage::ShareChangeKeyVersionNegotiation(ShareChangeKeyVersionNegotiation { + session: self.id.clone().into(), + session_nonce: self.nonce, + message: message, + }))) + } + fn send(&self, node: &NodeId, message: KeyVersionNegotiationMessage) -> Result<(), Error> { self.cluster.send(node, Message::ServersSetChange(ServersSetChangeMessage::ShareChangeKeyVersionNegotiation(ShareChangeKeyVersionNegotiation { session: self.id.clone().into(), @@ -1343,4 +1360,48 @@ pub mod tests { // check that all sessions have finished assert!(ml.nodes.iter().filter(|&(k, _)| !nodes_to_remove.contains(k)).all(|(_, n)| n.session.is_finished())); } + + #[test] + fn removing_node_from_cluster_of_2_works() { + // initial 2-of-2 session + let gml = generate_key(1, generate_nodes_ids(2)); + let master_node_id = gml.nodes.keys().cloned().nth(0).unwrap(); + + // make 2nd node isolated so that key becomes irrecoverable (make sure the session is completed, even though key is irrecoverable) + let nodes_to_isolate: BTreeSet<_> = gml.nodes.keys().cloned().skip(1).take(1).collect(); + let new_nodes_set: BTreeSet<_> = gml.nodes.keys().cloned().filter(|n| !nodes_to_isolate.contains(&n)).collect(); + let mut ml = MessageLoop::new(&gml, master_node_id, None, BTreeSet::new(), BTreeSet::new(), nodes_to_isolate.clone()); + ml.nodes[&master_node_id].session.initialize(new_nodes_set, ml.all_set_signature.clone(), ml.new_set_signature.clone()).unwrap(); + ml.run(); + + // check that session on master node has completed (session on 2nd node is not even started in network mode) + assert!(ml.nodes.values().take(1).all(|n| n.session.is_finished())); + } + + #[test] + fn adding_node_that_has_lost_its_database_works() { + // initial 2-of-2 session + let gml = generate_key(1, generate_nodes_ids(2)); + let master_node_id = gml.nodes.keys().cloned().nth(0).unwrap(); + + // insert 1 node so that it becames 2-of-3 session + let nodes_to_add: BTreeSet<_> = (0..1).map(|_| Random.generate().unwrap().public().clone()).collect(); + let mut ml = MessageLoop::new(&gml, master_node_id, None, nodes_to_add.clone(), BTreeSet::new(), BTreeSet::new()); + ml.nodes[&master_node_id].session.initialize(ml.nodes.keys().cloned().collect(), ml.all_set_signature.clone(), ml.new_set_signature.clone()).unwrap(); + ml.run(); + + // now let's say new node has lost its db and we're trying to join it again + ml.nodes[nodes_to_add.iter().nth(0).unwrap()].key_storage.clear().unwrap(); + + // this time old nodes have version, where new node is mentioned, but it doesn't report it when negotiating + let mut ml = MessageLoop::new(&gml, master_node_id, None, nodes_to_add, BTreeSet::new(), BTreeSet::new()); + ml.nodes[&master_node_id].session.initialize(ml.nodes.keys().cloned().collect(), ml.all_set_signature.clone(), ml.new_set_signature.clone()).unwrap(); + ml.run(); + + // try to recover secret for every possible combination of nodes && check that secret is the same + check_secret_is_preserved(ml.original_key_pair.clone(), ml.nodes.iter().map(|(k, v)| (k.clone(), v.key_storage.clone())).collect()); + + // check that all sessions have finished + assert!(ml.nodes.values().all(|n| n.session.is_finished())); + } } diff --git a/secret_store/src/key_server_cluster/admin_sessions/share_add_session.rs b/secret_store/src/key_server_cluster/admin_sessions/share_add_session.rs index 4b79473b5..51a027dca 100644 --- a/secret_store/src/key_server_cluster/admin_sessions/share_add_session.rs +++ b/secret_store/src/key_server_cluster/admin_sessions/share_add_session.rs @@ -39,7 +39,7 @@ pub trait SessionTransport: Clone + JobTransport Result<(), Error>; /// Set data for master node (sent to slave nodes in consensus session initialization message). - fn set_master_data(&mut self, consensus_group: BTreeSet, id_numbers: BTreeMap>); + fn set_master_data(&mut self, consensus_group: BTreeSet, version_holders: BTreeSet, id_numbers: BTreeMap>); } /// Share addition session. @@ -86,6 +86,8 @@ struct SessionData { pub version: Option, /// Consensus session. pub consensus_session: Option>, + /// Holders of key version. + pub version_holders: Option>, /// NewKeyShare (for nodes being added). pub new_key_share: Option, /// Nodes id numbers. @@ -144,6 +146,8 @@ pub struct IsolatedSessionTransport { version: Option, /// Session-level nonce. nonce: u64, + /// Holders of key version. + version_holders: Option>, /// Consensus group. consensus_group: Option>, /// Id numbers of all new nodes. @@ -171,6 +175,7 @@ impl SessionImpl where T: SessionTransport { state: SessionState::ConsensusEstablishing, version: None, consensus_session: None, + version_holders: None, new_key_share: None, id_numbers: None, secret_subshares: None, @@ -180,7 +185,7 @@ impl SessionImpl where T: SessionTransport { } /// Set pre-established consensus data. - pub fn set_consensus_output(&self, version: &H256, consensus_group: BTreeSet, mut new_nodes_map: BTreeMap>) -> Result<(), Error> { + pub fn set_consensus_output(&self, version: &H256, consensus_group: BTreeSet, version_holders: BTreeSet, mut new_nodes_map: BTreeMap>) -> Result<(), Error> { let mut data = self.data.lock(); // check state @@ -191,18 +196,30 @@ impl SessionImpl where T: SessionTransport { // key share version is required on ShareAdd master node if let Some(key_share) = self.core.key_share.as_ref() { if let Ok(key_version) = key_share.version(version) { + let non_isolated_nodes = self.core.transport.nodes(); for (node, id_number) in &key_version.id_numbers { { let external_id_number = new_nodes_map.get(node); match external_id_number { Some(&Some(ref external_id_number)) => { + if !version_holders.contains(node) { + // possible when joining version holder, that has lost its database + // and haven't reported version ownership + continue; + } if external_id_number == id_number { continue; } + return Err(Error::ConsensusUnreachable); }, Some(&None) => (), - None => return Err(Error::ConsensusUnreachable), + None => { + if non_isolated_nodes.contains(node) { + return Err(Error::ConsensusUnreachable) + } + continue; + }, } } @@ -217,7 +234,7 @@ impl SessionImpl where T: SessionTransport { } // check passed consensus data - Self::check_nodes_map(&self.core, version, &consensus_group, &new_nodes_map)?; + Self::check_nodes_map(&self.core, version, &consensus_group, &version_holders, &new_nodes_map)?; // update data data.version = Some(version.clone()); @@ -225,6 +242,7 @@ impl SessionImpl where T: SessionTransport { data.secret_subshares = Some(consensus_group.into_iter() .map(|n| (n, None)) .collect()); + data.version_holders = Some(version_holders); Ok(()) } @@ -281,13 +299,14 @@ impl SessionImpl where T: SessionTransport { .take(key_share.threshold) .cloned()) .collect(); + let version_holders = &old_nodes_set; // now check nodes map - Self::check_nodes_map(&self.core, &version, &consensus_group, &new_nodes_map)?; + Self::check_nodes_map(&self.core, &version, &consensus_group, version_holders, &new_nodes_map)?; // prepare consensus session transport let mut consensus_transport = self.core.transport.clone(); - consensus_transport.set_master_data(consensus_group.clone(), new_nodes_map.clone()); + consensus_transport.set_master_data(consensus_group.clone(), version_holders.clone(), new_nodes_map.clone()); // create && initialize consensus session let mut consensus_session = ConsensusSession::new(ConsensusSessionParams { @@ -306,6 +325,7 @@ impl SessionImpl where T: SessionTransport { data.consensus_session = Some(consensus_session); data.id_numbers = Some(new_nodes_map); data.secret_subshares = Some(consensus_group.into_iter().map(|n| (n, None)).collect()); + data.version_holders = Some(version_holders.clone()); Ok(()) } @@ -351,16 +371,17 @@ impl SessionImpl where T: SessionTransport { }; // process consensus message - let (is_establishing_consensus, is_consensus_established, version, new_nodes_map, consensus_group) = { + let (is_establishing_consensus, is_consensus_established, version, new_nodes_map, consensus_group, version_holders) = { let consensus_session = data.consensus_session.as_mut().ok_or(Error::InvalidMessage)?; let is_establishing_consensus = consensus_session.state() == ConsensusSessionState::EstablishingConsensus; - let (version, new_nodes_map, consensus_group) = match &message.message { + let (version, new_nodes_map, consensus_group, version_holders) = match &message.message { &ConsensusMessageOfShareAdd::InitializeConsensusSession(ref message) => { consensus_session.on_consensus_partial_request(sender, ServersSetChangeAccessRequest::from(message))?; let version = message.version.clone().into(); let consensus_group = message.consensus_group.iter().cloned().map(Into::into).collect(); + let version_holders = message.version_holders.iter().cloned().map(Into::into).collect(); let new_nodes_map: BTreeMap<_, _> = message.new_nodes_map.iter() .map(|(n, nn)| (n.clone().into(), Some(nn.clone().into()))) .collect(); @@ -371,13 +392,13 @@ impl SessionImpl where T: SessionTransport { } // check old set of nodes - Self::check_nodes_map(&self.core, &version, &consensus_group, &new_nodes_map)?; + Self::check_nodes_map(&self.core, &version, &consensus_group, &version_holders, &new_nodes_map)?; - (Some(version), Some(new_nodes_map), Some(consensus_group)) + (Some(version), Some(new_nodes_map), Some(consensus_group), Some(version_holders)) }, &ConsensusMessageOfShareAdd::ConfirmConsensusInitialization(ref message) => { consensus_session.on_consensus_partial_response(sender, message.is_confirmed)?; - (None, None, None) + (None, None, None, None) }, }; @@ -387,6 +408,7 @@ impl SessionImpl where T: SessionTransport { version, new_nodes_map, consensus_group, + version_holders, ) }; @@ -400,6 +422,9 @@ impl SessionImpl where T: SessionTransport { if let Some(consensus_group) = consensus_group { data.secret_subshares = Some(consensus_group.into_iter().map(|n| (n, None)).collect()); } + if let Some(version_holders) = version_holders { + data.version_holders = Some(version_holders); + } // if consensus is stablished, proceed if !is_establishing_consensus || !is_consensus_established || self.core.meta.self_node_id != self.core.meta.master_node_id { @@ -451,7 +476,7 @@ impl SessionImpl where T: SessionTransport { }); let id_numbers = data.id_numbers.as_mut() - .expect("common key share data is expected after initialization; id_numers are filled during initialization; qed"); + .expect("common key share data is expected after initialization; id_numbers are filled during initialization; qed"); for (node, id_number) in &message.id_numbers { let id_number: Secret = id_number.clone().into(); { @@ -519,7 +544,7 @@ impl SessionImpl where T: SessionTransport { } /// Check nodes map. - fn check_nodes_map(core: &SessionCore, version: &H256, consensus_group: &BTreeSet, new_nodes_map: &BTreeMap>) -> Result<(), Error> { + fn check_nodes_map(core: &SessionCore, version: &H256, consensus_group: &BTreeSet, version_holders: &BTreeSet, new_nodes_map: &BTreeMap>) -> Result<(), Error> { // check if this node has given version let has_this_version = match core.key_share.as_ref() { Some(key_share) => key_share.version(version).is_ok(), @@ -546,7 +571,7 @@ impl SessionImpl where T: SessionTransport { } // there must be at least one new node in new_nodes_map - if key_version.id_numbers.len() >= new_nodes_map.len() { + if key_version.id_numbers.keys().filter(|n| non_isolated_nodes.contains(n) && version_holders.contains(n)).count() >= new_nodes_map.len() { return Err(Error::ConsensusUnreachable); } }, @@ -607,6 +632,8 @@ impl SessionImpl where T: SessionTransport { let explanation = "disseminate_common_share_data is only called on master node; master node has specified version of the key; qed"; let old_key_share = core.key_share.as_ref().expect(explanation); let old_key_version = old_key_share.version(data.version.as_ref().expect(explanation)).expect(explanation); + let version_holders = data.version_holders.as_ref() + .expect("disseminate_common_share_data is only called on master node; version holders is created during initialization on master node; qed"); let consensus_group = data.secret_subshares.as_ref() .expect("disseminate_common_share_data is only called on master node; consensus group is created during initialization on master node; qed"); let nodes = data.id_numbers.as_ref() @@ -622,7 +649,9 @@ impl SessionImpl where T: SessionTransport { joint_public: old_key_share.public.clone().into(), common_point: old_key_share.common_point.clone().map(Into::into), encrypted_point: old_key_share.encrypted_point.clone().map(Into::into), - id_numbers: old_key_version.id_numbers.iter().map(|(k, v)| (k.clone().into(), v.clone().into())).collect(), + id_numbers: old_key_version.id_numbers.iter() + .filter(|&(k, _)| version_holders.contains(k)) + .map(|(k, v)| (k.clone().into(), v.clone().into())).collect(), }))?; } @@ -765,7 +794,8 @@ impl ClusterSession for SessionImpl where T: SessionTransport { let mut data = self.data.lock(); - warn!("{}: share add session failed: {} on {}", self.core.meta.self_node_id, error, node); + warn!(target: "secretstore_net", "{}: share add session failed: {} on {}", + self.core.meta.self_node_id, error, node); data.state = SessionState::Finished; data.result = Some(Err(error)); @@ -788,6 +818,7 @@ impl IsolatedSessionTransport { nonce: nonce, cluster: cluster, id_numbers: None, + version_holders: None, consensus_group: None, } } @@ -806,6 +837,7 @@ impl JobTransport for IsolatedSessionTransport { session_nonce: self.nonce, message: ConsensusMessageOfShareAdd::InitializeConsensusSession(InitializeConsensusSessionOfShareAdd { version: self.version.clone().expect(explanation).into(), + version_holders: self.version_holders.as_ref().expect(explanation).iter().cloned().map(Into::into).collect(), consensus_group: self.consensus_group.as_ref().expect(explanation).iter().cloned().map(Into::into).collect(), old_nodes_set: request.old_servers_set.into_iter().map(Into::into).collect(), new_nodes_map: request.new_servers_set.into_iter() @@ -836,7 +868,8 @@ impl SessionTransport for IsolatedSessionTransport { self.cluster.nodes() } - fn set_master_data(&mut self, consensus_group: BTreeSet, id_numbers: BTreeMap>) { + fn set_master_data(&mut self, consensus_group: BTreeSet, version_holders: BTreeSet, id_numbers: BTreeMap>) { + self.version_holders = Some(version_holders); self.consensus_group = Some(consensus_group); self.id_numbers = Some(id_numbers); } diff --git a/secret_store/src/key_server_cluster/admin_sessions/share_change_session.rs b/secret_store/src/key_server_cluster/admin_sessions/share_change_session.rs index af16ef2f6..48cb81c13 100644 --- a/secret_store/src/key_server_cluster/admin_sessions/share_change_session.rs +++ b/secret_store/src/key_server_cluster/admin_sessions/share_change_session.rs @@ -48,6 +48,8 @@ pub struct ShareChangeSession { key_storage: Arc, /// Key version. key_version: H256, + /// Nodes that have reported version ownership. + version_holders: Option>, /// Consensus group to use in ShareAdd session. consensus_group: Option>, /// Nodes to add shares for. @@ -63,6 +65,8 @@ pub struct ShareChangeSession { pub struct ShareChangeSessionPlan { /// Key version that plan is valid for. pub key_version: H256, + /// Nodes that have reported version ownership. + pub version_holders: BTreeSet, /// Consensus group to use in ShareAdd session. pub consensus_group: BTreeSet, /// Nodes to add shares for. @@ -102,6 +106,7 @@ impl ShareChangeSession { // we can't create sessions right now, because key share is read when session is created, but it can change in previous session let key_version = params.plan.key_version; let consensus_group = if !params.plan.consensus_group.is_empty() { Some(params.plan.consensus_group) } else { None }; + let version_holders = if !params.plan.version_holders.is_empty() { Some(params.plan.version_holders) } else { None }; let new_nodes_map = if !params.plan.new_nodes_map.is_empty() { Some(params.plan.new_nodes_map) } else { None }; debug_assert!(new_nodes_map.is_some()); @@ -113,6 +118,7 @@ impl ShareChangeSession { cluster: params.cluster, key_storage: params.key_storage, key_version: key_version, + version_holders: version_holders, consensus_group: consensus_group, new_nodes_map: new_nodes_map, share_add_session: None, @@ -158,6 +164,7 @@ impl ShareChangeSession { /// Create new share add session. fn create_share_add_session(&mut self) -> Result<(), Error> { let consensus_group = self.consensus_group.take().ok_or(Error::InvalidStateForRequest)?; + let version_holders = self.version_holders.take().ok_or(Error::InvalidStateForRequest)?; let new_nodes_map = self.new_nodes_map.take().ok_or(Error::InvalidStateForRequest)?; let share_add_session = ShareAddSessionImpl::new(ShareAddSessionParams { meta: self.meta.clone(), @@ -166,7 +173,7 @@ impl ShareChangeSession { key_storage: self.key_storage.clone(), admin_public: None, })?; - share_add_session.set_consensus_output(&self.key_version, consensus_group, new_nodes_map)?; + share_add_session.set_consensus_output(&self.key_version, consensus_group, version_holders, new_nodes_map)?; self.share_add_session = Some(share_add_session); Ok(()) } @@ -221,7 +228,7 @@ impl ShareAddSessionTransport for ShareChangeTransport { self.cluster.nodes() } - fn set_master_data(&mut self, _consensus_group: BTreeSet, _id_numbers: BTreeMap>) { + fn set_master_data(&mut self, _consensus_group: BTreeSet, _version_holders: BTreeSet, _id_numbers: BTreeMap>) { unreachable!("only called when establishing consensus; this transport is never used for establishing consensus; qed") } @@ -242,6 +249,7 @@ pub fn prepare_share_change_session_plan(cluster_nodes: &BTreeSet, thres key_id, threshold, old_key_version_owners.len()); return Ok(ShareChangeSessionPlan { key_version: key_version, + version_holders: Default::default(), consensus_group: Default::default(), new_nodes_map: Default::default(), }); @@ -279,6 +287,7 @@ pub fn prepare_share_change_session_plan(cluster_nodes: &BTreeSet, thres Ok(ShareChangeSessionPlan { key_version: key_version, + version_holders: old_key_version_owners.clone(), consensus_group: consensus_group, new_nodes_map: new_nodes_map, }) diff --git a/secret_store/src/key_server_cluster/client_sessions/decryption_session.rs b/secret_store/src/key_server_cluster/client_sessions/decryption_session.rs index 724d2fe49..9172a03b1 100644 --- a/secret_store/src/key_server_cluster/client_sessions/decryption_session.rs +++ b/secret_store/src/key_server_cluster/client_sessions/decryption_session.rs @@ -812,6 +812,28 @@ impl JobTransport for DecryptionJobTransport { } } +#[cfg(test)] +pub fn create_default_decryption_session() -> Arc { + use acl_storage::DummyAclStorage; + use key_server_cluster::cluster::tests::DummyCluster; + + Arc::new(SessionImpl::new(SessionParams { + meta: SessionMeta { + id: Default::default(), + self_node_id: Default::default(), + master_node_id: Default::default(), + threshold: 0, + configured_nodes_count: 0, + connected_nodes_count: 0, + }, + access_key: Secret::zero(), + key_share: Default::default(), + acl_storage: Arc::new(DummyAclStorage::default()), + cluster: Arc::new(DummyCluster::new(Default::default())), + nonce: 0, + }, Some(Requester::Public(2.into()))).unwrap()) +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index 86de005b7..b48290a4f 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -84,6 +84,8 @@ pub trait ClusterClient: Send + Sync { fn add_generation_listener(&self, listener: Arc>); /// Listen for new decryption sessions. fn add_decryption_listener(&self, listener: Arc>); + /// Listen for new key version negotiation sessions. + fn add_key_version_negotiation_listener(&self, listener: Arc>>); /// Ask node to make 'faulty' generation sessions. #[cfg(test)] @@ -198,9 +200,10 @@ pub struct ClusterConnections { pub data: RwLock, } -#[derive(Default)] /// Cluster connections data. pub struct ClusterConnectionsData { + /// Is this node isolated from cluster? + pub is_isolated: bool, /// Active key servers set. pub nodes: BTreeMap, /// Active connections to key servers. @@ -384,6 +387,9 @@ impl ClusterCore { for connection in data.connections.active_connections() { let last_message_diff = Instant::now() - connection.last_message_time(); if last_message_diff > KEEP_ALIVE_DISCONNECT_INTERVAL { + warn!(target: "secretstore_net", "{}: keep alive timeout for node {}", + data.self_key_pair.public(), connection.node_id()); + data.connections.remove(data.clone(), connection.node_id(), connection.is_inbound()); data.sessions.on_connection_timeout(connection.node_id()); } @@ -484,7 +490,7 @@ impl ClusterCore { if is_master_node && session.is_finished() { data.sessions.negotiation_sessions.remove(&session.id()); match session.wait() { - Ok((version, master)) => match session.take_continue_action() { + Ok(Some((version, master))) => match session.take_continue_action() { Some(ContinueAction::Decrypt(session, origin, is_shadow_decryption, is_broadcast_decryption)) => { let initialization_error = if data.self_key_pair.public() == &master { session.initialize(origin, version, is_shadow_decryption, is_broadcast_decryption) @@ -523,18 +529,19 @@ impl ClusterCore { }, None => (), }, + Ok(None) => unreachable!("is_master_node; session is finished; negotiation version always finished with result on master; qed"), Err(error) => match session.take_continue_action() { Some(ContinueAction::Decrypt(session, _, _, _)) => { - data.sessions.decryption_sessions.remove(&session.id()); session.on_session_error(&meta.self_node_id, error); + data.sessions.decryption_sessions.remove(&session.id()); }, Some(ContinueAction::SchnorrSign(session, _)) => { - data.sessions.schnorr_signing_sessions.remove(&session.id()); session.on_session_error(&meta.self_node_id, error); + data.sessions.schnorr_signing_sessions.remove(&session.id()); }, Some(ContinueAction::EcdsaSign(session, _)) => { - data.sessions.ecdsa_signing_sessions.remove(&session.id()); session.on_session_error(&meta.self_node_id, error); + data.sessions.ecdsa_signing_sessions.remove(&session.id()); }, None => (), }, @@ -653,7 +660,7 @@ impl ClusterCore { impl ClusterConnections { pub fn new(config: &ClusterConfiguration) -> Result { let mut nodes = config.key_server_set.snapshot().current_set; - nodes.remove(config.self_key_pair.public()); + let is_isolated = nodes.remove(config.self_key_pair.public()).is_none(); let trigger: Box = match config.auto_migrate_enabled { false => Box::new(SimpleConnectionTrigger::new(config.key_server_set.clone(), config.self_key_pair.clone(), config.admin_public.clone())), @@ -668,6 +675,7 @@ impl ClusterConnections { trigger: Mutex::new(trigger), connector: connector, data: RwLock::new(ClusterConnectionsData { + is_isolated: is_isolated, nodes: nodes, connections: BTreeMap::new(), }), @@ -734,8 +742,13 @@ impl ClusterConnections { self.maintain_connection_trigger(maintain_action, data); } - pub fn connected_nodes(&self) -> BTreeSet { - self.data.read().connections.keys().cloned().collect() + pub fn connected_nodes(&self) -> Result, Error> { + let data = self.data.read(); + if data.is_isolated { + return Err(Error::NodeDisconnected); + } + + Ok(data.connections.keys().cloned().collect()) } pub fn active_connections(&self)-> Vec> { @@ -897,7 +910,7 @@ impl ClusterClientImpl { } fn create_key_version_negotiation_session(&self, session_id: SessionId) -> Result>, Error> { - let mut connected_nodes = self.data.connections.connected_nodes(); + let mut connected_nodes = self.data.connections.connected_nodes()?; connected_nodes.insert(self.data.self_key_pair.public().clone()); let access_key = Random.generate()?.secret().clone(); @@ -934,7 +947,7 @@ impl ClusterClient for ClusterClientImpl { } fn new_generation_session(&self, session_id: SessionId, origin: Option
, author: Address, threshold: usize) -> Result, Error> { - let mut connected_nodes = self.data.connections.connected_nodes(); + let mut connected_nodes = self.data.connections.connected_nodes()?; connected_nodes.insert(self.data.self_key_pair.public().clone()); let cluster = create_cluster_view(&self.data, true)?; @@ -945,7 +958,7 @@ impl ClusterClient for ClusterClientImpl { } fn new_encryption_session(&self, session_id: SessionId, requester: Requester, common_point: Public, encrypted_point: Public) -> Result, Error> { - let mut connected_nodes = self.data.connections.connected_nodes(); + let mut connected_nodes = self.data.connections.connected_nodes()?; connected_nodes.insert(self.data.self_key_pair.public().clone()); let cluster = create_cluster_view(&self.data, true)?; @@ -956,7 +969,7 @@ impl ClusterClient for ClusterClientImpl { } fn new_decryption_session(&self, session_id: SessionId, origin: Option
, requester: Requester, version: Option, is_shadow_decryption: bool, is_broadcast_decryption: bool) -> Result, Error> { - let mut connected_nodes = self.data.connections.connected_nodes(); + let mut connected_nodes = self.data.connections.connected_nodes()?; connected_nodes.insert(self.data.self_key_pair.public().clone()); let access_key = Random.generate()?.secret().clone(); @@ -982,7 +995,7 @@ impl ClusterClient for ClusterClientImpl { } fn new_schnorr_signing_session(&self, session_id: SessionId, requester: Requester, version: Option, message_hash: H256) -> Result, Error> { - let mut connected_nodes = self.data.connections.connected_nodes(); + let mut connected_nodes = self.data.connections.connected_nodes()?; connected_nodes.insert(self.data.self_key_pair.public().clone()); let access_key = Random.generate()?.secret().clone(); @@ -1007,7 +1020,7 @@ impl ClusterClient for ClusterClientImpl { } fn new_ecdsa_signing_session(&self, session_id: SessionId, requester: Requester, version: Option, message_hash: H256) -> Result, Error> { - let mut connected_nodes = self.data.connections.connected_nodes(); + let mut connected_nodes = self.data.connections.connected_nodes()?; connected_nodes.insert(self.data.self_key_pair.public().clone()); let access_key = Random.generate()?.secret().clone(); @@ -1037,7 +1050,7 @@ impl ClusterClient for ClusterClientImpl { } fn new_servers_set_change_session(&self, session_id: Option, migration_id: Option, new_nodes_set: BTreeSet, old_set_signature: Signature, new_set_signature: Signature) -> Result, Error> { - let mut connected_nodes = self.data.connections.connected_nodes(); + let mut connected_nodes = self.data.connections.connected_nodes()?; connected_nodes.insert(self.data.self_key_pair.public().clone()); let session_id = match session_id { @@ -1069,6 +1082,10 @@ impl ClusterClient for ClusterClientImpl { self.data.sessions.decryption_sessions.add_listener(listener); } + fn add_key_version_negotiation_listener(&self, listener: Arc>>) { + self.data.sessions.negotiation_sessions.add_listener(listener); + } + #[cfg(test)] fn connect(&self) { ClusterCore::connect_disconnected_nodes(self.data.clone()); @@ -1153,6 +1170,7 @@ pub mod tests { fn add_generation_listener(&self, _listener: Arc>) {} fn add_decryption_listener(&self, _listener: Arc>) {} + fn add_key_version_negotiation_listener(&self, _listener: Arc>>) {} fn make_faulty_generation_sessions(&self) { unimplemented!("test-only") } fn generation_session(&self, _session_id: &SessionId) -> Option> { unimplemented!("test-only") } @@ -1249,7 +1267,7 @@ pub mod tests { threads: 1, self_key_pair: Arc::new(PlainNodeKeyPair::new(key_pairs[i].clone())), listen_address: ("127.0.0.1".to_owned(), ports_begin + i as u16), - key_server_set: Arc::new(MapKeyServerSet::new(key_pairs.iter().enumerate() + key_server_set: Arc::new(MapKeyServerSet::new(false, key_pairs.iter().enumerate() .map(|(j, kp)| (kp.public().clone(), format!("127.0.0.1:{}", ports_begin + j as u16).parse().unwrap())) .collect())), allow_connecting_to_higher_nodes: false, diff --git a/secret_store/src/key_server_cluster/cluster_sessions.rs b/secret_store/src/key_server_cluster/cluster_sessions.rs index 4dcfd3f88..b34485638 100644 --- a/secret_store/src/key_server_cluster/cluster_sessions.rs +++ b/secret_store/src/key_server_cluster/cluster_sessions.rs @@ -330,10 +330,7 @@ impl ClusterSessionsContainer where S: ClusterSession, SC: C } pub fn remove(&self, session_id: &S::Id) { - if let Some(session) = self.sessions.write().remove(session_id) { - self.container_state.lock().on_session_completed(); - self.notify_listeners(|l| l.on_session_removed(session.session.clone())); - } + self.do_remove(session_id, &mut *self.sessions.write()); } pub fn enqueue_message(&self, session_id: &S::Id, sender: NodeId, message: Message, is_queued_message: bool) { @@ -361,7 +358,7 @@ impl ClusterSessionsContainer where S: ClusterSession, SC: C }; if remove_session { - sessions.remove(&sid); + self.do_remove(&sid, &mut *sessions); } } } @@ -374,12 +371,20 @@ impl ClusterSessionsContainer where S: ClusterSession, SC: C session.session.on_node_timeout(node_id); session.session.is_finished() }; + if remove_session { - sessions.remove(&sid); + self.do_remove(&sid, &mut *sessions); } } } + fn do_remove(&self, session_id: &S::Id, sessions: &mut BTreeMap>) { + if let Some(session) = sessions.remove(session_id) { + self.container_state.lock().on_session_completed(); + self.notify_listeners(|l| l.on_session_removed(session.session.clone())); + } + } + fn notify_listeners) -> ()>(&self, callback: F) { let mut listeners = self.listeners.lock(); let mut listener_index = 0; @@ -554,7 +559,7 @@ pub fn create_cluster_view(data: &Arc, requires_all_connections: bo } } - let mut connected_nodes = data.connections.connected_nodes(); + let mut connected_nodes = data.connections.connected_nodes()?; connected_nodes.insert(data.self_key_pair.public().clone()); let connected_nodes_count = connected_nodes.len(); @@ -571,7 +576,8 @@ mod tests { use key_server_cluster::connection_trigger::SimpleServersSetChangeSessionCreatorConnector; use key_server_cluster::cluster::tests::DummyCluster; use key_server_cluster::generation_session::{SessionImpl as GenerationSession}; - use super::{ClusterSessions, AdminSessionCreationData, ClusterSessionsListener}; + use super::{ClusterSessions, AdminSessionCreationData, ClusterSessionsListener, + ClusterSessionsContainerState, SESSION_TIMEOUT_INTERVAL}; pub fn make_cluster_sessions() -> ClusterSessions { let key_pair = Random.generate().unwrap(); @@ -579,7 +585,7 @@ mod tests { threads: 1, self_key_pair: Arc::new(PlainNodeKeyPair::new(key_pair.clone())), listen_address: ("127.0.0.1".to_owned(), 100_u16), - key_server_set: Arc::new(MapKeyServerSet::new(vec![(key_pair.public().clone(), format!("127.0.0.1:{}", 100).parse().unwrap())].into_iter().collect())), + key_server_set: Arc::new(MapKeyServerSet::new(false, vec![(key_pair.public().clone(), format!("127.0.0.1:{}", 100).parse().unwrap())].into_iter().collect())), allow_connecting_to_higher_nodes: false, key_storage: Arc::new(DummyKeyStorage::default()), acl_storage: Arc::new(DummyAclStorage::default()), @@ -644,4 +650,41 @@ mod tests { assert_eq!(listener.inserted.load(Ordering::Relaxed), 1); assert_eq!(listener.removed.load(Ordering::Relaxed), 1); } + + #[test] + fn last_session_removal_sets_container_state_to_idle() { + let sessions = make_cluster_sessions(); + + sessions.generation_sessions.insert(Arc::new(DummyCluster::new(Default::default())), Default::default(), Default::default(), None, false, None).unwrap(); + assert_eq!(*sessions.generation_sessions.container_state.lock(), ClusterSessionsContainerState::Active(1)); + + sessions.generation_sessions.remove(&Default::default()); + assert_eq!(*sessions.generation_sessions.container_state.lock(), ClusterSessionsContainerState::Idle); + } + + #[test] + fn last_session_removal_by_timeout_sets_container_state_to_idle() { + let sessions = make_cluster_sessions(); + + sessions.generation_sessions.insert(Arc::new(DummyCluster::new(Default::default())), Default::default(), Default::default(), None, false, None).unwrap(); + assert_eq!(*sessions.generation_sessions.container_state.lock(), ClusterSessionsContainerState::Active(1)); + + sessions.generation_sessions.sessions.write().get_mut(&Default::default()).unwrap().last_message_time -= SESSION_TIMEOUT_INTERVAL * 2; + + sessions.generation_sessions.stop_stalled_sessions(); + assert_eq!(sessions.generation_sessions.sessions.read().len(), 0); + assert_eq!(*sessions.generation_sessions.container_state.lock(), ClusterSessionsContainerState::Idle); + } + + #[test] + fn last_session_removal_by_node_timeout_sets_container_state_to_idle() { + let sessions = make_cluster_sessions(); + + sessions.generation_sessions.insert(Arc::new(DummyCluster::new(Default::default())), Default::default(), Default::default(), None, false, None).unwrap(); + assert_eq!(*sessions.generation_sessions.container_state.lock(), ClusterSessionsContainerState::Active(1)); + + sessions.generation_sessions.on_connection_timeout(&Default::default()); + assert_eq!(sessions.generation_sessions.sessions.read().len(), 0); + assert_eq!(*sessions.generation_sessions.container_state.lock(), ClusterSessionsContainerState::Idle); + } } diff --git a/secret_store/src/key_server_cluster/cluster_sessions_creator.rs b/secret_store/src/key_server_cluster/cluster_sessions_creator.rs index e1b5125ac..15192542b 100644 --- a/secret_store/src/key_server_cluster/cluster_sessions_creator.rs +++ b/secret_store/src/key_server_cluster/cluster_sessions_creator.rs @@ -353,6 +353,9 @@ impl ClusterSessionCreator) { if !required_set.contains_key(self_node_id) { - trace!(target: "secretstore_net", "{}: isolated from cluser", self_node_id); + if !data.is_isolated { + trace!(target: "secretstore_net", "{}: isolated from cluser", self_node_id); + } + + data.is_isolated = true; data.connections.clear(); data.nodes.clear(); return; } + data.is_isolated = false; for node_to_disconnect in select_nodes_to_disconnect(&data.nodes, required_set) { if let Entry::Occupied(entry) = data.connections.entry(node_to_disconnect.clone()) { - trace!(target: "secretstore_net", "{}: removing connection to {} at {}", + trace!(target: "secretstore_net", "{}: adjusting connections - removing connection to {} at {}", self_node_id, entry.get().node_id(), entry.get().node_address()); entry.remove(); } @@ -204,6 +209,14 @@ mod tests { use super::{Maintain, TriggerConnections, ConnectionsAction, ConnectionTrigger, SimpleConnectionTrigger, select_nodes_to_disconnect, adjust_connections}; + fn default_connection_data() -> ClusterConnectionsData { + ClusterConnectionsData { + is_isolated: false, + nodes: Default::default(), + connections: Default::default(), + } + } + fn create_connections() -> TriggerConnections { TriggerConnections { self_key_pair: Arc::new(PlainNodeKeyPair::new(Random.generate().unwrap())), @@ -252,59 +265,64 @@ mod tests { fn adjust_connections_disconnects_from_all_nodes_if_not_a_part_of_key_server() { let self_node_id = Random.generate().unwrap().public().clone(); let other_node_id = Random.generate().unwrap().public().clone(); - let mut connection_data: ClusterConnectionsData = Default::default(); + let mut connection_data = default_connection_data(); connection_data.nodes.insert(other_node_id.clone(), "127.0.0.1:8081".parse().unwrap()); let required_set = connection_data.nodes.clone(); adjust_connections(&self_node_id, &mut connection_data, &required_set); assert!(connection_data.nodes.is_empty()); + assert!(connection_data.is_isolated); } #[test] fn adjust_connections_connects_to_new_nodes() { let self_node_id = Random.generate().unwrap().public().clone(); let other_node_id = Random.generate().unwrap().public().clone(); - let mut connection_data: ClusterConnectionsData = Default::default(); + let mut connection_data = default_connection_data(); let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap()), (other_node_id.clone(), "127.0.0.1:8082".parse().unwrap())].into_iter().collect(); adjust_connections(&self_node_id, &mut connection_data, &required_set); assert!(connection_data.nodes.contains_key(&other_node_id)); + assert!(!connection_data.is_isolated); } #[test] fn adjust_connections_reconnects_from_changed_nodes() { let self_node_id = Random.generate().unwrap().public().clone(); let other_node_id = Random.generate().unwrap().public().clone(); - let mut connection_data: ClusterConnectionsData = Default::default(); + let mut connection_data = default_connection_data(); connection_data.nodes.insert(other_node_id.clone(), "127.0.0.1:8082".parse().unwrap()); let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap()), (other_node_id.clone(), "127.0.0.1:8083".parse().unwrap())].into_iter().collect(); adjust_connections(&self_node_id, &mut connection_data, &required_set); assert_eq!(connection_data.nodes.get(&other_node_id), Some(&"127.0.0.1:8083".parse().unwrap())); + assert!(!connection_data.is_isolated); } #[test] fn adjust_connections_disconnects_from_removed_nodes() { let self_node_id = Random.generate().unwrap().public().clone(); let other_node_id = Random.generate().unwrap().public().clone(); - let mut connection_data: ClusterConnectionsData = Default::default(); + let mut connection_data = default_connection_data(); connection_data.nodes.insert(other_node_id.clone(), "127.0.0.1:8082".parse().unwrap()); let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(); adjust_connections(&self_node_id, &mut connection_data, &required_set); assert!(connection_data.nodes.is_empty()); + assert!(!connection_data.is_isolated); } #[test] fn adjust_connections_does_not_connects_to_self() { let self_node_id = Random.generate().unwrap().public().clone(); - let mut connection_data: ClusterConnectionsData = Default::default(); + let mut connection_data = default_connection_data(); let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(); adjust_connections(&self_node_id, &mut connection_data, &required_set); assert!(connection_data.nodes.is_empty()); + assert!(!connection_data.is_isolated); } #[test] @@ -315,7 +333,7 @@ mod tests { let migration_node_id = Random.generate().unwrap().public().clone(); let new_node_id = Random.generate().unwrap().public().clone(); - let mut connections_data: ClusterConnectionsData = Default::default(); + let mut connections_data = default_connection_data(); connections.maintain(ConnectionsAction::ConnectToCurrentSet, &mut connections_data, &KeyServerSetSnapshot { current_set: vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap()), (current_node_id.clone(), "127.0.0.1:8082".parse().unwrap())].into_iter().collect(), @@ -337,7 +355,7 @@ mod tests { let migration_node_id = Random.generate().unwrap().public().clone(); let new_node_id = Random.generate().unwrap().public().clone(); - let mut connections_data: ClusterConnectionsData = Default::default(); + let mut connections_data = default_connection_data(); connections.maintain(ConnectionsAction::ConnectToMigrationSet, &mut connections_data, &KeyServerSetSnapshot { current_set: vec![(current_node_id.clone(), "127.0.0.1:8082".parse().unwrap())].into_iter().collect(), new_set: vec![(new_node_id.clone(), "127.0.0.1:8083".parse().unwrap())].into_iter().collect(), @@ -354,7 +372,7 @@ mod tests { #[test] fn simple_connections_trigger_only_maintains_connections() { - let key_server_set = Arc::new(MapKeyServerSet::new(Default::default())); + let key_server_set = Arc::new(MapKeyServerSet::new(false, Default::default())); let self_key_pair = Arc::new(PlainNodeKeyPair::new(Random.generate().unwrap())); let mut trigger = SimpleConnectionTrigger::new(key_server_set, self_key_pair, None); assert_eq!(trigger.on_maintain(), Some(Maintain::Connections)); diff --git a/secret_store/src/key_server_cluster/jobs/key_access_job.rs b/secret_store/src/key_server_cluster/jobs/key_access_job.rs index 6a0577f02..7ded90afa 100644 --- a/secret_store/src/key_server_cluster/jobs/key_access_job.rs +++ b/secret_store/src/key_server_cluster/jobs/key_access_job.rs @@ -79,7 +79,6 @@ impl JobExecutor for KeyAccessJob { self.requester = Some(partial_request.clone()); self.acl_storage.check(partial_request.address(&self.id).map_err(Error::InsufficientRequesterData)?, &self.id) - .map_err(|_| Error::AccessDenied) .map(|is_confirmed| if is_confirmed { JobPartialRequestAction::Respond(true) } else { JobPartialRequestAction::Reject(false) }) } diff --git a/secret_store/src/key_server_cluster/message.rs b/secret_store/src/key_server_cluster/message.rs index 8aecdc9dd..1bc487eb7 100644 --- a/secret_store/src/key_server_cluster/message.rs +++ b/secret_store/src/key_server_cluster/message.rs @@ -429,6 +429,8 @@ pub struct InitializeConsensusSessionWithServersSet { pub struct InitializeConsensusSessionOfShareAdd { /// Key version. pub version: SerializableH256, + /// Nodes that have reported version ownership. + pub version_holders: BTreeSet, /// threshold+1 nodes from old_nodes_set selected for shares redistribution. pub consensus_group: BTreeSet, /// Old nodes set: all non-isolated owners of selected key share version. @@ -876,6 +878,8 @@ pub struct InitializeShareChangeSession { pub key_id: MessageSessionId, /// Key vesion to use in ShareAdd session. pub version: SerializableH256, + /// Nodes that have confirmed version ownership. + pub version_holders: BTreeSet, /// Master node. pub master_node_id: MessageNodeId, /// Consensus group to use in ShareAdd session. @@ -1039,6 +1043,16 @@ pub struct KeyVersionsError { pub session_nonce: u64, /// Error message. pub error: Error, + /// Continue action from failed node (if any). This field is oly filled + /// when error has occured when trying to compute result on master node. + pub continue_with: Option, +} + +/// Key version continue action from failed node. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum FailedKeyVersionContinueAction { + /// Decryption session: origin + requester. + Decrypt(Option, SerializableAddress), } impl Message { @@ -1059,6 +1073,7 @@ impl Message { _ => false }, Message::KeyVersionNegotiation(KeyVersionNegotiationMessage::RequestKeyVersions(_)) => true, + Message::KeyVersionNegotiation(KeyVersionNegotiationMessage::KeyVersionsError(ref msg)) if msg.continue_with.is_some() => true, Message::ShareAdd(ShareAddMessage::ShareAddConsensusMessage(ref msg)) => match msg.message { ConsensusMessageOfShareAdd::InitializeConsensusSession(_) => true, _ => false diff --git a/secret_store/src/key_server_set.rs b/secret_store/src/key_server_set.rs index cf95e917a..7bae27017 100644 --- a/secret_store/src/key_server_set.rs +++ b/secret_store/src/key_server_set.rs @@ -19,16 +19,13 @@ use std::net::SocketAddr; use std::collections::{BTreeMap, HashSet}; use std::time::Duration; use parking_lot::Mutex; -use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify, ChainRoute, CallContract, RegistryInfo}; -use ethcore::filter::Filter; +use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify, ChainRoute, CallContract}; use ethkey::public_to_address; -use hash::keccak; use ethereum_types::{H256, Address}; use bytes::Bytes; use types::{Error, Public, NodeAddress, NodeId}; use trusted_client::TrustedClient; -use helpers::{get_confirmed_block_hash, REQUEST_CONFIRMATIONS_REQUIRED}; -use {NodeKeyPair}; +use {NodeKeyPair, ContractAddress}; use_contract!(key_server, "KeyServerSet", "res/key_server_set.json"); @@ -39,22 +36,6 @@ const MIGRATION_CONFIRMATIONS_REQUIRED: u64 = 5; /// Number of blocks before the same-migration transaction (be it start or confirmation) will be retried. const TRANSACTION_RETRY_INTERVAL_BLOCKS: u64 = 30; -/// Key server has been added to the set. -const ADDED_EVENT_NAME: &'static [u8] = &*b"KeyServerAdded(address)"; -/// Key server has been removed from the set. -const REMOVED_EVENT_NAME: &'static [u8] = &*b"KeyServerRemoved(address)"; -/// Migration has started. -const MIGRATION_STARTED_EVENT_NAME: &'static [u8] = &*b"MigrationStarted()"; -/// Migration has completed. -const MIGRATION_COMPLETED_EVENT_NAME: &'static [u8] = &*b"MigrationCompleted()"; - -lazy_static! { - static ref ADDED_EVENT_NAME_HASH: H256 = keccak(ADDED_EVENT_NAME); - static ref REMOVED_EVENT_NAME_HASH: H256 = keccak(REMOVED_EVENT_NAME); - static ref MIGRATION_STARTED_EVENT_NAME_HASH: H256 = keccak(MIGRATION_STARTED_EVENT_NAME); - static ref MIGRATION_COMPLETED_EVENT_NAME_HASH: H256 = keccak(MIGRATION_COMPLETED_EVENT_NAME); -} - #[derive(Default, Debug, Clone, PartialEq)] /// Key Server Set state. pub struct KeyServerSetSnapshot { @@ -81,6 +62,8 @@ pub struct KeyServerSetMigration { /// Key Server Set pub trait KeyServerSet: Send + Sync { + /// Is this node currently isolated from the set? + fn is_isolated(&self) -> bool; /// Get server set state. fn snapshot(&self) -> KeyServerSetSnapshot; /// Start migration. @@ -117,7 +100,9 @@ struct PreviousMigrationTransaction { struct CachedContract { /// Blockchain client. client: TrustedClient, - /// Contract address. + /// Contract address source. + contract_address_source: Option, + /// Current contract address. contract_address: Option
, /// Contract interface. contract: key_server::KeyServerSet, @@ -136,10 +121,10 @@ struct CachedContract { } impl OnChainKeyServerSet { - pub fn new(trusted_client: TrustedClient, self_key_pair: Arc, auto_migrate_enabled: bool, key_servers: BTreeMap) -> Result, Error> { + pub fn new(trusted_client: TrustedClient, contract_address_source: Option, self_key_pair: Arc, auto_migrate_enabled: bool, key_servers: BTreeMap) -> Result, Error> { let client = trusted_client.get_untrusted(); let key_server_set = Arc::new(OnChainKeyServerSet { - contract: Mutex::new(CachedContract::new(trusted_client, self_key_pair, auto_migrate_enabled, key_servers)?), + contract: Mutex::new(CachedContract::new(trusted_client, contract_address_source, self_key_pair, auto_migrate_enabled, key_servers)?), }); client .ok_or_else(|| Error::Internal("Constructing OnChainKeyServerSet without active Client".into()))? @@ -149,6 +134,10 @@ impl OnChainKeyServerSet { } impl KeyServerSet for OnChainKeyServerSet { + fn is_isolated(&self) -> bool { + self.contract.lock().is_isolated() + } + fn snapshot(&self) -> KeyServerSetSnapshot { self.contract.lock().snapshot() } @@ -244,16 +233,21 @@ impl ) -> Result, String>> KeyServerSubset for NewKeySe } impl CachedContract { - pub fn new(client: TrustedClient, self_key_pair: Arc, auto_migrate_enabled: bool, key_servers: BTreeMap) -> Result { - let server_set = key_servers.into_iter() - .map(|(p, addr)| { - let addr = format!("{}:{}", addr.address, addr.port).parse() - .map_err(|err| Error::Internal(format!("error parsing node address: {}", err)))?; - Ok((p, addr)) - }) - .collect::, Error>>()?; - Ok(CachedContract { + pub fn new(client: TrustedClient, contract_address_source: Option, self_key_pair: Arc, auto_migrate_enabled: bool, key_servers: BTreeMap) -> Result { + let server_set = match contract_address_source.is_none() { + true => key_servers.into_iter() + .map(|(p, addr)| { + let addr = format!("{}:{}", addr.address, addr.port).parse() + .map_err(|err| Error::Internal(format!("error parsing node address: {}", err)))?; + Ok((p, addr)) + }) + .collect::, Error>>()?, + false => Default::default(), + }; + + let mut contract = CachedContract { client: client, + contract_address_source: contract_address_source, contract_address: None, contract: key_server::KeyServerSet::default(), auto_migrate_enabled: auto_migrate_enabled, @@ -266,19 +260,46 @@ impl CachedContract { ..Default::default() }, self_key_pair: self_key_pair, - }) + }; + contract.update_contract_address(); + + Ok(contract) + } + + pub fn update_contract_address(&mut self) { + if let Some(ref contract_address_source) = self.contract_address_source { + let contract_address = self.client.read_contract_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.into(), contract_address_source); + if contract_address != self.contract_address { + trace!(target: "secretstore", "{}: Configuring for key server set contract from address {:?}", + self.self_key_pair.public(), contract_address); + + self.contract_address = contract_address; + } + } } pub fn update(&mut self, enacted: Vec, retracted: Vec) { + // no need to update when servers set is hardcoded + if self.contract_address_source.is_none() { + return; + } + if let Some(client) = self.client.get() { - // read new snapshot from registry (if something has changed) - self.read_from_registry_if_required(&*client, enacted, retracted); + // read new snapshot from reqistry (if something has chnaged) + if !enacted.is_empty() || !retracted.is_empty() { + self.update_contract_address(); + self.read_from_registry(&*client); + } // update number of confirmations (if there's future new set) self.update_number_of_confirmations_if_required(&*client); } } + fn is_isolated(&self) -> bool { + !self.snapshot.current_set.contains_key(self.self_key_pair.public()) + } + fn snapshot(&self) -> KeyServerSetSnapshot { self.snapshot.clone() } @@ -295,12 +316,11 @@ impl CachedContract { let transaction_data = self.contract.functions().start_migration().input(migration_id); // send transaction - if let Err(error) = self.client.transact_contract(*contract_address, transaction_data) { - warn!(target: "secretstore_net", "{}: failed to submit auto-migration start transaction: {}", - self.self_key_pair.public(), error); - } else { - trace!(target: "secretstore_net", "{}: sent auto-migration start transaction", - self.self_key_pair.public()); + match self.client.transact_contract(*contract_address, transaction_data) { + Ok(_) => trace!(target: "secretstore_net", "{}: sent auto-migration start transaction", + self.self_key_pair.public()), + Err(error) => warn!(target: "secretstore_net", "{}: failed to submit auto-migration start transaction: {}", + self.self_key_pair.public(), error), } } } @@ -317,55 +337,16 @@ impl CachedContract { let transaction_data = self.contract.functions().confirm_migration().input(migration_id); // send transaction - if let Err(error) = self.client.transact_contract(contract_address, transaction_data) { - warn!(target: "secretstore_net", "{}: failed to submit auto-migration confirmation transaction: {}", - self.self_key_pair.public(), error); - } else { - trace!(target: "secretstore_net", "{}: sent auto-migration confirm transaction", - self.self_key_pair.public()); + match self.client.transact_contract(contract_address, transaction_data) { + Ok(_) => trace!(target: "secretstore_net", "{}: sent auto-migration confirm transaction", + self.self_key_pair.public()), + Err(error) => warn!(target: "secretstore_net", "{}: failed to submit auto-migration confirmation transaction: {}", + self.self_key_pair.public(), error), } } } - fn read_from_registry_if_required(&mut self, client: &Client, enacted: Vec, retracted: Vec) { - // read new contract from registry - let new_contract_addr = get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED).and_then(|block_hash| client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned(), BlockId::Hash(block_hash))); - - // new contract installed => read nodes set from the contract - if self.contract_address.as_ref() != new_contract_addr.as_ref() { - self.read_from_registry(&*client, new_contract_addr); - return; - } - - // check for contract events - let is_set_changed = self.contract_address.is_some() && enacted.iter() - .chain(retracted.iter()) - .any(|block_hash| !client.logs(Filter { - from_block: BlockId::Hash(block_hash.clone()), - to_block: BlockId::Hash(block_hash.clone()), - address: self.contract_address.map(|address| vec![address]), - topics: vec![ - Some(vec![*ADDED_EVENT_NAME_HASH, *REMOVED_EVENT_NAME_HASH, - *MIGRATION_STARTED_EVENT_NAME_HASH, *MIGRATION_COMPLETED_EVENT_NAME_HASH]), - None, - None, - None, - ], - limit: Some(1), - }).is_empty()); - - // to simplify processing - just re-read the whole nodes set from the contract - if is_set_changed { - self.read_from_registry(&*client, new_contract_addr); - } - } - - fn read_from_registry(&mut self, client: &Client, new_contract_address: Option
) { - if let Some(ref contract_addr) = new_contract_address { - trace!(target: "secretstore", "Configuring for key server set contract from {}", contract_addr); - } - self.contract_address = new_contract_address; - + fn read_from_registry(&mut self, client: &Client) { let contract_address = match self.contract_address { Some(contract_address) => contract_address, None => { @@ -505,7 +486,7 @@ fn update_future_set(future_new_set: &mut Option, new_snapshot: &m return; } - // new no migration is required => no need to delay visibility + // no migration is required => no need to delay visibility if !is_migration_required(&new_snapshot.current_set, &new_snapshot.new_set) { *future_new_set = None; return; @@ -602,18 +583,24 @@ pub mod tests { #[derive(Default)] pub struct MapKeyServerSet { + is_isolated: bool, nodes: BTreeMap, } impl MapKeyServerSet { - pub fn new(nodes: BTreeMap) -> Self { + pub fn new(is_isolated: bool, nodes: BTreeMap) -> Self { MapKeyServerSet { + is_isolated: is_isolated, nodes: nodes, } } } impl KeyServerSet for MapKeyServerSet { + fn is_isolated(&self) -> bool { + self.is_isolated + } + fn snapshot(&self) -> KeyServerSetSnapshot { KeyServerSetSnapshot { current_set: self.nodes.clone(), diff --git a/secret_store/src/key_storage.rs b/secret_store/src/key_storage.rs index f5d6df801..f19630c5c 100644 --- a/secret_store/src/key_storage.rs +++ b/secret_store/src/key_storage.rs @@ -466,7 +466,6 @@ pub mod tests { #[test] fn persistent_key_storage() { let tempdir = TempDir::new("").unwrap(); - let key1 = ServerKeyId::from(1); let value1 = DocumentKeyShare { author: Default::default(), diff --git a/secret_store/src/lib.rs b/secret_store/src/lib.rs index 404c278d5..74cde2c5a 100644 --- a/secret_store/src/lib.rs +++ b/secret_store/src/lib.rs @@ -82,16 +82,15 @@ pub use traits::{NodeKeyPair, KeyServer}; pub use self::node_key_pair::{PlainNodeKeyPair, KeyStoreNodeKeyPair}; /// Start new key server instance -pub fn start(client: Arc, sync: Arc, miner: Arc, self_key_pair: Arc, config: ServiceConfiguration, db: Arc) -> Result, Error> { +pub fn start(client: Arc, sync: Arc, miner: Arc, self_key_pair: Arc, mut config: ServiceConfiguration, db: Arc) -> Result, Error> { let trusted_client = trusted_client::TrustedClient::new(self_key_pair.clone(), client.clone(), sync, miner); - let acl_storage: Arc = if config.acl_check_enabled { - acl_storage::OnChainAclStorage::new(trusted_client.clone())? - } else { - Arc::new(acl_storage::DummyAclStorage::default()) - }; + let acl_storage: Arc = match config.acl_check_contract_address.take() { + Some(acl_check_contract_address) => acl_storage::OnChainAclStorage::new(trusted_client.clone(), acl_check_contract_address)?, + None => Arc::new(acl_storage::DummyAclStorage::default()), + }; - let key_server_set = key_server_set::OnChainKeyServerSet::new(trusted_client.clone(), self_key_pair.clone(), - config.cluster_config.auto_migrate_enabled, config.cluster_config.nodes.clone())?; + let key_server_set = key_server_set::OnChainKeyServerSet::new(trusted_client.clone(), config.cluster_config.key_server_set_contract_address.take(), + self_key_pair.clone(), config.cluster_config.auto_migrate_enabled, config.cluster_config.nodes.clone())?; let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(db)?); let key_server = Arc::new(key_server::KeyServerImpl::new(&config.cluster_config, key_server_set.clone(), self_key_pair.clone(), acl_storage.clone(), key_storage.clone())?); let cluster = key_server.cluster(); diff --git a/secret_store/src/listener/service_contract.rs b/secret_store/src/listener/service_contract.rs index 72c23b86b..daf70cd64 100644 --- a/secret_store/src/listener/service_contract.rs +++ b/secret_store/src/listener/service_contract.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use parking_lot::RwLock; use ethabi::RawLog; use ethcore::filter::Filter; -use ethcore::client::{Client, BlockChainClient, BlockId, RegistryInfo, CallContract}; +use ethcore::client::{Client, BlockChainClient, BlockId, CallContract}; use ethkey::{Public, public_to_address}; use hash::keccak; use bytes::Bytes; @@ -99,8 +99,8 @@ pub struct OnChainServiceContract { self_key_pair: Arc, /// Contract registry name (if any). name: String, - /// Contract address. - address: ContractAddress, + /// Contract address source. + address_source: ContractAddress, /// Contract. contract: service::Service, /// Contract. @@ -109,8 +109,8 @@ pub struct OnChainServiceContract { /// On-chain service contract data. struct ServiceData { - /// Actual contract address. - pub contract_address: Address, + /// Current contract address. + pub contract_address: Option
, /// Last block we have read logs from. pub last_log_block: Option, } @@ -136,38 +136,26 @@ struct DocumentKeyShadowRetrievalService; impl OnChainServiceContract { /// Create new on-chain service contract. - pub fn new(mask: ApiMask, client: TrustedClient, name: String, address: ContractAddress, self_key_pair: Arc) -> Self { - let contract_addr = match address { - ContractAddress::Registry => client.get().and_then(|c| c.registry_address(name.clone(), BlockId::Latest) - .map(|address| { - trace!(target: "secretstore", "{}: installing {} service contract from address {}", - self_key_pair.public(), name, address); - address - })) - .unwrap_or_default(), - ContractAddress::Address(ref address) => { - trace!(target: "secretstore", "{}: installing service contract from address {}", - self_key_pair.public(), address); - address.clone() - }, - }; - - OnChainServiceContract { + pub fn new(mask: ApiMask, client: TrustedClient, name: String, address_source: ContractAddress, self_key_pair: Arc) -> Self { + let contract = OnChainServiceContract { mask: mask, client: client, self_key_pair: self_key_pair, name: name, - address: address, + address_source: address_source, contract: service::Service::default(), data: RwLock::new(ServiceData { - contract_address: contract_addr, + contract_address: None, last_log_block: None, }), - } + }; + + contract.update_contract_address(); + contract } /// Send transaction to the service contract. - fn send_contract_transaction(&self, origin: &Address, server_key_id: &ServerKeyId, is_response_required: C, prepare_tx: P) -> Result<(), String> + fn send_contract_transaction(&self, tx_name: &str, origin: &Address, server_key_id: &ServerKeyId, is_response_required: C, prepare_tx: P) -> Result<(), String> where C: FnOnce(&Client, &Address, &service::Service, &ServerKeyId, &Address) -> bool, P: FnOnce(&Client, &Address, &service::Service) -> Result { // only publish if contract address is set && client is online @@ -193,6 +181,9 @@ impl OnChainServiceContract { transaction_data ).map_err(|e| format!("{}", e))?; + trace!(target: "secretstore", "{}: transaction {} sent to service contract", + self.self_key_pair.public(), tx_name); + Ok(()) } @@ -228,26 +219,25 @@ impl OnChainServiceContract { .ok() .unwrap_or_else(|| Box::new(::std::iter::empty())) } + + /// Update service contract address. + fn update_contract_address(&self) -> bool { + let contract_address = self.client.read_contract_address(self.name.clone(), &self.address_source); + let mut data = self.data.write(); + if contract_address != data.contract_address { + trace!(target: "secretstore", "{}: installing {} service contract from address {:?}", + self.self_key_pair.public(), self.name, contract_address); + + data.contract_address = contract_address; + } + + data.contract_address.is_some() + } } impl ServiceContract for OnChainServiceContract { fn update(&self) -> bool { - if let &ContractAddress::Registry = &self.address { - if let Some(client) = self.client.get() { - if let Some(block_hash) = get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED) { - // update contract address from registry - let service_contract_addr = client.registry_address(self.name.clone(), BlockId::Hash(block_hash)).unwrap_or_default(); - if self.data.read().contract_address != service_contract_addr { - trace!(target: "secretstore", "{}: installing {} service contract from address {}", - self.self_key_pair.public(), self.name, service_contract_addr); - self.data.write().contract_address = service_contract_addr; - } - } - } - } - - self.data.read().contract_address != Default::default() - && self.client.get().is_some() + self.update_contract_address() && self.client.get().is_some() } fn read_logs(&self) -> Box> { @@ -263,7 +253,10 @@ impl ServiceContract for OnChainServiceContract { // prepare range of blocks to read logs from let (address, first_block, last_block) = { let mut data = self.data.write(); - let address = data.contract_address; + let address = match data.contract_address { + Some(address) => address, + None => return Box::new(::std::iter::empty()), // no contract installed + }; let confirmed_block = match get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED) { Some(confirmed_block) => confirmed_block, None => return Box::new(::std::iter::empty()), // no block with enough confirmations @@ -326,31 +319,31 @@ impl ServiceContract for OnChainServiceContract { // we only need requests that are here for more than REQUEST_CONFIRMATIONS_REQUIRED blocks // => we're reading from Latest - (REQUEST_CONFIRMATIONS_REQUIRED + 1) block let data = self.data.read(); - match data.contract_address == Default::default() { - true => Box::new(::std::iter::empty()), - false => get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED + 1) + match data.contract_address { + None => Box::new(::std::iter::empty()), + Some(contract_address) => get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED + 1) .map(|b| { let block = BlockId::Hash(b); let iter = match self.mask.server_key_generation_requests { - true => Box::new(self.create_pending_requests_iterator(client.clone(), &data.contract_address, &block, + true => Box::new(self.create_pending_requests_iterator(client.clone(), &contract_address, &block, &ServerKeyGenerationService::read_pending_requests_count, &ServerKeyGenerationService::read_pending_request)) as Box>, false => Box::new(::std::iter::empty()), }; let iter = match self.mask.server_key_retrieval_requests { - true => Box::new(iter.chain(self.create_pending_requests_iterator(client.clone(), &data.contract_address, &block, + true => Box::new(iter.chain(self.create_pending_requests_iterator(client.clone(), &contract_address, &block, &ServerKeyRetrievalService::read_pending_requests_count, &ServerKeyRetrievalService::read_pending_request))), false => iter, }; let iter = match self.mask.document_key_store_requests { - true => Box::new(iter.chain(self.create_pending_requests_iterator(client.clone(), &data.contract_address, &block, + true => Box::new(iter.chain(self.create_pending_requests_iterator(client.clone(), &contract_address, &block, &DocumentKeyStoreService::read_pending_requests_count, &DocumentKeyStoreService::read_pending_request))), false => iter, }; let iter = match self.mask.document_key_shadow_retrieval_requests { - true => Box::new(iter.chain(self.create_pending_requests_iterator(client, &data.contract_address, &block, + true => Box::new(iter.chain(self.create_pending_requests_iterator(client, &contract_address, &block, &DocumentKeyShadowRetrievalService::read_pending_requests_count, &DocumentKeyShadowRetrievalService::read_pending_request))), false => iter @@ -363,63 +356,59 @@ impl ServiceContract for OnChainServiceContract { } fn publish_generated_server_key(&self, origin: &Address, server_key_id: &ServerKeyId, server_key: Public) -> Result<(), String> { - self.send_contract_transaction(origin, server_key_id, ServerKeyGenerationService::is_response_required, |_, _, service| - Ok(ServerKeyGenerationService::prepare_pubish_tx_data(service, server_key_id, &server_key)) - ) + self.send_contract_transaction("publish_generated_server_key", origin, server_key_id, ServerKeyGenerationService::is_response_required, + |_, _, service| Ok(ServerKeyGenerationService::prepare_pubish_tx_data(service, server_key_id, &server_key))) } fn publish_server_key_generation_error(&self, origin: &Address, server_key_id: &ServerKeyId) -> Result<(), String> { - self.send_contract_transaction(origin, server_key_id, ServerKeyGenerationService::is_response_required, |_, _, service| - Ok(ServerKeyGenerationService::prepare_error_tx_data(service, server_key_id)) - ) + self.send_contract_transaction("publish_server_key_generation_error", origin, server_key_id, ServerKeyGenerationService::is_response_required, + |_, _, service| Ok(ServerKeyGenerationService::prepare_error_tx_data(service, server_key_id))) } fn publish_retrieved_server_key(&self, origin: &Address, server_key_id: &ServerKeyId, server_key: Public, threshold: usize) -> Result<(), String> { let threshold = serialize_threshold(threshold)?; - self.send_contract_transaction(origin, server_key_id, ServerKeyRetrievalService::is_response_required, |_, _, service| - Ok(ServerKeyRetrievalService::prepare_pubish_tx_data(service, server_key_id, server_key, threshold)) - ) + self.send_contract_transaction("publish_retrieved_server_key", origin, server_key_id, ServerKeyRetrievalService::is_response_required, + |_, _, service| Ok(ServerKeyRetrievalService::prepare_pubish_tx_data(service, server_key_id, server_key, threshold))) } fn publish_server_key_retrieval_error(&self, origin: &Address, server_key_id: &ServerKeyId) -> Result<(), String> { - self.send_contract_transaction(origin, server_key_id, ServerKeyRetrievalService::is_response_required, |_, _, service| - Ok(ServerKeyRetrievalService::prepare_error_tx_data(service, server_key_id)) - ) + self.send_contract_transaction("publish_server_key_retrieval_error", origin, server_key_id, ServerKeyRetrievalService::is_response_required, + |_, _, service| Ok(ServerKeyRetrievalService::prepare_error_tx_data(service, server_key_id))) } fn publish_stored_document_key(&self, origin: &Address, server_key_id: &ServerKeyId) -> Result<(), String> { - self.send_contract_transaction(origin, server_key_id, DocumentKeyStoreService::is_response_required, |_, _, service| - Ok(DocumentKeyStoreService::prepare_pubish_tx_data(service, server_key_id)) - ) + self.send_contract_transaction("publish_stored_document_key", origin, server_key_id, DocumentKeyStoreService::is_response_required, + |_, _, service| Ok(DocumentKeyStoreService::prepare_pubish_tx_data(service, server_key_id))) } fn publish_document_key_store_error(&self, origin: &Address, server_key_id: &ServerKeyId) -> Result<(), String> { - self.send_contract_transaction(origin, server_key_id, DocumentKeyStoreService::is_response_required, |_, _, service| - Ok(DocumentKeyStoreService::prepare_error_tx_data(service, server_key_id)) - ) + self.send_contract_transaction("publish_document_key_store_error", origin, server_key_id, DocumentKeyStoreService::is_response_required, + |_, _, service| Ok(DocumentKeyStoreService::prepare_error_tx_data(service, server_key_id))) } fn publish_retrieved_document_key_common(&self, origin: &Address, server_key_id: &ServerKeyId, requester: &Address, common_point: Public, threshold: usize) -> Result<(), String> { let threshold = serialize_threshold(threshold)?; - self.send_contract_transaction(origin, server_key_id, |client, contract_address, contract, server_key_id, key_server| - DocumentKeyShadowRetrievalService::is_response_required(client, contract_address, contract, server_key_id, requester, key_server), - |_, _, service| - Ok(DocumentKeyShadowRetrievalService::prepare_pubish_common_tx_data(service, server_key_id, requester, common_point, threshold)) + self.send_contract_transaction("publish_retrieved_document_key_common", origin, server_key_id, + |client, contract_address, contract, server_key_id, key_server| + DocumentKeyShadowRetrievalService::is_response_required(client, contract_address, contract, server_key_id, requester, key_server), + |_, _, service| + Ok(DocumentKeyShadowRetrievalService::prepare_pubish_common_tx_data(service, server_key_id, requester, common_point, threshold)) ) } fn publish_retrieved_document_key_personal(&self, origin: &Address, server_key_id: &ServerKeyId, requester: &Address, participants: &[Address], decrypted_secret: Public, shadow: Bytes) -> Result<(), String> { - self.send_contract_transaction(origin, server_key_id, |_, _, _, _, _| true, + self.send_contract_transaction("publish_retrieved_document_key_personal", origin, server_key_id, |_, _, _, _, _| true, move |client, address, service| DocumentKeyShadowRetrievalService::prepare_pubish_personal_tx_data(client, address, service, server_key_id, requester, participants, decrypted_secret, shadow) ) } fn publish_document_key_retrieval_error(&self, origin: &Address, server_key_id: &ServerKeyId, requester: &Address) -> Result<(), String> { - self.send_contract_transaction(origin, server_key_id, |client, contract_address, contract, server_key_id, key_server| - DocumentKeyShadowRetrievalService::is_response_required(client, contract_address, contract, server_key_id, requester, key_server), - |_, _, service| - Ok(DocumentKeyShadowRetrievalService::prepare_error_tx_data(service, server_key_id, requester)) + self.send_contract_transaction("publish_document_key_retrieval_error", origin, server_key_id, + |client, contract_address, contract, server_key_id, key_server| + DocumentKeyShadowRetrievalService::is_response_required(client, contract_address, contract, server_key_id, requester, key_server), + |_, _, service| + Ok(DocumentKeyShadowRetrievalService::prepare_error_tx_data(service, server_key_id, requester)) ) } } @@ -742,16 +731,16 @@ impl DocumentKeyShadowRetrievalService { .map(|not_confirmed| ( not_confirmed, match is_common_retrieval_completed { - true => ServiceTask::RetrieveShadowDocumentKeyCommon( + true => ServiceTask::RetrieveShadowDocumentKeyPersonal( + contract_address.clone(), + server_key_id, + requester, + ), + false => ServiceTask::RetrieveShadowDocumentKeyCommon( contract_address.clone(), server_key_id, public_to_address(&requester), ), - false => ServiceTask::RetrieveShadowDocumentKeyPersonal( - contract_address.clone(), - server_key_id, - requester, - ) }, )) .map_err(|error| format!("{}", error)) diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs index 724c902d1..8dc549a72 100644 --- a/secret_store/src/listener/service_contract_listener.rs +++ b/secret_store/src/listener/service_contract_listener.rs @@ -25,11 +25,13 @@ use ethkey::{Public, public_to_address}; use bytes::Bytes; use ethereum_types::{H256, U256, Address}; use key_server_set::KeyServerSet; -use key_server_cluster::{ClusterClient, ClusterSessionsListener, ClusterSession}; +use key_server_cluster::{NodeId, ClusterClient, ClusterSessionsListener, ClusterSession}; use key_server_cluster::math; use key_server_cluster::generation_session::SessionImpl as GenerationSession; use key_server_cluster::encryption_session::{check_encrypted_data, update_encrypted_data}; use key_server_cluster::decryption_session::SessionImpl as DecryptionSession; +use key_server_cluster::key_version_negotiation_session::{SessionImpl as KeyVersionNegotiationSession, + IsolatedSessionTransport as KeyVersionNegotiationTransport, FailedContinueAction}; use key_storage::KeyStorage; use acl_storage::AclStorage; use listener::service_contract::ServiceContract; @@ -138,7 +140,6 @@ impl ServiceContractListener { key_server_set: params.key_server_set, key_storage: params.key_storage, }); - data.tasks_queue.push(ServiceTask::Retry); // we are not starting thread when in test mode let service_handle = if cfg!(test) { @@ -154,11 +155,17 @@ impl ServiceContractListener { }); contract.data.cluster.add_generation_listener(contract.clone()); contract.data.cluster.add_decryption_listener(contract.clone()); + contract.data.cluster.add_key_version_negotiation_listener(contract.clone()); Ok(contract) } /// Process incoming events of service contract. fn process_service_contract_events(&self) { + // shortcut: do not process events if we're isolated from the cluster + if self.data.key_server_set.is_isolated() { + return; + } + self.data.tasks_queue.push_many(self.data.contract.read_logs() .filter_map(|task| Self::filter_task(&self.data, task))); } @@ -168,7 +175,7 @@ impl ServiceContractListener { match task { // when this node should be master of this server key generation session ServiceTask::GenerateServerKey(origin, server_key_id, author, threshold) if is_processed_by_this_key_server( - &*data.key_server_set, &*data.self_key_pair, &server_key_id) => + &*data.key_server_set, data.self_key_pair.public(), &server_key_id) => Some(ServiceTask::GenerateServerKey(origin, server_key_id, author, threshold)), // when server key is not yet generated and generation must be initiated by other node ServiceTask::GenerateServerKey(_, _, _, _) => None, @@ -187,7 +194,7 @@ impl ServiceContractListener { // when this node should be master of this document key decryption session ServiceTask::RetrieveShadowDocumentKeyPersonal(origin, server_key_id, requester) if is_processed_by_this_key_server( - &*data.key_server_set, &*data.self_key_pair, &server_key_id) => + &*data.key_server_set, data.self_key_pair.public(), &server_key_id) => Some(ServiceTask::RetrieveShadowDocumentKeyPersonal(origin, server_key_id, requester)), // when server key is not yet generated and generation must be initiated by other node ServiceTask::RetrieveShadowDocumentKeyPersonal(_, _, _) => None, @@ -211,7 +218,7 @@ impl ServiceContractListener { }; } - trace!(target: "secretstore_net", "{}: ServiceContractListener thread stopped", data.self_key_pair.public()); + trace!(target: "secretstore", "{}: ServiceContractListener thread stopped", data.self_key_pair.public()); } /// Process single service task. @@ -430,7 +437,7 @@ impl Drop for ServiceContractListener { impl ChainNotify for ServiceContractListener { fn new_blocks(&self, _imported: Vec, _invalid: Vec, route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { let enacted_len = route.enacted().len(); - if enacted_len == 0 { + if enacted_len == 0 && route.retracted().is_empty() { return; } @@ -443,8 +450,11 @@ impl ChainNotify for ServiceContractListener { // schedule retry if received enough blocks since last retry // it maybe inaccurate when switching syncing/synced states, but that's ok if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTERVAL_BLOCKS { - self.data.tasks_queue.push(ServiceTask::Retry); - self.data.last_retry.store(0, Ordering::Relaxed); + // shortcut: do not retry if we're isolated from the cluster + if !self.data.key_server_set.is_isolated() { + self.data.tasks_queue.push(ServiceTask::Retry); + self.data.last_retry.store(0, Ordering::Relaxed); + } } } } @@ -491,6 +501,35 @@ impl ClusterSessionsListener for ServiceContractListener { } } +impl ClusterSessionsListener> for ServiceContractListener { + fn on_session_removed(&self, session: Arc>) { + // by this time sesion must already be completed - either successfully, or not + assert!(session.is_finished()); + + // we're interested in: + // 1) sessions failed with fatal error + // 2) with decryption continue action + let error = match session.wait() { + Err(ref error) if !error.is_non_fatal() => error.clone(), + _ => return, + }; + + let (origin, requester) = match session.take_failed_continue_action() { + Some(FailedContinueAction::Decrypt(Some(origin), requester)) => (origin, requester), + _ => return, + }; + + // check if master node is responsible for processing key requests + let meta = session.meta(); + if !is_processed_by_this_key_server(&*self.data.key_server_set, &meta.master_node_id, &meta.id) { + return; + } + + // ignore result as we're already processing an error + let _ = Self::process_document_key_retrieval_result(&self.data, origin, &meta.id, &requester, Err(error)); + } +} + impl ::std::fmt::Display for ServiceTask { fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { match *self { @@ -520,8 +559,8 @@ fn log_service_task_result(task: &ServiceTask, self_id: &Public, result: Result< result } -/// Returns true when session, related to `server_key_id` must be started on this KeyServer. -fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool { +/// Returns true when session, related to `server_key_id` must be started on `node`. +fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, node: &NodeId, server_key_id: &H256) -> bool { let servers = key_server_set.snapshot().current_set; let total_servers_count = servers.len(); match total_servers_count { @@ -530,7 +569,7 @@ fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, self_key_pair: _ => (), } - let this_server_index = match servers.keys().enumerate().find(|&(_, s)| s == self_key_pair.public()) { + let this_server_index = match servers.keys().enumerate().find(|&(_, s)| s == node) { Some((index, _)) => index, None => return false, }; @@ -554,8 +593,9 @@ mod tests { use acl_storage::{AclStorage, DummyAclStorage}; use key_storage::{KeyStorage, DocumentKeyShare}; use key_storage::tests::DummyKeyStorage; + use key_server_set::KeyServerSet; use key_server_set::tests::MapKeyServerSet; - use {PlainNodeKeyPair, ServerKeyId}; + use {NodeKeyPair, PlainNodeKeyPair, ServerKeyId}; use super::{ServiceTask, ServiceContractListener, ServiceContractListenerParams, is_processed_by_this_key_server}; fn create_non_empty_key_storage(has_doc_key: bool) -> Arc { @@ -571,19 +611,23 @@ mod tests { key_storage } - fn make_service_contract_listener(contract: Option>, cluster: Option>, key_storage: Option>, acl_storage: Option>) -> Arc { - let contract = contract.unwrap_or_else(|| Arc::new(DummyServiceContract::default())); - let cluster = cluster.unwrap_or_else(|| Arc::new(DummyClusterClient::default())); - let key_storage = key_storage.unwrap_or_else(|| Arc::new(DummyKeyStorage::default())); - let acl_storage = acl_storage.unwrap_or_else(|| Arc::new(DummyAclStorage::default())); - let servers_set = Arc::new(MapKeyServerSet::new(vec![ + fn make_servers_set(is_isolated: bool) -> Arc { + Arc::new(MapKeyServerSet::new(is_isolated, vec![ ("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".parse().unwrap(), "127.0.0.1:8080".parse().unwrap()), ("c6047f9441ed7d6d3045406e95c07cd85c778e4b8cef3ca7abac09b95c709ee51ae168fea63dc339a3c58419466ceaeef7f632653266d0e1236431a950cfe52a".parse().unwrap(), "127.0.0.1:8080".parse().unwrap()), ("f9308a019258c31049344f85f89d5229b531c845836f99b08601f113bce036f9388f7b0f632de8140fe337e62a37f3566500a99934c2231b6cb9fd7584b8e672".parse().unwrap(), "127.0.0.1:8080".parse().unwrap()), - ].into_iter().collect())); + ].into_iter().collect())) + } + + fn make_service_contract_listener(contract: Option>, cluster: Option>, key_storage: Option>, acl_storage: Option>, servers_set: Option>) -> Arc { + let contract = contract.unwrap_or_else(|| Arc::new(DummyServiceContract::default())); + let cluster = cluster.unwrap_or_else(|| Arc::new(DummyClusterClient::default())); + let key_storage = key_storage.unwrap_or_else(|| Arc::new(DummyKeyStorage::default())); + let acl_storage = acl_storage.unwrap_or_else(|| Arc::new(DummyAclStorage::default())); + let servers_set = servers_set.unwrap_or_else(|| make_servers_set(false)); let self_key_pair = Arc::new(PlainNodeKeyPair::new(KeyPair::from_secret("0000000000000000000000000000000000000000000000000000000000000001".parse().unwrap()).unwrap())); ServiceContractListener::new(ServiceContractListenerParams { contract: contract, @@ -599,7 +643,7 @@ mod tests { fn is_not_processed_by_this_key_server_with_zero_servers() { assert_eq!(is_processed_by_this_key_server( &MapKeyServerSet::default(), - &PlainNodeKeyPair::new(Random.generate().unwrap()), + Random.generate().unwrap().public(), &Default::default()), false); } @@ -607,27 +651,27 @@ mod tests { fn is_processed_by_this_key_server_with_single_server() { let self_key_pair = Random.generate().unwrap(); assert_eq!(is_processed_by_this_key_server( - &MapKeyServerSet::new(vec![ + &MapKeyServerSet::new(false, vec![ (self_key_pair.public().clone(), "127.0.0.1:8080".parse().unwrap()) ].into_iter().collect()), - &PlainNodeKeyPair::new(self_key_pair), + self_key_pair.public(), &Default::default()), true); } #[test] fn is_not_processed_by_this_key_server_when_not_a_part_of_servers_set() { assert!(is_processed_by_this_key_server( - &MapKeyServerSet::new(vec![ + &MapKeyServerSet::new(false, vec![ (Random.generate().unwrap().public().clone(), "127.0.0.1:8080".parse().unwrap()) ].into_iter().collect()), - &PlainNodeKeyPair::new(Random.generate().unwrap()), + Random.generate().unwrap().public(), &Default::default())); } #[test] fn is_processed_by_this_key_server_in_set_of_3() { // servers set is ordered && server range depends on index of this server - let servers_set = MapKeyServerSet::new(vec![ + let servers_set = MapKeyServerSet::new(false, vec![ // secret: 0000000000000000000000000000000000000000000000000000000000000001 ("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".parse().unwrap(), "127.0.0.1:8080".parse().unwrap()), @@ -642,46 +686,46 @@ mod tests { // 1st server: process hashes [0x0; 0x555...555] let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret( "0000000000000000000000000000000000000000000000000000000000000001".parse().unwrap()).unwrap()); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"0000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"3000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"5555555555555555555555555555555555555555555555555555555555555555".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"5555555555555555555555555555555555555555555555555555555555555556".parse().unwrap()), false); // 2nd server: process hashes from 0x555...556 to 0xaaa...aab let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret( "0000000000000000000000000000000000000000000000000000000000000002".parse().unwrap()).unwrap()); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"5555555555555555555555555555555555555555555555555555555555555555".parse().unwrap()), false); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"5555555555555555555555555555555555555555555555555555555555555556".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"7555555555555555555555555555555555555555555555555555555555555555".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaac".parse().unwrap()), false); // 3rd server: process hashes from 0x800...000 to 0xbff...ff let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret( "0000000000000000000000000000000000000000000000000000000000000003".parse().unwrap()).unwrap()); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab".parse().unwrap()), false); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaac".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"daaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaac".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true); } #[test] fn is_processed_by_this_key_server_in_set_of_4() { // servers set is ordered && server range depends on index of this server - let servers_set = MapKeyServerSet::new(vec![ + let servers_set = MapKeyServerSet::new(false, vec![ // secret: 0000000000000000000000000000000000000000000000000000000000000001 ("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".parse().unwrap(), "127.0.0.1:8080".parse().unwrap()), @@ -699,62 +743,72 @@ mod tests { // 1st server: process hashes [0x0; 0x3ff...ff] let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret( "0000000000000000000000000000000000000000000000000000000000000001".parse().unwrap()).unwrap()); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"0000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"2000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"4000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), false); // 2nd server: process hashes from 0x400...000 to 0x7ff...ff let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret( "0000000000000000000000000000000000000000000000000000000000000002".parse().unwrap()).unwrap()); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), false); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"4000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"6000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"8000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), false); // 3rd server: process hashes from 0x800...000 to 0xbff...ff let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret( "0000000000000000000000000000000000000000000000000000000000000004".parse().unwrap()).unwrap()); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), false); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"8000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"a000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"bfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"c000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), false); // 4th server: process hashes from 0xc00...000 to 0xfff...ff let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret( "0000000000000000000000000000000000000000000000000000000000000003".parse().unwrap()).unwrap()); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"bfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), false); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"c000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"e000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true); - assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(), &"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true); } #[test] fn no_tasks_scheduled_when_no_contract_events() { - let listener = make_service_contract_listener(None, None, None, None); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + let listener = make_service_contract_listener(None, None, None, None, None); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 0); listener.process_service_contract_events(); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 0); + } + + #[test] + fn tasks_are_not_scheduled_on_isolated_node() { + let mut contract = DummyServiceContract::default(); + contract.logs.push(ServiceTask::GenerateServerKey(Default::default(), Default::default(), Default::default(), 0)); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, Some(make_servers_set(true))); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 0); + listener.process_service_contract_events(); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 0); } // server key generation tests @@ -763,10 +817,10 @@ mod tests { fn server_key_generation_is_scheduled_when_requested() { let mut contract = DummyServiceContract::default(); contract.logs.push(ServiceTask::GenerateServerKey(Default::default(), Default::default(), Default::default(), 0)); - let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 0); listener.process_service_contract_events(); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 2); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::GenerateServerKey( Default::default(), Default::default(), Default::default(), 0))); } @@ -776,16 +830,16 @@ mod tests { let server_key_id = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap(); let mut contract = DummyServiceContract::default(); contract.logs.push(ServiceTask::GenerateServerKey(Default::default(), server_key_id, Default::default(), 0)); - let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 0); listener.process_service_contract_events(); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 0); } #[test] fn generation_session_is_created_when_processing_generate_server_key_task() { let cluster = Arc::new(DummyClusterClient::default()); - let listener = make_service_contract_listener(None, Some(cluster.clone()), None, None); + let listener = make_service_contract_listener(None, Some(cluster.clone()), None, None, None); ServiceContractListener::process_service_task(&listener.data, ServiceTask::GenerateServerKey( Default::default(), Default::default(), Default::default(), Default::default())).unwrap_err(); assert_eq!(cluster.generation_requests_count.load(Ordering::Relaxed), 1); @@ -797,7 +851,7 @@ mod tests { contract.pending_requests.push((false, ServiceTask::GenerateServerKey(Default::default(), Default::default(), Default::default(), Default::default()))); let cluster = Arc::new(DummyClusterClient::default()); - let listener = make_service_contract_listener(Some(Arc::new(contract)), Some(cluster.clone()), None, None); + let listener = make_service_contract_listener(Some(Arc::new(contract)), Some(cluster.clone()), None, None, None); listener.data.retry_data.lock().affected_server_keys.insert(Default::default()); ServiceContractListener::retry_pending_requests(&listener.data).unwrap(); assert_eq!(cluster.generation_requests_count.load(Ordering::Relaxed), 0); @@ -809,10 +863,10 @@ mod tests { fn server_key_retrieval_is_scheduled_when_requested() { let mut contract = DummyServiceContract::default(); contract.logs.push(ServiceTask::RetrieveServerKey(Default::default(), Default::default())); - let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 0); listener.process_service_contract_events(); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 2); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::RetrieveServerKey( Default::default(), Default::default()))); } @@ -822,10 +876,10 @@ mod tests { let server_key_id: ServerKeyId = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap(); let mut contract = DummyServiceContract::default(); contract.logs.push(ServiceTask::RetrieveServerKey(Default::default(), server_key_id.clone())); - let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 0); listener.process_service_contract_events(); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 2); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::RetrieveServerKey( Default::default(), server_key_id))); } @@ -834,7 +888,7 @@ mod tests { fn server_key_is_retrieved_when_processing_retrieve_server_key_task() { let contract = Arc::new(DummyServiceContract::default()); let key_storage = create_non_empty_key_storage(false); - let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage), None); + let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage), None, None); ServiceContractListener::process_service_task(&listener.data, ServiceTask::RetrieveServerKey( Default::default(), Default::default())).unwrap(); assert_eq!(*contract.retrieved_server_keys.lock(), vec![(Default::default(), @@ -844,7 +898,7 @@ mod tests { #[test] fn server_key_retrieval_failure_is_reported_when_processing_retrieve_server_key_task_and_key_is_unknown() { let contract = Arc::new(DummyServiceContract::default()); - let listener = make_service_contract_listener(Some(contract.clone()), None, None, None); + let listener = make_service_contract_listener(Some(contract.clone()), None, None, None, None); ServiceContractListener::process_service_task(&listener.data, ServiceTask::RetrieveServerKey( Default::default(), Default::default())).unwrap(); assert_eq!(*contract.server_keys_retrieval_failures.lock(), vec![Default::default()]); @@ -855,7 +909,7 @@ mod tests { let mut contract = DummyServiceContract::default(); contract.pending_requests.push((false, ServiceTask::RetrieveServerKey(Default::default(), Default::default()))); let cluster = Arc::new(DummyClusterClient::default()); - let listener = make_service_contract_listener(Some(Arc::new(contract)), Some(cluster.clone()), None, None); + let listener = make_service_contract_listener(Some(Arc::new(contract)), Some(cluster.clone()), None, None, None); listener.data.retry_data.lock().affected_server_keys.insert(Default::default()); ServiceContractListener::retry_pending_requests(&listener.data).unwrap(); assert_eq!(cluster.generation_requests_count.load(Ordering::Relaxed), 0); @@ -868,10 +922,10 @@ mod tests { let mut contract = DummyServiceContract::default(); contract.logs.push(ServiceTask::StoreDocumentKey(Default::default(), Default::default(), Default::default(), Default::default(), Default::default())); - let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 0); listener.process_service_contract_events(); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 2); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::StoreDocumentKey( Default::default(), Default::default(), Default::default(), Default::default(), Default::default()))); } @@ -882,10 +936,10 @@ mod tests { let mut contract = DummyServiceContract::default(); contract.logs.push(ServiceTask::StoreDocumentKey(Default::default(), server_key_id.clone(), Default::default(), Default::default(), Default::default())); - let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 0); listener.process_service_contract_events(); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 2); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::StoreDocumentKey( Default::default(), server_key_id, Default::default(), Default::default(), Default::default()))); } @@ -894,7 +948,7 @@ mod tests { fn document_key_is_stored_when_processing_store_document_key_task() { let contract = Arc::new(DummyServiceContract::default()); let key_storage = create_non_empty_key_storage(false); - let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), None); + let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), None, None); ServiceContractListener::process_service_task(&listener.data, ServiceTask::StoreDocumentKey( Default::default(), Default::default(), Default::default(), Default::default(), Default::default())).unwrap(); assert_eq!(*contract.stored_document_keys.lock(), vec![Default::default()]); @@ -907,7 +961,7 @@ mod tests { #[test] fn document_key_store_failure_reported_when_no_server_key() { let contract = Arc::new(DummyServiceContract::default()); - let listener = make_service_contract_listener(Some(contract.clone()), None, None, None); + let listener = make_service_contract_listener(Some(contract.clone()), None, None, None, None); ServiceContractListener::process_service_task(&listener.data, ServiceTask::StoreDocumentKey( Default::default(), Default::default(), Default::default(), Default::default(), Default::default())).unwrap_err(); assert_eq!(*contract.document_keys_store_failures.lock(), vec![Default::default()]); @@ -917,7 +971,7 @@ mod tests { fn document_key_store_failure_reported_when_document_key_already_set() { let contract = Arc::new(DummyServiceContract::default()); let key_storage = create_non_empty_key_storage(true); - let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage), None); + let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage), None, None); ServiceContractListener::process_service_task(&listener.data, ServiceTask::StoreDocumentKey( Default::default(), Default::default(), Default::default(), Default::default(), Default::default())).unwrap_err(); assert_eq!(*contract.document_keys_store_failures.lock(), vec![Default::default()]); @@ -927,7 +981,7 @@ mod tests { fn document_key_store_failure_reported_when_author_differs() { let contract = Arc::new(DummyServiceContract::default()); let key_storage = create_non_empty_key_storage(false); - let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage), None); + let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage), None, None); ServiceContractListener::process_service_task(&listener.data, ServiceTask::StoreDocumentKey( Default::default(), Default::default(), 1.into(), Default::default(), Default::default())).unwrap_err(); assert_eq!(*contract.document_keys_store_failures.lock(), vec![Default::default()]); @@ -939,10 +993,10 @@ mod tests { fn document_key_shadow_common_retrieval_is_scheduled_when_requested() { let mut contract = DummyServiceContract::default(); contract.logs.push(ServiceTask::RetrieveShadowDocumentKeyCommon(Default::default(), Default::default(), Default::default())); - let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 0); listener.process_service_contract_events(); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 2); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::RetrieveShadowDocumentKeyCommon( Default::default(), Default::default(), Default::default()))); } @@ -952,10 +1006,10 @@ mod tests { let server_key_id: ServerKeyId = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap(); let mut contract = DummyServiceContract::default(); contract.logs.push(ServiceTask::RetrieveShadowDocumentKeyCommon(Default::default(), server_key_id.clone(), Default::default())); - let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 0); listener.process_service_contract_events(); - assert_eq!(listener.data.tasks_queue.snapshot().len(), 2); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::RetrieveShadowDocumentKeyCommon( Default::default(), server_key_id, Default::default()))); } @@ -964,7 +1018,7 @@ mod tests { fn document_key_shadow_common_is_retrieved_when_processing_document_key_shadow_common_retrieval_task() { let contract = Arc::new(DummyServiceContract::default()); let key_storage = create_non_empty_key_storage(true); - let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), None); + let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), None, None); ServiceContractListener::process_service_task(&listener.data, ServiceTask::RetrieveShadowDocumentKeyCommon( Default::default(), Default::default(), Default::default())).unwrap(); assert_eq!(*contract.common_shadow_retrieved_document_keys.lock(), vec![(Default::default(), Default::default(), @@ -977,7 +1031,7 @@ mod tests { acl_storage.prohibit(Default::default(), Default::default()); let contract = Arc::new(DummyServiceContract::default()); let key_storage = create_non_empty_key_storage(true); - let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), Some(Arc::new(acl_storage))); + let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), Some(Arc::new(acl_storage)), None); ServiceContractListener::process_service_task(&listener.data, ServiceTask::RetrieveShadowDocumentKeyCommon( Default::default(), Default::default(), Default::default())).unwrap_err(); assert_eq!(*contract.document_keys_shadow_retrieval_failures.lock(), vec![(Default::default(), Default::default())]); @@ -986,7 +1040,7 @@ mod tests { #[test] fn document_key_shadow_common_retrieval_failure_reported_when_no_server_key() { let contract = Arc::new(DummyServiceContract::default()); - let listener = make_service_contract_listener(Some(contract.clone()), None, None, None); + let listener = make_service_contract_listener(Some(contract.clone()), None, None, None, None); ServiceContractListener::process_service_task(&listener.data, ServiceTask::RetrieveShadowDocumentKeyCommon( Default::default(), Default::default(), Default::default())).unwrap_err(); assert_eq!(*contract.document_keys_shadow_retrieval_failures.lock(), vec![(Default::default(), Default::default())]); @@ -996,7 +1050,7 @@ mod tests { fn document_key_shadow_common_retrieval_failure_reported_when_no_document_key() { let contract = Arc::new(DummyServiceContract::default()); let key_storage = create_non_empty_key_storage(false); - let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), None); + let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), None, None); ServiceContractListener::process_service_task(&listener.data, ServiceTask::RetrieveShadowDocumentKeyCommon( Default::default(), Default::default(), Default::default())).unwrap_err(); assert_eq!(*contract.document_keys_shadow_retrieval_failures.lock(), vec![(Default::default(), Default::default())]); diff --git a/secret_store/src/trusted_client.rs b/secret_store/src/trusted_client.rs index cf9c987be..24db21460 100644 --- a/secret_store/src/trusted_client.rs +++ b/secret_store/src/trusted_client.rs @@ -17,11 +17,12 @@ use std::sync::{Arc, Weak}; use bytes::Bytes; use ethereum_types::Address; -use ethcore::client::{Client, BlockChainClient, ChainInfo, Nonce}; +use ethcore::client::{Client, BlockChainClient, ChainInfo, Nonce, BlockId, RegistryInfo}; use ethcore::miner::{Miner, MinerService}; use sync::SyncProvider; use transaction::{Transaction, SignedTransaction, Action}; -use {Error, NodeKeyPair}; +use helpers::{get_confirmed_block_hash, REQUEST_CONFIRMATIONS_REQUIRED}; +use {Error, NodeKeyPair, ContractAddress}; #[derive(Clone)] /// 'Trusted' client weak reference. @@ -84,6 +85,18 @@ impl TrustedClient { let signed = SignedTransaction::new(transaction.with_signature(signature, chain_id))?; miner.import_own_transaction(&*client, signed.into()) .map_err(|e| Error::Internal(format!("failed to import tx: {}", e))) - .map(|_| ()) + } + + /// Read contract address. If address source is registry, address only returned if current client state is + /// trusted. Address from registry is read from registry from block latest block with + /// REQUEST_CONFIRMATIONS_REQUIRED confirmations. + pub fn read_contract_address(&self, registry_name: String, address: &ContractAddress) -> Option
{ + match *address { + ContractAddress::Address(ref address) => Some(address.clone()), + ContractAddress::Registry => self.get().and_then(|client| + get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED) + .and_then(|block| client.registry_address(registry_name, BlockId::Hash(block))) + ), + } } } diff --git a/secret_store/src/types/all.rs b/secret_store/src/types/all.rs index f0e038810..feca4141f 100644 --- a/secret_store/src/types/all.rs +++ b/secret_store/src/types/all.rs @@ -66,8 +66,8 @@ pub struct ServiceConfiguration { pub service_contract_doc_store_address: Option, /// Document key shadow retrieval service contract address. pub service_contract_doc_sretr_address: Option, - /// Is ACL check enabled. If false, everyone has access to all keys. Useful for tests only. - pub acl_check_enabled: bool, + /// ACL check contract address. If None, everyone has access to all keys. Useful for tests only. + pub acl_check_contract_address: Option, /// Cluster configuration. pub cluster_config: ClusterConfiguration, } @@ -81,6 +81,8 @@ pub struct ClusterConfiguration { pub listener_address: NodeAddress, /// All cluster nodes addresses. pub nodes: BTreeMap, + /// Key Server Set contract address. If None, servers from 'nodes' map are used. + pub key_server_set_contract_address: Option, /// Allow outbound connections to 'higher' nodes. /// This is useful for tests, but slower a bit for production. pub allow_connecting_to_higher_nodes: bool,