// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std ::io ;
2018-04-02 10:47:56 +02:00
use std ::time ::{ Duration , Instant } ;
2017-07-06 14:02:10 +02:00
use std ::sync ::Arc ;
use std ::collections ::{ BTreeMap , BTreeSet } ;
2017-04-03 11:13:51 +02:00
use std ::collections ::btree_map ::Entry ;
use std ::net ::{ SocketAddr , IpAddr } ;
2017-10-02 15:27:31 +02:00
use futures ::{ finished , failed , Future , Stream } ;
2017-04-03 11:13:51 +02:00
use futures_cpupool ::CpuPool ;
use parking_lot ::{ RwLock , Mutex } ;
2017-04-08 11:26:16 +02:00
use tokio_io ::IoFuture ;
2017-04-25 21:34:03 +02:00
use tokio_core ::reactor ::{ Handle , Remote , Interval } ;
2017-04-03 11:13:51 +02:00
use tokio_core ::net ::{ TcpListener , TcpStream } ;
2018-04-03 16:54:34 +02:00
use ethkey ::{ Public , KeyPair , Signature , Random , Generator } ;
use ethereum_types ::{ Address , H256 } ;
2018-03-19 06:42:40 +01:00
use key_server_cluster ::{ Error , NodeId , SessionId , Requester , AclStorage , KeyStorage , KeyServerSet , NodeKeyPair } ;
2017-11-22 08:21:14 +01:00
use key_server_cluster ::cluster_sessions ::{ ClusterSession , AdminSession , ClusterSessions , SessionIdWithSubSession ,
2017-11-22 08:05:14 +01:00
ClusterSessionsContainer , SERVERS_SET_CHANGE_SESSION_ID , create_cluster_view , AdminSessionCreationData , ClusterSessionsListener } ;
2017-11-02 15:33:11 +01:00
use key_server_cluster ::cluster_sessions_creator ::{ ClusterSessionCreator , IntoSessionId } ;
use key_server_cluster ::message ::{ self , Message , ClusterMessage } ;
2017-11-22 08:21:14 +01:00
use key_server_cluster ::generation_session ::{ SessionImpl as GenerationSession } ;
use key_server_cluster ::decryption_session ::{ SessionImpl as DecryptionSession } ;
use key_server_cluster ::encryption_session ::{ SessionImpl as EncryptionSession } ;
2018-03-01 09:59:21 +01:00
use key_server_cluster ::signing_session_ecdsa ::{ SessionImpl as EcdsaSigningSession } ;
use key_server_cluster ::signing_session_schnorr ::{ SessionImpl as SchnorrSigningSession } ;
2017-11-22 08:05:14 +01:00
use key_server_cluster ::key_version_negotiation_session ::{ SessionImpl as KeyVersionNegotiationSession ,
2017-11-22 08:21:14 +01:00
IsolatedSessionTransport as KeyVersionNegotiationSessionTransport , ContinueAction } ;
2017-04-03 11:13:51 +02:00
use key_server_cluster ::io ::{ DeadlineStatus , ReadMessage , SharedTcpStream , read_encrypted_message , WriteMessage , write_encrypted_message } ;
use key_server_cluster ::net ::{ accept_connection as net_accept_connection , connect as net_connect , Connection as NetConnection } ;
2018-01-10 11:33:45 +01:00
use key_server_cluster ::connection_trigger ::{ Maintain , ConnectionTrigger , SimpleConnectionTrigger , ServersSetChangeSessionCreatorConnector } ;
use key_server_cluster ::connection_trigger_with_migration ::ConnectionTriggerWithMigration ;
2017-04-03 11:13:51 +02:00
2017-04-25 21:34:03 +02:00
/// Maintain interval (seconds). Every MAINTAIN_INTERVAL seconds node:
/// 1) checks if connected nodes are responding to KeepAlive messages
/// 2) tries to connect to disconnected nodes
/// 3) checks if enc/dec sessions are time-outed
const MAINTAIN_INTERVAL : u64 = 10 ;
/// When no messages have been received from node within KEEP_ALIVE_SEND_INTERVAL seconds,
/// we must send KeepAlive message to the node to check if it still responds to messages.
2018-04-02 10:47:56 +02:00
const KEEP_ALIVE_SEND_INTERVAL : Duration = Duration ::from_secs ( 30 ) ;
2017-04-25 21:34:03 +02:00
/// When no messages have been received from node within KEEP_ALIVE_DISCONNECT_INTERVAL seconds,
/// we must treat this node as non-responding && disconnect from it.
2018-04-02 10:47:56 +02:00
const KEEP_ALIVE_DISCONNECT_INTERVAL : Duration = Duration ::from_secs ( 60 ) ;
2017-04-25 21:34:03 +02:00
/// Empty future.
2018-01-10 11:33:45 +01:00
pub type BoxedEmptyFuture = Box < Future < Item = ( ) , Error = ( ) > + Send > ;
2017-04-03 11:13:51 +02:00
/// Cluster interface for external clients.
pub trait ClusterClient : Send + Sync {
/// Get cluster state.
fn cluster_state ( & self ) -> ClusterState ;
2017-07-06 14:02:10 +02:00
/// Start new generation session.
2018-04-03 16:54:34 +02:00
fn new_generation_session ( & self , session_id : SessionId , origin : Option < Address > , author : Address , threshold : usize ) -> Result < Arc < GenerationSession > , Error > ;
2017-04-03 11:13:51 +02:00
/// Start new encryption session.
2018-04-03 16:54:34 +02:00
fn new_encryption_session ( & self , session_id : SessionId , author : Requester , common_point : Public , encrypted_point : Public ) -> Result < Arc < EncryptionSession > , Error > ;
2017-04-03 11:13:51 +02:00
/// Start new decryption session.
2018-04-03 16:54:34 +02:00
fn new_decryption_session ( & self , session_id : SessionId , origin : Option < Address > , requester : Requester , version : Option < H256 > , is_shadow_decryption : bool , is_broadcast_decryption : bool ) -> Result < Arc < DecryptionSession > , Error > ;
2018-03-19 06:42:40 +01:00
/// Start new Schnorr signing session.
fn new_schnorr_signing_session ( & self , session_id : SessionId , requester : Requester , version : Option < H256 > , message_hash : H256 ) -> Result < Arc < SchnorrSigningSession > , Error > ;
2018-03-01 09:59:21 +01:00
/// Start new ECDSA session.
2018-03-19 06:42:40 +01:00
fn new_ecdsa_signing_session ( & self , session_id : SessionId , requester : Requester , version : Option < H256 > , message_hash : H256 ) -> Result < Arc < EcdsaSigningSession > , Error > ;
2017-11-02 15:33:11 +01:00
/// Start new key version negotiation session.
2017-11-22 08:05:14 +01:00
fn new_key_version_negotiation_session ( & self , session_id : SessionId ) -> Result < Arc < KeyVersionNegotiationSession < KeyVersionNegotiationSessionTransport > > , Error > ;
2017-10-02 15:27:31 +02:00
/// Start new servers set change session.
2018-01-10 11:33:45 +01:00
fn new_servers_set_change_session ( & self , session_id : Option < SessionId > , migration_id : Option < H256 > , new_nodes_set : BTreeSet < NodeId > , old_set_signature : Signature , new_set_signature : Signature ) -> Result < Arc < AdminSession > , Error > ;
2017-04-25 21:34:03 +02:00
2017-11-22 08:05:14 +01:00
/// Listen for new generation sessions.
fn add_generation_listener ( & self , listener : Arc < ClusterSessionsListener < GenerationSession > > ) ;
2018-04-03 16:54:34 +02:00
/// Listen for new decryption sessions.
fn add_decryption_listener ( & self , listener : Arc < ClusterSessionsListener < DecryptionSession > > ) ;
2018-06-14 09:01:52 +02:00
/// Listen for new key version negotiation sessions.
fn add_key_version_negotiation_listener ( & self , listener : Arc < ClusterSessionsListener < KeyVersionNegotiationSession < KeyVersionNegotiationSessionTransport > > > ) ;
2017-11-22 08:05:14 +01:00
2017-07-06 14:02:10 +02:00
/// Ask node to make 'faulty' generation sessions.
2017-04-25 21:34:03 +02:00
#[ cfg(test) ]
2017-09-06 11:09:22 +02:00
fn make_faulty_generation_sessions ( & self ) ;
2017-07-06 14:02:10 +02:00
/// Get active generation session with given id.
2017-04-25 21:34:03 +02:00
#[ cfg(test) ]
2017-11-22 08:05:14 +01:00
fn generation_session ( & self , session_id : & SessionId ) -> Option < Arc < GenerationSession > > ;
2017-04-25 21:34:03 +02:00
/// Try connect to disconnected nodes.
2017-09-06 11:09:22 +02:00
#[ cfg(test) ]
2017-04-25 21:34:03 +02:00
fn connect ( & self ) ;
2017-11-02 15:33:11 +01:00
/// Get key storage.
#[ cfg(test) ]
fn key_storage ( & self ) -> Arc < KeyStorage > ;
2017-04-03 11:13:51 +02:00
}
2017-03-13 12:54:56 +01:00
2017-10-02 15:27:31 +02:00
/// Cluster access for single session participant.
2017-04-03 11:13:51 +02:00
pub trait Cluster : Send + Sync {
2017-03-13 12:54:56 +01:00
/// Broadcast message to all other nodes.
fn broadcast ( & self , message : Message ) -> Result < ( ) , Error > ;
/// Send message to given node.
fn send ( & self , to : & NodeId , message : Message ) -> Result < ( ) , Error > ;
2017-10-03 10:35:31 +02:00
/// Is connected to given node?
fn is_connected ( & self , node : & NodeId ) -> bool ;
/// Get a set of connected nodes.
fn nodes ( & self ) -> BTreeSet < NodeId > ;
2018-05-01 15:02:14 +02:00
/// Get total count of configured key server nodes (valid at the time of ClusterView creation).
fn configured_nodes_count ( & self ) -> usize ;
/// Get total count of connected key server nodes (valid at the time of ClusterView creation).
fn connected_nodes_count ( & self ) -> usize ;
2017-03-13 12:54:56 +01:00
}
2017-04-03 11:13:51 +02:00
/// Cluster initialization parameters.
2017-09-06 11:09:22 +02:00
#[ derive(Clone) ]
2017-04-03 11:13:51 +02:00
pub struct ClusterConfiguration {
/// Number of threads reserved by cluster.
pub threads : usize ,
/// Allow connecting to 'higher' nodes.
pub allow_connecting_to_higher_nodes : bool ,
/// KeyPair this node holds.
2017-07-25 08:24:54 +02:00
pub self_key_pair : Arc < NodeKeyPair > ,
2017-04-03 11:13:51 +02:00
/// Interface to listen to.
pub listen_address : ( String , u16 ) ,
2017-07-19 10:35:17 +02:00
/// Cluster nodes set.
pub key_server_set : Arc < KeyServerSet > ,
2017-04-03 11:13:51 +02:00
/// Reference to key storage
pub key_storage : Arc < KeyStorage > ,
/// Reference to ACL storage
pub acl_storage : Arc < AclStorage > ,
2017-10-02 15:27:31 +02:00
/// Administrator public key.
pub admin_public : Option < Public > ,
2018-01-10 11:33:45 +01:00
/// Should key servers set change session should be started when servers set changes.
/// This will only work when servers set is configured using KeyServerSet contract.
pub auto_migrate_enabled : bool ,
2017-04-03 11:13:51 +02:00
}
/// Cluster state.
pub struct ClusterState {
/// Nodes, to which connections are established.
pub connected : BTreeSet < NodeId > ,
}
/// Network cluster implementation.
pub struct ClusterCore {
/// Handle to the event loop.
handle : Handle ,
/// Listen address.
listen_address : SocketAddr ,
/// Cluster data.
data : Arc < ClusterData > ,
}
/// Network cluster client interface implementation.
pub struct ClusterClientImpl {
/// Cluster data.
data : Arc < ClusterData > ,
}
/// Network cluster view. It is a communication channel, required in single session.
pub struct ClusterView {
core : Arc < Mutex < ClusterViewCore > > ,
2018-05-01 15:02:14 +02:00
configured_nodes_count : usize ,
connected_nodes_count : usize ,
2017-04-03 11:13:51 +02:00
}
/// Cross-thread shareable cluster data.
pub struct ClusterData {
/// Cluster configuration.
2017-11-02 15:33:11 +01:00
pub config : ClusterConfiguration ,
2017-04-03 11:13:51 +02:00
/// Handle to the event loop.
2017-11-02 15:33:11 +01:00
pub handle : Remote ,
2017-04-03 11:13:51 +02:00
/// Handle to the cpu thread pool.
2017-11-02 15:33:11 +01:00
pub pool : CpuPool ,
2017-04-03 11:13:51 +02:00
/// KeyPair this node holds.
2017-11-02 15:33:11 +01:00
pub self_key_pair : Arc < NodeKeyPair > ,
2017-04-03 11:13:51 +02:00
/// Connections data.
2017-11-02 15:33:11 +01:00
pub connections : ClusterConnections ,
2017-04-03 11:13:51 +02:00
/// Active sessions data.
2017-11-02 15:33:11 +01:00
pub sessions : ClusterSessions ,
2017-04-03 11:13:51 +02:00
}
2018-01-10 11:33:45 +01:00
/// Connections that are forming the cluster. Lock order: trigger.lock() -> data.lock().
2017-04-03 11:13:51 +02:00
pub struct ClusterConnections {
/// Self node id.
pub self_node_id : NodeId ,
/// All known other key servers.
2017-07-19 11:36:40 +02:00
pub key_server_set : Arc < KeyServerSet > ,
2018-01-10 11:33:45 +01:00
/// Connections trigger.
pub trigger : Mutex < Box < ConnectionTrigger > > ,
/// Servers set change session creator connector.
pub connector : Arc < ServersSetChangeSessionCreatorConnector > ,
2017-07-19 11:36:40 +02:00
/// Connections data.
pub data : RwLock < ClusterConnectionsData > ,
}
/// Cluster connections data.
pub struct ClusterConnectionsData {
2018-06-14 09:01:52 +02:00
/// Is this node isolated from cluster?
pub is_isolated : bool ,
2017-07-19 11:36:40 +02:00
/// Active key servers set.
pub nodes : BTreeMap < Public , SocketAddr > ,
2017-04-03 11:13:51 +02:00
/// Active connections to key servers.
2017-07-19 11:36:40 +02:00
pub connections : BTreeMap < NodeId , Arc < Connection > > ,
2017-04-03 11:13:51 +02:00
}
/// Cluster view core.
struct ClusterViewCore {
/// Cluster reference.
cluster : Arc < ClusterData > ,
/// Subset of nodes, required for this session.
nodes : BTreeSet < NodeId > ,
}
/// Connection to single node.
pub struct Connection {
/// Node id.
node_id : NodeId ,
/// Node address.
node_address : SocketAddr ,
/// Is inbound connection?
is_inbound : bool ,
/// Tcp stream.
stream : SharedTcpStream ,
/// Connection key.
2017-04-08 11:26:16 +02:00
key : KeyPair ,
2017-04-03 11:13:51 +02:00
/// Last message time.
2018-04-02 10:47:56 +02:00
last_message_time : Mutex < Instant > ,
2017-04-03 11:13:51 +02:00
}
impl ClusterCore {
pub fn new ( handle : Handle , config : ClusterConfiguration ) -> Result < Arc < Self > , Error > {
let listen_address = make_socket_address ( & config . listen_address . 0 , config . listen_address . 1 ) ? ;
let connections = ClusterConnections ::new ( & config ) ? ;
2018-01-10 11:33:45 +01:00
let servers_set_change_creator_connector = connections . connector . clone ( ) ;
let sessions = ClusterSessions ::new ( & config , servers_set_change_creator_connector ) ;
2017-04-03 11:13:51 +02:00
let data = ClusterData ::new ( & handle , config , connections , sessions ) ;
Ok ( Arc ::new ( ClusterCore {
handle : handle ,
listen_address : listen_address ,
data : data ,
} ) )
}
/// Create new client interface.
pub fn client ( & self ) -> Arc < ClusterClient > {
Arc ::new ( ClusterClientImpl ::new ( self . data . clone ( ) ) )
}
/// Get cluster configuration.
2017-09-06 11:09:22 +02:00
#[ cfg(test) ]
2017-04-03 11:13:51 +02:00
pub fn config ( & self ) -> & ClusterConfiguration {
& self . data . config
}
/// Get connection to given node.
2017-09-06 11:09:22 +02:00
#[ cfg(test) ]
2017-04-03 11:13:51 +02:00
pub fn connection ( & self , node : & NodeId ) -> Option < Arc < Connection > > {
self . data . connection ( node )
}
2017-04-25 21:34:03 +02:00
/// Run cluster.
2017-04-03 11:13:51 +02:00
pub fn run ( & self ) -> Result < ( ) , Error > {
2017-04-25 21:34:03 +02:00
self . run_listener ( )
. and_then ( | _ | self . run_connections ( ) ) ? ;
2017-04-03 11:13:51 +02:00
// schedule maintain procedures
ClusterCore ::schedule_maintain ( & self . handle , self . data . clone ( ) ) ;
2017-04-25 21:34:03 +02:00
Ok ( ( ) )
}
/// Start listening for incoming connections.
pub fn run_listener ( & self ) -> Result < ( ) , Error > {
// start listeining for incoming connections
2017-04-03 11:13:51 +02:00
self . handle . spawn ( ClusterCore ::listen ( & self . handle , self . data . clone ( ) , self . listen_address . clone ( ) ) ? ) ;
2017-04-25 21:34:03 +02:00
Ok ( ( ) )
}
2017-04-03 11:13:51 +02:00
2017-04-25 21:34:03 +02:00
/// Start connecting to other nodes.
pub fn run_connections ( & self ) -> Result < ( ) , Error > {
// try to connect to every other peer
ClusterCore ::connect_disconnected_nodes ( self . data . clone ( ) ) ;
2017-04-03 11:13:51 +02:00
Ok ( ( ) )
}
/// Connect to peer.
fn connect ( data : Arc < ClusterData > , node_address : SocketAddr ) {
data . handle . clone ( ) . spawn ( move | handle | {
data . pool . clone ( ) . spawn ( ClusterCore ::connect_future ( handle , data , node_address ) )
} )
}
/// Connect to socket using given context and handle.
fn connect_future ( handle : & Handle , data : Arc < ClusterData > , node_address : SocketAddr ) -> BoxedEmptyFuture {
let disconnected_nodes = data . connections . disconnected_nodes ( ) . keys ( ) . cloned ( ) . collect ( ) ;
2017-10-02 15:27:31 +02:00
Box ::new ( net_connect ( & node_address , handle , data . self_key_pair . clone ( ) , disconnected_nodes )
2017-07-26 13:09:41 +02:00
. then ( move | result | ClusterCore ::process_connection_result ( data , Some ( node_address ) , result ) )
2017-10-02 15:27:31 +02:00
. then ( | _ | finished ( ( ) ) ) )
2017-04-03 11:13:51 +02:00
}
/// Start listening for incoming connections.
fn listen ( handle : & Handle , data : Arc < ClusterData > , listen_address : SocketAddr ) -> Result < BoxedEmptyFuture , Error > {
2017-10-02 15:27:31 +02:00
Ok ( Box ::new ( TcpListener ::bind ( & listen_address , & handle ) ?
2017-04-03 11:13:51 +02:00
. incoming ( )
. and_then ( move | ( stream , node_address ) | {
ClusterCore ::accept_connection ( data . clone ( ) , stream , node_address ) ;
Ok ( ( ) )
} )
. for_each ( | _ | Ok ( ( ) ) )
2017-10-02 15:27:31 +02:00
. then ( | _ | finished ( ( ) ) ) ) )
2017-04-03 11:13:51 +02:00
}
/// Accept connection.
fn accept_connection ( data : Arc < ClusterData > , stream : TcpStream , node_address : SocketAddr ) {
data . handle . clone ( ) . spawn ( move | handle | {
data . pool . clone ( ) . spawn ( ClusterCore ::accept_connection_future ( handle , data , stream , node_address ) )
} )
}
/// Accept connection future.
fn accept_connection_future ( handle : & Handle , data : Arc < ClusterData > , stream : TcpStream , node_address : SocketAddr ) -> BoxedEmptyFuture {
2017-10-02 15:27:31 +02:00
Box ::new ( net_accept_connection ( node_address , stream , handle , data . self_key_pair . clone ( ) )
2017-07-26 13:09:41 +02:00
. then ( move | result | ClusterCore ::process_connection_result ( data , None , result ) )
2017-10-02 15:27:31 +02:00
. then ( | _ | finished ( ( ) ) ) )
2017-04-03 11:13:51 +02:00
}
/// Schedule mainatain procedures.
fn schedule_maintain ( handle : & Handle , data : Arc < ClusterData > ) {
2017-04-25 21:34:03 +02:00
let d = data . clone ( ) ;
2018-04-02 10:47:56 +02:00
let interval : BoxedEmptyFuture = Box ::new ( Interval ::new ( Duration ::new ( MAINTAIN_INTERVAL , 0 ) , handle )
2017-04-03 11:13:51 +02:00
. expect ( " failed to create interval " )
2017-04-25 21:34:03 +02:00
. and_then ( move | _ | Ok ( ClusterCore ::maintain ( data . clone ( ) ) ) )
2017-04-03 11:13:51 +02:00
. for_each ( | _ | Ok ( ( ) ) )
2017-10-02 15:27:31 +02:00
. then ( | _ | finished ( ( ) ) ) ) ;
2017-04-03 11:13:51 +02:00
2017-04-25 21:34:03 +02:00
d . spawn ( interval ) ;
}
/// Execute maintain procedures.
fn maintain ( data : Arc < ClusterData > ) {
trace! ( target : " secretstore_net " , " {}: executing maintain procedures " , data . self_key_pair . public ( ) ) ;
ClusterCore ::keep_alive ( data . clone ( ) ) ;
ClusterCore ::connect_disconnected_nodes ( data . clone ( ) ) ;
data . sessions . stop_stalled_sessions ( ) ;
2017-04-03 11:13:51 +02:00
}
/// Called for every incomming mesage.
fn process_connection_messages ( data : Arc < ClusterData > , connection : Arc < Connection > ) -> IoFuture < Result < ( ) , Error > > {
2017-10-02 15:27:31 +02:00
Box ::new ( connection
2017-04-03 11:13:51 +02:00
. read_message ( )
. then ( move | result |
match result {
Ok ( ( _ , Ok ( message ) ) ) = > {
ClusterCore ::process_connection_message ( data . clone ( ) , connection . clone ( ) , message ) ;
// continue serving connection
data . spawn ( ClusterCore ::process_connection_messages ( data . clone ( ) , connection ) ) ;
2017-10-02 15:27:31 +02:00
Box ::new ( finished ( Ok ( ( ) ) ) )
2017-04-03 11:13:51 +02:00
} ,
Ok ( ( _ , Err ( err ) ) ) = > {
2017-09-14 19:29:01 +02:00
warn! ( target : " secretstore_net " , " {}: protocol error '{}' when reading message from node {} " , data . self_key_pair . public ( ) , err , connection . node_id ( ) ) ;
2017-04-03 11:13:51 +02:00
// continue serving connection
data . spawn ( ClusterCore ::process_connection_messages ( data . clone ( ) , connection ) ) ;
2017-10-02 15:27:31 +02:00
Box ::new ( finished ( Err ( err ) ) )
2017-04-03 11:13:51 +02:00
} ,
Err ( err ) = > {
2017-09-14 19:29:01 +02:00
warn! ( target : " secretstore_net " , " {}: network error '{}' when reading message from node {} " , data . self_key_pair . public ( ) , err , connection . node_id ( ) ) ;
2017-04-03 11:13:51 +02:00
// close connection
2018-01-10 11:33:45 +01:00
data . connections . remove ( data . clone ( ) , connection . node_id ( ) , connection . is_inbound ( ) ) ;
2017-10-02 15:27:31 +02:00
Box ::new ( failed ( err ) )
2017-04-03 11:13:51 +02:00
} ,
}
2017-10-02 15:27:31 +02:00
) )
2017-04-03 11:13:51 +02:00
}
/// Send keepalive messages to every othe node.
fn keep_alive ( data : Arc < ClusterData > ) {
2017-10-05 22:38:23 +02:00
data . sessions . sessions_keep_alive ( ) ;
2017-04-03 11:13:51 +02:00
for connection in data . connections . active_connections ( ) {
2018-04-02 10:47:56 +02:00
let last_message_diff = Instant ::now ( ) - connection . last_message_time ( ) ;
if last_message_diff > KEEP_ALIVE_DISCONNECT_INTERVAL {
2018-06-14 09:01:52 +02:00
warn! ( target : " secretstore_net " , " {}: keep alive timeout for node {} " ,
data . self_key_pair . public ( ) , connection . node_id ( ) ) ;
2018-01-10 11:33:45 +01:00
data . connections . remove ( data . clone ( ) , connection . node_id ( ) , connection . is_inbound ( ) ) ;
2017-04-03 11:13:51 +02:00
data . sessions . on_connection_timeout ( connection . node_id ( ) ) ;
}
2018-04-02 10:47:56 +02:00
else if last_message_diff > KEEP_ALIVE_SEND_INTERVAL {
2017-04-03 11:13:51 +02:00
data . spawn ( connection . send_message ( Message ::Cluster ( ClusterMessage ::KeepAlive ( message ::KeepAlive { } ) ) ) ) ;
}
}
}
/// Try to connect to every disconnected node.
fn connect_disconnected_nodes ( data : Arc < ClusterData > ) {
2018-01-10 11:33:45 +01:00
let r = data . connections . update_nodes_set ( data . clone ( ) ) ;
if let Some ( r ) = r {
data . spawn ( r ) ;
2017-10-02 15:27:31 +02:00
}
2018-01-10 11:33:45 +01:00
// connect to disconnected nodes
2017-04-03 11:13:51 +02:00
for ( node_id , node_address ) in data . connections . disconnected_nodes ( ) {
if data . config . allow_connecting_to_higher_nodes | | data . self_key_pair . public ( ) < & node_id {
ClusterCore ::connect ( data . clone ( ) , node_address ) ;
}
}
}
/// Process connection future result.
2017-07-26 13:09:41 +02:00
fn process_connection_result ( data : Arc < ClusterData > , outbound_addr : Option < SocketAddr > , result : Result < DeadlineStatus < Result < NetConnection , Error > > , io ::Error > ) -> IoFuture < Result < ( ) , Error > > {
2017-04-03 11:13:51 +02:00
match result {
Ok ( DeadlineStatus ::Meet ( Ok ( connection ) ) ) = > {
2017-07-26 13:09:41 +02:00
let connection = Connection ::new ( outbound_addr . is_none ( ) , connection ) ;
2018-01-10 11:33:45 +01:00
if data . connections . insert ( data . clone ( ) , connection . clone ( ) ) {
2017-04-03 11:13:51 +02:00
ClusterCore ::process_connection_messages ( data . clone ( ) , connection )
} else {
2017-10-02 15:27:31 +02:00
Box ::new ( finished ( Ok ( ( ) ) ) )
2017-04-03 11:13:51 +02:00
}
} ,
2017-07-20 11:19:29 +02:00
Ok ( DeadlineStatus ::Meet ( Err ( err ) ) ) = > {
2017-09-14 19:29:01 +02:00
warn! ( target : " secretstore_net " , " {}: protocol error '{}' when establishing {} connection{} " ,
2017-07-26 13:09:41 +02:00
data . self_key_pair . public ( ) , err , if outbound_addr . is_some ( ) { " outbound " } else { " inbound " } ,
outbound_addr . map ( | a | format! ( " with {} " , a ) ) . unwrap_or_default ( ) ) ;
2017-10-02 15:27:31 +02:00
Box ::new ( finished ( Ok ( ( ) ) ) )
2017-04-03 11:13:51 +02:00
} ,
Ok ( DeadlineStatus ::Timeout ) = > {
2017-07-26 13:09:41 +02:00
warn! ( target : " secretstore_net " , " {}: timeout when establishing {} connection{} " ,
data . self_key_pair . public ( ) , if outbound_addr . is_some ( ) { " outbound " } else { " inbound " } ,
outbound_addr . map ( | a | format! ( " with {} " , a ) ) . unwrap_or_default ( ) ) ;
2017-10-02 15:27:31 +02:00
Box ::new ( finished ( Ok ( ( ) ) ) )
2017-04-03 11:13:51 +02:00
} ,
2017-07-20 11:19:29 +02:00
Err ( err ) = > {
2017-09-14 19:29:01 +02:00
warn! ( target : " secretstore_net " , " {}: network error '{}' when establishing {} connection{} " ,
2017-07-26 13:09:41 +02:00
data . self_key_pair . public ( ) , err , if outbound_addr . is_some ( ) { " outbound " } else { " inbound " } ,
outbound_addr . map ( | a | format! ( " with {} " , a ) ) . unwrap_or_default ( ) ) ;
2017-10-02 15:27:31 +02:00
Box ::new ( finished ( Ok ( ( ) ) ) )
2017-04-03 11:13:51 +02:00
} ,
}
}
/// Process single message from the connection.
fn process_connection_message ( data : Arc < ClusterData > , connection : Arc < Connection > , message : Message ) {
2018-04-02 10:47:56 +02:00
connection . set_last_message_time ( Instant ::now ( ) ) ;
2017-04-25 21:34:03 +02:00
trace! ( target : " secretstore_net " , " {}: received message {} from {} " , data . self_key_pair . public ( ) , message , connection . node_id ( ) ) ;
2017-11-02 15:33:11 +01:00
// error is ignored as we only process errors on session level
2017-04-03 11:13:51 +02:00
match message {
2017-11-02 15:33:11 +01:00
Message ::Generation ( message ) = > Self ::process_message ( & data , & data . sessions . generation_sessions , connection , Message ::Generation ( message ) )
. map ( | _ | ( ) ) . unwrap_or_default ( ) ,
Message ::Encryption ( message ) = > Self ::process_message ( & data , & data . sessions . encryption_sessions , connection , Message ::Encryption ( message ) )
. map ( | _ | ( ) ) . unwrap_or_default ( ) ,
Message ::Decryption ( message ) = > Self ::process_message ( & data , & data . sessions . decryption_sessions , connection , Message ::Decryption ( message ) )
. map ( | _ | ( ) ) . unwrap_or_default ( ) ,
2018-03-01 09:59:21 +01:00
Message ::SchnorrSigning ( message ) = > Self ::process_message ( & data , & data . sessions . schnorr_signing_sessions , connection , Message ::SchnorrSigning ( message ) )
. map ( | _ | ( ) ) . unwrap_or_default ( ) ,
Message ::EcdsaSigning ( message ) = > Self ::process_message ( & data , & data . sessions . ecdsa_signing_sessions , connection , Message ::EcdsaSigning ( message ) )
2017-11-02 15:33:11 +01:00
. map ( | _ | ( ) ) . unwrap_or_default ( ) ,
2018-01-10 11:33:45 +01:00
Message ::ServersSetChange ( message ) = > {
let message = Message ::ServersSetChange ( message ) ;
let is_initialization_message = message . is_initialization_message ( ) ;
let session = Self ::process_message ( & data , & data . sessions . admin_sessions , connection , message ) ;
if is_initialization_message {
if let Some ( session ) = session {
data . connections . servers_set_change_creator_connector ( ) . set_key_servers_set_change_session ( session . clone ( ) ) ;
}
}
}
2017-11-02 15:33:11 +01:00
Message ::KeyVersionNegotiation ( message ) = > {
let session = Self ::process_message ( & data , & data . sessions . negotiation_sessions , connection , Message ::KeyVersionNegotiation ( message ) ) ;
Self ::try_continue_session ( & data , session ) ;
} ,
Message ::ShareAdd ( message ) = > Self ::process_message ( & data , & data . sessions . admin_sessions , connection , Message ::ShareAdd ( message ) )
. map ( | _ | ( ) ) . unwrap_or_default ( ) ,
2017-04-03 11:13:51 +02:00
Message ::Cluster ( message ) = > ClusterCore ::process_cluster_message ( data , connection , message ) ,
}
}
2017-11-02 15:33:11 +01:00
/// Try to contnue session.
2017-11-22 08:05:14 +01:00
fn try_continue_session ( data : & Arc < ClusterData > , session : Option < Arc < KeyVersionNegotiationSession < KeyVersionNegotiationSessionTransport > > > ) {
2017-11-02 15:33:11 +01:00
if let Some ( session ) = session {
let meta = session . meta ( ) ;
let is_master_node = meta . self_node_id = = meta . master_node_id ;
if is_master_node & & session . is_finished ( ) {
data . sessions . negotiation_sessions . remove ( & session . id ( ) ) ;
2017-12-19 09:02:13 +01:00
match session . wait ( ) {
2018-06-14 09:01:52 +02:00
Ok ( Some ( ( version , master ) ) ) = > match session . take_continue_action ( ) {
2018-04-03 16:54:34 +02:00
Some ( ContinueAction ::Decrypt ( session , origin , is_shadow_decryption , is_broadcast_decryption ) ) = > {
2017-11-02 15:33:11 +01:00
let initialization_error = if data . self_key_pair . public ( ) = = & master {
2018-04-03 16:54:34 +02:00
session . initialize ( origin , version , is_shadow_decryption , is_broadcast_decryption )
2017-11-02 15:33:11 +01:00
} else {
2018-04-03 16:54:34 +02:00
session . delegate ( master , origin , version , is_shadow_decryption , is_broadcast_decryption )
2017-11-02 15:33:11 +01:00
} ;
if let Err ( error ) = initialization_error {
session . on_session_error ( & meta . self_node_id , error ) ;
data . sessions . decryption_sessions . remove ( & session . id ( ) ) ;
}
2017-07-06 14:02:10 +02:00
} ,
2018-03-01 09:59:21 +01:00
Some ( ContinueAction ::SchnorrSign ( session , message_hash ) ) = > {
let initialization_error = if data . self_key_pair . public ( ) = = & master {
session . initialize ( version , message_hash )
} else {
session . delegate ( master , version , message_hash )
} ;
if let Err ( error ) = initialization_error {
session . on_session_error ( & meta . self_node_id , error ) ;
data . sessions . schnorr_signing_sessions . remove ( & session . id ( ) ) ;
}
} ,
Some ( ContinueAction ::EcdsaSign ( session , message_hash ) ) = > {
2017-11-02 15:33:11 +01:00
let initialization_error = if data . self_key_pair . public ( ) = = & master {
session . initialize ( version , message_hash )
} else {
session . delegate ( master , version , message_hash )
} ;
if let Err ( error ) = initialization_error {
session . on_session_error ( & meta . self_node_id , error ) ;
2018-03-01 09:59:21 +01:00
data . sessions . ecdsa_signing_sessions . remove ( & session . id ( ) ) ;
2017-11-02 15:33:11 +01:00
}
2017-04-03 11:13:51 +02:00
} ,
2017-11-02 15:33:11 +01:00
None = > ( ) ,
2017-12-19 09:02:13 +01:00
} ,
2018-06-14 09:01:52 +02:00
Ok ( None ) = > unreachable! ( " is_master_node; session is finished; negotiation version always finished with result on master; qed " ) ,
2018-02-14 10:49:56 +01:00
Err ( error ) = > match session . take_continue_action ( ) {
2018-04-03 16:54:34 +02:00
Some ( ContinueAction ::Decrypt ( session , _ , _ , _ ) ) = > {
2017-12-19 09:02:13 +01:00
session . on_session_error ( & meta . self_node_id , error ) ;
2018-06-14 09:01:52 +02:00
data . sessions . decryption_sessions . remove ( & session . id ( ) ) ;
2017-07-06 14:02:10 +02:00
} ,
2018-03-01 09:59:21 +01:00
Some ( ContinueAction ::SchnorrSign ( session , _ ) ) = > {
session . on_session_error ( & meta . self_node_id , error ) ;
2018-06-14 09:01:52 +02:00
data . sessions . schnorr_signing_sessions . remove ( & session . id ( ) ) ;
2018-03-01 09:59:21 +01:00
} ,
Some ( ContinueAction ::EcdsaSign ( session , _ ) ) = > {
2017-12-19 09:02:13 +01:00
session . on_session_error ( & meta . self_node_id , error ) ;
2018-06-14 09:01:52 +02:00
data . sessions . ecdsa_signing_sessions . remove ( & session . id ( ) ) ;
2017-04-03 11:13:51 +02:00
} ,
2017-11-02 15:33:11 +01:00
None = > ( ) ,
2017-12-19 09:02:13 +01:00
} ,
2017-10-02 15:27:31 +02:00
}
}
}
}
2017-11-02 15:33:11 +01:00
/// Get or insert new session.
fn prepare_session < S : ClusterSession , SC : ClusterSessionCreator < S , D > , D > ( data : & Arc < ClusterData > , sessions : & ClusterSessionsContainer < S , SC , D > , sender : & NodeId , message : & Message ) -> Result < Arc < S > , Error >
where Message : IntoSessionId < S ::Id > {
fn requires_all_connections ( message : & Message ) -> bool {
match * message {
Message ::Generation ( _ ) = > true ,
Message ::ShareAdd ( _ ) = > true ,
Message ::ServersSetChange ( _ ) = > true ,
2017-10-02 15:27:31 +02:00
_ = > false ,
}
}
2017-11-02 15:33:11 +01:00
// get or create new session, if required
let session_id = message . into_session_id ( ) . expect ( " into_session_id fails for cluster messages only; only session messages are passed to prepare_session; qed " ) ;
let is_initialization_message = message . is_initialization_message ( ) ;
let is_delegation_message = message . is_delegation_message ( ) ;
match is_initialization_message | | is_delegation_message {
2018-05-01 15:02:14 +02:00
false = > sessions . get ( & session_id , true ) . ok_or ( Error ::NoActiveSessionWithId ) ,
2017-11-02 15:33:11 +01:00
true = > {
let creation_data = SC ::creation_data_from_message ( & message ) ? ;
let master = if is_initialization_message { sender . clone ( ) } else { data . self_key_pair . public ( ) . clone ( ) } ;
let cluster = create_cluster_view ( data , requires_all_connections ( & message ) ) ? ;
2018-01-10 11:33:45 +01:00
2017-11-16 17:34:23 +01:00
sessions . insert ( cluster , master , session_id , Some ( message . session_nonce ( ) . ok_or ( Error ::InvalidMessage ) ? ) , message . is_exclusive_session_message ( ) , creation_data )
2017-10-02 15:27:31 +02:00
} ,
}
}
2017-11-02 15:33:11 +01:00
/// Process single session message from connection.
fn process_message < S : ClusterSession , SC : ClusterSessionCreator < S , D > , D > ( data : & Arc < ClusterData > , sessions : & ClusterSessionsContainer < S , SC , D > , connection : Arc < Connection > , mut message : Message ) -> Option < Arc < S > >
where Message : IntoSessionId < S ::Id > {
// get or create new session, if required
2017-10-02 15:27:31 +02:00
let mut sender = connection . node_id ( ) . clone ( ) ;
2017-11-02 15:33:11 +01:00
let session = Self ::prepare_session ( data , sessions , & sender , & message ) ;
// send error if session is not found, or failed to create
let session = match session {
Ok ( session ) = > session ,
Err ( error ) = > {
// this is new session => it is not yet in container
2017-11-16 17:34:23 +01:00
warn! ( target : " secretstore_net " , " {}: {} session read error '{}' when requested for session from node {} " ,
2017-11-02 15:33:11 +01:00
data . self_key_pair . public ( ) , S ::type_name ( ) , error , sender ) ;
2018-01-10 11:33:45 +01:00
if ! message . is_error_message ( ) {
2017-11-02 15:33:11 +01:00
let session_id = message . into_session_id ( ) . expect ( " session_id only fails for cluster messages; only session messages are passed to process_message; qed " ) ;
let session_nonce = message . session_nonce ( ) . expect ( " session_nonce only fails for cluster messages; only session messages are passed to process_message; qed " ) ;
data . spawn ( connection . send_message ( SC ::make_error_message ( session_id , session_nonce , error ) ) ) ;
2017-10-02 15:27:31 +02:00
}
2017-11-02 15:33:11 +01:00
return None ;
2017-10-02 15:27:31 +02:00
} ,
} ;
2017-11-02 15:33:11 +01:00
let session_id = session . id ( ) ;
2017-10-02 15:27:31 +02:00
let mut is_queued_message = false ;
loop {
2017-11-02 15:33:11 +01:00
let message_result = session . on_message ( & sender , & message ) ;
match message_result {
2017-10-02 15:27:31 +02:00
Ok ( _ ) = > {
// if session is completed => stop
if session . is_finished ( ) {
2017-11-02 15:33:11 +01:00
info! ( target : " secretstore_net " , " {}: {} session completed " , data . self_key_pair . public ( ) , S ::type_name ( ) ) ;
sessions . remove ( & session_id ) ;
return Some ( session ) ;
2017-10-02 15:27:31 +02:00
}
// try to dequeue message
2017-11-02 15:33:11 +01:00
match sessions . dequeue_message ( & session_id ) {
Some ( ( msg_sender , msg ) ) = > {
2017-10-02 15:27:31 +02:00
is_queued_message = true ;
sender = msg_sender ;
message = msg ;
} ,
2017-11-02 15:33:11 +01:00
None = > return Some ( session ) ,
2017-10-02 15:27:31 +02:00
}
} ,
Err ( Error ::TooEarlyForRequest ) = > {
2017-11-02 15:33:11 +01:00
sessions . enqueue_message ( & session_id , sender , message , is_queued_message ) ;
return Some ( session ) ;
2017-10-02 15:27:31 +02:00
} ,
Err ( err ) = > {
2017-11-02 15:33:11 +01:00
warn! ( target : " secretstore_net " , " {}: {} session error '{}' when processing message {} from node {} " ,
data . self_key_pair . public ( ) ,
S ::type_name ( ) ,
err ,
message ,
sender ) ;
session . on_session_error ( data . self_key_pair . public ( ) , err ) ;
sessions . remove ( & session_id ) ;
return Some ( session ) ;
2017-10-02 15:27:31 +02:00
} ,
}
}
}
2017-04-03 11:13:51 +02:00
/// Process single cluster message from the connection.
fn process_cluster_message ( data : Arc < ClusterData > , connection : Arc < Connection > , message : ClusterMessage ) {
match message {
2017-10-05 22:38:23 +02:00
ClusterMessage ::KeepAlive ( _ ) = > data . spawn ( connection . send_message ( Message ::Cluster ( ClusterMessage ::KeepAliveResponse ( message ::KeepAliveResponse {
session_id : None ,
} ) ) ) ) ,
ClusterMessage ::KeepAliveResponse ( msg ) = > if let Some ( session_id ) = msg . session_id {
data . sessions . on_session_keep_alive ( connection . node_id ( ) , session_id . into ( ) ) ;
} ,
2017-04-03 11:13:51 +02:00
_ = > warn! ( target : " secretstore_net " , " {}: received unexpected message {} from node {} at {} " , data . self_key_pair . public ( ) , message , connection . node_id ( ) , connection . node_address ( ) ) ,
}
}
}
impl ClusterConnections {
pub fn new ( config : & ClusterConfiguration ) -> Result < Self , Error > {
2018-01-10 11:33:45 +01:00
let mut nodes = config . key_server_set . snapshot ( ) . current_set ;
2018-06-14 09:01:52 +02:00
let is_isolated = nodes . remove ( config . self_key_pair . public ( ) ) . is_none ( ) ;
2017-07-19 14:14:37 +02:00
2018-01-10 11:33:45 +01:00
let trigger : Box < ConnectionTrigger > = match config . auto_migrate_enabled {
false = > Box ::new ( SimpleConnectionTrigger ::new ( config . key_server_set . clone ( ) , config . self_key_pair . clone ( ) , config . admin_public . clone ( ) ) ) ,
true if config . admin_public . is_none ( ) = > Box ::new ( ConnectionTriggerWithMigration ::new ( config . key_server_set . clone ( ) , config . self_key_pair . clone ( ) ) ) ,
2018-05-01 15:02:14 +02:00
true = > return Err ( Error ::Internal ( " secret store admininstrator public key is specified with auto-migration enabled " . into ( ) ) ) ,
2018-01-10 11:33:45 +01:00
} ;
let connector = trigger . servers_set_change_creator_connector ( ) ;
2017-07-19 11:36:40 +02:00
Ok ( ClusterConnections {
2017-04-03 11:13:51 +02:00
self_node_id : config . self_key_pair . public ( ) . clone ( ) ,
2017-07-19 11:36:40 +02:00
key_server_set : config . key_server_set . clone ( ) ,
2018-01-10 11:33:45 +01:00
trigger : Mutex ::new ( trigger ) ,
connector : connector ,
2017-07-19 11:36:40 +02:00
data : RwLock ::new ( ClusterConnectionsData {
2018-06-14 09:01:52 +02:00
is_isolated : is_isolated ,
2017-07-19 14:14:37 +02:00
nodes : nodes ,
2017-07-19 11:36:40 +02:00
connections : BTreeMap ::new ( ) ,
} ) ,
} )
2017-04-03 11:13:51 +02:00
}
pub fn cluster_state ( & self ) -> ClusterState {
ClusterState {
2017-07-19 11:36:40 +02:00
connected : self . data . read ( ) . connections . keys ( ) . cloned ( ) . collect ( ) ,
2017-04-03 11:13:51 +02:00
}
}
pub fn get ( & self , node : & NodeId ) -> Option < Arc < Connection > > {
2017-07-19 11:36:40 +02:00
self . data . read ( ) . connections . get ( node ) . cloned ( )
2017-04-03 11:13:51 +02:00
}
2018-01-10 11:33:45 +01:00
pub fn insert ( & self , data : Arc < ClusterData > , connection : Arc < Connection > ) -> bool {
{
let mut data = self . data . write ( ) ;
if ! data . nodes . contains_key ( connection . node_id ( ) ) {
// incoming connections are checked here
trace! ( target : " secretstore_net " , " {}: ignoring unknown connection from {} at {} " , self . self_node_id , connection . node_id ( ) , connection . node_address ( ) ) ;
debug_assert! ( connection . is_inbound ( ) ) ;
2017-04-03 11:13:51 +02:00
return false ;
}
2018-01-10 11:33:45 +01:00
if data . connections . contains_key ( connection . node_id ( ) ) {
// we have already connected to the same node
// the agreement is that node with lower id must establish connection to node with higher id
if ( & self . self_node_id < connection . node_id ( ) & & connection . is_inbound ( ) )
| | ( & self . self_node_id > connection . node_id ( ) & & ! connection . is_inbound ( ) ) {
return false ;
}
}
let node = connection . node_id ( ) . clone ( ) ;
trace! ( target : " secretstore_net " , " {}: inserting connection to {} at {}. Connected to {} of {} nodes " ,
self . self_node_id , node , connection . node_address ( ) , data . connections . len ( ) + 1 , data . nodes . len ( ) ) ;
data . connections . insert ( node . clone ( ) , connection . clone ( ) ) ;
2017-04-03 11:13:51 +02:00
}
2017-04-25 21:34:03 +02:00
2018-01-10 11:33:45 +01:00
let maintain_action = self . trigger . lock ( ) . on_connection_established ( connection . node_id ( ) ) ;
self . maintain_connection_trigger ( maintain_action , data ) ;
2017-04-03 11:13:51 +02:00
true
}
2018-01-10 11:33:45 +01:00
pub fn remove ( & self , data : Arc < ClusterData > , node : & NodeId , is_inbound : bool ) {
{
2018-01-10 19:56:02 +01:00
let mut data = self . data . write ( ) ;
2018-01-10 11:33:45 +01:00
if let Entry ::Occupied ( entry ) = data . connections . entry ( node . clone ( ) ) {
if entry . get ( ) . is_inbound ( ) ! = is_inbound {
return ;
}
trace! ( target : " secretstore_net " , " {}: removing connection to {} at {} " , self . self_node_id , entry . get ( ) . node_id ( ) , entry . get ( ) . node_address ( ) ) ;
entry . remove_entry ( ) ;
} else {
2017-04-03 11:13:51 +02:00
return ;
}
}
2018-01-10 11:33:45 +01:00
let maintain_action = self . trigger . lock ( ) . on_connection_closed ( node ) ;
self . maintain_connection_trigger ( maintain_action , data ) ;
2017-04-03 11:13:51 +02:00
}
2018-06-14 09:01:52 +02:00
pub fn connected_nodes ( & self ) -> Result < BTreeSet < NodeId > , Error > {
let data = self . data . read ( ) ;
if data . is_isolated {
return Err ( Error ::NodeDisconnected ) ;
}
Ok ( data . connections . keys ( ) . cloned ( ) . collect ( ) )
2017-04-03 11:13:51 +02:00
}
pub fn active_connections ( & self ) -> Vec < Arc < Connection > > {
2017-07-19 11:36:40 +02:00
self . data . read ( ) . connections . values ( ) . cloned ( ) . collect ( )
2017-04-03 11:13:51 +02:00
}
pub fn disconnected_nodes ( & self ) -> BTreeMap < NodeId , SocketAddr > {
2017-07-19 11:36:40 +02:00
let data = self . data . read ( ) ;
data . nodes . iter ( )
. filter ( | & ( node_id , _ ) | ! data . connections . contains_key ( node_id ) )
2017-04-03 11:13:51 +02:00
. map ( | ( node_id , node_address ) | ( node_id . clone ( ) , node_address . clone ( ) ) )
. collect ( )
}
2017-07-19 11:36:40 +02:00
2018-01-10 11:33:45 +01:00
pub fn servers_set_change_creator_connector ( & self ) -> Arc < ServersSetChangeSessionCreatorConnector > {
self . connector . clone ( )
}
2017-07-20 11:19:29 +02:00
2018-01-10 11:33:45 +01:00
pub fn update_nodes_set ( & self , data : Arc < ClusterData > ) -> Option < BoxedEmptyFuture > {
let maintain_action = self . trigger . lock ( ) . on_maintain ( ) ;
self . maintain_connection_trigger ( maintain_action , data ) ;
None
}
2017-07-20 11:19:29 +02:00
2018-01-10 11:33:45 +01:00
fn maintain_connection_trigger ( & self , maintain_action : Option < Maintain > , data : Arc < ClusterData > ) {
if maintain_action = = Some ( Maintain ::SessionAndConnections ) | | maintain_action = = Some ( Maintain ::Session ) {
let client = ClusterClientImpl ::new ( data ) ;
self . trigger . lock ( ) . maintain_session ( & client ) ;
2017-07-20 11:19:29 +02:00
}
2018-01-10 11:33:45 +01:00
if maintain_action = = Some ( Maintain ::SessionAndConnections ) | | maintain_action = = Some ( Maintain ::Connections ) {
let mut trigger = self . trigger . lock ( ) ;
let mut data = self . data . write ( ) ;
trigger . maintain_connections ( & mut * data ) ;
2017-07-19 11:36:40 +02:00
}
}
2017-04-03 11:13:51 +02:00
}
impl ClusterData {
pub fn new ( handle : & Handle , config : ClusterConfiguration , connections : ClusterConnections , sessions : ClusterSessions ) -> Arc < Self > {
Arc ::new ( ClusterData {
handle : handle . remote ( ) . clone ( ) ,
pool : CpuPool ::new ( config . threads ) ,
self_key_pair : config . self_key_pair . clone ( ) ,
connections : connections ,
sessions : sessions ,
config : config ,
} )
}
/// Get connection to given node.
pub fn connection ( & self , node : & NodeId ) -> Option < Arc < Connection > > {
self . connections . get ( node )
}
/// Spawns a future using thread pool and schedules execution of it with event loop handle.
pub fn spawn < F > ( & self , f : F ) where F : Future + Send + 'static , F ::Item : Send + 'static , F ::Error : Send + 'static {
let pool_work = self . pool . spawn ( f ) ;
self . handle . spawn ( move | _handle | {
pool_work . then ( | _ | finished ( ( ) ) )
} )
}
}
impl Connection {
pub fn new ( is_inbound : bool , connection : NetConnection ) -> Arc < Connection > {
Arc ::new ( Connection {
node_id : connection . node_id ,
node_address : connection . address ,
is_inbound : is_inbound ,
stream : connection . stream ,
key : connection . key ,
2018-04-02 10:47:56 +02:00
last_message_time : Mutex ::new ( Instant ::now ( ) ) ,
2017-04-03 11:13:51 +02:00
} )
}
pub fn is_inbound ( & self ) -> bool {
self . is_inbound
}
pub fn node_id ( & self ) -> & NodeId {
& self . node_id
}
2018-04-02 10:47:56 +02:00
pub fn last_message_time ( & self ) -> Instant {
2017-04-03 11:13:51 +02:00
* self . last_message_time . lock ( )
}
2018-04-02 10:47:56 +02:00
pub fn set_last_message_time ( & self , last_message_time : Instant ) {
2017-04-03 11:13:51 +02:00
* self . last_message_time . lock ( ) = last_message_time ;
}
pub fn node_address ( & self ) -> & SocketAddr {
& self . node_address
}
pub fn send_message ( & self , message : Message ) -> WriteMessage < SharedTcpStream > {
write_encrypted_message ( self . stream . clone ( ) , & self . key , message )
}
pub fn read_message ( & self ) -> ReadMessage < SharedTcpStream > {
read_encrypted_message ( self . stream . clone ( ) , self . key . clone ( ) )
}
}
impl ClusterView {
2018-05-01 15:02:14 +02:00
pub fn new ( cluster : Arc < ClusterData > , nodes : BTreeSet < NodeId > , configured_nodes_count : usize ) -> Self {
2017-04-03 11:13:51 +02:00
ClusterView {
2018-05-01 15:02:14 +02:00
configured_nodes_count : configured_nodes_count ,
connected_nodes_count : nodes . len ( ) ,
2017-04-03 11:13:51 +02:00
core : Arc ::new ( Mutex ::new ( ClusterViewCore {
cluster : cluster ,
nodes : nodes ,
} ) ) ,
}
}
}
impl Cluster for ClusterView {
fn broadcast ( & self , message : Message ) -> Result < ( ) , Error > {
let core = self . core . lock ( ) ;
for node in core . nodes . iter ( ) . filter ( | n | * n ! = core . cluster . self_key_pair . public ( ) ) {
2017-04-25 21:34:03 +02:00
trace! ( target : " secretstore_net " , " {}: sent message {} to {} " , core . cluster . self_key_pair . public ( ) , message , node ) ;
2017-04-03 11:13:51 +02:00
let connection = core . cluster . connection ( node ) . ok_or ( Error ::NodeDisconnected ) ? ;
core . cluster . spawn ( connection . send_message ( message . clone ( ) ) )
}
Ok ( ( ) )
}
fn send ( & self , to : & NodeId , message : Message ) -> Result < ( ) , Error > {
let core = self . core . lock ( ) ;
2017-04-25 21:34:03 +02:00
trace! ( target : " secretstore_net " , " {}: sent message {} to {} " , core . cluster . self_key_pair . public ( ) , message , to ) ;
2017-04-03 11:13:51 +02:00
let connection = core . cluster . connection ( to ) . ok_or ( Error ::NodeDisconnected ) ? ;
core . cluster . spawn ( connection . send_message ( message ) ) ;
Ok ( ( ) )
}
2017-10-03 10:35:31 +02:00
fn is_connected ( & self , node : & NodeId ) -> bool {
self . core . lock ( ) . nodes . contains ( node )
}
fn nodes ( & self ) -> BTreeSet < NodeId > {
self . core . lock ( ) . nodes . clone ( )
}
2018-05-01 15:02:14 +02:00
fn configured_nodes_count ( & self ) -> usize {
self . configured_nodes_count
}
fn connected_nodes_count ( & self ) -> usize {
self . connected_nodes_count
}
2017-04-03 11:13:51 +02:00
}
impl ClusterClientImpl {
pub fn new ( data : Arc < ClusterData > ) -> Self {
ClusterClientImpl {
data : data ,
}
}
2017-11-02 15:33:11 +01:00
2017-11-22 08:05:14 +01:00
fn create_key_version_negotiation_session ( & self , session_id : SessionId ) -> Result < Arc < KeyVersionNegotiationSession < KeyVersionNegotiationSessionTransport > > , Error > {
2018-06-14 09:01:52 +02:00
let mut connected_nodes = self . data . connections . connected_nodes ( ) ? ;
2017-11-02 15:33:11 +01:00
connected_nodes . insert ( self . data . self_key_pair . public ( ) . clone ( ) ) ;
let access_key = Random . generate ( ) ? . secret ( ) . clone ( ) ;
let session_id = SessionIdWithSubSession ::new ( session_id , access_key ) ;
let cluster = create_cluster_view ( & self . data , false ) ? ;
let session = self . data . sessions . negotiation_sessions . insert ( cluster , self . data . self_key_pair . public ( ) . clone ( ) , session_id . clone ( ) , None , false , None ) ? ;
match session . initialize ( connected_nodes ) {
Ok ( ( ) ) = > Ok ( session ) ,
Err ( error ) = > {
self . data . sessions . negotiation_sessions . remove ( & session . id ( ) ) ;
Err ( error )
}
}
}
2018-04-09 16:38:59 +02:00
fn process_initialization_result < S : ClusterSession , SC : ClusterSessionCreator < S , D > , D > ( result : Result < ( ) , Error > , session : Arc < S > , sessions : & ClusterSessionsContainer < S , SC , D > ) -> Result < Arc < S > , Error > {
match result {
Ok ( ( ) ) if session . is_finished ( ) = > {
sessions . remove ( & session . id ( ) ) ;
Ok ( session )
} ,
Ok ( ( ) ) = > Ok ( session ) ,
Err ( error ) = > {
sessions . remove ( & session . id ( ) ) ;
Err ( error )
} ,
}
}
2017-04-03 11:13:51 +02:00
}
impl ClusterClient for ClusterClientImpl {
fn cluster_state ( & self ) -> ClusterState {
self . data . connections . cluster_state ( )
}
2018-04-03 16:54:34 +02:00
fn new_generation_session ( & self , session_id : SessionId , origin : Option < Address > , author : Address , threshold : usize ) -> Result < Arc < GenerationSession > , Error > {
2018-06-14 09:01:52 +02:00
let mut connected_nodes = self . data . connections . connected_nodes ( ) ? ;
2017-07-06 14:02:10 +02:00
connected_nodes . insert ( self . data . self_key_pair . public ( ) . clone ( ) ) ;
2017-11-02 15:33:11 +01:00
let cluster = create_cluster_view ( & self . data , true ) ? ;
let session = self . data . sessions . generation_sessions . insert ( cluster , self . data . self_key_pair . public ( ) . clone ( ) , session_id , None , false , None ) ? ;
2018-04-09 16:38:59 +02:00
Self ::process_initialization_result (
session . initialize ( origin , author , false , threshold , connected_nodes . into ( ) ) ,
session , & self . data . sessions . generation_sessions )
2017-07-06 14:02:10 +02:00
}
2018-03-19 06:42:40 +01:00
fn new_encryption_session ( & self , session_id : SessionId , requester : Requester , common_point : Public , encrypted_point : Public ) -> Result < Arc < EncryptionSession > , Error > {
2018-06-14 09:01:52 +02:00
let mut connected_nodes = self . data . connections . connected_nodes ( ) ? ;
2017-04-03 11:13:51 +02:00
connected_nodes . insert ( self . data . self_key_pair . public ( ) . clone ( ) ) ;
2017-11-02 15:33:11 +01:00
let cluster = create_cluster_view ( & self . data , true ) ? ;
let session = self . data . sessions . encryption_sessions . insert ( cluster , self . data . self_key_pair . public ( ) . clone ( ) , session_id , None , false , None ) ? ;
2018-04-09 16:38:59 +02:00
Self ::process_initialization_result (
session . initialize ( requester , common_point , encrypted_point ) ,
session , & self . data . sessions . encryption_sessions )
2017-04-03 11:13:51 +02:00
}
2018-04-03 16:54:34 +02:00
fn new_decryption_session ( & self , session_id : SessionId , origin : Option < Address > , requester : Requester , version : Option < H256 > , is_shadow_decryption : bool , is_broadcast_decryption : bool ) -> Result < Arc < DecryptionSession > , Error > {
2018-06-14 09:01:52 +02:00
let mut connected_nodes = self . data . connections . connected_nodes ( ) ? ;
2017-04-03 11:13:51 +02:00
connected_nodes . insert ( self . data . self_key_pair . public ( ) . clone ( ) ) ;
let access_key = Random . generate ( ) ? . secret ( ) . clone ( ) ;
2017-11-02 15:33:11 +01:00
let session_id = SessionIdWithSubSession ::new ( session_id , access_key ) ;
let cluster = create_cluster_view ( & self . data , false ) ? ;
2018-03-19 06:42:40 +01:00
let session = self . data . sessions . decryption_sessions . insert ( cluster , self . data . self_key_pair . public ( ) . clone ( ) ,
session_id . clone ( ) , None , false , Some ( requester ) ) ? ;
2017-11-02 15:33:11 +01:00
let initialization_result = match version {
2018-04-03 16:54:34 +02:00
Some ( version ) = > session . initialize ( origin , version , is_shadow_decryption , is_broadcast_decryption ) ,
2017-11-02 15:33:11 +01:00
None = > {
self . create_key_version_negotiation_session ( session_id . id . clone ( ) )
. map ( | version_session | {
2018-04-03 16:54:34 +02:00
version_session . set_continue_action ( ContinueAction ::Decrypt ( session . clone ( ) , origin , is_shadow_decryption , is_broadcast_decryption ) ) ;
2017-11-02 15:33:11 +01:00
ClusterCore ::try_continue_session ( & self . data , Some ( version_session ) ) ;
} )
} ,
} ;
2017-04-25 21:34:03 +02:00
2018-04-09 16:38:59 +02:00
Self ::process_initialization_result (
initialization_result ,
session , & self . data . sessions . decryption_sessions )
2017-04-25 21:34:03 +02:00
}
2018-03-19 06:42:40 +01:00
fn new_schnorr_signing_session ( & self , session_id : SessionId , requester : Requester , version : Option < H256 > , message_hash : H256 ) -> Result < Arc < SchnorrSigningSession > , Error > {
2018-06-14 09:01:52 +02:00
let mut connected_nodes = self . data . connections . connected_nodes ( ) ? ;
2018-03-01 09:59:21 +01:00
connected_nodes . insert ( self . data . self_key_pair . public ( ) . clone ( ) ) ;
let access_key = Random . generate ( ) ? . secret ( ) . clone ( ) ;
let session_id = SessionIdWithSubSession ::new ( session_id , access_key ) ;
let cluster = create_cluster_view ( & self . data , false ) ? ;
2018-03-19 06:42:40 +01:00
let session = self . data . sessions . schnorr_signing_sessions . insert ( cluster , self . data . self_key_pair . public ( ) . clone ( ) , session_id . clone ( ) , None , false , Some ( requester ) ) ? ;
2018-03-01 09:59:21 +01:00
let initialization_result = match version {
Some ( version ) = > session . initialize ( version , message_hash ) ,
None = > {
self . create_key_version_negotiation_session ( session_id . id . clone ( ) )
. map ( | version_session | {
version_session . set_continue_action ( ContinueAction ::SchnorrSign ( session . clone ( ) , message_hash ) ) ;
ClusterCore ::try_continue_session ( & self . data , Some ( version_session ) ) ;
} )
} ,
} ;
2018-04-09 16:38:59 +02:00
Self ::process_initialization_result (
initialization_result ,
session , & self . data . sessions . schnorr_signing_sessions )
2018-03-01 09:59:21 +01:00
}
2018-03-19 06:42:40 +01:00
fn new_ecdsa_signing_session ( & self , session_id : SessionId , requester : Requester , version : Option < H256 > , message_hash : H256 ) -> Result < Arc < EcdsaSigningSession > , Error > {
2018-06-14 09:01:52 +02:00
let mut connected_nodes = self . data . connections . connected_nodes ( ) ? ;
2017-10-02 15:27:31 +02:00
connected_nodes . insert ( self . data . self_key_pair . public ( ) . clone ( ) ) ;
2017-11-02 15:33:11 +01:00
let access_key = Random . generate ( ) ? . secret ( ) . clone ( ) ;
let session_id = SessionIdWithSubSession ::new ( session_id , access_key ) ;
let cluster = create_cluster_view ( & self . data , false ) ? ;
2018-03-19 06:42:40 +01:00
let session = self . data . sessions . ecdsa_signing_sessions . insert ( cluster , self . data . self_key_pair . public ( ) . clone ( ) , session_id . clone ( ) , None , false , Some ( requester ) ) ? ;
2017-11-02 15:33:11 +01:00
let initialization_result = match version {
Some ( version ) = > session . initialize ( version , message_hash ) ,
None = > {
self . create_key_version_negotiation_session ( session_id . id . clone ( ) )
. map ( | version_session | {
2018-03-01 09:59:21 +01:00
version_session . set_continue_action ( ContinueAction ::EcdsaSign ( session . clone ( ) , message_hash ) ) ;
2017-11-02 15:33:11 +01:00
ClusterCore ::try_continue_session ( & self . data , Some ( version_session ) ) ;
} )
} ,
} ;
2017-10-02 15:27:31 +02:00
2018-04-09 16:38:59 +02:00
Self ::process_initialization_result (
initialization_result ,
session , & self . data . sessions . ecdsa_signing_sessions )
2017-11-02 15:33:11 +01:00
}
2017-10-02 15:27:31 +02:00
2017-11-22 08:05:14 +01:00
fn new_key_version_negotiation_session ( & self , session_id : SessionId ) -> Result < Arc < KeyVersionNegotiationSession < KeyVersionNegotiationSessionTransport > > , Error > {
2017-11-02 15:33:11 +01:00
let session = self . create_key_version_negotiation_session ( session_id ) ? ;
2017-11-22 08:05:14 +01:00
Ok ( session )
2017-10-02 15:27:31 +02:00
}
2018-01-10 11:33:45 +01:00
fn new_servers_set_change_session ( & self , session_id : Option < SessionId > , migration_id : Option < H256 > , new_nodes_set : BTreeSet < NodeId > , old_set_signature : Signature , new_set_signature : Signature ) -> Result < Arc < AdminSession > , Error > {
2018-06-14 09:01:52 +02:00
let mut connected_nodes = self . data . connections . connected_nodes ( ) ? ;
2017-10-02 15:27:31 +02:00
connected_nodes . insert ( self . data . self_key_pair . public ( ) . clone ( ) ) ;
2017-11-02 15:33:11 +01:00
let session_id = match session_id {
Some ( session_id ) if session_id = = * SERVERS_SET_CHANGE_SESSION_ID = > session_id ,
Some ( _ ) = > return Err ( Error ::InvalidMessage ) ,
None = > * SERVERS_SET_CHANGE_SESSION_ID ,
2017-10-02 15:27:31 +02:00
} ;
2017-11-02 15:33:11 +01:00
let cluster = create_cluster_view ( & self . data , true ) ? ;
2018-01-10 11:33:45 +01:00
let creation_data = Some ( AdminSessionCreationData ::ServersSetChange ( migration_id , new_nodes_set . clone ( ) ) ) ;
let session = self . data . sessions . admin_sessions . insert ( cluster , self . data . self_key_pair . public ( ) . clone ( ) , session_id , None , true , creation_data ) ? ;
2017-11-02 15:33:11 +01:00
let initialization_result = session . as_servers_set_change ( ) . expect ( " servers set change session is created; qed " )
. initialize ( new_nodes_set , old_set_signature , new_set_signature ) ;
2018-04-09 16:38:59 +02:00
if initialization_result . is_ok ( ) {
self . data . connections . servers_set_change_creator_connector ( ) . set_key_servers_set_change_session ( session . clone ( ) ) ;
2017-11-02 15:33:11 +01:00
}
2018-04-09 16:38:59 +02:00
Self ::process_initialization_result (
initialization_result ,
session , & self . data . sessions . admin_sessions )
2017-10-02 15:27:31 +02:00
}
2017-11-22 08:05:14 +01:00
fn add_generation_listener ( & self , listener : Arc < ClusterSessionsListener < GenerationSession > > ) {
self . data . sessions . generation_sessions . add_listener ( listener ) ;
}
2018-04-03 16:54:34 +02:00
fn add_decryption_listener ( & self , listener : Arc < ClusterSessionsListener < DecryptionSession > > ) {
self . data . sessions . decryption_sessions . add_listener ( listener ) ;
}
2018-06-14 09:01:52 +02:00
fn add_key_version_negotiation_listener ( & self , listener : Arc < ClusterSessionsListener < KeyVersionNegotiationSession < KeyVersionNegotiationSessionTransport > > > ) {
self . data . sessions . negotiation_sessions . add_listener ( listener ) ;
}
2017-04-25 21:34:03 +02:00
#[ cfg(test) ]
2017-07-06 14:02:10 +02:00
fn connect ( & self ) {
ClusterCore ::connect_disconnected_nodes ( self . data . clone ( ) ) ;
2017-05-12 14:36:19 +02:00
}
#[ cfg(test) ]
2017-07-06 14:02:10 +02:00
fn make_faulty_generation_sessions ( & self ) {
self . data . sessions . make_faulty_generation_sessions ( ) ;
2017-05-12 14:36:19 +02:00
}
2017-07-06 14:02:10 +02:00
#[ cfg(test) ]
2017-11-22 08:05:14 +01:00
fn generation_session ( & self , session_id : & SessionId ) -> Option < Arc < GenerationSession > > {
2017-10-05 22:38:23 +02:00
self . data . sessions . generation_sessions . get ( session_id , false )
2017-05-12 14:36:19 +02:00
}
2017-11-02 15:33:11 +01:00
#[ cfg(test) ]
fn key_storage ( & self ) -> Arc < KeyStorage > {
self . data . config . key_storage . clone ( )
}
2017-05-12 14:36:19 +02:00
}
2017-04-03 11:13:51 +02:00
fn make_socket_address ( address : & str , port : u16 ) -> Result < SocketAddr , Error > {
let ip_address : IpAddr = address . parse ( ) . map_err ( | _ | Error ::InvalidNodeAddress ) ? ;
Ok ( SocketAddr ::new ( ip_address , port ) )
}
2017-03-13 12:54:56 +01:00
#[ cfg(test) ]
pub mod tests {
2017-04-03 11:13:51 +02:00
use std ::sync ::Arc ;
2018-04-03 16:54:34 +02:00
use std ::sync ::atomic ::{ AtomicUsize , Ordering } ;
2018-04-02 10:47:56 +02:00
use std ::time ::{ Duration , Instant } ;
2017-10-03 10:35:31 +02:00
use std ::collections ::{ BTreeSet , VecDeque } ;
2017-03-13 12:54:56 +01:00
use parking_lot ::Mutex ;
2017-04-03 11:13:51 +02:00
use tokio_core ::reactor ::Core ;
2018-04-03 16:54:34 +02:00
use ethereum_types ::{ Address , H256 } ;
2017-11-22 15:31:34 +01:00
use ethkey ::{ Random , Generator , Public , Signature , sign } ;
2018-03-19 06:42:40 +01:00
use key_server_cluster ::{ NodeId , SessionId , Requester , Error , DummyAclStorage , DummyKeyStorage ,
MapKeyServerSet , PlainNodeKeyPair , KeyStorage } ;
2017-03-13 12:54:56 +01:00
use key_server_cluster ::message ::Message ;
2017-11-22 15:31:34 +01:00
use key_server_cluster ::cluster ::{ Cluster , ClusterCore , ClusterConfiguration , ClusterClient , ClusterState } ;
use key_server_cluster ::cluster_sessions ::{ ClusterSession , AdminSession , ClusterSessionsListener } ;
use key_server_cluster ::generation_session ::{ SessionImpl as GenerationSession , SessionState as GenerationSessionState } ;
use key_server_cluster ::decryption_session ::{ SessionImpl as DecryptionSession } ;
use key_server_cluster ::encryption_session ::{ SessionImpl as EncryptionSession } ;
2018-03-01 09:59:21 +01:00
use key_server_cluster ::signing_session_ecdsa ::{ SessionImpl as EcdsaSigningSession } ;
use key_server_cluster ::signing_session_schnorr ::{ SessionImpl as SchnorrSigningSession } ;
2017-11-22 15:31:34 +01:00
use key_server_cluster ::key_version_negotiation_session ::{ SessionImpl as KeyVersionNegotiationSession ,
2017-11-23 06:22:51 +01:00
IsolatedSessionTransport as KeyVersionNegotiationSessionTransport } ;
2017-11-22 15:31:34 +01:00
2018-04-02 10:47:56 +02:00
/// Timeout used by the tests below when waiting for cluster events.
const TIMEOUT: Duration = Duration::from_millis(300);
2017-11-22 15:31:34 +01:00
/// Cluster client stub that only counts generation-session requests;
/// every other trait method is unimplemented (see the trait impl below).
#[derive(Default)]
pub struct DummyClusterClient {
	/// Number of times `new_generation_session` has been called on this stub.
	pub generation_requests_count: AtomicUsize,
}
2017-03-13 12:54:56 +01:00
/// In-memory `Cluster` stand-in used by tests instead of real networking.
#[derive(Debug)]
pub struct DummyCluster {
	// id of the node this dummy cluster represents
	id: NodeId,
	// interior-mutable state (nodes + message queue), see DummyClusterData
	data: Mutex<DummyClusterData>,
}
/// Mutable state of `DummyCluster`.
#[derive(Debug, Default)]
struct DummyClusterData {
	// set of known node ids
	nodes: BTreeSet<NodeId>,
	// queued (node id, message) pairs — presumably messages "sent" through the
	// dummy cluster keyed by peer; confirm against the trait impl below
	messages: VecDeque<(NodeId, Message)>,
}
2017-11-22 15:31:34 +01:00
impl ClusterClient for DummyClusterClient {
2018-01-10 11:33:45 +01:00
fn cluster_state ( & self ) -> ClusterState { unimplemented! ( " test-only " ) }
2018-04-03 16:54:34 +02:00
fn new_generation_session ( & self , _session_id : SessionId , _origin : Option < Address > , _author : Address , _threshold : usize ) -> Result < Arc < GenerationSession > , Error > {
self . generation_requests_count . fetch_add ( 1 , Ordering ::Relaxed ) ;
2018-05-01 15:02:14 +02:00
Err ( Error ::Internal ( " test-error " . into ( ) ) )
2018-04-03 16:54:34 +02:00
}
2018-03-19 06:42:40 +01:00
fn new_encryption_session ( & self , _session_id : SessionId , _requester : Requester , _common_point : Public , _encrypted_point : Public ) -> Result < Arc < EncryptionSession > , Error > { unimplemented! ( " test-only " ) }
2018-04-03 16:54:34 +02:00
fn new_decryption_session ( & self , _session_id : SessionId , _origin : Option < Address > , _requester : Requester , _version : Option < H256 > , _is_shadow_decryption : bool , _is_broadcast_session : bool ) -> Result < Arc < DecryptionSession > , Error > { unimplemented! ( " test-only " ) }
2018-03-19 06:42:40 +01:00
fn new_schnorr_signing_session ( & self , _session_id : SessionId , _requester : Requester , _version : Option < H256 > , _message_hash : H256 ) -> Result < Arc < SchnorrSigningSession > , Error > { unimplemented! ( " test-only " ) }
fn new_ecdsa_signing_session ( & self , _session_id : SessionId , _requester : Requester , _version : Option < H256 > , _message_hash : H256 ) -> Result < Arc < EcdsaSigningSession > , Error > { unimplemented! ( " test-only " ) }
2018-04-03 16:54:34 +02:00
2018-01-10 11:33:45 +01:00
fn new_key_version_negotiation_session ( & self , _session_id : SessionId ) -> Result < Arc < KeyVersionNegotiationSession < KeyVersionNegotiationSessionTransport > > , Error > { unimplemented! ( " test-only " ) }
fn new_servers_set_change_session ( & self , _session_id : Option < SessionId > , _migration_id : Option < H256 > , _new_nodes_set : BTreeSet < NodeId > , _old_set_signature : Signature , _new_set_signature : Signature ) -> Result < Arc < AdminSession > , Error > { unimplemented! ( " test-only " ) }
2017-11-22 15:31:34 +01:00
2017-11-23 06:22:51 +01:00
fn add_generation_listener ( & self , _listener : Arc < ClusterSessionsListener < GenerationSession > > ) { }
2018-04-03 16:54:34 +02:00
fn add_decryption_listener ( & self , _listener : Arc < ClusterSessionsListener < DecryptionSession > > ) { }
2018-06-14 09:01:52 +02:00
fn add_key_version_negotiation_listener ( & self , _listener : Arc < ClusterSessionsListener < KeyVersionNegotiationSession < KeyVersionNegotiationSessionTransport > > > ) { }
2017-11-22 15:31:34 +01:00
2018-01-10 11:33:45 +01:00
fn make_faulty_generation_sessions ( & self ) { unimplemented! ( " test-only " ) }
fn generation_session ( & self , _session_id : & SessionId ) -> Option < Arc < GenerationSession > > { unimplemented! ( " test-only " ) }
fn connect ( & self ) { unimplemented! ( " test-only " ) }
fn key_storage ( & self ) -> Arc < KeyStorage > { unimplemented! ( " test-only " ) }
2017-11-22 15:31:34 +01:00
}
2017-03-13 12:54:56 +01:00
impl DummyCluster {
pub fn new ( id : NodeId ) -> Self {
DummyCluster {
id : id ,
data : Mutex ::new ( DummyClusterData ::default ( ) )
}
}
pub fn node ( & self ) -> NodeId {
self . id . clone ( )
}
pub fn add_node ( & self , node : NodeId ) {
2017-11-02 15:33:11 +01:00
self . data . lock ( ) . nodes . insert ( node ) ;
}
pub fn add_nodes < I : Iterator < Item = NodeId > > ( & self , nodes : I ) {
self . data . lock ( ) . nodes . extend ( nodes )
2017-03-13 12:54:56 +01:00
}
2017-10-05 22:37:41 +02:00
pub fn remove_node ( & self , node : & NodeId ) {
2017-11-02 15:33:11 +01:00
self . data . lock ( ) . nodes . remove ( node ) ;
2017-10-05 22:37:41 +02:00
}
2017-03-13 12:54:56 +01:00
pub fn take_message ( & self ) -> Option < ( NodeId , Message ) > {
self . data . lock ( ) . messages . pop_front ( )
}
}
impl Cluster for DummyCluster {
fn broadcast ( & self , message : Message ) -> Result < ( ) , Error > {
let mut data = self . data . lock ( ) ;
let all_nodes : Vec < _ > = data . nodes . iter ( ) . cloned ( ) . filter ( | n | n ! = & self . id ) . collect ( ) ;
for node in all_nodes {
data . messages . push_back ( ( node , message . clone ( ) ) ) ;
}
Ok ( ( ) )
}
fn send ( & self , to : & NodeId , message : Message ) -> Result < ( ) , Error > {
debug_assert! ( & self . id ! = to ) ;
self . data . lock ( ) . messages . push_back ( ( to . clone ( ) , message ) ) ;
Ok ( ( ) )
}
2017-10-03 10:35:31 +02:00
fn is_connected ( & self , node : & NodeId ) -> bool {
let data = self . data . lock ( ) ;
& self . id = = node | | data . nodes . contains ( node )
}
fn nodes ( & self ) -> BTreeSet < NodeId > {
self . data . lock ( ) . nodes . iter ( ) . cloned ( ) . collect ( )
}
2018-05-01 15:02:14 +02:00
fn configured_nodes_count ( & self ) -> usize {
self . data . lock ( ) . nodes . len ( )
}
fn connected_nodes_count ( & self ) -> usize {
self . data . lock ( ) . nodes . len ( )
}
2017-03-13 12:54:56 +01:00
}
2017-04-03 11:13:51 +02:00
2018-04-02 10:47:56 +02:00
pub fn loop_until < F > ( core : & mut Core , timeout : Duration , predicate : F ) where F : Fn ( ) -> bool {
let start = Instant ::now ( ) ;
2017-04-03 11:13:51 +02:00
loop {
2018-04-02 10:47:56 +02:00
core . turn ( Some ( Duration ::from_millis ( 1 ) ) ) ;
2017-04-03 11:13:51 +02:00
if predicate ( ) {
break ;
}
2018-04-02 10:47:56 +02:00
if Instant ::now ( ) - start > timeout {
2017-04-03 11:13:51 +02:00
panic! ( " no result in {:?} " , timeout ) ;
}
}
}
pub fn all_connections_established ( cluster : & Arc < ClusterCore > ) -> bool {
2018-01-10 11:33:45 +01:00
cluster . config ( ) . key_server_set . snapshot ( ) . new_set . keys ( )
2017-04-03 11:13:51 +02:00
. filter ( | p | * p ! = cluster . config ( ) . self_key_pair . public ( ) )
. all ( | p | cluster . connection ( p ) . is_some ( ) )
}
pub fn make_clusters ( core : & Core , ports_begin : u16 , num_nodes : usize ) -> Vec < Arc < ClusterCore > > {
let key_pairs : Vec < _ > = ( 0 .. num_nodes ) . map ( | _ | Random . generate ( ) . unwrap ( ) ) . collect ( ) ;
let cluster_params : Vec < _ > = ( 0 .. num_nodes ) . map ( | i | ClusterConfiguration {
threads : 1 ,
2017-07-25 08:24:54 +02:00
self_key_pair : Arc ::new ( PlainNodeKeyPair ::new ( key_pairs [ i ] . clone ( ) ) ) ,
2017-04-03 11:13:51 +02:00
listen_address : ( " 127.0.0.1 " . to_owned ( ) , ports_begin + i as u16 ) ,
2018-06-14 09:01:52 +02:00
key_server_set : Arc ::new ( MapKeyServerSet ::new ( false , key_pairs . iter ( ) . enumerate ( )
2017-07-24 11:36:31 +02:00
. map ( | ( j , kp ) | ( kp . public ( ) . clone ( ) , format! ( " 127.0.0.1: {} " , ports_begin + j as u16 ) . parse ( ) . unwrap ( ) ) )
. collect ( ) ) ) ,
2017-04-03 11:13:51 +02:00
allow_connecting_to_higher_nodes : false ,
key_storage : Arc ::new ( DummyKeyStorage ::default ( ) ) ,
acl_storage : Arc ::new ( DummyAclStorage ::default ( ) ) ,
2017-10-02 15:27:31 +02:00
admin_public : None ,
2018-01-10 11:33:45 +01:00
auto_migrate_enabled : false ,
2017-04-03 11:13:51 +02:00
} ) . collect ( ) ;
let clusters : Vec < _ > = cluster_params . into_iter ( ) . enumerate ( )
. map ( | ( _ , params ) | ClusterCore ::new ( core . handle ( ) , params ) . unwrap ( ) )
. collect ( ) ;
clusters
}
pub fn run_clusters ( clusters : & [ Arc < ClusterCore > ] ) {
for cluster in clusters {
2017-04-25 21:34:03 +02:00
cluster . run_listener ( ) . unwrap ( ) ;
}
for cluster in clusters {
cluster . run_connections ( ) . unwrap ( ) ;
2017-04-03 11:13:51 +02:00
}
}
#[test]
fn cluster_connects_to_other_nodes() {
	// Three freshly started clusters must all establish mutual connections
	// within the test timeout.
	let mut core = Core::new().unwrap();
	let clusters = make_clusters(&core, 6010, 3);
	run_clusters(&clusters);
	loop_until(&mut core, TIMEOUT, || clusters.iter().all(all_connections_established));
}
2017-04-25 21:34:03 +02:00
#[test]
fn cluster_wont_start_generation_session_if_not_fully_connected() {
	let core = Core::new().unwrap();
	let clusters = make_clusters(&core, 6013, 3);
	// Only the first node is started, so it has no connections to the others.
	clusters[0].run().unwrap();
	// Starting a generation session on a partially connected cluster must be
	// rejected with `NodeDisconnected`.
	let result = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1);
	match result {
		Err(Error::NodeDisconnected) => (),
		Err(e) => panic!("unexpected error {:?}", e),
		_ => panic!("unexpected success"),
	}
}
#[test]
fn error_in_generation_session_broadcasted_to_all_other_nodes() {
	//::logger::init_log();
	let mut core = Core::new().unwrap();
	let clusters = make_clusters(&core, 6016, 3);
	run_clusters(&clusters);
	loop_until(&mut core, TIMEOUT, || clusters.iter().all(all_connections_established));

	// make one of the slave nodes produce faulty generation sessions
	clusters[1].client().make_faulty_generation_sessions();

	// start a generation session on the master and wait for it to fail
	let session_id = SessionId::default();
	let session = clusters[0].client().new_generation_session(session_id, Default::default(), Default::default(), 1).unwrap();
	loop_until(&mut core, TIMEOUT, || session.joint_public_and_secret().is_some()
		&& clusters[0].client().generation_session(&session_id).is_none());
	assert!(session.joint_public_and_secret().unwrap().is_err());

	// the faulty session must be either already gone from the other nodes,
	// or fail and be removed shortly after
	for node_index in 1..3 {
		if let Some(session) = clusters[node_index].client().generation_session(&session_id) {
			// wait for both session completion && session removal (the
			// completion event fires before the cluster drops the session
			// from its own container)
			loop_until(&mut core, TIMEOUT, || session.joint_public_and_secret().is_some()
				&& clusters[node_index].client().generation_session(&session_id).is_none());
			assert!(session.joint_public_and_secret().unwrap().is_err());
		}
	}
}
#[test]
fn generation_session_completion_signalled_if_failed_on_master() {
	//::logger::init_log();
	let mut core = Core::new().unwrap();
	let clusters = make_clusters(&core, 6025, 3);
	run_clusters(&clusters);
	loop_until(&mut core, TIMEOUT, || clusters.iter().all(all_connections_established));

	// this time the MASTER node produces faulty generation sessions
	clusters[0].client().make_faulty_generation_sessions();

	// start a generation session and wait for it to fail on the master
	let session_id = SessionId::default();
	let session = clusters[0].client().new_generation_session(session_id, Default::default(), Default::default(), 1).unwrap();
	loop_until(&mut core, TIMEOUT, || session.joint_public_and_secret().is_some()
		&& clusters[0].client().generation_session(&session_id).is_none());
	assert!(session.joint_public_and_secret().unwrap().is_err());

	// the faulty session must be either already gone from the slave nodes,
	// or fail and be removed shortly after
	for node_index in 1..3 {
		if let Some(session) = clusters[node_index].client().generation_session(&session_id) {
			// wait for both session completion && session removal (the
			// completion event fires before the cluster drops the session
			// from its own container)
			loop_until(&mut core, TIMEOUT, || session.joint_public_and_secret().is_some()
				&& clusters[node_index].client().generation_session(&session_id).is_none());
			assert!(session.joint_public_and_secret().unwrap().is_err());
		}
	}
}
#[test]
fn generation_session_is_removed_when_succeeded() {
	//::logger::init_log();
	let mut core = Core::new().unwrap();
	let clusters = make_clusters(&core, 6019, 3);
	run_clusters(&clusters);
	loop_until(&mut core, TIMEOUT, || clusters.iter().all(all_connections_established));

	// start a generation session and wait for it to complete successfully
	let session_id = SessionId::default();
	let session = clusters[0].client().new_generation_session(session_id, Default::default(), Default::default(), 1).unwrap();
	loop_until(&mut core, TIMEOUT, || (session.state() == GenerationSessionState::Finished
		|| session.state() == GenerationSessionState::Failed)
		&& clusters[0].client().generation_session(&session_id).is_none());
	assert!(session.joint_public_and_secret().unwrap().is_ok());

	// on the non-master nodes the session must either be removed already,
	// or be removed right after completion
	for node_index in 1..3 {
		if let Some(session) = clusters[node_index].client().generation_session(&session_id) {
			// run to completion in case the completion message is still on
			// the way, AND check that the session is actually removed from
			// the cluster sessions container
			loop_until(&mut core, TIMEOUT, || (session.state() == GenerationSessionState::Finished
				|| session.state() == GenerationSessionState::Failed)
				&& clusters[node_index].client().generation_session(&session_id).is_none());
		}
	}
}
2017-11-02 15:33:11 +01:00
#[test]
fn sessions_are_removed_when_initialization_fails() {
	let mut core = Core::new().unwrap();
	let clusters = make_clusters(&core, 6022, 3);
	run_clusters(&clusters);
	loop_until(&mut core, TIMEOUT, || clusters.iter().all(all_connections_established));

	// generation session
	{
		// an impossible threshold (100 of 3) must fail during initialization —
		// twice in a row, to check failed attempts leave no stale state behind
		for _ in 0..2 {
			assert_eq!(clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 100).map(|_| ()),
				Err(Error::NotEnoughNodesForThreshold));
		}

		assert!(clusters[0].data.sessions.generation_sessions.is_empty());
	}

	// decryption session
	{
		// decryption of a key that was never generated must fail during
		// initialization — again twice, to check for stale state
		for _ in 0..2 {
			assert_eq!(clusters[0].client().new_decryption_session(Default::default(), Default::default(), Default::default(), Some(Default::default()), false, false).map(|_| ()),
				Err(Error::InvalidMessage));
		}
		assert!(clusters[0].data.sessions.decryption_sessions.is_empty());
		assert!(clusters[0].data.sessions.negotiation_sessions.is_empty());
	}
}
2018-09-25 15:15:35 +02:00
// test ignored because of
//
// https://github.com/paritytech/parity-ethereum/issues/9635
#[test]
#[ignore]
fn schnorr_signing_session_completes_if_node_does_not_have_a_share() {
	//::logger::init_log();
	let mut core = Core::new().unwrap();
	let clusters = make_clusters(&core, 6028, 3);
	run_clusters(&clusters);
	loop_until(&mut core, TIMEOUT, || clusters.iter().all(all_connections_established));

	// generate a key first, so that there is something to sign with
	let session = clusters[0].client().new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
	loop_until(&mut core, TIMEOUT, || (session.state() == GenerationSessionState::Finished
		|| session.state() == GenerationSessionState::Failed)
		&& clusters[0].client().generation_session(&SessionId::default()).is_none());
	assert!(session.joint_public_and_secret().unwrap().is_ok());

	// all generation sessions must be gone; then drop node2's key share
	assert!((0..3).all(|i| clusters[i].data.sessions.generation_sessions.is_empty()));
	clusters[2].data.config.key_storage.remove(&Default::default()).unwrap();

	// signing initiated on a node that still has a share must succeed
	let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap();
	let session0 = clusters[0].client().new_schnorr_signing_session(Default::default(), signature.into(), None, Default::default()).unwrap();
	let session = clusters[0].data.sessions.schnorr_signing_sessions.first().unwrap();
	loop_until(&mut core, TIMEOUT, || session.is_finished() && (0..3).all(|i|
		clusters[i].data.sessions.schnorr_signing_sessions.is_empty()));
	session0.wait().unwrap();

	// signing initiated on the node WITHOUT a key share must succeed as well
	let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap();
	let session2 = clusters[2].client().new_schnorr_signing_session(Default::default(), signature.into(), None, Default::default()).unwrap();
	let session = clusters[2].data.sessions.schnorr_signing_sessions.first().unwrap();
	loop_until(&mut core, TIMEOUT, || session.is_finished() && (0..3).all(|i|
		clusters[i].data.sessions.schnorr_signing_sessions.is_empty()));
	session2.wait().unwrap();

	// once node1's share is removed too, signing must fail
	clusters[1].data.config.key_storage.remove(&Default::default()).unwrap();
	let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap();
	let session1 = clusters[0].client().new_schnorr_signing_session(Default::default(), signature.into(), None, Default::default()).unwrap();
	let session = clusters[0].data.sessions.schnorr_signing_sessions.first().unwrap();
	loop_until(&mut core, TIMEOUT, || session.is_finished());
	session1.wait().unwrap_err();
}
2018-03-01 09:59:21 +01:00
2018-09-25 15:15:35 +02:00
// test ignored because of
//
// https://github.com/paritytech/parity-ethereum/issues/9635
2018-03-01 09:59:21 +01:00
#[ test ]
2018-09-25 15:15:35 +02:00
#[ ignore ]
2018-03-01 09:59:21 +01:00
fn ecdsa_signing_session_completes_if_node_does_not_have_a_share ( ) {
//::logger::init_log();
let mut core = Core ::new ( ) . unwrap ( ) ;
let clusters = make_clusters ( & core , 6041 , 4 ) ;
run_clusters ( & clusters ) ;
2018-04-02 10:47:56 +02:00
loop_until ( & mut core , TIMEOUT , | | clusters . iter ( ) . all ( all_connections_established ) ) ;
2018-03-01 09:59:21 +01:00
// start && wait for generation session to complete
2018-04-03 16:54:34 +02:00
let session = clusters [ 0 ] . client ( ) . new_generation_session ( SessionId ::default ( ) , Default ::default ( ) , Default ::default ( ) , 1 ) . unwrap ( ) ;
2018-04-02 10:47:56 +02:00
loop_until ( & mut core , TIMEOUT , | | ( session . state ( ) = = GenerationSessionState ::Finished
2018-03-01 09:59:21 +01:00
| | session . state ( ) = = GenerationSessionState ::Failed )
& & clusters [ 0 ] . client ( ) . generation_session ( & SessionId ::default ( ) ) . is_none ( ) ) ;
assert! ( session . joint_public_and_secret ( ) . unwrap ( ) . is_ok ( ) ) ;
// now remove share from node2
assert! ( ( 0 .. 3 ) . all ( | i | clusters [ i ] . data . sessions . generation_sessions . is_empty ( ) ) ) ;
clusters [ 2 ] . data . config . key_storage . remove ( & Default ::default ( ) ) . unwrap ( ) ;
// and try to sign message with generated key
let signature = sign ( Random . generate ( ) . unwrap ( ) . secret ( ) , & Default ::default ( ) ) . unwrap ( ) ;
2018-03-19 06:42:40 +01:00
let session0 = clusters [ 0 ] . client ( ) . new_ecdsa_signing_session ( Default ::default ( ) , signature . into ( ) , None , H256 ::random ( ) ) . unwrap ( ) ;
2018-03-01 09:59:21 +01:00
let session = clusters [ 0 ] . data . sessions . ecdsa_signing_sessions . first ( ) . unwrap ( ) ;
2018-04-02 10:47:56 +02:00
loop_until ( & mut core , Duration ::from_millis ( 1000 ) , | | session . is_finished ( ) & & ( 0 .. 3 ) . all ( | i |
2018-03-01 09:59:21 +01:00
clusters [ i ] . data . sessions . ecdsa_signing_sessions . is_empty ( ) ) ) ;
session0 . wait ( ) . unwrap ( ) ;
// and try to sign message with generated key using node that has no key share
let signature = sign ( Random . generate ( ) . unwrap ( ) . secret ( ) , & Default ::default ( ) ) . unwrap ( ) ;
2018-03-19 06:42:40 +01:00
let session2 = clusters [ 2 ] . client ( ) . new_ecdsa_signing_session ( Default ::default ( ) , signature . into ( ) , None , H256 ::random ( ) ) . unwrap ( ) ;
2018-03-01 09:59:21 +01:00
let session = clusters [ 2 ] . data . sessions . ecdsa_signing_sessions . first ( ) . unwrap ( ) ;
2018-04-02 10:47:56 +02:00
loop_until ( & mut core , Duration ::from_millis ( 1000 ) , | | session . is_finished ( ) & & ( 0 .. 3 ) . all ( | i |
2018-03-01 09:59:21 +01:00
clusters [ i ] . data . sessions . ecdsa_signing_sessions . is_empty ( ) ) ) ;
session2 . wait ( ) . unwrap ( ) ;
// now remove share from node1
clusters [ 1 ] . data . config . key_storage . remove ( & Default ::default ( ) ) . unwrap ( ) ;
// and try to sign message with generated key
let signature = sign ( Random . generate ( ) . unwrap ( ) . secret ( ) , & Default ::default ( ) ) . unwrap ( ) ;
2018-03-19 06:42:40 +01:00
let session1 = clusters [ 0 ] . client ( ) . new_ecdsa_signing_session ( Default ::default ( ) , signature . into ( ) , None , H256 ::random ( ) ) . unwrap ( ) ;
2018-03-01 09:59:21 +01:00
let session = clusters [ 0 ] . data . sessions . ecdsa_signing_sessions . first ( ) . unwrap ( ) ;
2018-04-02 10:47:56 +02:00
loop_until ( & mut core , Duration ::from_millis ( 1000 ) , | | session . is_finished ( ) ) ;
2018-03-01 09:59:21 +01:00
session1 . wait ( ) . unwrap_err ( ) ;
}
2017-03-13 12:54:56 +01:00
}