diff --git a/ethcore/native_contracts/res/key_server_set.json b/ethcore/native_contracts/res/key_server_set.json index 93f68837a..0fb9502b1 100644 --- a/ethcore/native_contracts/res/key_server_set.json +++ b/ethcore/native_contracts/res/key_server_set.json @@ -1 +1 @@ -[{"constant":true,"inputs":[{"name":"","type":"uint256"}],"name":"keyServersList","outputs":[{"name":"","type":"address"}],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"_new","type":"address"}],"name":"setOwner","outputs":[],"payable":false,"type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getKeyServerPublic","outputs":[{"name":"","type":"bytes"}],"payable":false,"type":"function"},{"constant":true,"inputs":[],"name":"getKeyServers","outputs":[{"name":"","type":"address[]"}],"payable":false,"type":"function"},{"constant":true,"inputs":[],"name":"owner","outputs":[{"name":"","type":"address"}],"payable":false,"type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getKeyServerAddress","outputs":[{"name":"","type":"string"}],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"keyServer","type":"address"}],"name":"removeKeyServer","outputs":[],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"keyServerPublic","type":"bytes"},{"name":"keyServerIp","type":"string"}],"name":"addKeyServer","outputs":[],"payable":false,"type":"function"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerAdded","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerRemoved","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"old","type":"address"},{"indexed":true,"name":"current","type":"address"}],"name":"NewOwner","type":"event"}] \ No newline at end of file +[{"constant":true,"inputs":[],"name":"getMigrationMaster","outputs":[{"name":"","type":"address"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getMigrationKeyServerPublic","outputs":[{"name":"","type":"bytes"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"id","type":"bytes32"}],"name":"startMigration","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getMigrationKeyServerAddress","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"getMigrationId","outputs":[{"name":"","type":"bytes32"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"getNewKeyServers","outputs":[{"name":"","type":"address[]"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"id","type":"bytes32"}],"name":"confirmMigration","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"getMigrationKeyServers","outputs":[{"name":"","type":"address[]"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"isMigrationConfirmed","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"getCurrentKeyServers","outputs":[{"name":"","type":"address[]"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getCurrentKeyServerPublic","outputs":[{"name":"","type":"bytes"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getNewKeyServerAddress","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getCurrentKeyServerAddress","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getNewKeyServerPublic","outputs":[{"name":"","type":"bytes"}],"payable":false,"stateMutability":"view","type":"function"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerAdded","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerRemoved","type":"event"},{"anonymous":false,"inputs":[],"name":"MigrationStarted","type":"event"},{"anonymous":false,"inputs":[],"name":"MigrationCompleted","type":"event"}] \ No newline at end of file diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index 855f28444..31fa32e2f 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -556,6 +556,10 @@ usage! { "--no-acl-check", "Disable ACL check (useful for test environments).", + FLAG flag_no_secretstore_auto_migrate: (bool) = false, or |c: &Config| otry!(c.secretstore).disable_auto_migrate.clone(), + "--no-secretstore-auto-migrate", + "Do not run servers set change session automatically when servers set changes. This option has no effect when servers set is read from configuration file.", + ARG arg_secretstore_contract: (String) = "none", or |c: &Config| otry!(c.secretstore).service_contract.clone(), "--secretstore-contract=[SOURCE]", "Secret Store Service contract address source: none, registry (contract address is read from registry) or address.", @@ -589,7 +593,7 @@ usage! { "Hex-encoded secret key of this node.", ARG arg_secretstore_admin_public: (Option) = None, or |c: &Config| otry!(c.secretstore).admin_public.clone(), - "--secretstore-admin-public=[PUBLIC]", + "--secretstore-admin=[PUBLIC]", "Hex-encoded public key of secret store administrator.", ["Sealing/Mining options"] @@ -1111,6 +1115,7 @@ struct SecretStore { disable: Option, disable_http: Option, disable_acl_check: Option, + disable_auto_migrate: Option, service_contract: Option, self_secret: Option, admin_public: Option, @@ -1519,9 +1524,11 @@ mod tests { arg_dapps_path: "$HOME/.parity/dapps".into(), flag_no_dapps: false, + // SECRETSTORE flag_no_secretstore: false, flag_no_secretstore_http: false, flag_no_secretstore_acl_check: false, + flag_no_secretstore_auto_migrate: false, arg_secretstore_contract: "none".into(), arg_secretstore_secret: None, arg_secretstore_admin_public: None, @@ -1773,6 +1780,7 @@ mod tests { disable: None, disable_http: None, disable_acl_check: None, + disable_auto_migrate: None, service_contract: None, self_secret: None, admin_public: None, diff --git a/parity/configuration.rs b/parity/configuration.rs index 641f5cffa..fa0b602ef 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -610,6 +610,7 @@ impl Configuration { enabled: self.secretstore_enabled(), http_enabled: self.secretstore_http_enabled(), acl_check_enabled: self.secretstore_acl_check_enabled(), + auto_migrate_enabled: self.secretstore_auto_migrate_enabled(), service_contract_address: self.secretstore_service_contract_address()?, self_secret: self.secretstore_self_secret()?, nodes: self.secretstore_nodes()?, @@ -1088,6 +1089,10 @@ impl Configuration { !self.args.flag_no_secretstore_acl_check } + fn secretstore_auto_migrate_enabled(&self) -> bool { + !self.args.flag_no_secretstore_auto_migrate + } + fn secretstore_service_contract_address(&self) -> Result, String> { Ok(match self.args.arg_secretstore_contract.as_ref() { "none" => None, diff --git a/parity/secretstore.rs b/parity/secretstore.rs index 161042d3c..48e6e5c1c 100644 --- a/parity/secretstore.rs +++ b/parity/secretstore.rs @@ -51,6 +51,8 @@ pub struct Configuration { pub http_enabled: bool, /// Is ACL check enabled. pub acl_check_enabled: bool, + /// Is auto migrate enabled. + pub auto_migrate_enabled: bool, /// Service contract address. pub service_contract_address: Option, /// This node secret. @@ -166,6 +168,7 @@ mod server { })).collect(), allow_connecting_to_higher_nodes: true, admin_public: conf.admin_public, + auto_migrate_enabled: conf.auto_migrate_enabled, }, }; @@ -190,6 +193,7 @@ impl Default for Configuration { enabled: true, http_enabled: true, acl_check_enabled: true, + auto_migrate_enabled: true, service_contract_address: None, self_secret: None, admin_public: None, diff --git a/secret_store/src/key_server.rs b/secret_store/src/key_server.rs index 2908afd64..51f96302b 100644 --- a/secret_store/src/key_server.rs +++ b/secret_store/src/key_server.rs @@ -63,7 +63,7 @@ impl KeyServer for KeyServerImpl {} impl AdminSessionsServer for KeyServerImpl { fn change_servers_set(&self, old_set_signature: RequestSignature, new_set_signature: RequestSignature, new_servers_set: BTreeSet) -> Result<(), Error> { let servers_set_change_session = self.data.lock().cluster - .new_servers_set_change_session(None, new_servers_set, old_set_signature, new_set_signature)?; + .new_servers_set_change_session(None, None, new_servers_set, old_set_signature, new_set_signature)?; servers_set_change_session.as_servers_set_change() .expect("new_servers_set_change_session creates servers_set_change_session; qed") .wait().map_err(Into::into) @@ -164,6 +164,7 @@ impl KeyServerCore { acl_storage: acl_storage, key_storage: key_storage, admin_public: config.admin_public.clone(), + auto_migrate_enabled: config.auto_migrate_enabled, }; let (stop, stopped) = futures::oneshot(); @@ -229,7 +230,7 @@ pub mod tests { impl AdminSessionsServer for DummyKeyServer { fn change_servers_set(&self, _old_set_signature: RequestSignature, _new_set_signature: RequestSignature, _new_servers_set: BTreeSet) -> Result<(), Error> { - unimplemented!() + unimplemented!("test-only") } } @@ -242,25 +243,25 @@ pub mod tests { impl DocumentKeyServer for DummyKeyServer { fn store_document_key(&self, _key_id: &ServerKeyId, _signature: &RequestSignature, _common_point: Public, _encrypted_document_key: Public) -> Result<(), Error> { - unimplemented!() + unimplemented!("test-only") } fn generate_document_key(&self, _key_id: &ServerKeyId, _signature: &RequestSignature, _threshold: usize) -> Result { - unimplemented!() + unimplemented!("test-only") } fn restore_document_key(&self, _key_id: &ServerKeyId, _signature: &RequestSignature) -> Result { - unimplemented!() + unimplemented!("test-only") } fn restore_document_key_shadow(&self, _key_id: &ServerKeyId, _signature: &RequestSignature) -> Result { - unimplemented!() + unimplemented!("test-only") } } impl MessageSigner for DummyKeyServer { fn sign_message(&self, _key_id: &ServerKeyId, _signature: &RequestSignature, _message: MessageHash) -> Result { - unimplemented!() + unimplemented!("test-only") } } @@ -279,6 +280,7 @@ pub mod tests { })).collect(), allow_connecting_to_higher_nodes: false, admin_public: None, + auto_migrate_enabled: false, }).collect(); let key_servers_set: BTreeMap = configs[0].nodes.iter() .map(|(k, a)| (k.clone(), format!("{}:{}", a.address, a.port).parse().unwrap())) @@ -469,6 +471,6 @@ pub mod tests { #[test] fn servers_set_change_session_works_over_network() { - // TODO + // TODO [Test] } } diff --git a/secret_store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs b/secret_store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs index b1acc5f45..3b931a45b 100644 --- a/secret_store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs +++ b/secret_store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs @@ -27,7 +27,7 @@ use key_server_cluster::signing_session::SessionImpl as SigningSession; use key_server_cluster::message::{Message, KeyVersionNegotiationMessage, RequestKeyVersions, KeyVersions}; use key_server_cluster::admin_sessions::ShareChangeSessionMeta; -// TODO: optimizations: change sessions so that versions are sent by chunks. +// TODO [Opt]: change sessions so that versions are sent by chunks. /// Number of versions sent in single message. const VERSIONS_PER_MESSAGE: usize = 32; diff --git a/secret_store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs b/secret_store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs index 93bdc27e6..e81259c94 100644 --- a/secret_store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs +++ b/secret_store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::collections::{BTreeSet, BTreeMap}; use std::collections::btree_map::Entry; use parking_lot::{Mutex, Condvar}; +use bigint::hash::H256; use ethkey::{Public, Signature}; use key_server_cluster::{Error, NodeId, SessionId, KeyStorage}; use key_server_cluster::math; @@ -90,6 +91,8 @@ struct SessionCore { pub all_nodes_set: BTreeSet, /// Administrator public key. pub admin_public: Public, + /// Migration id (if this session is a part of auto-migration process). + pub migration_id: Option, /// SessionImpl completion condvar. pub completed: Condvar, } @@ -141,6 +144,8 @@ pub struct SessionParams { pub all_nodes_set: BTreeSet, /// Administrator public key. pub admin_public: Public, + /// Migration id (if this session is a part of auto-migration process). + pub migration_id: Option, } /// Servers set change consensus transport. @@ -149,6 +154,8 @@ struct ServersSetChangeConsensusTransport { id: SessionId, /// Session-level nonce. nonce: u64, + /// Migration id (if part of auto-migration process). + migration_id: Option, /// Cluster. cluster: Arc, } @@ -184,6 +191,7 @@ impl SessionImpl { nonce: params.nonce, all_nodes_set: params.all_nodes_set, admin_public: params.admin_public, + migration_id: params.migration_id, completed: Condvar::new(), }, data: Mutex::new(SessionData { @@ -205,6 +213,11 @@ impl SessionImpl { &self.core.meta.id } + /// Get migration id. + pub fn migration_id(&self) -> Option<&H256> { + self.core.migration_id.as_ref() + } + /// Wait for session completion. pub fn wait(&self) -> Result<(), Error> { Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone()) @@ -229,6 +242,7 @@ impl SessionImpl { consensus_transport: ServersSetChangeConsensusTransport { id: self.core.meta.id.clone(), nonce: self.core.nonce, + migration_id: self.core.migration_id.clone(), cluster: self.core.cluster.clone(), }, })?; @@ -296,6 +310,7 @@ impl SessionImpl { consensus_transport: ServersSetChangeConsensusTransport { id: self.core.meta.id.clone(), nonce: self.core.nonce, + migration_id: self.core.migration_id.clone(), cluster: self.core.cluster.clone(), }, })?); @@ -723,7 +738,7 @@ impl SessionImpl { }, sub_session: math::generate_random_scalar()?, key_share: key_share, - result_computer: Arc::new(LargestSupportResultComputer {}), // TODO: optimizations: could use modified Fast version + result_computer: Arc::new(LargestSupportResultComputer {}), // TODO [Opt]: could use modified Fast version transport: ServersSetChangeKeyVersionNegotiationTransport { id: core.meta.id.clone(), nonce: core.nonce, @@ -937,6 +952,7 @@ impl JobTransport for ServersSetChangeConsensusTransport { session: self.id.clone().into(), session_nonce: self.nonce, message: ConsensusMessageWithServersSet::InitializeConsensusSession(InitializeConsensusSessionWithServersSet { + migration_id: self.migration_id.clone().map(Into::into), old_nodes_set: request.old_servers_set.into_iter().map(Into::into).collect(), new_nodes_set: request.new_servers_set.into_iter().map(Into::into).collect(), old_set_signature: request.old_set_signature.into(), @@ -1036,6 +1052,7 @@ pub mod tests { key_storage: key_storage, nonce: 1, admin_public: admin_public, + migration_id: None, }).unwrap() } diff --git a/secret_store/src/key_server_cluster/admin_sessions/sessions_queue.rs b/secret_store/src/key_server_cluster/admin_sessions/sessions_queue.rs index 3c504a516..35adaab68 100644 --- a/secret_store/src/key_server_cluster/admin_sessions/sessions_queue.rs +++ b/secret_store/src/key_server_cluster/admin_sessions/sessions_queue.rs @@ -29,7 +29,7 @@ pub struct SessionsQueue { impl SessionsQueue { /// Create new sessions queue. pub fn new(key_storage: &Arc, unknown_sessions: BTreeSet) -> Self { - // TODO: optimizations: + // TODO [Opt]: // 1) known sessions - change to iter // 2) unknown sesions - request chunk-by-chunk SessionsQueue { diff --git a/secret_store/src/key_server_cluster/admin_sessions/share_add_session.rs b/secret_store/src/key_server_cluster/admin_sessions/share_add_session.rs index 194138eda..4cb70228a 100644 --- a/secret_store/src/key_server_cluster/admin_sessions/share_add_session.rs +++ b/secret_store/src/key_server_cluster/admin_sessions/share_add_session.rs @@ -516,7 +516,7 @@ impl SessionImpl where T: SessionTransport { return Ok(()) } - // TODO: find a way to verificate keys + // TODO [Trust]: find a way to verificate keys Self::complete_session(&self.core, &mut *data) } @@ -600,7 +600,7 @@ impl SessionImpl where T: SessionTransport { return Ok(()) } - // TODO: find a way to verificate keys + // TODO [Trust]: find a way to verificate keys Self::complete_session(core, data) } diff --git a/secret_store/src/key_server_cluster/client_sessions/encryption_session.rs b/secret_store/src/key_server_cluster/client_sessions/encryption_session.rs index ff173a968..22aa8d40e 100644 --- a/secret_store/src/key_server_cluster/client_sessions/encryption_session.rs +++ b/secret_store/src/key_server_cluster/client_sessions/encryption_session.rs @@ -151,8 +151,8 @@ impl SessionImpl { initialization_confirmed: &n == self.node(), }))); - // TODO: id signature is not enough here, as it was already used in key generation - // TODO: there could be situation when some nodes have failed to store encrypted data + // TODO [Sec]: id signature is not enough here, as it was already used in key generation + // TODO [Reliability]: there could be situation when some nodes have failed to store encrypted data // => potential problems during restore. some confirmation step is needed (2pc)? // save encryption data if let Some(mut encrypted_data) = self.encrypted_data.clone() { diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index b530711aa..c32097c82 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -41,6 +41,8 @@ use key_server_cluster::key_version_negotiation_session::{SessionImpl as KeyVers IsolatedSessionTransport as KeyVersionNegotiationSessionTransport, ContinueAction}; use key_server_cluster::io::{DeadlineStatus, ReadMessage, SharedTcpStream, read_encrypted_message, WriteMessage, write_encrypted_message}; use key_server_cluster::net::{accept_connection as net_accept_connection, connect as net_connect, Connection as NetConnection}; +use key_server_cluster::connection_trigger::{Maintain, ConnectionTrigger, SimpleConnectionTrigger, ServersSetChangeSessionCreatorConnector}; +use key_server_cluster::connection_trigger_with_migration::ConnectionTriggerWithMigration; /// Maintain interval (seconds). Every MAINTAIN_INTERVAL seconds node: /// 1) checks if connected nodes are responding to KeepAlive messages @@ -56,7 +58,7 @@ const KEEP_ALIVE_SEND_INTERVAL: u64 = 30; const KEEP_ALIVE_DISCONNECT_INTERVAL: u64 = 60; /// Empty future. -type BoxedEmptyFuture = Box + Send>; +pub type BoxedEmptyFuture = Box + Send>; /// Cluster interface for external clients. pub trait ClusterClient: Send + Sync { @@ -73,7 +75,7 @@ pub trait ClusterClient: Send + Sync { /// Start new key version negotiation session. fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result>, Error>; /// Start new servers set change session. - fn new_servers_set_change_session(&self, session_id: Option, new_nodes_set: BTreeSet, old_set_signature: Signature, new_set_signature: Signature) -> Result, Error>; + fn new_servers_set_change_session(&self, session_id: Option, migration_id: Option, new_nodes_set: BTreeSet, old_set_signature: Signature, new_set_signature: Signature) -> Result, Error>; /// Listen for new generation sessions. fn add_generation_listener(&self, listener: Arc>); @@ -123,6 +125,9 @@ pub struct ClusterConfiguration { pub acl_storage: Arc, /// Administrator public key. pub admin_public: Option, + /// Should key servers set change session should be started when servers set changes. + /// This will only work when servers set is configured using KeyServerSet contract. + pub auto_migrate_enabled: bool, } /// Cluster state. @@ -168,16 +173,21 @@ pub struct ClusterData { pub sessions: ClusterSessions, } -/// Connections that are forming the cluster. +/// Connections that are forming the cluster. Lock order: trigger.lock() -> data.lock(). pub struct ClusterConnections { /// Self node id. pub self_node_id: NodeId, /// All known other key servers. pub key_server_set: Arc, + /// Connections trigger. + pub trigger: Mutex>, + /// Servers set change session creator connector. + pub connector: Arc, /// Connections data. pub data: RwLock, } +#[derive(Default)] /// Cluster connections data. pub struct ClusterConnectionsData { /// Active key servers set. @@ -214,7 +224,8 @@ impl ClusterCore { pub fn new(handle: Handle, config: ClusterConfiguration) -> Result, Error> { let listen_address = make_socket_address(&config.listen_address.0, config.listen_address.1)?; let connections = ClusterConnections::new(&config)?; - let sessions = ClusterSessions::new(&config); + let servers_set_change_creator_connector = connections.connector.clone(); + let sessions = ClusterSessions::new(&config, servers_set_change_creator_connector); let data = ClusterData::new(&handle, config, connections, sessions); Ok(Arc::new(ClusterCore { @@ -349,7 +360,7 @@ impl ClusterCore { Err(err) => { warn!(target: "secretstore_net", "{}: network error '{}' when reading message from node {}", data.self_key_pair.public(), err, connection.node_id()); // close connection - data.connections.remove(connection.node_id(), connection.is_inbound()); + data.connections.remove(data.clone(), connection.node_id(), connection.is_inbound()); Box::new(failed(err)) }, } @@ -362,7 +373,7 @@ impl ClusterCore { for connection in data.connections.active_connections() { let last_message_diff = time::Instant::now() - connection.last_message_time(); if last_message_diff > time::Duration::from_secs(KEEP_ALIVE_DISCONNECT_INTERVAL) { - data.connections.remove(connection.node_id(), connection.is_inbound()); + data.connections.remove(data.clone(), connection.node_id(), connection.is_inbound()); data.sessions.on_connection_timeout(connection.node_id()); } else if last_message_diff > time::Duration::from_secs(KEEP_ALIVE_SEND_INTERVAL) { @@ -373,13 +384,12 @@ impl ClusterCore { /// Try to connect to every disconnected node. fn connect_disconnected_nodes(data: Arc) { - // do not update nodes set if any admin session is active - // this could happen, but will possibly lead to admin session error - // => should be performed later - if data.sessions.admin_sessions.is_empty() { - data.connections.update_nodes_set(); + let r = data.connections.update_nodes_set(data.clone()); + if let Some(r) = r { + data.spawn(r); } + // connect to disconnected nodes for (node_id, node_address) in data.connections.disconnected_nodes() { if data.config.allow_connecting_to_higher_nodes || data.self_key_pair.public() < &node_id { ClusterCore::connect(data.clone(), node_address); @@ -392,7 +402,7 @@ impl ClusterCore { match result { Ok(DeadlineStatus::Meet(Ok(connection))) => { let connection = Connection::new(outbound_addr.is_none(), connection); - if data.connections.insert(connection.clone()) { + if data.connections.insert(data.clone(), connection.clone()) { ClusterCore::process_connection_messages(data.clone(), connection) } else { Box::new(finished(Ok(()))) @@ -433,8 +443,16 @@ impl ClusterCore { .map(|_| ()).unwrap_or_default(), Message::Signing(message) => Self::process_message(&data, &data.sessions.signing_sessions, connection, Message::Signing(message)) .map(|_| ()).unwrap_or_default(), - Message::ServersSetChange(message) => Self::process_message(&data, &data.sessions.admin_sessions, connection, Message::ServersSetChange(message)) - .map(|_| ()).unwrap_or_default(), + Message::ServersSetChange(message) => { + let message = Message::ServersSetChange(message); + let is_initialization_message = message.is_initialization_message(); + let session = Self::process_message(&data, &data.sessions.admin_sessions, connection, message); + if is_initialization_message { + if let Some(session) = session { + data.connections.servers_set_change_creator_connector().set_key_servers_set_change_session(session.clone()); + } + } + } Message::KeyVersionNegotiation(message) => { let session = Self::process_message(&data, &data.sessions.negotiation_sessions, connection, Message::KeyVersionNegotiation(message)); Self::try_continue_session(&data, session); @@ -518,6 +536,7 @@ impl ClusterCore { let creation_data = SC::creation_data_from_message(&message)?; let master = if is_initialization_message { sender.clone() } else { data.self_key_pair.public().clone() }; let cluster = create_cluster_view(data, requires_all_connections(&message))?; + sessions.insert(cluster, master, session_id, Some(message.session_nonce().ok_or(Error::InvalidMessage)?), message.is_exclusive_session_message(), creation_data) }, } @@ -537,7 +556,7 @@ impl ClusterCore { // this is new session => it is not yet in container warn!(target: "secretstore_net", "{}: {} session read error '{}' when requested for session from node {}", data.self_key_pair.public(), S::type_name(), error, sender); - if message.is_initialization_message() { + if !message.is_error_message() { let session_id = message.into_session_id().expect("session_id only fails for cluster messages; only session messages are passed to process_message; qed"); let session_nonce = message.session_nonce().expect("session_nonce only fails for cluster messages; only session messages are passed to process_message; qed"); data.spawn(connection.send_message(SC::make_error_message(session_id, session_nonce, error))); @@ -604,12 +623,21 @@ impl ClusterCore { impl ClusterConnections { pub fn new(config: &ClusterConfiguration) -> Result { - let mut nodes = config.key_server_set.get(); + let mut nodes = config.key_server_set.snapshot().current_set; nodes.remove(config.self_key_pair.public()); + let trigger: Box = match config.auto_migrate_enabled { + false => Box::new(SimpleConnectionTrigger::new(config.key_server_set.clone(), config.self_key_pair.clone(), config.admin_public.clone())), + true if config.admin_public.is_none() => Box::new(ConnectionTriggerWithMigration::new(config.key_server_set.clone(), config.self_key_pair.clone())), + true => return Err(Error::Io("secret store admininstrator public key is specified with auto-migration enabled".into())), // TODO [Refac]: Io -> Internal + }; + let connector = trigger.servers_set_change_creator_connector(); + Ok(ClusterConnections { self_node_id: config.self_key_pair.public().clone(), key_server_set: config.key_server_set.clone(), + trigger: Mutex::new(trigger), + connector: connector, data: RwLock::new(ClusterConnectionsData { nodes: nodes, connections: BTreeMap::new(), @@ -627,39 +655,54 @@ impl ClusterConnections { self.data.read().connections.get(node).cloned() } - pub fn insert(&self, connection: Arc) -> bool { - let mut data = self.data.write(); - if !data.nodes.contains_key(connection.node_id()) { - // incoming connections are checked here - trace!(target: "secretstore_net", "{}: ignoring unknown connection from {} at {}", self.self_node_id, connection.node_id(), connection.node_address()); - debug_assert!(connection.is_inbound()); - return false; - } - if data.connections.contains_key(connection.node_id()) { - // we have already connected to the same node - // the agreement is that node with lower id must establish connection to node with higher id - if (&self.self_node_id < connection.node_id() && connection.is_inbound()) - || (&self.self_node_id > connection.node_id() && !connection.is_inbound()) { + pub fn insert(&self, data: Arc, connection: Arc) -> bool { + { + let mut data = self.data.write(); + if !data.nodes.contains_key(connection.node_id()) { + // incoming connections are checked here + trace!(target: "secretstore_net", "{}: ignoring unknown connection from {} at {}", self.self_node_id, connection.node_id(), connection.node_address()); + debug_assert!(connection.is_inbound()); return false; } + + if data.connections.contains_key(connection.node_id()) { + // we have already connected to the same node + // the agreement is that node with lower id must establish connection to node with higher id + if (&self.self_node_id < connection.node_id() && connection.is_inbound()) + || (&self.self_node_id > connection.node_id() && !connection.is_inbound()) { + return false; + } + } + + let node = connection.node_id().clone(); + trace!(target: "secretstore_net", "{}: inserting connection to {} at {}. Connected to {} of {} nodes", + self.self_node_id, node, connection.node_address(), data.connections.len() + 1, data.nodes.len()); + data.connections.insert(node.clone(), connection.clone()); } - trace!(target: "secretstore_net", "{}: inserting connection to {} at {}. Connected to {} of {} nodes", - self.self_node_id, connection.node_id(), connection.node_address(), data.connections.len() + 1, data.nodes.len()); - data.connections.insert(connection.node_id().clone(), connection); + let maintain_action = self.trigger.lock().on_connection_established(connection.node_id()); + self.maintain_connection_trigger(maintain_action, data); + true } - pub fn remove(&self, node: &NodeId, is_inbound: bool) { - let mut data = self.data.write(); - if let Entry::Occupied(entry) = data.connections.entry(node.clone()) { - if entry.get().is_inbound() != is_inbound { + pub fn remove(&self, data: Arc, node: &NodeId, is_inbound: bool) { + { + let mut data = &mut *self.data.write(); + if let Entry::Occupied(entry) = data.connections.entry(node.clone()) { + if entry.get().is_inbound() != is_inbound { + return; + } + + trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address()); + entry.remove_entry(); + } else { return; } - - trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address()); - entry.remove_entry(); } + + let maintain_action = self.trigger.lock().on_connection_closed(node); + self.maintain_connection_trigger(maintain_action, data); } pub fn connected_nodes(&self) -> BTreeSet { @@ -678,47 +721,25 @@ impl ClusterConnections { .collect() } - pub fn update_nodes_set(&self) { - let mut data = self.data.write(); - let mut new_nodes = self.key_server_set.get(); - // we do not need to connect to self - // + we do not need to try to connect to any other node if we are not the part of a cluster - if new_nodes.remove(&self.self_node_id).is_none() { - new_nodes.clear(); + pub fn servers_set_change_creator_connector(&self) -> Arc { + self.connector.clone() + } + + pub fn update_nodes_set(&self, data: Arc) -> Option { + let maintain_action = self.trigger.lock().on_maintain(); + self.maintain_connection_trigger(maintain_action, data); + None + } + + fn maintain_connection_trigger(&self, maintain_action: Option, data: Arc) { + if maintain_action == Some(Maintain::SessionAndConnections) || maintain_action == Some(Maintain::Session) { + let client = ClusterClientImpl::new(data); + self.trigger.lock().maintain_session(&client); } - - let mut num_added_nodes = 0; - let mut num_removed_nodes = 0; - let mut num_changed_nodes = 0; - - for obsolete_node in data.nodes.keys().cloned().collect::>() { - if !new_nodes.contains_key(&obsolete_node) { - if let Entry::Occupied(entry) = data.connections.entry(obsolete_node) { - trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address()); - entry.remove(); - } - - data.nodes.remove(&obsolete_node); - num_removed_nodes += 1; - } - } - - for (new_node_public, new_node_addr) in new_nodes { - match data.nodes.insert(new_node_public, new_node_addr) { - None => num_added_nodes += 1, - Some(old_node_addr) => if new_node_addr != old_node_addr { - if let Entry::Occupied(entry) = data.connections.entry(new_node_public) { - trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address()); - entry.remove(); - } - num_changed_nodes += 1; - }, - } - } - - if num_added_nodes != 0 || num_removed_nodes != 0 || num_changed_nodes != 0 { - trace!(target: "secretstore_net", "{}: updated nodes set: removed {}, added {}, changed {}. Connected to {} of {} nodes", - self.self_node_id, num_removed_nodes, num_added_nodes, num_changed_nodes, data.connections.len(), data.nodes.len()); + if maintain_action == Some(Maintain::SessionAndConnections) || maintain_action == Some(Maintain::Connections) { + let mut trigger = self.trigger.lock(); + let mut data = self.data.write(); + trigger.maintain_connections(&mut *data); } } } @@ -952,7 +973,7 @@ impl ClusterClient for ClusterClientImpl { Ok(session) } - fn new_servers_set_change_session(&self, session_id: Option, new_nodes_set: BTreeSet, old_set_signature: Signature, new_set_signature: Signature) -> Result, Error> { + fn new_servers_set_change_session(&self, session_id: Option, migration_id: Option, new_nodes_set: BTreeSet, old_set_signature: Signature, new_set_signature: Signature) -> Result, Error> { let mut connected_nodes = self.data.connections.connected_nodes(); connected_nodes.insert(self.data.self_key_pair.public().clone()); @@ -963,12 +984,16 @@ impl ClusterClient for ClusterClientImpl { }; let cluster = create_cluster_view(&self.data, true)?; - let session = self.data.sessions.admin_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, true, Some(AdminSessionCreationData::ServersSetChange))?; + let creation_data = Some(AdminSessionCreationData::ServersSetChange(migration_id, new_nodes_set.clone())); + let session = self.data.sessions.admin_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, true, creation_data)?; let initialization_result = session.as_servers_set_change().expect("servers set change session is created; qed") .initialize(new_nodes_set, old_set_signature, new_set_signature); match initialization_result { - Ok(()) => Ok(session), + Ok(()) => { + self.data.connections.servers_set_change_creator_connector().set_key_servers_set_change_session(session.clone()); + Ok(session) + }, Err(error) => { self.data.sessions.admin_sessions.remove(&session.id()); Err(error) @@ -1042,20 +1067,20 @@ pub mod tests { } impl ClusterClient for DummyClusterClient { - fn cluster_state(&self) -> ClusterState { unimplemented!() } - fn new_generation_session(&self, _session_id: SessionId, _author: Public, _threshold: usize) -> Result, Error> { unimplemented!() } - fn new_encryption_session(&self, _session_id: SessionId, _requestor_signature: Signature, _common_point: Public, _encrypted_point: Public) -> Result, Error> { unimplemented!() } - fn new_decryption_session(&self, _session_id: SessionId, _requestor_signature: Signature, _version: Option, _is_shadow_decryption: bool) -> Result, Error> { unimplemented!() } - fn new_signing_session(&self, _session_id: SessionId, _requestor_signature: Signature, _version: Option, _message_hash: H256) -> Result, Error> { unimplemented!() } - fn new_key_version_negotiation_session(&self, _session_id: SessionId) -> Result>, Error> { unimplemented!() } - fn new_servers_set_change_session(&self, _session_id: Option, _new_nodes_set: BTreeSet, _old_set_signature: Signature, _new_set_signature: Signature) -> Result, Error> { unimplemented!() } + fn cluster_state(&self) -> ClusterState { unimplemented!("test-only") } + fn new_generation_session(&self, _session_id: SessionId, _author: Public, _threshold: usize) -> Result, Error> { unimplemented!("test-only") } + fn new_encryption_session(&self, _session_id: SessionId, _requestor_signature: Signature, _common_point: Public, _encrypted_point: Public) -> Result, Error> { unimplemented!("test-only") } + fn new_decryption_session(&self, _session_id: SessionId, _requestor_signature: Signature, _version: Option, _is_shadow_decryption: bool) -> Result, Error> { unimplemented!("test-only") } + fn new_signing_session(&self, _session_id: SessionId, _requestor_signature: Signature, _version: Option, _message_hash: H256) -> Result, Error> { unimplemented!("test-only") } + fn new_key_version_negotiation_session(&self, _session_id: SessionId) -> Result>, Error> { unimplemented!("test-only") } + fn new_servers_set_change_session(&self, _session_id: Option, _migration_id: Option, _new_nodes_set: BTreeSet, _old_set_signature: Signature, _new_set_signature: Signature) -> Result, Error> { unimplemented!("test-only") } fn add_generation_listener(&self, _listener: Arc>) {} - fn make_faulty_generation_sessions(&self) { unimplemented!() } - fn generation_session(&self, _session_id: &SessionId) -> Option> { unimplemented!() } - fn connect(&self) { unimplemented!() } - fn key_storage(&self) -> Arc { unimplemented!() } + fn make_faulty_generation_sessions(&self) { unimplemented!("test-only") } + fn generation_session(&self, _session_id: &SessionId) -> Option> { unimplemented!("test-only") } + fn connect(&self) { unimplemented!("test-only") } + fn key_storage(&self) -> Arc { unimplemented!("test-only") } } impl DummyCluster { @@ -1128,7 +1153,7 @@ pub mod tests { } pub fn all_connections_established(cluster: &Arc) -> bool { - cluster.config().key_server_set.get().keys() + cluster.config().key_server_set.snapshot().new_set.keys() .filter(|p| *p != cluster.config().self_key_pair.public()) .all(|p| cluster.connection(p).is_some()) } @@ -1146,6 +1171,7 @@ pub mod tests { key_storage: Arc::new(DummyKeyStorage::default()), acl_storage: Arc::new(DummyAclStorage::default()), admin_public: None, + auto_migrate_enabled: false, }).collect(); let clusters: Vec<_> = cluster_params.into_iter().enumerate() .map(|(_, params)| ClusterCore::new(core.handle(), params).unwrap()) diff --git a/secret_store/src/key_server_cluster/cluster_sessions.rs b/secret_store/src/key_server_cluster/cluster_sessions.rs index 3cba0e14c..b5ddf5311 100644 --- a/secret_store/src/key_server_cluster/cluster_sessions.rs +++ b/secret_store/src/key_server_cluster/cluster_sessions.rs @@ -17,12 +17,13 @@ use std::time; use std::sync::{Arc, Weak}; use std::sync::atomic::AtomicBool; -use std::collections::{VecDeque, BTreeMap}; +use std::collections::{VecDeque, BTreeMap, BTreeSet}; use parking_lot::{Mutex, RwLock, Condvar}; use bigint::hash::H256; use ethkey::{Secret, Signature}; use key_server_cluster::{Error, NodeId, SessionId}; use key_server_cluster::cluster::{Cluster, ClusterData, ClusterConfiguration, ClusterView}; +use key_server_cluster::connection_trigger::ServersSetChangeSessionCreatorConnector; use key_server_cluster::message::{self, Message}; use key_server_cluster::generation_session::{SessionImpl as GenerationSessionImpl}; use key_server_cluster::decryption_session::{SessionImpl as DecryptionSessionImpl}; @@ -110,10 +111,10 @@ pub enum AdminSession { /// Administrative session creation data. pub enum AdminSessionCreationData { - /// Share add session. + /// Share add session (key id). ShareAdd(H256), - /// Servers set change session. - ServersSetChange, + /// Servers set change session (block id, new_server_set). + ServersSetChange(Option, BTreeSet), } /// Active sessions on this cluster. @@ -187,7 +188,7 @@ pub enum ClusterSessionsContainerState { impl ClusterSessions { /// Create new cluster sessions container. - pub fn new(config: &ClusterConfiguration) -> Self { + pub fn new(config: &ClusterConfiguration, servers_set_change_session_creator_connector: Arc) -> Self { let container_state = Arc::new(Mutex::new(ClusterSessionsContainerState::Idle)); let creator_core = Arc::new(SessionCreatorCore::new(config)); ClusterSessions { @@ -210,6 +211,7 @@ impl ClusterSessions { }, container_state.clone()), admin_sessions: ClusterSessionsContainer::new(AdminSessionCreator { core: creator_core.clone(), + servers_set_change_session_creator_connector: servers_set_change_session_creator_connector, admin_public: config.admin_public.clone(), }, container_state), creator_core: creator_core, @@ -270,6 +272,7 @@ impl ClusterSessionsContainer where S: ClusterSession, SC: C self.listeners.lock().push(Arc::downgrade(&listener)); } + #[cfg(test)] pub fn is_empty(&self) -> bool { self.sessions.read().is_empty() } @@ -554,6 +557,7 @@ mod tests { use ethkey::{Random, Generator}; use key_server_cluster::{Error, DummyAclStorage, DummyKeyStorage, MapKeyServerSet, PlainNodeKeyPair}; use key_server_cluster::cluster::ClusterConfiguration; + use key_server_cluster::connection_trigger::SimpleServersSetChangeSessionCreatorConnector; use key_server_cluster::cluster::tests::DummyCluster; use super::{ClusterSessions, AdminSessionCreationData}; @@ -568,8 +572,11 @@ mod tests { key_storage: Arc::new(DummyKeyStorage::default()), acl_storage: Arc::new(DummyAclStorage::default()), admin_public: Some(Random.generate().unwrap().public().clone()), + auto_migrate_enabled: false, }; - ClusterSessions::new(&config) + ClusterSessions::new(&config, Arc::new(SimpleServersSetChangeSessionCreatorConnector { + admin_public: Some(Random.generate().unwrap().public().clone()), + })) } #[test] diff --git a/secret_store/src/key_server_cluster/cluster_sessions_creator.rs b/secret_store/src/key_server_cluster/cluster_sessions_creator.rs index be9678fce..90fb09edf 100644 --- a/secret_store/src/key_server_cluster/cluster_sessions_creator.rs +++ b/secret_store/src/key_server_cluster/cluster_sessions_creator.rs @@ -21,6 +21,7 @@ use parking_lot::RwLock; use ethkey::{Public, Signature}; use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage, DocumentKeyShare, SessionMeta}; use key_server_cluster::cluster::{Cluster, ClusterConfiguration}; +use key_server_cluster::connection_trigger::ServersSetChangeSessionCreatorConnector; use key_server_cluster::cluster_sessions::{ClusterSession, SessionIdWithSubSession, AdminSession, AdminSessionCreationData}; use key_server_cluster::message::{self, Message, DecryptionMessage, SigningMessage, ConsensusMessageOfShareAdd, ShareAddMessage, ServersSetChangeMessage, ConsensusMessage, ConsensusMessageWithServersSet}; @@ -331,13 +332,18 @@ pub struct AdminSessionCreator { pub core: Arc, /// Administrator public. pub admin_public: Option, + /// Servers set change sessions creator connector. + pub servers_set_change_session_creator_connector: Arc, } impl ClusterSessionCreator for AdminSessionCreator { fn creation_data_from_message(message: &Message) -> Result, Error> { match *message { Message::ServersSetChange(ServersSetChangeMessage::ServersSetChangeConsensusMessage(ref message)) => match &message.message { - &ConsensusMessageWithServersSet::InitializeConsensusSession(_) => Ok(Some(AdminSessionCreationData::ServersSetChange)), + &ConsensusMessageWithServersSet::InitializeConsensusSession(ref message) => Ok(Some(AdminSessionCreationData::ServersSetChange( + message.migration_id.clone().map(Into::into), + message.new_nodes_set.clone().into_iter().map(Into::into).collect() + ))), _ => Err(Error::InvalidMessage), }, Message::ShareAdd(ShareAddMessage::ShareAddConsensusMessage(ref message)) => match &message.message { @@ -358,7 +364,6 @@ impl ClusterSessionCreator for AdminSess fn create(&self, cluster: Arc, master: NodeId, nonce: Option, id: SessionId, creation_data: Option) -> Result, Error> { let nonce = self.core.check_session_nonce(&master, nonce)?; - let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?; Ok(Arc::new(match creation_data { Some(AdminSessionCreationData::ShareAdd(version)) => { AdminSession::ShareAdd(ShareAddSessionImpl::new(ShareAddSessionParams { @@ -370,10 +375,13 @@ impl ClusterSessionCreator for AdminSess transport: ShareAddTransport::new(id.clone(), Some(version), nonce, cluster), key_storage: self.core.key_storage.clone(), nonce: nonce, - admin_public: Some(admin_public), + admin_public: Some(self.admin_public.clone().ok_or(Error::AccessDenied)?), })?) }, - Some(AdminSessionCreationData::ServersSetChange) => { + Some(AdminSessionCreationData::ServersSetChange(migration_id, new_nodes_set)) => { + let admin_public = self.servers_set_change_session_creator_connector.admin_public(migration_id.as_ref(), new_nodes_set) + .map_err(|_| Error::AccessDenied)?; + AdminSession::ServersSetChange(ServersSetChangeSessionImpl::new(ServersSetChangeSessionParams { meta: ShareChangeSessionMeta { id: id.clone(), @@ -385,6 +393,7 @@ impl ClusterSessionCreator for AdminSess nonce: nonce, all_nodes_set: cluster.nodes(), admin_public: admin_public, + migration_id: migration_id, })?) }, None => unreachable!("expected to call with non-empty creation data; qed"), diff --git a/secret_store/src/key_server_cluster/connection_trigger.rs b/secret_store/src/key_server_cluster/connection_trigger.rs new file mode 100644 index 000000000..ba9ff99c9 --- /dev/null +++ b/secret_store/src/key_server_cluster/connection_trigger.rs @@ -0,0 +1,367 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::collections::{BTreeSet, BTreeMap}; +use std::collections::btree_map::Entry; +use std::net::SocketAddr; +use std::sync::Arc; +use bigint::hash::H256; +use ethkey::Public; +use key_server_cluster::{KeyServerSet, KeyServerSetSnapshot}; +use key_server_cluster::cluster::{ClusterClient, ClusterConnectionsData}; +use key_server_cluster::cluster_sessions::AdminSession; +use types::all::{Error, NodeId}; +use {NodeKeyPair}; + +#[derive(Debug, Clone, Copy, PartialEq)] +/// Describes which maintain() call is required. +pub enum Maintain { + /// We need to maintain() both connections && session. + SessionAndConnections, + /// Only call maintain_session. + Session, + /// Only call maintain_connections. + Connections, +} + +/// Connection trigger, which executes necessary actions when set of key servers changes. +pub trait ConnectionTrigger: Send + Sync { + /// On maintain interval. + fn on_maintain(&mut self) -> Option; + /// When connection is established. + fn on_connection_established(&mut self, node: &NodeId) -> Option; + /// When connection is closed. + fn on_connection_closed(&mut self, node: &NodeId) -> Option; + /// Maintain active sessions. + fn maintain_session(&mut self, sessions: &ClusterClient); + /// Maintain active connections. + fn maintain_connections(&mut self, connections: &mut ClusterConnectionsData); + /// Return connector for the servers set change session creator. + fn servers_set_change_creator_connector(&self) -> Arc; +} + +/// Servers set change session creator connector. +pub trait ServersSetChangeSessionCreatorConnector: Send + Sync { + /// Get actual administrator public key. For manual-migration configuration it is the pre-configured + /// administrator key. For auto-migration configurations it is the key of actual MigrationSession master node. + fn admin_public(&self, migration_id: Option<&H256>, new_server_set: BTreeSet) -> Result; + /// Set active servers set change session. + fn set_key_servers_set_change_session(&self, session: Arc); +} + +/// Simple connection trigger, which only keeps connections to current_set. +pub struct SimpleConnectionTrigger { + /// Key server set cluster. + key_server_set: Arc, + /// Trigger connections. + connections: TriggerConnections, + /// Servers set change session creator connector. + connector: Arc, +} + +/// Simple Servers set change session creator connector, which will just return +/// pre-configured administartor public when asked. +pub struct SimpleServersSetChangeSessionCreatorConnector { + /// Secret store administrator public key. + pub admin_public: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq)] +/// Action with trigger connections. +pub enum ConnectionsAction { + /// Connect to nodes from old set only. + ConnectToCurrentSet, + /// Connect to nodes from both old and migration sets. + ConnectToCurrentAndMigrationSet, +} + +/// Trigger connections. +pub struct TriggerConnections { + /// This node key pair. + pub self_key_pair: Arc, +} + +impl SimpleConnectionTrigger { + /// Create new simple connection trigger. + pub fn new(key_server_set: Arc, self_key_pair: Arc, admin_public: Option) -> Self { + SimpleConnectionTrigger { + key_server_set: key_server_set, + connections: TriggerConnections { + self_key_pair: self_key_pair, + }, + connector: Arc::new(SimpleServersSetChangeSessionCreatorConnector { + admin_public: admin_public, + }), + } + } +} + +impl ConnectionTrigger for SimpleConnectionTrigger { + fn on_maintain(&mut self) -> Option { + Some(Maintain::Connections) + } + + fn on_connection_established(&mut self, _node: &NodeId) -> Option { + None + } + + fn on_connection_closed(&mut self, _node: &NodeId) -> Option { + // we do not want to reconnect after every connection close + // because it could be a part of something bigger + None + } + + fn maintain_session(&mut self, _sessions: &ClusterClient) { + } + + fn maintain_connections(&mut self, connections: &mut ClusterConnectionsData) { + self.connections.maintain(ConnectionsAction::ConnectToCurrentSet, connections, &self.key_server_set.snapshot()) + } + + fn servers_set_change_creator_connector(&self) -> Arc { + self.connector.clone() + } +} + +impl ServersSetChangeSessionCreatorConnector for SimpleServersSetChangeSessionCreatorConnector { + fn admin_public(&self, _migration_id: Option<&H256>, _new_server_set: BTreeSet) -> Result { + self.admin_public.clone().ok_or(Error::AccessDenied) + } + + fn set_key_servers_set_change_session(&self, _session: Arc) { + } +} + +impl TriggerConnections { + pub fn maintain(&self, action: ConnectionsAction, data: &mut ClusterConnectionsData, server_set: &KeyServerSetSnapshot) { + match action { + ConnectionsAction::ConnectToCurrentSet => { + adjust_connections(self.self_key_pair.public(), data, &server_set.current_set); + }, + ConnectionsAction::ConnectToCurrentAndMigrationSet => { + let mut old_and_migration_set = BTreeMap::new(); + if let Some(migration) = server_set.migration.as_ref() { + old_and_migration_set.extend(migration.set.iter().map(|(node_id, node_addr)| (node_id.clone(), node_addr.clone()))); + } + old_and_migration_set.extend(server_set.current_set.iter().map(|(node_id, node_addr)| (node_id.clone(), node_addr.clone()))); + + adjust_connections(self.self_key_pair.public(), data, &old_and_migration_set); + }, + } + } +} + +fn adjust_connections(self_node_id: &NodeId, data: &mut ClusterConnectionsData, required_set: &BTreeMap) { + if !required_set.contains_key(self_node_id) { + trace!(target: "secretstore_net", "{}: isolated from cluser", self_node_id); + data.connections.clear(); + data.nodes.clear(); + return; + } + + for node_to_disconnect in select_nodes_to_disconnect(&data.nodes, required_set) { + if let Entry::Occupied(entry) = data.connections.entry(node_to_disconnect.clone()) { + trace!(target: "secretstore_net", "{}: removing connection to {} at {}", + self_node_id, entry.get().node_id(), entry.get().node_address()); + entry.remove(); + } + + data.nodes.remove(&node_to_disconnect); + } + + for (node_to_connect, node_addr) in required_set { + if node_to_connect != self_node_id { + data.nodes.insert(node_to_connect.clone(), node_addr.clone()); + } + } +} + +fn select_nodes_to_disconnect(current_set: &BTreeMap, new_set: &BTreeMap) -> Vec { + current_set.iter() + .filter(|&(node_id, node_addr)| match new_set.get(node_id) { + Some(new_node_addr) => node_addr != new_node_addr, + None => true, + }) + .map(|(node_id, _)| node_id.clone()) + .collect() +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeSet; + use std::sync::Arc; + use ethkey::{Random, Generator}; + use key_server_cluster::cluster::ClusterConnectionsData; + use key_server_cluster::{MapKeyServerSet, PlainNodeKeyPair, KeyServerSetSnapshot, KeyServerSetMigration}; + use super::{Maintain, TriggerConnections, ConnectionsAction, ConnectionTrigger, SimpleConnectionTrigger, + select_nodes_to_disconnect, adjust_connections}; + + fn create_connections() -> TriggerConnections { + TriggerConnections { + self_key_pair: Arc::new(PlainNodeKeyPair::new(Random.generate().unwrap())), + } + } + + #[test] + fn do_not_disconnect_if_set_is_not_changed() { + let node_id = Random.generate().unwrap().public().clone(); + assert_eq!(select_nodes_to_disconnect( + &vec![(node_id, "127.0.0.1:8081".parse().unwrap())].into_iter().collect(), + &vec![(node_id, "127.0.0.1:8081".parse().unwrap())].into_iter().collect()), + vec![]); + } + + #[test] + fn disconnect_if_address_has_changed() { + let node_id = Random.generate().unwrap().public().clone(); + assert_eq!(select_nodes_to_disconnect( + &vec![(node_id.clone(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(), + &vec![(node_id.clone(), "127.0.0.1:8082".parse().unwrap())].into_iter().collect()), + vec![node_id.clone()]); + } + + #[test] + fn disconnect_if_node_has_removed() { + let node_id = Random.generate().unwrap().public().clone(); + assert_eq!(select_nodes_to_disconnect( + &vec![(node_id.clone(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(), + &vec![].into_iter().collect()), + vec![node_id.clone()]); + } + + #[test] + fn does_not_disconnect_if_node_has_added() { + let node_id = Random.generate().unwrap().public().clone(); + assert_eq!(select_nodes_to_disconnect( + &vec![(node_id.clone(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(), + &vec![(node_id.clone(), "127.0.0.1:8081".parse().unwrap()), + (Random.generate().unwrap().public().clone(), "127.0.0.1:8082".parse().unwrap())] + .into_iter().collect()), + vec![]); + } + + #[test] + fn adjust_connections_disconnects_from_all_nodes_if_not_a_part_of_key_server() { + let self_node_id = Random.generate().unwrap().public().clone(); + let other_node_id = Random.generate().unwrap().public().clone(); + let mut connection_data: ClusterConnectionsData = Default::default(); + connection_data.nodes.insert(other_node_id.clone(), "127.0.0.1:8081".parse().unwrap()); + + let required_set = connection_data.nodes.clone(); + adjust_connections(&self_node_id, &mut connection_data, &required_set); + assert!(connection_data.nodes.is_empty()); + } + + #[test] + fn adjust_connections_connects_to_new_nodes() { + let self_node_id = Random.generate().unwrap().public().clone(); + let other_node_id = Random.generate().unwrap().public().clone(); + let mut connection_data: ClusterConnectionsData = Default::default(); + + let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap()), + (other_node_id.clone(), "127.0.0.1:8082".parse().unwrap())].into_iter().collect(); + adjust_connections(&self_node_id, &mut connection_data, &required_set); + assert!(connection_data.nodes.contains_key(&other_node_id)); + } + + #[test] + fn adjust_connections_reconnects_from_changed_nodes() { + let self_node_id = Random.generate().unwrap().public().clone(); + let other_node_id = Random.generate().unwrap().public().clone(); + let mut connection_data: ClusterConnectionsData = Default::default(); + connection_data.nodes.insert(other_node_id.clone(), "127.0.0.1:8082".parse().unwrap()); + + let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap()), + (other_node_id.clone(), "127.0.0.1:8083".parse().unwrap())].into_iter().collect(); + adjust_connections(&self_node_id, &mut connection_data, &required_set); + assert_eq!(connection_data.nodes.get(&other_node_id), Some(&"127.0.0.1:8083".parse().unwrap())); + } + + #[test] + fn adjust_connections_disconnects_from_removed_nodes() { + let self_node_id = Random.generate().unwrap().public().clone(); + let other_node_id = Random.generate().unwrap().public().clone(); + let mut connection_data: ClusterConnectionsData = Default::default(); + connection_data.nodes.insert(other_node_id.clone(), "127.0.0.1:8082".parse().unwrap()); + + let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(); + adjust_connections(&self_node_id, &mut connection_data, &required_set); + assert!(connection_data.nodes.is_empty()); + } + + #[test] + fn adjust_connections_does_not_connects_to_self() { + let self_node_id = Random.generate().unwrap().public().clone(); + let mut connection_data: ClusterConnectionsData = Default::default(); + + let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(); + adjust_connections(&self_node_id, &mut connection_data, &required_set); + assert!(connection_data.nodes.is_empty()); + } + + #[test] + fn maintain_connects_to_current_set_works() { + let connections = create_connections(); + let self_node_id = connections.self_key_pair.public().clone(); + let current_node_id = Random.generate().unwrap().public().clone(); + let migration_node_id = Random.generate().unwrap().public().clone(); + let new_node_id = Random.generate().unwrap().public().clone(); + + let mut connections_data: ClusterConnectionsData = Default::default(); + connections.maintain(ConnectionsAction::ConnectToCurrentSet, &mut connections_data, &KeyServerSetSnapshot { + current_set: vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap()), + (current_node_id.clone(), "127.0.0.1:8082".parse().unwrap())].into_iter().collect(), + new_set: vec![(new_node_id.clone(), "127.0.0.1:8083".parse().unwrap())].into_iter().collect(), + migration: Some(KeyServerSetMigration { + set: vec![(migration_node_id.clone(), "127.0.0.1:8084".parse().unwrap())].into_iter().collect(), + ..Default::default() + }), + }); + + assert_eq!(vec![current_node_id], connections_data.nodes.keys().cloned().collect::>()); + } + + #[test] + fn maintain_connects_to_current_and_migration_set_works() { + let connections = create_connections(); + let self_node_id = connections.self_key_pair.public().clone(); + let current_node_id = Random.generate().unwrap().public().clone(); + let migration_node_id = Random.generate().unwrap().public().clone(); + let new_node_id = Random.generate().unwrap().public().clone(); + + let mut connections_data: ClusterConnectionsData = Default::default(); + connections.maintain(ConnectionsAction::ConnectToCurrentAndMigrationSet, &mut connections_data, &KeyServerSetSnapshot { + current_set: vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap()), + (current_node_id.clone(), "127.0.0.1:8082".parse().unwrap())].into_iter().collect(), + new_set: vec![(new_node_id.clone(), "127.0.0.1:8083".parse().unwrap())].into_iter().collect(), + migration: Some(KeyServerSetMigration { + set: vec![(migration_node_id.clone(), "127.0.0.1:8084".parse().unwrap())].into_iter().collect(), + ..Default::default() + }), + }); + + assert_eq!(vec![current_node_id, migration_node_id].into_iter().collect::>(), + connections_data.nodes.keys().cloned().collect::>()); + } + + #[test] + fn simple_connections_trigger_only_maintains_connections() { + let key_server_set = Arc::new(MapKeyServerSet::new(Default::default())); + let self_key_pair = Arc::new(PlainNodeKeyPair::new(Random.generate().unwrap())); + let mut trigger = SimpleConnectionTrigger::new(key_server_set, self_key_pair, None); + assert_eq!(trigger.on_maintain(), Some(Maintain::Connections)); + } +} diff --git a/secret_store/src/key_server_cluster/connection_trigger_with_migration.rs b/secret_store/src/key_server_cluster/connection_trigger_with_migration.rs new file mode 100644 index 000000000..cb9525742 --- /dev/null +++ b/secret_store/src/key_server_cluster/connection_trigger_with_migration.rs @@ -0,0 +1,766 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::collections::{BTreeSet, BTreeMap}; +use std::net::SocketAddr; +use std::sync::Arc; +use bigint::hash::H256; +use ethkey::Public; +use parking_lot::Mutex; +use key_server_cluster::{KeyServerSet, KeyServerSetSnapshot, KeyServerSetMigration, is_migration_required}; +use key_server_cluster::cluster::{ClusterClient, ClusterConnectionsData}; +use key_server_cluster::cluster_sessions::{AdminSession, ClusterSession}; +use key_server_cluster::jobs::servers_set_change_access_job::ordered_nodes_hash; +use key_server_cluster::connection_trigger::{Maintain, ConnectionsAction, ConnectionTrigger, + ServersSetChangeSessionCreatorConnector, TriggerConnections}; +use types::all::{Error, NodeId}; +use {NodeKeyPair}; + +/// Key servers set change trigger with automated migration procedure. +pub struct ConnectionTriggerWithMigration { + /// This node key pair. + self_key_pair: Arc, + /// Key server set. + key_server_set: Arc, + /// Last server set state. + snapshot: KeyServerSetSnapshot, + /// Required connections action. + connections_action: Option, + /// Required session action. + session_action: Option, + /// Currenty connected nodes. + connected: BTreeSet, + /// Trigger migration connections. + connections: TriggerConnections, + /// Trigger migration session. + session: TriggerSession, +} + +#[derive(Default)] +/// Key servers set change session creator connector with migration support. +pub struct ServersSetChangeSessionCreatorConnectorWithMigration { + /// This node id. + self_node_id: NodeId, + /// Active migration state to check when servers set change session is started. + migration: Mutex>, + /// Active servers set change session. + session: Mutex>>, +} + +#[derive(Debug, Clone, Copy, PartialEq)] +/// Migration session action. +enum SessionAction { + /// Start migration (confirm migration transaction). + StartMigration(H256), + /// Start migration session. + Start, + /// Confirm migration and forget migration session. + ConfirmAndDrop(H256), + /// Forget migration session. + Drop, + /// Forget migration session and retry. + DropAndRetry, +} + +#[derive(Debug, Clone, Copy, PartialEq)] +/// Migration session state. +enum SessionState { + /// No active session. + Idle, + /// Session is running with given migration id. + Active(Option), + /// Session is completed successfully. + Finished(Option), + /// Session is completed with an error. + Failed(Option), +} + +#[derive(Debug, Clone, Copy, PartialEq)] +/// Migration state. +pub enum MigrationState { + /// No migration required. + Idle, + /// Migration is required. + Required, + /// Migration has started. + Started, +} + +/// Migration session. +struct TriggerSession { + /// Servers set change session creator connector. + connector: Arc, + /// This node key pair. + self_key_pair: Arc, + /// Key server set. + key_server_set: Arc, +} + +impl ConnectionTriggerWithMigration { + /// Create new trigge with migration. + pub fn new(key_server_set: Arc, self_key_pair: Arc) -> Self { + let snapshot = key_server_set.snapshot(); + let migration = snapshot.migration.clone(); + + ConnectionTriggerWithMigration { + self_key_pair: self_key_pair.clone(), + key_server_set: key_server_set.clone(), + snapshot: snapshot, + connected: BTreeSet::new(), + connections: TriggerConnections { + self_key_pair: self_key_pair.clone(), + }, + session: TriggerSession { + connector: Arc::new(ServersSetChangeSessionCreatorConnectorWithMigration { + self_node_id: self_key_pair.public().clone(), + migration: Mutex::new(migration), + session: Mutex::new(None), + }), + self_key_pair: self_key_pair, + key_server_set: key_server_set, + }, + connections_action: None, + session_action: None, + } + } + + /// Actually do mainteinance. + fn do_maintain(&mut self) -> Option { + loop { + let session_state = session_state(self.session.connector.session.lock().clone()); + let migration_state = migration_state(self.self_key_pair.public(), &self.snapshot); + + let session_action = maintain_session(self.self_key_pair.public(), &self.connected, &self.snapshot, migration_state, session_state); + let session_maintain_required = session_action.map(|session_action| + self.session.process(session_action)).unwrap_or_default(); + self.session_action = session_action; + + let connections_action = maintain_connections(migration_state, session_state); + let connections_maintain_required = connections_action.map(|_| true).unwrap_or_default(); + self.connections_action = connections_action; + + if session_state != SessionState::Idle || migration_state != MigrationState::Idle { + trace!(target: "secretstore_net", "{}: non-idle auto-migration state: {:?} -> {:?}", + self.self_key_pair.public(), (migration_state, session_state), (self.connections_action, self.session_action)); + } + + if session_action != Some(SessionAction::DropAndRetry) { + return match (session_maintain_required, connections_maintain_required) { + (true, true) => Some(Maintain::SessionAndConnections), + (true, false) => Some(Maintain::Session), + (false, true) => Some(Maintain::Connections), + (false, false) => None, + }; + } + } + } +} + +impl ConnectionTrigger for ConnectionTriggerWithMigration { + fn on_maintain(&mut self) -> Option { + self.snapshot = self.key_server_set.snapshot(); + *self.session.connector.migration.lock() = self.snapshot.migration.clone(); + + self.do_maintain() + } + + fn on_connection_established(&mut self, node: &NodeId) -> Option { + self.connected.insert(node.clone()); + self.do_maintain() + } + + fn on_connection_closed(&mut self, node: &NodeId) -> Option { + self.connected.remove(node); + self.do_maintain() + } + + fn maintain_session(&mut self, sessions: &ClusterClient) { + if let Some(action) = self.session_action { + self.session.maintain(action, sessions, &self.snapshot); + } + } + + fn maintain_connections(&mut self, connections: &mut ClusterConnectionsData) { + if let Some(action) = self.connections_action { + self.connections.maintain(action, connections, &self.snapshot); + } + } + + fn servers_set_change_creator_connector(&self) -> Arc { + self.session.connector.clone() + } +} + +impl ServersSetChangeSessionCreatorConnector for ServersSetChangeSessionCreatorConnectorWithMigration { + fn admin_public(&self, migration_id: Option<&H256>, new_server_set: BTreeSet) -> Result { + // the idea is that all nodes are agreed upon a block number and a new set of nodes in this block + // then master node is selected of all nodes set && this master signs the old set && new set + // (signatures are inputs to ServerSetChangeSession) + self.migration.lock().as_ref() + .map(|migration| { + let is_migration_id_same = migration_id.map(|mid| mid == &migration.id).unwrap_or_default(); + let is_migration_set_same = new_server_set == migration.set.keys().cloned().collect(); + if is_migration_id_same && is_migration_set_same { + Ok(migration.master.clone()) + } else { + warn!(target: "secretstore_net", "{}: failed to accept auto-migration session: same_migration_id={}, same_migration_set={}", + self.self_node_id, is_migration_id_same, is_migration_set_same); + + Err(Error::AccessDenied) + } + }) + .unwrap_or_else(|| { + warn!(target: "secretstore_net", "{}: failed to accept non-scheduled auto-migration session", self.self_node_id); + Err(Error::AccessDenied) + }) + } + + fn set_key_servers_set_change_session(&self, session: Arc) { + *self.session.lock() = Some(session); + } +} + +impl TriggerSession { + /// Process session action. + pub fn process(&mut self, action: SessionAction) -> bool { + match action { + SessionAction::ConfirmAndDrop(migration_id) => { + *self.connector.session.lock() = None; + self.key_server_set.confirm_migration(migration_id); + false + }, + SessionAction::Drop | SessionAction::DropAndRetry => { + *self.connector.session.lock() = None; + false + }, + SessionAction::StartMigration(migration_id) => { + self.key_server_set.start_migration(migration_id); + false + }, + SessionAction::Start => true, + } + } + + /// Maintain session. + pub fn maintain(&mut self, action: SessionAction, sessions: &ClusterClient, server_set: &KeyServerSetSnapshot) { + if action == SessionAction::Start { // all other actions are processed in maintain + let migration = server_set.migration.as_ref() + .expect("action is Start only when migration is started (see maintain_session); qed"); + + let old_set: BTreeSet<_> = server_set.current_set.keys() + .chain(migration.set.keys()) + .cloned().collect(); + let new_set: BTreeSet<_> = migration.set.keys() + .cloned() + .collect(); + + let signatures = self.self_key_pair.sign(&ordered_nodes_hash(&old_set)) + .and_then(|old_set_signature| self.self_key_pair.sign(&ordered_nodes_hash(&new_set)) + .map(|new_set_signature| (old_set_signature, new_set_signature))) + .map_err(Into::into); + let session = signatures.and_then(|(old_set_signature, new_set_signature)| + sessions.new_servers_set_change_session(None, Some(migration.id.clone()), new_set, old_set_signature, new_set_signature)); + + match session { + Ok(_) => trace!(target: "secretstore_net", "{}: started auto-migrate session", + self.self_key_pair.public()), + Err(err) => trace!(target: "secretstore_net", "{}: failed to start auto-migrate session with: {}", + self.self_key_pair.public(), err), + } + } + } +} + +fn migration_state(self_node_id: &NodeId, snapshot: &KeyServerSetSnapshot) -> MigrationState { + // if this node is not on current && old set => we do not participate in migration + if !snapshot.current_set.contains_key(self_node_id) && + !snapshot.migration.as_ref().map(|s| s.set.contains_key(self_node_id)).unwrap_or_default() { + return MigrationState::Idle; + } + + // if migration has already started no other states possible + if snapshot.migration.is_some() { + return MigrationState::Started; + } + + // we only require migration if set actually changes + // when only address changes, we could simply adjust connections + if !is_migration_required(&snapshot.current_set, &snapshot.new_set) { + return MigrationState::Idle; + } + + return MigrationState::Required; +} + +fn session_state(session: Option>) -> SessionState { + session + .and_then(|s| match s.as_servers_set_change() { + Some(s) if !s.is_finished() => Some(SessionState::Active(s.migration_id().cloned())), + Some(s) => match s.wait() { + Ok(_) => Some(SessionState::Finished(s.migration_id().cloned())), + Err(_) => Some(SessionState::Failed(s.migration_id().cloned())), + }, + None => None, + }) + .unwrap_or(SessionState::Idle) +} + +fn maintain_session(self_node_id: &NodeId, connected: &BTreeSet, snapshot: &KeyServerSetSnapshot, migration_state: MigrationState, session_state: SessionState) -> Option { + let migration_data_proof = "migration_state is Started; migration data available when started; qed"; + + match (migration_state, session_state) { + // === NORMAL combinations === + + // having no session when it is not required => ok + (MigrationState::Idle, SessionState::Idle) => None, + // migration is required && no active session => start migration + (MigrationState::Required, SessionState::Idle) => { + match select_master_node(snapshot) == self_node_id { + true => Some(SessionAction::StartMigration(H256::random())), + // we are not on master node + false => None, + } + }, + // migration is active && there's no active session => start it + (MigrationState::Started, SessionState::Idle) => { + match is_connected_to_all_nodes(self_node_id, &snapshot.current_set, connected) && + is_connected_to_all_nodes(self_node_id, &snapshot.migration.as_ref().expect(migration_data_proof).set, connected) && + select_master_node(snapshot) == self_node_id { + true => Some(SessionAction::Start), + // we are not connected to all required nodes yet or we are not on master node => wait for it + false => None, + } + }, + // migration is active && session is not yet started/finished => ok + (MigrationState::Started, SessionState::Active(ref session_migration_id)) + if snapshot.migration.as_ref().expect(migration_data_proof).id == session_migration_id.unwrap_or_default() => + None, + // migration has finished => confirm migration + (MigrationState::Started, SessionState::Finished(ref session_migration_id)) + if snapshot.migration.as_ref().expect(migration_data_proof).id == session_migration_id.unwrap_or_default() => + match snapshot.migration.as_ref().expect(migration_data_proof).set.contains_key(self_node_id) { + true => Some(SessionAction::ConfirmAndDrop( + snapshot.migration.as_ref().expect(migration_data_proof).id.clone() + )), + // we are not on migration set => we do not need to confirm + false => Some(SessionAction::Drop), + }, + // migration has failed => it should be dropped && restarted later + (MigrationState::Started, SessionState::Failed(ref session_migration_id)) + if snapshot.migration.as_ref().expect(migration_data_proof).id == session_migration_id.unwrap_or_default() => + Some(SessionAction::Drop), + + // ABNORMAL combinations, which are still possible when contract misbehaves === + + // having active session when it is not required => drop it && wait for other tasks + (MigrationState::Idle, SessionState::Active(_)) | + // no migration required && there's finished session => drop it && wait for other tasks + (MigrationState::Idle, SessionState::Finished(_)) | + // no migration required && there's failed session => drop it && wait for other tasks + (MigrationState::Idle, SessionState::Failed(_)) | + // migration is required && session is active => drop it && wait for other tasks + (MigrationState::Required, SessionState::Active(_)) | + // migration is required && session has failed => we need to forget this obsolete session and retry + (MigrationState::Required, SessionState::Finished(_)) | + // session for other migration is active => we need to forget this obsolete session and retry + // (the case for same id is checked above) + (MigrationState::Started, SessionState::Active(_)) | + // session for other migration has finished => we need to forget this obsolete session and retry + // (the case for same id is checked above) + (MigrationState::Started, SessionState::Finished(_)) | + // session for other migration has failed => we need to forget this obsolete session and retry + // (the case for same id is checked above) + (MigrationState::Started, SessionState::Failed(_)) | + // migration is required && session has failed => we need to forget this obsolete session and retry + (MigrationState::Required, SessionState::Failed(_)) => { + // some of the cases above could happen because of lags (could actually be a non-abnormal behavior) + // => we ony trace here + trace!(target: "secretstore_net", "{}: suspicious auto-migration state: {:?}", + self_node_id, (migration_state, session_state)); + Some(SessionAction::DropAndRetry) + }, + } +} + +fn maintain_connections(migration_state: MigrationState, session_state: SessionState) -> Option { + match (migration_state, session_state) { + // session is active => we do not alter connections when session is active + (_, SessionState::Active(_)) => None, + // when no migration required => we just keep us connected to old nodes set + (MigrationState::Idle, _) => Some(ConnectionsAction::ConnectToCurrentSet), + // when migration is either scheduled, or in progress => connect to both old and migration set. + // this could lead to situation when node is not 'officially' a part of KeyServer (i.e. it is not in current_set) + // but it participates in new key generation session + // it is ok, since 'officialy' here means that this node is a owner of all old shares + (MigrationState::Required, _) | + (MigrationState::Started, _) => Some(ConnectionsAction::ConnectToCurrentAndMigrationSet), + } +} + +fn is_connected_to_all_nodes(self_node_id: &NodeId, nodes: &BTreeMap, connected: &BTreeSet) -> bool { + nodes.keys() + .filter(|n| *n != self_node_id) + .all(|n| connected.contains(n)) +} + +fn select_master_node(snapshot: &KeyServerSetSnapshot) -> &NodeId { + // we want to minimize a number of UnknownSession messages => + // try to select a node which was in SS && will be in SS + match snapshot.migration.as_ref() { + Some(migration) => &migration.master, + None => snapshot.current_set.keys() + .filter(|n| snapshot.new_set.contains_key(n)) + .nth(0) + .or_else(|| snapshot.new_set.keys().nth(0)) + .unwrap_or_else(|| snapshot.current_set.keys().nth(0) + .expect("select_master_node is only called when migration is Required or Started;\ + when Started: migration.is_some() && we return migration.master; qed;\ + when Required: current_set != new_set; this means that at least one set is non-empty; we try to take node from each set; qed")) + } + /*server_set_state.migration.as_ref() + .map(|m| &m.master) + .unwrap_or_else(|| server_set_state.current_set.keys() + .filter(|n| server_set_state.new_set.contains_key(n)) + .nth(0) + .or_else(|| server_set_state.new_set.keys().nth(0))) + .expect("select_master_node is only called when migration is Required or Started;" + "when Started: migration.is_some() && we have migration.master; qed" + "when Required: current_set != migration_set; this means that at least one set is non-empty; we select")*/ +} + +#[cfg(test)] +mod tests { + use key_server_cluster::{KeyServerSetSnapshot, KeyServerSetMigration}; + use key_server_cluster::connection_trigger::ConnectionsAction; + use super::{MigrationState, SessionState, SessionAction, migration_state, maintain_session, + maintain_connections, select_master_node}; + + #[test] + fn migration_state_is_idle_when_required_but_this_node_is_not_on_the_list() { + assert_eq!(migration_state(&1.into(), &KeyServerSetSnapshot { + current_set: vec![(2.into(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(), + new_set: vec![(3.into(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(), + migration: None, + }), MigrationState::Idle); + } + + #[test] + fn migration_state_is_idle_when_sets_are_equal() { + assert_eq!(migration_state(&1.into(), &KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(), + new_set: vec![(1.into(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(), + migration: None, + }), MigrationState::Idle); + } + + #[test] + fn migration_state_is_idle_when_only_address_changes() { + assert_eq!(migration_state(&1.into(), &KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8080".parse().unwrap())].into_iter().collect(), + new_set: vec![(1.into(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(), + migration: None, + }), MigrationState::Idle); + } + + #[test] + fn migration_state_is_required_when_node_is_added() { + assert_eq!(migration_state(&1.into(), &KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8080".parse().unwrap())].into_iter().collect(), + new_set: vec![(1.into(), "127.0.0.1:8080".parse().unwrap()), + (2.into(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(), + migration: None, + }), MigrationState::Required); + } + + #[test] + fn migration_state_is_required_when_node_is_removed() { + assert_eq!(migration_state(&1.into(), &KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8080".parse().unwrap()), + (2.into(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(), + new_set: vec![(1.into(), "127.0.0.1:8080".parse().unwrap())].into_iter().collect(), + migration: None, + }), MigrationState::Required); + } + + #[test] + fn migration_state_is_started_when_migration_is_some() { + assert_eq!(migration_state(&1.into(), &KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8080".parse().unwrap())].into_iter().collect(), + new_set: Default::default(), + migration: Some(KeyServerSetMigration { + id: Default::default(), + set: Default::default(), + master: Default::default(), + is_confirmed: Default::default(), + }), + }), MigrationState::Started); + } + + #[test] + fn existing_master_is_selected_when_migration_has_started() { + assert_eq!(select_master_node(&KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8180".parse().unwrap())].into_iter().collect(), + new_set: vec![(2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + migration: Some(KeyServerSetMigration { + master: 3.into(), + ..Default::default() + }), + }), &3.into()); + } + + #[test] + fn persistent_master_is_selected_when_migration_has_not_started_yet() { + assert_eq!(select_master_node(&KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8180".parse().unwrap()), + (2.into(), "127.0.0.1:8180".parse().unwrap())].into_iter().collect(), + new_set: vec![(2.into(), "127.0.0.1:8181".parse().unwrap()), + (4.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + migration: None, + }), &2.into()); + } + + #[test] + fn new_master_is_selected_in_worst_case() { + assert_eq!(select_master_node(&KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8180".parse().unwrap()), + (2.into(), "127.0.0.1:8180".parse().unwrap())].into_iter().collect(), + new_set: vec![(3.into(), "127.0.0.1:8181".parse().unwrap()), + (4.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + migration: None, + }), &3.into()); + } + + #[test] + fn maintain_connections_returns_none_when_session_is_active() { + assert_eq!(maintain_connections(MigrationState::Required, + SessionState::Active(Default::default())), None); + } + + #[test] + fn maintain_connections_connects_to_current_set_when_no_migration() { + assert_eq!(maintain_connections(MigrationState::Idle, + SessionState::Idle), Some(ConnectionsAction::ConnectToCurrentSet)); + } + + #[test] + fn maintain_connections_connects_to_current_and_old_set_when_migration_is_required() { + assert_eq!(maintain_connections(MigrationState::Required, + SessionState::Idle), Some(ConnectionsAction::ConnectToCurrentAndMigrationSet)); + } + + #[test] + fn maintain_connections_connects_to_current_and_old_set_when_migration_is_started() { + assert_eq!(maintain_connections(MigrationState::Started, + SessionState::Idle), Some(ConnectionsAction::ConnectToCurrentAndMigrationSet)); + } + + #[test] + fn maintain_sessions_does_nothing_if_no_session_and_no_migration() { + assert_eq!(maintain_session(&1.into(), &Default::default(), &Default::default(), + MigrationState::Idle, SessionState::Idle), None); + } + + #[test] + fn maintain_session_does_nothing_when_migration_required_on_slave_node_and_no_session() { + assert_eq!(maintain_session(&2.into(), &vec![2.into()].into_iter().collect(), &KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + new_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()), + (2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + migration: None, + }, MigrationState::Required, SessionState::Idle), None); + } + + #[test] + fn maintain_session_does_nothing_when_migration_started_on_slave_node_and_no_session() { + assert_eq!(maintain_session(&2.into(), &vec![2.into()].into_iter().collect(), &KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + new_set: Default::default(), + migration: Some(KeyServerSetMigration { + master: 1.into(), + set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()), + (2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + ..Default::default() + }), + }, MigrationState::Started, SessionState::Idle), None); + } + + #[test] + fn maintain_session_does_nothing_when_migration_started_on_master_node_and_no_session_and_not_connected_to_current_nodes() { + assert_eq!(maintain_session(&1.into(), &Default::default(), &KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()), + (2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + new_set: Default::default(), + migration: Some(KeyServerSetMigration { + master: 1.into(), + set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + ..Default::default() + }), + }, MigrationState::Started, SessionState::Idle), None); + } + + #[test] + fn maintain_session_does_nothing_when_migration_started_on_master_node_and_no_session_and_not_connected_to_migration_nodes() { + assert_eq!(maintain_session(&1.into(), &Default::default(), &KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + new_set: Default::default(), + migration: Some(KeyServerSetMigration { + master: 1.into(), + set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()), + (2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + ..Default::default() + }), + }, MigrationState::Started, SessionState::Idle), None); + } + + #[test] + fn maintain_session_starts_session_when_migration_started_on_master_node_and_no_session() { + assert_eq!(maintain_session(&1.into(), &vec![2.into()].into_iter().collect(), &KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + new_set: Default::default(), + migration: Some(KeyServerSetMigration { + master: 1.into(), + set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()), + (2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + ..Default::default() + }), + }, MigrationState::Started, SessionState::Idle), Some(SessionAction::Start)); + } + + #[test] + fn maintain_session_does_nothing_when_both_migration_and_session_are_started() { + assert_eq!(maintain_session(&1.into(), &vec![2.into()].into_iter().collect(), &KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + new_set: Default::default(), + migration: Some(KeyServerSetMigration { + master: 1.into(), + set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()), + (2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + ..Default::default() + }), + }, MigrationState::Started, SessionState::Active(Default::default())), None); + } + + #[test] + fn maintain_session_confirms_migration_when_active_and_session_has_finished_on_new_node() { + assert_eq!(maintain_session(&1.into(), &vec![2.into()].into_iter().collect(), &KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + new_set: Default::default(), + migration: Some(KeyServerSetMigration { + master: 1.into(), + set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()), + (2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + ..Default::default() + }), + }, MigrationState::Started, SessionState::Finished(Default::default())), Some(SessionAction::ConfirmAndDrop(Default::default()))); + } + + #[test] + fn maintain_session_drops_session_when_active_and_session_has_finished_on_removed_node() { + assert_eq!(maintain_session(&1.into(), &vec![2.into()].into_iter().collect(), &KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()), + (2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + new_set: Default::default(), + migration: Some(KeyServerSetMigration { + master: 2.into(), + set: vec![(2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + ..Default::default() + }), + }, MigrationState::Started, SessionState::Finished(Default::default())), Some(SessionAction::Drop)); + } + + #[test] + fn maintain_session_drops_session_when_active_and_session_has_failed() { + assert_eq!(maintain_session(&1.into(), &vec![2.into()].into_iter().collect(), &KeyServerSetSnapshot { + current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + new_set: Default::default(), + migration: Some(KeyServerSetMigration { + master: 1.into(), + set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()), + (2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(), + ..Default::default() + }), + }, MigrationState::Started, SessionState::Failed(Default::default())), Some(SessionAction::Drop)); + } + + #[test] + fn maintain_session_detects_abnormal_when_no_migration_and_active_session() { + assert_eq!(maintain_session(&Default::default(), &Default::default(), &Default::default(), + MigrationState::Idle, SessionState::Active(Default::default())), Some(SessionAction::DropAndRetry)); + } + + #[test] + fn maintain_session_detects_abnormal_when_no_migration_and_finished_session() { + assert_eq!(maintain_session(&Default::default(), &Default::default(), &Default::default(), + MigrationState::Idle, SessionState::Finished(Default::default())), Some(SessionAction::DropAndRetry)); + } + + #[test] + fn maintain_session_detects_abnormal_when_no_migration_and_failed_session() { + assert_eq!(maintain_session(&Default::default(), &Default::default(), &Default::default(), + MigrationState::Idle, SessionState::Failed(Default::default())), Some(SessionAction::DropAndRetry)); + } + + #[test] + fn maintain_session_detects_abnormal_when_required_migration_and_active_session() { + assert_eq!(maintain_session(&Default::default(), &Default::default(), &Default::default(), + MigrationState::Required, SessionState::Active(Default::default())), Some(SessionAction::DropAndRetry)); + } + + #[test] + fn maintain_session_detects_abnormal_when_required_migration_and_finished_session() { + assert_eq!(maintain_session(&Default::default(), &Default::default(), &Default::default(), + MigrationState::Required, SessionState::Finished(Default::default())), Some(SessionAction::DropAndRetry)); + } + + #[test] + fn maintain_session_detects_abnormal_when_required_migration_and_failed_session() { + assert_eq!(maintain_session(&Default::default(), &Default::default(), &Default::default(), + MigrationState::Required, SessionState::Failed(Default::default())), Some(SessionAction::DropAndRetry)); + } + + #[test] + fn maintain_session_detects_abnormal_when_active_migration_and_active_session_with_different_id() { + assert_eq!(maintain_session(&Default::default(), &Default::default(), &KeyServerSetSnapshot { + migration: Some(KeyServerSetMigration { + id: 0.into(), + ..Default::default() + }), + ..Default::default() + }, MigrationState::Started, SessionState::Active(Some(1.into()))), Some(SessionAction::DropAndRetry)); + } + + #[test] + fn maintain_session_detects_abnormal_when_active_migration_and_finished_session_with_different_id() { + assert_eq!(maintain_session(&Default::default(), &Default::default(), &KeyServerSetSnapshot { + migration: Some(KeyServerSetMigration { + id: 0.into(), + ..Default::default() + }), + ..Default::default() + }, MigrationState::Started, SessionState::Finished(Some(1.into()))), Some(SessionAction::DropAndRetry)); + } + + #[test] + fn maintain_session_detects_abnormal_when_active_migration_and_failed_session_with_different_id() { + assert_eq!(maintain_session(&Default::default(), &Default::default(), &KeyServerSetSnapshot { + migration: Some(KeyServerSetMigration { + id: 0.into(), + ..Default::default() + }), + ..Default::default() + }, MigrationState::Started, SessionState::Failed(Some(1.into()))), Some(SessionAction::DropAndRetry)); + } +} diff --git a/secret_store/src/key_server_cluster/io/deadline.rs b/secret_store/src/key_server_cluster/io/deadline.rs index a2a282701..1088f4f33 100644 --- a/secret_store/src/key_server_cluster/io/deadline.rs +++ b/secret_store/src/key_server_cluster/io/deadline.rs @@ -61,20 +61,11 @@ impl Future for Deadline where F: Future { #[cfg(test)] mod tests { - use std::io; use std::time::Duration; - use futures::{Future, empty, done}; + use futures::{Future, done}; use tokio_core::reactor::Core; use super::{deadline, DeadlineStatus}; - //#[test] TODO: not working - fn _deadline_timeout_works() { - let mut core = Core::new().unwrap(); - let deadline = deadline(Duration::from_millis(1), &core.handle(), empty::<(), io::Error>()).unwrap(); - core.turn(Some(Duration::from_millis(3))); - assert_eq!(deadline.wait().unwrap(), DeadlineStatus::Timeout); - } - #[test] fn deadline_result_works() { let mut core = Core::new().unwrap(); diff --git a/secret_store/src/key_server_cluster/jobs/dummy_job.rs b/secret_store/src/key_server_cluster/jobs/dummy_job.rs index 7126384da..3e84c0d49 100644 --- a/secret_store/src/key_server_cluster/jobs/dummy_job.rs +++ b/secret_store/src/key_server_cluster/jobs/dummy_job.rs @@ -18,7 +18,7 @@ use std::collections::{BTreeMap, BTreeSet}; use key_server_cluster::{Error, NodeId}; use key_server_cluster::jobs::job_session::{JobExecutor, JobTransport, JobPartialRequestAction, JobPartialResponseAction}; -/// No-work job to use in generics (TODO: create separate ShareChangeConsensusSession && remove this) +/// No-work job to use in generics (TODO [Refac]: create separate ShareChangeConsensusSession && remove this) pub struct DummyJob; impl JobExecutor for DummyJob { @@ -43,7 +43,7 @@ impl JobExecutor for DummyJob { } } -/// No-work job transport to use in generics (TODO: create separate ShareChangeConsensusSession && remove this) +/// No-work job transport to use in generics (TODO [Refac]: create separate ShareChangeConsensusSession && remove this) pub struct DummyJobTransport; impl JobTransport for DummyJobTransport { diff --git a/secret_store/src/key_server_cluster/jobs/signing_job.rs b/secret_store/src/key_server_cluster/jobs/signing_job.rs index ae0c0afc5..7b02de1c0 100644 --- a/secret_store/src/key_server_cluster/jobs/signing_job.rs +++ b/secret_store/src/key_server_cluster/jobs/signing_job.rs @@ -133,7 +133,7 @@ impl JobExecutor for SigningJob { if Some(&partial_response.request_id) != self.request_id.as_ref() { return Ok(JobPartialResponseAction::Ignore); } - // TODO: check_signature_share() + // TODO [Trust]: check_signature_share() Ok(JobPartialResponseAction::Accept) } diff --git a/secret_store/src/key_server_cluster/jobs/unknown_sessions_job.rs b/secret_store/src/key_server_cluster/jobs/unknown_sessions_job.rs index d4551298b..13f2f8b8b 100644 --- a/secret_store/src/key_server_cluster/jobs/unknown_sessions_job.rs +++ b/secret_store/src/key_server_cluster/jobs/unknown_sessions_job.rs @@ -63,7 +63,7 @@ impl JobExecutor for UnknownSessionsJob { Ok(JobPartialResponseAction::Accept) } - // TODO: optimizations: + // TODO [Opt]: // currently ALL unknown sessions are sent at once - it is better to limit messages by size/len => add partial-partial responses fn compute_response(&self, partial_responses: &BTreeMap>) -> Result>, Error> { let mut result: BTreeMap> = BTreeMap::new(); diff --git a/secret_store/src/key_server_cluster/math.rs b/secret_store/src/key_server_cluster/math.rs index 21be0fae0..352a1e015 100644 --- a/secret_store/src/key_server_cluster/math.rs +++ b/secret_store/src/key_server_cluster/math.rs @@ -368,7 +368,7 @@ pub fn compute_signature_share<'a, I>(threshold: usize, combined_hash: &Secret, /// Check signature share. pub fn _check_signature_share<'a, I>(_combined_hash: &Secret, _signature_share: &Secret, _public_share: &Public, _one_time_public_share: &Public, _node_numbers: I) -> Result where I: Iterator { - // TODO: in paper partial signature is checked using comparison: + // TODO [Trust]: in paper partial signature is checked using comparison: // sig[i] * T = r[i] - c * lagrange_coeff(i) * y[i] // => (k[i] - c * lagrange_coeff(i) * s[i]) * T = r[i] - c * lagrange_coeff(i) * y[i] // => k[i] * T - c * lagrange_coeff(i) * s[i] * T = k[i] * T - c * lagrange_coeff(i) * y[i] diff --git a/secret_store/src/key_server_cluster/message.rs b/secret_store/src/key_server_cluster/message.rs index b8cd98643..073a6460e 100644 --- a/secret_store/src/key_server_cluster/message.rs +++ b/secret_store/src/key_server_cluster/message.rs @@ -378,6 +378,8 @@ pub struct ConfirmConsensusInitialization { /// Node is asked to be part of servers-set consensus group. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct InitializeConsensusSessionWithServersSet { + /// Migration id (if any). + pub migration_id: Option, /// Old nodes set. pub old_nodes_set: BTreeSet, /// New nodes set. @@ -878,6 +880,19 @@ impl Message { } } + pub fn is_error_message(&self) -> bool { + match *self { + Message::Generation(GenerationMessage::SessionError(_)) => true, + Message::Encryption(EncryptionMessage::EncryptionSessionError(_)) => true, + Message::Decryption(DecryptionMessage::DecryptionSessionError(_)) => true, + Message::Signing(SigningMessage::SigningConsensusMessage(_)) => true, + Message::KeyVersionNegotiation(KeyVersionNegotiationMessage::KeyVersionsError(_)) => true, + Message::ShareAdd(ShareAddMessage::ShareAddError(_)) => true, + Message::ServersSetChange(ServersSetChangeMessage::ServersSetChangeError(_)) => true, + _ => false, + } + } + pub fn is_exclusive_session_message(&self) -> bool { match *self { Message::ServersSetChange(_) => true, diff --git a/secret_store/src/key_server_cluster/mod.rs b/secret_store/src/key_server_cluster/mod.rs index 0692a39ad..feafd442c 100644 --- a/secret_store/src/key_server_cluster/mod.rs +++ b/secret_store/src/key_server_cluster/mod.rs @@ -24,7 +24,7 @@ pub use super::traits::NodeKeyPair; pub use super::types::all::{NodeId, EncryptedDocumentKeyShadow}; pub use super::acl_storage::AclStorage; pub use super::key_storage::{KeyStorage, DocumentKeyShare, DocumentKeyShareVersion}; -pub use super::key_server_set::KeyServerSet; +pub use super::key_server_set::{is_migration_required, KeyServerSet, KeyServerSetSnapshot, KeyServerSetMigration}; pub use super::serialization::{SerializableSignature, SerializableH256, SerializableSecret, SerializablePublic, SerializableMessageHash}; pub use self::cluster::{ClusterCore, ClusterConfiguration, ClusterClient}; pub use self::cluster_sessions::{ClusterSession, ClusterSessionsListener}; @@ -187,6 +187,8 @@ pub use self::client_sessions::signing_session; mod cluster; mod cluster_sessions; mod cluster_sessions_creator; +mod connection_trigger; +mod connection_trigger_with_migration; mod io; mod jobs; pub mod math; diff --git a/secret_store/src/key_server_set.rs b/secret_store/src/key_server_set.rs index e64123a7f..f60b377ec 100644 --- a/secret_store/src/key_server_set.rs +++ b/secret_store/src/key_server_set.rs @@ -16,35 +16,78 @@ use std::sync::Arc; use std::net::SocketAddr; -use std::collections::BTreeMap; -use futures::{future, Future}; +use std::collections::{BTreeMap, HashSet}; +use futures::{future, Future, IntoFuture}; use parking_lot::Mutex; use ethcore::filter::Filter; use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify}; +use ethkey::public_to_address; use native_contracts::KeyServerSet as KeyServerSetContract; use hash::keccak; use bigint::hash::H256; use util::Address; use bytes::Bytes; -use types::all::{Error, Public, NodeAddress}; +use types::all::{Error, Public, NodeAddress, NodeId}; use trusted_client::TrustedClient; +use {NodeKeyPair}; +type BoxFuture = Box + Send>; + +/// Name of KeyServerSet contract in registry. const KEY_SERVER_SET_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_server_set"; +/// Number of blocks (since latest new_set change) required before actually starting migration. +const MIGRATION_CONFIRMATIONS_REQUIRED: u64 = 5; +/// Number of blocks before the same-migration transaction (be it start or confirmation) will be retried. +const TRANSACTION_RETRY_INTERVAL_BLOCKS: u64 = 30; /// Key server has been added to the set. const ADDED_EVENT_NAME: &'static [u8] = &*b"KeyServerAdded(address)"; /// Key server has been removed from the set. const REMOVED_EVENT_NAME: &'static [u8] = &*b"KeyServerRemoved(address)"; +/// Migration has started. +const MIGRATION_STARTED_EVENT_NAME: &'static [u8] = &*b"MigrationStarted()"; +/// Migration has completed. +const MIGRATION_COMPLETED_EVENT_NAME: &'static [u8] = &*b"MigrationCompleted()"; lazy_static! { static ref ADDED_EVENT_NAME_HASH: H256 = keccak(ADDED_EVENT_NAME); static ref REMOVED_EVENT_NAME_HASH: H256 = keccak(REMOVED_EVENT_NAME); + static ref MIGRATION_STARTED_EVENT_NAME_HASH: H256 = keccak(MIGRATION_STARTED_EVENT_NAME); + static ref MIGRATION_COMPLETED_EVENT_NAME_HASH: H256 = keccak(MIGRATION_COMPLETED_EVENT_NAME); } -/// Key Server set +#[derive(Default, Debug, Clone, PartialEq)] +/// Key Server Set state. +pub struct KeyServerSetSnapshot { + /// Current set of key servers. + pub current_set: BTreeMap, + /// New set of key servers. + pub new_set: BTreeMap, + /// Current migration data. + pub migration: Option, +} + +#[derive(Default, Debug, Clone, PartialEq)] +/// Key server set migration. +pub struct KeyServerSetMigration { + /// Migration id. + pub id: H256, + /// Migration set of key servers. It is the new_set at the moment of migration start. + pub set: BTreeMap, + /// Master node of the migration process. + pub master: NodeId, + /// Is migration confirmed by this node? + pub is_confirmed: bool, +} + +/// Key Server Set pub trait KeyServerSet: Send + Sync { - /// Get set of configured key servers - fn get(&self) -> BTreeMap; + /// Get server set state. + fn snapshot(&self) -> KeyServerSetSnapshot; + /// Start migration. + fn start_migration(&self, migration_id: H256); + /// Confirm migration. + fn confirm_migration(&self, migration_id: H256); } /// On-chain Key Server set implementation. @@ -53,21 +96,49 @@ pub struct OnChainKeyServerSet { contract: Mutex, } +#[derive(Default, Debug, Clone, PartialEq)] +/// Non-finalized new_set. +struct FutureNewSet { + /// New servers set. + pub new_set: BTreeMap, + /// Hash of block, when this set has appeared for first time. + pub block: H256, +} + +#[derive(Default, Debug, Clone, PartialEq)] +/// Migration-related transaction information. +struct PreviousMigrationTransaction { + /// Migration id. + pub migration_id: H256, + /// Latest actual block number at the time this transaction has been sent. + pub block: u64, +} + /// Cached on-chain Key Server set contract. struct CachedContract { /// Blockchain client. client: TrustedClient, /// Contract address. - contract_addr: Option
, - /// Active set of key servers. - key_servers: BTreeMap, + contract: Option, + /// Is auto-migrate enabled? + auto_migrate_enabled: bool, + /// Current contract state. + snapshot: KeyServerSetSnapshot, + /// Scheduled contract state (if any). + future_new_set: Option, + /// Previous start migration transaction. + start_migration_tx: Option, + /// Previous confirm migration transaction. + confirm_migration_tx: Option, + /// This node key pair. + self_key_pair: Arc, } impl OnChainKeyServerSet { - pub fn new(trusted_client: TrustedClient, key_servers: BTreeMap) -> Result, Error> { + pub fn new(trusted_client: TrustedClient, self_key_pair: Arc, auto_migrate_enabled: bool, key_servers: BTreeMap) -> Result, Error> { let client = trusted_client.get_untrusted(); let key_server_set = Arc::new(OnChainKeyServerSet { - contract: Mutex::new(CachedContract::new(trusted_client, key_servers)?), + contract: Mutex::new(CachedContract::new(trusted_client, self_key_pair, auto_migrate_enabled, key_servers)?), }); client .ok_or(Error::Internal("Constructing OnChainKeyServerSet without active Client".into()))? @@ -77,8 +148,16 @@ impl OnChainKeyServerSet { } impl KeyServerSet for OnChainKeyServerSet { - fn get(&self) -> BTreeMap { - self.contract.lock().get() + fn snapshot(&self) -> KeyServerSetSnapshot { + self.contract.lock().snapshot() + } + + fn start_migration(&self, migration_id: H256) { + self.contract.lock().start_migration(migration_id) + } + + fn confirm_migration(&self, migration_id: H256) { + self.contract.lock().confirm_migration(migration_id); } } @@ -91,107 +170,384 @@ impl ChainNotify for OnChainKeyServerSet { } impl CachedContract { - pub fn new(client: TrustedClient, key_servers: BTreeMap) -> Result { - let mut cached_contract = CachedContract { + pub fn new(client: TrustedClient, self_key_pair: Arc, auto_migrate_enabled: bool, key_servers: BTreeMap) -> Result { + let server_set = key_servers.into_iter() + .map(|(p, addr)| { + let addr = format!("{}:{}", addr.address, addr.port).parse() + .map_err(|err| Error::Internal(format!("error parsing node address: {}", err)))?; + Ok((p, addr)) + }) + .collect::, Error>>()?; + Ok(CachedContract { client: client, - contract_addr: None, - key_servers: key_servers.into_iter() - .map(|(p, addr)| { - let addr = format!("{}:{}", addr.address, addr.port).parse() - .map_err(|err| Error::Internal(format!("error parsing node address: {}", err)))?; - Ok((p, addr)) - }) - .collect::, Error>>()?, - }; - - if let Some(client) = cached_contract.client.get() { - let key_server_contract_address = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned()); - // only initialize from contract if it is installed. otherwise - use default nodes - // once the contract is installed, all default nodes are lost (if not in the contract' set) - if key_server_contract_address.is_some() { - cached_contract.read_from_registry(&*client, key_server_contract_address); - } - } - - Ok(cached_contract) + contract: None, + auto_migrate_enabled: auto_migrate_enabled, + future_new_set: None, + confirm_migration_tx: None, + start_migration_tx: None, + snapshot: KeyServerSetSnapshot { + current_set: server_set.clone(), + new_set: server_set, + ..Default::default() + }, + self_key_pair: self_key_pair, + }) } pub fn update(&mut self, enacted: Vec, retracted: Vec) { if let Some(client) = self.client.get() { - let new_contract_addr = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned()); + // read new snapshot from reqistry (if something has chnaged) + self.read_from_registry_if_required(&*client, enacted, retracted); - // new contract installed => read nodes set from the contract - if self.contract_addr.as_ref() != new_contract_addr.as_ref() { - self.read_from_registry(&*client, new_contract_addr); + // update number of confirmations (if there's future new set) + self.update_number_of_confirmations_if_required(&*client); + } + } + + fn snapshot(&self) -> KeyServerSetSnapshot { + self.snapshot.clone() + } + + fn start_migration(&mut self, migration_id: H256) { + // trust is not needed here, because it is the reaction to the read of the trusted client + if let (Some(client), Some(contract)) = (self.client.get_untrusted(), self.contract.as_ref()) { + // check if we need to send start migration transaction + if !update_last_transaction_block(&*client, &migration_id, &mut self.start_migration_tx) { return; } - // check for contract events - let is_set_changed = self.contract_addr.is_some() && enacted.iter() - .chain(retracted.iter()) - .any(|block_hash| !client.logs(Filter { - from_block: BlockId::Hash(block_hash.clone()), - to_block: BlockId::Hash(block_hash.clone()), - address: self.contract_addr.clone().map(|a| vec![a]), - topics: vec![ - Some(vec![*ADDED_EVENT_NAME_HASH, *REMOVED_EVENT_NAME_HASH]), - None, - None, - None, - ], - limit: Some(1), - }).is_empty()); - // to simplify processing - just re-read the whole nodes set from the contract - if is_set_changed { - self.read_from_registry(&*client, new_contract_addr); + // prepare transaction data + let transaction_data = match contract.encode_start_migration_input(migration_id) { + Ok(transaction_data) => transaction_data, + Err(error) => { + warn!(target: "secretstore_net", "{}: failed to prepare auto-migration start transaction: {}", + self.self_key_pair.public(), error); + return; + }, + }; + + // send transaction + if let Err(error) = client.transact_contract(contract.address.clone(), transaction_data) { + warn!(target: "secretstore_net", "{}: failed to submit auto-migration start transaction: {}", + self.self_key_pair.public(), error); + } else { + trace!(target: "secretstore_net", "{}: sent auto-migration start transaction", + self.self_key_pair.public(), ); } } } - pub fn get(&self) -> BTreeMap { - self.key_servers.clone() + fn confirm_migration(&mut self, migration_id: H256) { + // trust is not needed here, because we have already completed the action + if let (Some(client), Some(contract)) = (self.client.get(), self.contract.as_ref()) { + // check if we need to send start migration transaction + if !update_last_transaction_block(&*client, &migration_id, &mut self.confirm_migration_tx) { + return; + } + + // prepare transaction data + let transaction_data = match contract.encode_confirm_migration_input(migration_id) { + Ok(transaction_data) => transaction_data, + Err(error) => { + warn!(target: "secretstore_net", "{}: failed to prepare auto-migration confirmation transaction: {}", + self.self_key_pair.public(), error); + return; + }, + }; + + // send transaction + if let Err(error) = client.transact_contract(contract.address.clone(), transaction_data) { + warn!(target: "secretstore_net", "{}: failed to submit auto-migration confirmation transaction: {}", + self.self_key_pair.public(), error); + } else { + trace!(target: "secretstore_net", "{}: sent auto-migration confirm transaction", + self.self_key_pair.public()); + } + } + } + + fn read_from_registry_if_required(&mut self, client: &Client, enacted: Vec, retracted: Vec) { + // read new contract from registry + let new_contract_addr = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned()); + + // new contract installed => read nodes set from the contract + if self.contract.as_ref().map(|c| &c.address) != new_contract_addr.as_ref() { + self.read_from_registry(&*client, new_contract_addr); + return; + } + + // check for contract events + let is_set_changed = self.contract.is_some() && enacted.iter() + .chain(retracted.iter()) + .any(|block_hash| !client.logs(Filter { + from_block: BlockId::Hash(block_hash.clone()), + to_block: BlockId::Hash(block_hash.clone()), + address: self.contract.as_ref().map(|c| vec![c.address.clone()]), + topics: vec![ + Some(vec![*ADDED_EVENT_NAME_HASH, *REMOVED_EVENT_NAME_HASH, + *MIGRATION_STARTED_EVENT_NAME_HASH, *MIGRATION_COMPLETED_EVENT_NAME_HASH]), + None, + None, + None, + ], + limit: Some(1), + }).is_empty()); + + // to simplify processing - just re-read the whole nodes set from the contract + if is_set_changed { + self.read_from_registry(&*client, new_contract_addr); + } } fn read_from_registry(&mut self, client: &Client, new_contract_address: Option
) { - self.key_servers = new_contract_address.map(|contract_addr| { + self.contract = new_contract_address.map(|contract_addr| { trace!(target: "secretstore", "Configuring for key server set contract from {}", contract_addr); KeyServerSetContract::new(contract_addr) - }) - .map(|contract| { - let mut key_servers = BTreeMap::new(); - let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d)); - let key_servers_list = contract.get_key_servers(do_call).wait() - .map_err(|err| { trace!(target: "secretstore", "Error {} reading list of key servers from contract", err); err }) - .unwrap_or_default(); - for key_server in key_servers_list { - let key_server_public = contract.get_key_server_public( - |a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait() - .and_then(|p| if p.len() == 64 { Ok(Public::from_slice(&p)) } else { Err(format!("Invalid public length {}", p.len())) }); - let key_server_ip = contract.get_key_server_address( - |a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait() - .and_then(|a| a.parse().map_err(|e| format!("Invalid ip address: {}", e))); + }); - // only add successfully parsed nodes - match (key_server_public, key_server_ip) { - (Ok(key_server_public), Ok(key_server_ip)) => { key_servers.insert(key_server_public, key_server_ip); }, - (Err(public_err), _) => warn!(target: "secretstore_net", "received invalid public from key server set contract: {}", public_err), - (_, Err(ip_err)) => warn!(target: "secretstore_net", "received invalid IP from key server set contract: {}", ip_err), - } + let contract = match self.contract.as_ref() { + Some(contract) => contract, + None => { + // no contract installed => empty snapshot + // WARNING: after restart current_set will be reset to the set from configuration file + // even though we have reset to empty set here. We are not considerning this as an issue + // because it is actually the issue of administrator. + self.snapshot = Default::default(); + self.future_new_set = None; + return; + }, + }; + + let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d)); + + let current_set = Self::read_key_server_set(&contract, &do_call, &KeyServerSetContract::get_current_key_servers, + &KeyServerSetContract::get_current_key_server_public, &KeyServerSetContract::get_current_key_server_address); + + // read migration-related data if auto migration is enabled + let (new_set, migration) = match self.auto_migrate_enabled { + true => { + let new_set = Self::read_key_server_set(&contract, &do_call, &KeyServerSetContract::get_new_key_servers, + &KeyServerSetContract::get_new_key_server_public, &KeyServerSetContract::get_new_key_server_address); + let migration_set = Self::read_key_server_set(&contract, &do_call, &KeyServerSetContract::get_migration_key_servers, + &KeyServerSetContract::get_migration_key_server_public, &KeyServerSetContract::get_migration_key_server_address); + + let migration_id = match migration_set.is_empty() { + false => contract.get_migration_id(&do_call).wait() + .map_err(|err| { trace!(target: "secretstore", "Error {} reading migration id from contract", err); err }) + .ok(), + true => None, + }; + + let migration_master = match migration_set.is_empty() { + false => contract.get_migration_master(&do_call).wait() + .map_err(|err| { trace!(target: "secretstore", "Error {} reading migration master from contract", err); err }) + .ok() + .and_then(|address| current_set.keys().chain(migration_set.keys()) + .find(|public| public_to_address(public) == address) + .cloned()), + true => None, + }; + + let is_migration_confirmed = match migration_set.is_empty() { + false if current_set.contains_key(self.self_key_pair.public()) || migration_set.contains_key(self.self_key_pair.public()) => + contract.is_migration_confirmed(&do_call, self.self_key_pair.address()).wait() + .map_err(|err| { trace!(target: "secretstore", "Error {} reading migration confirmation from contract", err); err }) + .ok(), + _ => None, + }; + + let migration = match (migration_set.is_empty(), migration_id, migration_master, is_migration_confirmed) { + (false, Some(migration_id), Some(migration_master), Some(is_migration_confirmed)) => + Some(KeyServerSetMigration { + id: migration_id, + master: migration_master, + set: migration_set, + is_confirmed: is_migration_confirmed, + }), + _ => None, + }; + + (new_set, migration) } - key_servers - }) - .unwrap_or_default(); - self.contract_addr = new_contract_address; + false => (current_set.clone(), None), + }; + + let mut new_snapshot = KeyServerSetSnapshot { + current_set: current_set, + new_set: new_set, + migration: migration, + }; + + // we might want to adjust new_set if auto migration is enabled + if self.auto_migrate_enabled { + let block = client.block_hash(BlockId::Latest).unwrap_or_default(); + update_future_set(&mut self.future_new_set, &mut new_snapshot, block); + } + + self.snapshot = new_snapshot; } + + fn read_key_server_set(contract: &KeyServerSetContract, do_call: F, read_list: FL, read_public: FP, read_address: FA) -> BTreeMap + where + F: FnOnce(Address, Vec) -> U + Copy, + U: IntoFuture, Error=String>, + U::Future: Send + 'static, + FL: Fn(&KeyServerSetContract, F) -> BoxFuture, String>, + FP: Fn(&KeyServerSetContract, F, Address) -> BoxFuture, String>, + FA: Fn(&KeyServerSetContract, F, Address) -> BoxFuture { + let mut key_servers = BTreeMap::new(); + let mut key_servers_addresses = HashSet::new(); + let key_servers_list = read_list(contract, do_call).wait() + .map_err(|err| { warn!(target: "secretstore_net", "error {} reading list of key servers from contract", err); err }) + .unwrap_or_default(); + for key_server in key_servers_list { + let key_server_public = read_public(contract, do_call, key_server).wait() + .and_then(|p| if p.len() == 64 { Ok(Public::from_slice(&p)) } else { Err(format!("Invalid public length {}", p.len())) }); + let key_server_address: Result = read_address(contract, do_call, key_server).wait() + .and_then(|a| a.parse().map_err(|e| format!("Invalid ip address: {}", e))); + + // only add successfully parsed nodes + match (key_server_public, key_server_address) { + (Ok(key_server_public), Ok(key_server_address)) => { + if !key_servers_addresses.insert(key_server_address.clone()) { + warn!(target: "secretstore_net", "the same address ({}) specified twice in list of contracts. Ignoring server {}", + key_server_address, key_server_public); + continue; + } + + key_servers.insert(key_server_public, key_server_address); + }, + (Err(public_err), _) => warn!(target: "secretstore_net", "received invalid public from key server set contract: {}", public_err), + (_, Err(ip_err)) => warn!(target: "secretstore_net", "received invalid IP from key server set contract: {}", ip_err), + } + } + key_servers + } + + fn update_number_of_confirmations_if_required(&mut self, client: &BlockChainClient) { + if !self.auto_migrate_enabled { + return; + } + + update_number_of_confirmations( + &|| latest_block_hash(&*client), + &|block| block_confirmations(&*client, block), + &mut self.future_new_set, &mut self.snapshot); + } +} + +/// Check if two sets are equal (in terms of migration requirements). We do not need migration if only +/// addresses are changed - simply adjusting connections is enough in this case. +pub fn is_migration_required(current_set: &BTreeMap, new_set: &BTreeMap) -> bool { + let no_nodes_removed = current_set.keys().all(|n| new_set.contains_key(n)); + let no_nodes_added = new_set.keys().all(|n| current_set.contains_key(n)); + !no_nodes_removed || !no_nodes_added +} + +fn update_future_set(future_new_set: &mut Option, new_snapshot: &mut KeyServerSetSnapshot, block: H256) { + // migration has already started => no need to delay visibility + if new_snapshot.migration.is_some() { + *future_new_set = None; + return; + } + + // new no migration is required => no need to delay visibility + if !is_migration_required(&new_snapshot.current_set, &new_snapshot.new_set) { + *future_new_set = None; + return; + } + + // when auto-migrate is enabled, we do not want to start migration right after new_set is changed, because of: + // 1) there could be a fork && we could start migration to forked version (and potentially lose secrets) + // 2) there must be some period for new_set changes finalization (i.e. adding/removing more servers) + let mut new_set = new_snapshot.current_set.clone(); + ::std::mem::swap(&mut new_set, &mut new_snapshot.new_set); + + // if nothing has changed in future_new_set, then we want to preserve previous block hash + let block = match Some(&new_set) == future_new_set.as_ref().map(|f| &f.new_set) { + true => future_new_set.as_ref().map(|f| &f.block).cloned().unwrap_or_else(|| block), + false => block, + }; + + *future_new_set = Some(FutureNewSet { + new_set: new_set, + block: block, + }); +} + +fn update_number_of_confirmations H256, F2: Fn(H256) -> Option>(latest_block: &F1, confirmations: &F2, future_new_set: &mut Option, snapshot: &mut KeyServerSetSnapshot) { + match future_new_set.as_mut() { + // no future new set is scheduled => do nothing, + None => return, + // else we should calculate number of confirmations for future new set + Some(future_new_set) => match confirmations(future_new_set.block.clone()) { + // we have enough confirmations => should move new_set from future to snapshot + Some(confirmations) if confirmations >= MIGRATION_CONFIRMATIONS_REQUIRED => (), + // not enough confirmations => do nothing + Some(_) => return, + // if number of confirmations is None, then reorg has happened && we need to reset block + // (some more intelligent startegy is possible, but let's stick to simplest one) + None => { + future_new_set.block = latest_block(); + return; + } + } + } + + let future_new_set = future_new_set.take() + .expect("we only pass through match above when future_new_set is some; qed"); + snapshot.new_set = future_new_set.new_set; +} + +fn update_last_transaction_block(client: &Client, migration_id: &H256, previous_transaction: &mut Option) -> bool { + // TODO [Reliability]: add the same mechanism to the contract listener, if accepted + let last_block = client.block_number(BlockId::Latest).unwrap_or_default(); + match previous_transaction.as_ref() { + // no previous transaction => send immideately + None => (), + // previous transaction has been sent for other migration process => send immideately + Some(tx) if tx.migration_id != *migration_id => (), + // if we have sent the same type of transaction recently => do nothing (hope it will be mined eventually) + // if we have sent the same transaction some time ago => + // assume that our tx queue was full + // or we didn't have enough eth fot this tx + // or the transaction has been removed from the queue (and never reached any miner node) + // if we have restarted after sending tx => assume we have never sent it + Some(tx) => { + let last_block = client.block_number(BlockId::Latest).unwrap_or_default(); + if tx.block > last_block || last_block - tx.block < TRANSACTION_RETRY_INTERVAL_BLOCKS { + return false; + } + }, + } + + *previous_transaction = Some(PreviousMigrationTransaction { + migration_id: migration_id.clone(), + block: last_block, + }); + + true +} + +fn latest_block_hash(client: &BlockChainClient) -> H256 { + client.block_hash(BlockId::Latest).unwrap_or_default() +} + +fn block_confirmations(client: &BlockChainClient, block: H256) -> Option { + client.block_number(BlockId::Hash(block)) + .and_then(|block| client.block_number(BlockId::Latest).map(|last_block| (block, last_block))) + .map(|(block, last_block)| last_block - block) } #[cfg(test)] pub mod tests { use std::collections::BTreeMap; use std::net::SocketAddr; + use bigint::hash::H256; use ethkey::Public; - use super::KeyServerSet; + use super::{update_future_set, update_number_of_confirmations, FutureNewSet, + KeyServerSet, KeyServerSetSnapshot, MIGRATION_CONFIRMATIONS_REQUIRED}; #[derive(Default)] pub struct MapKeyServerSet { @@ -207,8 +563,228 @@ pub mod tests { } impl KeyServerSet for MapKeyServerSet { - fn get(&self) -> BTreeMap { - self.nodes.clone() + fn snapshot(&self) -> KeyServerSetSnapshot { + KeyServerSetSnapshot { + current_set: self.nodes.clone(), + new_set: self.nodes.clone(), + ..Default::default() + } + } + + fn start_migration(&self, _migration_id: H256) { + unimplemented!("test-only") + } + + fn confirm_migration(&self, _migration_id: H256) { + unimplemented!("test-only") } } + + #[test] + fn future_set_is_updated_to_none_when_migration_has_already_started() { + let mut future_new_set = Some(Default::default()); + let mut new_snapshot = KeyServerSetSnapshot { + migration: Some(Default::default()), + ..Default::default() + }; + let new_snapshot_copy = new_snapshot.clone(); + update_future_set(&mut future_new_set, &mut new_snapshot, Default::default()); + assert_eq!(future_new_set, None); + assert_eq!(new_snapshot, new_snapshot_copy); + } + + #[test] + fn future_set_is_updated_to_none_when_no_migration_is_required() { + let node_id = Default::default(); + let address1 = "127.0.0.1:12000".parse().unwrap(); + let address2 = "127.0.0.1:12001".parse().unwrap(); + + // addresses are different, but node set is the same => no migration is required + let mut future_new_set = Some(Default::default()); + let mut new_snapshot = KeyServerSetSnapshot { + current_set: vec![(node_id, address1)].into_iter().collect(), + new_set: vec![(node_id, address2)].into_iter().collect(), + ..Default::default() + }; + let new_snapshot_copy = new_snapshot.clone(); + update_future_set(&mut future_new_set, &mut new_snapshot, Default::default()); + assert_eq!(future_new_set, None); + assert_eq!(new_snapshot, new_snapshot_copy); + + // everything is the same => no migration is required + let mut future_new_set = Some(Default::default()); + let mut new_snapshot = KeyServerSetSnapshot { + current_set: vec![(node_id, address1)].into_iter().collect(), + new_set: vec![(node_id, address1)].into_iter().collect(), + ..Default::default() + }; + let new_snapshot_copy = new_snapshot.clone(); + update_future_set(&mut future_new_set, &mut new_snapshot, Default::default()); + assert_eq!(future_new_set, None); + assert_eq!(new_snapshot, new_snapshot_copy); + } + + #[test] + fn future_set_is_initialized() { + let address = "127.0.0.1:12000".parse().unwrap(); + + let mut future_new_set = None; + let mut new_snapshot = KeyServerSetSnapshot { + current_set: vec![(1.into(), address)].into_iter().collect(), + new_set: vec![(2.into(), address)].into_iter().collect(), + ..Default::default() + }; + update_future_set(&mut future_new_set, &mut new_snapshot, Default::default()); + assert_eq!(future_new_set, Some(FutureNewSet { + new_set: vec![(2.into(), address)].into_iter().collect(), + block: Default::default(), + })); + assert_eq!(new_snapshot, KeyServerSetSnapshot { + current_set: vec![(1.into(), address)].into_iter().collect(), + new_set: vec![(1.into(), address)].into_iter().collect(), + ..Default::default() + }); + } + + #[test] + fn future_set_is_updated_when_set_differs() { + let address = "127.0.0.1:12000".parse().unwrap(); + + let mut future_new_set = Some(FutureNewSet { + new_set: vec![(2.into(), address)].into_iter().collect(), + block: Default::default(), + }); + let mut new_snapshot = KeyServerSetSnapshot { + current_set: vec![(1.into(), address)].into_iter().collect(), + new_set: vec![(3.into(), address)].into_iter().collect(), + ..Default::default() + }; + update_future_set(&mut future_new_set, &mut new_snapshot, 1.into()); + assert_eq!(future_new_set, Some(FutureNewSet { + new_set: vec![(3.into(), address)].into_iter().collect(), + block: 1.into(), + })); + assert_eq!(new_snapshot, KeyServerSetSnapshot { + current_set: vec![(1.into(), address)].into_iter().collect(), + new_set: vec![(1.into(), address)].into_iter().collect(), + ..Default::default() + }); + } + + #[test] + fn future_set_is_not_updated_when_set_is_the_same() { + let address = "127.0.0.1:12000".parse().unwrap(); + + let mut future_new_set = Some(FutureNewSet { + new_set: vec![(2.into(), address)].into_iter().collect(), + block: Default::default(), + }); + let mut new_snapshot = KeyServerSetSnapshot { + current_set: vec![(1.into(), address)].into_iter().collect(), + new_set: vec![(2.into(), address)].into_iter().collect(), + ..Default::default() + }; + update_future_set(&mut future_new_set, &mut new_snapshot, 1.into()); + assert_eq!(future_new_set, Some(FutureNewSet { + new_set: vec![(2.into(), address)].into_iter().collect(), + block: Default::default(), + })); + assert_eq!(new_snapshot, KeyServerSetSnapshot { + current_set: vec![(1.into(), address)].into_iter().collect(), + new_set: vec![(1.into(), address)].into_iter().collect(), + ..Default::default() + }); + } + + #[test] + fn when_updating_confirmations_nothing_is_changed_if_no_future_set() { + let address = "127.0.0.1:12000".parse().unwrap(); + + let mut future_new_set = None; + let mut snapshot = KeyServerSetSnapshot { + current_set: vec![(1.into(), address)].into_iter().collect(), + new_set: vec![(1.into(), address)].into_iter().collect(), + ..Default::default() + }; + let snapshot_copy = snapshot.clone(); + update_number_of_confirmations( + &|| 1.into(), + &|_| Some(MIGRATION_CONFIRMATIONS_REQUIRED), + &mut future_new_set, &mut snapshot); + assert_eq!(future_new_set, None); + assert_eq!(snapshot, snapshot_copy); + } + + #[test] + fn when_updating_confirmations_migration_is_scheduled() { + let address = "127.0.0.1:12000".parse().unwrap(); + + let mut future_new_set = Some(FutureNewSet { + new_set: vec![(2.into(), address)].into_iter().collect(), + block: Default::default(), + }); + let mut snapshot = KeyServerSetSnapshot { + current_set: vec![(1.into(), address)].into_iter().collect(), + new_set: vec![(1.into(), address)].into_iter().collect(), + ..Default::default() + }; + update_number_of_confirmations( + &|| 1.into(), + &|_| Some(MIGRATION_CONFIRMATIONS_REQUIRED), + &mut future_new_set, &mut snapshot); + assert_eq!(future_new_set, None); + assert_eq!(snapshot, KeyServerSetSnapshot { + current_set: vec![(1.into(), address)].into_iter().collect(), + new_set: vec![(2.into(), address)].into_iter().collect(), + ..Default::default() + }); + } + + #[test] + fn when_updating_confirmations_migration_is_not_scheduled_when_not_enough_confirmations() { + let address = "127.0.0.1:12000".parse().unwrap(); + + let mut future_new_set = Some(FutureNewSet { + new_set: vec![(2.into(), address)].into_iter().collect(), + block: Default::default(), + }); + let mut snapshot = KeyServerSetSnapshot { + current_set: vec![(1.into(), address)].into_iter().collect(), + new_set: vec![(1.into(), address)].into_iter().collect(), + ..Default::default() + }; + let future_new_set_copy = future_new_set.clone(); + let snapshot_copy = snapshot.clone(); + update_number_of_confirmations( + &|| 1.into(), + &|_| Some(MIGRATION_CONFIRMATIONS_REQUIRED - 1), + &mut future_new_set, &mut snapshot); + assert_eq!(future_new_set, future_new_set_copy); + assert_eq!(snapshot, snapshot_copy); + } + + #[test] + fn when_updating_confirmations_migration_is_reset_when_reorganized() { + let address = "127.0.0.1:12000".parse().unwrap(); + + let mut future_new_set = Some(FutureNewSet { + new_set: vec![(2.into(), address)].into_iter().collect(), + block: 1.into(), + }); + let mut snapshot = KeyServerSetSnapshot { + current_set: vec![(1.into(), address)].into_iter().collect(), + new_set: vec![(1.into(), address)].into_iter().collect(), + ..Default::default() + }; + let snapshot_copy = snapshot.clone(); + update_number_of_confirmations( + &|| 2.into(), + &|_| None, + &mut future_new_set, &mut snapshot); + assert_eq!(future_new_set, Some(FutureNewSet { + new_set: vec![(2.into(), address)].into_iter().collect(), + block: 2.into(), + })); + assert_eq!(snapshot, snapshot_copy); + } } diff --git a/secret_store/src/key_storage.rs b/secret_store/src/key_storage.rs index 798a2c553..7fe3b24db 100644 --- a/secret_store/src/key_storage.rs +++ b/secret_store/src/key_storage.rs @@ -445,6 +445,7 @@ pub mod tests { nodes: BTreeMap::new(), allow_connecting_to_higher_nodes: false, admin_public: None, + auto_migrate_enabled: false, }, }; diff --git a/secret_store/src/lib.rs b/secret_store/src/lib.rs index d2eb7b125..3400e3fae 100644 --- a/secret_store/src/lib.rs +++ b/secret_store/src/lib.rs @@ -79,7 +79,9 @@ pub fn start(client: Arc, sync: Arc, self_key_pair: Arc for ServiceContractListener { /// Returns true when session, related to `server_key_id` must be started on this KeyServer. fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool { - let servers = key_server_set.get(); + let servers = key_server_set.snapshot().current_set; let total_servers_count = servers.len(); match total_servers_count { 0 => return false, diff --git a/secret_store/src/node_key_pair.rs b/secret_store/src/node_key_pair.rs index 29f1ffec6..f50c765f2 100644 --- a/secret_store/src/node_key_pair.rs +++ b/secret_store/src/node_key_pair.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use ethcrypto::ecdh::agree; -use ethkey::{KeyPair, Public, Signature, Error as EthKeyError, sign}; +use ethkey::{KeyPair, Public, Signature, Error as EthKeyError, sign, public_to_address}; use ethcore::account_provider::AccountProvider; use bigint::hash::H256; use util::Address; @@ -46,6 +46,10 @@ impl NodeKeyPair for PlainNodeKeyPair { self.key_pair.public() } + fn address(&self) -> Address { + public_to_address(self.key_pair.public()) + } + fn sign(&self, data: &H256) -> Result { sign(self.key_pair.secret(), data) } @@ -73,6 +77,10 @@ impl NodeKeyPair for KeyStoreNodeKeyPair { &self.public } + fn address(&self) -> Address { + public_to_address(&self.public) + } + fn sign(&self, data: &H256) -> Result { self.account_provider.sign(self.address.clone(), Some(self.password.clone()), data.clone()) .map_err(|e| EthKeyError::Custom(format!("{}", e))) diff --git a/secret_store/src/traits.rs b/secret_store/src/traits.rs index 0982f40e3..15e1af84d 100644 --- a/secret_store/src/traits.rs +++ b/secret_store/src/traits.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use std::collections::BTreeSet; -use ethkey::{KeyPair, Signature, Error as EthKeyError}; +use ethkey::{KeyPair, Signature, Address, Error as EthKeyError}; use bigint::hash::H256; use types::all::{Error, Public, ServerKeyId, MessageHash, EncryptedMessageSignature, RequestSignature, EncryptedDocumentKey, EncryptedDocumentKeyShadow, NodeId}; @@ -24,6 +24,8 @@ use types::all::{Error, Public, ServerKeyId, MessageHash, EncryptedMessageSignat pub trait NodeKeyPair: Send + Sync { /// Public portion of key. fn public(&self) -> &Public; + /// Address of key owner. + fn address(&self) -> Address; /// Sign data with node key. fn sign(&self, data: &H256) -> Result; /// Compute shared key to encrypt channel between two nodes. diff --git a/secret_store/src/types/all.rs b/secret_store/src/types/all.rs index 70ac0b2b0..5b628fa1f 100644 --- a/secret_store/src/types/all.rs +++ b/secret_store/src/types/all.rs @@ -101,6 +101,9 @@ pub struct ClusterConfiguration { pub allow_connecting_to_higher_nodes: bool, /// Administrator public key. pub admin_public: Option, + /// Should key servers set change session should be started when servers set changes. + /// This will only work when servers set is configured using KeyServerSet contract. + pub auto_migrate_enabled: bool, } /// Shadow decryption result.