SecretStore: merge two types of errors into single one + Error::is_non_fatal (#8357)

* SecretStore: error unify initial commit

SecretStore: pass real error in error messages

SecretStore: is_internal_error -> Error::is_non_fatal

warnings

SecretStore: ConsensusTemporaryUnreachable

fix after merge

removed comments

removed comments

SecretStore: updated HTTP error responses

SecretStore: more ConsensusTemporaryUnreachable tests

fix after rebase

* fixed grumbles

* use HashSet in tests
This commit is contained in:
Svyatoslav Nikolsky 2018-05-01 16:02:14 +03:00 committed by Afri Schoedon
parent 849f5d9a44
commit d1f5284fe6
34 changed files with 577 additions and 440 deletions

View File

@ -22,7 +22,7 @@ use ethcore::client::{BlockId, ChainNotify, CallContract, RegistryInfo};
use ethereum_types::{H256, Address};
use bytes::Bytes;
use trusted_client::TrustedClient;
use types::all::{Error, ServerKeyId};
use types::{Error, ServerKeyId};
use_contract!(acl_storage, "AclStorage", "res/acl_storage.json");
@ -113,7 +113,7 @@ impl CachedContract {
self.contract.functions()
.check_permissions()
.call(requester, document.clone(), &do_call)
.map_err(|e| Error::Internal(e.to_string()))
.map_err(|e| Error::Internal(format!("ACL checker call error: {}", e.to_string())))
},
None => Err(Error::Internal("ACL checker contract is not configured".to_owned())),
}

View File

@ -27,7 +27,7 @@ use super::key_storage::KeyStorage;
use super::key_server_set::KeyServerSet;
use key_server_cluster::{math, ClusterCore};
use traits::{AdminSessionsServer, ServerKeyGenerator, DocumentKeyServer, MessageSigner, KeyServer, NodeKeyPair};
use types::all::{Error, Public, RequestSignature, Requester, ServerKeyId, EncryptedDocumentKey, EncryptedDocumentKeyShadow,
use types::{Error, Public, RequestSignature, Requester, ServerKeyId, EncryptedDocumentKey, EncryptedDocumentKeyShadow,
ClusterConfiguration, MessageHash, EncryptedMessageSignature, NodeId};
use key_server_cluster::{ClusterClient, ClusterConfiguration as NetClusterConfiguration};
@ -238,7 +238,7 @@ pub mod tests {
use key_server_set::tests::MapKeyServerSet;
use key_server_cluster::math;
use ethereum_types::{H256, H520};
use types::all::{Error, Public, ClusterConfiguration, NodeAddress, RequestSignature, ServerKeyId,
use types::{Error, Public, ClusterConfiguration, NodeAddress, RequestSignature, ServerKeyId,
EncryptedDocumentKey, EncryptedDocumentKeyShadow, MessageHash, EncryptedMessageSignature,
Requester, NodeId};
use traits::{AdminSessionsServer, ServerKeyGenerator, DocumentKeyServer, MessageSigner, KeyServer};

View File

@ -143,6 +143,10 @@ pub struct FastestResultComputer {
self_node_id: NodeId,
/// Threshold (if known).
threshold: Option<usize>,
/// Count of all configured key server nodes.
configured_nodes_count: usize,
/// Count of all connected key server nodes.
connected_nodes_count: usize,
}
/// Selects version with most support, waiting for responses from all nodes.
@ -185,7 +189,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
/// Return result computer reference.
pub fn version_holders(&self, version: &H256) -> Result<BTreeSet<NodeId>, Error> {
Ok(self.data.lock().versions.as_ref().ok_or(Error::InvalidStateForRequest)?
.get(version).ok_or(Error::KeyStorage("key version not found".into()))?
.get(version).ok_or(Error::ServerKeyIsNotFound)?
.clone())
}
@ -236,7 +240,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
// try to complete session
Self::try_complete(&self.core, &mut *data);
if no_confirmations_required && data.state != SessionState::Finished {
return Err(Error::MissingKeyShare);
return Err(Error::ServerKeyIsNotFound);
} else if data.state == SessionState::Finished {
return Ok(());
}
@ -266,7 +270,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
&KeyVersionNegotiationMessage::KeyVersions(ref message) =>
self.on_key_versions(sender, message),
&KeyVersionNegotiationMessage::KeyVersionsError(ref message) => {
self.on_session_error(sender, Error::Io(message.error.clone()));
self.on_session_error(sender, message.error.clone());
Ok(())
},
}
@ -388,7 +392,7 @@ impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
if data.state != SessionState::Finished {
warn!("{}: key version negotiation session failed with timeout", self.core.meta.self_node_id);
data.result = Some(Err(Error::ConsensusUnreachable));
data.result = Some(Err(Error::ConsensusTemporaryUnreachable));
self.core.completed.notify_all();
}
}
@ -431,11 +435,13 @@ impl SessionTransport for IsolatedSessionTransport {
}
impl FastestResultComputer {
pub fn new(self_node_id: NodeId, key_share: Option<&DocumentKeyShare>) -> Self {
pub fn new(self_node_id: NodeId, key_share: Option<&DocumentKeyShare>, configured_nodes_count: usize, connected_nodes_count: usize) -> Self {
let threshold = key_share.map(|ks| ks.threshold);
FastestResultComputer {
self_node_id: self_node_id,
threshold: threshold,
self_node_id,
threshold,
configured_nodes_count,
connected_nodes_count,
}
}}
@ -443,7 +449,7 @@ impl SessionResultComputer for FastestResultComputer {
fn compute_result(&self, threshold: Option<usize>, confirmations: &BTreeSet<NodeId>, versions: &BTreeMap<H256, BTreeSet<NodeId>>) -> Option<Result<(H256, NodeId), Error>> {
match self.threshold.or(threshold) {
// if there's no versions at all && we're not waiting for confirmations anymore
_ if confirmations.is_empty() && versions.is_empty() => Some(Err(Error::MissingKeyShare)),
_ if confirmations.is_empty() && versions.is_empty() => Some(Err(Error::ServerKeyIsNotFound)),
// if we have key share on this node
Some(threshold) => {
// select version this node have, with enough participants
@ -459,7 +465,17 @@ impl SessionResultComputer for FastestResultComputer {
.find(|&(_, ref n)| n.len() >= threshold + 1)
.map(|(version, nodes)| Ok((version.clone(), nodes.iter().cloned().nth(0)
.expect("version is only inserted when there's at least one owner; qed"))))
.unwrap_or(Err(Error::ConsensusUnreachable))),
// if there's no version consensus among all connected nodes
// AND we're connected to ALL configured nodes
// OR there are less than required nodes for key restore
// => this means that we can't restore key with CURRENT configuration => respond with fatal error
// otherwise we could try later, after all nodes are connected
.unwrap_or_else(|| Err(if self.configured_nodes_count == self.connected_nodes_count
|| self.configured_nodes_count < threshold + 1 {
Error::ConsensusUnreachable
} else {
Error::ConsensusTemporaryUnreachable
}))),
}
},
// if we do not have share, then wait for all confirmations
@ -469,7 +485,11 @@ impl SessionResultComputer for FastestResultComputer {
.max_by_key(|&(_, ref n)| n.len())
.map(|(version, nodes)| Ok((version.clone(), nodes.iter().cloned().nth(0)
.expect("version is only inserted when there's at least one owner; qed"))))
.unwrap_or(Err(Error::ConsensusUnreachable))),
.unwrap_or_else(|| Err(if self.configured_nodes_count == self.connected_nodes_count {
Error::ConsensusUnreachable
} else {
Error::ConsensusTemporaryUnreachable
}))),
}
}
}
@ -480,7 +500,7 @@ impl SessionResultComputer for LargestSupportResultComputer {
return None;
}
if versions.is_empty() {
return Some(Err(Error::MissingKeyShare));
return Some(Err(Error::ServerKeyIsNotFound));
}
versions.iter()
@ -552,12 +572,15 @@ mod tests {
id: Default::default(),
self_node_id: node_id.clone(),
master_node_id: master_node_id.clone(),
configured_nodes_count: nodes.len(),
connected_nodes_count: nodes.len(),
},
sub_session: sub_sesion.clone(),
key_share: key_storage.get(&Default::default()).unwrap(),
result_computer: Arc::new(FastestResultComputer::new(
node_id.clone(),
key_storage.get(&Default::default()).unwrap().as_ref(),
nodes.len(), nodes.len()
)),
transport: DummyTransport {
cluster: cluster,
@ -723,13 +746,15 @@ mod tests {
let computer = FastestResultComputer {
self_node_id: Default::default(),
threshold: None,
configured_nodes_count: 1,
connected_nodes_count: 1,
};
assert_eq!(computer.compute_result(Some(10), &Default::default(), &Default::default()), Some(Err(Error::MissingKeyShare)));
assert_eq!(computer.compute_result(Some(10), &Default::default(), &Default::default()), Some(Err(Error::ServerKeyIsNotFound)));
}
#[test]
fn largest_computer_returns_missing_share_if_no_versions_returned() {
let computer = LargestSupportResultComputer;
assert_eq!(computer.compute_result(Some(10), &Default::default(), &Default::default()), Some(Err(Error::MissingKeyShare)));
assert_eq!(computer.compute_result(Some(10), &Default::default(), &Default::default()), Some(Err(Error::ServerKeyIsNotFound)));
}
}

View File

@ -32,6 +32,10 @@ pub struct ShareChangeSessionMeta {
pub master_node_id: NodeId,
/// Id of node, on which this session is running.
pub self_node_id: NodeId,
/// Count of all configured key server nodes.
pub configured_nodes_count: usize,
/// Count of all connected key server nodes.
pub connected_nodes_count: usize,
}
impl ShareChangeSessionMeta {
@ -42,6 +46,8 @@ impl ShareChangeSessionMeta {
master_node_id: self.master_node_id,
self_node_id: self.self_node_id,
threshold: all_nodes_set_len.checked_sub(1).ok_or(Error::ConsensusUnreachable)?,
configured_nodes_count: self.configured_nodes_count,
connected_nodes_count: self.connected_nodes_count,
})
}
}

View File

@ -282,7 +282,7 @@ impl SessionImpl {
&ServersSetChangeMessage::ServersSetChangeShareAddMessage(ref message) =>
self.on_share_add_message(sender, message),
&ServersSetChangeMessage::ServersSetChangeError(ref message) => {
self.on_session_error(sender, Error::Io(message.error.clone()));
self.on_session_error(sender, message.error.clone());
Ok(())
},
&ServersSetChangeMessage::ServersSetChangeCompleted(ref message) =>
@ -416,12 +416,14 @@ impl SessionImpl {
match &message.message {
&KeyVersionNegotiationMessage::RequestKeyVersions(ref message) if sender == &self.core.meta.master_node_id => {
let key_id = message.session.clone().into();
let key_share = self.core.key_storage.get(&key_id).map_err(|e| Error::KeyStorage(e.into()))?;
let key_share = self.core.key_storage.get(&key_id)?;
let negotiation_session = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
meta: ShareChangeSessionMeta {
id: key_id.clone(),
self_node_id: self.core.meta.self_node_id.clone(),
master_node_id: sender.clone(),
configured_nodes_count: self.core.meta.configured_nodes_count,
connected_nodes_count: self.core.meta.connected_nodes_count,
},
sub_session: message.sub_session.clone().into(),
key_share: key_share,
@ -492,7 +494,7 @@ impl SessionImpl {
// on nodes, holding selected key share version, we could check if master node plan is correct
let master_node_id = message.master_node_id.clone().into();
if let Some(key_share) = self.core.key_storage.get(&key_id).map_err(|e| Error::KeyStorage(e.into()))? {
if let Some(key_share) = self.core.key_storage.get(&key_id)? {
let version = message.version.clone().into();
if let Ok(key_version) = key_share.version(&version) {
let key_share_owners = key_version.id_numbers.keys().cloned().collect();
@ -660,7 +662,7 @@ impl SessionImpl {
if !data.new_nodes_set.as_ref()
.expect("new_nodes_set is filled during initialization; session is completed after initialization; qed")
.contains(&self.core.meta.self_node_id) {
self.core.key_storage.clear().map_err(|e| Error::KeyStorage(e.into()))?;
self.core.key_storage.clear()?;
}
data.state = SessionState::Finished;
@ -709,6 +711,8 @@ impl SessionImpl {
id: key_id,
self_node_id: core.meta.self_node_id.clone(),
master_node_id: master_node_id,
configured_nodes_count: core.meta.configured_nodes_count,
connected_nodes_count: core.meta.connected_nodes_count,
},
cluster: core.cluster.clone(),
key_storage: core.key_storage.clone(),
@ -731,12 +735,14 @@ impl SessionImpl {
Some(Ok(key_id)) => key_id,
};
let key_share = core.key_storage.get(&key_id).map_err(|e| Error::KeyStorage(e.into()))?;
let key_share = core.key_storage.get(&key_id)?;
let negotiation_session = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
meta: ShareChangeSessionMeta {
id: key_id,
self_node_id: core.meta.self_node_id.clone(),
master_node_id: core.meta.self_node_id.clone(),
configured_nodes_count: core.meta.configured_nodes_count,
connected_nodes_count: core.meta.connected_nodes_count,
},
sub_session: math::generate_random_scalar()?,
key_share: key_share,
@ -888,7 +894,7 @@ impl SessionImpl {
if !data.new_nodes_set.as_ref()
.expect("new_nodes_set is filled during initialization; session is completed after initialization; qed")
.contains(&core.meta.self_node_id) {
core.key_storage.clear().map_err(|e| Error::KeyStorage(e.into()))?;
core.key_storage.clear()?;
}
data.state = SessionState::Finished;
@ -1011,9 +1017,9 @@ impl KeyVersionNegotiationTransport for ServersSetChangeKeyVersionNegotiationTra
}
fn check_nodes_set(all_nodes_set: &BTreeSet<NodeId>, new_nodes_set: &BTreeSet<NodeId>) -> Result<(), Error> {
// all new nodes must be a part of all nodes set
// all_nodes_set is the set of nodes we're currently connected to (and configured for)
match new_nodes_set.iter().any(|n| !all_nodes_set.contains(n)) {
true => Err(Error::InvalidNodesConfiguration),
true => Err(Error::NodeDisconnected),
false => Ok(())
}
}
@ -1104,6 +1110,8 @@ pub mod tests {
self_node_id: master_node_id.clone(),
master_node_id: master_node_id.clone(),
id: SessionId::default(),
configured_nodes_count: all_nodes_set.len(),
connected_nodes_count: all_nodes_set.len(),
};
let old_nodes = gml.nodes.iter().map(|n| create_node(meta.clone(), admin_public.clone(), all_nodes_set.clone(), n.1));

View File

@ -155,9 +155,7 @@ pub struct IsolatedSessionTransport {
impl<T> SessionImpl<T> where T: SessionTransport {
/// Create new share addition session.
pub fn new(params: SessionParams<T>) -> Result<Self, Error> {
let key_share = params.key_storage
.get(&params.meta.id)
.map_err(|e| Error::KeyStorage(e.into()))?;
let key_share = params.key_storage.get(&params.meta.id)?;
Ok(SessionImpl {
core: SessionCore {
@ -257,8 +255,8 @@ impl<T> SessionImpl<T> where T: SessionTransport {
let admin_public = self.core.admin_public.as_ref().cloned().ok_or(Error::ConsensusUnreachable)?;
// key share version is required on ShareAdd master node
let key_share = self.core.key_share.as_ref().ok_or_else(|| Error::KeyStorage("key share is not found on master node".into()))?;
let key_version = key_share.version(&version).map_err(|e| Error::KeyStorage(e.into()))?;
let key_share = self.core.key_share.as_ref().ok_or_else(|| Error::ServerKeyIsNotFound)?;
let key_version = key_share.version(&version)?;
// old nodes set is all non-isolated owners of version holders
let non_isolated_nodes = self.core.transport.nodes();
@ -326,7 +324,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
&ShareAddMessage::NewKeysDissemination(ref message) =>
self.on_new_keys_dissemination(sender, message),
&ShareAddMessage::ShareAddError(ref message) => {
self.on_session_error(sender, Error::Io(message.error.clone().into()));
self.on_session_error(sender, message.error.clone());
Ok(())
},
}
@ -714,10 +712,10 @@ impl<T> SessionImpl<T> where T: SessionTransport {
// save encrypted data to the key storage
data.state = SessionState::Finished;
if core.key_share.is_some() {
core.key_storage.update(core.meta.id.clone(), refreshed_key_share.clone())
core.key_storage.update(core.meta.id.clone(), refreshed_key_share.clone())?;
} else {
core.key_storage.insert(core.meta.id.clone(), refreshed_key_share.clone())
}.map_err(|e| Error::KeyStorage(e.into()))?;
core.key_storage.insert(core.meta.id.clone(), refreshed_key_share.clone())?;
}
// signal session completion
data.state = SessionState::Finished;
@ -851,7 +849,7 @@ impl SessionTransport for IsolatedSessionTransport {
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
use std::collections::{VecDeque, BTreeMap, BTreeSet};
use std::collections::{VecDeque, BTreeMap, BTreeSet, HashSet};
use ethkey::{Random, Generator, Public, KeyPair, Signature, sign};
use ethereum_types::H256;
use key_server_cluster::{NodeId, SessionId, Error, KeyStorage, DummyKeyStorage};
@ -952,6 +950,8 @@ pub mod tests {
id: SessionId::default(),
self_node_id: NodeId::default(),
master_node_id: master_node_id,
configured_nodes_count: new_nodes_set.iter().chain(old_nodes_set.iter()).collect::<HashSet<_>>().len(),
connected_nodes_count: new_nodes_set.iter().chain(old_nodes_set.iter()).collect::<HashSet<_>>().len(),
};
let new_nodes = new_nodes_set.iter()
.filter(|n| !old_nodes_set.contains(&n))
@ -992,6 +992,8 @@ pub mod tests {
id: SessionId::default(),
self_node_id: NodeId::default(),
master_node_id: master_node_id,
configured_nodes_count: new_nodes_set.iter().chain(ml.nodes.keys()).collect::<BTreeSet<_>>().len(),
connected_nodes_count: new_nodes_set.iter().chain(ml.nodes.keys()).collect::<BTreeSet<_>>().len(),
};
let old_nodes_set = ml.nodes.keys().cloned().collect();
let nodes = ml.nodes.iter()
@ -1102,7 +1104,7 @@ pub mod tests {
assert_eq!(ml.nodes[&master_node_id].session.initialize(Some(ml.version), Some(new_nodes_set),
Some(ml.old_set_signature.clone()),
Some(ml.new_set_signature.clone())
).unwrap_err(), Error::KeyStorage("key share is not found on master node".into()));
).unwrap_err(), Error::ServerKeyIsNotFound);
}
#[test]

View File

@ -154,7 +154,7 @@ impl SessionImpl {
if let Some(key_share) = params.key_share.as_ref() {
// encrypted data must be set
if key_share.common_point.is_none() || key_share.encrypted_point.is_none() {
return Err(Error::NotStartedSessionId);
return Err(Error::DocumentKeyIsNotFound);
}
}
@ -290,7 +290,7 @@ impl SessionImpl {
// check if version exists
let key_version = match self.core.key_share.as_ref() {
None => return Err(Error::InvalidMessage),
Some(key_share) => key_share.version(&version).map_err(|e| Error::KeyStorage(e.into()))?,
Some(key_share) => key_share.version(&version)?,
};
let mut data = self.data.lock();
@ -337,7 +337,7 @@ impl SessionImpl {
&DecryptionMessage::PartialDecryption(ref message) =>
self.on_partial_decryption(sender, message),
&DecryptionMessage::DecryptionSessionError(ref message) =>
self.process_node_error(Some(&sender), Error::Io(message.error.clone())),
self.process_node_error(Some(&sender), message.error.clone()),
&DecryptionMessage::DecryptionSessionCompleted(ref message) =>
self.on_session_completed(sender, message),
&DecryptionMessage::DecryptionSessionDelegation(ref message) =>
@ -432,8 +432,7 @@ impl SessionImpl {
};
let mut data = self.data.lock();
let key_version = key_share.version(data.version.as_ref().ok_or(Error::InvalidMessage)?)
.map_err(|e| Error::KeyStorage(e.into()))?.hash.clone();
let key_version = key_share.version(data.version.as_ref().ok_or(Error::InvalidMessage)?)?.hash.clone();
let requester_public = data.consensus_session.consensus_job().executor().requester()
.ok_or(Error::InvalidStateForRequest)?
.public(&self.core.meta.id)
@ -564,7 +563,7 @@ impl SessionImpl {
match {
match node {
Some(node) => data.consensus_session.on_node_error(node),
Some(node) => data.consensus_session.on_node_error(node, error.clone()),
None => data.consensus_session.on_session_timeout(),
}
} {
@ -601,7 +600,7 @@ impl SessionImpl {
Some(key_share) => key_share,
};
let key_version = key_share.version(version).map_err(|e| Error::KeyStorage(e.into()))?.hash.clone();
let key_version = key_share.version(version)?.hash.clone();
let requester = data.consensus_session.consensus_job().executor().requester().ok_or(Error::InvalidStateForRequest)?.clone();
let requester_public = requester.public(&core.meta.id).map_err(Error::InsufficientRequesterData)?;
let consensus_group = data.consensus_session.select_consensus_group()?.clone();
@ -637,6 +636,8 @@ impl SessionImpl {
master_node_id: core.meta.self_node_id.clone(),
self_node_id: core.meta.self_node_id.clone(),
threshold: core.meta.threshold,
configured_nodes_count: core.meta.configured_nodes_count,
connected_nodes_count: core.meta.connected_nodes_count,
}, job, transport);
job_session.initialize(consensus_group, self_response, core.meta.self_node_id != core.meta.master_node_id)?;
data.broadcast_job_session = Some(job_session);
@ -881,6 +882,8 @@ mod tests {
self_node_id: id_numbers.iter().nth(i).clone().unwrap().0,
master_node_id: id_numbers.iter().nth(0).clone().unwrap().0,
threshold: encrypted_datas[i].threshold,
configured_nodes_count: 5,
connected_nodes_count: 5,
},
access_key: access_key.clone(),
key_share: Some(encrypted_datas[i].clone()),
@ -944,6 +947,8 @@ mod tests {
self_node_id: self_node_id.clone(),
master_node_id: self_node_id.clone(),
threshold: 0,
configured_nodes_count: 1,
connected_nodes_count: 1,
},
access_key: Random.generate().unwrap().secret().clone(),
key_share: Some(DocumentKeyShare {
@ -976,6 +981,8 @@ mod tests {
self_node_id: self_node_id.clone(),
master_node_id: self_node_id.clone(),
threshold: 0,
configured_nodes_count: 1,
connected_nodes_count: 1,
},
access_key: Random.generate().unwrap().secret().clone(),
key_share: None,
@ -998,6 +1005,8 @@ mod tests {
self_node_id: self_node_id.clone(),
master_node_id: self_node_id.clone(),
threshold: 2,
configured_nodes_count: 1,
connected_nodes_count: 1,
},
access_key: Random.generate().unwrap().secret().clone(),
key_share: Some(DocumentKeyShare {
@ -1131,7 +1140,7 @@ mod tests {
let (_, _, _, sessions) = prepare_decryption_sessions();
assert!(sessions[0].decrypted_secret().is_none());
sessions[0].on_session_timeout();
assert_eq!(sessions[0].decrypted_secret().unwrap().unwrap_err(), Error::ConsensusUnreachable);
assert_eq!(sessions[0].decrypted_secret().unwrap().unwrap_err(), Error::ConsensusTemporaryUnreachable);
}
#[test]
@ -1141,7 +1150,7 @@ mod tests {
// 1 node disconnects => we still can recover secret
sessions[0].on_node_timeout(sessions[1].node());
assert!(sessions[0].data.lock().consensus_session.consensus_job().rejects().contains(sessions[1].node()));
assert!(sessions[0].data.lock().consensus_session.consensus_job().rejects().contains_key(sessions[1].node()));
assert!(sessions[0].state() == ConsensusSessionState::EstablishingConsensus);
// 2 node are disconnected => we can not recover secret
@ -1208,7 +1217,7 @@ mod tests {
let disconnected = sessions[0].data.lock().consensus_session.computation_job().requests().iter().cloned().nth(0).unwrap();
sessions[0].on_node_timeout(&disconnected);
assert_eq!(sessions[0].state(), ConsensusSessionState::EstablishingConsensus);
assert!(sessions[0].data.lock().consensus_session.computation_job().rejects().contains(&disconnected));
assert!(sessions[0].data.lock().consensus_session.computation_job().rejects().contains_key(&disconnected));
assert!(!sessions[0].data.lock().consensus_session.computation_job().requests().contains(&disconnected));
}

View File

@ -306,7 +306,7 @@ impl ClusterSession for SessionImpl {
&EncryptionMessage::ConfirmEncryptionInitialization(ref message) =>
self.on_confirm_initialization(sender.clone(), message),
&EncryptionMessage::EncryptionSessionError(ref message) => {
self.on_session_error(sender, Error::Io(message.error.clone().into()));
self.on_session_error(sender, message.error.clone());
Ok(())
},
},
@ -326,7 +326,7 @@ pub fn check_encrypted_data(key_share: Option<&DocumentKeyShare>) -> Result<(),
if let Some(key_share) = key_share {
// check that common_point and encrypted_point are still not set yet
if key_share.common_point.is_some() || key_share.encrypted_point.is_some() {
return Err(Error::CompletedSessionId);
return Err(Error::DocumentKeyAlreadyStored);
}
}
@ -344,5 +344,4 @@ pub fn update_encrypted_data(key_storage: &Arc<KeyStorage>, key_id: ServerKeyId,
key_share.common_point = Some(common_point);
key_share.encrypted_point = Some(encrypted_point);
key_storage.update(key_id, key_share)
.map_err(|e| Error::KeyStorage(e.into()))
}

View File

@ -354,7 +354,7 @@ impl SessionImpl {
&GenerationMessage::PublicKeyShare(ref message) =>
self.on_public_key_share(sender.clone(), message),
&GenerationMessage::SessionError(ref message) => {
self.on_session_error(sender, Error::Io(message.error.clone().into()));
self.on_session_error(sender, message.error.clone());
Ok(())
},
&GenerationMessage::SessionCompleted(ref message) =>
@ -474,7 +474,7 @@ impl SessionImpl {
// simulate failure, if required
if data.simulate_faulty_behaviour {
return Err(Error::Io("simulated error".into()));
return Err(Error::Internal("simulated error".into()));
}
// check state
@ -592,8 +592,7 @@ impl SessionImpl {
};
if let Some(ref key_storage) = self.key_storage {
key_storage.insert(self.id.clone(), encrypted_data.clone())
.map_err(|e| Error::KeyStorage(e.into()))?;
key_storage.insert(self.id.clone(), encrypted_data.clone())?;
}
// then respond with confirmation
@ -790,8 +789,7 @@ impl SessionImpl {
// then save encrypted data to the key storage
if let Some(ref key_storage) = self.key_storage {
key_storage.insert(self.id.clone(), encrypted_data.clone())
.map_err(|e| Error::KeyStorage(e.into()))?;
key_storage.insert(self.id.clone(), encrypted_data.clone())?;
}
// then distribute encrypted data to every other node
@ -925,23 +923,15 @@ impl Debug for SessionImpl {
}
}
pub fn check_cluster_nodes(self_node_id: &NodeId, nodes: &BTreeSet<NodeId>) -> Result<(), Error> {
// at least two nodes must be in cluster
if nodes.len() < 1 {
return Err(Error::InvalidNodesCount);
}
// this node must be a part of cluster
if !nodes.contains(self_node_id) {
return Err(Error::InvalidNodesConfiguration);
}
fn check_cluster_nodes(self_node_id: &NodeId, nodes: &BTreeSet<NodeId>) -> Result<(), Error> {
assert!(nodes.contains(self_node_id));
Ok(())
}
pub fn check_threshold(threshold: usize, nodes: &BTreeSet<NodeId>) -> Result<(), Error> {
fn check_threshold(threshold: usize, nodes: &BTreeSet<NodeId>) -> Result<(), Error> {
// at least threshold + 1 nodes are required to collectively decrypt message
if threshold >= nodes.len() {
return Err(Error::InvalidThreshold);
return Err(Error::NotEnoughNodesForThreshold);
}
Ok(())
@ -1096,25 +1086,10 @@ pub mod tests {
assert!(l.master().initialize(Default::default(), Default::default(), false, 0, l.nodes.keys().cloned().collect::<BTreeSet<_>>().into()).is_ok());
}
#[test]
fn fails_to_initialize_if_not_a_part_of_cluster() {
let node_id = math::generate_random_point().unwrap();
let cluster = Arc::new(DummyCluster::new(node_id.clone()));
let session = SessionImpl::new(SessionParams {
id: SessionId::default(),
self_node_id: node_id.clone(),
key_storage: Some(Arc::new(DummyKeyStorage::default())),
cluster: cluster,
nonce: Some(0),
});
let cluster_nodes: BTreeSet<_> = (0..2).map(|_| math::generate_random_point().unwrap()).collect();
assert_eq!(session.initialize(Default::default(), Default::default(), false, 0, cluster_nodes.into()).unwrap_err(), Error::InvalidNodesConfiguration);
}
#[test]
fn fails_to_initialize_if_threshold_is_wrong() {
match make_simple_cluster(2, 2) {
Err(Error::InvalidThreshold) => (),
Err(Error::NotEnoughNodesForThreshold) => (),
_ => panic!("unexpected"),
}
}
@ -1193,24 +1168,6 @@ pub mod tests {
assert!(l.master().derived_point().unwrap() != passed_point.into());
}
#[test]
fn fails_to_complete_initialization_if_not_a_part_of_cluster() {
let (sid, m, _, l) = make_simple_cluster(0, 2).unwrap();
let mut nodes = BTreeMap::new();
nodes.insert(m, math::generate_random_scalar().unwrap());
nodes.insert(math::generate_random_point().unwrap(), math::generate_random_scalar().unwrap());
assert_eq!(l.first_slave().on_initialize_session(m, &message::InitializeSession {
session: sid.into(),
session_nonce: 0,
origin: None,
author: Address::default().into(),
nodes: nodes.into_iter().map(|(k, v)| (k.into(), v.into())).collect(),
is_zero: false,
threshold: 0,
derived_point: math::generate_random_point().unwrap().into(),
}).unwrap_err(), Error::InvalidNodesConfiguration);
}
#[test]
fn fails_to_complete_initialization_if_threshold_is_wrong() {
let (sid, m, s, l) = make_simple_cluster(0, 2).unwrap();
@ -1226,7 +1183,7 @@ pub mod tests {
is_zero: false,
threshold: 2,
derived_point: math::generate_random_point().unwrap().into(),
}).unwrap_err(), Error::InvalidThreshold);
}).unwrap_err(), Error::NotEnoughNodesForThreshold);
}
#[test]

View File

@ -187,6 +187,8 @@ impl SessionImpl {
master_node_id: params.meta.master_node_id,
self_node_id: params.meta.self_node_id,
threshold: params.meta.threshold * 2,
configured_nodes_count: params.meta.configured_nodes_count,
connected_nodes_count: params.meta.connected_nodes_count,
},
consensus_executor: match requester {
Some(requester) => KeyAccessJob::new_on_master(params.meta.id.clone(), params.acl_storage.clone(), requester),
@ -259,7 +261,7 @@ impl SessionImpl {
// check if version exists
let key_version = match self.core.key_share.as_ref() {
None => return Err(Error::InvalidMessage),
Some(key_share) => key_share.version(&version).map_err(|e| Error::KeyStorage(e.into()))?,
Some(key_share) => key_share.version(&version)?,
};
// select nodes to participate in consensus etablish session
@ -311,7 +313,7 @@ impl SessionImpl {
&EcdsaSigningMessage::EcdsaPartialSignature(ref message) =>
self.on_partial_signature(sender, message),
&EcdsaSigningMessage::EcdsaSigningSessionError(ref message) =>
self.process_node_error(Some(&sender), Error::Io(message.error.clone())),
self.process_node_error(Some(&sender), message.error.clone()),
&EcdsaSigningMessage::EcdsaSigningSessionCompleted(ref message) =>
self.on_session_completed(sender, message),
&EcdsaSigningMessage::EcdsaSigningSessionDelegation(ref message) =>
@ -386,8 +388,7 @@ impl SessionImpl {
let key_share = self.core.key_share.as_ref()
.expect("this is master node; master node is selected so that it has key version; qed");
let key_version = key_share.version(data.version.as_ref()
.expect("this is master node; master node is selected so that it has key version; qed")
).map_err(|e| Error::KeyStorage(e.into()))?;
.expect("this is master node; master node is selected so that it has key version; qed"))?;
let consensus_group = data.consensus_session.select_consensus_group()?.clone();
let mut other_consensus_group_nodes = consensus_group.clone();
@ -680,7 +681,7 @@ impl SessionImpl {
let inv_nonce_share = data.inv_nonce_generation_session.as_ref().expect(nonce_exists_proof).joint_public_and_secret().expect(nonce_exists_proof)?.2;
let version = data.version.as_ref().ok_or(Error::InvalidMessage)?.clone();
let key_version = key_share.version(&version).map_err(|e| Error::KeyStorage(e.into()))?.hash.clone();
let key_version = key_share.version(&version)?.hash.clone();
let signing_job = EcdsaSigningJob::new_on_slave(key_share.clone(), key_version, sig_nonce_public, inv_nonce_share)?;
let signing_transport = self.core.signing_transport();
@ -744,7 +745,7 @@ impl SessionImpl {
match {
match node {
Some(node) => data.consensus_session.on_node_error(node),
Some(node) => data.consensus_session.on_node_error(node, error.clone()),
None => data.consensus_session.on_session_timeout(),
}
} {
@ -970,6 +971,14 @@ impl<F> Cluster for NonceGenerationTransport<F> where F: Fn(SessionId, Secret, u
fn nodes(&self) -> BTreeSet<NodeId> {
self.cluster.nodes()
}
fn configured_nodes_count(&self) -> usize {
self.cluster.configured_nodes_count()
}
fn connected_nodes_count(&self) -> usize {
self.cluster.connected_nodes_count()
}
}
impl SessionCore {
@ -988,7 +997,7 @@ impl SessionCore {
Some(key_share) => key_share,
};
let key_version = key_share.version(version).map_err(|e| Error::KeyStorage(e.into()))?.hash.clone();
let key_version = key_share.version(version)?.hash.clone();
let signing_job = EcdsaSigningJob::new_on_master(key_share.clone(), key_version, nonce_public, inv_nonce_share, inversed_nonce_coeff, message_hash)?;
consensus_session.disseminate_jobs(signing_job, self.signing_transport(), false).map(|_| ())
}
@ -1099,6 +1108,8 @@ mod tests {
self_node_id: gl_node_id.clone(),
master_node_id: master_node_id.clone(),
threshold: gl_node.key_storage.get(&session_id).unwrap().unwrap().threshold,
configured_nodes_count: gl.nodes.len(),
connected_nodes_count: gl.nodes.len(),
},
access_key: "834cb736f02d9c968dfaf0c37658a1d86ff140554fc8b59c9fdad5a8cf810eec".parse().unwrap(),
key_share: Some(gl_node.key_storage.get(&session_id).unwrap().unwrap()),

View File

@ -246,7 +246,7 @@ impl SessionImpl {
// check if version exists
let key_version = match self.core.key_share.as_ref() {
None => return Err(Error::InvalidMessage),
Some(key_share) => key_share.version(&version).map_err(|e| Error::KeyStorage(e.into()))?,
Some(key_share) => key_share.version(&version)?,
};
let mut data = self.data.lock();
@ -313,7 +313,7 @@ impl SessionImpl {
&SchnorrSigningMessage::SchnorrPartialSignature(ref message) =>
self.on_partial_signature(sender, message),
&SchnorrSigningMessage::SchnorrSigningSessionError(ref message) =>
self.process_node_error(Some(&sender), Error::Io(message.error.clone())),
self.process_node_error(Some(&sender), message.error.clone()),
&SchnorrSigningMessage::SchnorrSigningSessionCompleted(ref message) =>
self.on_session_completed(sender, message),
&SchnorrSigningMessage::SchnorrSigningSessionDelegation(ref message) =>
@ -500,8 +500,7 @@ impl SessionImpl {
.expect("session key is generated before signature is computed; we are in SignatureComputing state; qed")
.joint_public_and_secret()
.expect("session key is generated before signature is computed; we are in SignatureComputing state; qed")?;
let key_version = key_share.version(data.version.as_ref().ok_or(Error::InvalidMessage)?)
.map_err(|e| Error::KeyStorage(e.into()))?.hash.clone();
let key_version = key_share.version(data.version.as_ref().ok_or(Error::InvalidMessage)?)?.hash.clone();
let signing_job = SchnorrSigningJob::new_on_slave(self.core.meta.self_node_id.clone(), key_share.clone(), key_version, joint_public_and_secret.0, joint_public_and_secret.1)?;
let signing_transport = self.core.signing_transport();
@ -564,7 +563,7 @@ impl SessionImpl {
match {
match node {
Some(node) => data.consensus_session.on_node_error(node),
Some(node) => data.consensus_session.on_node_error(node, error.clone()),
None => data.consensus_session.on_session_timeout(),
}
} {
@ -717,6 +716,14 @@ impl Cluster for SessionKeyGenerationTransport {
fn nodes(&self) -> BTreeSet<NodeId> {
self.cluster.nodes()
}
fn configured_nodes_count(&self) -> usize {
self.cluster.configured_nodes_count()
}
fn connected_nodes_count(&self) -> usize {
self.cluster.connected_nodes_count()
}
}
impl SessionCore {
@ -735,7 +742,7 @@ impl SessionCore {
Some(key_share) => key_share,
};
let key_version = key_share.version(version).map_err(|e| Error::KeyStorage(e.into()))?.hash.clone();
let key_version = key_share.version(version)?.hash.clone();
let signing_job = SchnorrSigningJob::new_on_master(self.meta.self_node_id.clone(), key_share.clone(), key_version,
session_public, session_secret_share, message_hash)?;
consensus_session.disseminate_jobs(signing_job, self.signing_transport(), false).map(|_| ())
@ -851,6 +858,8 @@ mod tests {
self_node_id: gl_node_id.clone(),
master_node_id: master_node_id.clone(),
threshold: gl_node.key_storage.get(&session_id).unwrap().unwrap().threshold,
configured_nodes_count: gl.nodes.len(),
connected_nodes_count: gl.nodes.len(),
},
access_key: "834cb736f02d9c968dfaf0c37658a1d86ff140554fc8b59c9fdad5a8cf810eec".parse().unwrap(),
key_share: Some(gl_node.key_storage.get(&session_id).unwrap().unwrap()),
@ -971,6 +980,8 @@ mod tests {
self_node_id: self_node_id.clone(),
master_node_id: self_node_id.clone(),
threshold: 0,
configured_nodes_count: 1,
connected_nodes_count: 1,
},
access_key: Random.generate().unwrap().secret().clone(),
key_share: Some(DocumentKeyShare {
@ -1003,6 +1014,8 @@ mod tests {
self_node_id: self_node_id.clone(),
master_node_id: self_node_id.clone(),
threshold: 0,
configured_nodes_count: 1,
connected_nodes_count: 1,
},
access_key: Random.generate().unwrap().secret().clone(),
key_share: None,
@ -1025,6 +1038,8 @@ mod tests {
self_node_id: self_node_id.clone(),
master_node_id: self_node_id.clone(),
threshold: 2,
configured_nodes_count: 1,
connected_nodes_count: 1,
},
access_key: Random.generate().unwrap().secret().clone(),
key_share: Some(DocumentKeyShare {

View File

@ -109,6 +109,10 @@ pub trait Cluster: Send + Sync {
fn is_connected(&self, node: &NodeId) -> bool;
/// Get a set of connected nodes.
fn nodes(&self) -> BTreeSet<NodeId>;
/// Get total count of configured key server nodes (valid at the time of ClusterView creation).
fn configured_nodes_count(&self) -> usize;
/// Get total count of connected key server nodes (valid at the time of ClusterView creation).
fn connected_nodes_count(&self) -> usize;
}
/// Cluster initialization parameters.
@ -160,6 +164,8 @@ pub struct ClusterClientImpl {
/// Network cluster view. It is a communication channel, required in single session.
pub struct ClusterView {
core: Arc<Mutex<ClusterViewCore>>,
configured_nodes_count: usize,
connected_nodes_count: usize,
}
/// Cross-thread shareable cluster data.
@ -554,7 +560,7 @@ impl ClusterCore {
let is_initialization_message = message.is_initialization_message();
let is_delegation_message = message.is_delegation_message();
match is_initialization_message || is_delegation_message {
false => sessions.get(&session_id, true).ok_or(Error::InvalidSessionId),
false => sessions.get(&session_id, true).ok_or(Error::NoActiveSessionWithId),
true => {
let creation_data = SC::creation_data_from_message(&message)?;
let master = if is_initialization_message { sender.clone() } else { data.self_key_pair.public().clone() };
@ -652,7 +658,7 @@ impl ClusterConnections {
let trigger: Box<ConnectionTrigger> = match config.auto_migrate_enabled {
false => Box::new(SimpleConnectionTrigger::new(config.key_server_set.clone(), config.self_key_pair.clone(), config.admin_public.clone())),
true if config.admin_public.is_none() => Box::new(ConnectionTriggerWithMigration::new(config.key_server_set.clone(), config.self_key_pair.clone())),
true => return Err(Error::Io("secret store admininstrator public key is specified with auto-migration enabled".into())), // TODO [Refac]: Io -> Internal
true => return Err(Error::Internal("secret store admininstrator public key is specified with auto-migration enabled".into())),
};
let connector = trigger.servers_set_change_creator_connector();
@ -835,8 +841,10 @@ impl Connection {
}
impl ClusterView {
pub fn new(cluster: Arc<ClusterData>, nodes: BTreeSet<NodeId>) -> Self {
pub fn new(cluster: Arc<ClusterData>, nodes: BTreeSet<NodeId>, configured_nodes_count: usize) -> Self {
ClusterView {
configured_nodes_count: configured_nodes_count,
connected_nodes_count: nodes.len(),
core: Arc::new(Mutex::new(ClusterViewCore {
cluster: cluster,
nodes: nodes,
@ -871,6 +879,14 @@ impl Cluster for ClusterView {
fn nodes(&self) -> BTreeSet<NodeId> {
self.core.lock().nodes.clone()
}
fn configured_nodes_count(&self) -> usize {
self.configured_nodes_count
}
fn connected_nodes_count(&self) -> usize {
self.connected_nodes_count
}
}
impl ClusterClientImpl {
@ -1125,7 +1141,7 @@ pub mod tests {
fn cluster_state(&self) -> ClusterState { unimplemented!("test-only") }
fn new_generation_session(&self, _session_id: SessionId, _origin: Option<Address>, _author: Address, _threshold: usize) -> Result<Arc<GenerationSession>, Error> {
self.generation_requests_count.fetch_add(1, Ordering::Relaxed);
Err(Error::Io("test-errror".into()))
Err(Error::Internal("test-error".into()))
}
fn new_encryption_session(&self, _session_id: SessionId, _requester: Requester, _common_point: Public, _encrypted_point: Public) -> Result<Arc<EncryptionSession>, Error> { unimplemented!("test-only") }
fn new_decryption_session(&self, _session_id: SessionId, _origin: Option<Address>, _requester: Requester, _version: Option<H256>, _is_shadow_decryption: bool, _is_broadcast_session: bool) -> Result<Arc<DecryptionSession>, Error> { unimplemented!("test-only") }
@ -1197,6 +1213,14 @@ pub mod tests {
fn nodes(&self) -> BTreeSet<NodeId> {
self.data.lock().nodes.iter().cloned().collect()
}
fn configured_nodes_count(&self) -> usize {
self.data.lock().nodes.len()
}
fn connected_nodes_count(&self) -> usize {
self.data.lock().nodes.len()
}
}
pub fn loop_until<F>(core: &mut Core, timeout: Duration, predicate: F) where F: Fn() -> bool {
@ -1365,11 +1389,11 @@ pub mod tests {
{
// try to start generation session => fail in initialization
assert_eq!(clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 100).map(|_| ()),
Err(Error::InvalidThreshold));
Err(Error::NotEnoughNodesForThreshold));
// try to start generation session => fails in initialization
assert_eq!(clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 100).map(|_| ()),
Err(Error::InvalidThreshold));
Err(Error::NotEnoughNodesForThreshold));
assert!(clusters[0].data.sessions.generation_sessions.is_empty());
}

View File

@ -547,8 +547,9 @@ impl ClusterSession for AdminSession {
}
}
pub fn create_cluster_view(data: &Arc<ClusterData>, requires_all_connections: bool) -> Result<Arc<Cluster>, Error> {
let disconnected_nodes_count = data.connections.disconnected_nodes().len();
if requires_all_connections {
if !data.connections.disconnected_nodes().is_empty() {
if disconnected_nodes_count != 0 {
return Err(Error::NodeDisconnected);
}
}
@ -556,7 +557,8 @@ pub fn create_cluster_view(data: &Arc<ClusterData>, requires_all_connections: bo
let mut connected_nodes = data.connections.connected_nodes();
connected_nodes.insert(data.self_key_pair.public().clone());
Ok(Arc::new(ClusterView::new(data.clone(), connected_nodes)))
let connected_nodes_count = connected_nodes.len();
Ok(Arc::new(ClusterView::new(data.clone(), connected_nodes, connected_nodes_count + disconnected_nodes_count)))
}
#[cfg(test)]

View File

@ -115,7 +115,7 @@ impl SessionCreatorCore {
/// Read key share && remove disconnected nodes.
fn read_key_share(&self, key_id: &SessionId) -> Result<Option<DocumentKeyShare>, Error> {
self.key_storage.get(key_id).map_err(|e| Error::KeyStorage(e.into()))
self.key_storage.get(key_id)
}
}
@ -146,7 +146,7 @@ impl ClusterSessionCreator<GenerationSessionImpl, ()> for GenerationSessionCreat
fn create(&self, cluster: Arc<Cluster>, master: NodeId, nonce: Option<u64>, id: SessionId, _creation_data: Option<()>) -> Result<Arc<GenerationSessionImpl>, Error> {
// check that there's no finished encryption session with the same id
if self.core.key_storage.contains(&id) {
return Err(Error::DuplicateSessionId);
return Err(Error::ServerKeyAlreadyGenerated);
}
let nonce = self.core.check_session_nonce(&master, nonce)?;
@ -232,6 +232,8 @@ impl ClusterSessionCreator<DecryptionSessionImpl, Requester> for DecryptionSessi
self_node_id: self.core.self_node_id.clone(),
master_node_id: master,
threshold: encrypted_data.as_ref().map(|ks| ks.threshold).unwrap_or_default(),
configured_nodes_count: cluster.configured_nodes_count(),
connected_nodes_count: cluster.connected_nodes_count(),
},
access_key: id.access_key,
key_share: encrypted_data,
@ -278,6 +280,8 @@ impl ClusterSessionCreator<SchnorrSigningSessionImpl, Requester> for SchnorrSign
self_node_id: self.core.self_node_id.clone(),
master_node_id: master,
threshold: encrypted_data.as_ref().map(|ks| ks.threshold).unwrap_or_default(),
configured_nodes_count: cluster.configured_nodes_count(),
connected_nodes_count: cluster.connected_nodes_count(),
},
access_key: id.access_key,
key_share: encrypted_data,
@ -324,6 +328,8 @@ impl ClusterSessionCreator<EcdsaSigningSessionImpl, Requester> for EcdsaSigningS
self_node_id: self.core.self_node_id.clone(),
master_node_id: master,
threshold: encrypted_data.as_ref().map(|ks| ks.threshold).unwrap_or_default(),
configured_nodes_count: cluster.configured_nodes_count(),
connected_nodes_count: cluster.connected_nodes_count(),
},
access_key: id.access_key,
key_share: encrypted_data,
@ -351,14 +357,19 @@ impl ClusterSessionCreator<KeyVersionNegotiationSessionImpl<VersionNegotiationTr
}
fn create(&self, cluster: Arc<Cluster>, master: NodeId, nonce: Option<u64>, id: SessionIdWithSubSession, _creation_data: Option<()>) -> Result<Arc<KeyVersionNegotiationSessionImpl<VersionNegotiationTransport>>, Error> {
let configured_nodes_count = cluster.configured_nodes_count();
let connected_nodes_count = cluster.connected_nodes_count();
let encrypted_data = self.core.read_key_share(&id.id)?;
let nonce = self.core.check_session_nonce(&master, nonce)?;
let computer = Arc::new(FastestResultKeyVersionsResultComputer::new(self.core.self_node_id.clone(), encrypted_data.as_ref()));
let computer = Arc::new(FastestResultKeyVersionsResultComputer::new(self.core.self_node_id.clone(), encrypted_data.as_ref(),
configured_nodes_count, configured_nodes_count));
Ok(Arc::new(KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
meta: ShareChangeSessionMeta {
id: id.id.clone(),
self_node_id: self.core.self_node_id.clone(),
master_node_id: master,
configured_nodes_count: configured_nodes_count,
connected_nodes_count: connected_nodes_count,
},
sub_session: id.access_key.clone(),
key_share: encrypted_data,
@ -419,6 +430,8 @@ impl ClusterSessionCreator<AdminSession, AdminSessionCreationData> for AdminSess
id: id.clone(),
self_node_id: self.core.self_node_id.clone(),
master_node_id: master,
configured_nodes_count: cluster.configured_nodes_count(),
connected_nodes_count: cluster.connected_nodes_count(),
},
transport: ShareAddTransport::new(id.clone(), Some(version), nonce, cluster),
key_storage: self.core.key_storage.clone(),
@ -435,6 +448,8 @@ impl ClusterSessionCreator<AdminSession, AdminSessionCreationData> for AdminSess
id: id.clone(),
self_node_id: self.core.self_node_id.clone(),
master_node_id: master,
configured_nodes_count: cluster.configured_nodes_count(),
connected_nodes_count: cluster.connected_nodes_count(),
},
cluster: cluster.clone(),
key_storage: self.core.key_storage.clone(),

View File

@ -23,7 +23,7 @@ use ethkey::Public;
use key_server_cluster::{KeyServerSet, KeyServerSetSnapshot};
use key_server_cluster::cluster::{ClusterClient, ClusterConnectionsData};
use key_server_cluster::cluster_sessions::AdminSession;
use types::all::{Error, NodeId};
use types::{Error, NodeId};
use {NodeKeyPair};
#[derive(Debug, Clone, Copy, PartialEq)]

View File

@ -26,7 +26,7 @@ use key_server_cluster::cluster_sessions::{AdminSession, ClusterSession};
use key_server_cluster::jobs::servers_set_change_access_job::ordered_nodes_hash;
use key_server_cluster::connection_trigger::{Maintain, ConnectionsAction, ConnectionTrigger,
ServersSetChangeSessionCreatorConnector, TriggerConnections};
use types::all::{Error, NodeId};
use types::{Error, NodeId};
use {NodeKeyPair};
/// Key servers set change trigger with automated migration procedure.

View File

@ -232,38 +232,42 @@ impl<ConsensusExecutor, ConsensusTransport, ComputationExecutor, ComputationTran
}
/// When error is received from node.
pub fn on_node_error(&mut self, node: &NodeId) -> Result<bool, Error> {
pub fn on_node_error(&mut self, node: &NodeId, error: Error) -> Result<bool, Error> {
let is_self_master = self.meta.master_node_id == self.meta.self_node_id;
let is_node_master = self.meta.master_node_id == *node;
let (is_restart_needed, timeout_result) = match self.state {
ConsensusSessionState::WaitingForInitialization if is_self_master => {
// it is strange to receive error before session is initialized && slave doesn't know access_key
// => fatal error
// => unreachable
self.state = ConsensusSessionState::Failed;
(false, Err(Error::ConsensusUnreachable))
}
ConsensusSessionState::WaitingForInitialization if is_node_master => {
// can not establish consensus
// => fatal error
// error from master node before establishing consensus
// => unreachable
self.state = ConsensusSessionState::Failed;
(false, Err(Error::ConsensusUnreachable))
(false, Err(if !error.is_non_fatal() {
Error::ConsensusUnreachable
} else {
Error::ConsensusTemporaryUnreachable
}))
},
ConsensusSessionState::EstablishingConsensus => {
debug_assert!(is_self_master);
// consensus still can be established
// => try to live without this node
(false, self.consensus_job.on_node_error(node))
(false, self.consensus_job.on_node_error(node, error))
},
ConsensusSessionState::ConsensusEstablished => {
// we could try to continue without this node, if enough nodes left
(false, self.consensus_job.on_node_error(node))
(false, self.consensus_job.on_node_error(node, error))
},
ConsensusSessionState::WaitingForPartialResults => {
// check if *current* computation job can continue without this node
let is_computation_node = self.computation_job.as_mut()
.expect("WaitingForPartialResults state is only set when computation_job is created; qed")
.on_node_error(node)
.on_node_error(node, error.clone())
.is_err();
if !is_computation_node {
// it is not used by current computation job
@ -275,7 +279,7 @@ impl<ConsensusExecutor, ConsensusTransport, ComputationExecutor, ComputationTran
self.consensus_group.clear();
self.state = ConsensusSessionState::EstablishingConsensus;
let consensus_result = self.consensus_job.on_node_error(node);
let consensus_result = self.consensus_job.on_node_error(node, error);
let is_consensus_established = self.consensus_job.state() == JobSessionState::Finished;
(is_consensus_established, consensus_result)
}
@ -298,7 +302,7 @@ impl<ConsensusExecutor, ConsensusTransport, ComputationExecutor, ComputationTran
self.consensus_group.clear();
self.state = ConsensusSessionState::EstablishingConsensus;
return self.process_result(Err(Error::ConsensusUnreachable)).map(|_| unreachable!());
return self.process_result(Err(Error::ConsensusTemporaryUnreachable)).map(|_| unreachable!());
},
// in all other cases - just ignore error
ConsensusSessionState::Finished | ConsensusSessionState::Failed => return Ok(false),
@ -312,7 +316,7 @@ impl<ConsensusExecutor, ConsensusTransport, ComputationExecutor, ComputationTran
self.consensus_group.clear();
for timeouted_node in timeouted_nodes {
let timeout_result = self.consensus_job.on_node_error(&timeouted_node);
let timeout_result = self.consensus_job.on_node_error(&timeouted_node, Error::NodeDisconnected);
self.state = ConsensusSessionState::EstablishingConsensus;
self.process_result(timeout_result)?;
}
@ -557,29 +561,35 @@ mod tests {
#[test]
fn consensus_session_fails_if_node_error_received_by_uninitialized_master() {
let mut session = make_master_consensus_session(0, None, None);
assert_eq!(session.on_node_error(&NodeId::from(2)), Err(Error::ConsensusUnreachable));
assert_eq!(session.on_node_error(&NodeId::from(2), Error::AccessDenied), Err(Error::ConsensusUnreachable));
assert_eq!(session.state(), ConsensusSessionState::Failed);
}
#[test]
fn consensus_session_fails_if_node_error_received_by_uninitialized_slave_from_master() {
let mut session = make_slave_consensus_session(0, None);
assert_eq!(session.on_node_error(&NodeId::from(1)), Err(Error::ConsensusUnreachable));
assert_eq!(session.on_node_error(&NodeId::from(1), Error::AccessDenied), Err(Error::ConsensusUnreachable));
assert_eq!(session.state(), ConsensusSessionState::Failed);
}
#[test]
fn consensus_sessions_fails_with_temp_error_if_node_error_received_by_uninitialized_slave_from_master() {
let mut session = make_slave_consensus_session(0, None);
assert_eq!(session.on_node_error(&NodeId::from(1), Error::NodeDisconnected).unwrap_err(), Error::ConsensusTemporaryUnreachable);
}
#[test]
fn consensus_session_continues_if_node_error_received_by_master_during_establish_and_enough_nodes_left() {
let mut session = make_master_consensus_session(1, None, None);
session.initialize(vec![NodeId::from(1), NodeId::from(2), NodeId::from(3)].into_iter().collect()).unwrap();
assert_eq!(session.on_node_error(&NodeId::from(2)), Ok(false));
assert_eq!(session.on_node_error(&NodeId::from(2), Error::AccessDenied), Ok(false));
}
#[test]
fn consensus_session_fails_if_node_error_received_by_master_during_establish_and_not_enough_nodes_left() {
let mut session = make_master_consensus_session(1, None, None);
session.initialize(vec![NodeId::from(1), NodeId::from(2)].into_iter().collect()).unwrap();
assert_eq!(session.on_node_error(&NodeId::from(2)), Err(Error::ConsensusUnreachable));
assert_eq!(session.on_node_error(&NodeId::from(2), Error::AccessDenied), Err(Error::ConsensusUnreachable));
assert_eq!(session.state(), ConsensusSessionState::Failed);
}
@ -590,7 +600,7 @@ mod tests {
session.on_consensus_message(&NodeId::from(2), &ConsensusMessage::ConfirmConsensusInitialization(ConfirmConsensusInitialization {
is_confirmed: true,
})).unwrap();
assert_eq!(session.on_node_error(&NodeId::from(2)), Ok(false));
assert_eq!(session.on_node_error(&NodeId::from(2), Error::AccessDenied), Ok(false));
assert_eq!(session.state(), ConsensusSessionState::ConsensusEstablished);
}
@ -601,7 +611,7 @@ mod tests {
session.on_consensus_message(&NodeId::from(2), &ConsensusMessage::ConfirmConsensusInitialization(ConfirmConsensusInitialization {
is_confirmed: true,
})).unwrap();
assert_eq!(session.on_node_error(&NodeId::from(3)), Ok(false));
assert_eq!(session.on_node_error(&NodeId::from(3), Error::AccessDenied), Ok(false));
assert_eq!(session.state(), ConsensusSessionState::ConsensusEstablished);
}
@ -612,7 +622,7 @@ mod tests {
session.on_consensus_message(&NodeId::from(2), &ConsensusMessage::ConfirmConsensusInitialization(ConfirmConsensusInitialization {
is_confirmed: true,
})).unwrap();
assert_eq!(session.on_node_error(&NodeId::from(2)), Err(Error::ConsensusUnreachable));
assert_eq!(session.on_node_error(&NodeId::from(2), Error::AccessDenied), Err(Error::ConsensusUnreachable));
assert_eq!(session.state(), ConsensusSessionState::Failed);
}
@ -627,8 +637,8 @@ mod tests {
is_confirmed: true,
})).unwrap();
session.disseminate_jobs(SquaredSumJobExecutor, DummyJobTransport::default(), false).unwrap();
assert_eq!(session.on_node_error(&NodeId::from(3)), Ok(false));
assert_eq!(session.on_node_error(&NodeId::from(4)), Ok(false));
assert_eq!(session.on_node_error(&NodeId::from(3), Error::AccessDenied), Ok(false));
assert_eq!(session.on_node_error(&NodeId::from(4), Error::AccessDenied), Ok(false));
assert_eq!(session.state(), ConsensusSessionState::WaitingForPartialResults);
}
@ -645,12 +655,12 @@ mod tests {
session.on_consensus_message(&NodeId::from(3), &ConsensusMessage::ConfirmConsensusInitialization(ConfirmConsensusInitialization {
is_confirmed: true,
})).unwrap();
assert_eq!(session.on_node_error(&NodeId::from(2)), Ok(true));
assert_eq!(session.on_node_error(&NodeId::from(2), Error::AccessDenied), Ok(true));
assert_eq!(session.state(), ConsensusSessionState::ConsensusEstablished);
session.disseminate_jobs(SquaredSumJobExecutor, DummyJobTransport::default(), false).unwrap();
assert_eq!(session.state(), ConsensusSessionState::WaitingForPartialResults);
assert_eq!(session.on_node_error(&NodeId::from(3)), Ok(false));
assert_eq!(session.on_node_error(&NodeId::from(3), Error::AccessDenied), Ok(false));
assert_eq!(session.state(), ConsensusSessionState::EstablishingConsensus);
}
@ -662,14 +672,14 @@ mod tests {
is_confirmed: true,
})).unwrap();
session.disseminate_jobs(SquaredSumJobExecutor, DummyJobTransport::default(), false).unwrap();
assert_eq!(session.on_node_error(&NodeId::from(2)), Err(Error::ConsensusUnreachable));
assert_eq!(session.on_node_error(&NodeId::from(2), Error::AccessDenied), Err(Error::ConsensusUnreachable));
assert_eq!(session.state(), ConsensusSessionState::Failed);
}
#[test]
fn consensus_session_fails_if_uninitialized_session_timeouts() {
let mut session = make_master_consensus_session(1, None, None);
assert_eq!(session.on_session_timeout(), Err(Error::ConsensusUnreachable));
assert_eq!(session.on_session_timeout(), Err(Error::ConsensusTemporaryUnreachable));
}
#[test]
@ -759,13 +769,13 @@ mod tests {
is_confirmed: true,
})).unwrap();
assert_eq!(session.on_node_error(&NodeId::from(2)).unwrap(), true);
assert_eq!(session.on_node_error(&NodeId::from(2), Error::AccessDenied).unwrap(), true);
assert_eq!(session.state(), ConsensusSessionState::ConsensusEstablished);
session.disseminate_jobs(SquaredSumJobExecutor, DummyJobTransport::default(), false).unwrap();
assert_eq!(session.state(), ConsensusSessionState::WaitingForPartialResults);
assert_eq!(session.on_node_error(&NodeId::from(3)).unwrap(), false);
assert_eq!(session.on_node_error(&NodeId::from(3), Error::AccessDenied).unwrap(), false);
assert_eq!(session.state(), ConsensusSessionState::EstablishingConsensus);
session.on_consensus_message(&NodeId::from(4), &ConsensusMessage::ConfirmConsensusInitialization(ConfirmConsensusInitialization {

View File

@ -131,7 +131,7 @@ impl JobExecutor for DecryptionJob {
}
fn process_partial_request(&mut self, partial_request: PartialDecryptionRequest) -> Result<JobPartialRequestAction<PartialDecryptionResponse>, Error> {
let key_version = self.key_share.version(&self.key_version).map_err(|e| Error::KeyStorage(e.into()))?;
let key_version = self.key_share.version(&self.key_version)?;
if partial_request.other_nodes_ids.len() != self.key_share.threshold
|| partial_request.other_nodes_ids.contains(&self.self_node_id)
|| partial_request.other_nodes_ids.iter().any(|n| !key_version.id_numbers.contains_key(n)) {

View File

@ -101,8 +101,8 @@ struct JobSessionData<PartialJobResponse> {
struct ActiveJobSessionData<PartialJobResponse> {
/// Active partial requests.
requests: BTreeSet<NodeId>,
/// Rejects to partial requests.
rejects: BTreeSet<NodeId>,
/// Rejects to partial requests (maps to true, if reject is fatal).
rejects: BTreeMap<NodeId, bool>,
/// Received partial responses.
responses: BTreeMap<NodeId, PartialJobResponse>,
}
@ -149,7 +149,7 @@ impl<Executor, Transport> JobSession<Executor, Transport> where Executor: JobExe
/// Get rejects.
#[cfg(test)]
pub fn rejects(&self) -> &BTreeSet<NodeId> {
pub fn rejects(&self) -> &BTreeMap<NodeId, bool> {
debug_assert!(self.meta.self_node_id == self.meta.master_node_id);
&self.data.active_data.as_ref()
@ -201,7 +201,11 @@ impl<Executor, Transport> JobSession<Executor, Transport> where Executor: JobExe
debug_assert!(self.meta.self_node_id == self.meta.master_node_id);
if nodes.len() < self.meta.threshold + 1 {
return Err(Error::ConsensusUnreachable);
return Err(if self.meta.configured_nodes_count < self.meta.threshold + 1 {
Error::ConsensusUnreachable
} else {
Error::ConsensusTemporaryUnreachable
});
}
if self.data.state != JobSessionState::Inactive {
@ -211,7 +215,7 @@ impl<Executor, Transport> JobSession<Executor, Transport> where Executor: JobExe
// result from self
let active_data = ActiveJobSessionData {
requests: nodes.clone(),
rejects: BTreeSet::new(),
rejects: BTreeMap::new(),
responses: BTreeMap::new(),
};
let waits_for_self = active_data.requests.contains(&self.meta.self_node_id);
@ -295,13 +299,14 @@ impl<Executor, Transport> JobSession<Executor, Transport> where Executor: JobExe
match self.executor.check_partial_response(node, &response)? {
JobPartialResponseAction::Ignore => Ok(()),
JobPartialResponseAction::Reject => {
active_data.rejects.insert(node.clone());
// direct reject is always considered as fatal
active_data.rejects.insert(node.clone(), true);
if active_data.requests.len() + active_data.responses.len() >= self.meta.threshold + 1 {
return Ok(());
}
self.data.state = JobSessionState::Failed;
Err(Error::ConsensusUnreachable)
Err(consensus_unreachable(&active_data.rejects))
},
JobPartialResponseAction::Accept => {
active_data.responses.insert(node.clone(), response);
@ -316,22 +321,26 @@ impl<Executor, Transport> JobSession<Executor, Transport> where Executor: JobExe
}
/// When error from node is received.
pub fn on_node_error(&mut self, node: &NodeId) -> Result<(), Error> {
pub fn on_node_error(&mut self, node: &NodeId, error: Error) -> Result<(), Error> {
if self.meta.self_node_id != self.meta.master_node_id {
if node != &self.meta.master_node_id {
return Ok(());
}
self.data.state = JobSessionState::Failed;
return Err(Error::ConsensusUnreachable);
return Err(if !error.is_non_fatal() {
Error::ConsensusUnreachable
} else {
Error::ConsensusTemporaryUnreachable
});
}
if let Some(active_data) = self.data.active_data.as_mut() {
if active_data.rejects.contains(node) {
if active_data.rejects.contains_key(node) {
return Ok(());
}
if active_data.requests.remove(node) || active_data.responses.remove(node).is_some() {
active_data.rejects.insert(node.clone());
active_data.rejects.insert(node.clone(), !error.is_non_fatal());
if self.data.state == JobSessionState::Finished && active_data.responses.len() < self.meta.threshold + 1 {
self.data.state = JobSessionState::Active;
}
@ -340,7 +349,7 @@ impl<Executor, Transport> JobSession<Executor, Transport> where Executor: JobExe
}
self.data.state = JobSessionState::Failed;
return Err(Error::ConsensusUnreachable);
return Err(consensus_unreachable(&active_data.rejects));
}
}
@ -354,7 +363,8 @@ impl<Executor, Transport> JobSession<Executor, Transport> where Executor: JobExe
}
self.data.state = JobSessionState::Failed;
Err(Error::ConsensusUnreachable)
// we have started session => consensus is possible in theory, but now it has failed with timeout
Err(Error::ConsensusTemporaryUnreachable)
}
}
@ -368,6 +378,16 @@ impl<PartialJobResponse> JobPartialRequestAction<PartialJobResponse> {
}
}
/// Returns appropriate 'consensus unreachable' error.
fn consensus_unreachable(rejects: &BTreeMap<NodeId, bool>) -> Error {
// when >= 50% of nodes have responded with fatal reject => ConsensusUnreachable
if rejects.values().filter(|r| **r).count() >= rejects.len() / 2 {
Error::ConsensusUnreachable
} else {
Error::ConsensusTemporaryUnreachable
}
}
#[cfg(test)]
pub mod tests {
use std::collections::{VecDeque, BTreeMap, BTreeSet};
@ -414,11 +434,27 @@ pub mod tests {
}
pub fn make_master_session_meta(threshold: usize) -> SessionMeta {
SessionMeta { id: SessionId::default(), master_node_id: NodeId::from(1), self_node_id: NodeId::from(1), threshold: threshold }
SessionMeta { id: SessionId::default(), master_node_id: NodeId::from(1), self_node_id: NodeId::from(1), threshold: threshold,
configured_nodes_count: 5, connected_nodes_count: 5 }
}
pub fn make_slave_session_meta(threshold: usize) -> SessionMeta {
SessionMeta { id: SessionId::default(), master_node_id: NodeId::from(1), self_node_id: NodeId::from(2), threshold: threshold }
SessionMeta { id: SessionId::default(), master_node_id: NodeId::from(1), self_node_id: NodeId::from(2), threshold: threshold,
configured_nodes_count: 5, connected_nodes_count: 5 }
}
#[test]
fn job_initialize_fails_if_not_enough_nodes_for_threshold_total() {
let mut job = JobSession::new(make_master_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default());
job.meta.configured_nodes_count = 1;
assert_eq!(job.initialize(vec![Public::from(1)].into_iter().collect(), None, false).unwrap_err(), Error::ConsensusUnreachable);
}
#[test]
fn job_initialize_fails_if_not_enough_nodes_for_threshold_connected() {
let mut job = JobSession::new(make_master_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default());
job.meta.connected_nodes_count = 3;
assert_eq!(job.initialize(vec![Public::from(1)].into_iter().collect(), None, false).unwrap_err(), Error::ConsensusTemporaryUnreachable);
}
#[test]
@ -528,7 +564,7 @@ pub mod tests {
fn job_node_error_ignored_when_slave_disconnects_from_slave() {
let mut job = JobSession::new(make_slave_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default());
assert_eq!(job.state(), JobSessionState::Inactive);
job.on_node_error(&NodeId::from(3)).unwrap();
job.on_node_error(&NodeId::from(3), Error::AccessDenied).unwrap();
assert_eq!(job.state(), JobSessionState::Inactive);
}
@ -536,7 +572,7 @@ pub mod tests {
fn job_node_error_leads_to_fail_when_slave_disconnects_from_master() {
let mut job = JobSession::new(make_slave_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default());
assert_eq!(job.state(), JobSessionState::Inactive);
assert_eq!(job.on_node_error(&NodeId::from(1)).unwrap_err(), Error::ConsensusUnreachable);
assert_eq!(job.on_node_error(&NodeId::from(1), Error::AccessDenied).unwrap_err(), Error::ConsensusUnreachable);
assert_eq!(job.state(), JobSessionState::Failed);
}
@ -546,7 +582,7 @@ pub mod tests {
job.initialize(vec![Public::from(1), Public::from(2), Public::from(3)].into_iter().collect(), None, false).unwrap();
assert_eq!(job.state(), JobSessionState::Active);
job.on_partial_response(&NodeId::from(2), 3).unwrap();
job.on_node_error(&NodeId::from(2)).unwrap();
job.on_node_error(&NodeId::from(2), Error::AccessDenied).unwrap();
assert_eq!(job.state(), JobSessionState::Active);
}
@ -555,7 +591,7 @@ pub mod tests {
let mut job = JobSession::new(make_master_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default());
job.initialize(vec![Public::from(1), Public::from(2)].into_iter().collect(), None, false).unwrap();
assert_eq!(job.state(), JobSessionState::Active);
job.on_node_error(&NodeId::from(3)).unwrap();
job.on_node_error(&NodeId::from(3), Error::AccessDenied).unwrap();
assert_eq!(job.state(), JobSessionState::Active);
}
@ -564,7 +600,7 @@ pub mod tests {
let mut job = JobSession::new(make_master_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default());
job.initialize(vec![Public::from(1), Public::from(2), Public::from(3)].into_iter().collect(), None, false).unwrap();
assert_eq!(job.state(), JobSessionState::Active);
job.on_node_error(&NodeId::from(3)).unwrap();
job.on_node_error(&NodeId::from(3), Error::AccessDenied).unwrap();
assert_eq!(job.state(), JobSessionState::Active);
}
@ -573,7 +609,7 @@ pub mod tests {
let mut job = JobSession::new(make_master_session_meta(1), SquaredSumJobExecutor, DummyJobTransport::default());
job.initialize(vec![Public::from(1), Public::from(2)].into_iter().collect(), None, false).unwrap();
assert_eq!(job.state(), JobSessionState::Active);
assert_eq!(job.on_node_error(&NodeId::from(2)).unwrap_err(), Error::ConsensusUnreachable);
assert_eq!(job.on_node_error(&NodeId::from(2), Error::AccessDenied).unwrap_err(), Error::ConsensusUnreachable);
assert_eq!(job.state(), JobSessionState::Failed);
}
@ -592,4 +628,34 @@ pub mod tests {
assert_eq!(job.state(), JobSessionState::Active);
assert!(job.transport().is_empty_response());
}
#[test]
fn job_fails_with_temp_error_if_more_than_half_nodes_respond_with_temp_error() {
let mut job = JobSession::new(make_master_session_meta(2), SquaredSumJobExecutor, DummyJobTransport::default());
job.initialize(vec![Public::from(1), Public::from(2), Public::from(3), Public::from(4)].into_iter().collect(), None, false).unwrap();
job.on_node_error(&NodeId::from(2), Error::NodeDisconnected).unwrap();
assert_eq!(job.on_node_error(&NodeId::from(3), Error::NodeDisconnected).unwrap_err(), Error::ConsensusTemporaryUnreachable);
}
#[test]
fn job_fails_with_temp_error_if_more_than_half_rejects_are_temp() {
let mut job = JobSession::new(make_master_session_meta(2), SquaredSumJobExecutor, DummyJobTransport::default());
job.initialize(vec![Public::from(1), Public::from(2), Public::from(3), Public::from(4)].into_iter().collect(), None, false).unwrap();
job.on_node_error(&NodeId::from(2), Error::NodeDisconnected).unwrap();
assert_eq!(job.on_node_error(&NodeId::from(3), Error::NodeDisconnected).unwrap_err(), Error::ConsensusTemporaryUnreachable);
}
#[test]
fn job_fails_if_more_than_half_rejects_are_non_temp() {
let mut job = JobSession::new(make_master_session_meta(2), SquaredSumJobExecutor, DummyJobTransport::default());
job.initialize(vec![Public::from(1), Public::from(2), Public::from(3), Public::from(4)].into_iter().collect(), None, false).unwrap();
job.on_node_error(&NodeId::from(2), Error::AccessDenied).unwrap();
assert_eq!(job.on_node_error(&NodeId::from(3), Error::AccessDenied).unwrap_err(), Error::ConsensusUnreachable);
}
#[test]
fn job_fails_with_temp_error_when_temp_error_is_reported_by_master_node() {
let mut job = JobSession::new(make_slave_session_meta(2), SquaredSumJobExecutor, DummyJobTransport::default());
assert_eq!(job.on_node_error(&NodeId::from(1), Error::NodeDisconnected).unwrap_err(), Error::ConsensusTemporaryUnreachable);
}
}

View File

@ -108,7 +108,7 @@ impl JobExecutor for EcdsaSigningJob {
fn process_partial_request(&mut self, partial_request: EcdsaPartialSigningRequest) -> Result<JobPartialRequestAction<EcdsaPartialSigningResponse>, Error> {
let inversed_nonce_coeff_mul_nonce = math::compute_secret_mul(&partial_request.inversed_nonce_coeff, &self.inv_nonce_share)?;
let key_version = self.key_share.version(&self.key_version).map_err(|e| Error::KeyStorage(e.into()))?;
let key_version = self.key_share.version(&self.key_version)?;
let signature_r = math::compute_ecdsa_r(&self.nonce_public)?;
let inv_nonce_mul_secret = math::compute_secret_mul(&inversed_nonce_coeff_mul_nonce, &key_version.secret_share)?;
let partial_signature_s = math::compute_ecdsa_s_share(
@ -134,7 +134,7 @@ impl JobExecutor for EcdsaSigningJob {
}
fn compute_response(&self, partial_responses: &BTreeMap<NodeId, EcdsaPartialSigningResponse>) -> Result<Signature, Error> {
let key_version = self.key_share.version(&self.key_version).map_err(|e| Error::KeyStorage(e.into()))?;
let key_version = self.key_share.version(&self.key_version)?;
if partial_responses.keys().any(|n| !key_version.id_numbers.contains_key(n)) {
return Err(Error::InvalidMessage);
}

View File

@ -107,7 +107,7 @@ impl JobExecutor for SchnorrSigningJob {
}
fn process_partial_request(&mut self, partial_request: SchnorrPartialSigningRequest) -> Result<JobPartialRequestAction<SchnorrPartialSigningResponse>, Error> {
let key_version = self.key_share.version(&self.key_version).map_err(|e| Error::KeyStorage(e.into()))?;
let key_version = self.key_share.version(&self.key_version)?;
if partial_request.other_nodes_ids.len() != self.key_share.threshold
|| partial_request.other_nodes_ids.contains(&self.self_node_id)
|| partial_request.other_nodes_ids.iter().any(|n| !key_version.id_numbers.contains_key(n)) {

View File

@ -18,8 +18,8 @@ use std::fmt;
use std::collections::{BTreeSet, BTreeMap};
use ethkey::Secret;
use key_server_cluster::SessionId;
use super::{SerializableH256, SerializablePublic, SerializableSecret, SerializableSignature,
SerializableMessageHash, SerializableRequester, SerializableAddress};
use super::{Error, SerializableH256, SerializablePublic, SerializableSecret,
SerializableSignature, SerializableMessageHash, SerializableRequester, SerializableAddress};
pub type MessageSessionId = SerializableH256;
pub type MessageNodeId = SerializablePublic;
@ -346,7 +346,7 @@ pub struct SessionError {
/// Session-level nonce.
pub session_nonce: u64,
/// Error message.
pub error: String,
pub error: Error,
}
/// When session is completed.
@ -390,7 +390,7 @@ pub struct EncryptionSessionError {
/// Session-level nonce.
pub session_nonce: u64,
/// Error message.
pub error: String,
pub error: Error,
}
/// Node is asked to be part of consensus group.
@ -509,7 +509,7 @@ pub struct SchnorrSigningSessionError {
/// Session-level nonce.
pub session_nonce: u64,
/// Error message.
pub error: String,
pub error: Error,
}
/// Schnorr signing session completed.
@ -662,7 +662,7 @@ pub struct EcdsaSigningSessionError {
/// Session-level nonce.
pub session_nonce: u64,
/// Error message.
pub error: String,
pub error: Error,
}
/// ECDSA signing session completed.
@ -769,7 +769,7 @@ pub struct DecryptionSessionError {
/// Session-level nonce.
pub session_nonce: u64,
/// Error message.
pub error: String,
pub error: Error,
}
/// When decryption session is completed.
@ -936,7 +936,7 @@ pub struct ServersSetChangeError {
/// Session-level nonce.
pub session_nonce: u64,
/// Error message.
pub error: String,
pub error: Error,
}
/// When servers set change session is completed.
@ -999,7 +999,7 @@ pub struct ShareAddError {
/// Session-level nonce.
pub session_nonce: u64,
/// Error message.
pub error: String,
pub error: Error,
}
/// Key versions are requested.
@ -1038,7 +1038,7 @@ pub struct KeyVersionsError {
/// Session-level nonce.
pub session_nonce: u64,
/// Error message.
pub error: String,
pub error: Error,
}
impl Message {

View File

@ -14,14 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::fmt;
use std::io::Error as IoError;
use ethkey;
use crypto;
use super::types::all::ServerKeyId;
use super::types::ServerKeyId;
pub use super::traits::NodeKeyPair;
pub use super::types::all::{NodeId, Requester, EncryptedDocumentKeyShadow};
pub use super::types::{Error, NodeId, Requester, EncryptedDocumentKeyShadow};
pub use super::acl_storage::AclStorage;
pub use super::key_storage::{KeyStorage, DocumentKeyShare, DocumentKeyShareVersion};
pub use super::key_server_set::{is_migration_required, KeyServerSet, KeyServerSetSnapshot, KeyServerSetMigration};
@ -53,126 +49,10 @@ pub struct SessionMeta {
pub self_node_id: NodeId,
/// Session threshold.
pub threshold: usize,
}
/// Errors which can occur during encryption/decryption session
#[derive(Clone, Debug, PartialEq)]
pub enum Error {
/// Invalid node address has been passed.
InvalidNodeAddress,
/// Invalid node id has been passed.
InvalidNodeId,
/// Session with the given id already exists.
DuplicateSessionId,
/// Session with the same id already completed.
CompletedSessionId,
/// Session is not ready to start yet (required data is not ready).
NotStartedSessionId,
/// Session with the given id is unknown.
InvalidSessionId,
/// Invalid number of nodes.
/// There must be at least two nodes participating in encryption.
/// There must be at least one node participating in decryption.
InvalidNodesCount,
/// Node which is required to start encryption/decryption session is not a part of cluster.
InvalidNodesConfiguration,
/// Invalid threshold value has been passed.
/// Threshold value must be in [0; n - 1], where n is a number of nodes participating in the encryption.
InvalidThreshold,
/// Current state of encryption/decryption session does not allow to proceed request.
/// Reschedule this request for later processing.
TooEarlyForRequest,
/// Current state of encryption/decryption session does not allow to proceed request.
/// This means that either there is some comm-failure or node is misbehaving/cheating.
InvalidStateForRequest,
/// Request cannot be sent/received from this node.
InvalidNodeForRequest,
/// Message or some data in the message was recognized as invalid.
/// This means that node is misbehaving/cheating.
InvalidMessage,
/// Message version is not supported.
InvalidMessageVersion,
/// Message is invalid because of replay-attack protection.
ReplayProtection,
/// Connection to node, required for this session is not established.
NodeDisconnected,
/// Node is missing requested key share.
MissingKeyShare,
/// Cryptographic error.
EthKey(String),
/// I/O error has occured.
Io(String),
/// Deserialization error has occured.
Serde(String),
/// Key storage error.
KeyStorage(String),
/// Consensus is unreachable.
ConsensusUnreachable,
/// Acl storage error.
AccessDenied,
/// Can't start session, because exclusive session is active.
ExclusiveSessionActive,
/// Can't start exclusive session, because there are other active sessions.
HasActiveSessions,
/// Insufficient requester data.
InsufficientRequesterData(String),
}
impl From<ethkey::Error> for Error {
fn from(err: ethkey::Error) -> Self {
Error::EthKey(err.into())
}
}
impl From<crypto::Error> for Error {
fn from(err: crypto::Error) -> Self {
Error::EthKey(err.into())
}
}
impl From<IoError> for Error {
fn from(err: IoError) -> Self {
Error::Io(err.to_string())
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::InvalidNodeAddress => write!(f, "invalid node address has been passed"),
Error::InvalidNodeId => write!(f, "invalid node id has been passed"),
Error::DuplicateSessionId => write!(f, "session with the same id is already registered"),
Error::CompletedSessionId => write!(f, "session with the same id is already completed"),
Error::NotStartedSessionId => write!(f, "not enough data to start session with the given id"),
Error::InvalidSessionId => write!(f, "invalid session id has been passed"),
Error::InvalidNodesCount => write!(f, "invalid nodes count"),
Error::InvalidNodesConfiguration => write!(f, "invalid nodes configuration"),
Error::InvalidThreshold => write!(f, "invalid threshold value has been passed"),
Error::TooEarlyForRequest => write!(f, "session is not yet ready to process this request"),
Error::InvalidStateForRequest => write!(f, "session is in invalid state for processing this request"),
Error::InvalidNodeForRequest => write!(f, "invalid node for this request"),
Error::InvalidMessage => write!(f, "invalid message is received"),
Error::InvalidMessageVersion => write!(f, "unsupported message is received"),
Error::ReplayProtection => write!(f, "replay message is received"),
Error::NodeDisconnected => write!(f, "node required for this operation is currently disconnected"),
Error::MissingKeyShare => write!(f, "requested key share version is not found"),
Error::EthKey(ref e) => write!(f, "cryptographic error {}", e),
Error::Io(ref e) => write!(f, "i/o error {}", e),
Error::Serde(ref e) => write!(f, "serde error {}", e),
Error::KeyStorage(ref e) => write!(f, "key storage error {}", e),
Error::ConsensusUnreachable => write!(f, "Consensus unreachable"),
Error::AccessDenied => write!(f, "Access denied"),
Error::ExclusiveSessionActive => write!(f, "Exclusive session active"),
Error::HasActiveSessions => write!(f, "Unable to start exclusive session"),
Error::InsufficientRequesterData(ref e) => write!(f, "Insufficient requester data: {}", e),
}
}
}
impl Into<String> for Error {
fn into(self) -> String {
format!("{}", self)
}
/// Count of all configured key server nodes (valid at session start time).
pub configured_nodes_count: usize,
/// Count of all connected key server nodes (valid at session start time).
pub connected_nodes_count: usize,
}
mod admin_sessions;

View File

@ -25,7 +25,7 @@ use ethkey::public_to_address;
use hash::keccak;
use ethereum_types::{H256, Address};
use bytes::Bytes;
use types::all::{Error, Public, NodeAddress, NodeId};
use types::{Error, Public, NodeAddress, NodeId};
use trusted_client::TrustedClient;
use helpers::{get_confirmed_block_hash, REQUEST_CONFIRMATIONS_REQUIRED};
use {NodeKeyPair};

View File

@ -21,7 +21,7 @@ use tiny_keccak::Keccak;
use ethereum_types::{H256, Address};
use ethkey::{Secret, Public, public_to_address};
use kvdb::KeyValueDB;
use types::all::{Error, ServerKeyId, NodeId};
use types::{Error, ServerKeyId, NodeId};
use serialization::{SerializablePublic, SerializableSecret, SerializableH256, SerializableAddress};
/// Key of version value.
@ -419,7 +419,7 @@ pub mod tests {
use ethereum_types::{Address, H256};
use ethkey::{Random, Generator, Public, Secret, public_to_address};
use kvdb_rocksdb::Database;
use types::all::{Error, ServerKeyId};
use types::{Error, ServerKeyId};
use super::{DB_META_KEY_VERSION, CURRENT_VERSION, KeyStorage, PersistentKeyStorage, DocumentKeyShare,
DocumentKeyShareVersion, CurrentSerializableDocumentKeyShare, upgrade_db, SerializableDocumentKeyShareV0,
SerializableDocumentKeyShareV1, SerializableDocumentKeyShareV2, SerializableDocumentKeyShareVersionV2};

View File

@ -76,7 +76,7 @@ use ethcore::client::Client;
use ethcore::miner::Miner;
use sync::SyncProvider;
pub use types::all::{ServerKeyId, EncryptedDocumentKey, RequestSignature, Public,
pub use types::{ServerKeyId, EncryptedDocumentKey, RequestSignature, Public,
Error, NodeAddress, ContractAddress, ServiceConfiguration, ClusterConfiguration};
pub use traits::{NodeKeyPair, KeyServer};
pub use self::node_key_pair::{PlainNodeKeyPair, KeyStoreNodeKeyPair};

View File

@ -29,7 +29,7 @@ use url::percent_encoding::percent_decode;
use traits::KeyServer;
use serialization::{SerializableEncryptedDocumentKeyShadow, SerializableBytes, SerializablePublic};
use types::all::{Error, Public, MessageHash, NodeAddress, RequestSignature, ServerKeyId,
use types::{Error, Public, MessageHash, NodeAddress, RequestSignature, ServerKeyId,
EncryptedDocumentKey, EncryptedDocumentKeyShadow, NodeId};
/// Key server http-requests listener. Available requests:
@ -271,15 +271,16 @@ fn return_bytes<T: Serialize>(req_uri: &Uri, result: Result<Option<T>, Error>) -
}
fn return_error(err: Error) -> HttpResponse {
let mut res = match err {
Error::InsufficientRequesterData(_) => HttpResponse::new().with_status(HttpStatusCode::BadRequest),
Error::AccessDenied => HttpResponse::new().with_status(HttpStatusCode::Forbidden),
Error::DocumentNotFound => HttpResponse::new().with_status(HttpStatusCode::NotFound),
Error::Hyper(_) => HttpResponse::new().with_status(HttpStatusCode::BadRequest),
Error::Serde(_) => HttpResponse::new().with_status(HttpStatusCode::BadRequest),
Error::Database(_) => HttpResponse::new().with_status(HttpStatusCode::InternalServerError),
Error::Internal(_) => HttpResponse::new().with_status(HttpStatusCode::InternalServerError),
};
let mut res = HttpResponse::new().with_status(match err {
Error::AccessDenied | Error::ConsensusUnreachable | Error::ConsensusTemporaryUnreachable =>
HttpStatusCode::Forbidden,
Error::ServerKeyIsNotFound | Error::DocumentKeyIsNotFound =>
HttpStatusCode::NotFound,
Error::InsufficientRequesterData(_) | Error::Hyper(_) | Error::Serde(_)
| Error::DocumentKeyAlreadyStored | Error::ServerKeyAlreadyGenerated =>
HttpStatusCode::BadRequest,
_ => HttpStatusCode::InternalServerError,
});
// return error text. ignore errors when returning error
let error_text = format!("\"{}\"", err);
@ -377,7 +378,7 @@ mod tests {
use ethkey::Public;
use traits::KeyServer;
use key_server::tests::DummyKeyServer;
use types::all::NodeAddress;
use types::NodeAddress;
use super::{parse_request, Request, KeyServerHttpListener};
#[test]

View File

@ -23,7 +23,7 @@ mod tasks_queue;
use std::collections::BTreeSet;
use std::sync::Arc;
use traits::{ServerKeyGenerator, DocumentKeyServer, MessageSigner, AdminSessionsServer, KeyServer};
use types::all::{Error, Public, MessageHash, EncryptedMessageSignature, RequestSignature, ServerKeyId,
use types::{Error, Public, MessageHash, EncryptedMessageSignature, RequestSignature, ServerKeyId,
EncryptedDocumentKey, EncryptedDocumentKeyShadow, NodeId, Requester};
/// Available API mask.

View File

@ -315,7 +315,7 @@ impl ServiceContractListener {
Ok(Some(server_key)) => {
data.contract.publish_generated_server_key(&origin, server_key_id, server_key)
},
Err(ref error) if is_internal_error(error) => Err(format!("{}", error)),
Err(ref error) if error.is_non_fatal() => Err(format!("{}", error)),
Err(ref error) => {
// ignore error as we're already processing an error
let _ = data.contract.publish_server_key_generation_error(&origin, server_key_id)
@ -335,7 +335,7 @@ impl ServiceContractListener {
Ok(None) => {
data.contract.publish_server_key_retrieval_error(&origin, server_key_id)
}
Err(ref error) if is_internal_error(error) => Err(format!("{}", error)),
Err(ref error) if error.is_non_fatal() => Err(format!("{}", error)),
Err(ref error) => {
// ignore error as we're already processing an error
let _ = data.contract.publish_server_key_retrieval_error(&origin, server_key_id)
@ -349,7 +349,7 @@ impl ServiceContractListener {
/// Store document key.
fn store_document_key(data: &Arc<ServiceContractListenerData>, origin: Address, server_key_id: &ServerKeyId, author: &Address, common_point: &Public, encrypted_point: &Public) -> Result<(), String> {
let store_result = data.key_storage.get(server_key_id)
.and_then(|key_share| key_share.ok_or(Error::DocumentNotFound))
.and_then(|key_share| key_share.ok_or(Error::ServerKeyIsNotFound))
.and_then(|key_share| check_encrypted_data(Some(&key_share)).map(|_| key_share).map_err(Into::into))
.and_then(|key_share| update_encrypted_data(&data.key_storage, server_key_id.clone(), key_share,
author.clone(), common_point.clone(), encrypted_point.clone()).map_err(Into::into));
@ -357,7 +357,7 @@ impl ServiceContractListener {
Ok(()) => {
data.contract.publish_stored_document_key(&origin, server_key_id)
},
Err(ref error) if is_internal_error(&error) => Err(format!("{}", error)),
Err(ref error) if error.is_non_fatal() => Err(format!("{}", error)),
Err(ref error) => {
// ignore error as we're already processing an error
let _ = data.contract.publish_document_key_store_error(&origin, server_key_id)
@ -372,17 +372,16 @@ impl ServiceContractListener {
fn retrieve_document_key_common(data: &Arc<ServiceContractListenerData>, origin: Address, server_key_id: &ServerKeyId, requester: &Address) -> Result<(), String> {
let retrieval_result = data.acl_storage.check(requester.clone(), server_key_id)
.and_then(|is_allowed| if !is_allowed { Err(Error::AccessDenied) } else { Ok(()) })
.and_then(|_| data.key_storage.get(server_key_id).and_then(|key_share| key_share.ok_or(Error::DocumentNotFound)))
.and_then(|_| data.key_storage.get(server_key_id).and_then(|key_share| key_share.ok_or(Error::ServerKeyIsNotFound)))
.and_then(|key_share| key_share.common_point
.ok_or(Error::DocumentNotFound)
.and_then(|common_point| math::make_common_shadow_point(key_share.threshold, common_point)
.map_err(|e| Error::Internal(e.into())))
.ok_or(Error::DocumentKeyIsNotFound)
.and_then(|common_point| math::make_common_shadow_point(key_share.threshold, common_point))
.map(|common_point| (common_point, key_share.threshold)));
match retrieval_result {
Ok((common_point, threshold)) => {
data.contract.publish_retrieved_document_key_common(&origin, server_key_id, requester, common_point, threshold)
},
Err(ref error) if is_internal_error(&error) => Err(format!("{}", error)),
Err(ref error) if error.is_non_fatal() => Err(format!("{}", error)),
Err(ref error) => {
// ignore error as we're already processing an error
let _ = data.contract.publish_document_key_retrieval_error(&origin, server_key_id, requester)
@ -406,7 +405,7 @@ impl ServiceContractListener {
Ok(Some((participants, decrypted_secret, shadow))) => {
data.contract.publish_retrieved_document_key_personal(&origin, server_key_id, &requester, &participants, decrypted_secret, shadow)
},
Err(ref error) if is_internal_error(error) => Err(format!("{}", error)),
Err(ref error) if error.is_non_fatal() => Err(format!("{}", error)),
Err(ref error) => {
// ignore error as we're already processing an error
let _ = data.contract.publish_document_key_retrieval_error(&origin, server_key_id, &requester)
@ -511,15 +510,6 @@ impl ::std::fmt::Display for ServiceTask {
}
}
/// Is internal error? Internal error means that it is SS who's responsible for it, like: connectivity, db failure, ...
/// External error is caused by SS misuse, like: trying to generate duplicated key, access denied, ...
/// When internal error occurs, we just ignore request for now and will retry later.
/// When external error occurs, we reject request.
fn is_internal_error(_error: &Error) -> bool {
// TODO [Reliability]: implement me after proper is passed through network
false
}
/// Log service task result.
fn log_service_task_result(task: &ServiceTask, self_id: &Public, result: Result<(), String>) -> Result<(), String> {
match result {

View File

@ -22,7 +22,7 @@ use serde::de::{Visitor, Error as SerdeError};
use ethkey::{Public, Secret, Signature};
use ethereum_types::{H160, H256};
use bytes::Bytes;
use types::all::Requester;
use types::Requester;
macro_rules! impl_bytes_deserialize {
($name: ident, $value: expr, true) => {

View File

@ -17,7 +17,7 @@
use std::collections::BTreeSet;
use ethkey::{KeyPair, Signature, Error as EthKeyError};
use ethereum_types::{H256, Address};
use types::all::{Error, Public, ServerKeyId, MessageHash, EncryptedMessageSignature, RequestSignature, Requester,
use types::{Error, Public, ServerKeyId, MessageHash, EncryptedMessageSignature, RequestSignature, Requester,
EncryptedDocumentKey, EncryptedDocumentKeyShadow, NodeId};
/// Node key pair.

View File

@ -14,13 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::fmt;
use std::io;
use std::net;
use std::collections::BTreeMap;
use serde_json;
use {ethkey, kvdb, bytes, ethereum_types, key_server_cluster};
use {ethkey, bytes, ethereum_types};
/// Node id.
pub type NodeId = ethkey::Public;
@ -37,25 +33,6 @@ pub type RequestSignature = ethkey::Signature;
/// Public key type.
pub use ethkey::Public;
/// Secret store error
#[derive(Debug, PartialEq)]
pub enum Error {
/// Insufficient requester data
InsufficientRequesterData(String),
/// Access to resource is denied
AccessDenied,
/// Requested document not found
DocumentNotFound,
/// Hyper error
Hyper(String),
/// Serialization/deserialization error
Serde(String),
/// Database-related error
Database(String),
/// Internal error
Internal(String),
}
/// Secret store configuration
#[derive(Debug, Clone)]
pub struct NodeAddress {
@ -136,69 +113,6 @@ pub enum Requester {
Address(ethereum_types::Address),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
Error::InsufficientRequesterData(ref e) => write!(f, "Insufficient requester data: {}", e),
Error::AccessDenied => write!(f, "Access dened"),
Error::DocumentNotFound => write!(f, "Document not found"),
Error::Hyper(ref msg) => write!(f, "Hyper error: {}", msg),
Error::Serde(ref msg) => write!(f, "Serialization error: {}", msg),
Error::Database(ref msg) => write!(f, "Database error: {}", msg),
Error::Internal(ref msg) => write!(f, "Internal error: {}", msg),
}
}
}
impl From<serde_json::Error> for Error {
fn from(err: serde_json::Error) -> Self {
Error::Serde(err.to_string())
}
}
impl From<ethkey::Error> for Error {
fn from(err: ethkey::Error) -> Self {
Error::Internal(err.into())
}
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::Internal(err.to_string())
}
}
impl From<net::AddrParseError> for Error {
fn from(err: net::AddrParseError) -> Error {
Error::Internal(err.to_string())
}
}
impl From<kvdb::Error> for Error {
fn from(err: kvdb::Error) -> Self {
Error::Database(err.to_string())
}
}
impl From<key_server_cluster::Error> for Error {
fn from(err: key_server_cluster::Error) -> Self {
match err {
key_server_cluster::Error::InsufficientRequesterData(err)
=> Error::InsufficientRequesterData(err),
key_server_cluster::Error::ConsensusUnreachable
| key_server_cluster::Error::AccessDenied => Error::AccessDenied,
key_server_cluster::Error::MissingKeyShare => Error::DocumentNotFound,
_ => Error::Internal(err.into()),
}
}
}
impl Into<String> for Error {
fn into(self) -> String {
format!("{}", self)
}
}
impl Default for Requester {
fn default() -> Self {
Requester::Signature(Default::default())

View File

@ -0,0 +1,199 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::fmt;
use std::net;
use std::io::Error as IoError;
use {ethkey, crypto, kvdb};
/// Secret store error.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Error {
/// Invalid node address has been passed.
InvalidNodeAddress,
/// Invalid node id has been passed.
InvalidNodeId,
/// Session with the given id already exists.
DuplicateSessionId,
/// No active session with given id.
NoActiveSessionWithId,
/// Invalid threshold value has been passed.
/// Threshold value must be in [0; n - 1], where n is a number of nodes participating in the encryption.
NotEnoughNodesForThreshold,
/// Current state of encryption/decryption session does not allow to proceed request.
/// Reschedule this request for later processing.
TooEarlyForRequest,
/// Current state of encryption/decryption session does not allow to proceed request.
/// This means that either there is some comm-failure or node is misbehaving/cheating.
InvalidStateForRequest,
/// Request cannot be sent/received from this node.
InvalidNodeForRequest,
/// Message or some data in the message was recognized as invalid.
/// This means that node is misbehaving/cheating.
InvalidMessage,
/// Message version is not supported.
InvalidMessageVersion,
/// Message is invalid because of replay-attack protection.
ReplayProtection,
/// Connection to node, required for this session is not established.
NodeDisconnected,
/// Server key with this ID is already generated.
ServerKeyAlreadyGenerated,
/// Server key with this ID is not yet generated.
ServerKeyIsNotFound,
/// Document key with this ID is already stored.
DocumentKeyAlreadyStored,
/// Document key with this ID is not yet stored.
DocumentKeyIsNotFound,
/// Consensus is temporary unreachable. Means that something is currently blocking us from either forming
/// consensus group (like disconnecting from too many nodes, which are AGREE to partticipate in consensus)
/// or from rejecting request (disconnecting from AccessDenied-nodes).
ConsensusTemporaryUnreachable,
/// Consensus is unreachable. It doesn't mean that it will ALWAYS remain unreachable, but right NOW we have
/// enough nodes confirmed that they do not want to be a part of consensus. Example: we're connected to 10
/// of 100 nodes. Key threshold is 6 (i.e. 7 nodes are required for consensus). 4 nodes are responding with
/// reject => consensus is considered unreachable, even though another 90 nodes still can respond with OK.
ConsensusUnreachable,
/// Acl storage error.
AccessDenied,
/// Can't start session, because exclusive session is active.
ExclusiveSessionActive,
/// Can't start exclusive session, because there are other active sessions.
HasActiveSessions,
/// Insufficient requester data.
InsufficientRequesterData(String),
/// Cryptographic error.
EthKey(String),
/// I/O error has occured.
Io(String),
/// Deserialization error has occured.
Serde(String),
/// Hyper error.
Hyper(String),
/// Database-related error.
Database(String),
/// Internal error.
Internal(String),
}
impl Error {
/// Is this a fatal error? Non-fatal means that it is possible to replay the same request with a non-zero
/// chance to success. I.e. the error is not about request itself (or current environment factors that
/// are affecting request processing), but about current SecretStore state.
pub fn is_non_fatal(&self) -> bool {
match *self {
// non-fatal errors:
// session start errors => restarting session is a solution
Error::DuplicateSessionId | Error::NoActiveSessionWithId |
// unexpected message errors => restarting session/excluding node is a solution
Error::TooEarlyForRequest | Error::InvalidStateForRequest | Error::InvalidNodeForRequest |
// invalid message errors => restarting/updating/excluding node is a solution
Error::InvalidMessage | Error::InvalidMessageVersion | Error::ReplayProtection |
// connectivity problems => waiting for reconnect && restarting session is a solution
Error::NodeDisconnected |
// temporary (?) consensus problems, related to other non-fatal errors => restarting is probably (!) a solution
Error::ConsensusTemporaryUnreachable |
// exclusive session errors => waiting && restarting is a solution
Error::ExclusiveSessionActive | Error::HasActiveSessions => true,
// fatal errors:
// config-related errors
Error::InvalidNodeAddress | Error::InvalidNodeId |
// wrong session input params errors
Error::NotEnoughNodesForThreshold | Error::ServerKeyAlreadyGenerated | Error::ServerKeyIsNotFound |
Error::DocumentKeyAlreadyStored | Error::DocumentKeyIsNotFound | Error::InsufficientRequesterData(_) |
// access denied/consensus error
Error::AccessDenied | Error::ConsensusUnreachable |
// indeterminate internal errors, which could be either fatal (db failure, invalid request), or not (network error),
// but we still consider these errors as fatal
Error::EthKey(_) | Error::Serde(_) | Error::Hyper(_) | Error::Database(_) | Error::Internal(_) | Error::Io(_) => false,
}
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
Error::InvalidNodeAddress => write!(f, "invalid node address has been passed"),
Error::InvalidNodeId => write!(f, "invalid node id has been passed"),
Error::DuplicateSessionId => write!(f, "session with the same id is already registered"),
Error::NoActiveSessionWithId => write!(f, "no active session with given id"),
Error::NotEnoughNodesForThreshold => write!(f, "not enough nodes for passed threshold"),
Error::TooEarlyForRequest => write!(f, "session is not yet ready to process this request"),
Error::InvalidStateForRequest => write!(f, "session is in invalid state for processing this request"),
Error::InvalidNodeForRequest => write!(f, "invalid node for this request"),
Error::InvalidMessage => write!(f, "invalid message is received"),
Error::InvalidMessageVersion => write!(f, "unsupported message is received"),
Error::ReplayProtection => write!(f, "replay message is received"),
Error::NodeDisconnected => write!(f, "node required for this operation is currently disconnected"),
Error::ServerKeyAlreadyGenerated => write!(f, "Server key with this ID is already generated"),
Error::ServerKeyIsNotFound => write!(f, "Server key with this ID is not found"),
Error::DocumentKeyAlreadyStored => write!(f, "Document key with this ID is already stored"),
Error::DocumentKeyIsNotFound => write!(f, "Document key with this ID is not found"),
Error::ConsensusUnreachable => write!(f, "Consensus unreachable"),
Error::ConsensusTemporaryUnreachable => write!(f, "Consensus temporary unreachable"),
Error::AccessDenied => write!(f, "Access dened"),
Error::ExclusiveSessionActive => write!(f, "Exclusive session active"),
Error::HasActiveSessions => write!(f, "Unable to start exclusive session"),
Error::InsufficientRequesterData(ref e) => write!(f, "Insufficient requester data: {}", e),
Error::EthKey(ref e) => write!(f, "cryptographic error {}", e),
Error::Hyper(ref msg) => write!(f, "Hyper error: {}", msg),
Error::Serde(ref msg) => write!(f, "Serialization error: {}", msg),
Error::Database(ref msg) => write!(f, "Database error: {}", msg),
Error::Internal(ref msg) => write!(f, "Internal error: {}", msg),
Error::Io(ref msg) => write!(f, "IO error: {}", msg),
}
}
}
impl From<ethkey::Error> for Error {
fn from(err: ethkey::Error) -> Self {
Error::EthKey(err.into())
}
}
impl From<kvdb::Error> for Error {
fn from(err: kvdb::Error) -> Self {
Error::Database(err.to_string())
}
}
impl From<crypto::Error> for Error {
fn from(err: crypto::Error) -> Self {
Error::EthKey(err.into())
}
}
impl From<IoError> for Error {
fn from(err: IoError) -> Self {
Error::Io(err.to_string())
}
}
impl Into<String> for Error {
fn into(self) -> String {
format!("{}", self)
}
}
impl From<net::AddrParseError> for Error {
fn from(err: net::AddrParseError) -> Error {
Error::Internal(err.to_string())
}
}

View File

@ -16,4 +16,8 @@
//! Types used in the public api
pub mod all;
mod all;
mod error;
pub use self::all::*;
pub use self::error::*;