// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
use std::io;
use std::time::{Duration, Instant};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::collections::{BTreeMap, BTreeSet};
use std::collections::btree_map::Entry;
use std::net::{SocketAddr, IpAddr};
use futures::{future, Future, Stream};
use parking_lot::{Mutex, RwLock};
use tokio_io::IoFuture;
use tokio::timer::{Interval, timeout::Error as TimeoutError};
use tokio::net::{TcpListener, TcpStream};
use ethkey::{Public, KeyPair, Signature, Random, Generator};
use ethereum_types::{Address, H256};
use parity_runtime::Executor;
use key_server_cluster::{Error, NodeId, SessionId, Requester, AclStorage, KeyStorage, KeyServerSet, NodeKeyPair};
use key_server_cluster::cluster_sessions::{ClusterSession, AdminSession, ClusterSessions, SessionIdWithSubSession,
ClusterSessionsContainer, SERVERS_SET_CHANGE_SESSION_ID, create_cluster_view, AdminSessionCreationData, ClusterSessionsListener};
use key_server_cluster::cluster_sessions_creator::{ClusterSessionCreator, IntoSessionId};
use key_server_cluster::message::{self, Message, ClusterMessage};
use key_server_cluster::generation_session::{SessionImpl as GenerationSession};
use key_server_cluster::decryption_session::{SessionImpl as DecryptionSession};
use key_server_cluster::encryption_session::{SessionImpl as EncryptionSession};
use key_server_cluster::signing_session_ecdsa::{SessionImpl as EcdsaSigningSession};
use key_server_cluster::signing_session_schnorr::{SessionImpl as SchnorrSigningSession};
use key_server_cluster::key_version_negotiation_session::{SessionImpl as KeyVersionNegotiationSession,
IsolatedSessionTransport as KeyVersionNegotiationSessionTransport, ContinueAction};
use key_server_cluster::io::{DeadlineStatus, ReadMessage, SharedTcpStream, read_encrypted_message, WriteMessage, write_encrypted_message};
use key_server_cluster::net::{accept_connection as net_accept_connection, connect as net_connect, Connection as NetConnection};
use key_server_cluster::connection_trigger::{Maintain, ConnectionTrigger, SimpleConnectionTrigger, ServersSetChangeSessionCreatorConnector};
use key_server_cluster::connection_trigger_with_migration::ConnectionTriggerWithMigration;
/// Maintain interval (seconds). Every MAINTAIN_INTERVAL seconds the node:
/// 1) checks if connected nodes are responding to KeepAlive messages
/// 2) tries to connect to disconnected nodes
/// 3) checks if enc/dec sessions have timed out
const MAINTAIN_INTERVAL: u64 = 10;
/// When no messages have been received from a node within KEEP_ALIVE_SEND_INTERVAL,
/// we must send a KeepAlive message to the node to check if it still responds to messages.
const KEEP_ALIVE_SEND_INTERVAL: Duration = Duration::from_secs(30);
/// When no messages have been received from a node within KEEP_ALIVE_DISCONNECT_INTERVAL,
/// we must treat this node as non-responding && disconnect from it.
const KEEP_ALIVE_DISCONNECT_INTERVAL: Duration = Duration::from_secs(60);
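// Note on how these intervals interact: the maintain timer fires every MAINTAIN_INTERVAL
// seconds; on each tick a connection that has been silent for more than
// KEEP_ALIVE_SEND_INTERVAL is probed with a KeepAlive message, and one that has been
// silent for more than KEEP_ALIVE_DISCONNECT_INTERVAL is dropped (see `keep_alive` below).
// With the values above, a silent peer is probed after ~30s and disconnected after ~60s.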
/// Empty future.
pub type BoxedEmptyFuture = Box<Future<Item = (), Error = ()> + Send>;
/// Cluster interface for external clients.
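///
/// A minimal usage sketch (not a doctest; `runtime` and `config` are assumed to
/// exist, see `tests::make_clusters` for a full `ClusterConfiguration`):
///
/// ```ignore
/// let cluster = ClusterCore::new(runtime.executor(), config)?;
/// cluster.run()?;
/// let client = cluster.client();
/// // start generating a key with threshold 1, no origin and a zero author address
/// let session = client.new_generation_session(SessionId::default(), None, Address::default(), 1)?;
/// ```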
pub trait ClusterClient: Send + Sync {
/// Get cluster state.
fn cluster_state(&self) -> ClusterState;
/// Start new generation session.
fn new_generation_session(&self, session_id: SessionId, origin: Option<Address>, author: Address, threshold: usize) -> Result<Arc<GenerationSession>, Error>;
/// Start new encryption session.
fn new_encryption_session(&self, session_id: SessionId, author: Requester, common_point: Public, encrypted_point: Public) -> Result<Arc<EncryptionSession>, Error>;
/// Start new decryption session.
fn new_decryption_session(&self, session_id: SessionId, origin: Option<Address>, requester: Requester, version: Option<H256>, is_shadow_decryption: bool, is_broadcast_decryption: bool) -> Result<Arc<DecryptionSession>, Error>;
/// Start new Schnorr signing session.
fn new_schnorr_signing_session(&self, session_id: SessionId, requester: Requester, version: Option<H256>, message_hash: H256) -> Result<Arc<SchnorrSigningSession>, Error>;
/// Start new ECDSA signing session.
fn new_ecdsa_signing_session(&self, session_id: SessionId, requester: Requester, version: Option<H256>, message_hash: H256) -> Result<Arc<EcdsaSigningSession>, Error>;
/// Start new key version negotiation session.
fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error>;
/// Start new servers set change session.
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, migration_id: Option<H256>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSession>, Error>;
/// Listen for new generation sessions.
fn add_generation_listener(&self, listener: Arc<ClusterSessionsListener<GenerationSession>>);
/// Listen for new decryption sessions.
fn add_decryption_listener(&self, listener: Arc<ClusterSessionsListener<DecryptionSession>>);
/// Listen for new key version negotiation sessions.
fn add_key_version_negotiation_listener(&self, listener: Arc<ClusterSessionsListener<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>>);
/// Ask node to make 'faulty' generation sessions.
#[cfg(test)]
fn make_faulty_generation_sessions(&self);
/// Get active generation session with given id.
#[cfg(test)]
fn generation_session(&self, session_id: &SessionId) -> Option<Arc<GenerationSession>>;
/// Try to connect to disconnected nodes.
#[cfg(test)]
fn connect(&self);
/// Get key storage.
#[cfg(test)]
fn key_storage(&self) -> Arc<KeyStorage>;
}
/// Cluster access for single session participant.
pub trait Cluster: Send + Sync {
/// Broadcast message to all other nodes.
fn broadcast(&self, message: Message) -> Result<(), Error>;
/// Send message to given node.
fn send(&self, to: &NodeId, message: Message) -> Result<(), Error>;
/// Is connected to given node?
fn is_connected(&self, node: &NodeId) -> bool;
/// Get a set of connected nodes.
fn nodes(&self) -> BTreeSet<NodeId>;
/// Get total count of configured key server nodes (valid at the time of ClusterView creation).
fn configured_nodes_count(&self) -> usize;
/// Get total count of connected key server nodes (valid at the time of ClusterView creation).
fn connected_nodes_count(&self) -> usize;
}
/// Cluster initialization parameters.
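///
/// A construction sketch (all values are placeholders; concrete `NodeKeyPair`,
/// `KeyServerSet`, `KeyStorage` and `AclStorage` implementations are assumed,
/// see `tests::make_clusters` for a real example):
///
/// ```ignore
/// let config = ClusterConfiguration {
///     allow_connecting_to_higher_nodes: true,
///     self_key_pair: self_key_pair.clone(),
///     listen_address: ("0.0.0.0".to_owned(), 8083),
///     key_server_set: key_server_set.clone(),
///     key_storage: key_storage.clone(),
///     acl_storage: acl_storage.clone(),
///     admin_public: None,
///     auto_migrate_enabled: false,
/// };
/// ```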
#[derive(Clone)]
pub struct ClusterConfiguration {
/// Allow connecting to 'higher' nodes.
pub allow_connecting_to_higher_nodes: bool,
/// KeyPair this node holds.
pub self_key_pair: Arc<NodeKeyPair>,
/// Interface to listen to.
pub listen_address: (String, u16),
/// Cluster nodes set.
pub key_server_set: Arc<KeyServerSet>,
/// Reference to key storage.
pub key_storage: Arc<KeyStorage>,
/// Reference to ACL storage.
pub acl_storage: Arc<AclStorage>,
/// Administrator public key.
pub admin_public: Option<Public>,
/// Should a servers set change session be started when the servers set changes?
/// This will only work when the servers set is configured using the KeyServerSet
/// contract.
pub auto_migrate_enabled: bool,
}
/// Cluster state.
pub struct ClusterState {
/// Nodes, to which connections are established.
pub connected: BTreeSet<NodeId>,
}
/// Network cluster implementation.
pub struct ClusterCore {
/// Listen address.
listen_address: SocketAddr,
/// Cluster data.
data: Arc<ClusterData>,
}
/// Network cluster client interface implementation.
pub struct ClusterClientImpl {
/// Cluster data.
data: Arc<ClusterData>,
}
/// Network cluster view. It is a communication channel required by a single session.
pub struct ClusterView {
core: Arc<RwLock<ClusterViewCore>>,
configured_nodes_count: usize,
connected_nodes_count: usize,
}
/// Cross-thread shareable cluster data.
pub struct ClusterData {
/// Cluster configuration.
pub config: ClusterConfiguration,
/// Handle to the event loop.
pub executor: Executor,
/// KeyPair this node holds.
pub self_key_pair: Arc<NodeKeyPair>,
/// Connections data.
pub connections: ClusterConnections,
/// Active sessions data.
pub sessions: ClusterSessions,
/// A shutdown flag.
pub is_shutdown: Arc<AtomicBool>,
}
/// Connections that are forming the cluster. Lock order: trigger.lock() -> data.lock().
pub struct ClusterConnections {
/// Self node id.
pub self_node_id: NodeId,
/// All known other key servers.
pub key_server_set: Arc<KeyServerSet>,
/// Connections trigger.
pub trigger: Mutex<Box<ConnectionTrigger>>,
/// Servers set change session creator connector.
pub connector: Arc<ServersSetChangeSessionCreatorConnector>,
/// Connections data.
pub data: RwLock<ClusterConnectionsData>,
}
/// Cluster connections data.
pub struct ClusterConnectionsData {
/// Is this node isolated from cluster?
pub is_isolated: bool,
/// Active key servers set.
pub nodes: BTreeMap<NodeId, SocketAddr>,
/// Active connections to key servers.
pub connections: BTreeMap<NodeId, Arc<Connection>>,
}
/// Cluster view core.
struct ClusterViewCore {
/// Cluster reference.
cluster: Arc<ClusterData>,
/// Subset of nodes, required for this session.
nodes: BTreeSet<NodeId>,
}
/// Connection to single node.
pub struct Connection {
/// Node id.
node_id: NodeId,
/// Node address.
node_address: SocketAddr,
/// Is inbound connection?
is_inbound: bool,
/// Tcp stream.
stream: SharedTcpStream,
/// Connection key.
key: KeyPair,
/// Last message time.
last_message_time: RwLock<Instant>,
}
impl ClusterCore {
pub fn new(executor: Executor, config: ClusterConfiguration) -> Result<Arc<Self>, Error> {
let listen_address = make_socket_address(&config.listen_address.0, config.listen_address.1)?;
let connections = ClusterConnections::new(&config)?;
let servers_set_change_creator_connector = connections.connector.clone();
let sessions = ClusterSessions::new(&config, servers_set_change_creator_connector);
let data = ClusterData::new(&executor, config, connections, sessions);
Ok(Arc::new(ClusterCore {
listen_address: listen_address,
data: data,
}))
}
/// Create new client interface.
pub fn client(&self) -> Arc<ClusterClient> {
Arc::new(ClusterClientImpl::new(self.data.clone()))
}
/// Get cluster configuration.
#[cfg(test)]
pub fn config(&self) -> &ClusterConfiguration {
&self.data.config
}
/// Get connection to given node.
#[cfg(test)]
pub fn connection(&self, node: &NodeId) -> Option<Arc<Connection>> {
self.data.connection(node)
}
/// Run cluster.
pub fn run(&self) -> Result<(), Error> {
self.run_listener()
.and_then(|_| self.run_connections())?;
// schedule maintain procedures
ClusterCore::schedule_maintain(self.data.clone());
Ok(())
}
/// Start listening for incoming connections.
pub fn run_listener(&self) -> Result<(), Error> {
// start listening for incoming connections
self.data.spawn(ClusterCore::listen(self.data.clone(), self.listen_address.clone())?);
Ok(())
}
/// Start connecting to other nodes.
pub fn run_connections(&self) -> Result<(), Error> {
// try to connect to every other peer
ClusterCore::connect_disconnected_nodes(self.data.clone());
Ok(())
}
/// Connect to peer.
fn connect(data: Arc<ClusterData>, node_address: SocketAddr) {
data.clone().spawn(ClusterCore::connect_future(data, node_address));
}
/// Connect to socket using given context and executor.
fn connect_future(data: Arc<ClusterData>, node_address: SocketAddr) -> BoxedEmptyFuture {
let disconnected_nodes = data.connections.disconnected_nodes().keys().cloned().collect();
Box::new(net_connect(&node_address, data.self_key_pair.clone(), disconnected_nodes)
.then(move |result| ClusterCore::process_connection_result(data, Some(node_address), result))
.then(|_| future::ok(())))
}
/// Start listening for incoming connections.
fn listen(data: Arc<ClusterData>, listen_address: SocketAddr) -> Result<BoxedEmptyFuture, Error> {
Ok(Box::new(TcpListener::bind(&listen_address)?
.incoming()
.and_then(move |stream| {
ClusterCore::accept_connection(data.clone(), stream);
Ok(())
})
.for_each(|_| Ok(()))
.then(|_| future::ok(()))))
}
/// Accept connection.
fn accept_connection(data: Arc<ClusterData>, stream: TcpStream) {
data.clone().spawn(ClusterCore::accept_connection_future(data, stream))
}
/// Accept connection future.
fn accept_connection_future(data: Arc<ClusterData>, stream: TcpStream) -> BoxedEmptyFuture {
Box::new(net_accept_connection(stream, data.self_key_pair.clone())
.then(move |result| ClusterCore::process_connection_result(data, None, result))
.then(|_| future::ok(())))
}
/// Schedule maintain procedures.
fn schedule_maintain(data: Arc<ClusterData>) {
let d = data.clone();
let interval = Interval::new_interval(Duration::new(MAINTAIN_INTERVAL, 0))
.and_then(move |_| Ok(ClusterCore::maintain(data.clone())))
.for_each(|_| Ok(()))
.then(|_| future::ok(()));
d.spawn(interval);
}
/// Execute maintain procedures.
fn maintain(data: Arc<ClusterData>) {
trace!(target: "secretstore_net", "{}: executing maintain procedures", data.self_key_pair.public());
ClusterCore::keep_alive(data.clone());
ClusterCore::connect_disconnected_nodes(data.clone());
data.sessions.stop_stalled_sessions();
}
/// Called for every incoming message.
fn process_connection_messages(data: Arc<ClusterData>, connection: Arc<Connection>) -> IoFuture<Result<(), Error>> {
Box::new(connection
.read_message()
.then(move |result|
match result {
Ok((_, Ok(message))) => {
ClusterCore::process_connection_message(data.clone(), connection.clone(), message);
// continue serving connection
data.spawn(ClusterCore::process_connection_messages(data.clone(), connection).then(|_| Ok(())));
Box::new(future::ok(Ok(())))
},
Ok((_, Err(err))) => {
warn!(target: "secretstore_net", "{}: protocol error '{}' when reading message from node {}", data.self_key_pair.public(), err, connection.node_id());
// continue serving connection
data.spawn(ClusterCore::process_connection_messages(data.clone(), connection).then(|_| Ok(())));
Box::new(future::ok(Err(err)))
},
Err(err) => {
warn!(target: "secretstore_net", "{}: network error '{}' when reading message from node {}", data.self_key_pair.public(), err, connection.node_id());
// close connection
data.connections.remove(data.clone(), connection.node_id(), connection.is_inbound());
Box::new(future::err(err))
},
}
))
}
/// Send keepalive messages to every other node.
fn keep_alive(data: Arc<ClusterData>) {
data.sessions.sessions_keep_alive();
for connection in data.connections.active_connections() {
let last_message_diff = Instant::now() - connection.last_message_time();
if last_message_diff > KEEP_ALIVE_DISCONNECT_INTERVAL {
warn!(target: "secretstore_net", "{}: keep alive timeout for node {}",
data.self_key_pair.public(), connection.node_id());
data.connections.remove(data.clone(), connection.node_id(), connection.is_inbound());
data.sessions.on_connection_timeout(connection.node_id());
}
else if last_message_diff > KEEP_ALIVE_SEND_INTERVAL {
data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAlive(message::KeepAlive {}))).then(|_| Ok(())));
}
}
}
/// Try to connect to every disconnected node.
fn connect_disconnected_nodes(data: Arc<ClusterData>) {
let r = data.connections.update_nodes_set(data.clone());
if let Some(r) = r {
data.spawn(r);
}
// connect to disconnected nodes
for (node_id, node_address) in data.connections.disconnected_nodes() {
if data.config.allow_connecting_to_higher_nodes || data.self_key_pair.public() < &node_id {
ClusterCore::connect(data.clone(), node_address);
}
}
}
/// Process connection future result.
fn process_connection_result(data: Arc<ClusterData>, outbound_addr: Option<SocketAddr>,
result: Result<DeadlineStatus<Result<NetConnection, Error>>, TimeoutError<io::Error>>) -> IoFuture<Result<(), Error>>
{
match result {
Ok(DeadlineStatus::Meet(Ok(connection))) => {
let connection = Connection::new(outbound_addr.is_none(), connection);
if data.connections.insert(data.clone(), connection.clone()) {
ClusterCore::process_connection_messages(data.clone(), connection)
} else {
Box::new(future::ok(Ok(())))
}
},
Ok(DeadlineStatus::Meet(Err(err))) => {
warn!(target: "secretstore_net", "{}: protocol error '{}' when establishing {} connection{}",
data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" },
outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default());
Box::new(future::ok(Ok(())))
},
Ok(DeadlineStatus::Timeout) => {
warn!(target: "secretstore_net", "{}: timeout when establishing {} connection{}",
data.self_key_pair.public(), if outbound_addr.is_some() { "outbound" } else { "inbound" },
outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default());
Box::new(future::ok(Ok(())))
},
Err(err) => {
warn!(target: "secretstore_net", "{}: network error '{}' when establishing {} connection{}",
data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" },
outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default());
Box::new(future::ok(Ok(())))
},
}
}
/// Process single message from the connection.
fn process_connection_message(data: Arc<ClusterData>, connection: Arc<Connection>, message: Message) {
connection.set_last_message_time(Instant::now());
trace!(target: "secretstore_net", "{}: received message {} from {}", data.self_key_pair.public(), message, connection.node_id());
// error is ignored as we only process errors on session level
match message {
Message::Generation(message) => Self::process_message(&data, &data.sessions.generation_sessions, connection, Message::Generation(message))
.map(|_| ()).unwrap_or_default(),
Message::Encryption(message) => Self::process_message(&data, &data.sessions.encryption_sessions, connection, Message::Encryption(message))
.map(|_| ()).unwrap_or_default(),
Message::Decryption(message) => Self::process_message(&data, &data.sessions.decryption_sessions, connection, Message::Decryption(message))
.map(|_| ()).unwrap_or_default(),
Message::SchnorrSigning(message) => Self::process_message(&data, &data.sessions.schnorr_signing_sessions, connection, Message::SchnorrSigning(message))
.map(|_| ()).unwrap_or_default(),
Message::EcdsaSigning(message) => Self::process_message(&data, &data.sessions.ecdsa_signing_sessions, connection, Message::EcdsaSigning(message))
.map(|_| ()).unwrap_or_default(),
Message::ServersSetChange(message) => {
let message = Message::ServersSetChange(message);
let is_initialization_message = message.is_initialization_message();
let session = Self::process_message(&data, &data.sessions.admin_sessions, connection, message);
if is_initialization_message {
if let Some(session) = session {
data.connections.servers_set_change_creator_connector().set_key_servers_set_change_session(session.clone());
}
}
}
Message::KeyVersionNegotiation(message) => {
let session = Self::process_message(&data, &data.sessions.negotiation_sessions, connection, Message::KeyVersionNegotiation(message));
Self::try_continue_session(&data, session);
},
Message::ShareAdd(message) => Self::process_message(&data, &data.sessions.admin_sessions, connection, Message::ShareAdd(message))
.map(|_| ()).unwrap_or_default(),
Message::Cluster(message) => ClusterCore::process_cluster_message(data, connection, message),
}
}
/// Try to continue session.
fn try_continue_session(data: &Arc<ClusterData>, session: Option<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>>) {
if let Some(session) = session {
let meta = session.meta();
let is_master_node = meta.self_node_id == meta.master_node_id;
if is_master_node && session.is_finished() {
data.sessions.negotiation_sessions.remove(&session.id());
match session.wait() {
Ok(Some((version, master))) => match session.take_continue_action() {
Some(ContinueAction::Decrypt(session, origin, is_shadow_decryption, is_broadcast_decryption)) => {
let initialization_error = if data.self_key_pair.public() == &master {
session.initialize(origin, version, is_shadow_decryption, is_broadcast_decryption)
} else {
session.delegate(master, origin, version, is_shadow_decryption, is_broadcast_decryption)
};
if let Err(error) = initialization_error {
session.on_session_error(&meta.self_node_id, error);
data.sessions.decryption_sessions.remove(&session.id());
}
},
Some(ContinueAction::SchnorrSign(session, message_hash)) => {
let initialization_error = if data.self_key_pair.public() == &master {
session.initialize(version, message_hash)
} else {
session.delegate(master, version, message_hash)
};
if let Err(error) = initialization_error {
session.on_session_error(&meta.self_node_id, error);
data.sessions.schnorr_signing_sessions.remove(&session.id());
}
},
Some(ContinueAction::EcdsaSign(session, message_hash)) => {
let initialization_error = if data.self_key_pair.public() == &master {
session.initialize(version, message_hash)
} else {
session.delegate(master, version, message_hash)
};
if let Err(error) = initialization_error {
session.on_session_error(&meta.self_node_id, error);
data.sessions.ecdsa_signing_sessions.remove(&session.id());
}
},
None => (),
},
Ok(None) => unreachable!("is_master_node; session is finished; negotiation version always finished with result on master; qed"),
Err(error) => match session.take_continue_action() {
Some(ContinueAction::Decrypt(session, _, _, _)) => {
session.on_session_error(&meta.self_node_id, error);
data.sessions.decryption_sessions.remove(&session.id());
},
Some(ContinueAction::SchnorrSign(session, _)) => {
session.on_session_error(&meta.self_node_id, error);
data.sessions.schnorr_signing_sessions.remove(&session.id());
},
Some(ContinueAction::EcdsaSign(session, _)) => {
session.on_session_error(&meta.self_node_id, error);
data.sessions.ecdsa_signing_sessions.remove(&session.id());
},
None => (),
},
}
}
}
}
/// Get or insert new session.
fn prepare_session<S: ClusterSession, SC: ClusterSessionCreator<S, D>, D>(data: &Arc<ClusterData>, sessions: &ClusterSessionsContainer<S, SC, D>, sender: &NodeId, message: &Message) -> Result<Arc<S>, Error>
where Message: IntoSessionId<S::Id> {
fn requires_all_connections(message: &Message) -> bool {
match *message {
Message::Generation(_) => true,
Message::ShareAdd(_) => true,
Message::ServersSetChange(_) => true,
_ => false,
}
}
// get or create new session, if required
let session_id = message.into_session_id().expect("into_session_id fails for cluster messages only; only session messages are passed to prepare_session; qed");
let is_initialization_message = message.is_initialization_message();
let is_delegation_message = message.is_delegation_message();
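// only initialization && delegation messages are allowed to create a new session;
// every other message must reference a session that is already in the container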
match is_initialization_message || is_delegation_message {
false => sessions.get(&session_id, true).ok_or(Error::NoActiveSessionWithId),
true => {
let creation_data = SC::creation_data_from_message(&message)?;
let master = if is_initialization_message { sender.clone() } else { data.self_key_pair.public().clone() };
let cluster = create_cluster_view(data, requires_all_connections(&message))?;
sessions.insert(cluster, master, session_id, Some(message.session_nonce().ok_or(Error::InvalidMessage)?), message.is_exclusive_session_message(), creation_data)
},
}
}
/// Process single session message from connection.
fn process_message<S: ClusterSession, SC: ClusterSessionCreator<S, D>, D>(data: &Arc<ClusterData>, sessions: &ClusterSessionsContainer<S, SC, D>, connection: Arc<Connection>, mut message: Message) -> Option<Arc<S>>
where Message: IntoSessionId<S::Id> {
// get or create new session, if required
let mut sender = connection.node_id().clone();
let session = Self::prepare_session(data, sessions, &sender, &message);
// send error if session is not found, or failed to create
let session = match session {
Ok(session) => session,
Err(error) => {
// this is new session => it is not yet in container
warn!(target: "secretstore_net", "{}: {} session read error '{}' when requested for session from node {}",
data.self_key_pair.public(), S::type_name(), error, sender);
if !message.is_error_message() {
let session_id = message.into_session_id().expect("session_id only fails for cluster messages; only session messages are passed to process_message; qed");
let session_nonce = message.session_nonce().expect("session_nonce only fails for cluster messages; only session messages are passed to process_message; qed");
data.spawn(connection.send_message(SC::make_error_message(session_id, session_nonce, error)).then(|_| Ok(())));
}
return None;
},
};
let session_id = session.id();
let mut is_queued_message = false;
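// feed the message to the session; on success, drain messages that were queued for
// this session earlier; a message that arrives too early (Error::TooEarlyForRequest)
// is queued and re-processed once a later message has been handled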
loop {
let message_result = session.on_message(&sender, &message);
match message_result {
Ok(_) => {
// if session is completed => stop
if session.is_finished() {
info!(target: "secretstore_net", "{}: {} session completed", data.self_key_pair.public(), S::type_name());
sessions.remove(&session_id);
return Some(session);
}
// try to dequeue message
match sessions.dequeue_message(&session_id) {
Some((msg_sender, msg)) => {
is_queued_message = true;
sender = msg_sender;
message = msg;
},
None => return Some(session),
}
},
Err(Error::TooEarlyForRequest) => {
sessions.enqueue_message(&session_id, sender, message, is_queued_message);
return Some(session);
},
Err(err) => {
warn!(target: "secretstore_net", "{}: {} session error '{}' when processing message {} from node {}",
data.self_key_pair.public(),
S::type_name(),
err,
message,
sender);
session.on_session_error(data.self_key_pair.public(), err);
sessions.remove(&session_id);
return Some(session);
},
}
}
}
/// Process single cluster message from the connection.
fn process_cluster_message(data: Arc<ClusterData>, connection: Arc<Connection>, message: ClusterMessage) {
match message {
ClusterMessage::KeepAlive(_) => data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {
session_id: None,
}))).then(|_| Ok(()))),
ClusterMessage::KeepAliveResponse(msg) => if let Some(session_id) = msg.session_id {
data.sessions.on_session_keep_alive(connection.node_id(), session_id.into());
},
_ => warn!(target: "secretstore_net", "{}: received unexpected message {} from node {} at {}", data.self_key_pair.public(), message, connection.node_id(), connection.node_address()),
}
}
/// Prevents new tasks from being spawned.
#[cfg(test)]
pub fn shutdown(&self) {
self.data.shutdown()
}
}
impl ClusterConnections {
pub fn new(config: &ClusterConfiguration) -> Result<Self, Error> {
let mut nodes = config.key_server_set.snapshot().current_set;
let is_isolated = nodes.remove(config.self_key_pair.public()).is_none();
let trigger: Box<ConnectionTrigger> = match config.auto_migrate_enabled {
false => Box::new(SimpleConnectionTrigger::new(config.key_server_set.clone(), config.self_key_pair.clone(), config.admin_public.clone())),
true if config.admin_public.is_none() => Box::new(ConnectionTriggerWithMigration::new(config.key_server_set.clone(), config.self_key_pair.clone())),
true => return Err(Error::Internal("secret store administrator public key is specified with auto-migration enabled".into())),
};
let connector = trigger.servers_set_change_creator_connector();
Ok(ClusterConnections {
self_node_id: config.self_key_pair.public().clone(),
key_server_set: config.key_server_set.clone(),
trigger: Mutex::new(trigger),
connector: connector,
data: RwLock::new(ClusterConnectionsData {
is_isolated: is_isolated,
nodes: nodes,
connections: BTreeMap::new(),
}),
})
}
pub fn cluster_state(&self) -> ClusterState {
ClusterState {
connected: self.data.read().connections.keys().cloned().collect(),
}
}
pub fn get(&self, node: &NodeId) -> Option<Arc<Connection>> {
self.data.read().connections.get(node).cloned()
}
pub fn insert(&self, data: Arc<ClusterData>, connection: Arc<Connection>) -> bool {
{
let mut data = self.data.write();
if !data.nodes.contains_key(connection.node_id()) {
// incoming connections are checked here
trace!(target: "secretstore_net", "{}: ignoring unknown connection from {} at {}", self.self_node_id, connection.node_id(), connection.node_address());
debug_assert!(connection.is_inbound());
return false;
}
if data.connections.contains_key(connection.node_id()) {
// we have already connected to the same node
// the agreement is that node with lower id must establish connection to node with higher id
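// e.g. if our id is lower than the peer's, we must be the dialer, so a duplicate
// inbound connection from that peer is dropped; if our id is higher, a duplicate
// outbound connection is dropped instead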
if (&self.self_node_id < connection.node_id() && connection.is_inbound())
|| (&self.self_node_id > connection.node_id() && !connection.is_inbound()) {
return false;
}
}
let node = connection.node_id().clone();
trace!(target: "secretstore_net", "{}: inserting connection to {} at {}. Connected to {} of {} nodes",
self.self_node_id, node, connection.node_address(), data.connections.len() + 1, data.nodes.len());
data.connections.insert(node.clone(), connection.clone());
}
let maintain_action = self.trigger.lock().on_connection_established(connection.node_id());
self.maintain_connection_trigger(maintain_action, data);
true
}
pub fn remove(&self, data: Arc<ClusterData>, node: &NodeId, is_inbound: bool) {
{
let mut data = self.data.write();
if let Entry::Occupied(entry) = data.connections.entry(node.clone()) {
if entry.get().is_inbound() != is_inbound {
return;
}
trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address());
entry.remove_entry();
} else {
return;
}
}
let maintain_action = self.trigger.lock().on_connection_closed(node);
self.maintain_connection_trigger(maintain_action, data);
}
pub fn connected_nodes(&self) -> Result<BTreeSet<NodeId>, Error> {
let data = self.data.read();
if data.is_isolated {
return Err(Error::NodeDisconnected);
}
Ok(data.connections.keys().cloned().collect())
}
pub fn active_connections(&self) -> Vec<Arc<Connection>> {
self.data.read().connections.values().cloned().collect()
}
pub fn disconnected_nodes(&self) -> BTreeMap<NodeId, SocketAddr> {
let data = self.data.read();
data.nodes.iter()
.filter(|&(node_id, _)| !data.connections.contains_key(node_id))
.map(|(node_id, node_address)| (node_id.clone(), node_address.clone()))
.collect()
}
pub fn servers_set_change_creator_connector(&self) -> Arc<ServersSetChangeSessionCreatorConnector> {
self.connector.clone()
}
pub fn update_nodes_set(&self, data: Arc<ClusterData>) -> Option<BoxedEmptyFuture> {
let maintain_action = self.trigger.lock().on_maintain();
self.maintain_connection_trigger(maintain_action, data);
None
}
fn maintain_connection_trigger(&self, maintain_action: Option<Maintain>, data: Arc<ClusterData>) {
if maintain_action == Some(Maintain::SessionAndConnections) || maintain_action == Some(Maintain::Session) {
let client = ClusterClientImpl::new(data);
self.trigger.lock().maintain_session(&client);
}
if maintain_action == Some(Maintain::SessionAndConnections) || maintain_action == Some(Maintain::Connections) {
let mut trigger = self.trigger.lock();
let mut data = self.data.write();
trigger.maintain_connections(&mut *data);
}
}
}
impl ClusterData {
pub fn new(executor: &Executor, config: ClusterConfiguration, connections: ClusterConnections, sessions: ClusterSessions) -> Arc<Self> {
Arc::new(ClusterData {
executor: executor.clone(),
self_key_pair: config.self_key_pair.clone(),
connections: connections,
sessions: sessions,
config: config,
is_shutdown: Arc::new(AtomicBool::new(false)),
})
}
/// Get connection to given node.
pub fn connection(&self, node: &NodeId) -> Option<Arc<Connection>> {
self.connections.get(node)
}
/// Spawns a future on the runtime.
pub fn spawn<F>(&self, f: F) where F: Future<Item = (), Error = ()> + Send + 'static {
if !self.is_shutdown.load(Ordering::Acquire) {
if let Err(err) = future::Executor::execute(&self.executor, Box::new(f)) {
error!("Secret store runtime unable to spawn task. Runtime is shutting down. ({:?})", err);
}
} else {
error!("Secret store runtime unable to spawn task. Shutdown has been started.");
}
}
/// Sets the `is_shutdown` flag which prevents future tasks from being
/// spawned via `::spawn`.
#[cfg(test)]
pub fn shutdown(&self) {
self.is_shutdown.store(true, Ordering::Release);
}
}
impl Connection {
pub fn new(is_inbound: bool, connection: NetConnection) -> Arc<Connection> {
Arc::new(Connection {
node_id: connection.node_id,
node_address: connection.address,
is_inbound: is_inbound,
stream: connection.stream,
key: connection.key,
last_message_time: RwLock::new(Instant::now()),
})
}
pub fn is_inbound(&self) -> bool {
self.is_inbound
}
pub fn node_id(&self) -> &NodeId {
&self.node_id
}
pub fn last_message_time(&self) -> Instant {
*self.last_message_time.read()
}
pub fn set_last_message_time(&self, last_message_time: Instant) {
*self.last_message_time.write() = last_message_time;
}
pub fn node_address(&self) -> &SocketAddr {
&self.node_address
}
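/// Encrypt the given message with the connection key and write it to the underlying stream.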
pub fn send_message(&self, message: Message) -> WriteMessage {
write_encrypted_message(self.stream.clone(), &self.key, message)
}
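/// Read and decrypt a single message from the underlying stream.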
pub fn read_message(&self) -> ReadMessage {
read_encrypted_message(self.stream.clone(), self.key.clone())
}
}
impl ClusterView {
pub fn new(cluster: Arc<ClusterData>, nodes: BTreeSet<NodeId>, configured_nodes_count: usize) -> Self {
ClusterView {
configured_nodes_count: configured_nodes_count,
connected_nodes_count: nodes.len(),
core: Arc::new(RwLock::new(ClusterViewCore {
cluster: cluster,
nodes: nodes,
})),
}
}
}
impl Cluster for ClusterView {
fn broadcast(&self, message: Message) -> Result<(), Error> {
let core = self.core.read();
for node in core.nodes.iter().filter(|n| *n != core.cluster.self_key_pair.public()) {
trace!(target: "secretstore_net", "{}: sent message {} to {}", core.cluster.self_key_pair.public(), message, node);
let connection = core.cluster.connection(node).ok_or(Error::NodeDisconnected)?;
core.cluster.spawn(connection.send_message(message.clone()).then(|_| Ok(())))
}
Ok(())
}
fn send(&self, to: &NodeId, message: Message) -> Result<(), Error> {
let core = self.core.read();
trace!(target: "secretstore_net", "{}: sent message {} to {}", core.cluster.self_key_pair.public(), message, to);
let connection = core.cluster.connection(to).ok_or(Error::NodeDisconnected)?;
core.cluster.spawn(connection.send_message(message).then(|_| Ok(())));
Ok(())
}
fn is_connected(&self, node: &NodeId) -> bool {
self.core.read().nodes.contains(node)
}
fn nodes(&self) -> BTreeSet<NodeId> {
self.core.read().nodes.clone()
}
fn configured_nodes_count(&self) -> usize {
self.configured_nodes_count
}
fn connected_nodes_count(&self) -> usize {
self.connected_nodes_count
}
}
impl ClusterClientImpl {
pub fn new(data: Arc<ClusterData>) -> Self {
ClusterClientImpl {
data: data,
}
}
fn create_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let access_key = Random.generate()?.secret().clone();
let session_id = SessionIdWithSubSession::new(session_id, access_key);
let cluster = create_cluster_view(&self.data, false)?;
let session = self.data.sessions.negotiation_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id.clone(), None, false, None)?;
match session.initialize(connected_nodes) {
Ok(()) => Ok(session),
Err(error) => {
self.data.sessions.negotiation_sessions.remove(&session.id());
Err(error)
}
}
}
fn process_initialization_result<S: ClusterSession, SC: ClusterSessionCreator<S, D>, D>(result: Result<(), Error>, session: Arc<S>,
sessions: &ClusterSessionsContainer<S, SC, D>) -> Result<Arc<S>, Error> {
match result {
Ok(()) if session.is_finished() => {
sessions.remove(&session.id());
Ok(session)
},
Ok(()) => Ok(session),
Err(error) => {
sessions.remove(&session.id());
Err(error)
},
}
}
}
impl ClusterClient for ClusterClientImpl {
fn cluster_state(&self) -> ClusterState {
self.data.connections.cluster_state()
}
fn new_generation_session(&self, session_id: SessionId, origin: Option<Address>, author: Address, threshold: usize) -> Result<Arc<GenerationSession>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let cluster = create_cluster_view(&self.data, true)?;
let session = self.data.sessions.generation_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, false, None)?;
Self::process_initialization_result(
session.initialize(origin, author, false, threshold, connected_nodes.into()),
session, &self.data.sessions.generation_sessions)
}
fn new_encryption_session(&self, session_id: SessionId, requester: Requester, common_point: Public, encrypted_point: Public) -> Result<Arc<EncryptionSession>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let cluster = create_cluster_view(&self.data, true)?;
let session = self.data.sessions.encryption_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, false, None)?;
Self::process_initialization_result(
session.initialize(requester, common_point, encrypted_point),
session, &self.data.sessions.encryption_sessions)
}
fn new_decryption_session(&self, session_id: SessionId, origin: Option<Address>, requester: Requester, version: Option<H256>, is_shadow_decryption: bool, is_broadcast_decryption: bool) -> Result<Arc<DecryptionSession>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let access_key = Random.generate()?.secret().clone();
let session_id = SessionIdWithSubSession::new(session_id, access_key);
let cluster = create_cluster_view(&self.data, false)?;
let session = self.data.sessions.decryption_sessions.insert(cluster, self.data.self_key_pair.public().clone(),
session_id.clone(), None, false, Some(requester))?;
let initialization_result = match version {
Some(version) => session.initialize(origin, version, is_shadow_decryption, is_broadcast_decryption),
None => {
self.create_key_version_negotiation_session(session_id.id.clone())
.map(|version_session| {
version_session.set_continue_action(ContinueAction::Decrypt(session.clone(), origin, is_shadow_decryption, is_broadcast_decryption));
ClusterCore::try_continue_session(&self.data, Some(version_session));
})
},
};
Self::process_initialization_result(
initialization_result,
session, &self.data.sessions.decryption_sessions)
}
fn new_schnorr_signing_session(&self, session_id: SessionId, requester: Requester, version: Option<H256>, message_hash: H256) -> Result<Arc<SchnorrSigningSession>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let access_key = Random.generate()?.secret().clone();
let session_id = SessionIdWithSubSession::new(session_id, access_key);
let cluster = create_cluster_view(&self.data, false)?;
let session = self.data.sessions.schnorr_signing_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id.clone(), None, false, Some(requester))?;
let initialization_result = match version {
Some(version) => session.initialize(version, message_hash),
None => {
self.create_key_version_negotiation_session(session_id.id.clone())
.map(|version_session| {
version_session.set_continue_action(ContinueAction::SchnorrSign(session.clone(), message_hash));
ClusterCore::try_continue_session(&self.data, Some(version_session));
})
},
};
Self::process_initialization_result(
initialization_result,
session, &self.data.sessions.schnorr_signing_sessions)
}
fn new_ecdsa_signing_session(&self, session_id: SessionId, requester: Requester, version: Option<H256>, message_hash: H256) -> Result<Arc<EcdsaSigningSession>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let access_key = Random.generate()?.secret().clone();
let session_id = SessionIdWithSubSession::new(session_id, access_key);
let cluster = create_cluster_view(&self.data, false)?;
let session = self.data.sessions.ecdsa_signing_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id.clone(), None, false, Some(requester))?;
let initialization_result = match version {
Some(version) => session.initialize(version, message_hash),
None => {
self.create_key_version_negotiation_session(session_id.id.clone())
.map(|version_session| {
version_session.set_continue_action(ContinueAction::EcdsaSign(session.clone(), message_hash));
ClusterCore::try_continue_session(&self.data, Some(version_session));
})
},
};
Self::process_initialization_result(
initialization_result,
session, &self.data.sessions.ecdsa_signing_sessions)
}
fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> {
let session = self.create_key_version_negotiation_session(session_id)?;
Ok(session)
}
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, migration_id: Option<H256>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSession>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let session_id = match session_id {
Some(session_id) if session_id == *SERVERS_SET_CHANGE_SESSION_ID => session_id,
Some(_) => return Err(Error::InvalidMessage),
None => *SERVERS_SET_CHANGE_SESSION_ID,
};
let cluster = create_cluster_view(&self.data, true)?;
let creation_data = Some(AdminSessionCreationData::ServersSetChange(migration_id, new_nodes_set.clone()));
let session = self.data.sessions.admin_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, true, creation_data)?;
let initialization_result = session.as_servers_set_change().expect("servers set change session is created; qed")
.initialize(new_nodes_set, old_set_signature, new_set_signature);
if initialization_result.is_ok() {
self.data.connections.servers_set_change_creator_connector().set_key_servers_set_change_session(session.clone());
}
Self::process_initialization_result(
initialization_result,
session, &self.data.sessions.admin_sessions)
}
fn add_generation_listener(&self, listener: Arc<ClusterSessionsListener<GenerationSession>>) {
self.data.sessions.generation_sessions.add_listener(listener);
}
fn add_decryption_listener(&self, listener: Arc<ClusterSessionsListener<DecryptionSession>>) {
self.data.sessions.decryption_sessions.add_listener(listener);
}
fn add_key_version_negotiation_listener(&self, listener: Arc<ClusterSessionsListener<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>>) {
self.data.sessions.negotiation_sessions.add_listener(listener);
}
#[cfg(test)]
fn connect(&self) {
ClusterCore::connect_disconnected_nodes(self.data.clone());
}
#[cfg(test)]
fn make_faulty_generation_sessions(&self) {
self.data.sessions.make_faulty_generation_sessions();
}
#[cfg(test)]
fn generation_session(&self, session_id: &SessionId) -> Option<Arc<GenerationSession>> {
self.data.sessions.generation_sessions.get(session_id, false)
}
#[cfg(test)]
fn key_storage(&self) -> Arc<KeyStorage> {
self.data.config.key_storage.clone()
}
}
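/// Parse an IP address string and a port into a `SocketAddr`.
///
/// ```ignore
/// let addr = make_socket_address("127.0.0.1", 8083)?; // -> 127.0.0.1:8083
/// ```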
fn make_socket_address(address: &str, port: u16) -> Result<SocketAddr, Error> {
let ip_address: IpAddr = address.parse().map_err(|_| Error::InvalidNodeAddress)?;
Ok(SocketAddr::new(ip_address, port))
}
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use std::collections::{BTreeSet, VecDeque};
use parking_lot::RwLock;
use tokio::{
prelude::{future, Future},
};
use parity_runtime::{
futures::sync::oneshot,
Runtime, Executor,
};
use ethereum_types::{Address, H256};
use ethkey::{Random, Generator, Public, Signature, sign};
use key_server_cluster::{NodeId, SessionId, Requester, Error, DummyAclStorage, DummyKeyStorage,
MapKeyServerSet, PlainNodeKeyPair, KeyStorage};
use key_server_cluster::message::Message;
use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration, ClusterClient, ClusterState};
use key_server_cluster::cluster_sessions::{ClusterSession, AdminSession, ClusterSessionsListener};
use key_server_cluster::generation_session::{SessionImpl as GenerationSession, SessionState as GenerationSessionState};
use key_server_cluster::decryption_session::{SessionImpl as DecryptionSession};
use key_server_cluster::encryption_session::{SessionImpl as EncryptionSession};
use key_server_cluster::signing_session_ecdsa::{SessionImpl as EcdsaSigningSession};
use key_server_cluster::signing_session_schnorr::{SessionImpl as SchnorrSigningSession};
use key_server_cluster::key_version_negotiation_session::{SessionImpl as KeyVersionNegotiationSession,
IsolatedSessionTransport as KeyVersionNegotiationSessionTransport};
const TIMEOUT: Duration = Duration::from_millis(1000);
#[derive(Default)]
pub struct DummyClusterClient {
pub generation_requests_count: AtomicUsize,
}
#[derive(Debug)]
pub struct DummyCluster {
id: NodeId,
data: RwLock<DummyClusterData>,
}
#[derive(Debug, Default)]
struct DummyClusterData {
nodes: BTreeSet<NodeId>,
messages: VecDeque<(NodeId, Message)>,
}
impl ClusterClient for DummyClusterClient {
fn cluster_state(&self) -> ClusterState { unimplemented!("test-only") }
fn new_generation_session(&self, _session_id: SessionId, _origin: Option<Address>, _author: Address, _threshold: usize) -> Result<Arc<GenerationSession>, Error> {
self.generation_requests_count.fetch_add(1, Ordering::Relaxed);
Err(Error::Internal("test-error".into()))
}
fn new_encryption_session(&self, _session_id: SessionId, _requester: Requester, _common_point: Public, _encrypted_point: Public) -> Result<Arc<EncryptionSession>, Error> { unimplemented!("test-only") }
fn new_decryption_session(&self, _session_id: SessionId, _origin: Option<Address>, _requester: Requester, _version: Option<H256>, _is_shadow_decryption: bool, _is_broadcast_session: bool) -> Result<Arc<DecryptionSession>, Error> { unimplemented!("test-only") }
fn new_schnorr_signing_session(&self, _session_id: SessionId, _requester: Requester, _version: Option<H256>, _message_hash: H256) -> Result<Arc<SchnorrSigningSession>, Error> { unimplemented!("test-only") }
fn new_ecdsa_signing_session(&self, _session_id: SessionId, _requester: Requester, _version: Option<H256>, _message_hash: H256) -> Result<Arc<EcdsaSigningSession>, Error> { unimplemented!("test-only") }
fn new_key_version_negotiation_session(&self, _session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> { unimplemented!("test-only") }
fn new_servers_set_change_session(&self, _session_id: Option<SessionId>, _migration_id: Option<H256>, _new_nodes_set: BTreeSet<NodeId>, _old_set_signature: Signature, _new_set_signature: Signature) -> Result<Arc<AdminSession>, Error> { unimplemented!("test-only") }
fn add_generation_listener(&self, _listener: Arc<ClusterSessionsListener<GenerationSession>>) {}
fn add_decryption_listener(&self, _listener: Arc<ClusterSessionsListener<DecryptionSession>>) {}
fn add_key_version_negotiation_listener(&self, _listener: Arc<ClusterSessionsListener<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>>) {}
fn make_faulty_generation_sessions(&self) { unimplemented!("test-only") }
fn generation_session(&self, _session_id: &SessionId) -> Option<Arc<GenerationSession>> { unimplemented!("test-only") }
fn connect(&self) { unimplemented!("test-only") }
fn key_storage(&self) -> Arc<KeyStorage> { unimplemented!("test-only") }
}
impl DummyCluster {
pub fn new(id: NodeId) -> Self {
DummyCluster {
id: id,
data: RwLock::new(DummyClusterData::default())
}
}
pub fn node(&self) -> NodeId {
self.id.clone()
}
pub fn add_node(&self, node: NodeId) {
self.data.write().nodes.insert(node);
}
pub fn add_nodes<I: Iterator<Item = NodeId>>(&self, nodes: I) {
self.data.write().nodes.extend(nodes)
}
pub fn remove_node(&self, node: &NodeId) {
self.data.write().nodes.remove(node);
}
pub fn take_message(&self) -> Option<(NodeId, Message)> {
self.data.write().messages.pop_front()
}
}
impl Cluster for DummyCluster {
fn broadcast(&self, message: Message) -> Result<(), Error> {
let mut data = self.data.write();
let all_nodes: Vec<_> = data.nodes.iter().cloned().filter(|n| n != &self.id).collect();
for node in all_nodes {
data.messages.push_back((node, message.clone()));
}
Ok(())
}
fn send(&self, to: &NodeId, message: Message) -> Result<(), Error> {
debug_assert!(&self.id != to);
self.data.write().messages.push_back((to.clone(), message));
Ok(())
}
fn is_connected(&self, node: &NodeId) -> bool {
let data = self.data.read();
&self.id == node || data.nodes.contains(node)
}
fn nodes(&self) -> BTreeSet<NodeId> {
self.data.read().nodes.iter().cloned().collect()
}
fn configured_nodes_count(&self) -> usize {
self.data.read().nodes.len()
}
fn connected_nodes_count(&self) -> usize {
self.data.read().nodes.len()
}
}
/// Blocks the calling thread, looping until `predicate` returns `true` or
/// `timeout` has elapsed.
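///
/// ```ignore
/// // e.g. block until every cluster is fully interconnected (panics after TIMEOUT):
/// let clusters_clone = clusters.clone();
/// loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
/// ```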
pub fn loop_until<F>(executor: &Executor, timeout: Duration, predicate: F)
where F: Send + 'static + Fn() -> bool
{
use futures::Stream;
use tokio::timer::Interval;
let start = Instant::now();
let (complete_tx, complete_rx) = oneshot::channel();
executor.spawn(Interval::new_interval(Duration::from_millis(1))
.and_then(move |_| {
if Instant::now() - start > timeout {
panic!("no result in {:?}", timeout);
}
Ok(())
})
.take_while(move |_| future::ok(!predicate()))
.for_each(|_| Ok(()))
.then(|_| {
complete_tx.send(()).expect("receiver dropped");
future::ok::<(), ()>(())
})
);
complete_rx.wait().unwrap();
}
pub fn all_connections_established(cluster: &Arc<ClusterCore>) -> bool {
cluster.config().key_server_set.snapshot().new_set.keys()
.filter(|p| *p != cluster.config().self_key_pair.public())
.all(|p| cluster.connection(p).is_some())
}
pub fn make_clusters(runtime: &Runtime, ports_begin: u16, num_nodes: usize) -> Vec<Arc<ClusterCore>> {
let key_pairs: Vec<_> = (0..num_nodes).map(|_| Random.generate().unwrap()).collect();
let cluster_params: Vec<_> = (0..num_nodes).map(|i| ClusterConfiguration {
self_key_pair: Arc::new(PlainNodeKeyPair::new(key_pairs[i].clone())),
listen_address: ("127.0.0.1".to_owned(), ports_begin + i as u16),
key_server_set: Arc::new(MapKeyServerSet::new(false, key_pairs.iter().enumerate()
.map(|(j, kp)| (kp.public().clone(), format!("127.0.0.1:{}", ports_begin + j as u16).parse().unwrap()))
.collect())),
allow_connecting_to_higher_nodes: false,
key_storage: Arc::new(DummyKeyStorage::default()),
acl_storage: Arc::new(DummyAclStorage::default()),
admin_public: None,
auto_migrate_enabled: false,
}).collect();
let clusters: Vec<_> = cluster_params.into_iter().enumerate()
.map(|(_, params)| ClusterCore::new(runtime.executor(), params).unwrap())
.collect();
clusters
}
pub fn run_clusters(clusters: &[Arc<ClusterCore>]) {
for cluster in clusters {
cluster.run_listener().unwrap();
}
for cluster in clusters {
cluster.run_connections().unwrap();
}
}
pub fn shutdown_clusters(clusters: &[Arc<ClusterCore>]) {
for cluster in clusters {
cluster.shutdown()
}
}
/// Returns a new runtime with a static number of threads.
pub fn new_runtime() -> Runtime {
Runtime::with_thread_count(4)
}
#[test]
fn cluster_connects_to_other_nodes() {
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6010, 3);
run_clusters(&clusters);
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
shutdown_clusters(&clusters);
}
#[test]
fn cluster_wont_start_generation_session_if_not_fully_connected() {
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6013, 3);
clusters[0].run().unwrap();
match clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1) {
Err(Error::NodeDisconnected) => (),
Err(e) => panic!("unexpected error {:?}", e),
_ => panic!("unexpected success"),
}
shutdown_clusters(&clusters);
}
#[test]
fn error_in_generation_session_broadcasted_to_all_other_nodes() {
//::logger::init_log();
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6016, 3);
run_clusters(&clusters);
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
// ask one of nodes to produce faulty generation sessions
clusters[1].client().make_faulty_generation_sessions();
// start && wait for generation session to fail
let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || session_clone.joint_public_and_secret().is_some()
&& clusters_clone[0].client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_err());
// check that faulty session is either removed from all nodes, or nonexistent (already removed)
for i in 1..3 {
if let Some(session) = clusters[i].client().generation_session(&SessionId::default()) {
let session_clone = session.clone();
let clusters_clone = clusters.clone();
// wait for both session completion && session removal (session completion event is fired
// before session is removed from its own container by cluster)
loop_until(&runtime.executor(), TIMEOUT, move || session_clone.joint_public_and_secret().is_some()
&& clusters_clone[i].client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_err());
}
}
shutdown_clusters(&clusters);
}
#[test]
fn generation_session_completion_signalled_if_failed_on_master() {
//::logger::init_log();
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6025, 3);
run_clusters(&clusters);
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
// ask one of nodes to produce faulty generation sessions
clusters[0].client().make_faulty_generation_sessions();
// start && wait for generation session to fail
let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || session_clone.joint_public_and_secret().is_some()
&& clusters_clone[0].client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_err());
// check that faulty session is either removed from all nodes, or nonexistent (already removed)
for i in 1..3 {
if let Some(session) = clusters[i].client().generation_session(&SessionId::default()) {
let session_clone = session.clone();
let clusters_clone = clusters.clone();
// wait for both session completion && session removal (session completion event is fired
// before session is removed from its own container by cluster)
loop_until(&runtime.executor(), TIMEOUT, move || session_clone.joint_public_and_secret().is_some()
&& clusters_clone[i].client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_err());
}
}
shutdown_clusters(&clusters);
}
#[test]
fn generation_session_is_removed_when_succeeded() {
//::logger::init_log();
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6019, 3);
run_clusters(&clusters);
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
// start && wait for generation session to complete
let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished
|| session_clone.state() == GenerationSessionState::Failed)
&& clusters_clone[0].client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_ok());
// check that on non-master nodes session is either:
// already removed
// or it is removed right after completion
for i in 1..3 {
if let Some(session) = clusters[i].client().generation_session(&SessionId::default()) {
// run to completion if completion message is still on the way
// AND check that it is actually removed from cluster sessions
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished
|| session_clone.state() == GenerationSessionState::Failed)
&& clusters_clone[i].client().generation_session(&SessionId::default()).is_none());
}
}
shutdown_clusters(&clusters);
}
#[test]
fn sessions_are_removed_when_initialization_fails() {
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6022, 3);
run_clusters(&clusters);
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
// generation session
{
// try to start generation session => fail in initialization
assert_eq!(clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 100).map(|_| ()),
Err(Error::NotEnoughNodesForThreshold));
// try to start generation session => fails in initialization
assert_eq!(clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 100).map(|_| ()),
Err(Error::NotEnoughNodesForThreshold));
assert!(clusters[0].data.sessions.generation_sessions.is_empty());
}
// decryption session
{
// try to start decryption session => fails in initialization
assert_eq!(clusters[0].client().new_decryption_session(Default::default(), Default::default(), Default::default(), Some(Default::default()), false, false).map(|_| ()),
Err(Error::InvalidMessage));
// try to start generation session => fails in initialization
assert_eq!(clusters[0].client().new_decryption_session(Default::default(), Default::default(), Default::default(), Some(Default::default()), false, false).map(|_| ()),
Err(Error::InvalidMessage));
assert!(clusters[0].data.sessions.decryption_sessions.is_empty());
assert!(clusters[0].data.sessions.negotiation_sessions.is_empty());
}
shutdown_clusters(&clusters);
}
// test ignored because of
//
// https://github.com/paritytech/parity-ethereum/issues/9635
#[test]
#[ignore]
fn schnorr_signing_session_completes_if_node_does_not_have_a_share() {
//::logger::init_log();
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6028, 3);
run_clusters(&clusters);
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
// start && wait for generation session to complete
let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished
|| session_clone.state() == GenerationSessionState::Failed)
&& clusters_clone[0].client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_ok());
// now remove share from node2
assert!((0..3).all(|i| clusters[i].data.sessions.generation_sessions.is_empty()));
clusters[2].data.config.key_storage.remove(&Default::default()).unwrap();
// and try to sign message with generated key
let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap();
let session0 = clusters[0].client().new_schnorr_signing_session(Default::default(), signature.into(), None, Default::default()).unwrap();
let session = clusters[0].data.sessions.schnorr_signing_sessions.first().unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || session_clone.is_finished() && (0..3).all(|i|
clusters_clone[i].data.sessions.schnorr_signing_sessions.is_empty()));
session0.wait().unwrap();
// and try to sign message with generated key using node that has no key share
let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap();
let session2 = clusters[2].client().new_schnorr_signing_session(Default::default(), signature.into(), None, Default::default()).unwrap();
let session = clusters[2].data.sessions.schnorr_signing_sessions.first().unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || session_clone.is_finished() && (0..3).all(|i|
clusters_clone[i].data.sessions.schnorr_signing_sessions.is_empty()));
session2.wait().unwrap();
// now remove share from node1
clusters[1].data.config.key_storage.remove(&Default::default()).unwrap();
// and try to sign message with generated key
let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap();
let session1 = clusters[0].client().new_schnorr_signing_session(Default::default(), signature.into(), None, Default::default()).unwrap();
let session = clusters[0].data.sessions.schnorr_signing_sessions.first().unwrap();
let session = session.clone();
loop_until(&runtime.executor(), TIMEOUT, move || session.is_finished());
session1.wait().unwrap_err();
shutdown_clusters(&clusters);
}
// test ignored because of
//
// https://github.com/paritytech/parity-ethereum/issues/9635
#[test]
#[ignore]
fn ecdsa_signing_session_completes_if_node_does_not_have_a_share() {
//::logger::init_log();
let runtime = new_runtime();
let clusters = make_clusters(&runtime, 6041, 4);
run_clusters(&clusters);
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || clusters_clone.iter().all(all_connections_established));
// start && wait for generation session to complete
let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), TIMEOUT, move || (session_clone.state() == GenerationSessionState::Finished
|| session_clone.state() == GenerationSessionState::Failed)
&& clusters_clone[0].client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_ok());
// now remove share from node2
assert!((0..3).all(|i| clusters[i].data.sessions.generation_sessions.is_empty()));
clusters[2].data.config.key_storage.remove(&Default::default()).unwrap();
// and try to sign message with generated key
let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap();
let session0 = clusters[0].client().new_ecdsa_signing_session(Default::default(), signature.into(), None, H256::random()).unwrap();
let session = clusters[0].data.sessions.ecdsa_signing_sessions.first().unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), Duration::from_millis(1000), move || session_clone.is_finished() && (0..3).all(|i|
clusters_clone[i].data.sessions.ecdsa_signing_sessions.is_empty()));
session0.wait().unwrap();
// and try to sign message with generated key using node that has no key share
let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap();
let session2 = clusters[2].client().new_ecdsa_signing_session(Default::default(), signature.into(), None, H256::random()).unwrap();
let session = clusters[2].data.sessions.ecdsa_signing_sessions.first().unwrap();
let session_clone = session.clone();
let clusters_clone = clusters.clone();
loop_until(&runtime.executor(), Duration::from_millis(1000), move || session_clone.is_finished() && (0..3).all(|i|
clusters_clone[i].data.sessions.ecdsa_signing_sessions.is_empty()));
session2.wait().unwrap();
// now remove share from node1
clusters[1].data.config.key_storage.remove(&Default::default()).unwrap();
// and try to sign message with generated key
let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap();
let session1 = clusters[0].client().new_ecdsa_signing_session(Default::default(), signature.into(), None, H256::random()).unwrap();
let session = clusters[0].data.sessions.ecdsa_signing_sessions.first().unwrap();
loop_until(&runtime.executor(), Duration::from_millis(1000), move || session.is_finished());
session1.wait().unwrap_err();
shutdown_clusters(&clusters);
}
}