do not connect to self
This commit is contained in:
parent
5080cc3c9e
commit
7664ff5acd
@ -674,11 +674,14 @@ impl ClusterCore {
|
|||||||
|
|
||||||
impl ClusterConnections {
|
impl ClusterConnections {
|
||||||
pub fn new(config: &ClusterConfiguration) -> Result<Self, Error> {
|
pub fn new(config: &ClusterConfiguration) -> Result<Self, Error> {
|
||||||
|
let mut nodes = config.key_server_set.get();
|
||||||
|
nodes.remove(config.self_key_pair.public());
|
||||||
|
|
||||||
Ok(ClusterConnections {
|
Ok(ClusterConnections {
|
||||||
self_node_id: config.self_key_pair.public().clone(),
|
self_node_id: config.self_key_pair.public().clone(),
|
||||||
key_server_set: config.key_server_set.clone(),
|
key_server_set: config.key_server_set.clone(),
|
||||||
data: RwLock::new(ClusterConnectionsData {
|
data: RwLock::new(ClusterConnectionsData {
|
||||||
nodes: config.key_server_set.get(),
|
nodes: nodes,
|
||||||
connections: BTreeMap::new(),
|
connections: BTreeMap::new(),
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
@ -740,7 +743,9 @@ impl ClusterConnections {
|
|||||||
|
|
||||||
pub fn update_nodes_set(&self) {
|
pub fn update_nodes_set(&self) {
|
||||||
let mut data = self.data.write();
|
let mut data = self.data.write();
|
||||||
let new_nodes = self.key_server_set.get();
|
let mut new_nodes = self.key_server_set.get();
|
||||||
|
new_nodes.remove(&self.self_node_id);
|
||||||
|
|
||||||
for obsolete_node in data.nodes.keys().cloned().collect::<Vec<_>>() {
|
for obsolete_node in data.nodes.keys().cloned().collect::<Vec<_>>() {
|
||||||
if !new_nodes.contains_key(&obsolete_node) {
|
if !new_nodes.contains_key(&obsolete_node) {
|
||||||
data.nodes.remove(&obsolete_node);
|
data.nodes.remove(&obsolete_node);
|
||||||
|
@ -60,8 +60,15 @@ struct CachedContract {
|
|||||||
|
|
||||||
impl OnChainKeyServerSet {
|
impl OnChainKeyServerSet {
|
||||||
pub fn new(client: &Arc<Client>, key_servers: BTreeMap<Public, SocketAddr>) -> Arc<Self> {
|
pub fn new(client: &Arc<Client>, key_servers: BTreeMap<Public, SocketAddr>) -> Arc<Self> {
|
||||||
|
let mut cached_contract = CachedContract::new(client, key_servers);
|
||||||
|
let key_server_contract_address = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned());
|
||||||
|
// only initialize from contract if it is installed. otherwise - use default nodes
|
||||||
|
if key_server_contract_address.is_some() {
|
||||||
|
cached_contract.read_from_registry(&*client, key_server_contract_address);
|
||||||
|
}
|
||||||
|
|
||||||
let key_server_set = Arc::new(OnChainKeyServerSet {
|
let key_server_set = Arc::new(OnChainKeyServerSet {
|
||||||
contract: Mutex::new(CachedContract::new(client, key_servers)),
|
contract: Mutex::new(cached_contract),
|
||||||
});
|
});
|
||||||
client.add_notify(key_server_set.clone());
|
client.add_notify(key_server_set.clone());
|
||||||
key_server_set
|
key_server_set
|
||||||
@ -76,6 +83,7 @@ impl KeyServerSet for OnChainKeyServerSet {
|
|||||||
|
|
||||||
impl ChainNotify for OnChainKeyServerSet {
|
impl ChainNotify for OnChainKeyServerSet {
|
||||||
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: u64) {
|
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: u64) {
|
||||||
|
println!("=== new_blocks: imported {}, invalid: {}, enactd: {}, retracted: {}, sealed: {}, proposed: {}", _imported.len(), _invalid.len(), enacted.len(), retracted.len(), _sealed.len(), _proposed.len());
|
||||||
self.contract.lock().update(enacted, retracted)
|
self.contract.lock().update(enacted, retracted)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -92,36 +100,10 @@ impl CachedContract {
|
|||||||
pub fn update(&mut self, enacted: Vec<H256>, _retracted: Vec<H256>) {
|
pub fn update(&mut self, enacted: Vec<H256>, _retracted: Vec<H256>) {
|
||||||
if let Some(client) = self.client.upgrade() {
|
if let Some(client) = self.client.upgrade() {
|
||||||
let new_contract_addr = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned());
|
let new_contract_addr = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned());
|
||||||
|
println!("=== Registry address = {:?}", new_contract_addr);
|
||||||
// new contract installed
|
// new contract installed
|
||||||
if self.contract_addr.as_ref() != new_contract_addr.as_ref() {
|
if self.contract_addr.as_ref() != new_contract_addr.as_ref() {
|
||||||
println!("=== Installing contract from address: {:?}", new_contract_addr);
|
self.read_from_registry(&*client, new_contract_addr);
|
||||||
self.key_servers = new_contract_addr.map(|contract_addr| {
|
|
||||||
trace!(target: "secretstore", "Configuring for key server set contract from {}", contract_addr);
|
|
||||||
|
|
||||||
KeyServerSetContract::new(contract_addr)
|
|
||||||
})
|
|
||||||
.map(|contract| {
|
|
||||||
let mut key_servers = BTreeMap::new();
|
|
||||||
let do_call = |a, d| future::done(self.client.upgrade().ok_or("Calling contract without client".into()).and_then(|c| c.call_contract(BlockId::Latest, a, d)));
|
|
||||||
let key_servers_list = contract.get_key_servers(do_call).wait()
|
|
||||||
.map_err(|err| { trace!(target: "secretstore", "Error {} reading list of key servers from contract", err); err })
|
|
||||||
.unwrap_or_default();
|
|
||||||
for key_server in key_servers_list {
|
|
||||||
let key_server_public = contract.get_key_server_public(
|
|
||||||
|a, d| future::done(self.client.upgrade().ok_or("Calling contract without client".into()).and_then(|c| c.call_contract(BlockId::Latest, a, d))), key_server).wait()
|
|
||||||
.and_then(|p| if p.len() == 64 { Ok(Public::from_slice(&p)) } else { Err(format!("Invalid public length {}", p.len())) });
|
|
||||||
let key_server_ip = contract.get_key_server_address(
|
|
||||||
|a, d| future::done(self.client.upgrade().ok_or("Calling contract without client".into()).and_then(|c| c.call_contract(BlockId::Latest, a, d))), key_server).wait()
|
|
||||||
.and_then(|a| a.parse().map_err(|e| format!("Invalid ip address: {}", e)));
|
|
||||||
if let (Ok(key_server_public), Ok(key_server_ip)) = (key_server_public, key_server_ip) {
|
|
||||||
key_servers.insert(key_server_public, key_server_ip);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
key_servers
|
|
||||||
})
|
|
||||||
.unwrap_or_default();
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -139,7 +121,39 @@ println!("=== Installing contract from address: {:?}", new_contract_addr);
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get(&self) -> BTreeMap<Public, SocketAddr> {
|
pub fn get(&self) -> BTreeMap<Public, SocketAddr> {
|
||||||
self.key_servers.clone()
|
self.key_servers.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn read_from_registry(&mut self, client: &Client, new_contract_address: Option<Address>) {
|
||||||
|
println!("=== Installing contract from address: {:?}", new_contract_address);
|
||||||
|
self.key_servers = new_contract_address.map(|contract_addr| {
|
||||||
|
trace!(target: "secretstore", "Configuring for key server set contract from {}", contract_addr);
|
||||||
|
|
||||||
|
KeyServerSetContract::new(contract_addr)
|
||||||
|
})
|
||||||
|
.map(|contract| {
|
||||||
|
let mut key_servers = BTreeMap::new();
|
||||||
|
let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d));
|
||||||
|
let key_servers_list = contract.get_key_servers(do_call).wait()
|
||||||
|
.map_err(|err| { trace!(target: "secretstore", "Error {} reading list of key servers from contract", err); err })
|
||||||
|
.unwrap_or_default();
|
||||||
|
for key_server in key_servers_list {
|
||||||
|
let key_server_public = contract.get_key_server_public(
|
||||||
|
|a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait()
|
||||||
|
.and_then(|p| if p.len() == 64 { Ok(Public::from_slice(&p)) } else { Err(format!("Invalid public length {}", p.len())) });
|
||||||
|
let key_server_ip = contract.get_key_server_address(
|
||||||
|
|a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait()
|
||||||
|
.and_then(|a| a.parse().map_err(|e| format!("Invalid ip address: {}", e)));
|
||||||
|
if let (Ok(key_server_public), Ok(key_server_ip)) = (key_server_public, key_server_ip) {
|
||||||
|
println!("=== PARSED {:?} {:?}", key_server_public, key_server_ip);
|
||||||
|
key_servers.insert(key_server_public, key_server_ip);
|
||||||
|
}
|
||||||
|
else { println!("=== ERROR parsing"); }
|
||||||
|
}
|
||||||
|
key_servers
|
||||||
|
})
|
||||||
|
.unwrap_or_default();
|
||||||
|
self.contract_addr = new_contract_address;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user