fixed connection establishing

This commit is contained in:
Svyatoslav Nikolsky 2017-07-20 12:19:29 +03:00
parent 7664ff5acd
commit 80b9e931f5
4 changed files with 67 additions and 34 deletions

View File

@ -289,8 +289,7 @@ impl ClusterCore {
/// Accept connection future.
fn accept_connection_future(handle: &Handle, data: Arc<ClusterData>, stream: TcpStream, node_address: SocketAddr) -> BoxedEmptyFuture {
let disconnected_nodes = data.connections.disconnected_nodes().keys().cloned().collect();
net_accept_connection(node_address, stream, handle, data.self_key_pair.clone(), disconnected_nodes)
net_accept_connection(node_address, stream, handle, data.self_key_pair.clone())
.then(move |result| ClusterCore::process_connection_result(data, true, result))
.then(|_| finished(()))
.boxed()
@ -381,14 +380,16 @@ impl ClusterCore {
finished(Ok(())).boxed()
}
},
Ok(DeadlineStatus::Meet(Err(_))) => {
Ok(DeadlineStatus::Meet(Err(err))) => {
warn!(target: "secretstore_net", "{}: protocol error {} when establishind connection", data.self_key_pair.public(), err);
finished(Ok(())).boxed()
},
Ok(DeadlineStatus::Timeout) => {
warn!(target: "secretstore_net", "{}: timeout when establishind connection", data.self_key_pair.public());
finished(Ok(())).boxed()
},
Err(_) => {
// network error
Err(err) => {
warn!(target: "secretstore_net", "{}: network error {} when establishind connection", data.self_key_pair.public(), err);
finished(Ok(())).boxed()
},
}
@ -699,6 +700,12 @@ impl ClusterConnections {
pub fn insert(&self, connection: Arc<Connection>) -> bool {
let mut data = self.data.write();
if !data.nodes.contains_key(connection.node_id()) {
// incoming connections are checked here
trace!(target: "secretstore_net", "{}: ignoring unknown connection from {} at {}", self.self_node_id, connection.node_id(), connection.node_address());
debug_assert!(connection.is_inbound());
return false;
}
if data.connections.contains_key(connection.node_id()) {
// we have already connected to the same node
// the agreement is that node with lower id must establish connection to node with higher id
@ -746,14 +753,37 @@ impl ClusterConnections {
let mut new_nodes = self.key_server_set.get();
new_nodes.remove(&self.self_node_id);
let mut num_added_nodes = 0;
let mut num_removed_nodes = 0;
let mut num_changed_nodes = 0;
for obsolete_node in data.nodes.keys().cloned().collect::<Vec<_>>() {
if !new_nodes.contains_key(&obsolete_node) {
if let Entry::Occupied(entry) = data.connections.entry(obsolete_node) {
trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address());
entry.remove();
}
data.nodes.remove(&obsolete_node);
data.connections.remove(&obsolete_node);
num_removed_nodes += 1;
}
}
for (new_node_public, new_node_addr) in new_nodes {
data.nodes.insert(new_node_public, new_node_addr);
match data.nodes.insert(new_node_public, new_node_addr) {
None => num_added_nodes += 1,
Some(old_node_addr) => if new_node_addr != old_node_addr {
if let Entry::Occupied(entry) = data.connections.entry(new_node_public) {
trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address());
entry.remove();
}
num_changed_nodes += 1;
},
}
}
if num_added_nodes != 0 || num_removed_nodes != 0 || num_changed_nodes != 0 {
trace!(target: "secretstore_net", "{}: updated nodes set: removed {}, added {}, changed {}", self.self_node_id, num_removed_nodes, num_added_nodes, num_changed_nodes);
}
}
}

View File

@ -45,7 +45,7 @@ pub fn handshake_with_plain_confirmation<A>(a: A, self_confirmation_plain: Resul
state: state,
self_key_pair: self_key_pair,
self_confirmation_plain: self_confirmation_plain.unwrap_or(Default::default()),
trusted_nodes: trusted_nodes,
trusted_nodes: Some(trusted_nodes),
other_node_id: None,
other_confirmation_plain: None,
shared_key: None,
@ -53,7 +53,7 @@ pub fn handshake_with_plain_confirmation<A>(a: A, self_confirmation_plain: Resul
}
/// Wait for handshake procedure to be started by another node from the cluster.
pub fn accept_handshake<A>(a: A, self_key_pair: KeyPair, trusted_nodes: BTreeSet<NodeId>) -> Handshake<A> where A: AsyncWrite + AsyncRead {
pub fn accept_handshake<A>(a: A, self_key_pair: KeyPair) -> Handshake<A> where A: AsyncWrite + AsyncRead {
let self_confirmation_plain = Random.generate().map(|kp| *kp.secret().clone()).map_err(Into::into);
let (error, state) = match self_confirmation_plain.clone() {
Ok(_) => (None, HandshakeState::ReceivePublicKey(read_message(a))),
@ -66,7 +66,7 @@ pub fn accept_handshake<A>(a: A, self_key_pair: KeyPair, trusted_nodes: BTreeSet
state: state,
self_key_pair: self_key_pair,
self_confirmation_plain: self_confirmation_plain.unwrap_or(Default::default()),
trusted_nodes: trusted_nodes,
trusted_nodes: None,
other_node_id: None,
other_confirmation_plain: None,
shared_key: None,
@ -89,7 +89,7 @@ pub struct Handshake<A> {
state: HandshakeState<A>,
self_key_pair: KeyPair,
self_confirmation_plain: H256,
trusted_nodes: BTreeSet<NodeId>,
trusted_nodes: Option<BTreeSet<NodeId>>,
other_node_id: Option<NodeId>,
other_confirmation_plain: Option<H256>,
shared_key: Option<KeyPair>,
@ -172,7 +172,8 @@ impl<A> Future for Handshake<A> where A: AsyncRead + AsyncWrite {
Err(err) => return Ok((stream, Err(err.into())).into()),
};
if !self.trusted_nodes.contains(&*message.node_id) {
if !self.trusted_nodes.as_ref().map(|tn| tn.contains(&*message.node_id)).unwrap_or(true) {
println!("=== HANDSHAKE - INVALID NODE: self.trusted_nodes = {:?}, message.node_id = {:?}", self.trusted_nodes, message.node_id);
return Ok((stream, Err(Error::InvalidNodeId)).into());
}
@ -300,7 +301,7 @@ mod tests {
let trusted_nodes: BTreeSet<_> = vec![io.peer_public().clone()].into_iter().collect();
let shared_key = compute_shared_key(self_key_pair.secret(), trusted_nodes.iter().nth(0).unwrap()).unwrap();
let mut handshake = accept_handshake(io, self_key_pair, trusted_nodes);
let mut handshake = accept_handshake(io, self_key_pair);
handshake.set_self_confirmation_plain(self_confirmation_plain);
let handshake_result = handshake.wait().unwrap();

View File

@ -17,19 +17,18 @@
use std::io;
use std::net::SocketAddr;
use std::time::Duration;
use std::collections::BTreeSet;
use futures::{Future, Poll};
use tokio_core::reactor::Handle;
use tokio_core::net::TcpStream;
use ethkey::KeyPair;
use key_server_cluster::{Error, NodeId};
use key_server_cluster::Error;
use key_server_cluster::io::{accept_handshake, Handshake, Deadline, deadline};
use key_server_cluster::net::Connection;
/// Create future for accepting incoming connection.
pub fn accept_connection(address: SocketAddr, stream: TcpStream, handle: &Handle, self_key_pair: KeyPair, trusted_nodes: BTreeSet<NodeId>) -> Deadline<AcceptConnection> {
pub fn accept_connection(address: SocketAddr, stream: TcpStream, handle: &Handle, self_key_pair: KeyPair) -> Deadline<AcceptConnection> {
let accept = AcceptConnection {
handshake: accept_handshake(stream, self_key_pair, trusted_nodes),
handshake: accept_handshake(stream, self_key_pair),
address: address,
};

View File

@ -28,8 +28,8 @@ use types::all::Public;
const KEY_SERVER_SET_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_server_set";
// TODO: ethabi should be able to generate this.
const ADDED_EVENT_NAME: &'static [u8] = &*b"KeyServerAdded()";
const REMOVED_EVENT_NAME: &'static [u8] = &*b"KeyServerRemoved()";
const ADDED_EVENT_NAME: &'static [u8] = &*b"KeyServerAdded(address)";
const REMOVED_EVENT_NAME: &'static [u8] = &*b"KeyServerRemoved(address)";
lazy_static! {
static ref ADDED_EVENT_NAME_HASH: H256 = ADDED_EVENT_NAME.sha3();
@ -83,7 +83,6 @@ impl KeyServerSet for OnChainKeyServerSet {
impl ChainNotify for OnChainKeyServerSet {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: u64) {
println!("=== new_blocks: imported {}, invalid: {}, enactd: {}, retracted: {}, sealed: {}, proposed: {}", _imported.len(), _invalid.len(), enacted.len(), retracted.len(), _sealed.len(), _proposed.len());
self.contract.lock().update(enacted, retracted)
}
}
@ -97,26 +96,33 @@ impl CachedContract {
}
}
pub fn update(&mut self, enacted: Vec<H256>, _retracted: Vec<H256>) {
pub fn update(&mut self, enacted: Vec<H256>, retracted: Vec<H256>) {
if let Some(client) = self.client.upgrade() {
let new_contract_addr = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned());
println!("=== Registry address = {:?}", new_contract_addr);
// new contract installed
if self.contract_addr.as_ref() != new_contract_addr.as_ref() {
self.read_from_registry(&*client, new_contract_addr);
return;
}
// check for events
for enacted_hash in enacted {
let filter = Filter {
from_block: BlockId::Hash(enacted_hash.clone()),
to_block: BlockId::Hash(enacted_hash.clone()),
// check for contract events
let is_set_changed = self.contract_addr.is_some() && enacted.iter()
.chain(retracted.iter())
.any(|block_hash| !client.logs(Filter {
from_block: BlockId::Hash(block_hash.clone()),
to_block: BlockId::Hash(block_hash.clone()),
address: self.contract_addr.clone().map(|a| vec![a]),
topics: vec![Some(vec![*ADDED_EVENT_NAME_HASH]), Some(vec![*REMOVED_EVENT_NAME_HASH])],
limit: None,
};
println!("=== Number of filtered log entries: {}", client.logs(filter).len());
topics: vec![
Some(vec![*ADDED_EVENT_NAME_HASH, *REMOVED_EVENT_NAME_HASH]),
None,
None,
None,
],
limit: Some(1),
}).is_empty());
if is_set_changed {
self.read_from_registry(&*client, new_contract_addr);
}
}
}
@ -126,7 +132,6 @@ println!("=== Registry address = {:?}", new_contract_addr);
}
fn read_from_registry(&mut self, client: &Client, new_contract_address: Option<Address>) {
println!("=== Installing contract from address: {:?}", new_contract_address);
self.key_servers = new_contract_address.map(|contract_addr| {
trace!(target: "secretstore", "Configuring for key server set contract from {}", contract_addr);
@ -146,10 +151,8 @@ println!("=== Installing contract from address: {:?}", new_contract_address);
|a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait()
.and_then(|a| a.parse().map_err(|e| format!("Invalid ip address: {}", e)));
if let (Ok(key_server_public), Ok(key_server_ip)) = (key_server_public, key_server_ip) {
println!("=== PARSED {:?} {:?}", key_server_public, key_server_ip);
key_servers.insert(key_server_public, key_server_ip);
}
else { println!("=== ERROR parsing"); }
}
key_servers
})