SecretStore: ShareRemove of 'isolated' nodes (#6630)

* SecretStore: ShareRemove from isolated nodes

* SecretStore: ServersSetChange && isolated nodes

* SecretStore: added threshold check + lost file

* SecretStore: remove isolated nodes before other sessions in ServersSetChange

* removed obsolete TODO
This commit is contained in:
Svyatoslav Nikolsky 2017-10-05 23:37:41 +03:00 committed by Nikolay Volf
parent d8094e0629
commit 6431459bcf
9 changed files with 291 additions and 74 deletions

View File

@ -22,7 +22,7 @@ pub mod share_remove_session;
mod sessions_queue;
use key_server_cluster::{SessionId, NodeId, SessionMeta};
use key_server_cluster::{SessionId, NodeId, SessionMeta, Error};
/// Share change session metadata.
#[derive(Debug, Clone)]
@ -37,12 +37,12 @@ pub struct ShareChangeSessionMeta {
impl ShareChangeSessionMeta {
/// Convert to consensus session meta. `all_nodes_set` is the union of `old_nodes_set` && `new_nodes_set`.
pub fn into_consensus_meta(self, all_nodes_set_len: usize) -> SessionMeta {
SessionMeta {
pub fn into_consensus_meta(self, all_nodes_set_len: usize) -> Result<SessionMeta, Error> {
Ok(SessionMeta {
id: self.id,
master_node_id: self.master_node_id,
self_node_id: self.self_node_id,
threshold: all_nodes_set_len - 1,
}
threshold: all_nodes_set_len.checked_sub(1).ok_or(Error::ConsensusUnreachable)?,
})
}
}

View File

@ -69,7 +69,7 @@ pub struct SessionImpl {
}
/// Session state.
#[derive(PartialEq)]
#[derive(Debug, PartialEq)]
enum SessionState {
/// Establishing consensus.
EstablishingConsensus,
@ -205,7 +205,7 @@ impl SessionImpl {
}
let mut consensus_session = ConsensusSession::new(ConsensusSessionParams {
meta: self.core.meta.clone().into_consensus_meta(self.core.all_nodes_set.len()),
meta: self.core.meta.clone().into_consensus_meta(self.core.all_nodes_set.len())?,
consensus_executor: ServersSetChangeAccessJob::new_on_master(self.core.admin_public.clone(),
self.core.all_nodes_set.clone(),
self.core.all_nodes_set.clone(),
@ -277,7 +277,7 @@ impl SessionImpl {
match &message.message {
&ConsensusMessageWithServersSet::InitializeConsensusSession(_) => {
data.consensus_session = Some(ConsensusSession::new(ConsensusSessionParams {
meta: self.core.meta.clone().into_consensus_meta(self.core.all_nodes_set.len()),
meta: self.core.meta.clone().into_consensus_meta(self.core.all_nodes_set.len())?,
consensus_executor: ServersSetChangeAccessJob::new_on_slave(self.core.admin_public.clone(),
self.core.all_nodes_set.clone(),
),
@ -395,6 +395,7 @@ impl SessionImpl {
true => return Err(Error::InvalidMessage),
false => {
let master_plan = ShareChangeSessionPlan {
isolated_nodes: message.isolated_nodes.iter().cloned().map(Into::into).collect(),
nodes_to_add: message.shares_to_add.iter().map(|(k, v)| (k.clone().into(), v.clone().into())).collect(),
nodes_to_move: message.shares_to_move.iter().map(|(k, v)| (k.clone().into(), v.clone().into())).collect(),
nodes_to_remove: message.shares_to_remove.iter().cloned().map(Into::into).collect(),
@ -409,8 +410,9 @@ impl SessionImpl {
if let Ok(key_share) = self.core.key_storage.get(&key_id) {
let new_nodes_set = data.new_nodes_set.as_ref()
.expect("new_nodes_set is filled during consensus establishing; change sessions are running after this; qed");
let local_plan = prepare_share_change_session_plan(&key_share.id_numbers.keys().cloned().collect(), new_nodes_set)?;
if local_plan.nodes_to_add.keys().any(|n| !local_plan.nodes_to_add.contains_key(n))
let local_plan = prepare_share_change_session_plan(&self.core.all_nodes_set, &key_share.id_numbers.keys().cloned().collect(), new_nodes_set)?;
if local_plan.isolated_nodes != master_plan.isolated_nodes
|| local_plan.nodes_to_add.keys().any(|n| !local_plan.nodes_to_add.contains_key(n))
|| local_plan.nodes_to_add.keys().any(|n| !master_plan.nodes_to_add.contains_key(n))
|| local_plan.nodes_to_move != master_plan.nodes_to_move
|| local_plan.nodes_to_remove != master_plan.nodes_to_remove {
@ -418,10 +420,13 @@ impl SessionImpl {
}
}
data.active_key_sessions.insert(key_id.clone(), Self::create_share_change_session(&self.core, key_id,
let session = Self::create_share_change_session(&self.core, key_id,
message.master_node_id.clone().into(),
message.old_shares_set.iter().cloned().map(Into::into).collect(),
master_plan)?);
master_plan)?;
if !session.is_finished() {
data.active_key_sessions.insert(key_id.clone(), session);
}
},
};
@ -475,8 +480,17 @@ impl SessionImpl {
})));
}
let key_session = data.active_key_sessions.get_mut(&key_id).ok_or(Error::InvalidMessage)?;
key_session.initialize()
// initialize share change session
{
let key_session = data.active_key_sessions.get_mut(&key_id).ok_or(Error::InvalidMessage)?;
key_session.initialize()?;
if !key_session.is_finished() {
return Ok(());
}
}
// complete key session
Self::complete_key_session(&self.core, &mut *data, true, key_id)
}
/// When sessions execution is delegated to this node.
@ -608,19 +622,7 @@ impl SessionImpl {
};
if is_finished {
data.active_key_sessions.remove(&session_id);
let is_general_master = self.core.meta.self_node_id == self.core.meta.master_node_id;
if is_master && !is_general_master {
Self::return_delegated_session(&self.core, &session_id)?;
}
if is_general_master {
Self::disseminate_session_initialization_requests(&self.core, &mut *data)?;
}
if data.result.is_some() && data.active_key_sessions.len() == 0 {
data.state = SessionState::Finished;
self.core.completed.notify_all();
}
Self::complete_key_session(&self.core, &mut *data, is_master, session_id)?;
}
Ok(())
@ -639,6 +641,7 @@ impl SessionImpl {
cluster: core.cluster.clone(),
key_storage: core.key_storage.clone(),
old_nodes_set: old_nodes_set,
cluster_nodes_set: core.all_nodes_set.clone(),
plan: session_plan,
})
}
@ -659,7 +662,7 @@ impl SessionImpl {
// prepare session change plan && check if something needs to be changed
let old_nodes_set = queued_session.nodes();
let session_plan = prepare_share_change_session_plan(&old_nodes_set, new_nodes_set)?;
let session_plan = prepare_share_change_session_plan(&core.all_nodes_set, &old_nodes_set, new_nodes_set)?;
if session_plan.is_empty() {
continue;
}
@ -676,6 +679,7 @@ impl SessionImpl {
let mut confirmations: BTreeSet<_> = old_nodes_set.iter().cloned()
.chain(session_plan.nodes_to_add.keys().cloned())
.chain(session_plan.nodes_to_move.keys().cloned())
.filter(|n| core.all_nodes_set.contains(n))
.collect();
let need_create_session = confirmations.remove(&core.meta.self_node_id);
let initialization_message = Message::ServersSetChange(ServersSetChangeMessage::InitializeShareChangeSession(InitializeShareChangeSession {
@ -684,6 +688,7 @@ impl SessionImpl {
key_id: key_id.clone().into(),
master_node_id: session_master.clone().into(),
old_shares_set: old_nodes_set.iter().cloned().map(Into::into).collect(),
isolated_nodes: session_plan.isolated_nodes.iter().cloned().map(Into::into).collect(),
shares_to_add: session_plan.nodes_to_add.iter()
.map(|(n, nid)| (n.clone().into(), nid.clone().into()))
.collect(),
@ -747,6 +752,25 @@ impl SessionImpl {
})))
}
/// Complete key session.
fn complete_key_session(core: &SessionCore, data: &mut SessionData, is_master: bool, session_id: SessionId) -> Result<(), Error> {
data.active_key_sessions.remove(&session_id);
let is_general_master = core.meta.self_node_id == core.meta.master_node_id;
if is_master && !is_general_master {
Self::return_delegated_session(core, &session_id)?;
}
if is_general_master {
Self::disseminate_session_initialization_requests(core, data)?;
}
if data.result.is_some() && data.active_key_sessions.len() == 0 {
data.state = SessionState::Finished;
core.completed.notify_all();
}
Ok(())
}
/// Complete servers set change session.
fn complete_session(core: &SessionCore, data: &mut SessionData) -> Result<(), Error> {
debug_assert_eq!(core.meta.self_node_id, core.meta.master_node_id);
@ -916,7 +940,7 @@ pub mod tests {
}
impl MessageLoop {
pub fn new(gml: GenerationMessageLoop, master_node_id: NodeId, new_nodes_ids: BTreeSet<NodeId>, removed_nodes_ids: BTreeSet<NodeId>) -> Self {
pub fn new(gml: GenerationMessageLoop, master_node_id: NodeId, new_nodes_ids: BTreeSet<NodeId>, removed_nodes_ids: BTreeSet<NodeId>, isolated_nodes_ids: BTreeSet<NodeId>) -> Self {
// generate admin key pair
let admin_key_pair = Random.generate().unwrap();
let admin_public = admin_key_pair.public().clone();
@ -928,12 +952,20 @@ pub mod tests {
.iter()).unwrap();
let original_key_pair = KeyPair::from_secret(original_secret).unwrap();
let mut all_nodes_set: BTreeSet<_> = gml.nodes.keys().cloned().collect();
// all active nodes set
let mut all_nodes_set: BTreeSet<_> = gml.nodes.keys()
.filter(|n| !isolated_nodes_ids.contains(n))
.cloned()
.collect();
// new nodes set includes all old nodes, except nodes being removed + all nodes being added
let new_nodes_set: BTreeSet<NodeId> = all_nodes_set.iter().cloned()
.chain(new_nodes_ids.iter().cloned())
.filter(|n| !removed_nodes_ids.contains(n))
.collect();
all_nodes_set.extend(new_nodes_ids.iter().cloned());
for isolated_node_id in &isolated_nodes_ids {
all_nodes_set.remove(isolated_node_id);
}
let meta = ShareChangeSessionMeta {
self_node_id: master_node_id.clone(),
@ -958,6 +990,12 @@ pub mod tests {
});
let nodes: BTreeMap<_, _> = old_nodes.chain(new_nodes).map(|n| (n.session.core.meta.self_node_id.clone(), n)).collect();
for node in nodes.values() {
for isolated_node_id in &isolated_nodes_ids {
node.cluster.remove_node(isolated_node_id);
}
}
let all_set_signature = sign(admin_key_pair.secret(), &ordered_nodes_hash(&all_nodes_set)).unwrap();
let new_set_signature = sign(admin_key_pair.secret(), &ordered_nodes_hash(&new_nodes_set)).unwrap();
@ -1018,7 +1056,7 @@ pub mod tests {
// insert 1 node so that it becames 2-of-4 session
let nodes_to_add: BTreeSet<_> = (0..1).map(|_| Random.generate().unwrap().public().clone()).collect();
let mut ml = MessageLoop::new(gml, master_node_id, nodes_to_add, BTreeSet::new());
let mut ml = MessageLoop::new(gml, master_node_id, nodes_to_add, BTreeSet::new(), BTreeSet::new());
ml.nodes[&master_node_id].session.initialize(ml.nodes.keys().cloned().collect(), ml.all_set_signature.clone(), ml.new_set_signature.clone()).unwrap();
ml.run();
@ -1041,7 +1079,7 @@ pub mod tests {
// 3) delegated session is returned back to added node
let nodes_to_add: BTreeSet<_> = (0..1).map(|_| Random.generate().unwrap().public().clone()).collect();
let master_node_id = nodes_to_add.iter().cloned().nth(0).unwrap();
let mut ml = MessageLoop::new(gml, master_node_id, nodes_to_add, BTreeSet::new());
let mut ml = MessageLoop::new(gml, master_node_id, nodes_to_add, BTreeSet::new(), BTreeSet::new());
ml.nodes[&master_node_id].session.initialize(ml.nodes.keys().cloned().collect(), ml.all_set_signature.clone(), ml.new_set_signature.clone()).unwrap();
ml.run();
@ -1058,7 +1096,7 @@ pub mod tests {
// remove 1 node && insert 1 node so that one share is moved
let nodes_to_remove: BTreeSet<_> = gml.nodes.keys().cloned().skip(1).take(1).collect();
let nodes_to_add: BTreeSet<_> = (0..1).map(|_| Random.generate().unwrap().public().clone()).collect();
let mut ml = MessageLoop::new(gml, master_node_id, nodes_to_add.clone(), nodes_to_remove.clone());
let mut ml = MessageLoop::new(gml, master_node_id, nodes_to_add.clone(), nodes_to_remove.clone(), BTreeSet::new());
let new_nodes_set = ml.nodes.keys().cloned().filter(|n| !nodes_to_remove.contains(n)).collect();
ml.nodes[&master_node_id].session.initialize(new_nodes_set, ml.all_set_signature.clone(), ml.new_set_signature.clone()).unwrap();
ml.run();
@ -1085,7 +1123,7 @@ pub mod tests {
// remove 1 node so that session becames 2-of-2
let nodes_to_remove: BTreeSet<_> = gml.nodes.keys().cloned().skip(1).take(1).collect();
let new_nodes_set: BTreeSet<_> = gml.nodes.keys().cloned().filter(|n| !nodes_to_remove.contains(&n)).collect();
let mut ml = MessageLoop::new(gml, master_node_id, BTreeSet::new(), nodes_to_remove.clone());
let mut ml = MessageLoop::new(gml, master_node_id, BTreeSet::new(), nodes_to_remove.clone(), BTreeSet::new());
ml.nodes[&master_node_id].session.initialize(new_nodes_set, ml.all_set_signature.clone(), ml.new_set_signature.clone()).unwrap();
ml.run();
@ -1101,4 +1139,30 @@ pub mod tests {
// check that all sessions have finished
assert!(ml.nodes.values().all(|n| n.session.is_finished()));
}
#[test]
fn isolated_node_removed_using_servers_set_change() {
// initial 2-of-3 session
let gml = generate_key(1, generate_nodes_ids(3));
let master_node_id = gml.nodes.keys().cloned().nth(0).unwrap();
// remove 1 node so that session becames 2-of-2
let nodes_to_isolate: BTreeSet<_> = gml.nodes.keys().cloned().skip(1).take(1).collect();
let new_nodes_set: BTreeSet<_> = gml.nodes.keys().cloned().filter(|n| !nodes_to_isolate.contains(&n)).collect();
let mut ml = MessageLoop::new(gml, master_node_id, BTreeSet::new(), BTreeSet::new(), nodes_to_isolate.clone());
ml.nodes[&master_node_id].session.initialize(new_nodes_set, ml.all_set_signature.clone(), ml.new_set_signature.clone()).unwrap();
ml.run();
// try to recover secret for every possible combination of nodes && check that secret is the same
check_secret_is_preserved(ml.original_key_pair.clone(), ml.nodes.iter()
.filter(|&(k, _)| !nodes_to_isolate.contains(k))
.map(|(k, v)| (k.clone(), v.key_storage.clone()))
.collect());
// check that all isolated nodes still OWN key share
assert!(ml.nodes.iter().filter(|&(k, _)| nodes_to_isolate.contains(k)).all(|(_, v)| v.key_storage.get(&SessionId::default()).is_ok()));
// check that all sessions have finished
assert!(ml.nodes.iter().filter(|&(k, _)| !nodes_to_isolate.contains(k)).all(|(_, v)| v.session.is_finished()));
}
}

View File

@ -273,7 +273,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
.map(|(k, v)| (k.clone(), v.clone().expect("new_nodes_map is updated above so that every value is_some; qed")))
.collect());
let mut consensus_session = ConsensusSession::new(ConsensusSessionParams {
meta: self.core.meta.clone().into_consensus_meta(new_nodes_set.len()),
meta: self.core.meta.clone().into_consensus_meta(new_nodes_set.len())?,
consensus_executor: ServersSetChangeAccessJob::new_on_master(admin_public,
old_nodes_set.clone(),
old_nodes_set.clone(),
@ -329,7 +329,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
.map(|ks| ks.id_numbers.keys().cloned().collect())
.unwrap_or_else(|| message.old_nodes_set.clone().into_iter().map(Into::into).collect());
data.consensus_session = Some(ConsensusSession::new(ConsensusSessionParams {
meta: self.core.meta.clone().into_consensus_meta(message.new_nodes_set.len()),
meta: self.core.meta.clone().into_consensus_meta(message.new_nodes_set.len())?,
consensus_executor: ServersSetChangeAccessJob::new_on_slave(admin_public, current_nodes_set),
consensus_transport: self.core.transport.clone(),
})?);

View File

@ -35,9 +35,10 @@ use key_server_cluster::message::{ShareAddMessage, ShareMoveMessage, ShareRemove
use key_server_cluster::admin_sessions::ShareChangeSessionMeta;
/// Single session meta-change session. Brief overview:
/// 1) new shares are added to the session
/// 2) shares are moved between nodes
/// 3) shares are removed from nodes
/// 1) nodes that have been already removed from cluster (isolated nodes) are removed from session
/// 2) new shares are added to the session
/// 3) shares are moved between nodes
/// 4) shares are removed from nodes
pub struct ShareChangeSession {
/// Servers set change session id.
session_id: SessionId,
@ -51,6 +52,8 @@ pub struct ShareChangeSession {
key_storage: Arc<KeyStorage>,
/// Old nodes set.
old_nodes_set: BTreeSet<NodeId>,
/// All cluster nodes set.
cluster_nodes_set: BTreeSet<NodeId>,
/// Nodes to add shares for.
nodes_to_add: Option<BTreeMap<NodeId, Secret>>,
/// Nodes to move shares from/to.
@ -68,7 +71,10 @@ pub struct ShareChangeSession {
}
/// Share change session plan.
#[derive(Debug)]
pub struct ShareChangeSessionPlan {
/// Nodes that are isolated and need to be removed before share addition.
pub isolated_nodes: BTreeSet<NodeId>,
/// Nodes to add shares for.
pub nodes_to_add: BTreeMap<NodeId, Secret>,
/// Nodes to move shares from/to (keys = target nodes, values = source nodes).
@ -89,6 +95,8 @@ pub struct ShareChangeSessionParams {
pub cluster: Arc<Cluster>,
/// Keys storage.
pub key_storage: Arc<KeyStorage>,
/// All cluster nodes set.
pub cluster_nodes_set: BTreeSet<NodeId>,
/// Old nodes set.
pub old_nodes_set: BTreeSet<NodeId>,
/// Session plan.
@ -110,11 +118,19 @@ impl ShareChangeSession {
/// Create new share change session.
pub fn new(params: ShareChangeSessionParams) -> Result<Self, Error> {
// we can't create sessions right now, because key share is read when session is created, but it can change in previous session
let isolated_nodes = if !params.plan.isolated_nodes.is_empty() { Some(params.plan.isolated_nodes) } else { None };
let nodes_to_add = if !params.plan.nodes_to_add.is_empty() { Some(params.plan.nodes_to_add) } else { None };
let nodes_to_remove = if !params.plan.nodes_to_remove.is_empty() { Some(params.plan.nodes_to_remove) } else { None };
let nodes_to_move = if !params.plan.nodes_to_move.is_empty() { Some(params.plan.nodes_to_move) } else { None };
debug_assert!(nodes_to_add.is_some() || nodes_to_move.is_some() || nodes_to_remove.is_some());
debug_assert!(isolated_nodes.is_some() || nodes_to_add.is_some() || nodes_to_move.is_some() || nodes_to_remove.is_some());
// if it is degenerated session (only isolated nodes are removed && no network communication required)
// => remove isolated nodes && finish session
if let Some(isolated_nodes) = isolated_nodes {
Self::remove_isolated_nodes(&params.meta, &params.key_storage, isolated_nodes)?;
}
let is_finished = nodes_to_add.is_none() && nodes_to_remove.is_none() && nodes_to_move.is_none();
Ok(ShareChangeSession {
session_id: params.session_id,
nonce: params.nonce,
@ -122,13 +138,14 @@ impl ShareChangeSession {
cluster: params.cluster,
key_storage: params.key_storage,
old_nodes_set: params.old_nodes_set,
cluster_nodes_set: params.cluster_nodes_set,
nodes_to_add: nodes_to_add,
nodes_to_remove: nodes_to_remove,
nodes_to_move: nodes_to_move,
share_add_session: None,
share_move_session: None,
share_remove_session: None,
is_finished: false,
is_finished: is_finished,
})
}
@ -246,6 +263,7 @@ impl ShareChangeSession {
let share_remove_session = ShareRemoveSessionImpl::new(ShareRemoveSessionParams {
meta: self.meta.clone(),
nonce: self.nonce,
cluster_nodes_set: self.cluster_nodes_set.clone(),
transport: ShareChangeTransport::new(self.session_id, self.nonce, self.cluster.clone()),
key_storage: self.key_storage.clone(),
admin_public: None,
@ -289,6 +307,18 @@ impl ShareChangeSession {
Ok(())
}
/// Remove isolated nodes from key share.
fn remove_isolated_nodes(meta: &ShareChangeSessionMeta, key_storage: &Arc<KeyStorage>, isolated_nodes: BTreeSet<NodeId>) -> Result<(), Error> {
let mut key_share = key_storage.get(&meta.id).map_err(|e| Error::KeyStorage(e.into()))?;
for isolated_node in &isolated_nodes {
key_share.id_numbers.remove(isolated_node);
}
if key_share.id_numbers.len() < key_share.threshold + 1 {
return Err(Error::InvalidNodesConfiguration);
}
key_storage.update(meta.id.clone(), key_share).map_err(|e| Error::KeyStorage(e.into()))
}
}
impl ShareChangeTransport {
@ -353,10 +383,20 @@ impl ShareRemoveSessionTransport for ShareChangeTransport {
}
/// Prepare share change plan for moving from old `session_nodes` to `new_nodes_set`.
pub fn prepare_share_change_session_plan(session_nodes: &BTreeSet<NodeId>, new_nodes_set: &BTreeSet<NodeId>) -> Result<ShareChangeSessionPlan, Error> {
pub fn prepare_share_change_session_plan(cluster_nodes_set: &BTreeSet<NodeId>, session_nodes: &BTreeSet<NodeId>, new_nodes_set: &BTreeSet<NodeId>) -> Result<ShareChangeSessionPlan, Error> {
let mut nodes_to_add: BTreeSet<_> = new_nodes_set.difference(&session_nodes).cloned().collect();
let mut nodes_to_move = BTreeMap::new();
let mut nodes_to_remove: BTreeSet<_> = session_nodes.difference(&new_nodes_set).cloned().collect();
// isolated nodes are the nodes that are not currently in cluster + that are in new nodes set
let isolated_nodes: BTreeSet<_> = session_nodes.difference(&cluster_nodes_set)
.filter(|n| !new_nodes_set.contains(n))
.cloned()
.collect();
// removed nodes are all old session nodes, except nodes that are in new set + except isolated nodes
let mut nodes_to_remove: BTreeSet<_> = session_nodes.difference(&new_nodes_set)
.filter(|n| !isolated_nodes.contains(n))
.cloned()
.collect();
while !nodes_to_remove.is_empty() && !nodes_to_add.is_empty() {
let source_node = nodes_to_remove.iter().cloned().nth(0).expect("nodes_to_remove.is_empty is checked in while condition; qed");
let target_node = nodes_to_add.iter().cloned().nth(0).expect("nodes_to_add.is_empty is checked in while condition; qed");
@ -366,6 +406,7 @@ pub fn prepare_share_change_session_plan(session_nodes: &BTreeSet<NodeId>, new_n
}
Ok(ShareChangeSessionPlan {
isolated_nodes: isolated_nodes,
nodes_to_add: nodes_to_add.into_iter()
.map(|n| math::generate_random_scalar().map(|s| (n, s)))
.collect::<Result<BTreeMap<_, _>, _>>()?,
@ -377,7 +418,8 @@ pub fn prepare_share_change_session_plan(session_nodes: &BTreeSet<NodeId>, new_n
impl ShareChangeSessionPlan {
/// Is empty (nothing-to-do) plan?
pub fn is_empty(&self) -> bool {
self.nodes_to_add.is_empty()
self.isolated_nodes.is_empty()
&& self.nodes_to_add.is_empty()
&& self.nodes_to_move.is_empty()
&& self.nodes_to_remove.is_empty()
}

View File

@ -206,7 +206,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
consensus_transport.set_shares_to_move_reversed(shares_to_move_reversed.clone());
let mut consensus_session = ConsensusSession::new(ConsensusSessionParams {
meta: self.core.meta.clone().into_consensus_meta(all_nodes_set.len()),
meta: self.core.meta.clone().into_consensus_meta(all_nodes_set.len())?,
consensus_executor: ServersSetChangeAccessJob::new_on_master(admin_public,
old_nodes_set.clone(),
old_nodes_set.clone(),
@ -263,7 +263,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
.unwrap_or_else(|| message.old_nodes_set.clone().into_iter().map(Into::into).collect());
let all_nodes_set_len = message.new_nodes_set.keys().chain(message.old_nodes_set.iter()).collect::<BTreeSet<_>>().len();
data.consensus_session = Some(ConsensusSession::new(ConsensusSessionParams {
meta: self.core.meta.clone().into_consensus_meta(all_nodes_set_len),
meta: self.core.meta.clone().into_consensus_meta(all_nodes_set_len)?,
consensus_executor: ServersSetChangeAccessJob::new_on_slave(admin_public, current_nodes_set),
consensus_transport: self.core.transport.clone(),
})?);

View File

@ -58,6 +58,8 @@ struct SessionCore<T: SessionTransport> {
pub nonce: u64,
/// Original key share.
pub key_share: DocumentKeyShare,
/// All known cluster nodes.
pub cluster_nodes_set: BTreeSet<NodeId>,
/// Session transport to communicate to other cluster nodes.
pub transport: T,
/// Key storage.
@ -91,6 +93,8 @@ pub struct SessionParams<T: SessionTransport> {
pub meta: ShareChangeSessionMeta,
/// Session nonce.
pub nonce: u64,
/// All known cluster nodes.
pub cluster_nodes_set: BTreeSet<NodeId>,
/// Session transport to communicate to other cluster nodes.
pub transport: T,
/// Key storage.
@ -129,6 +133,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
meta: params.meta.clone(),
nonce: params.nonce,
key_share: params.key_storage.get(&params.meta.id).map_err(|e| Error::KeyStorage(e.into()))?,
cluster_nodes_set: params.cluster_nodes_set,
transport: params.transport,
key_storage: params.key_storage,
admin_public: params.admin_public,
@ -155,8 +160,19 @@ impl<T> SessionImpl<T> where T: SessionTransport {
check_shares_to_remove(&self.core, &shares_to_remove)?;
data.remove_confirmations_to_receive = Some(shares_to_remove.clone());
let remove_confirmations_to_receive: BTreeSet<NodeId> = shares_to_remove.iter()
.filter(|n| self.core.cluster_nodes_set.contains(n))
.cloned()
.collect();
let need_wait_for_confirmations = !remove_confirmations_to_receive.is_empty();
data.shares_to_remove = Some(shares_to_remove);
data.remove_confirmations_to_receive = Some(remove_confirmations_to_receive);
// on slave nodes it can happen that all nodes being removed are isolated
// => there's no need to wait for confirmations
if !need_wait_for_confirmations {
Self::complete_session(&self.core, &mut *data)?;
}
Ok(())
}
@ -167,6 +183,10 @@ impl<T> SessionImpl<T> where T: SessionTransport {
let mut data = self.data.lock();
// check state
if data.state == SessionState::Finished {
// probably there are isolated nodes && we only remove isolated nodes from session
return Ok(());
}
if data.state != SessionState::ConsensusEstablishing || data.consensus_session.is_some() {
return Err(Error::InvalidStateForRequest);
}
@ -174,28 +194,33 @@ impl<T> SessionImpl<T> where T: SessionTransport {
// if consensus is not yet established => start consensus session
let is_consensus_pre_established = data.shares_to_remove.is_some();
if !is_consensus_pre_established {
// TODO: even if node was lost, it is still required for ShareRemove session to complete.
// It is wrong - if node is not in all_nodes_set, it must be excluded from consensus.
let shares_to_remove = shares_to_remove.ok_or(Error::InvalidMessage)?;
check_shares_to_remove(&self.core, &shares_to_remove)?;
let old_set_signature = old_set_signature.ok_or(Error::InvalidMessage)?;
let new_set_signature = new_set_signature.ok_or(Error::InvalidMessage)?;
let all_nodes_set: BTreeSet<_> = self.core.key_share.id_numbers.keys().cloned().collect();
let new_nodes_set: BTreeSet<_> = all_nodes_set.iter().cloned().filter(|n| !shares_to_remove.contains(&n)).collect();
let old_nodes_set: BTreeSet<_> = self.core.key_share.id_numbers.keys().cloned().collect();
let new_nodes_set: BTreeSet<_> = old_nodes_set.iter().cloned().filter(|n| !shares_to_remove.contains(&n)).collect();
let mut active_nodes_set = old_nodes_set.clone();
let admin_public = self.core.admin_public.clone().ok_or(Error::InvalidMessage)?;
// if some session nodes were removed from cluster (we treat this as a failure, or as a 'improper' removal)
// => do not require these nodes to be connected
for isolated_node in old_nodes_set.difference(&self.core.cluster_nodes_set) {
active_nodes_set.remove(&isolated_node);
}
let mut consensus_session = ConsensusSession::new(ConsensusSessionParams {
meta: self.core.meta.clone().into_consensus_meta(all_nodes_set.len()),
meta: self.core.meta.clone().into_consensus_meta(active_nodes_set.len())?,
consensus_executor: ServersSetChangeAccessJob::new_on_master(admin_public,
all_nodes_set.clone(),
all_nodes_set.clone(),
old_nodes_set.clone(),
old_nodes_set,
new_nodes_set,
old_set_signature,
new_set_signature),
consensus_transport: self.core.transport.clone(),
})?;
consensus_session.initialize(all_nodes_set)?;
consensus_session.initialize(active_nodes_set)?;
data.consensus_session = Some(consensus_session);
data.remove_confirmations_to_receive = Some(shares_to_remove.clone());
data.shares_to_remove = Some(shares_to_remove);
@ -237,7 +262,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
let admin_public = self.core.admin_public.clone().ok_or(Error::InvalidMessage)?;
let current_nodes_set = self.core.key_share.id_numbers.keys().cloned().collect();
data.consensus_session = Some(ConsensusSession::new(ConsensusSessionParams {
meta: self.core.meta.clone().into_consensus_meta(message.old_nodes_set.len()),
meta: self.core.meta.clone().into_consensus_meta(message.old_nodes_set.len())?,
consensus_executor: ServersSetChangeAccessJob::new_on_slave(admin_public, current_nodes_set),
consensus_transport: self.core.transport.clone(),
})?);
@ -360,9 +385,13 @@ impl<T> SessionImpl<T> where T: SessionTransport {
{
let shares_to_remove = data.shares_to_remove.as_ref()
.expect("shares_to_remove is filled when consensus is established; on_consensus_established is called after consensus is established; qed");
if !shares_to_remove.contains(&core.meta.self_node_id) {
let remove_confirmations_to_receive: BTreeSet<_> = shares_to_remove.iter()
.filter(|n| core.cluster_nodes_set.contains(n))
.cloned()
.collect();
if !shares_to_remove.contains(&core.meta.self_node_id) && !remove_confirmations_to_receive.is_empty() {
// remember remove confirmations to receive
data.remove_confirmations_to_receive = Some(shares_to_remove.iter().cloned().collect());
data.remove_confirmations_to_receive = Some(remove_confirmations_to_receive);
return Ok(());
}
}
@ -375,7 +404,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
fn disseminate_share_remove_requests(core: &SessionCore<T>, data: &mut SessionData<T>) -> Result<(), Error> {
let shares_to_remove = data.shares_to_remove.as_ref()
.expect("shares_to_remove is filled when consensus is established; disseminate_share_remove_requests is called after consensus is established; qed");
for node in shares_to_remove.iter().filter(|n| **n != core.meta.self_node_id) {
for node in shares_to_remove.iter().filter(|n| **n != core.meta.self_node_id && core.cluster_nodes_set.contains(n)) {
core.transport.send(node, ShareRemoveMessage::ShareRemoveRequest(ShareRemoveRequest {
session: core.meta.id.clone().into(),
session_nonce: core.nonce,
@ -396,7 +425,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
if shares_to_remove.contains(&core.meta.self_node_id) {
// send confirmation to all other nodes
let new_nodes_set = core.key_share.id_numbers.keys().filter(|n| !shares_to_remove.contains(n)).collect::<Vec<_>>();
for node in new_nodes_set.into_iter().filter(|n| **n != core.meta.self_node_id) {
for node in new_nodes_set.into_iter().filter(|n| **n != core.meta.self_node_id && core.cluster_nodes_set.contains(n)) {
core.transport.send(&node, ShareRemoveMessage::ShareRemoveConfirm(ShareRemoveConfirm {
session: core.meta.id.clone().into(),
session_nonce: core.nonce,
@ -527,7 +556,6 @@ mod tests {
use std::collections::{VecDeque, BTreeMap, BTreeSet};
use ethkey::{Random, Generator, Public, Signature, KeyPair, sign};
use key_server_cluster::{NodeId, SessionId, Error, KeyStorage, DummyKeyStorage};
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster_sessions::ClusterSession;
use key_server_cluster::cluster::tests::DummyCluster;
use key_server_cluster::generation_session::tests::{Node as GenerationNode, generate_nodes_ids};
@ -556,7 +584,7 @@ mod tests {
pub queue: VecDeque<(NodeId, NodeId, Message)>,
}
fn create_session(mut meta: ShareChangeSessionMeta, admin_public: Public, self_node_id: NodeId, cluster: Arc<Cluster>, key_storage: Arc<KeyStorage>) -> SessionImpl<IsolatedSessionTransport> {
fn create_session(mut meta: ShareChangeSessionMeta, admin_public: Public, self_node_id: NodeId, cluster: Arc<DummyCluster>, key_storage: Arc<KeyStorage>, all_cluster_nodes: BTreeSet<NodeId>) -> SessionImpl<IsolatedSessionTransport> {
let session_id = meta.id.clone();
meta.self_node_id = self_node_id;
SessionImpl::new(SessionParams {
@ -564,15 +592,16 @@ mod tests {
transport: IsolatedSessionTransport::new(session_id, 1, cluster),
key_storage: key_storage,
admin_public: Some(admin_public),
cluster_nodes_set: all_cluster_nodes,
nonce: 1,
}).unwrap()
}
fn create_node(meta: ShareChangeSessionMeta, admin_public: Public, node: GenerationNode) -> Node {
fn create_node(meta: ShareChangeSessionMeta, admin_public: Public, node: GenerationNode, all_nodes_set: BTreeSet<NodeId>) -> Node {
Node {
cluster: node.cluster.clone(),
key_storage: node.key_storage.clone(),
session: create_session(meta, admin_public, node.session.node().clone(), node.cluster, node.key_storage),
session: create_session(meta, admin_public, node.session.node().clone(), node.cluster, node.key_storage, all_nodes_set),
}
}
@ -600,7 +629,7 @@ mod tests {
.filter(|n| !shares_to_remove.contains(n))
.cloned()
.collect();
let nodes = gml.nodes.into_iter().map(|gn| create_node(meta.clone(), admin_public.clone(), gn.1));
let nodes = gml.nodes.into_iter().map(|gn| create_node(meta.clone(), admin_public.clone(), gn.1, old_nodes_set.clone()));
let nodes = nodes.map(|n| (n.session.core.meta.self_node_id.clone(), n)).collect();
let old_set_signature = sign(admin_key_pair.secret(), &ordered_nodes_hash(&old_nodes_set)).unwrap();
@ -687,7 +716,7 @@ mod tests {
let t = 1;
let test_cases = vec![(3, 1), (5, 3)];
for (n, nodes_to_remove) in test_cases {
// generate key && prepare ShareMove sessions
// generate key && prepare ShareRemove sessions
let old_nodes_set = generate_nodes_ids(n);
let master_node_id = old_nodes_set.iter().cloned().nth(0).unwrap();
let nodes_to_remove: BTreeSet<_> = old_nodes_set.iter().cloned().take(nodes_to_remove).collect();
@ -715,7 +744,7 @@ mod tests {
let t = 1;
let test_cases = vec![(3, 1), (5, 3)];
for (n, nodes_to_remove) in test_cases {
// generate key && prepare ShareMove sessions
// generate key && prepare ShareRemove sessions
let old_nodes_set = generate_nodes_ids(n);
let master_node_id = old_nodes_set.iter().cloned().nth(0).unwrap();
let nodes_to_remove: BTreeSet<_> = old_nodes_set.iter().cloned().skip(1).take(nodes_to_remove).collect();
@ -737,4 +766,63 @@ mod tests {
.collect());
}
}
#[test]
fn nodes_are_removed_even_if_some_other_nodes_are_isolated_from_cluster() {
let t = 1;
let (n, nodes_to_remove, nodes_to_isolate) = (5, 1, 2);
// generate key && prepare ShareRemove sessions
let old_nodes_set = generate_nodes_ids(n);
let master_node_id = old_nodes_set.iter().cloned().nth(0).unwrap();
let nodes_to_remove: BTreeSet<_> = old_nodes_set.iter().cloned().skip(1).take(nodes_to_remove).collect();
let nodes_to_isolate: BTreeSet<_> = old_nodes_set.iter().cloned().skip(1 + nodes_to_remove.len()).take(nodes_to_isolate).collect();
let mut ml = MessageLoop::new(t, master_node_id.clone(), old_nodes_set, nodes_to_remove.clone());
// simulate node failure - isolate nodes (it is removed from cluster completely, but it is still a part of session)
for node_to_isolate in &nodes_to_isolate {
ml.nodes.remove(node_to_isolate);
}
for node in ml.nodes.values_mut() {
for node_to_isolate in &nodes_to_isolate {
node.session.core.cluster_nodes_set.remove(node_to_isolate);
node.cluster.remove_node(node_to_isolate);
}
}
// initialize session on master node && run to completion
ml.nodes[&master_node_id].session.initialize(Some(nodes_to_remove.clone()),
Some(ml.old_set_signature.clone()),
Some(ml.new_set_signature.clone())).unwrap();
ml.run();
}
#[test]
fn nodes_are_removed_even_if_isolated_from_cluster() {
let t = 1;
let (n, nodes_to_isolate_and_remove) = (5, 3);
// generate key && prepare ShareRemove sessions
let old_nodes_set = generate_nodes_ids(n);
let master_node_id = old_nodes_set.iter().cloned().nth(0).unwrap();
let nodes_to_remove: BTreeSet<_> = old_nodes_set.iter().cloned().skip(1).take(nodes_to_isolate_and_remove).collect();
let mut ml = MessageLoop::new(t, master_node_id.clone(), old_nodes_set, nodes_to_remove.clone());
// simulate node failure - isolate nodes (it is removed from cluster completely, but it is still a part of session)
for node_to_isolate in &nodes_to_remove {
ml.nodes.remove(node_to_isolate);
}
for node in ml.nodes.values_mut() {
for node_to_isolate in &nodes_to_remove {
node.session.core.cluster_nodes_set.remove(node_to_isolate);
node.cluster.remove_node(node_to_isolate);
}
}
// initialize session on master node && run to completion
ml.nodes[&master_node_id].session.initialize(Some(nodes_to_remove.clone()),
Some(ml.old_set_signature.clone()),
Some(ml.new_set_signature.clone())).unwrap();
ml.run();
}
}

View File

@ -1008,11 +1008,13 @@ impl ClusterCore {
ConsensusMessageWithServersSet::InitializeConsensusSession(_) => true,
_ => false,
} => {
let mut all_cluster_nodes = data.connections.all_nodes();
all_cluster_nodes.insert(data.self_key_pair.public().clone());
let mut connected_nodes = data.connections.connected_nodes();
connected_nodes.insert(data.self_key_pair.public().clone());
let cluster = Arc::new(ClusterView::new(data.clone(), connected_nodes));
match data.sessions.new_share_remove_session(sender.clone(), session_id.clone(), Some(session_nonce), cluster) {
match data.sessions.new_share_remove_session(sender.clone(), session_id.clone(), Some(session_nonce), cluster, all_cluster_nodes) {
Ok(session) => Ok(session),
Err(err) => {
// this is new session => it is not yet in container
@ -1149,6 +1151,10 @@ impl ClusterConnections {
}
}
pub fn all_nodes(&self) -> BTreeSet<NodeId> {
self.data.read().nodes.keys().cloned().collect()
}
pub fn connected_nodes(&self) -> BTreeSet<NodeId> {
self.data.read().connections.keys().cloned().collect()
}
@ -1413,11 +1419,13 @@ impl ClusterClient for ClusterClientImpl {
}
fn new_share_remove_session(&self, session_id: SessionId, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSessionWrapper>, Error> {
let mut all_cluster_nodes = self.data.connections.all_nodes();
all_cluster_nodes.insert(self.data.self_key_pair.public().clone());
let mut connected_nodes = self.data.connections.connected_nodes();
connected_nodes.insert(self.data.self_key_pair.public().clone());
let cluster = Arc::new(ClusterView::new(self.data.clone(), connected_nodes));
let session = self.data.sessions.new_share_remove_session(self.data.self_key_pair.public().clone(), session_id, None, cluster)?;
let session = self.data.sessions.new_share_remove_session(self.data.self_key_pair.public().clone(), session_id, None, cluster, all_cluster_nodes)?;
session.as_share_remove()
.expect("created 1 line above; qed")
.initialize(Some(new_nodes_set), Some(old_set_signature), Some(new_set_signature))?;
@ -1425,11 +1433,11 @@ impl ClusterClient for ClusterClientImpl {
}
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSessionWrapper>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes();
connected_nodes.insert(self.data.self_key_pair.public().clone());
let mut all_cluster_nodes = self.data.connections.all_nodes();
all_cluster_nodes.insert(self.data.self_key_pair.public().clone());
let cluster = Arc::new(ClusterView::new(self.data.clone(), connected_nodes.clone()));
let session = self.data.sessions.new_servers_set_change_session(self.data.self_key_pair.public().clone(), session_id, None, cluster, connected_nodes)?;
let cluster = Arc::new(ClusterView::new(self.data.clone(), all_cluster_nodes.clone()));
let session = self.data.sessions.new_servers_set_change_session(self.data.self_key_pair.public().clone(), session_id, None, cluster, all_cluster_nodes)?;
let session_id = {
let servers_set_change_session = session.as_servers_set_change().expect("created 1 line above; qed");
servers_set_change_session.initialize(new_nodes_set, old_set_signature, new_set_signature)?;
@ -1501,6 +1509,12 @@ pub mod tests {
self.data.lock().nodes.push(node);
}
pub fn remove_node(&self, node: &NodeId) {
let mut data = self.data.lock();
let position = data.nodes.iter().position(|n| n == node).unwrap();
data.nodes.remove(position);
}
pub fn take_message(&self) -> Option<(NodeId, Message)> {
self.data.lock().messages.pop_front()
}

View File

@ -433,7 +433,7 @@ impl ClusterSessions {
}
/// Create new share remove session.
pub fn new_share_remove_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<Cluster>) -> Result<Arc<AdminSession>, Error> {
pub fn new_share_remove_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<Cluster>, all_nodes_set: BTreeSet<NodeId>) -> Result<Arc<AdminSession>, Error> {
let nonce = self.check_session_nonce(&master, nonce)?;
let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?;
@ -443,6 +443,7 @@ impl ClusterSessions {
self_node_id: self.self_node_id.clone(),
master_node_id: master,
},
cluster_nodes_set: all_nodes_set,
transport: ShareRemoveTransport::new(session_id.clone(), nonce, cluster),
key_storage: self.key_storage.clone(),
admin_public: Some(admin_public),
@ -464,6 +465,12 @@ impl ClusterSessions {
/// Create new servers set change session.
pub fn new_servers_set_change_session(&self, master: NodeId, session_id: Option<SessionId>, nonce: Option<u64>, cluster: Arc<Cluster>, all_nodes_set: BTreeSet<NodeId>) -> Result<Arc<AdminSession>, Error> {
// communicating to all other nodes is crucial for ServersSetChange session
// => check that we have connections to all cluster nodes
if self.nodes.iter().any(|n| !cluster.is_connected(n)) {
return Err(Error::NodeDisconnected);
}
let session_id = match session_id {
Some(session_id) => if session_id == *SERVERS_SET_CHANGE_SESSION_ID {
session_id

View File

@ -632,6 +632,8 @@ pub struct InitializeShareChangeSession {
pub master_node_id: MessageNodeId,
/// Old nodes set.
pub old_shares_set: BTreeSet<MessageNodeId>,
/// Isolated nodes.
pub isolated_nodes: BTreeSet<MessageNodeId>,
/// Shares to add. Values are filled for new nodes only.
pub shares_to_add: BTreeMap<MessageNodeId, SerializableSecret>,
/// Shares to move.