diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index 87c602eae..ff9ff9b22 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -289,8 +289,7 @@ impl ClusterCore { /// Accept connection future. fn accept_connection_future(handle: &Handle, data: Arc, stream: TcpStream, node_address: SocketAddr) -> BoxedEmptyFuture { - let disconnected_nodes = data.connections.disconnected_nodes().keys().cloned().collect(); - net_accept_connection(node_address, stream, handle, data.self_key_pair.clone(), disconnected_nodes) + net_accept_connection(node_address, stream, handle, data.self_key_pair.clone()) .then(move |result| ClusterCore::process_connection_result(data, true, result)) .then(|_| finished(())) .boxed() @@ -381,14 +380,16 @@ impl ClusterCore { finished(Ok(())).boxed() } }, - Ok(DeadlineStatus::Meet(Err(_))) => { + Ok(DeadlineStatus::Meet(Err(err))) => { + warn!(target: "secretstore_net", "{}: protocol error {} when establishind connection", data.self_key_pair.public(), err); finished(Ok(())).boxed() }, Ok(DeadlineStatus::Timeout) => { + warn!(target: "secretstore_net", "{}: timeout when establishind connection", data.self_key_pair.public()); finished(Ok(())).boxed() }, - Err(_) => { - // network error + Err(err) => { + warn!(target: "secretstore_net", "{}: network error {} when establishind connection", data.self_key_pair.public(), err); finished(Ok(())).boxed() }, } @@ -699,6 +700,12 @@ impl ClusterConnections { pub fn insert(&self, connection: Arc) -> bool { let mut data = self.data.write(); + if !data.nodes.contains_key(connection.node_id()) { + // incoming connections are checked here + trace!(target: "secretstore_net", "{}: ignoring unknown connection from {} at {}", self.self_node_id, connection.node_id(), connection.node_address()); + debug_assert!(connection.is_inbound()); + return false; + } if data.connections.contains_key(connection.node_id()) { // we have already connected to the same node // the agreement is that node with lower id must establish connection to node with higher id @@ -746,14 +753,37 @@ impl ClusterConnections { let mut new_nodes = self.key_server_set.get(); new_nodes.remove(&self.self_node_id); + let mut num_added_nodes = 0; + let mut num_removed_nodes = 0; + let mut num_changed_nodes = 0; + for obsolete_node in data.nodes.keys().cloned().collect::>() { if !new_nodes.contains_key(&obsolete_node) { + if let Entry::Occupied(entry) = data.connections.entry(obsolete_node) { + trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address()); + entry.remove(); + } + data.nodes.remove(&obsolete_node); - data.connections.remove(&obsolete_node); + num_removed_nodes += 1; } } + for (new_node_public, new_node_addr) in new_nodes { - data.nodes.insert(new_node_public, new_node_addr); + match data.nodes.insert(new_node_public, new_node_addr) { + None => num_added_nodes += 1, + Some(old_node_addr) => if new_node_addr != old_node_addr { + if let Entry::Occupied(entry) = data.connections.entry(new_node_public) { + trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address()); + entry.remove(); + } + num_changed_nodes += 1; + }, + } + } + + if num_added_nodes != 0 || num_removed_nodes != 0 || num_changed_nodes != 0 { + trace!(target: "secretstore_net", "{}: updated nodes set: removed {}, added {}, changed {}", self.self_node_id, num_removed_nodes, num_added_nodes, num_changed_nodes); } } } diff --git a/secret_store/src/key_server_cluster/io/handshake.rs b/secret_store/src/key_server_cluster/io/handshake.rs index 38d8a6ac1..90f4d04cc 100644 --- a/secret_store/src/key_server_cluster/io/handshake.rs +++ b/secret_store/src/key_server_cluster/io/handshake.rs @@ -45,7 +45,7 @@ pub fn handshake_with_plain_confirmation(a: A, self_confirmation_plain: Resul state: state, self_key_pair: self_key_pair, self_confirmation_plain: self_confirmation_plain.unwrap_or(Default::default()), - trusted_nodes: trusted_nodes, + trusted_nodes: Some(trusted_nodes), other_node_id: None, other_confirmation_plain: None, shared_key: None, @@ -53,7 +53,7 @@ pub fn handshake_with_plain_confirmation(a: A, self_confirmation_plain: Resul } /// Wait for handshake procedure to be started by another node from the cluster. -pub fn accept_handshake(a: A, self_key_pair: KeyPair, trusted_nodes: BTreeSet) -> Handshake where A: AsyncWrite + AsyncRead { +pub fn accept_handshake(a: A, self_key_pair: KeyPair) -> Handshake where A: AsyncWrite + AsyncRead { let self_confirmation_plain = Random.generate().map(|kp| *kp.secret().clone()).map_err(Into::into); let (error, state) = match self_confirmation_plain.clone() { Ok(_) => (None, HandshakeState::ReceivePublicKey(read_message(a))), @@ -66,7 +66,7 @@ pub fn accept_handshake(a: A, self_key_pair: KeyPair, trusted_nodes: BTreeSet state: state, self_key_pair: self_key_pair, self_confirmation_plain: self_confirmation_plain.unwrap_or(Default::default()), - trusted_nodes: trusted_nodes, + trusted_nodes: None, other_node_id: None, other_confirmation_plain: None, shared_key: None, @@ -89,7 +89,7 @@ pub struct Handshake { state: HandshakeState, self_key_pair: KeyPair, self_confirmation_plain: H256, - trusted_nodes: BTreeSet, + trusted_nodes: Option>, other_node_id: Option, other_confirmation_plain: Option, shared_key: Option, @@ -172,7 +172,8 @@ impl Future for Handshake where A: AsyncRead + AsyncWrite { Err(err) => return Ok((stream, Err(err.into())).into()), }; - if !self.trusted_nodes.contains(&*message.node_id) { + if !self.trusted_nodes.as_ref().map(|tn| tn.contains(&*message.node_id)).unwrap_or(true) { +println!("=== HANDSHAKE - INVALID NODE: self.trusted_nodes = {:?}, message.node_id = {:?}", self.trusted_nodes, message.node_id); return Ok((stream, Err(Error::InvalidNodeId)).into()); } @@ -300,7 +301,7 @@ mod tests { let trusted_nodes: BTreeSet<_> = vec![io.peer_public().clone()].into_iter().collect(); let shared_key = compute_shared_key(self_key_pair.secret(), trusted_nodes.iter().nth(0).unwrap()).unwrap(); - let mut handshake = accept_handshake(io, self_key_pair, trusted_nodes); + let mut handshake = accept_handshake(io, self_key_pair); handshake.set_self_confirmation_plain(self_confirmation_plain); let handshake_result = handshake.wait().unwrap(); diff --git a/secret_store/src/key_server_cluster/net/accept_connection.rs b/secret_store/src/key_server_cluster/net/accept_connection.rs index 0daa8b2da..339625f3f 100644 --- a/secret_store/src/key_server_cluster/net/accept_connection.rs +++ b/secret_store/src/key_server_cluster/net/accept_connection.rs @@ -17,19 +17,18 @@ use std::io; use std::net::SocketAddr; use std::time::Duration; -use std::collections::BTreeSet; use futures::{Future, Poll}; use tokio_core::reactor::Handle; use tokio_core::net::TcpStream; use ethkey::KeyPair; -use key_server_cluster::{Error, NodeId}; +use key_server_cluster::Error; use key_server_cluster::io::{accept_handshake, Handshake, Deadline, deadline}; use key_server_cluster::net::Connection; /// Create future for accepting incoming connection. -pub fn accept_connection(address: SocketAddr, stream: TcpStream, handle: &Handle, self_key_pair: KeyPair, trusted_nodes: BTreeSet) -> Deadline { +pub fn accept_connection(address: SocketAddr, stream: TcpStream, handle: &Handle, self_key_pair: KeyPair) -> Deadline { let accept = AcceptConnection { - handshake: accept_handshake(stream, self_key_pair, trusted_nodes), + handshake: accept_handshake(stream, self_key_pair), address: address, }; diff --git a/secret_store/src/key_server_set.rs b/secret_store/src/key_server_set.rs index f9ec120fd..7b0bd5c9f 100644 --- a/secret_store/src/key_server_set.rs +++ b/secret_store/src/key_server_set.rs @@ -28,8 +28,8 @@ use types::all::Public; const KEY_SERVER_SET_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_server_set"; // TODO: ethabi should be able to generate this. -const ADDED_EVENT_NAME: &'static [u8] = &*b"KeyServerAdded()"; -const REMOVED_EVENT_NAME: &'static [u8] = &*b"KeyServerRemoved()"; +const ADDED_EVENT_NAME: &'static [u8] = &*b"KeyServerAdded(address)"; +const REMOVED_EVENT_NAME: &'static [u8] = &*b"KeyServerRemoved(address)"; lazy_static! { static ref ADDED_EVENT_NAME_HASH: H256 = ADDED_EVENT_NAME.sha3(); @@ -83,7 +83,6 @@ impl KeyServerSet for OnChainKeyServerSet { impl ChainNotify for OnChainKeyServerSet { fn new_blocks(&self, _imported: Vec, _invalid: Vec, enacted: Vec, retracted: Vec, _sealed: Vec, _proposed: Vec, _duration: u64) { -println!("=== new_blocks: imported {}, invalid: {}, enactd: {}, retracted: {}, sealed: {}, proposed: {}", _imported.len(), _invalid.len(), enacted.len(), retracted.len(), _sealed.len(), _proposed.len()); self.contract.lock().update(enacted, retracted) } } @@ -97,26 +96,33 @@ impl CachedContract { } } - pub fn update(&mut self, enacted: Vec, _retracted: Vec) { + pub fn update(&mut self, enacted: Vec, retracted: Vec) { if let Some(client) = self.client.upgrade() { let new_contract_addr = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned()); -println!("=== Registry address = {:?}", new_contract_addr); + // new contract installed if self.contract_addr.as_ref() != new_contract_addr.as_ref() { self.read_from_registry(&*client, new_contract_addr); return; } - // check for events - for enacted_hash in enacted { - let filter = Filter { - from_block: BlockId::Hash(enacted_hash.clone()), - to_block: BlockId::Hash(enacted_hash.clone()), + // check for contract events + let is_set_changed = self.contract_addr.is_some() && enacted.iter() + .chain(retracted.iter()) + .any(|block_hash| !client.logs(Filter { + from_block: BlockId::Hash(block_hash.clone()), + to_block: BlockId::Hash(block_hash.clone()), address: self.contract_addr.clone().map(|a| vec![a]), - topics: vec![Some(vec![*ADDED_EVENT_NAME_HASH]), Some(vec![*REMOVED_EVENT_NAME_HASH])], - limit: None, - }; - println!("=== Number of filtered log entries: {}", client.logs(filter).len()); + topics: vec![ + Some(vec![*ADDED_EVENT_NAME_HASH, *REMOVED_EVENT_NAME_HASH]), + None, + None, + None, + ], + limit: Some(1), + }).is_empty()); + if is_set_changed { + self.read_from_registry(&*client, new_contract_addr); } } } @@ -126,7 +132,6 @@ println!("=== Registry address = {:?}", new_contract_addr); } fn read_from_registry(&mut self, client: &Client, new_contract_address: Option
) { -println!("=== Installing contract from address: {:?}", new_contract_address); self.key_servers = new_contract_address.map(|contract_addr| { trace!(target: "secretstore", "Configuring for key server set contract from {}", contract_addr); @@ -146,10 +151,8 @@ println!("=== Installing contract from address: {:?}", new_contract_address); |a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait() .and_then(|a| a.parse().map_err(|e| format!("Invalid ip address: {}", e))); if let (Ok(key_server_public), Ok(key_server_ip)) = (key_server_public, key_server_ip) { -println!("=== PARSED {:?} {:?}", key_server_public, key_server_ip); key_servers.insert(key_server_public, key_server_ip); } -else { println!("=== ERROR parsing"); } } key_servers })