update_nodes_set in maintain

Svyatoslav Nikolsky 2017-07-19 12:36:40 +03:00
parent 81de7e1075
commit 5080cc3c9e
2 changed files with 52 additions and 34 deletions


@@ -158,9 +158,17 @@ pub struct ClusterConnections {
 	/// Self node id.
 	pub self_node_id: NodeId,
 	/// All known other key servers.
-	pub nodes: BTreeMap<NodeId, SocketAddr>,
+	pub key_server_set: Arc<KeyServerSet>,
+	/// Connections data.
+	pub data: RwLock<ClusterConnectionsData>,
+}
+
+/// Cluster connections data.
+pub struct ClusterConnectionsData {
+	/// Active key servers set.
+	pub nodes: BTreeMap<Public, SocketAddr>,
 	/// Active connections to key servers.
-	pub connections: RwLock<BTreeMap<NodeId, Arc<Connection>>>,
+	pub connections: BTreeMap<NodeId, Arc<Connection>>,
 }
 
 /// Cluster view core.
@@ -354,6 +362,7 @@ impl ClusterCore {
 	/// Try to connect to every disconnected node.
 	fn connect_disconnected_nodes(data: Arc<ClusterData>) {
+		data.connections.update_nodes_set();
 		for (node_id, node_address) in data.connections.disconnected_nodes() {
 			if data.config.allow_connecting_to_higher_nodes || data.self_key_pair.public() < &node_id {
 				ClusterCore::connect(data.clone(), node_address);
@@ -665,34 +674,29 @@ impl ClusterCore {
 impl ClusterConnections {
 	pub fn new(config: &ClusterConfiguration) -> Result<Self, Error> {
-		let mut connections = ClusterConnections {
+		Ok(ClusterConnections {
 			self_node_id: config.self_key_pair.public().clone(),
-			nodes: BTreeMap::new(),
-			connections: RwLock::new(BTreeMap::new()),
-		};
-
-		let nodes = config.key_server_set.get();
-		for (node_id, socket_address) in nodes.iter().filter(|&(node_id, _)| node_id != config.self_key_pair.public()) {
-			//let socket_address = make_socket_address(&node_addr, node_port)?;
-			connections.nodes.insert(node_id.clone(), socket_address.clone());
-		}
-
-		Ok(connections)
+			key_server_set: config.key_server_set.clone(),
+			data: RwLock::new(ClusterConnectionsData {
+				nodes: config.key_server_set.get(),
+				connections: BTreeMap::new(),
+			}),
+		})
 	}
 
 	pub fn cluster_state(&self) -> ClusterState {
 		ClusterState {
-			connected: self.connections.read().keys().cloned().collect(),
+			connected: self.data.read().connections.keys().cloned().collect(),
 		}
 	}
 
 	pub fn get(&self, node: &NodeId) -> Option<Arc<Connection>> {
-		self.connections.read().get(node).cloned()
+		self.data.read().connections.get(node).cloned()
 	}
 
 	pub fn insert(&self, connection: Arc<Connection>) -> bool {
-		let mut connections = self.connections.write();
-		if connections.contains_key(connection.node_id()) {
+		let mut data = self.data.write();
+		if data.connections.contains_key(connection.node_id()) {
 			// we have already connected to the same node
 			// the agreement is that node with lower id must establish connection to node with higher id
 			if (&self.self_node_id < connection.node_id() && connection.is_inbound())
@@ -702,13 +706,13 @@ impl ClusterConnections {
 		}
 
 		trace!(target: "secretstore_net", "{}: inserting connection to {} at {}", self.self_node_id, connection.node_id(), connection.node_address());
-		connections.insert(connection.node_id().clone(), connection);
+		data.connections.insert(connection.node_id().clone(), connection);
 		true
 	}
 
 	pub fn remove(&self, node: &NodeId, is_inbound: bool) {
-		let mut connections = self.connections.write();
-		if let Entry::Occupied(entry) = connections.entry(node.clone()) {
+		let mut data = self.data.write();
+		if let Entry::Occupied(entry) = data.connections.entry(node.clone()) {
			if entry.get().is_inbound() != is_inbound {
				return;
			}
@@ -719,20 +723,34 @@ impl ClusterConnections {
 	}
 
 	pub fn connected_nodes(&self) -> BTreeSet<NodeId> {
-		self.connections.read().keys().cloned().collect()
+		self.data.read().connections.keys().cloned().collect()
 	}
 
 	pub fn active_connections(&self)-> Vec<Arc<Connection>> {
-		self.connections.read().values().cloned().collect()
+		self.data.read().connections.values().cloned().collect()
 	}
 
 	pub fn disconnected_nodes(&self) -> BTreeMap<NodeId, SocketAddr> {
-		let connections = self.connections.read();
-		self.nodes.iter()
-			.filter(|&(node_id, _)| !connections.contains_key(node_id))
+		let data = self.data.read();
+		data.nodes.iter()
+			.filter(|&(node_id, _)| !data.connections.contains_key(node_id))
 			.map(|(node_id, node_address)| (node_id.clone(), node_address.clone()))
 			.collect()
 	}
+
+	pub fn update_nodes_set(&self) {
+		let mut data = self.data.write();
+		let new_nodes = self.key_server_set.get();
+		for obsolete_node in data.nodes.keys().cloned().collect::<Vec<_>>() {
+			if !new_nodes.contains_key(&obsolete_node) {
+				data.nodes.remove(&obsolete_node);
+				data.connections.remove(&obsolete_node);
+			}
+		}
+		for (new_node_public, new_node_addr) in new_nodes {
+			data.nodes.insert(new_node_public, new_node_addr);
+		}
+	}
 }
 
 impl ClusterData {
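The new update_nodes_set above is the core of this commit: it re-reads the key server set and reconciles the cached nodes/connections maps with it. A minimal, self-contained sketch of that reconciliation, using string keys in place of Public node ids and connection handles (these stand-ins are assumptions of the example, not part of the commit):

use std::collections::BTreeMap;

fn main() {
	// Cached state as held in ClusterConnectionsData.
	let mut nodes: BTreeMap<&str, &str> = BTreeMap::new();
	let mut connections: BTreeMap<&str, &str> = BTreeMap::new();
	nodes.insert("node_a", "127.0.0.1:8081");
	nodes.insert("node_b", "127.0.0.1:8082");
	connections.insert("node_b", "<established tcp connection>");

	// New set as returned by KeyServerSet::get(): node_b removed, node_c added.
	let mut new_nodes = BTreeMap::new();
	new_nodes.insert("node_a", "127.0.0.1:8081");
	new_nodes.insert("node_c", "127.0.0.1:8083");

	// Drop nodes (and their connections) that left the set...
	for obsolete_node in nodes.keys().cloned().collect::<Vec<_>>() {
		if !new_nodes.contains_key(obsolete_node) {
			nodes.remove(obsolete_node);
			connections.remove(obsolete_node);
		}
	}
	// ...then insert/refresh every node from the new set.
	for (public, addr) in new_nodes {
		nodes.insert(public, addr);
	}

	assert!(connections.is_empty()); // connection to the removed node_b was dropped
	assert_eq!(nodes.keys().cloned().collect::<Vec<_>>(), vec!["node_a", "node_c"]);
}

Because connect_disconnected_nodes now calls update_nodes_set() first, nodes added to the set become reconnection candidates on the next maintenance pass, while connections to removed nodes are dropped.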


@@ -16,7 +16,7 @@
 use std::sync::{Arc, Weak};
 use std::net::SocketAddr;
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 use futures::{future, Future};
 use parking_lot::Mutex;
 use ethcore::filter::Filter;
@@ -39,7 +39,7 @@ lazy_static! {
 /// Key Server set
 pub trait KeyServerSet: Send + Sync {
 	/// Get set of configured key servers
-	fn get(&self) -> HashMap<Public, SocketAddr>;
+	fn get(&self) -> BTreeMap<Public, SocketAddr>;
 }
 
 /// On-chain Key Server set implementation.
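A plausible reason for the HashMap → BTreeMap switch in this trait (an inference from the diff, not stated in the commit) is deterministic iteration: a BTreeMap walks node ids in sorted order, so every key server sees the set in the same order regardless of insertion order. A tiny illustration with placeholder keys:

use std::collections::BTreeMap;

fn main() {
	let mut servers = BTreeMap::new();
	servers.insert("0xbb_node", "127.0.0.1:8082");
	servers.insert("0xaa_node", "127.0.0.1:8081");

	// Iteration follows key order, not insertion order.
	let ordered: Vec<_> = servers.keys().cloned().collect();
	assert_eq!(ordered, vec!["0xaa_node", "0xbb_node"]);
}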
@@ -55,11 +55,11 @@ struct CachedContract {
 	/// Contract address.
 	contract_addr: Option<Address>,
 	/// Active set of key servers.
-	key_servers: HashMap<Public, SocketAddr>,
+	key_servers: BTreeMap<Public, SocketAddr>,
 }
 
 impl OnChainKeyServerSet {
-	pub fn new(client: &Arc<Client>, key_servers: HashMap<Public, SocketAddr>) -> Arc<Self> {
+	pub fn new(client: &Arc<Client>, key_servers: BTreeMap<Public, SocketAddr>) -> Arc<Self> {
		let key_server_set = Arc::new(OnChainKeyServerSet {
			contract: Mutex::new(CachedContract::new(client, key_servers)),
		});
@@ -69,7 +69,7 @@ impl OnChainKeyServerSet {
 }
 
 impl KeyServerSet for OnChainKeyServerSet {
-	fn get(&self) -> HashMap<Public, SocketAddr> {
+	fn get(&self) -> BTreeMap<Public, SocketAddr> {
		self.contract.lock().get()
	}
 }
@@ -81,7 +81,7 @@ impl ChainNotify for OnChainKeyServerSet {
 }
 
 impl CachedContract {
-	pub fn new(client: &Arc<Client>, key_servers: HashMap<Public, SocketAddr>) -> Self {
+	pub fn new(client: &Arc<Client>, key_servers: BTreeMap<Public, SocketAddr>) -> Self {
		CachedContract {
			client: Arc::downgrade(client),
			contract_addr: None,
@@ -102,7 +102,7 @@ println!("=== Installing contract from address: {:?}", new_contract_addr);
 				KeyServerSetContract::new(contract_addr)
 			})
 			.map(|contract| {
-				let mut key_servers = HashMap::new();
+				let mut key_servers = BTreeMap::new();
				let do_call = |a, d| future::done(self.client.upgrade().ok_or("Calling contract without client".into()).and_then(|c| c.call_contract(BlockId::Latest, a, d)));
				let key_servers_list = contract.get_key_servers(do_call).wait()
					.map_err(|err| { trace!(target: "secretstore", "Error {} reading list of key servers from contract", err); err })
@@ -139,7 +139,7 @@ println!("=== Installing contract from address: {:?}", new_contract_addr);
 		}
 	}
 
-	fn get(&self) -> HashMap<Public, SocketAddr> {
+	fn get(&self) -> BTreeMap<Public, SocketAddr> {
		self.key_servers.clone()
	}
 }