diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index b929e835d..3f381b6f0 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -158,9 +158,17 @@ pub struct ClusterConnections { /// Self node id. pub self_node_id: NodeId, /// All known other key servers. - pub nodes: BTreeMap, + pub key_server_set: Arc, + /// Connections data. + pub data: RwLock, +} + +/// Cluster connections data. +pub struct ClusterConnectionsData { + /// Active key servers set. + pub nodes: BTreeMap, /// Active connections to key servers. - pub connections: RwLock>>, + pub connections: BTreeMap>, } /// Cluster view core. @@ -354,6 +362,7 @@ impl ClusterCore { /// Try to connect to every disconnected node. fn connect_disconnected_nodes(data: Arc) { + data.connections.update_nodes_set(); for (node_id, node_address) in data.connections.disconnected_nodes() { if data.config.allow_connecting_to_higher_nodes || data.self_key_pair.public() < &node_id { ClusterCore::connect(data.clone(), node_address); @@ -665,34 +674,29 @@ impl ClusterCore { impl ClusterConnections { pub fn new(config: &ClusterConfiguration) -> Result { - let mut connections = ClusterConnections { + Ok(ClusterConnections { self_node_id: config.self_key_pair.public().clone(), - nodes: BTreeMap::new(), - connections: RwLock::new(BTreeMap::new()), - }; - - let nodes = config.key_server_set.get(); - for (node_id, socket_address) in nodes.iter().filter(|&(node_id, _)| node_id != config.self_key_pair.public()) { - //let socket_address = make_socket_address(&node_addr, node_port)?; - connections.nodes.insert(node_id.clone(), socket_address.clone()); - } - - Ok(connections) + key_server_set: config.key_server_set.clone(), + data: RwLock::new(ClusterConnectionsData { + nodes: config.key_server_set.get(), + connections: BTreeMap::new(), + }), + }) } pub fn cluster_state(&self) -> ClusterState { ClusterState { - connected: self.connections.read().keys().cloned().collect(), + connected: self.data.read().connections.keys().cloned().collect(), } } pub fn get(&self, node: &NodeId) -> Option> { - self.connections.read().get(node).cloned() + self.data.read().connections.get(node).cloned() } pub fn insert(&self, connection: Arc) -> bool { - let mut connections = self.connections.write(); - if connections.contains_key(connection.node_id()) { + let mut data = self.data.write(); + if data.connections.contains_key(connection.node_id()) { // we have already connected to the same node // the agreement is that node with lower id must establish connection to node with higher id if (&self.self_node_id < connection.node_id() && connection.is_inbound()) @@ -702,13 +706,13 @@ impl ClusterConnections { } trace!(target: "secretstore_net", "{}: inserting connection to {} at {}", self.self_node_id, connection.node_id(), connection.node_address()); - connections.insert(connection.node_id().clone(), connection); + data.connections.insert(connection.node_id().clone(), connection); true } pub fn remove(&self, node: &NodeId, is_inbound: bool) { - let mut connections = self.connections.write(); - if let Entry::Occupied(entry) = connections.entry(node.clone()) { + let mut data = self.data.write(); + if let Entry::Occupied(entry) = data.connections.entry(node.clone()) { if entry.get().is_inbound() != is_inbound { return; } @@ -719,20 +723,34 @@ impl ClusterConnections { } pub fn connected_nodes(&self) -> BTreeSet { - self.connections.read().keys().cloned().collect() + self.data.read().connections.keys().cloned().collect() } pub fn active_connections(&self)-> Vec> { - self.connections.read().values().cloned().collect() + self.data.read().connections.values().cloned().collect() } pub fn disconnected_nodes(&self) -> BTreeMap { - let connections = self.connections.read(); - self.nodes.iter() - .filter(|&(node_id, _)| !connections.contains_key(node_id)) + let data = self.data.read(); + data.nodes.iter() + .filter(|&(node_id, _)| !data.connections.contains_key(node_id)) .map(|(node_id, node_address)| (node_id.clone(), node_address.clone())) .collect() } + + pub fn update_nodes_set(&self) { + let mut data = self.data.write(); + let new_nodes = self.key_server_set.get(); + for obsolete_node in data.nodes.keys().cloned().collect::>() { + if !new_nodes.contains_key(&obsolete_node) { + data.nodes.remove(&obsolete_node); + data.connections.remove(&obsolete_node); + } + } + for (new_node_public, new_node_addr) in new_nodes { + data.nodes.insert(new_node_public, new_node_addr); + } + } } impl ClusterData { diff --git a/secret_store/src/key_server_set.rs b/secret_store/src/key_server_set.rs index 22ffbdb07..eecfa3091 100644 --- a/secret_store/src/key_server_set.rs +++ b/secret_store/src/key_server_set.rs @@ -16,7 +16,7 @@ use std::sync::{Arc, Weak}; use std::net::SocketAddr; -use std::collections::HashMap; +use std::collections::BTreeMap; use futures::{future, Future}; use parking_lot::Mutex; use ethcore::filter::Filter; @@ -39,7 +39,7 @@ lazy_static! { /// Key Server set pub trait KeyServerSet: Send + Sync { /// Get set of configured key servers - fn get(&self) -> HashMap; + fn get(&self) -> BTreeMap; } /// On-chain Key Server set implementation. @@ -55,11 +55,11 @@ struct CachedContract { /// Contract address. contract_addr: Option
, /// Active set of key servers. - key_servers: HashMap, + key_servers: BTreeMap, } impl OnChainKeyServerSet { - pub fn new(client: &Arc, key_servers: HashMap) -> Arc { + pub fn new(client: &Arc, key_servers: BTreeMap) -> Arc { let key_server_set = Arc::new(OnChainKeyServerSet { contract: Mutex::new(CachedContract::new(client, key_servers)), }); @@ -69,7 +69,7 @@ impl OnChainKeyServerSet { } impl KeyServerSet for OnChainKeyServerSet { - fn get(&self) -> HashMap { + fn get(&self) -> BTreeMap { self.contract.lock().get() } } @@ -81,7 +81,7 @@ impl ChainNotify for OnChainKeyServerSet { } impl CachedContract { - pub fn new(client: &Arc, key_servers: HashMap) -> Self { + pub fn new(client: &Arc, key_servers: BTreeMap) -> Self { CachedContract { client: Arc::downgrade(client), contract_addr: None, @@ -102,7 +102,7 @@ println!("=== Installing contract from address: {:?}", new_contract_addr); KeyServerSetContract::new(contract_addr) }) .map(|contract| { - let mut key_servers = HashMap::new(); + let mut key_servers = BTreeMap::new(); let do_call = |a, d| future::done(self.client.upgrade().ok_or("Calling contract without client".into()).and_then(|c| c.call_contract(BlockId::Latest, a, d))); let key_servers_list = contract.get_key_servers(do_call).wait() .map_err(|err| { trace!(target: "secretstore", "Error {} reading list of key servers from contract", err); err }) @@ -139,7 +139,7 @@ println!("=== Installing contract from address: {:?}", new_contract_addr); } } - fn get(&self) -> HashMap { + fn get(&self) -> BTreeMap { self.key_servers.clone() } }