From 8579a56f7137c21cd4cbf8fdedd54faa297f7df2 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 12 Feb 2018 20:05:33 +0300 Subject: [PATCH] SecretStore: 'broadcast' decryption session (#7843) --- .../servers_set_change_session.rs | 2 +- .../client_sessions/decryption_session.rs | 284 +++++++++++++----- .../client_sessions/signing_session.rs | 2 +- .../src/key_server_cluster/cluster.rs | 6 +- .../jobs/consensus_session.rs | 35 +-- .../key_server_cluster/jobs/decryption_job.rs | 20 +- .../key_server_cluster/jobs/job_session.rs | 82 +++-- .../key_server_cluster/jobs/signing_job.rs | 1 + .../src/key_server_cluster/message.rs | 6 + .../src/listener/service_contract_listener.rs | 4 +- 10 files changed, 324 insertions(+), 118 deletions(-) diff --git a/secret_store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs b/secret_store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs index c14e34daa..271649307 100644 --- a/secret_store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs +++ b/secret_store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs @@ -337,7 +337,7 @@ impl SessionImpl { } let unknown_sessions_job = UnknownSessionsJob::new_on_master(self.core.key_storage.clone(), self.core.meta.self_node_id.clone()); - consensus_session.disseminate_jobs(unknown_sessions_job, self.unknown_sessions_transport()) + consensus_session.disseminate_jobs(unknown_sessions_job, self.unknown_sessions_transport(), false) } /// When unknown sessions are requested. diff --git a/secret_store/src/key_server_cluster/client_sessions/decryption_session.rs b/secret_store/src/key_server_cluster/client_sessions/decryption_session.rs index d9277261d..ad1fe2f2e 100644 --- a/secret_store/src/key_server_cluster/client_sessions/decryption_session.rs +++ b/secret_store/src/key_server_cluster/client_sessions/decryption_session.rs @@ -25,7 +25,7 @@ use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSessi use key_server_cluster::message::{Message, DecryptionMessage, DecryptionConsensusMessage, RequestPartialDecryption, PartialDecryption, DecryptionSessionError, DecryptionSessionCompleted, ConsensusMessage, InitializeConsensusSession, ConfirmConsensusInitialization, DecryptionSessionDelegation, DecryptionSessionDelegationCompleted}; -use key_server_cluster::jobs::job_session::JobTransport; +use key_server_cluster::jobs::job_session::{JobSession, JobTransport}; use key_server_cluster::jobs::key_access_job::KeyAccessJob; use key_server_cluster::jobs::decryption_job::{PartialDecryptionRequest, PartialDecryptionResponse, DecryptionJob}; use key_server_cluster::jobs::consensus_session::{ConsensusSessionParams, ConsensusSessionState, ConsensusSession}; @@ -63,6 +63,8 @@ struct SessionCore { /// Decryption consensus session type. type DecryptionConsensusSession = ConsensusSession; +/// Broadcast decryption job session type. +type BroadcastDecryptionJobSession = JobSession; /// Mutable session data. struct SessionData { @@ -70,8 +72,13 @@ struct SessionData { pub version: Option, /// Consensus-based decryption session. pub consensus_session: DecryptionConsensusSession, + /// Broadcast decryption job. + pub broadcast_job_session: Option, /// Is shadow decryption requested? pub is_shadow_decryption: Option, + /// Decryption result must be reconstructed on all participating nodes. This is useful + /// for service contract API so that all nodes from consensus group can confirm decryption. + pub is_broadcast_session: Option, /// Delegation status. pub delegation_status: Option, /// Decryption result. @@ -116,6 +123,10 @@ struct DecryptionJobTransport { access_key: Secret, /// Session-level nonce. nonce: u64, + /// Is this a broadcast transport? If true, requests are not send and responses are sent only to non-master nodes. + is_broadcast_transport: bool, + /// Master node id. + master_node_id: NodeId, /// Cluster. cluster: Arc, } @@ -169,7 +180,9 @@ impl SessionImpl { data: Mutex::new(SessionData { version: None, consensus_session: consensus_session, + broadcast_job_session: None, is_shadow_decryption: None, + is_broadcast_session: None, delegation_status: None, result: None, }), @@ -206,7 +219,7 @@ impl SessionImpl { } /// Delegate session to other node. - pub fn delegate(&self, master: NodeId, version: H256, is_shadow_decryption: bool) -> Result<(), Error> { + pub fn delegate(&self, master: NodeId, version: H256, is_shadow_decryption: bool, is_broadcast_session: bool) -> Result<(), Error> { if self.core.meta.master_node_id != self.core.meta.self_node_id { return Err(Error::InvalidStateForRequest); } @@ -226,13 +239,14 @@ impl SessionImpl { .clone().into(), version: version.into(), is_shadow_decryption: is_shadow_decryption, + is_broadcast_session: is_broadcast_session, })))?; data.delegation_status = Some(DelegationStatus::DelegatedTo(master)); Ok(()) } /// Initialize decryption session on master node. - pub fn initialize(&self, version: H256, is_shadow_decryption: bool) -> Result<(), Error> { + pub fn initialize(&self, version: H256, is_shadow_decryption: bool, is_broadcast_session: bool) -> Result<(), Error> { debug_assert_eq!(self.core.meta.self_node_id, self.core.meta.master_node_id); // check if version exists @@ -255,10 +269,11 @@ impl SessionImpl { data.consensus_session.consensus_job_mut().transport_mut().version = Some(version.clone()); data.version = Some(version.clone()); data.is_shadow_decryption = Some(is_shadow_decryption); + data.is_broadcast_session = Some(is_broadcast_session); data.consensus_session.initialize(consensus_nodes)?; if data.consensus_session.state() == ConsensusSessionState::ConsensusEstablished { - self.core.disseminate_jobs(&mut data.consensus_session, &version, is_shadow_decryption)?; + Self::disseminate_jobs(&self.core, &mut *data, &version, is_shadow_decryption, is_broadcast_session)?; debug_assert!(data.consensus_session.state() == ConsensusSessionState::Finished); let result = data.consensus_session.result()?; @@ -307,7 +322,7 @@ impl SessionImpl { data.delegation_status = Some(DelegationStatus::DelegatedFrom(sender.clone(), message.session_nonce)); } - self.initialize(message.version.clone().into(), message.is_shadow_decryption) + self.initialize(message.version.clone().into(), message.is_shadow_decryption, message.is_broadcast_session) } /// When delegated session is completed on other node. @@ -359,7 +374,9 @@ impl SessionImpl { let version = data.version.as_ref().ok_or(Error::InvalidMessage)?.clone(); let is_shadow_decryption = data.is_shadow_decryption .expect("we are on master node; on master node is_shadow_decryption is filled in initialize(); on_consensus_message follows initialize (state check in consensus_session); qed"); - self.core.disseminate_jobs(&mut data.consensus_session, &version, is_shadow_decryption) + let is_broadcast_session = data.is_broadcast_session + .expect("we are on master node; on master node is_broadcast_session is filled in initialize(); on_consensus_message follows initialize (state check in consensus_session); qed"); + Self::disseminate_jobs(&self.core, &mut *data, &version, is_shadow_decryption, is_broadcast_session) } /// When partial decryption is requested. @@ -378,13 +395,27 @@ impl SessionImpl { .map_err(|e| Error::KeyStorage(e.into()))?.hash.clone(); let requester = data.consensus_session.consensus_job().executor().requester()?.ok_or(Error::InvalidStateForRequest)?.clone(); let decryption_job = DecryptionJob::new_on_slave(self.core.meta.self_node_id.clone(), self.core.access_key.clone(), requester, key_share.clone(), key_version)?; - let decryption_transport = self.core.decryption_transport(); + let decryption_transport = self.core.decryption_transport(false); - data.consensus_session.on_job_request(&sender, PartialDecryptionRequest { + // respond to request + data.consensus_session.on_job_request(sender, PartialDecryptionRequest { id: message.request_id.clone().into(), is_shadow_decryption: message.is_shadow_decryption, + is_broadcast_session: message.is_broadcast_session, other_nodes_ids: message.nodes.iter().cloned().map(Into::into).collect(), - }, decryption_job, decryption_transport) + }, decryption_job, decryption_transport)?; + + // ...and prepare decryption job session if we need to broadcast result + if message.is_broadcast_session { + let consensus_group: BTreeSet<_> = message.nodes.iter().cloned().map(Into::into).collect(); + let broadcast_decryption_job = DecryptionJob::new_on_master(self.core.meta.self_node_id.clone(), + self.core.access_key.clone(), requester, key_share.clone(), key_version, + message.is_shadow_decryption, message.is_broadcast_session)?; + Self::create_broadcast_decryption_job(&self.core, &mut *data, consensus_group, broadcast_decryption_job, + message.request_id.clone().into())?; + } + + Ok(()) } /// When partial decryption is received. @@ -394,11 +425,22 @@ impl SessionImpl { debug_assert!(sender != &self.core.meta.self_node_id); let mut data = self.data.lock(); - data.consensus_session.on_job_response(sender, PartialDecryptionResponse { - request_id: message.request_id.clone().into(), - shadow_point: message.shadow_point.clone().into(), - decrypt_shadow: message.decrypt_shadow.clone(), - })?; + if self.core.meta.self_node_id == self.core.meta.master_node_id { + data.consensus_session.on_job_response(sender, PartialDecryptionResponse { + request_id: message.request_id.clone().into(), + shadow_point: message.shadow_point.clone().into(), + decrypt_shadow: message.decrypt_shadow.clone(), + })?; + } else { + match data.broadcast_job_session.as_mut() { + Some(broadcast_job_session) => broadcast_job_session.on_partial_response(sender, PartialDecryptionResponse { + request_id: message.request_id.clone().into(), + shadow_point: message.shadow_point.clone().into(), + decrypt_shadow: message.decrypt_shadow.clone(), + })?, + None => return Err(Error::TooEarlyForRequest), + } + } if data.consensus_session.state() != ConsensusSessionState::Finished { return Ok(()); @@ -425,7 +467,24 @@ impl SessionImpl { debug_assert!(self.core.access_key == *message.sub_session); debug_assert!(sender != &self.core.meta.self_node_id); - self.data.lock().consensus_session.on_session_completed(sender) + let mut data = self.data.lock(); + + // if it is a broadcast session, wait for all answers before completing the session + let decryption_result = match data.broadcast_job_session.as_ref() { + Some(broadcast_job_session) => { + if !broadcast_job_session.is_result_ready() { + return Err(Error::TooEarlyForRequest); + } + + Some(broadcast_job_session.result()) + }, + None => None, + }; + if let Some(decryption_result) = decryption_result { + Self::set_decryption_result(&self.core, &mut *data, decryption_result); + } + + data.consensus_session.on_session_completed(sender) } /// Process error from the other node. @@ -447,8 +506,10 @@ impl SessionImpl { Ok(false) => Ok(()), Ok(true) => { let version = data.version.as_ref().ok_or(Error::InvalidMessage)?.clone(); - let is_shadow_decryption = data.is_shadow_decryption.expect("on_node_error returned true; this means that jobs must be REsent; this means that jobs already have been sent; jobs are sent when is_shadow_decryption.is_some(); qed"); - let disseminate_result = self.core.disseminate_jobs(&mut data.consensus_session, &version, is_shadow_decryption); + let proof = "on_node_error returned true; this means that jobs must be REsent; this means that jobs already have been sent; jobs are sent when is_shadow_decryption.is_some(); qed"; + let is_shadow_decryption = data.is_shadow_decryption.expect(proof); + let is_broadcast_session = data.is_broadcast_session.expect(proof); + let disseminate_result = Self::disseminate_jobs(&self.core, &mut *data, &version, is_shadow_decryption, is_broadcast_session); match disseminate_result { Ok(()) => Ok(()), Err(err) => { @@ -468,6 +529,52 @@ impl SessionImpl { } } + /// Disseminate jobs on session master. + fn disseminate_jobs(core: &SessionCore, data: &mut SessionData, version: &H256, is_shadow_decryption: bool, is_broadcast_session: bool) -> Result<(), Error> { + let key_share = match core.key_share.as_ref() { + None => return Err(Error::InvalidMessage), + Some(key_share) => key_share, + }; + + let key_version = key_share.version(version).map_err(|e| Error::KeyStorage(e.into()))?.hash.clone(); + let requester = data.consensus_session.consensus_job().executor().requester()?.ok_or(Error::InvalidStateForRequest)?.clone(); + let consensus_group = data.consensus_session.select_consensus_group()?.clone(); + let decryption_job = DecryptionJob::new_on_master(core.meta.self_node_id.clone(), + core.access_key.clone(), requester, key_share.clone(), key_version, + is_shadow_decryption, is_broadcast_session)?; + let decryption_request_id = decryption_job.request_id().clone().expect("TODO"); + let decryption_transport = core.decryption_transport(false); + data.consensus_session.disseminate_jobs(decryption_job, decryption_transport, data.is_broadcast_session.expect("TODO"))?; + + // ...and prepare decryption job session if we need to broadcast result + if data.is_broadcast_session.expect("TODO") { + let broadcast_decryption_job = DecryptionJob::new_on_master(core.meta.self_node_id.clone(), + core.access_key.clone(), requester, key_share.clone(), key_version, is_shadow_decryption, is_broadcast_session)?; + Self::create_broadcast_decryption_job(&core, data, consensus_group, broadcast_decryption_job, + decryption_request_id)?; + } + + Ok(()) + } + + /// Create broadcast decryption job. + fn create_broadcast_decryption_job(core: &SessionCore, data: &mut SessionData, mut consensus_group: BTreeSet, mut job: DecryptionJob, request_id: Secret) -> Result<(), Error> { + consensus_group.insert(core.meta.self_node_id.clone()); + job.set_request_id(request_id.clone().into()); + + let transport = core.decryption_transport(true); + let mut job_session = JobSession::new(SessionMeta { + id: core.meta.id.clone(), + master_node_id: core.meta.self_node_id.clone(), + self_node_id: core.meta.self_node_id.clone(), + threshold: core.meta.threshold, + }, job, transport); + job_session.initialize(consensus_group, core.meta.self_node_id != core.meta.master_node_id)?; + data.broadcast_job_session = Some(job_session); + + Ok(()) + } + /// Set decryption result. fn set_decryption_result(core: &SessionCore, data: &mut SessionData, result: Result) { if let Some(DelegationStatus::DelegatedFrom(master, nonce)) = data.delegation_status.take() { @@ -555,26 +662,16 @@ impl ClusterSession for SessionImpl { } impl SessionCore { - pub fn decryption_transport(&self) -> DecryptionJobTransport { + pub fn decryption_transport(&self, is_broadcast_transport: bool) -> DecryptionJobTransport { DecryptionJobTransport { id: self.meta.id.clone(), access_key: self.access_key.clone(), nonce: self.nonce, - cluster: self.cluster.clone() + is_broadcast_transport: is_broadcast_transport, + master_node_id: self.meta.master_node_id.clone(), + cluster: self.cluster.clone(), } } - - pub fn disseminate_jobs(&self, consensus_session: &mut DecryptionConsensusSession, version: &H256, is_shadow_decryption: bool) -> Result<(), Error> { - let key_share = match self.key_share.as_ref() { - None => return Err(Error::InvalidMessage), - Some(key_share) => key_share, - }; - - let key_version = key_share.version(version).map_err(|e| Error::KeyStorage(e.into()))?.hash.clone(); - let requester = consensus_session.consensus_job().executor().requester()?.ok_or(Error::InvalidStateForRequest)?.clone(); - let decryption_job = DecryptionJob::new_on_master(self.meta.self_node_id.clone(), self.access_key.clone(), requester, key_share.clone(), key_version, is_shadow_decryption)?; - consensus_session.disseminate_jobs(decryption_job, self.decryption_transport()) - } } impl JobTransport for DecryptionConsensusTransport { @@ -612,32 +709,41 @@ impl JobTransport for DecryptionJobTransport { type PartialJobResponse=PartialDecryptionResponse; fn send_partial_request(&self, node: &NodeId, request: PartialDecryptionRequest) -> Result<(), Error> { - self.cluster.send(node, Message::Decryption(DecryptionMessage::RequestPartialDecryption(RequestPartialDecryption { - session: self.id.clone().into(), - sub_session: self.access_key.clone().into(), - session_nonce: self.nonce, - request_id: request.id.into(), - is_shadow_decryption: request.is_shadow_decryption, - nodes: request.other_nodes_ids.into_iter().map(Into::into).collect(), - }))) + if !self.is_broadcast_transport { + self.cluster.send(node, Message::Decryption(DecryptionMessage::RequestPartialDecryption(RequestPartialDecryption { + session: self.id.clone().into(), + sub_session: self.access_key.clone().into(), + session_nonce: self.nonce, + request_id: request.id.into(), + is_shadow_decryption: request.is_shadow_decryption, + is_broadcast_session: request.is_broadcast_session, + nodes: request.other_nodes_ids.into_iter().map(Into::into).collect(), + })))?; + } + + Ok(()) } fn send_partial_response(&self, node: &NodeId, response: PartialDecryptionResponse) -> Result<(), Error> { - self.cluster.send(node, Message::Decryption(DecryptionMessage::PartialDecryption(PartialDecryption { - session: self.id.clone().into(), - sub_session: self.access_key.clone().into(), - session_nonce: self.nonce, - request_id: response.request_id.into(), - shadow_point: response.shadow_point.into(), - decrypt_shadow: response.decrypt_shadow, - }))) + if !self.is_broadcast_transport || *node != self.master_node_id { + self.cluster.send(node, Message::Decryption(DecryptionMessage::PartialDecryption(PartialDecryption { + session: self.id.clone().into(), + sub_session: self.access_key.clone().into(), + session_nonce: self.nonce, + request_id: response.request_id.into(), + shadow_point: response.shadow_point.into(), + decrypt_shadow: response.decrypt_shadow, + })))?; + } + + Ok(()) } } #[cfg(test)] mod tests { use std::sync::Arc; - use std::collections::BTreeMap; + use std::collections::{BTreeMap, VecDeque}; use acl_storage::DummyAclStorage; use ethkey::{self, KeyPair, Random, Generator, Public, Secret}; use key_server_cluster::{NodeId, DocumentKeyShare, DocumentKeyShareVersion, SessionId, Error, EncryptedDocumentKeyShadow, SessionMeta}; @@ -719,15 +825,36 @@ mod tests { } fn do_messages_exchange_until(clusters: &[Arc], sessions: &[SessionImpl], mut cond: F) -> Result<(), Error> where F: FnMut(&NodeId, &NodeId, &Message) -> bool { - while let Some((from, to, message)) = clusters.iter().filter_map(|c| c.take_message().map(|(to, msg)| (c.node(), to, msg))).next() { - let session = &sessions[sessions.iter().position(|s| s.node() == &to).unwrap()]; + let mut queue: VecDeque<(NodeId, NodeId, Message)> = VecDeque::new(); + while let Some((mut from, mut to, mut message)) = clusters.iter().filter_map(|c| c.take_message().map(|(to, msg)| (c.node(), to, msg))).next() { if cond(&from, &to, &message) { break; } - match message { - Message::Decryption(message) => session.process_message(&from, &message)?, - _ => unreachable!(), + let mut is_queued_message = false; + loop { + let session = &sessions[sessions.iter().position(|s| s.node() == &to).unwrap()]; + match session.on_message(&from, &message) { + Ok(_) => { + if let Some(qmessage) = queue.pop_front() { + from = qmessage.0; + to = qmessage.1; + message = qmessage.2; + is_queued_message = true; + continue; + } + break; + }, + Err(Error::TooEarlyForRequest) => { + if is_queued_message { + queue.push_front((from, to, message)); + } else { + queue.push_back((from, to, message)); + } + break; + }, + Err(err) => return Err(err), + } } } @@ -784,7 +911,7 @@ mod tests { cluster: Arc::new(DummyCluster::new(self_node_id.clone())), nonce: 0, }, Some(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap())).unwrap(); - assert_eq!(session.initialize(Default::default(), false), Err(Error::InvalidMessage)); + assert_eq!(session.initialize(Default::default(), false, false), Err(Error::InvalidMessage)); } #[test] @@ -817,20 +944,20 @@ mod tests { cluster: Arc::new(DummyCluster::new(self_node_id.clone())), nonce: 0, }, Some(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap())).unwrap(); - assert_eq!(session.initialize(Default::default(), false), Err(Error::ConsensusUnreachable)); + assert_eq!(session.initialize(Default::default(), false, false), Err(Error::ConsensusUnreachable)); } #[test] fn fails_to_initialize_when_already_initialized() { let (_, _, _, sessions) = prepare_decryption_sessions(); - assert_eq!(sessions[0].initialize(Default::default(), false).unwrap(), ()); - assert_eq!(sessions[0].initialize(Default::default(), false).unwrap_err(), Error::InvalidStateForRequest); + assert_eq!(sessions[0].initialize(Default::default(), false, false).unwrap(), ()); + assert_eq!(sessions[0].initialize(Default::default(), false, false).unwrap_err(), Error::InvalidStateForRequest); } #[test] fn fails_to_accept_initialization_when_already_initialized() { let (_, _, _, sessions) = prepare_decryption_sessions(); - assert_eq!(sessions[0].initialize(Default::default(), false).unwrap(), ()); + assert_eq!(sessions[0].initialize(Default::default(), false, false).unwrap(), ()); assert_eq!(sessions[0].on_consensus_message(sessions[1].node(), &message::DecryptionConsensusMessage { session: SessionId::default().into(), sub_session: sessions[0].access_key().clone().into(), @@ -860,6 +987,7 @@ mod tests { session_nonce: 0, request_id: Random.generate().unwrap().secret().clone().into(), is_shadow_decryption: false, + is_broadcast_session: false, nodes: sessions.iter().map(|s| s.node().clone().into()).take(4).collect(), }).unwrap_err(), Error::InvalidMessage); } @@ -882,6 +1010,7 @@ mod tests { session_nonce: 0, request_id: Random.generate().unwrap().secret().clone().into(), is_shadow_decryption: false, + is_broadcast_session: false, nodes: sessions.iter().map(|s| s.node().clone().into()).take(2).collect(), }).unwrap_err(), Error::InvalidMessage); } @@ -902,7 +1031,7 @@ mod tests { #[test] fn fails_to_accept_partial_decrypt_twice() { let (_, clusters, _, sessions) = prepare_decryption_sessions(); - sessions[0].initialize(Default::default(), false).unwrap(); + sessions[0].initialize(Default::default(), false, false).unwrap(); let mut pd_from = None; let mut pd_msg = None; @@ -930,7 +1059,7 @@ mod tests { #[test] fn node_is_marked_rejected_when_timed_out_during_initialization_confirmation() { let (_, _, _, sessions) = prepare_decryption_sessions(); - sessions[0].initialize(Default::default(), false).unwrap(); + sessions[0].initialize(Default::default(), false, false).unwrap(); // 1 node disconnects => we still can recover secret sessions[0].on_node_timeout(sessions[1].node()); @@ -948,7 +1077,7 @@ mod tests { let key_pair = Random.generate().unwrap(); acl_storages[1].prohibit(key_pair.public().clone(), SessionId::default()); - sessions[0].initialize(Default::default(), false).unwrap(); + sessions[0].initialize(Default::default(), false, false).unwrap(); do_messages_exchange_until(&clusters, &sessions, |_, _, _| sessions[0].state() == ConsensusSessionState::WaitingForPartialResults).unwrap(); @@ -960,7 +1089,7 @@ mod tests { #[test] fn session_does_not_fail_if_requested_node_disconnects() { let (_, clusters, _, sessions) = prepare_decryption_sessions(); - sessions[0].initialize(Default::default(), false).unwrap(); + sessions[0].initialize(Default::default(), false, false).unwrap(); do_messages_exchange_until(&clusters, &sessions, |_, _, _| sessions[0].state() == ConsensusSessionState::WaitingForPartialResults).unwrap(); @@ -976,7 +1105,7 @@ mod tests { #[test] fn session_does_not_fail_if_node_with_shadow_point_disconnects() { let (_, clusters, _, sessions) = prepare_decryption_sessions(); - sessions[0].initialize(Default::default(), false).unwrap(); + sessions[0].initialize(Default::default(), false, false).unwrap(); do_messages_exchange_until(&clusters, &sessions, |_, _, _| sessions[0].state() == ConsensusSessionState::WaitingForPartialResults && sessions[0].data.lock().consensus_session.computation_job().responses().len() == 2).unwrap(); @@ -993,7 +1122,7 @@ mod tests { #[test] fn session_restarts_if_confirmed_node_disconnects() { let (_, clusters, _, sessions) = prepare_decryption_sessions(); - sessions[0].initialize(Default::default(), false).unwrap(); + sessions[0].initialize(Default::default(), false, false).unwrap(); do_messages_exchange_until(&clusters, &sessions, |_, _, _| sessions[0].state() == ConsensusSessionState::WaitingForPartialResults).unwrap(); @@ -1008,7 +1137,7 @@ mod tests { #[test] fn session_does_not_fail_if_non_master_node_disconnects_from_non_master_node() { let (_, clusters, _, sessions) = prepare_decryption_sessions(); - sessions[0].initialize(Default::default(), false).unwrap(); + sessions[0].initialize(Default::default(), false, false).unwrap(); do_messages_exchange_until(&clusters, &sessions, |_, _, _| sessions[0].state() == ConsensusSessionState::WaitingForPartialResults).unwrap(); @@ -1023,7 +1152,7 @@ mod tests { let (_, clusters, _, sessions) = prepare_decryption_sessions(); // now let's try to do a decryption - sessions[0].initialize(Default::default(), false).unwrap(); + sessions[0].initialize(Default::default(), false, false).unwrap(); do_messages_exchange(&clusters, &sessions).unwrap(); @@ -1045,7 +1174,7 @@ mod tests { let (key_pair, clusters, _, sessions) = prepare_decryption_sessions(); // now let's try to do a decryption - sessions[0].initialize(Default::default(), true).unwrap(); + sessions[0].initialize(Default::default(), true, false).unwrap(); do_messages_exchange(&clusters, &sessions).unwrap(); @@ -1076,7 +1205,7 @@ mod tests { let (key_pair, clusters, acl_storages, sessions) = prepare_decryption_sessions(); // now let's try to do a decryption - sessions[0].initialize(Default::default(), false).unwrap(); + sessions[0].initialize(Default::default(), false, false).unwrap(); // we need 4 out of 5 nodes to agree to do a decryption // let's say that 2 of these nodes are disagree @@ -1099,7 +1228,7 @@ mod tests { acl_storages[0].prohibit(key_pair.public().clone(), SessionId::default()); // now let's try to do a decryption - sessions[0].initialize(Default::default(), false).unwrap(); + sessions[0].initialize(Default::default(), false, false).unwrap(); do_messages_exchange(&clusters, &sessions).unwrap(); @@ -1139,7 +1268,7 @@ mod tests { ); // now let's try to do a decryption - sessions[1].delegate(sessions[0].core.meta.self_node_id.clone(), Default::default(), false).unwrap(); + sessions[1].delegate(sessions[0].core.meta.self_node_id.clone(), Default::default(), false, false).unwrap(); do_messages_exchange(&clusters, &sessions).unwrap(); // now check that: @@ -1165,7 +1294,7 @@ mod tests { } // now let's try to do a decryption - sessions[0].initialize(Default::default(), false).unwrap(); + sessions[0].initialize(Default::default(), false, false).unwrap(); do_messages_exchange(&clusters, &sessions).unwrap(); assert_eq!(sessions[0].decrypted_secret().unwrap().unwrap(), EncryptedDocumentKeyShadow { @@ -1174,4 +1303,17 @@ mod tests { decrypt_shadows: None, }); } + + #[test] + fn decryption_result_restored_on_all_nodes_if_broadcast_session_is_completed() { + let (_, clusters, _, sessions) = prepare_decryption_sessions(); + sessions[0].initialize(Default::default(), false, true).unwrap(); + do_messages_exchange(&clusters, &sessions).unwrap(); + + // decryption result must be the same and available on 4 nodes + let result = sessions[0].decrypted_secret(); + assert!(result.clone().unwrap().is_ok()); + assert_eq!(3, sessions.iter().skip(1).filter(|s| s.decrypted_secret() == result).count()); + assert_eq!(1, sessions.iter().skip(1).filter(|s| s.decrypted_secret().is_none()).count()); + } } diff --git a/secret_store/src/key_server_cluster/client_sessions/signing_session.rs b/secret_store/src/key_server_cluster/client_sessions/signing_session.rs index fe8ecf2fa..4c5676fc1 100644 --- a/secret_store/src/key_server_cluster/client_sessions/signing_session.rs +++ b/secret_store/src/key_server_cluster/client_sessions/signing_session.rs @@ -734,7 +734,7 @@ impl SessionCore { let key_version = key_share.version(version).map_err(|e| Error::KeyStorage(e.into()))?.hash.clone(); let signing_job = SigningJob::new_on_master(self.meta.self_node_id.clone(), key_share.clone(), key_version, session_public, session_secret_share, message_hash)?; - consensus_session.disseminate_jobs(signing_job, self.signing_transport()) + consensus_session.disseminate_jobs(signing_job, self.signing_transport(), false) } } diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index 9312ad58e..d5f4b776b 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -474,9 +474,9 @@ impl ClusterCore { Ok((version, master)) => match session.continue_action() { Some(ContinueAction::Decrypt(session, is_shadow_decryption)) => { let initialization_error = if data.self_key_pair.public() == &master { - session.initialize(version, is_shadow_decryption) + session.initialize(version, is_shadow_decryption, false) } else { - session.delegate(master, version, is_shadow_decryption) + session.delegate(master, version, is_shadow_decryption, false) }; if let Err(error) = initialization_error { @@ -920,7 +920,7 @@ impl ClusterClient for ClusterClientImpl { let session = self.data.sessions.decryption_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id.clone(), None, false, Some(requestor_signature))?; let initialization_result = match version { - Some(version) => session.initialize(version, is_shadow_decryption), + Some(version) => session.initialize(version, is_shadow_decryption, false), None => { self.create_key_version_negotiation_session(session_id.id.clone()) .map(|version_session| { diff --git a/secret_store/src/key_server_cluster/jobs/consensus_session.rs b/secret_store/src/key_server_cluster/jobs/consensus_session.rs index 88af7689e..d57b57e5f 100644 --- a/secret_store/src/key_server_cluster/jobs/consensus_session.rs +++ b/secret_store/src/key_server_cluster/jobs/consensus_session.rs @@ -141,7 +141,7 @@ impl) -> Result<(), Error> { debug_assert!(self.meta.self_node_id == self.meta.master_node_id); - let initialization_result = self.consensus_job.initialize(nodes); + let initialization_result = self.consensus_job.initialize(nodes, false); self.state = ConsensusSessionState::EstablishingConsensus; self.process_result(initialization_result) } @@ -180,12 +180,12 @@ impl Result<(), Error> { + pub fn disseminate_jobs(&mut self, executor: ComputationExecutor, transport: ComputationTransport, broadcast_self_response: bool) -> Result<(), Error> { let consensus_group = self.select_consensus_group()?.clone(); self.consensus_group.clear(); let mut computation_job = JobSession::new(self.meta.clone(), executor, transport); - let computation_result = computation_job.initialize(consensus_group); + let computation_result = computation_job.initialize(consensus_group, broadcast_self_response); self.computation_job = Some(computation_job); self.state = ConsensusSessionState::WaitingForPartialResults; self.process_result(computation_result) @@ -212,6 +212,7 @@ impl, /// Is shadow decryption requested. is_shadow_decryption: Option, + /// Is broadcast decryption requested. + is_broadcast_session: Option, } /// Decryption job partial request. @@ -47,11 +49,14 @@ pub struct PartialDecryptionRequest { pub id: Secret, /// Is shadow decryption requested. pub is_shadow_decryption: bool, + /// Is broadcast decryption requested. + pub is_broadcast_session: bool, /// Id of other nodes, participating in decryption. pub other_nodes_ids: BTreeSet, } /// Decryption job partial response. +#[derive(Clone)] pub struct PartialDecryptionResponse { /// Request id. pub request_id: Secret, @@ -72,10 +77,11 @@ impl DecryptionJob { key_version: key_version, request_id: None, is_shadow_decryption: None, + is_broadcast_session: None, }) } - pub fn new_on_master(self_node_id: NodeId, access_key: Secret, requester: Public, key_share: DocumentKeyShare, key_version: H256, is_shadow_decryption: bool) -> Result { + pub fn new_on_master(self_node_id: NodeId, access_key: Secret, requester: Public, key_share: DocumentKeyShare, key_version: H256, is_shadow_decryption: bool, is_broadcast_session: bool) -> Result { debug_assert!(key_share.common_point.is_some() && key_share.encrypted_point.is_some()); Ok(DecryptionJob { self_node_id: self_node_id, @@ -85,8 +91,17 @@ impl DecryptionJob { key_version: key_version, request_id: Some(math::generate_random_scalar()?), is_shadow_decryption: Some(is_shadow_decryption), + is_broadcast_session: Some(is_broadcast_session), }) } + + pub fn request_id(&self) -> &Option { + &self.request_id + } + + pub fn set_request_id(&mut self, request_id: Secret) { + self.request_id = Some(request_id); + } } impl JobExecutor for DecryptionJob { @@ -101,12 +116,15 @@ impl JobExecutor for DecryptionJob { .expect("prepare_partial_request is only called on master nodes; request_id is filed in constructor on master nodes; qed"); let is_shadow_decryption = self.is_shadow_decryption .expect("prepare_partial_request is only called on master nodes; is_shadow_decryption is filed in constructor on master nodes; qed"); + let is_broadcast_session = self.is_broadcast_session + .expect("prepare_partial_request is only called on master nodes; is_broadcast_session is filed in constructor on master nodes; qed"); let mut other_nodes_ids = nodes.clone(); other_nodes_ids.remove(node); Ok(PartialDecryptionRequest { id: request_id.clone(), is_shadow_decryption: is_shadow_decryption, + is_broadcast_session: is_broadcast_session, other_nodes_ids: other_nodes_ids, }) } diff --git a/secret_store/src/key_server_cluster/jobs/job_session.rs b/secret_store/src/key_server_cluster/jobs/job_session.rs index bfd520785..0299fdb14 100644 --- a/secret_store/src/key_server_cluster/jobs/job_session.rs +++ b/secret_store/src/key_server_cluster/jobs/job_session.rs @@ -40,7 +40,7 @@ pub enum JobPartialRequestAction { /// Job executor. pub trait JobExecutor { type PartialJobRequest; - type PartialJobResponse; + type PartialJobResponse: Clone; type JobResponse; /// Prepare job request for given node. @@ -175,6 +175,14 @@ impl JobSession where Executor: JobExe .responses } + /// Returns true if enough responses are ready to compute result. + pub fn is_result_ready(&self) -> bool { + debug_assert!(self.meta.self_node_id == self.meta.master_node_id); + self.data.active_data.as_ref() + .expect("is_result_ready is only called on master nodes after initialization; on master nodes active_data is filled during initialization; qed") + .responses.len() >= self.meta.threshold + 1 + } + /// Get job result. pub fn result(&self) -> Result { debug_assert!(self.meta.self_node_id == self.meta.master_node_id); @@ -189,7 +197,7 @@ impl JobSession where Executor: JobExe } /// Initialize. - pub fn initialize(&mut self, nodes: BTreeSet) -> Result<(), Error> { + pub fn initialize(&mut self, nodes: BTreeSet, broadcast_self_response: bool) -> Result<(), Error> { debug_assert!(self.meta.self_node_id == self.meta.master_node_id); if nodes.len() < self.meta.threshold + 1 { @@ -213,25 +221,32 @@ impl JobSession where Executor: JobExe } else { None }; + let self_response = match self_response { + Some(JobPartialRequestAction::Respond(self_response)) => Some(self_response), + Some(JobPartialRequestAction::Reject(self_response)) => Some(self_response), + None => None, + }; // update state self.data.active_data = Some(active_data); self.data.state = JobSessionState::Active; // if we are waiting for response from self => do it - if let Some(self_response) = self_response { + if let Some(self_response) = self_response.clone() { let self_node_id = self.meta.self_node_id.clone(); - match self_response { - JobPartialRequestAction::Respond(self_response) => self.on_partial_response(&self_node_id, self_response)?, - JobPartialRequestAction::Reject(self_response) => self.on_partial_response(&self_node_id, self_response)?, - } + self.on_partial_response(&self_node_id, self_response)?; } // send requests to save nodes. we only send requests if session is still active. - if self.data.state == JobSessionState::Active { - for node in nodes.iter().filter(|n| **n != self.meta.self_node_id) { + for node in nodes.iter().filter(|n| **n != self.meta.self_node_id) { + if self.data.state == JobSessionState::Active { self.transport.send_partial_request(node, self.executor.prepare_partial_request(node, &nodes)?)?; } + if broadcast_self_response { + if let Some(self_response) = self_response.clone() { + self.transport.send_partial_response(node, self_response)?; + } + } } Ok(()) @@ -372,6 +387,10 @@ pub mod tests { } impl DummyJobTransport { + pub fn is_empty_response(&self) -> bool { + self.responses.lock().is_empty() + } + pub fn response(&self) -> (NodeId, U) { self.responses.lock().pop_front().unwrap() } @@ -396,22 +415,23 @@ pub mod tests { #[test] fn job_initialize_fails_if_not_inactive() { let mut job = JobSession::new(make_master_session_meta(0), SquaredSumJobExecutor, DummyJobTransport::default()); - job.initialize(vec![Public::from(1)].into_iter().collect()).unwrap(); - assert_eq!(job.initialize(vec![Public::from(1)].into_iter().collect()).unwrap_err(), Error::InvalidStateForRequest); + job.initialize(vec![Public::from(1)].into_iter().collect(), false).unwrap(); + assert_eq!(job.initialize(vec![Public::from(1)].into_iter().collect(), false).unwrap_err(), Error::InvalidStateForRequest); } #[test] fn job_initialization_leads_to_finish_if_single_node_is_required() { let mut job = JobSession::new(make_master_session_meta(0), SquaredSumJobExecutor, DummyJobTransport::default()); - job.initialize(vec![Public::from(1)].into_iter().collect()).unwrap(); + job.initialize(vec![Public::from(1)].into_iter().collect(), false).unwrap(); assert_eq!(job.state(), JobSessionState::Finished); + assert!(job.is_result_ready()); assert_eq!(job.result(), Ok(4)); } #[test] fn job_initialization_does_not_leads_to_finish_if_single_other_node_is_required() { let mut job = JobSession::new(make_master_session_meta(0), SquaredSumJobExecutor, DummyJobTransport::default()); - job.initialize(vec![Public::from(2)].into_iter().collect()).unwrap(); + job.initialize(vec![Public::from(2)].into_iter().collect(), false).unwrap(); assert_eq!(job.state(), JobSessionState::Active); } @@ -454,7 +474,7 @@ pub mod tests { #[test] fn job_response_fails_if_comes_to_failed_state() { let mut job = JobSession::new(make_master_session_meta(0), SquaredSumJobExecutor, DummyJobTransport::default()); - job.initialize(vec![Public::from(2)].into_iter().collect()).unwrap(); + job.initialize(vec![Public::from(2)].into_iter().collect(), false).unwrap(); job.on_session_timeout().unwrap_err(); assert_eq!(job.on_partial_response(&NodeId::from(2), 2).unwrap_err(), Error::InvalidStateForRequest); } @@ -462,14 +482,14 @@ pub mod tests { #[test] fn job_response_fails_if_comes_from_unknown_node() { let mut job = JobSession::new(make_master_session_meta(0), SquaredSumJobExecutor, DummyJobTransport::default()); - job.initialize(vec![Public::from(2)].into_iter().collect()).unwrap(); + job.initialize(vec![Public::from(2)].into_iter().collect(), false).unwrap(); assert_eq!(job.on_partial_response(&NodeId::from(3), 2).unwrap_err(), Error::InvalidNodeForRequest); } #[test] fn job_response_leads_to_failure_if_too_few_nodes_left() { let mut job = JobSession::new(make_master_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default()); - job.initialize(vec![Public::from(1), Public::from(2)].into_iter().collect()).unwrap(); + job.initialize(vec![Public::from(1), Public::from(2)].into_iter().collect(), false).unwrap(); assert_eq!(job.state(), JobSessionState::Active); assert_eq!(job.on_partial_response(&NodeId::from(2), 3).unwrap_err(), Error::ConsensusUnreachable); assert_eq!(job.state(), JobSessionState::Failed); @@ -478,16 +498,18 @@ pub mod tests { #[test] fn job_response_succeeds() { let mut job = JobSession::new(make_master_session_meta(2), SquaredSumJobExecutor, DummyJobTransport::default()); - job.initialize(vec![Public::from(1), Public::from(2), Public::from(3)].into_iter().collect()).unwrap(); + job.initialize(vec![Public::from(1), Public::from(2), Public::from(3)].into_iter().collect(), false).unwrap(); assert_eq!(job.state(), JobSessionState::Active); + assert!(!job.is_result_ready()); job.on_partial_response(&NodeId::from(2), 2).unwrap(); assert_eq!(job.state(), JobSessionState::Active); + assert!(!job.is_result_ready()); } #[test] fn job_response_leads_to_finish() { let mut job = JobSession::new(make_master_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default()); - job.initialize(vec![Public::from(1), Public::from(2)].into_iter().collect()).unwrap(); + job.initialize(vec![Public::from(1), Public::from(2)].into_iter().collect(), false).unwrap(); assert_eq!(job.state(), JobSessionState::Active); job.on_partial_response(&NodeId::from(2), 2).unwrap(); assert_eq!(job.state(), JobSessionState::Finished); @@ -512,7 +534,7 @@ pub mod tests { #[test] fn job_node_error_ignored_when_disconnects_from_rejected() { let mut job = JobSession::new(make_master_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default()); - job.initialize(vec![Public::from(1), Public::from(2), Public::from(3)].into_iter().collect()).unwrap(); + job.initialize(vec![Public::from(1), Public::from(2), Public::from(3)].into_iter().collect(), false).unwrap(); assert_eq!(job.state(), JobSessionState::Active); job.on_partial_response(&NodeId::from(2), 3).unwrap(); job.on_node_error(&NodeId::from(2)).unwrap(); @@ -522,7 +544,7 @@ pub mod tests { #[test] fn job_node_error_ignored_when_disconnects_from_unknown() { let mut job = JobSession::new(make_master_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default()); - job.initialize(vec![Public::from(1), Public::from(2)].into_iter().collect()).unwrap(); + job.initialize(vec![Public::from(1), Public::from(2)].into_iter().collect(), false).unwrap(); assert_eq!(job.state(), JobSessionState::Active); job.on_node_error(&NodeId::from(3)).unwrap(); assert_eq!(job.state(), JobSessionState::Active); @@ -531,7 +553,7 @@ pub mod tests { #[test] fn job_node_error_ignored_when_disconnects_from_requested_and_enough_nodes_left() { let mut job = JobSession::new(make_master_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default()); - job.initialize(vec![Public::from(1), Public::from(2), Public::from(3)].into_iter().collect()).unwrap(); + job.initialize(vec![Public::from(1), Public::from(2), Public::from(3)].into_iter().collect(), false).unwrap(); assert_eq!(job.state(), JobSessionState::Active); job.on_node_error(&NodeId::from(3)).unwrap(); assert_eq!(job.state(), JobSessionState::Active); @@ -540,9 +562,25 @@ pub mod tests { #[test] fn job_node_error_leads_to_fail_when_disconnects_from_requested_and_not_enough_nodes_left() { let mut job = JobSession::new(make_master_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default()); - job.initialize(vec![Public::from(1), Public::from(2)].into_iter().collect()).unwrap(); + job.initialize(vec![Public::from(1), Public::from(2)].into_iter().collect(), false).unwrap(); assert_eq!(job.state(), JobSessionState::Active); assert_eq!(job.on_node_error(&NodeId::from(2)).unwrap_err(), Error::ConsensusUnreachable); assert_eq!(job.state(), JobSessionState::Failed); } + + #[test] + fn job_broadcasts_self_response() { + let mut job = JobSession::new(make_master_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default()); + job.initialize(vec![Public::from(1), Public::from(2)].into_iter().collect(), true).unwrap(); + assert_eq!(job.state(), JobSessionState::Active); + assert_eq!(job.transport().response(), (NodeId::from(2), 4)); + } + + #[test] + fn job_does_not_broadcasts_self_response() { + let mut job = JobSession::new(make_master_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default()); + job.initialize(vec![Public::from(1), Public::from(2)].into_iter().collect(), false).unwrap(); + assert_eq!(job.state(), JobSessionState::Active); + assert!(job.transport().is_empty_response()); + } } diff --git a/secret_store/src/key_server_cluster/jobs/signing_job.rs b/secret_store/src/key_server_cluster/jobs/signing_job.rs index cf9bd533d..fc6903196 100644 --- a/secret_store/src/key_server_cluster/jobs/signing_job.rs +++ b/secret_store/src/key_server_cluster/jobs/signing_job.rs @@ -50,6 +50,7 @@ pub struct PartialSigningRequest { } /// Signing job partial response. +#[derive(Clone)] pub struct PartialSigningResponse { /// Request id. pub request_id: Secret, diff --git a/secret_store/src/key_server_cluster/message.rs b/secret_store/src/key_server_cluster/message.rs index 073a6460e..de435ab95 100644 --- a/secret_store/src/key_server_cluster/message.rs +++ b/secret_store/src/key_server_cluster/message.rs @@ -548,6 +548,9 @@ pub struct RequestPartialDecryption { /// Is shadow decryption requested? When true, decryption result /// will be visible to the owner of requestor public key only. pub is_shadow_decryption: bool, + /// Decryption result must be reconstructed on all participating nodes. This is useful + /// for service contract API so that all nodes from consensus group can confirm decryption. + pub is_broadcast_session: bool, /// Nodes that are agreed to do a decryption. pub nodes: BTreeSet, } @@ -609,6 +612,9 @@ pub struct DecryptionSessionDelegation { /// Is shadow decryption requested? When true, decryption result /// will be visible to the owner of requestor public key only. pub is_shadow_decryption: bool, + /// Decryption result must be reconstructed on all participating nodes. This is useful + /// for service contract API so that all nodes from consensus group can confirm decryption. + pub is_broadcast_session: bool, } /// When delegated decryption session is completed. diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs index 5686c677c..0d04a7dae 100644 --- a/secret_store/src/listener/service_contract_listener.rs +++ b/secret_store/src/listener/service_contract_listener.rs @@ -583,7 +583,7 @@ mod tests { } #[test] - fn server_key_generation_is_scheduled_when_requested_key_is_unknnown() { + fn server_key_generation_is_scheduled_when_requested_key_is_unknown() { let mut contract = DummyServiceContract::default(); contract.logs.push(vec![Default::default(), Default::default(), Default::default()]); let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None); @@ -605,7 +605,7 @@ mod tests { } #[test] - fn server_key_restore_is_scheduled_when_requested_key_is_knnown() { + fn server_key_restore_is_scheduled_when_requested_key_is_known() { let mut contract = DummyServiceContract::default(); contract.logs.push(vec![Default::default(), Default::default(), Default::default()]); let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None);