diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs
index 5e49890e0..330abe936 100644
--- a/parity/cli/mod.rs
+++ b/parity/cli/mod.rs
@@ -553,7 +553,7 @@ usage! {
 	ARG arg_secretstore_contract: (String) = "none", or |c: &Config| otry!(c.secretstore).service_contract.clone(),
 	"--secretstore-contract=[SOURCE]",
-	"Secret Store Service contract source: none, registry (contract address is read from registry) or address.",
+	"Secret Store Service contract address source: none, registry (contract address is read from registry) or address.",
 
 	ARG arg_secretstore_nodes: (String) = "", or |c: &Config| otry!(c.secretstore).nodes.as_ref().map(|vec| vec.join(",")),
 	"--secretstore-nodes=[NODES]",
diff --git a/parity/configuration.rs b/parity/configuration.rs
index ec35020f5..b9a614c8b 100644
--- a/parity/configuration.rs
+++ b/parity/configuration.rs
@@ -1082,7 +1082,7 @@ impl Configuration {
 		Ok(match self.args.arg_secretstore_contract.as_ref() {
 			"none" => None,
 			"registry" => Some(SecretStoreContractAddress::Registry),
-			a @ _ => Some(SecretStoreContractAddress::Address(a.parse().map_err(|e| format!("{}", e))?)),
+			a => Some(SecretStoreContractAddress::Address(a.parse().map_err(|e| format!("{}", e))?)),
 		})
 	}
 
diff --git a/secret_store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs b/secret_store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs
index f86275f3a..105df299c 100644
--- a/secret_store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs
+++ b/secret_store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs
@@ -203,9 +203,8 @@ impl<T> SessionImpl<T> where T: SessionTransport {
 			self.core.completed.wait(&mut data);
 		}
 
-		data.result.as_ref()
+		data.result.clone()
 			.expect("checked above or waited for completed; completed is only signaled when result.is_some(); qed")
-			.clone()
 	}
 
 	/// Initialize session.
diff --git a/secret_store/src/key_server_cluster/admin_sessions/share_add_session.rs b/secret_store/src/key_server_cluster/admin_sessions/share_add_session.rs
index 3156d6f87..39fd70cd4 100644
--- a/secret_store/src/key_server_cluster/admin_sessions/share_add_session.rs
+++ b/secret_store/src/key_server_cluster/admin_sessions/share_add_session.rs
@@ -86,16 +86,8 @@ struct SessionData<T: SessionTransport> {
 	pub version: Option<H256>,
 	/// Consensus session.
 	pub consensus_session: Option<ShareAddChangeConsensusSession<T>>,
-	/// NewKeyShare: threshold.
-	pub key_share_threshold: Option<usize>,
-	/// NewKeyShare: author.
-	pub key_share_author: Option<Public>,
-	/// NewKeyShare: joint public.
-	pub key_share_joint_public: Option<Public>,
-	/// NewKeyShare: Common (shared) encryption point.
-	pub key_share_common_point: Option<Public>,
-	/// NewKeyShare: Encrypted point.
-	pub key_share_encrypted_point: Option<Public>,
+	/// NewKeyShare (for nodes being added).
+	pub new_key_share: Option<NewKeyShare>,
 	/// Nodes id numbers.
 	pub id_numbers: Option<BTreeMap<NodeId, Option<Secret>>>,
 	/// Secret subshares received from nodes.
 	pub secret_subshares: Option<BTreeMap<NodeId, Option<Secret>>>,
 	pub result: Option<Result<(), Error>>,
 }
+/// New key share.
+struct NewKeyShare {
+	/// NewKeyShare: threshold.
+	pub threshold: usize,
+	/// NewKeyShare: author.
+	pub author: Public,
+	/// NewKeyShare: joint public.
+	pub joint_public: Public,
+	/// NewKeyShare: Common (shared) encryption point.
+	pub common_point: Option<Public>,
+	/// NewKeyShare: Encrypted point.
+	pub encrypted_point: Option<Public>,
+}
+
 /// Session state.
 #[derive(Debug, PartialEq)]
 enum SessionState {
@@ -167,11 +173,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
 				state: SessionState::ConsensusEstablishing,
 				version: None,
 				consensus_session: None,
-				key_share_threshold: None,
-				key_share_author: None,
-				key_share_joint_public: None,
-				key_share_common_point: None,
-				key_share_encrypted_point: None,
+				new_key_share: None,
 				id_numbers: None,
 				secret_subshares: None,
 				result: None,
@@ -427,9 +429,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
 		}
 
 		// we only expect this message once
-		if data.key_share_threshold.is_some() || data.key_share_author.is_some() ||
-			data.key_share_common_point.is_some() || data.key_share_encrypted_point.is_some() ||
-			data.key_share_joint_public.is_some() {
+		if data.new_key_share.is_some() {
 			return Err(Error::InvalidStateForRequest);
 		}
 
@@ -444,11 +444,13 @@ impl<T> SessionImpl<T> where T: SessionTransport {
 		// update data
 		data.state = SessionState::WaitingForKeysDissemination;
-		data.key_share_threshold = Some(message.threshold);
-		data.key_share_author = Some(message.author.clone().into());
-		data.key_share_joint_public = Some(message.joint_public.clone().into());
-		data.key_share_common_point = message.common_point.clone().map(Into::into);
-		data.key_share_encrypted_point = message.encrypted_point.clone().map(Into::into);
+		data.new_key_share = Some(NewKeyShare {
+			threshold: message.threshold,
+			author: message.author.clone().into(),
+			joint_public: message.joint_public.clone().into(),
+			common_point: message.common_point.clone().map(Into::into),
+			encrypted_point: message.encrypted_point.clone().map(Into::into),
+		});
 
 		let id_numbers = data.id_numbers.as_mut()
 			.expect("common key share data is expected after initialization; id_numers are filled during initialization; qed");
@@ -667,8 +669,9 @@ impl<T> SessionImpl<T> where T: SessionTransport {
 		let id_numbers = data.id_numbers.as_ref().expect(explanation);
 		let secret_subshares = data.secret_subshares.as_ref().expect(explanation);
 		let threshold = core.key_share.as_ref().map(|ks| ks.threshold)
-			.unwrap_or_else(|| *data.key_share_threshold.as_ref()
-				.expect("computation occurs after receiving key share threshold if not having one already; qed"));
+			.unwrap_or_else(|| data.new_key_share.as_ref()
+				.expect("computation occurs after receiving key share threshold if not having one already; qed")
+				.threshold);
 
 		let explanation = "id_numbers are checked to have Some value for every consensus group node when consensus is establishe; qed";
 		let sender_id_number = id_numbers[sender].as_ref().expect(explanation);
@@ -694,16 +697,17 @@ impl<T> SessionImpl<T> where T: SessionTransport {
 		let refreshed_key_version = DocumentKeyShareVersion::new(id_numbers.clone().into_iter().map(|(k, v)| (k.clone(),
 			v.expect("id_numbers are checked to have Some value for every consensus group node when consensus is establishe; qed"))).collect(),
 			secret_share);
-		let mut refreshed_key_share = core.key_share.as_ref().cloned().unwrap_or_else(|| DocumentKeyShare {
-			author: data.key_share_author.clone()
-				.expect("this is new node; on new nodes this field is filled before KRD; session is completed after KRD; qed"),
-			threshold: data.key_share_threshold.clone()
-				.expect("this is new node; on new nodes this field is filled before KRD; session is completed after KRD; qed"),
-			public: data.key_share_joint_public.clone()
-				.expect("this is new node; on new nodes this field is filled before KRD; session is completed after KRD; qed"),
-			common_point: data.key_share_common_point.clone(),
-			encrypted_point: data.key_share_encrypted_point.clone(),
-			versions: Vec::new(),
+		let mut refreshed_key_share = core.key_share.as_ref().cloned().unwrap_or_else(|| {
+			let new_key_share = data.new_key_share.as_ref()
+				.expect("this is new node; on new nodes this field is filled before KRD; session is completed after KRD; qed");
+			DocumentKeyShare {
+				author: new_key_share.author.clone(),
+				threshold: new_key_share.threshold,
+				public: new_key_share.joint_public.clone(),
+				common_point: new_key_share.common_point.clone(),
+				encrypted_point: new_key_share.encrypted_point.clone(),
+				versions: Vec::new(),
+			}
 		});
 		refreshed_key_share.versions.push(refreshed_key_version);
diff --git a/secret_store/src/key_server_cluster/cluster_sessions.rs b/secret_store/src/key_server_cluster/cluster_sessions.rs
index 2e83c407d..cd07045df 100644
--- a/secret_store/src/key_server_cluster/cluster_sessions.rs
+++ b/secret_store/src/key_server_cluster/cluster_sessions.rs
@@ -294,25 +294,7 @@ impl ClusterSessionsContainer where S: ClusterSession, SC: C
 			queue: VecDeque::new(),
 		};
 		sessions.insert(session_id, queued_session);
-
-		// notify listeners
-		let mut listeners = self.listeners.lock();
-		let mut listener_index = 0;
-		loop {
-			if listener_index >= listeners.len() {
-				break;
-			}
-
-			match listeners[listener_index].upgrade() {
-				Some(listener) => {
-					listener.on_session_inserted(session.clone());
-					listener_index += 1;
-				},
-				None => {
-					listeners.swap_remove(listener_index);
-				},
-			}
-		}
+		self.notify_listeners(|l| l.on_session_inserted(session.clone()));
 
 		Ok(session)
 	}
@@ -320,25 +302,7 @@ impl ClusterSessionsContainer where S: ClusterSession, SC: C
 	pub fn remove(&self, session_id: &S::Id) {
 		if let Some(session) = self.sessions.write().remove(session_id) {
 			self.container_state.lock().on_session_completed();
-
-			// notify listeners
-			let mut listeners = self.listeners.lock();
-			let mut listener_index = 0;
-			loop {
-				if listener_index >= listeners.len() {
-					break;
-				}
-
-				match listeners[listener_index].upgrade() {
-					Some(listener) => {
-						listener.on_session_removed(session.session.clone());
-						listener_index += 1;
-					},
-					None => {
-						listeners.swap_remove(listener_index);
-					},
-				}
-			}
+			self.notify_listeners(|l| l.on_session_removed(session.session.clone()));
 		}
 	}
@@ -385,6 +349,22 @@ impl ClusterSessionsContainer where S: ClusterSession, SC: C
 			}
 		}
 	}
+
+	fn notify_listeners<F: Fn(&ClusterSessionsListener<S>) -> ()>(&self, callback: F) {
+		let mut listeners = self.listeners.lock();
+		let mut listener_index = 0;
+		while listener_index < listeners.len() {
+			match listeners[listener_index].upgrade() {
+				Some(listener) => {
+					callback(&*listener);
+					listener_index += 1;
+				},
+				None => {
+					listeners.swap_remove(listener_index);
+				},
+			}
+		}
+	}
 }
 
 impl ClusterSessionsContainer where S: ClusterSession, SC: ClusterSessionCreator, SessionId: From {
diff --git a/secret_store/src/lib.rs b/secret_store/src/lib.rs
index ecb446892..b2139c000 100644
--- a/secret_store/src/lib.rs
+++ b/secret_store/src/lib.rs
@@ -77,7 +77,7 @@ pub fn start(client: Arc, sync: Arc, self_key_pair: Arc, sync: &Arc, address: ContractAddress, self_key_pair: Arc) -> Self {
-		let contract_addr = match &address {
-			&ContractAddress::Registry => client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned())
+		let contract_addr = match address {
+			ContractAddress::Registry => client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned())
 				.map(|address| {
 					trace!(target: "secretstore", "{}: installing service contract from address {}",
 						self_key_pair.public(), address);
 					address
 				})
 				.unwrap_or_default(),
-			&ContractAddress::Address(ref address) => {
+			ContractAddress::Address(ref address) => {
 				trace!(target: "secretstore", "{}: installing service contract from address {}",
 					self_key_pair.public(), address);
 				address.clone()
@@ -231,12 +231,10 @@ impl ServiceContract for OnChainServiceContract {
 			)?;
 
 			// send transaction
-			if contract.address != Default::default() {
-				client.transact_contract(
-					contract.address.clone(),
-					transaction_data
-				).map_err(|e| format!("{}", e))?;
-			}
+			client.transact_contract(
+				contract.address.clone(),
+				transaction_data
+			).map_err(|e| format!("{}", e))?;
 
 			Ok(())
 		}
diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs
index f11c5cce8..d9567125c 100644
--- a/secret_store/src/listener/service_contract_listener.rs
+++ b/secret_store/src/listener/service_contract_listener.rs
@@ -31,11 +31,11 @@ use key_storage::KeyStorage;
 use listener::service_contract::ServiceContract;
 use {ServerKeyId, NodeKeyPair, KeyServer};
 
-/// Retry interval (in blocks). Every RETRY_INTEVAL_BLOCKS blocks each KeyServer reads pending requests from
+/// Retry interval (in blocks). Every RETRY_INTERVAL_BLOCKS blocks each KeyServer reads pending requests from
 /// service contract && tries to re-execute. The reason to have this mechanism is primarily because keys
 /// servers set change takes a lot of time + there could be some races, when blocks are coming to different
 /// KS at different times. This isn't intended to fix && respond to general session errors!
-const RETRY_INTEVAL_BLOCKS: usize = 30;
+const RETRY_INTERVAL_BLOCKS: usize = 30;
 
 /// Max failed retry requests (in single retry interval). The reason behind this constant is that if several
 /// pending requests have failed, then most probably other will fail too.
@@ -255,7 +255,7 @@ impl ServiceContractListener {
 				// only process request, which haven't been processed recently
 				// there could be a lag when we've just generated server key && retrying on the same block
 				// (or before our tx is mined) - state is not updated yet
-				if retry_data.generated_keys.contains(&server_key_id){
+				if retry_data.generated_keys.contains(&server_key_id) {
 					continue;
 				}
 
@@ -343,7 +343,7 @@ impl ChainNotify for ServiceContractListener {
 		// schedule retry if received enough blocks since last retry
 		// it maybe inaccurate when switching syncing/synced states, but that's ok
-		if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTEVAL_BLOCKS {
+		if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTERVAL_BLOCKS {
 			self.data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
 			self.data.last_retry.store(0, Ordering::Relaxed);
 		}
 
@@ -356,22 +356,17 @@ impl ClusterSessionsListener for ServiceContractListener {
 		// when it is started by this node, it is published from process_service_task
 		if !is_processed_by_this_key_server(&*self.data.key_server_set, &*self.data.self_key_pair, &session.id()) {
 			// by this time sesion must already be completed - either successfully, or not
-			debug_assert!(session.is_finished());
+			assert!(session.is_finished());
 
 			// ignore result - the only thing that we can do is to log the error
-			let _ = session.wait(Some(Default::default()))
+			match session.wait(Some(Default::default()))
 				.map_err(|e| format!("{}", e))
-				.and_then(|server_key| Self::publish_server_key(&self.data, &session.id(), &server_key))
-				.map(|_| {
-					trace!(target: "secretstore", "{}: completed foreign GenerateServerKey({}) request",
-						self.data.self_key_pair.public(), session.id());
-					()
-				})
-				.map_err(|error| {
-					warn!(target: "secretstore", "{}: failed to process GenerateServerKey({}) request with: {}",
-						self.data.self_key_pair.public(), session.id(), error);
-					error
-				});
+				.and_then(|server_key| Self::publish_server_key(&self.data, &session.id(), &server_key)) {
+				Ok(_) => trace!(target: "secretstore", "{}: completed foreign GenerateServerKey({}) request",
+					self.data.self_key_pair.public(), session.id()),
+				Err(error) => warn!(target: "secretstore", "{}: failed to process GenerateServerKey({}) request with: {}",
+					self.data.self_key_pair.public(), session.id(), error),
+			}
 		}
 	}
 }
@@ -417,9 +412,12 @@ impl TasksQueue {
 fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool {
 	let servers = key_server_set.get();
 	let total_servers_count = servers.len();
-	if total_servers_count == 0 {
-		return false;
+	match total_servers_count {
+		0 => return false,
+		1 => return true,
+		_ => (),
 	}
+
 	let this_server_index = match servers.keys().enumerate().find(|&(_, s)| s == self_key_pair.public()) {
 		Some((index, _)) => index,
 		None => return false,
 	};
@@ -480,13 +478,24 @@ mod tests {
 	}
 
 	#[test]
-	fn is_not_processed_by_this_key_server_when_not_a_part_of_servers_set() {
+	fn is_processed_by_this_key_server_with_single_server() {
+		let self_key_pair = Random.generate().unwrap();
 		assert_eq!(is_processed_by_this_key_server(
+			&MapKeyServerSet::new(vec![
+				(self_key_pair.public().clone(), "127.0.0.1:8080".parse().unwrap())
+			].into_iter().collect()),
+			&PlainNodeKeyPair::new(self_key_pair),
+			&Default::default()), true);
+	}
+
+	#[test]
+	fn is_not_processed_by_this_key_server_when_not_a_part_of_servers_set() {
+		assert!(is_processed_by_this_key_server(
 			&MapKeyServerSet::new(vec![
 				(Random.generate().unwrap().public().clone(), "127.0.0.1:8080".parse().unwrap())
 			].into_iter().collect()),
 			&PlainNodeKeyPair::new(Random.generate().unwrap()),
-			&Default::default()), false);
+			&Default::default()));
 	}
 
 	#[test]
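Note on the listener refactoring in cluster_sessions.rs: the duplicated upgrade-or-prune loops in insert() and remove() are folded into a single notify_listeners helper that walks the vector of Weak listener handles, invokes the callback on listeners that can still be upgraded, and swap_removes entries whose targets have been dropped. The sketch below is a minimal, self-contained illustration of that pattern, not the project's code: it uses std::sync::Mutex instead of the parking_lot lock used in the codebase, and the Listener, Notifier and PrintListener names are invented for the example.

use std::sync::{Arc, Mutex, Weak};

/// Simplified stand-in for a sessions listener: just a notification callback.
trait Listener {
	fn on_event(&self);
}

/// Holds weak references to listeners, so dropping the last strong reference
/// elsewhere automatically unsubscribes a listener.
struct Notifier {
	listeners: Mutex<Vec<Weak<dyn Listener>>>,
}

impl Notifier {
	fn new() -> Self {
		Notifier { listeners: Mutex::new(Vec::new()) }
	}

	fn add_listener(&self, listener: Arc<dyn Listener>) {
		self.listeners.lock().unwrap().push(Arc::downgrade(&listener));
	}

	/// Same shape as the notify_listeners helper in the diff: upgrade each weak
	/// reference, call the callback on live listeners, swap_remove dead ones,
	/// and only advance the index when an entry is kept.
	fn notify_listeners<F: Fn(&dyn Listener)>(&self, callback: F) {
		let mut listeners = self.listeners.lock().unwrap();
		let mut listener_index = 0;
		while listener_index < listeners.len() {
			match listeners[listener_index].upgrade() {
				Some(listener) => {
					callback(&*listener);
					listener_index += 1;
				},
				None => {
					// swap_remove is O(1); ordering of listeners is irrelevant here.
					listeners.swap_remove(listener_index);
				},
			}
		}
	}
}

struct PrintListener(&'static str);

impl Listener for PrintListener {
	fn on_event(&self) {
		println!("{} notified", self.0);
	}
}

fn main() {
	let notifier = Notifier::new();
	let kept = Arc::new(PrintListener("kept"));
	notifier.add_listener(kept.clone());
	{
		// This listener is dropped at the end of the block; the next
		// notification pass prunes its dead Weak entry.
		let dropped = Arc::new(PrintListener("dropped"));
		notifier.add_listener(dropped.clone());
	}
	notifier.notify_listeners(|l| l.on_event()); // prints "kept notified" once
}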