SecretStore: tx retry pattern (#7323)

* SecretStore: auto migrate initial commit

* SecretStore: flush automigrate

* SecretStore: debug flush

* SecretStore: added migration to KeyServerSet contract

* SecretStore: flush automigrate

* SecretStore: flush before testing

* SecretStore: flush

* SecretStore: flush

* SecretStore: bunch of tests for simple ConnectionTrigger && KeyServerSet

* SecretStore: started work on TriggerWithMigration tests

* SecretStore: maintain_session tests

* SecretStore: updated some comments

* SecretStore pass migration_id to ServerSetChangeSession

* SecretStore: fixed lock scope

* SecretStore: fixed error response condition

* SecretStore: fixed ServerSetChange session auto-creation

* SecretStore: update active migration in connector

* removed commented code

* SecretStore: fixed tests compilation

* SecretStore: marked test-related unimplemented-s

* SecretStore: labeled all SS TODOs

* SecretStore: do not read auto-migration stuff when auto-migration is disabled + stripped KeyServerSet contract ABI

* SecretStore: ignore duplicated addresses in KeyServerSet

* fied compilation

* SecretStore: wait for N block confirmations before starting auto-migration

* SecretStore: more tests for migration delay

* SecretStore: clear current nodes set when KSS contract is uninstalled

* SecretStore: retry transaction interval
This commit is contained in:
Svyatoslav Nikolsky 2018-01-10 13:33:45 +03:00 committed by Afri Schoedon
parent 7e0928b8a2
commit b685b7fae3
29 changed files with 2039 additions and 228 deletions

View File

@ -1 +1 @@
[{"constant":true,"inputs":[{"name":"","type":"uint256"}],"name":"keyServersList","outputs":[{"name":"","type":"address"}],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"_new","type":"address"}],"name":"setOwner","outputs":[],"payable":false,"type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getKeyServerPublic","outputs":[{"name":"","type":"bytes"}],"payable":false,"type":"function"},{"constant":true,"inputs":[],"name":"getKeyServers","outputs":[{"name":"","type":"address[]"}],"payable":false,"type":"function"},{"constant":true,"inputs":[],"name":"owner","outputs":[{"name":"","type":"address"}],"payable":false,"type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getKeyServerAddress","outputs":[{"name":"","type":"string"}],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"keyServer","type":"address"}],"name":"removeKeyServer","outputs":[],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"keyServerPublic","type":"bytes"},{"name":"keyServerIp","type":"string"}],"name":"addKeyServer","outputs":[],"payable":false,"type":"function"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerAdded","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerRemoved","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"old","type":"address"},{"indexed":true,"name":"current","type":"address"}],"name":"NewOwner","type":"event"}]
[{"constant":true,"inputs":[],"name":"getMigrationMaster","outputs":[{"name":"","type":"address"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getMigrationKeyServerPublic","outputs":[{"name":"","type":"bytes"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"id","type":"bytes32"}],"name":"startMigration","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getMigrationKeyServerAddress","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"getMigrationId","outputs":[{"name":"","type":"bytes32"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"getNewKeyServers","outputs":[{"name":"","type":"address[]"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"id","type":"bytes32"}],"name":"confirmMigration","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"getMigrationKeyServers","outputs":[{"name":"","type":"address[]"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"isMigrationConfirmed","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"getCurrentKeyServers","outputs":[{"name":"","type":"address[]"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getCurrentKeyServerPublic","outputs":[{"name":"","type":"bytes"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getNewKeyServerAddress","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getCurrentKeyServerAddress","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getNewKeyServerPublic","outputs":[{"name":"","type":"bytes"}],"payable":false,"stateMutability":"view","type":"function"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerAdded","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerRemoved","type":"event"},{"anonymous":false,"inputs":[],"name":"MigrationStarted","type":"event"},{"anonymous":false,"inputs":[],"name":"MigrationCompleted","type":"event"}]

View File

@ -556,6 +556,10 @@ usage! {
"--no-acl-check",
"Disable ACL check (useful for test environments).",
FLAG flag_no_secretstore_auto_migrate: (bool) = false, or |c: &Config| otry!(c.secretstore).disable_auto_migrate.clone(),
"--no-secretstore-auto-migrate",
"Do not run servers set change session automatically when servers set changes. This option has no effect when servers set is read from configuration file.",
ARG arg_secretstore_contract: (String) = "none", or |c: &Config| otry!(c.secretstore).service_contract.clone(),
"--secretstore-contract=[SOURCE]",
"Secret Store Service contract address source: none, registry (contract address is read from registry) or address.",
@ -589,7 +593,7 @@ usage! {
"Hex-encoded secret key of this node.",
ARG arg_secretstore_admin_public: (Option<String>) = None, or |c: &Config| otry!(c.secretstore).admin_public.clone(),
"--secretstore-admin-public=[PUBLIC]",
"--secretstore-admin=[PUBLIC]",
"Hex-encoded public key of secret store administrator.",
["Sealing/Mining options"]
@ -1111,6 +1115,7 @@ struct SecretStore {
disable: Option<bool>,
disable_http: Option<bool>,
disable_acl_check: Option<bool>,
disable_auto_migrate: Option<bool>,
service_contract: Option<String>,
self_secret: Option<String>,
admin_public: Option<String>,
@ -1519,9 +1524,11 @@ mod tests {
arg_dapps_path: "$HOME/.parity/dapps".into(),
flag_no_dapps: false,
// SECRETSTORE
flag_no_secretstore: false,
flag_no_secretstore_http: false,
flag_no_secretstore_acl_check: false,
flag_no_secretstore_auto_migrate: false,
arg_secretstore_contract: "none".into(),
arg_secretstore_secret: None,
arg_secretstore_admin_public: None,
@ -1773,6 +1780,7 @@ mod tests {
disable: None,
disable_http: None,
disable_acl_check: None,
disable_auto_migrate: None,
service_contract: None,
self_secret: None,
admin_public: None,

View File

@ -610,6 +610,7 @@ impl Configuration {
enabled: self.secretstore_enabled(),
http_enabled: self.secretstore_http_enabled(),
acl_check_enabled: self.secretstore_acl_check_enabled(),
auto_migrate_enabled: self.secretstore_auto_migrate_enabled(),
service_contract_address: self.secretstore_service_contract_address()?,
self_secret: self.secretstore_self_secret()?,
nodes: self.secretstore_nodes()?,
@ -1088,6 +1089,10 @@ impl Configuration {
!self.args.flag_no_secretstore_acl_check
}
fn secretstore_auto_migrate_enabled(&self) -> bool {
!self.args.flag_no_secretstore_auto_migrate
}
fn secretstore_service_contract_address(&self) -> Result<Option<SecretStoreContractAddress>, String> {
Ok(match self.args.arg_secretstore_contract.as_ref() {
"none" => None,

View File

@ -51,6 +51,8 @@ pub struct Configuration {
pub http_enabled: bool,
/// Is ACL check enabled.
pub acl_check_enabled: bool,
/// Is auto migrate enabled.
pub auto_migrate_enabled: bool,
/// Service contract address.
pub service_contract_address: Option<ContractAddress>,
/// This node secret.
@ -166,6 +168,7 @@ mod server {
})).collect(),
allow_connecting_to_higher_nodes: true,
admin_public: conf.admin_public,
auto_migrate_enabled: conf.auto_migrate_enabled,
},
};
@ -190,6 +193,7 @@ impl Default for Configuration {
enabled: true,
http_enabled: true,
acl_check_enabled: true,
auto_migrate_enabled: true,
service_contract_address: None,
self_secret: None,
admin_public: None,

View File

@ -63,7 +63,7 @@ impl KeyServer for KeyServerImpl {}
impl AdminSessionsServer for KeyServerImpl {
fn change_servers_set(&self, old_set_signature: RequestSignature, new_set_signature: RequestSignature, new_servers_set: BTreeSet<NodeId>) -> Result<(), Error> {
let servers_set_change_session = self.data.lock().cluster
.new_servers_set_change_session(None, new_servers_set, old_set_signature, new_set_signature)?;
.new_servers_set_change_session(None, None, new_servers_set, old_set_signature, new_set_signature)?;
servers_set_change_session.as_servers_set_change()
.expect("new_servers_set_change_session creates servers_set_change_session; qed")
.wait().map_err(Into::into)
@ -164,6 +164,7 @@ impl KeyServerCore {
acl_storage: acl_storage,
key_storage: key_storage,
admin_public: config.admin_public.clone(),
auto_migrate_enabled: config.auto_migrate_enabled,
};
let (stop, stopped) = futures::oneshot();
@ -229,7 +230,7 @@ pub mod tests {
impl AdminSessionsServer for DummyKeyServer {
fn change_servers_set(&self, _old_set_signature: RequestSignature, _new_set_signature: RequestSignature, _new_servers_set: BTreeSet<NodeId>) -> Result<(), Error> {
unimplemented!()
unimplemented!("test-only")
}
}
@ -242,25 +243,25 @@ pub mod tests {
impl DocumentKeyServer for DummyKeyServer {
fn store_document_key(&self, _key_id: &ServerKeyId, _signature: &RequestSignature, _common_point: Public, _encrypted_document_key: Public) -> Result<(), Error> {
unimplemented!()
unimplemented!("test-only")
}
fn generate_document_key(&self, _key_id: &ServerKeyId, _signature: &RequestSignature, _threshold: usize) -> Result<EncryptedDocumentKey, Error> {
unimplemented!()
unimplemented!("test-only")
}
fn restore_document_key(&self, _key_id: &ServerKeyId, _signature: &RequestSignature) -> Result<EncryptedDocumentKey, Error> {
unimplemented!()
unimplemented!("test-only")
}
fn restore_document_key_shadow(&self, _key_id: &ServerKeyId, _signature: &RequestSignature) -> Result<EncryptedDocumentKeyShadow, Error> {
unimplemented!()
unimplemented!("test-only")
}
}
impl MessageSigner for DummyKeyServer {
fn sign_message(&self, _key_id: &ServerKeyId, _signature: &RequestSignature, _message: MessageHash) -> Result<EncryptedMessageSignature, Error> {
unimplemented!()
unimplemented!("test-only")
}
}
@ -279,6 +280,7 @@ pub mod tests {
})).collect(),
allow_connecting_to_higher_nodes: false,
admin_public: None,
auto_migrate_enabled: false,
}).collect();
let key_servers_set: BTreeMap<Public, SocketAddr> = configs[0].nodes.iter()
.map(|(k, a)| (k.clone(), format!("{}:{}", a.address, a.port).parse().unwrap()))
@ -469,6 +471,6 @@ pub mod tests {
#[test]
fn servers_set_change_session_works_over_network() {
// TODO
// TODO [Test]
}
}

View File

@ -27,7 +27,7 @@ use key_server_cluster::signing_session::SessionImpl as SigningSession;
use key_server_cluster::message::{Message, KeyVersionNegotiationMessage, RequestKeyVersions, KeyVersions};
use key_server_cluster::admin_sessions::ShareChangeSessionMeta;
// TODO: optimizations: change sessions so that versions are sent by chunks.
// TODO [Opt]: change sessions so that versions are sent by chunks.
/// Number of versions sent in single message.
const VERSIONS_PER_MESSAGE: usize = 32;

View File

@ -18,6 +18,7 @@ use std::sync::Arc;
use std::collections::{BTreeSet, BTreeMap};
use std::collections::btree_map::Entry;
use parking_lot::{Mutex, Condvar};
use bigint::hash::H256;
use ethkey::{Public, Signature};
use key_server_cluster::{Error, NodeId, SessionId, KeyStorage};
use key_server_cluster::math;
@ -90,6 +91,8 @@ struct SessionCore {
pub all_nodes_set: BTreeSet<NodeId>,
/// Administrator public key.
pub admin_public: Public,
/// Migration id (if this session is a part of auto-migration process).
pub migration_id: Option<H256>,
/// SessionImpl completion condvar.
pub completed: Condvar,
}
@ -141,6 +144,8 @@ pub struct SessionParams {
pub all_nodes_set: BTreeSet<NodeId>,
/// Administrator public key.
pub admin_public: Public,
/// Migration id (if this session is a part of auto-migration process).
pub migration_id: Option<H256>,
}
/// Servers set change consensus transport.
@ -149,6 +154,8 @@ struct ServersSetChangeConsensusTransport {
id: SessionId,
/// Session-level nonce.
nonce: u64,
/// Migration id (if part of auto-migration process).
migration_id: Option<H256>,
/// Cluster.
cluster: Arc<Cluster>,
}
@ -184,6 +191,7 @@ impl SessionImpl {
nonce: params.nonce,
all_nodes_set: params.all_nodes_set,
admin_public: params.admin_public,
migration_id: params.migration_id,
completed: Condvar::new(),
},
data: Mutex::new(SessionData {
@ -205,6 +213,11 @@ impl SessionImpl {
&self.core.meta.id
}
/// Get migration id.
pub fn migration_id(&self) -> Option<&H256> {
self.core.migration_id.as_ref()
}
/// Wait for session completion.
pub fn wait(&self) -> Result<(), Error> {
Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone())
@ -229,6 +242,7 @@ impl SessionImpl {
consensus_transport: ServersSetChangeConsensusTransport {
id: self.core.meta.id.clone(),
nonce: self.core.nonce,
migration_id: self.core.migration_id.clone(),
cluster: self.core.cluster.clone(),
},
})?;
@ -296,6 +310,7 @@ impl SessionImpl {
consensus_transport: ServersSetChangeConsensusTransport {
id: self.core.meta.id.clone(),
nonce: self.core.nonce,
migration_id: self.core.migration_id.clone(),
cluster: self.core.cluster.clone(),
},
})?);
@ -723,7 +738,7 @@ impl SessionImpl {
},
sub_session: math::generate_random_scalar()?,
key_share: key_share,
result_computer: Arc::new(LargestSupportResultComputer {}), // TODO: optimizations: could use modified Fast version
result_computer: Arc::new(LargestSupportResultComputer {}), // TODO [Opt]: could use modified Fast version
transport: ServersSetChangeKeyVersionNegotiationTransport {
id: core.meta.id.clone(),
nonce: core.nonce,
@ -937,6 +952,7 @@ impl JobTransport for ServersSetChangeConsensusTransport {
session: self.id.clone().into(),
session_nonce: self.nonce,
message: ConsensusMessageWithServersSet::InitializeConsensusSession(InitializeConsensusSessionWithServersSet {
migration_id: self.migration_id.clone().map(Into::into),
old_nodes_set: request.old_servers_set.into_iter().map(Into::into).collect(),
new_nodes_set: request.new_servers_set.into_iter().map(Into::into).collect(),
old_set_signature: request.old_set_signature.into(),
@ -1036,6 +1052,7 @@ pub mod tests {
key_storage: key_storage,
nonce: 1,
admin_public: admin_public,
migration_id: None,
}).unwrap()
}

View File

@ -29,7 +29,7 @@ pub struct SessionsQueue {
impl SessionsQueue {
/// Create new sessions queue.
pub fn new(key_storage: &Arc<KeyStorage>, unknown_sessions: BTreeSet<SessionId>) -> Self {
// TODO: optimizations:
// TODO [Opt]:
// 1) known sessions - change to iter
// 2) unknown sesions - request chunk-by-chunk
SessionsQueue {

View File

@ -516,7 +516,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
return Ok(())
}
// TODO: find a way to verificate keys
// TODO [Trust]: find a way to verificate keys
Self::complete_session(&self.core, &mut *data)
}
@ -600,7 +600,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
return Ok(())
}
// TODO: find a way to verificate keys
// TODO [Trust]: find a way to verificate keys
Self::complete_session(core, data)
}

View File

@ -151,8 +151,8 @@ impl SessionImpl {
initialization_confirmed: &n == self.node(),
})));
// TODO: id signature is not enough here, as it was already used in key generation
// TODO: there could be situation when some nodes have failed to store encrypted data
// TODO [Sec]: id signature is not enough here, as it was already used in key generation
// TODO [Reliability]: there could be situation when some nodes have failed to store encrypted data
// => potential problems during restore. some confirmation step is needed (2pc)?
// save encryption data
if let Some(mut encrypted_data) = self.encrypted_data.clone() {

View File

@ -41,6 +41,8 @@ use key_server_cluster::key_version_negotiation_session::{SessionImpl as KeyVers
IsolatedSessionTransport as KeyVersionNegotiationSessionTransport, ContinueAction};
use key_server_cluster::io::{DeadlineStatus, ReadMessage, SharedTcpStream, read_encrypted_message, WriteMessage, write_encrypted_message};
use key_server_cluster::net::{accept_connection as net_accept_connection, connect as net_connect, Connection as NetConnection};
use key_server_cluster::connection_trigger::{Maintain, ConnectionTrigger, SimpleConnectionTrigger, ServersSetChangeSessionCreatorConnector};
use key_server_cluster::connection_trigger_with_migration::ConnectionTriggerWithMigration;
/// Maintain interval (seconds). Every MAINTAIN_INTERVAL seconds node:
/// 1) checks if connected nodes are responding to KeepAlive messages
@ -56,7 +58,7 @@ const KEEP_ALIVE_SEND_INTERVAL: u64 = 30;
const KEEP_ALIVE_DISCONNECT_INTERVAL: u64 = 60;
/// Empty future.
type BoxedEmptyFuture = Box<Future<Item = (), Error = ()> + Send>;
pub type BoxedEmptyFuture = Box<Future<Item = (), Error = ()> + Send>;
/// Cluster interface for external clients.
pub trait ClusterClient: Send + Sync {
@ -73,7 +75,7 @@ pub trait ClusterClient: Send + Sync {
/// Start new key version negotiation session.
fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error>;
/// Start new servers set change session.
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSession>, Error>;
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, migration_id: Option<H256>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSession>, Error>;
/// Listen for new generation sessions.
fn add_generation_listener(&self, listener: Arc<ClusterSessionsListener<GenerationSession>>);
@ -123,6 +125,9 @@ pub struct ClusterConfiguration {
pub acl_storage: Arc<AclStorage>,
/// Administrator public key.
pub admin_public: Option<Public>,
/// Should key servers set change session should be started when servers set changes.
/// This will only work when servers set is configured using KeyServerSet contract.
pub auto_migrate_enabled: bool,
}
/// Cluster state.
@ -168,16 +173,21 @@ pub struct ClusterData {
pub sessions: ClusterSessions,
}
/// Connections that are forming the cluster.
/// Connections that are forming the cluster. Lock order: trigger.lock() -> data.lock().
pub struct ClusterConnections {
/// Self node id.
pub self_node_id: NodeId,
/// All known other key servers.
pub key_server_set: Arc<KeyServerSet>,
/// Connections trigger.
pub trigger: Mutex<Box<ConnectionTrigger>>,
/// Servers set change session creator connector.
pub connector: Arc<ServersSetChangeSessionCreatorConnector>,
/// Connections data.
pub data: RwLock<ClusterConnectionsData>,
}
#[derive(Default)]
/// Cluster connections data.
pub struct ClusterConnectionsData {
/// Active key servers set.
@ -214,7 +224,8 @@ impl ClusterCore {
pub fn new(handle: Handle, config: ClusterConfiguration) -> Result<Arc<Self>, Error> {
let listen_address = make_socket_address(&config.listen_address.0, config.listen_address.1)?;
let connections = ClusterConnections::new(&config)?;
let sessions = ClusterSessions::new(&config);
let servers_set_change_creator_connector = connections.connector.clone();
let sessions = ClusterSessions::new(&config, servers_set_change_creator_connector);
let data = ClusterData::new(&handle, config, connections, sessions);
Ok(Arc::new(ClusterCore {
@ -349,7 +360,7 @@ impl ClusterCore {
Err(err) => {
warn!(target: "secretstore_net", "{}: network error '{}' when reading message from node {}", data.self_key_pair.public(), err, connection.node_id());
// close connection
data.connections.remove(connection.node_id(), connection.is_inbound());
data.connections.remove(data.clone(), connection.node_id(), connection.is_inbound());
Box::new(failed(err))
},
}
@ -362,7 +373,7 @@ impl ClusterCore {
for connection in data.connections.active_connections() {
let last_message_diff = time::Instant::now() - connection.last_message_time();
if last_message_diff > time::Duration::from_secs(KEEP_ALIVE_DISCONNECT_INTERVAL) {
data.connections.remove(connection.node_id(), connection.is_inbound());
data.connections.remove(data.clone(), connection.node_id(), connection.is_inbound());
data.sessions.on_connection_timeout(connection.node_id());
}
else if last_message_diff > time::Duration::from_secs(KEEP_ALIVE_SEND_INTERVAL) {
@ -373,13 +384,12 @@ impl ClusterCore {
/// Try to connect to every disconnected node.
fn connect_disconnected_nodes(data: Arc<ClusterData>) {
// do not update nodes set if any admin session is active
// this could happen, but will possibly lead to admin session error
// => should be performed later
if data.sessions.admin_sessions.is_empty() {
data.connections.update_nodes_set();
let r = data.connections.update_nodes_set(data.clone());
if let Some(r) = r {
data.spawn(r);
}
// connect to disconnected nodes
for (node_id, node_address) in data.connections.disconnected_nodes() {
if data.config.allow_connecting_to_higher_nodes || data.self_key_pair.public() < &node_id {
ClusterCore::connect(data.clone(), node_address);
@ -392,7 +402,7 @@ impl ClusterCore {
match result {
Ok(DeadlineStatus::Meet(Ok(connection))) => {
let connection = Connection::new(outbound_addr.is_none(), connection);
if data.connections.insert(connection.clone()) {
if data.connections.insert(data.clone(), connection.clone()) {
ClusterCore::process_connection_messages(data.clone(), connection)
} else {
Box::new(finished(Ok(())))
@ -433,8 +443,16 @@ impl ClusterCore {
.map(|_| ()).unwrap_or_default(),
Message::Signing(message) => Self::process_message(&data, &data.sessions.signing_sessions, connection, Message::Signing(message))
.map(|_| ()).unwrap_or_default(),
Message::ServersSetChange(message) => Self::process_message(&data, &data.sessions.admin_sessions, connection, Message::ServersSetChange(message))
.map(|_| ()).unwrap_or_default(),
Message::ServersSetChange(message) => {
let message = Message::ServersSetChange(message);
let is_initialization_message = message.is_initialization_message();
let session = Self::process_message(&data, &data.sessions.admin_sessions, connection, message);
if is_initialization_message {
if let Some(session) = session {
data.connections.servers_set_change_creator_connector().set_key_servers_set_change_session(session.clone());
}
}
}
Message::KeyVersionNegotiation(message) => {
let session = Self::process_message(&data, &data.sessions.negotiation_sessions, connection, Message::KeyVersionNegotiation(message));
Self::try_continue_session(&data, session);
@ -518,6 +536,7 @@ impl ClusterCore {
let creation_data = SC::creation_data_from_message(&message)?;
let master = if is_initialization_message { sender.clone() } else { data.self_key_pair.public().clone() };
let cluster = create_cluster_view(data, requires_all_connections(&message))?;
sessions.insert(cluster, master, session_id, Some(message.session_nonce().ok_or(Error::InvalidMessage)?), message.is_exclusive_session_message(), creation_data)
},
}
@ -537,7 +556,7 @@ impl ClusterCore {
// this is new session => it is not yet in container
warn!(target: "secretstore_net", "{}: {} session read error '{}' when requested for session from node {}",
data.self_key_pair.public(), S::type_name(), error, sender);
if message.is_initialization_message() {
if !message.is_error_message() {
let session_id = message.into_session_id().expect("session_id only fails for cluster messages; only session messages are passed to process_message; qed");
let session_nonce = message.session_nonce().expect("session_nonce only fails for cluster messages; only session messages are passed to process_message; qed");
data.spawn(connection.send_message(SC::make_error_message(session_id, session_nonce, error)));
@ -604,12 +623,21 @@ impl ClusterCore {
impl ClusterConnections {
pub fn new(config: &ClusterConfiguration) -> Result<Self, Error> {
let mut nodes = config.key_server_set.get();
let mut nodes = config.key_server_set.snapshot().current_set;
nodes.remove(config.self_key_pair.public());
let trigger: Box<ConnectionTrigger> = match config.auto_migrate_enabled {
false => Box::new(SimpleConnectionTrigger::new(config.key_server_set.clone(), config.self_key_pair.clone(), config.admin_public.clone())),
true if config.admin_public.is_none() => Box::new(ConnectionTriggerWithMigration::new(config.key_server_set.clone(), config.self_key_pair.clone())),
true => return Err(Error::Io("secret store admininstrator public key is specified with auto-migration enabled".into())), // TODO [Refac]: Io -> Internal
};
let connector = trigger.servers_set_change_creator_connector();
Ok(ClusterConnections {
self_node_id: config.self_key_pair.public().clone(),
key_server_set: config.key_server_set.clone(),
trigger: Mutex::new(trigger),
connector: connector,
data: RwLock::new(ClusterConnectionsData {
nodes: nodes,
connections: BTreeMap::new(),
@ -627,39 +655,54 @@ impl ClusterConnections {
self.data.read().connections.get(node).cloned()
}
pub fn insert(&self, connection: Arc<Connection>) -> bool {
let mut data = self.data.write();
if !data.nodes.contains_key(connection.node_id()) {
// incoming connections are checked here
trace!(target: "secretstore_net", "{}: ignoring unknown connection from {} at {}", self.self_node_id, connection.node_id(), connection.node_address());
debug_assert!(connection.is_inbound());
return false;
}
if data.connections.contains_key(connection.node_id()) {
// we have already connected to the same node
// the agreement is that node with lower id must establish connection to node with higher id
if (&self.self_node_id < connection.node_id() && connection.is_inbound())
|| (&self.self_node_id > connection.node_id() && !connection.is_inbound()) {
pub fn insert(&self, data: Arc<ClusterData>, connection: Arc<Connection>) -> bool {
{
let mut data = self.data.write();
if !data.nodes.contains_key(connection.node_id()) {
// incoming connections are checked here
trace!(target: "secretstore_net", "{}: ignoring unknown connection from {} at {}", self.self_node_id, connection.node_id(), connection.node_address());
debug_assert!(connection.is_inbound());
return false;
}
if data.connections.contains_key(connection.node_id()) {
// we have already connected to the same node
// the agreement is that node with lower id must establish connection to node with higher id
if (&self.self_node_id < connection.node_id() && connection.is_inbound())
|| (&self.self_node_id > connection.node_id() && !connection.is_inbound()) {
return false;
}
}
let node = connection.node_id().clone();
trace!(target: "secretstore_net", "{}: inserting connection to {} at {}. Connected to {} of {} nodes",
self.self_node_id, node, connection.node_address(), data.connections.len() + 1, data.nodes.len());
data.connections.insert(node.clone(), connection.clone());
}
trace!(target: "secretstore_net", "{}: inserting connection to {} at {}. Connected to {} of {} nodes",
self.self_node_id, connection.node_id(), connection.node_address(), data.connections.len() + 1, data.nodes.len());
data.connections.insert(connection.node_id().clone(), connection);
let maintain_action = self.trigger.lock().on_connection_established(connection.node_id());
self.maintain_connection_trigger(maintain_action, data);
true
}
pub fn remove(&self, node: &NodeId, is_inbound: bool) {
let mut data = self.data.write();
if let Entry::Occupied(entry) = data.connections.entry(node.clone()) {
if entry.get().is_inbound() != is_inbound {
pub fn remove(&self, data: Arc<ClusterData>, node: &NodeId, is_inbound: bool) {
{
let mut data = &mut *self.data.write();
if let Entry::Occupied(entry) = data.connections.entry(node.clone()) {
if entry.get().is_inbound() != is_inbound {
return;
}
trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address());
entry.remove_entry();
} else {
return;
}
trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address());
entry.remove_entry();
}
let maintain_action = self.trigger.lock().on_connection_closed(node);
self.maintain_connection_trigger(maintain_action, data);
}
pub fn connected_nodes(&self) -> BTreeSet<NodeId> {
@ -678,47 +721,25 @@ impl ClusterConnections {
.collect()
}
pub fn update_nodes_set(&self) {
let mut data = self.data.write();
let mut new_nodes = self.key_server_set.get();
// we do not need to connect to self
// + we do not need to try to connect to any other node if we are not the part of a cluster
if new_nodes.remove(&self.self_node_id).is_none() {
new_nodes.clear();
pub fn servers_set_change_creator_connector(&self) -> Arc<ServersSetChangeSessionCreatorConnector> {
self.connector.clone()
}
pub fn update_nodes_set(&self, data: Arc<ClusterData>) -> Option<BoxedEmptyFuture> {
let maintain_action = self.trigger.lock().on_maintain();
self.maintain_connection_trigger(maintain_action, data);
None
}
fn maintain_connection_trigger(&self, maintain_action: Option<Maintain>, data: Arc<ClusterData>) {
if maintain_action == Some(Maintain::SessionAndConnections) || maintain_action == Some(Maintain::Session) {
let client = ClusterClientImpl::new(data);
self.trigger.lock().maintain_session(&client);
}
let mut num_added_nodes = 0;
let mut num_removed_nodes = 0;
let mut num_changed_nodes = 0;
for obsolete_node in data.nodes.keys().cloned().collect::<Vec<_>>() {
if !new_nodes.contains_key(&obsolete_node) {
if let Entry::Occupied(entry) = data.connections.entry(obsolete_node) {
trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address());
entry.remove();
}
data.nodes.remove(&obsolete_node);
num_removed_nodes += 1;
}
}
for (new_node_public, new_node_addr) in new_nodes {
match data.nodes.insert(new_node_public, new_node_addr) {
None => num_added_nodes += 1,
Some(old_node_addr) => if new_node_addr != old_node_addr {
if let Entry::Occupied(entry) = data.connections.entry(new_node_public) {
trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address());
entry.remove();
}
num_changed_nodes += 1;
},
}
}
if num_added_nodes != 0 || num_removed_nodes != 0 || num_changed_nodes != 0 {
trace!(target: "secretstore_net", "{}: updated nodes set: removed {}, added {}, changed {}. Connected to {} of {} nodes",
self.self_node_id, num_removed_nodes, num_added_nodes, num_changed_nodes, data.connections.len(), data.nodes.len());
if maintain_action == Some(Maintain::SessionAndConnections) || maintain_action == Some(Maintain::Connections) {
let mut trigger = self.trigger.lock();
let mut data = self.data.write();
trigger.maintain_connections(&mut *data);
}
}
}
@ -952,7 +973,7 @@ impl ClusterClient for ClusterClientImpl {
Ok(session)
}
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSession>, Error> {
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, migration_id: Option<H256>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSession>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes();
connected_nodes.insert(self.data.self_key_pair.public().clone());
@ -963,12 +984,16 @@ impl ClusterClient for ClusterClientImpl {
};
let cluster = create_cluster_view(&self.data, true)?;
let session = self.data.sessions.admin_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, true, Some(AdminSessionCreationData::ServersSetChange))?;
let creation_data = Some(AdminSessionCreationData::ServersSetChange(migration_id, new_nodes_set.clone()));
let session = self.data.sessions.admin_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, true, creation_data)?;
let initialization_result = session.as_servers_set_change().expect("servers set change session is created; qed")
.initialize(new_nodes_set, old_set_signature, new_set_signature);
match initialization_result {
Ok(()) => Ok(session),
Ok(()) => {
self.data.connections.servers_set_change_creator_connector().set_key_servers_set_change_session(session.clone());
Ok(session)
},
Err(error) => {
self.data.sessions.admin_sessions.remove(&session.id());
Err(error)
@ -1042,20 +1067,20 @@ pub mod tests {
}
impl ClusterClient for DummyClusterClient {
fn cluster_state(&self) -> ClusterState { unimplemented!() }
fn new_generation_session(&self, _session_id: SessionId, _author: Public, _threshold: usize) -> Result<Arc<GenerationSession>, Error> { unimplemented!() }
fn new_encryption_session(&self, _session_id: SessionId, _requestor_signature: Signature, _common_point: Public, _encrypted_point: Public) -> Result<Arc<EncryptionSession>, Error> { unimplemented!() }
fn new_decryption_session(&self, _session_id: SessionId, _requestor_signature: Signature, _version: Option<H256>, _is_shadow_decryption: bool) -> Result<Arc<DecryptionSession>, Error> { unimplemented!() }
fn new_signing_session(&self, _session_id: SessionId, _requestor_signature: Signature, _version: Option<H256>, _message_hash: H256) -> Result<Arc<SigningSession>, Error> { unimplemented!() }
fn new_key_version_negotiation_session(&self, _session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> { unimplemented!() }
fn new_servers_set_change_session(&self, _session_id: Option<SessionId>, _new_nodes_set: BTreeSet<NodeId>, _old_set_signature: Signature, _new_set_signature: Signature) -> Result<Arc<AdminSession>, Error> { unimplemented!() }
fn cluster_state(&self) -> ClusterState { unimplemented!("test-only") }
fn new_generation_session(&self, _session_id: SessionId, _author: Public, _threshold: usize) -> Result<Arc<GenerationSession>, Error> { unimplemented!("test-only") }
fn new_encryption_session(&self, _session_id: SessionId, _requestor_signature: Signature, _common_point: Public, _encrypted_point: Public) -> Result<Arc<EncryptionSession>, Error> { unimplemented!("test-only") }
fn new_decryption_session(&self, _session_id: SessionId, _requestor_signature: Signature, _version: Option<H256>, _is_shadow_decryption: bool) -> Result<Arc<DecryptionSession>, Error> { unimplemented!("test-only") }
fn new_signing_session(&self, _session_id: SessionId, _requestor_signature: Signature, _version: Option<H256>, _message_hash: H256) -> Result<Arc<SigningSession>, Error> { unimplemented!("test-only") }
fn new_key_version_negotiation_session(&self, _session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> { unimplemented!("test-only") }
fn new_servers_set_change_session(&self, _session_id: Option<SessionId>, _migration_id: Option<H256>, _new_nodes_set: BTreeSet<NodeId>, _old_set_signature: Signature, _new_set_signature: Signature) -> Result<Arc<AdminSession>, Error> { unimplemented!("test-only") }
fn add_generation_listener(&self, _listener: Arc<ClusterSessionsListener<GenerationSession>>) {}
fn make_faulty_generation_sessions(&self) { unimplemented!() }
fn generation_session(&self, _session_id: &SessionId) -> Option<Arc<GenerationSession>> { unimplemented!() }
fn connect(&self) { unimplemented!() }
fn key_storage(&self) -> Arc<KeyStorage> { unimplemented!() }
fn make_faulty_generation_sessions(&self) { unimplemented!("test-only") }
fn generation_session(&self, _session_id: &SessionId) -> Option<Arc<GenerationSession>> { unimplemented!("test-only") }
fn connect(&self) { unimplemented!("test-only") }
fn key_storage(&self) -> Arc<KeyStorage> { unimplemented!("test-only") }
}
impl DummyCluster {
@ -1128,7 +1153,7 @@ pub mod tests {
}
pub fn all_connections_established(cluster: &Arc<ClusterCore>) -> bool {
cluster.config().key_server_set.get().keys()
cluster.config().key_server_set.snapshot().new_set.keys()
.filter(|p| *p != cluster.config().self_key_pair.public())
.all(|p| cluster.connection(p).is_some())
}
@ -1146,6 +1171,7 @@ pub mod tests {
key_storage: Arc::new(DummyKeyStorage::default()),
acl_storage: Arc::new(DummyAclStorage::default()),
admin_public: None,
auto_migrate_enabled: false,
}).collect();
let clusters: Vec<_> = cluster_params.into_iter().enumerate()
.map(|(_, params)| ClusterCore::new(core.handle(), params).unwrap())

View File

@ -17,12 +17,13 @@
use std::time;
use std::sync::{Arc, Weak};
use std::sync::atomic::AtomicBool;
use std::collections::{VecDeque, BTreeMap};
use std::collections::{VecDeque, BTreeMap, BTreeSet};
use parking_lot::{Mutex, RwLock, Condvar};
use bigint::hash::H256;
use ethkey::{Secret, Signature};
use key_server_cluster::{Error, NodeId, SessionId};
use key_server_cluster::cluster::{Cluster, ClusterData, ClusterConfiguration, ClusterView};
use key_server_cluster::connection_trigger::ServersSetChangeSessionCreatorConnector;
use key_server_cluster::message::{self, Message};
use key_server_cluster::generation_session::{SessionImpl as GenerationSessionImpl};
use key_server_cluster::decryption_session::{SessionImpl as DecryptionSessionImpl};
@ -110,10 +111,10 @@ pub enum AdminSession {
/// Administrative session creation data.
pub enum AdminSessionCreationData {
/// Share add session.
/// Share add session (key id).
ShareAdd(H256),
/// Servers set change session.
ServersSetChange,
/// Servers set change session (block id, new_server_set).
ServersSetChange(Option<H256>, BTreeSet<NodeId>),
}
/// Active sessions on this cluster.
@ -187,7 +188,7 @@ pub enum ClusterSessionsContainerState {
impl ClusterSessions {
/// Create new cluster sessions container.
pub fn new(config: &ClusterConfiguration) -> Self {
pub fn new(config: &ClusterConfiguration, servers_set_change_session_creator_connector: Arc<ServersSetChangeSessionCreatorConnector>) -> Self {
let container_state = Arc::new(Mutex::new(ClusterSessionsContainerState::Idle));
let creator_core = Arc::new(SessionCreatorCore::new(config));
ClusterSessions {
@ -210,6 +211,7 @@ impl ClusterSessions {
}, container_state.clone()),
admin_sessions: ClusterSessionsContainer::new(AdminSessionCreator {
core: creator_core.clone(),
servers_set_change_session_creator_connector: servers_set_change_session_creator_connector,
admin_public: config.admin_public.clone(),
}, container_state),
creator_core: creator_core,
@ -270,6 +272,7 @@ impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: C
self.listeners.lock().push(Arc::downgrade(&listener));
}
#[cfg(test)]
pub fn is_empty(&self) -> bool {
self.sessions.read().is_empty()
}
@ -554,6 +557,7 @@ mod tests {
use ethkey::{Random, Generator};
use key_server_cluster::{Error, DummyAclStorage, DummyKeyStorage, MapKeyServerSet, PlainNodeKeyPair};
use key_server_cluster::cluster::ClusterConfiguration;
use key_server_cluster::connection_trigger::SimpleServersSetChangeSessionCreatorConnector;
use key_server_cluster::cluster::tests::DummyCluster;
use super::{ClusterSessions, AdminSessionCreationData};
@ -568,8 +572,11 @@ mod tests {
key_storage: Arc::new(DummyKeyStorage::default()),
acl_storage: Arc::new(DummyAclStorage::default()),
admin_public: Some(Random.generate().unwrap().public().clone()),
auto_migrate_enabled: false,
};
ClusterSessions::new(&config)
ClusterSessions::new(&config, Arc::new(SimpleServersSetChangeSessionCreatorConnector {
admin_public: Some(Random.generate().unwrap().public().clone()),
}))
}
#[test]

View File

@ -21,6 +21,7 @@ use parking_lot::RwLock;
use ethkey::{Public, Signature};
use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage, DocumentKeyShare, SessionMeta};
use key_server_cluster::cluster::{Cluster, ClusterConfiguration};
use key_server_cluster::connection_trigger::ServersSetChangeSessionCreatorConnector;
use key_server_cluster::cluster_sessions::{ClusterSession, SessionIdWithSubSession, AdminSession, AdminSessionCreationData};
use key_server_cluster::message::{self, Message, DecryptionMessage, SigningMessage, ConsensusMessageOfShareAdd,
ShareAddMessage, ServersSetChangeMessage, ConsensusMessage, ConsensusMessageWithServersSet};
@ -331,13 +332,18 @@ pub struct AdminSessionCreator {
pub core: Arc<SessionCreatorCore>,
/// Administrator public.
pub admin_public: Option<Public>,
/// Servers set change sessions creator connector.
pub servers_set_change_session_creator_connector: Arc<ServersSetChangeSessionCreatorConnector>,
}
impl ClusterSessionCreator<AdminSession, AdminSessionCreationData> for AdminSessionCreator {
fn creation_data_from_message(message: &Message) -> Result<Option<AdminSessionCreationData>, Error> {
match *message {
Message::ServersSetChange(ServersSetChangeMessage::ServersSetChangeConsensusMessage(ref message)) => match &message.message {
&ConsensusMessageWithServersSet::InitializeConsensusSession(_) => Ok(Some(AdminSessionCreationData::ServersSetChange)),
&ConsensusMessageWithServersSet::InitializeConsensusSession(ref message) => Ok(Some(AdminSessionCreationData::ServersSetChange(
message.migration_id.clone().map(Into::into),
message.new_nodes_set.clone().into_iter().map(Into::into).collect()
))),
_ => Err(Error::InvalidMessage),
},
Message::ShareAdd(ShareAddMessage::ShareAddConsensusMessage(ref message)) => match &message.message {
@ -358,7 +364,6 @@ impl ClusterSessionCreator<AdminSession, AdminSessionCreationData> for AdminSess
fn create(&self, cluster: Arc<Cluster>, master: NodeId, nonce: Option<u64>, id: SessionId, creation_data: Option<AdminSessionCreationData>) -> Result<Arc<AdminSession>, Error> {
let nonce = self.core.check_session_nonce(&master, nonce)?;
let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?;
Ok(Arc::new(match creation_data {
Some(AdminSessionCreationData::ShareAdd(version)) => {
AdminSession::ShareAdd(ShareAddSessionImpl::new(ShareAddSessionParams {
@ -370,10 +375,13 @@ impl ClusterSessionCreator<AdminSession, AdminSessionCreationData> for AdminSess
transport: ShareAddTransport::new(id.clone(), Some(version), nonce, cluster),
key_storage: self.core.key_storage.clone(),
nonce: nonce,
admin_public: Some(admin_public),
admin_public: Some(self.admin_public.clone().ok_or(Error::AccessDenied)?),
})?)
},
Some(AdminSessionCreationData::ServersSetChange) => {
Some(AdminSessionCreationData::ServersSetChange(migration_id, new_nodes_set)) => {
let admin_public = self.servers_set_change_session_creator_connector.admin_public(migration_id.as_ref(), new_nodes_set)
.map_err(|_| Error::AccessDenied)?;
AdminSession::ServersSetChange(ServersSetChangeSessionImpl::new(ServersSetChangeSessionParams {
meta: ShareChangeSessionMeta {
id: id.clone(),
@ -385,6 +393,7 @@ impl ClusterSessionCreator<AdminSession, AdminSessionCreationData> for AdminSess
nonce: nonce,
all_nodes_set: cluster.nodes(),
admin_public: admin_public,
migration_id: migration_id,
})?)
},
None => unreachable!("expected to call with non-empty creation data; qed"),

View File

@ -0,0 +1,367 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{BTreeSet, BTreeMap};
use std::collections::btree_map::Entry;
use std::net::SocketAddr;
use std::sync::Arc;
use bigint::hash::H256;
use ethkey::Public;
use key_server_cluster::{KeyServerSet, KeyServerSetSnapshot};
use key_server_cluster::cluster::{ClusterClient, ClusterConnectionsData};
use key_server_cluster::cluster_sessions::AdminSession;
use types::all::{Error, NodeId};
use {NodeKeyPair};
#[derive(Debug, Clone, Copy, PartialEq)]
/// Describes which maintain() call is required.
pub enum Maintain {
/// We need to maintain() both connections && session.
SessionAndConnections,
/// Only call maintain_session.
Session,
/// Only call maintain_connections.
Connections,
}
/// Connection trigger, which executes necessary actions when set of key servers changes.
pub trait ConnectionTrigger: Send + Sync {
/// On maintain interval.
fn on_maintain(&mut self) -> Option<Maintain>;
/// When connection is established.
fn on_connection_established(&mut self, node: &NodeId) -> Option<Maintain>;
/// When connection is closed.
fn on_connection_closed(&mut self, node: &NodeId) -> Option<Maintain>;
/// Maintain active sessions.
fn maintain_session(&mut self, sessions: &ClusterClient);
/// Maintain active connections.
fn maintain_connections(&mut self, connections: &mut ClusterConnectionsData);
/// Return connector for the servers set change session creator.
fn servers_set_change_creator_connector(&self) -> Arc<ServersSetChangeSessionCreatorConnector>;
}
/// Servers set change session creator connector.
pub trait ServersSetChangeSessionCreatorConnector: Send + Sync {
/// Get actual administrator public key. For manual-migration configuration it is the pre-configured
/// administrator key. For auto-migration configurations it is the key of actual MigrationSession master node.
fn admin_public(&self, migration_id: Option<&H256>, new_server_set: BTreeSet<NodeId>) -> Result<Public, Error>;
/// Set active servers set change session.
fn set_key_servers_set_change_session(&self, session: Arc<AdminSession>);
}
/// Simple connection trigger, which only keeps connections to current_set.
pub struct SimpleConnectionTrigger {
/// Key server set cluster.
key_server_set: Arc<KeyServerSet>,
/// Trigger connections.
connections: TriggerConnections,
/// Servers set change session creator connector.
connector: Arc<ServersSetChangeSessionCreatorConnector>,
}
/// Simple Servers set change session creator connector, which will just return
/// pre-configured administartor public when asked.
pub struct SimpleServersSetChangeSessionCreatorConnector {
/// Secret store administrator public key.
pub admin_public: Option<Public>,
}
#[derive(Debug, Clone, Copy, PartialEq)]
/// Action with trigger connections.
pub enum ConnectionsAction {
/// Connect to nodes from old set only.
ConnectToCurrentSet,
/// Connect to nodes from both old and migration sets.
ConnectToCurrentAndMigrationSet,
}
/// Trigger connections.
pub struct TriggerConnections {
/// This node key pair.
pub self_key_pair: Arc<NodeKeyPair>,
}
impl SimpleConnectionTrigger {
/// Create new simple connection trigger.
pub fn new(key_server_set: Arc<KeyServerSet>, self_key_pair: Arc<NodeKeyPair>, admin_public: Option<Public>) -> Self {
SimpleConnectionTrigger {
key_server_set: key_server_set,
connections: TriggerConnections {
self_key_pair: self_key_pair,
},
connector: Arc::new(SimpleServersSetChangeSessionCreatorConnector {
admin_public: admin_public,
}),
}
}
}
impl ConnectionTrigger for SimpleConnectionTrigger {
fn on_maintain(&mut self) -> Option<Maintain> {
Some(Maintain::Connections)
}
fn on_connection_established(&mut self, _node: &NodeId) -> Option<Maintain> {
None
}
fn on_connection_closed(&mut self, _node: &NodeId) -> Option<Maintain> {
// we do not want to reconnect after every connection close
// because it could be a part of something bigger
None
}
fn maintain_session(&mut self, _sessions: &ClusterClient) {
}
fn maintain_connections(&mut self, connections: &mut ClusterConnectionsData) {
self.connections.maintain(ConnectionsAction::ConnectToCurrentSet, connections, &self.key_server_set.snapshot())
}
fn servers_set_change_creator_connector(&self) -> Arc<ServersSetChangeSessionCreatorConnector> {
self.connector.clone()
}
}
impl ServersSetChangeSessionCreatorConnector for SimpleServersSetChangeSessionCreatorConnector {
fn admin_public(&self, _migration_id: Option<&H256>, _new_server_set: BTreeSet<NodeId>) -> Result<Public, Error> {
self.admin_public.clone().ok_or(Error::AccessDenied)
}
fn set_key_servers_set_change_session(&self, _session: Arc<AdminSession>) {
}
}
impl TriggerConnections {
pub fn maintain(&self, action: ConnectionsAction, data: &mut ClusterConnectionsData, server_set: &KeyServerSetSnapshot) {
match action {
ConnectionsAction::ConnectToCurrentSet => {
adjust_connections(self.self_key_pair.public(), data, &server_set.current_set);
},
ConnectionsAction::ConnectToCurrentAndMigrationSet => {
let mut old_and_migration_set = BTreeMap::new();
if let Some(migration) = server_set.migration.as_ref() {
old_and_migration_set.extend(migration.set.iter().map(|(node_id, node_addr)| (node_id.clone(), node_addr.clone())));
}
old_and_migration_set.extend(server_set.current_set.iter().map(|(node_id, node_addr)| (node_id.clone(), node_addr.clone())));
adjust_connections(self.self_key_pair.public(), data, &old_and_migration_set);
},
}
}
}
fn adjust_connections(self_node_id: &NodeId, data: &mut ClusterConnectionsData, required_set: &BTreeMap<NodeId, SocketAddr>) {
if !required_set.contains_key(self_node_id) {
trace!(target: "secretstore_net", "{}: isolated from cluser", self_node_id);
data.connections.clear();
data.nodes.clear();
return;
}
for node_to_disconnect in select_nodes_to_disconnect(&data.nodes, required_set) {
if let Entry::Occupied(entry) = data.connections.entry(node_to_disconnect.clone()) {
trace!(target: "secretstore_net", "{}: removing connection to {} at {}",
self_node_id, entry.get().node_id(), entry.get().node_address());
entry.remove();
}
data.nodes.remove(&node_to_disconnect);
}
for (node_to_connect, node_addr) in required_set {
if node_to_connect != self_node_id {
data.nodes.insert(node_to_connect.clone(), node_addr.clone());
}
}
}
fn select_nodes_to_disconnect(current_set: &BTreeMap<NodeId, SocketAddr>, new_set: &BTreeMap<NodeId, SocketAddr>) -> Vec<NodeId> {
current_set.iter()
.filter(|&(node_id, node_addr)| match new_set.get(node_id) {
Some(new_node_addr) => node_addr != new_node_addr,
None => true,
})
.map(|(node_id, _)| node_id.clone())
.collect()
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use std::sync::Arc;
use ethkey::{Random, Generator};
use key_server_cluster::cluster::ClusterConnectionsData;
use key_server_cluster::{MapKeyServerSet, PlainNodeKeyPair, KeyServerSetSnapshot, KeyServerSetMigration};
use super::{Maintain, TriggerConnections, ConnectionsAction, ConnectionTrigger, SimpleConnectionTrigger,
select_nodes_to_disconnect, adjust_connections};
fn create_connections() -> TriggerConnections {
TriggerConnections {
self_key_pair: Arc::new(PlainNodeKeyPair::new(Random.generate().unwrap())),
}
}
#[test]
fn do_not_disconnect_if_set_is_not_changed() {
let node_id = Random.generate().unwrap().public().clone();
assert_eq!(select_nodes_to_disconnect(
&vec![(node_id, "127.0.0.1:8081".parse().unwrap())].into_iter().collect(),
&vec![(node_id, "127.0.0.1:8081".parse().unwrap())].into_iter().collect()),
vec![]);
}
#[test]
fn disconnect_if_address_has_changed() {
let node_id = Random.generate().unwrap().public().clone();
assert_eq!(select_nodes_to_disconnect(
&vec![(node_id.clone(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(),
&vec![(node_id.clone(), "127.0.0.1:8082".parse().unwrap())].into_iter().collect()),
vec![node_id.clone()]);
}
#[test]
fn disconnect_if_node_has_removed() {
let node_id = Random.generate().unwrap().public().clone();
assert_eq!(select_nodes_to_disconnect(
&vec![(node_id.clone(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(),
&vec![].into_iter().collect()),
vec![node_id.clone()]);
}
#[test]
fn does_not_disconnect_if_node_has_added() {
let node_id = Random.generate().unwrap().public().clone();
assert_eq!(select_nodes_to_disconnect(
&vec![(node_id.clone(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(),
&vec![(node_id.clone(), "127.0.0.1:8081".parse().unwrap()),
(Random.generate().unwrap().public().clone(), "127.0.0.1:8082".parse().unwrap())]
.into_iter().collect()),
vec![]);
}
#[test]
fn adjust_connections_disconnects_from_all_nodes_if_not_a_part_of_key_server() {
let self_node_id = Random.generate().unwrap().public().clone();
let other_node_id = Random.generate().unwrap().public().clone();
let mut connection_data: ClusterConnectionsData = Default::default();
connection_data.nodes.insert(other_node_id.clone(), "127.0.0.1:8081".parse().unwrap());
let required_set = connection_data.nodes.clone();
adjust_connections(&self_node_id, &mut connection_data, &required_set);
assert!(connection_data.nodes.is_empty());
}
#[test]
fn adjust_connections_connects_to_new_nodes() {
let self_node_id = Random.generate().unwrap().public().clone();
let other_node_id = Random.generate().unwrap().public().clone();
let mut connection_data: ClusterConnectionsData = Default::default();
let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap()),
(other_node_id.clone(), "127.0.0.1:8082".parse().unwrap())].into_iter().collect();
adjust_connections(&self_node_id, &mut connection_data, &required_set);
assert!(connection_data.nodes.contains_key(&other_node_id));
}
#[test]
fn adjust_connections_reconnects_from_changed_nodes() {
let self_node_id = Random.generate().unwrap().public().clone();
let other_node_id = Random.generate().unwrap().public().clone();
let mut connection_data: ClusterConnectionsData = Default::default();
connection_data.nodes.insert(other_node_id.clone(), "127.0.0.1:8082".parse().unwrap());
let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap()),
(other_node_id.clone(), "127.0.0.1:8083".parse().unwrap())].into_iter().collect();
adjust_connections(&self_node_id, &mut connection_data, &required_set);
assert_eq!(connection_data.nodes.get(&other_node_id), Some(&"127.0.0.1:8083".parse().unwrap()));
}
#[test]
fn adjust_connections_disconnects_from_removed_nodes() {
let self_node_id = Random.generate().unwrap().public().clone();
let other_node_id = Random.generate().unwrap().public().clone();
let mut connection_data: ClusterConnectionsData = Default::default();
connection_data.nodes.insert(other_node_id.clone(), "127.0.0.1:8082".parse().unwrap());
let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect();
adjust_connections(&self_node_id, &mut connection_data, &required_set);
assert!(connection_data.nodes.is_empty());
}
#[test]
fn adjust_connections_does_not_connects_to_self() {
let self_node_id = Random.generate().unwrap().public().clone();
let mut connection_data: ClusterConnectionsData = Default::default();
let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect();
adjust_connections(&self_node_id, &mut connection_data, &required_set);
assert!(connection_data.nodes.is_empty());
}
#[test]
fn maintain_connects_to_current_set_works() {
let connections = create_connections();
let self_node_id = connections.self_key_pair.public().clone();
let current_node_id = Random.generate().unwrap().public().clone();
let migration_node_id = Random.generate().unwrap().public().clone();
let new_node_id = Random.generate().unwrap().public().clone();
let mut connections_data: ClusterConnectionsData = Default::default();
connections.maintain(ConnectionsAction::ConnectToCurrentSet, &mut connections_data, &KeyServerSetSnapshot {
current_set: vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap()),
(current_node_id.clone(), "127.0.0.1:8082".parse().unwrap())].into_iter().collect(),
new_set: vec![(new_node_id.clone(), "127.0.0.1:8083".parse().unwrap())].into_iter().collect(),
migration: Some(KeyServerSetMigration {
set: vec![(migration_node_id.clone(), "127.0.0.1:8084".parse().unwrap())].into_iter().collect(),
..Default::default()
}),
});
assert_eq!(vec![current_node_id], connections_data.nodes.keys().cloned().collect::<Vec<_>>());
}
#[test]
fn maintain_connects_to_current_and_migration_set_works() {
let connections = create_connections();
let self_node_id = connections.self_key_pair.public().clone();
let current_node_id = Random.generate().unwrap().public().clone();
let migration_node_id = Random.generate().unwrap().public().clone();
let new_node_id = Random.generate().unwrap().public().clone();
let mut connections_data: ClusterConnectionsData = Default::default();
connections.maintain(ConnectionsAction::ConnectToCurrentAndMigrationSet, &mut connections_data, &KeyServerSetSnapshot {
current_set: vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap()),
(current_node_id.clone(), "127.0.0.1:8082".parse().unwrap())].into_iter().collect(),
new_set: vec![(new_node_id.clone(), "127.0.0.1:8083".parse().unwrap())].into_iter().collect(),
migration: Some(KeyServerSetMigration {
set: vec![(migration_node_id.clone(), "127.0.0.1:8084".parse().unwrap())].into_iter().collect(),
..Default::default()
}),
});
assert_eq!(vec![current_node_id, migration_node_id].into_iter().collect::<BTreeSet<_>>(),
connections_data.nodes.keys().cloned().collect::<BTreeSet<_>>());
}
#[test]
fn simple_connections_trigger_only_maintains_connections() {
let key_server_set = Arc::new(MapKeyServerSet::new(Default::default()));
let self_key_pair = Arc::new(PlainNodeKeyPair::new(Random.generate().unwrap()));
let mut trigger = SimpleConnectionTrigger::new(key_server_set, self_key_pair, None);
assert_eq!(trigger.on_maintain(), Some(Maintain::Connections));
}
}

View File

@ -0,0 +1,766 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{BTreeSet, BTreeMap};
use std::net::SocketAddr;
use std::sync::Arc;
use bigint::hash::H256;
use ethkey::Public;
use parking_lot::Mutex;
use key_server_cluster::{KeyServerSet, KeyServerSetSnapshot, KeyServerSetMigration, is_migration_required};
use key_server_cluster::cluster::{ClusterClient, ClusterConnectionsData};
use key_server_cluster::cluster_sessions::{AdminSession, ClusterSession};
use key_server_cluster::jobs::servers_set_change_access_job::ordered_nodes_hash;
use key_server_cluster::connection_trigger::{Maintain, ConnectionsAction, ConnectionTrigger,
ServersSetChangeSessionCreatorConnector, TriggerConnections};
use types::all::{Error, NodeId};
use {NodeKeyPair};
/// Key servers set change trigger with automated migration procedure.
pub struct ConnectionTriggerWithMigration {
/// This node key pair.
self_key_pair: Arc<NodeKeyPair>,
/// Key server set.
key_server_set: Arc<KeyServerSet>,
/// Last server set state.
snapshot: KeyServerSetSnapshot,
/// Required connections action.
connections_action: Option<ConnectionsAction>,
/// Required session action.
session_action: Option<SessionAction>,
/// Currenty connected nodes.
connected: BTreeSet<NodeId>,
/// Trigger migration connections.
connections: TriggerConnections,
/// Trigger migration session.
session: TriggerSession,
}
#[derive(Default)]
/// Key servers set change session creator connector with migration support.
pub struct ServersSetChangeSessionCreatorConnectorWithMigration {
/// This node id.
self_node_id: NodeId,
/// Active migration state to check when servers set change session is started.
migration: Mutex<Option<KeyServerSetMigration>>,
/// Active servers set change session.
session: Mutex<Option<Arc<AdminSession>>>,
}
#[derive(Debug, Clone, Copy, PartialEq)]
/// Migration session action.
enum SessionAction {
/// Start migration (confirm migration transaction).
StartMigration(H256),
/// Start migration session.
Start,
/// Confirm migration and forget migration session.
ConfirmAndDrop(H256),
/// Forget migration session.
Drop,
/// Forget migration session and retry.
DropAndRetry,
}
#[derive(Debug, Clone, Copy, PartialEq)]
/// Migration session state.
enum SessionState {
/// No active session.
Idle,
/// Session is running with given migration id.
Active(Option<H256>),
/// Session is completed successfully.
Finished(Option<H256>),
/// Session is completed with an error.
Failed(Option<H256>),
}
#[derive(Debug, Clone, Copy, PartialEq)]
/// Migration state.
pub enum MigrationState {
/// No migration required.
Idle,
/// Migration is required.
Required,
/// Migration has started.
Started,
}
/// Migration session.
struct TriggerSession {
/// Servers set change session creator connector.
connector: Arc<ServersSetChangeSessionCreatorConnectorWithMigration>,
/// This node key pair.
self_key_pair: Arc<NodeKeyPair>,
/// Key server set.
key_server_set: Arc<KeyServerSet>,
}
impl ConnectionTriggerWithMigration {
/// Create new trigge with migration.
pub fn new(key_server_set: Arc<KeyServerSet>, self_key_pair: Arc<NodeKeyPair>) -> Self {
let snapshot = key_server_set.snapshot();
let migration = snapshot.migration.clone();
ConnectionTriggerWithMigration {
self_key_pair: self_key_pair.clone(),
key_server_set: key_server_set.clone(),
snapshot: snapshot,
connected: BTreeSet::new(),
connections: TriggerConnections {
self_key_pair: self_key_pair.clone(),
},
session: TriggerSession {
connector: Arc::new(ServersSetChangeSessionCreatorConnectorWithMigration {
self_node_id: self_key_pair.public().clone(),
migration: Mutex::new(migration),
session: Mutex::new(None),
}),
self_key_pair: self_key_pair,
key_server_set: key_server_set,
},
connections_action: None,
session_action: None,
}
}
/// Actually do mainteinance.
fn do_maintain(&mut self) -> Option<Maintain> {
loop {
let session_state = session_state(self.session.connector.session.lock().clone());
let migration_state = migration_state(self.self_key_pair.public(), &self.snapshot);
let session_action = maintain_session(self.self_key_pair.public(), &self.connected, &self.snapshot, migration_state, session_state);
let session_maintain_required = session_action.map(|session_action|
self.session.process(session_action)).unwrap_or_default();
self.session_action = session_action;
let connections_action = maintain_connections(migration_state, session_state);
let connections_maintain_required = connections_action.map(|_| true).unwrap_or_default();
self.connections_action = connections_action;
if session_state != SessionState::Idle || migration_state != MigrationState::Idle {
trace!(target: "secretstore_net", "{}: non-idle auto-migration state: {:?} -> {:?}",
self.self_key_pair.public(), (migration_state, session_state), (self.connections_action, self.session_action));
}
if session_action != Some(SessionAction::DropAndRetry) {
return match (session_maintain_required, connections_maintain_required) {
(true, true) => Some(Maintain::SessionAndConnections),
(true, false) => Some(Maintain::Session),
(false, true) => Some(Maintain::Connections),
(false, false) => None,
};
}
}
}
}
impl ConnectionTrigger for ConnectionTriggerWithMigration {
fn on_maintain(&mut self) -> Option<Maintain> {
self.snapshot = self.key_server_set.snapshot();
*self.session.connector.migration.lock() = self.snapshot.migration.clone();
self.do_maintain()
}
fn on_connection_established(&mut self, node: &NodeId) -> Option<Maintain> {
self.connected.insert(node.clone());
self.do_maintain()
}
fn on_connection_closed(&mut self, node: &NodeId) -> Option<Maintain> {
self.connected.remove(node);
self.do_maintain()
}
fn maintain_session(&mut self, sessions: &ClusterClient) {
if let Some(action) = self.session_action {
self.session.maintain(action, sessions, &self.snapshot);
}
}
fn maintain_connections(&mut self, connections: &mut ClusterConnectionsData) {
if let Some(action) = self.connections_action {
self.connections.maintain(action, connections, &self.snapshot);
}
}
fn servers_set_change_creator_connector(&self) -> Arc<ServersSetChangeSessionCreatorConnector> {
self.session.connector.clone()
}
}
impl ServersSetChangeSessionCreatorConnector for ServersSetChangeSessionCreatorConnectorWithMigration {
fn admin_public(&self, migration_id: Option<&H256>, new_server_set: BTreeSet<NodeId>) -> Result<Public, Error> {
// the idea is that all nodes are agreed upon a block number and a new set of nodes in this block
// then master node is selected of all nodes set && this master signs the old set && new set
// (signatures are inputs to ServerSetChangeSession)
self.migration.lock().as_ref()
.map(|migration| {
let is_migration_id_same = migration_id.map(|mid| mid == &migration.id).unwrap_or_default();
let is_migration_set_same = new_server_set == migration.set.keys().cloned().collect();
if is_migration_id_same && is_migration_set_same {
Ok(migration.master.clone())
} else {
warn!(target: "secretstore_net", "{}: failed to accept auto-migration session: same_migration_id={}, same_migration_set={}",
self.self_node_id, is_migration_id_same, is_migration_set_same);
Err(Error::AccessDenied)
}
})
.unwrap_or_else(|| {
warn!(target: "secretstore_net", "{}: failed to accept non-scheduled auto-migration session", self.self_node_id);
Err(Error::AccessDenied)
})
}
fn set_key_servers_set_change_session(&self, session: Arc<AdminSession>) {
*self.session.lock() = Some(session);
}
}
impl TriggerSession {
/// Process session action.
pub fn process(&mut self, action: SessionAction) -> bool {
match action {
SessionAction::ConfirmAndDrop(migration_id) => {
*self.connector.session.lock() = None;
self.key_server_set.confirm_migration(migration_id);
false
},
SessionAction::Drop | SessionAction::DropAndRetry => {
*self.connector.session.lock() = None;
false
},
SessionAction::StartMigration(migration_id) => {
self.key_server_set.start_migration(migration_id);
false
},
SessionAction::Start => true,
}
}
/// Maintain session.
pub fn maintain(&mut self, action: SessionAction, sessions: &ClusterClient, server_set: &KeyServerSetSnapshot) {
if action == SessionAction::Start { // all other actions are processed in maintain
let migration = server_set.migration.as_ref()
.expect("action is Start only when migration is started (see maintain_session); qed");
let old_set: BTreeSet<_> = server_set.current_set.keys()
.chain(migration.set.keys())
.cloned().collect();
let new_set: BTreeSet<_> = migration.set.keys()
.cloned()
.collect();
let signatures = self.self_key_pair.sign(&ordered_nodes_hash(&old_set))
.and_then(|old_set_signature| self.self_key_pair.sign(&ordered_nodes_hash(&new_set))
.map(|new_set_signature| (old_set_signature, new_set_signature)))
.map_err(Into::into);
let session = signatures.and_then(|(old_set_signature, new_set_signature)|
sessions.new_servers_set_change_session(None, Some(migration.id.clone()), new_set, old_set_signature, new_set_signature));
match session {
Ok(_) => trace!(target: "secretstore_net", "{}: started auto-migrate session",
self.self_key_pair.public()),
Err(err) => trace!(target: "secretstore_net", "{}: failed to start auto-migrate session with: {}",
self.self_key_pair.public(), err),
}
}
}
}
fn migration_state(self_node_id: &NodeId, snapshot: &KeyServerSetSnapshot) -> MigrationState {
// if this node is not on current && old set => we do not participate in migration
if !snapshot.current_set.contains_key(self_node_id) &&
!snapshot.migration.as_ref().map(|s| s.set.contains_key(self_node_id)).unwrap_or_default() {
return MigrationState::Idle;
}
// if migration has already started no other states possible
if snapshot.migration.is_some() {
return MigrationState::Started;
}
// we only require migration if set actually changes
// when only address changes, we could simply adjust connections
if !is_migration_required(&snapshot.current_set, &snapshot.new_set) {
return MigrationState::Idle;
}
return MigrationState::Required;
}
fn session_state(session: Option<Arc<AdminSession>>) -> SessionState {
session
.and_then(|s| match s.as_servers_set_change() {
Some(s) if !s.is_finished() => Some(SessionState::Active(s.migration_id().cloned())),
Some(s) => match s.wait() {
Ok(_) => Some(SessionState::Finished(s.migration_id().cloned())),
Err(_) => Some(SessionState::Failed(s.migration_id().cloned())),
},
None => None,
})
.unwrap_or(SessionState::Idle)
}
fn maintain_session(self_node_id: &NodeId, connected: &BTreeSet<NodeId>, snapshot: &KeyServerSetSnapshot, migration_state: MigrationState, session_state: SessionState) -> Option<SessionAction> {
let migration_data_proof = "migration_state is Started; migration data available when started; qed";
match (migration_state, session_state) {
// === NORMAL combinations ===
// having no session when it is not required => ok
(MigrationState::Idle, SessionState::Idle) => None,
// migration is required && no active session => start migration
(MigrationState::Required, SessionState::Idle) => {
match select_master_node(snapshot) == self_node_id {
true => Some(SessionAction::StartMigration(H256::random())),
// we are not on master node
false => None,
}
},
// migration is active && there's no active session => start it
(MigrationState::Started, SessionState::Idle) => {
match is_connected_to_all_nodes(self_node_id, &snapshot.current_set, connected) &&
is_connected_to_all_nodes(self_node_id, &snapshot.migration.as_ref().expect(migration_data_proof).set, connected) &&
select_master_node(snapshot) == self_node_id {
true => Some(SessionAction::Start),
// we are not connected to all required nodes yet or we are not on master node => wait for it
false => None,
}
},
// migration is active && session is not yet started/finished => ok
(MigrationState::Started, SessionState::Active(ref session_migration_id))
if snapshot.migration.as_ref().expect(migration_data_proof).id == session_migration_id.unwrap_or_default() =>
None,
// migration has finished => confirm migration
(MigrationState::Started, SessionState::Finished(ref session_migration_id))
if snapshot.migration.as_ref().expect(migration_data_proof).id == session_migration_id.unwrap_or_default() =>
match snapshot.migration.as_ref().expect(migration_data_proof).set.contains_key(self_node_id) {
true => Some(SessionAction::ConfirmAndDrop(
snapshot.migration.as_ref().expect(migration_data_proof).id.clone()
)),
// we are not on migration set => we do not need to confirm
false => Some(SessionAction::Drop),
},
// migration has failed => it should be dropped && restarted later
(MigrationState::Started, SessionState::Failed(ref session_migration_id))
if snapshot.migration.as_ref().expect(migration_data_proof).id == session_migration_id.unwrap_or_default() =>
Some(SessionAction::Drop),
// ABNORMAL combinations, which are still possible when contract misbehaves ===
// having active session when it is not required => drop it && wait for other tasks
(MigrationState::Idle, SessionState::Active(_)) |
// no migration required && there's finished session => drop it && wait for other tasks
(MigrationState::Idle, SessionState::Finished(_)) |
// no migration required && there's failed session => drop it && wait for other tasks
(MigrationState::Idle, SessionState::Failed(_)) |
// migration is required && session is active => drop it && wait for other tasks
(MigrationState::Required, SessionState::Active(_)) |
// migration is required && session has failed => we need to forget this obsolete session and retry
(MigrationState::Required, SessionState::Finished(_)) |
// session for other migration is active => we need to forget this obsolete session and retry
// (the case for same id is checked above)
(MigrationState::Started, SessionState::Active(_)) |
// session for other migration has finished => we need to forget this obsolete session and retry
// (the case for same id is checked above)
(MigrationState::Started, SessionState::Finished(_)) |
// session for other migration has failed => we need to forget this obsolete session and retry
// (the case for same id is checked above)
(MigrationState::Started, SessionState::Failed(_)) |
// migration is required && session has failed => we need to forget this obsolete session and retry
(MigrationState::Required, SessionState::Failed(_)) => {
// some of the cases above could happen because of lags (could actually be a non-abnormal behavior)
// => we ony trace here
trace!(target: "secretstore_net", "{}: suspicious auto-migration state: {:?}",
self_node_id, (migration_state, session_state));
Some(SessionAction::DropAndRetry)
},
}
}
fn maintain_connections(migration_state: MigrationState, session_state: SessionState) -> Option<ConnectionsAction> {
match (migration_state, session_state) {
// session is active => we do not alter connections when session is active
(_, SessionState::Active(_)) => None,
// when no migration required => we just keep us connected to old nodes set
(MigrationState::Idle, _) => Some(ConnectionsAction::ConnectToCurrentSet),
// when migration is either scheduled, or in progress => connect to both old and migration set.
// this could lead to situation when node is not 'officially' a part of KeyServer (i.e. it is not in current_set)
// but it participates in new key generation session
// it is ok, since 'officialy' here means that this node is a owner of all old shares
(MigrationState::Required, _) |
(MigrationState::Started, _) => Some(ConnectionsAction::ConnectToCurrentAndMigrationSet),
}
}
fn is_connected_to_all_nodes(self_node_id: &NodeId, nodes: &BTreeMap<NodeId, SocketAddr>, connected: &BTreeSet<NodeId>) -> bool {
nodes.keys()
.filter(|n| *n != self_node_id)
.all(|n| connected.contains(n))
}
fn select_master_node(snapshot: &KeyServerSetSnapshot) -> &NodeId {
// we want to minimize a number of UnknownSession messages =>
// try to select a node which was in SS && will be in SS
match snapshot.migration.as_ref() {
Some(migration) => &migration.master,
None => snapshot.current_set.keys()
.filter(|n| snapshot.new_set.contains_key(n))
.nth(0)
.or_else(|| snapshot.new_set.keys().nth(0))
.unwrap_or_else(|| snapshot.current_set.keys().nth(0)
.expect("select_master_node is only called when migration is Required or Started;\
when Started: migration.is_some() && we return migration.master; qed;\
when Required: current_set != new_set; this means that at least one set is non-empty; we try to take node from each set; qed"))
}
/*server_set_state.migration.as_ref()
.map(|m| &m.master)
.unwrap_or_else(|| server_set_state.current_set.keys()
.filter(|n| server_set_state.new_set.contains_key(n))
.nth(0)
.or_else(|| server_set_state.new_set.keys().nth(0)))
.expect("select_master_node is only called when migration is Required or Started;"
"when Started: migration.is_some() && we have migration.master; qed"
"when Required: current_set != migration_set; this means that at least one set is non-empty; we select")*/
}
#[cfg(test)]
mod tests {
use key_server_cluster::{KeyServerSetSnapshot, KeyServerSetMigration};
use key_server_cluster::connection_trigger::ConnectionsAction;
use super::{MigrationState, SessionState, SessionAction, migration_state, maintain_session,
maintain_connections, select_master_node};
#[test]
fn migration_state_is_idle_when_required_but_this_node_is_not_on_the_list() {
assert_eq!(migration_state(&1.into(), &KeyServerSetSnapshot {
current_set: vec![(2.into(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(),
new_set: vec![(3.into(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(),
migration: None,
}), MigrationState::Idle);
}
#[test]
fn migration_state_is_idle_when_sets_are_equal() {
assert_eq!(migration_state(&1.into(), &KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(),
new_set: vec![(1.into(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(),
migration: None,
}), MigrationState::Idle);
}
#[test]
fn migration_state_is_idle_when_only_address_changes() {
assert_eq!(migration_state(&1.into(), &KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8080".parse().unwrap())].into_iter().collect(),
new_set: vec![(1.into(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(),
migration: None,
}), MigrationState::Idle);
}
#[test]
fn migration_state_is_required_when_node_is_added() {
assert_eq!(migration_state(&1.into(), &KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8080".parse().unwrap())].into_iter().collect(),
new_set: vec![(1.into(), "127.0.0.1:8080".parse().unwrap()),
(2.into(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(),
migration: None,
}), MigrationState::Required);
}
#[test]
fn migration_state_is_required_when_node_is_removed() {
assert_eq!(migration_state(&1.into(), &KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8080".parse().unwrap()),
(2.into(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect(),
new_set: vec![(1.into(), "127.0.0.1:8080".parse().unwrap())].into_iter().collect(),
migration: None,
}), MigrationState::Required);
}
#[test]
fn migration_state_is_started_when_migration_is_some() {
assert_eq!(migration_state(&1.into(), &KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8080".parse().unwrap())].into_iter().collect(),
new_set: Default::default(),
migration: Some(KeyServerSetMigration {
id: Default::default(),
set: Default::default(),
master: Default::default(),
is_confirmed: Default::default(),
}),
}), MigrationState::Started);
}
#[test]
fn existing_master_is_selected_when_migration_has_started() {
assert_eq!(select_master_node(&KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8180".parse().unwrap())].into_iter().collect(),
new_set: vec![(2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
migration: Some(KeyServerSetMigration {
master: 3.into(),
..Default::default()
}),
}), &3.into());
}
#[test]
fn persistent_master_is_selected_when_migration_has_not_started_yet() {
assert_eq!(select_master_node(&KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8180".parse().unwrap()),
(2.into(), "127.0.0.1:8180".parse().unwrap())].into_iter().collect(),
new_set: vec![(2.into(), "127.0.0.1:8181".parse().unwrap()),
(4.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
migration: None,
}), &2.into());
}
#[test]
fn new_master_is_selected_in_worst_case() {
assert_eq!(select_master_node(&KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8180".parse().unwrap()),
(2.into(), "127.0.0.1:8180".parse().unwrap())].into_iter().collect(),
new_set: vec![(3.into(), "127.0.0.1:8181".parse().unwrap()),
(4.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
migration: None,
}), &3.into());
}
#[test]
fn maintain_connections_returns_none_when_session_is_active() {
assert_eq!(maintain_connections(MigrationState::Required,
SessionState::Active(Default::default())), None);
}
#[test]
fn maintain_connections_connects_to_current_set_when_no_migration() {
assert_eq!(maintain_connections(MigrationState::Idle,
SessionState::Idle), Some(ConnectionsAction::ConnectToCurrentSet));
}
#[test]
fn maintain_connections_connects_to_current_and_old_set_when_migration_is_required() {
assert_eq!(maintain_connections(MigrationState::Required,
SessionState::Idle), Some(ConnectionsAction::ConnectToCurrentAndMigrationSet));
}
#[test]
fn maintain_connections_connects_to_current_and_old_set_when_migration_is_started() {
assert_eq!(maintain_connections(MigrationState::Started,
SessionState::Idle), Some(ConnectionsAction::ConnectToCurrentAndMigrationSet));
}
#[test]
fn maintain_sessions_does_nothing_if_no_session_and_no_migration() {
assert_eq!(maintain_session(&1.into(), &Default::default(), &Default::default(),
MigrationState::Idle, SessionState::Idle), None);
}
#[test]
fn maintain_session_does_nothing_when_migration_required_on_slave_node_and_no_session() {
assert_eq!(maintain_session(&2.into(), &vec![2.into()].into_iter().collect(), &KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
new_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()),
(2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
migration: None,
}, MigrationState::Required, SessionState::Idle), None);
}
#[test]
fn maintain_session_does_nothing_when_migration_started_on_slave_node_and_no_session() {
assert_eq!(maintain_session(&2.into(), &vec![2.into()].into_iter().collect(), &KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
new_set: Default::default(),
migration: Some(KeyServerSetMigration {
master: 1.into(),
set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()),
(2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
..Default::default()
}),
}, MigrationState::Started, SessionState::Idle), None);
}
#[test]
fn maintain_session_does_nothing_when_migration_started_on_master_node_and_no_session_and_not_connected_to_current_nodes() {
assert_eq!(maintain_session(&1.into(), &Default::default(), &KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()),
(2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
new_set: Default::default(),
migration: Some(KeyServerSetMigration {
master: 1.into(),
set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
..Default::default()
}),
}, MigrationState::Started, SessionState::Idle), None);
}
#[test]
fn maintain_session_does_nothing_when_migration_started_on_master_node_and_no_session_and_not_connected_to_migration_nodes() {
assert_eq!(maintain_session(&1.into(), &Default::default(), &KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
new_set: Default::default(),
migration: Some(KeyServerSetMigration {
master: 1.into(),
set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()),
(2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
..Default::default()
}),
}, MigrationState::Started, SessionState::Idle), None);
}
#[test]
fn maintain_session_starts_session_when_migration_started_on_master_node_and_no_session() {
assert_eq!(maintain_session(&1.into(), &vec![2.into()].into_iter().collect(), &KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
new_set: Default::default(),
migration: Some(KeyServerSetMigration {
master: 1.into(),
set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()),
(2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
..Default::default()
}),
}, MigrationState::Started, SessionState::Idle), Some(SessionAction::Start));
}
#[test]
fn maintain_session_does_nothing_when_both_migration_and_session_are_started() {
assert_eq!(maintain_session(&1.into(), &vec![2.into()].into_iter().collect(), &KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
new_set: Default::default(),
migration: Some(KeyServerSetMigration {
master: 1.into(),
set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()),
(2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
..Default::default()
}),
}, MigrationState::Started, SessionState::Active(Default::default())), None);
}
#[test]
fn maintain_session_confirms_migration_when_active_and_session_has_finished_on_new_node() {
assert_eq!(maintain_session(&1.into(), &vec![2.into()].into_iter().collect(), &KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
new_set: Default::default(),
migration: Some(KeyServerSetMigration {
master: 1.into(),
set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()),
(2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
..Default::default()
}),
}, MigrationState::Started, SessionState::Finished(Default::default())), Some(SessionAction::ConfirmAndDrop(Default::default())));
}
#[test]
fn maintain_session_drops_session_when_active_and_session_has_finished_on_removed_node() {
assert_eq!(maintain_session(&1.into(), &vec![2.into()].into_iter().collect(), &KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()),
(2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
new_set: Default::default(),
migration: Some(KeyServerSetMigration {
master: 2.into(),
set: vec![(2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
..Default::default()
}),
}, MigrationState::Started, SessionState::Finished(Default::default())), Some(SessionAction::Drop));
}
#[test]
fn maintain_session_drops_session_when_active_and_session_has_failed() {
assert_eq!(maintain_session(&1.into(), &vec![2.into()].into_iter().collect(), &KeyServerSetSnapshot {
current_set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
new_set: Default::default(),
migration: Some(KeyServerSetMigration {
master: 1.into(),
set: vec![(1.into(), "127.0.0.1:8181".parse().unwrap()),
(2.into(), "127.0.0.1:8181".parse().unwrap())].into_iter().collect(),
..Default::default()
}),
}, MigrationState::Started, SessionState::Failed(Default::default())), Some(SessionAction::Drop));
}
#[test]
fn maintain_session_detects_abnormal_when_no_migration_and_active_session() {
assert_eq!(maintain_session(&Default::default(), &Default::default(), &Default::default(),
MigrationState::Idle, SessionState::Active(Default::default())), Some(SessionAction::DropAndRetry));
}
#[test]
fn maintain_session_detects_abnormal_when_no_migration_and_finished_session() {
assert_eq!(maintain_session(&Default::default(), &Default::default(), &Default::default(),
MigrationState::Idle, SessionState::Finished(Default::default())), Some(SessionAction::DropAndRetry));
}
#[test]
fn maintain_session_detects_abnormal_when_no_migration_and_failed_session() {
assert_eq!(maintain_session(&Default::default(), &Default::default(), &Default::default(),
MigrationState::Idle, SessionState::Failed(Default::default())), Some(SessionAction::DropAndRetry));
}
#[test]
fn maintain_session_detects_abnormal_when_required_migration_and_active_session() {
assert_eq!(maintain_session(&Default::default(), &Default::default(), &Default::default(),
MigrationState::Required, SessionState::Active(Default::default())), Some(SessionAction::DropAndRetry));
}
#[test]
fn maintain_session_detects_abnormal_when_required_migration_and_finished_session() {
assert_eq!(maintain_session(&Default::default(), &Default::default(), &Default::default(),
MigrationState::Required, SessionState::Finished(Default::default())), Some(SessionAction::DropAndRetry));
}
#[test]
fn maintain_session_detects_abnormal_when_required_migration_and_failed_session() {
assert_eq!(maintain_session(&Default::default(), &Default::default(), &Default::default(),
MigrationState::Required, SessionState::Failed(Default::default())), Some(SessionAction::DropAndRetry));
}
#[test]
fn maintain_session_detects_abnormal_when_active_migration_and_active_session_with_different_id() {
assert_eq!(maintain_session(&Default::default(), &Default::default(), &KeyServerSetSnapshot {
migration: Some(KeyServerSetMigration {
id: 0.into(),
..Default::default()
}),
..Default::default()
}, MigrationState::Started, SessionState::Active(Some(1.into()))), Some(SessionAction::DropAndRetry));
}
#[test]
fn maintain_session_detects_abnormal_when_active_migration_and_finished_session_with_different_id() {
assert_eq!(maintain_session(&Default::default(), &Default::default(), &KeyServerSetSnapshot {
migration: Some(KeyServerSetMigration {
id: 0.into(),
..Default::default()
}),
..Default::default()
}, MigrationState::Started, SessionState::Finished(Some(1.into()))), Some(SessionAction::DropAndRetry));
}
#[test]
fn maintain_session_detects_abnormal_when_active_migration_and_failed_session_with_different_id() {
assert_eq!(maintain_session(&Default::default(), &Default::default(), &KeyServerSetSnapshot {
migration: Some(KeyServerSetMigration {
id: 0.into(),
..Default::default()
}),
..Default::default()
}, MigrationState::Started, SessionState::Failed(Some(1.into()))), Some(SessionAction::DropAndRetry));
}
}

View File

@ -61,20 +61,11 @@ impl<F, T> Future for Deadline<F> where F: Future<Item = T, Error = io::Error> {
#[cfg(test)]
mod tests {
use std::io;
use std::time::Duration;
use futures::{Future, empty, done};
use futures::{Future, done};
use tokio_core::reactor::Core;
use super::{deadline, DeadlineStatus};
//#[test] TODO: not working
fn _deadline_timeout_works() {
let mut core = Core::new().unwrap();
let deadline = deadline(Duration::from_millis(1), &core.handle(), empty::<(), io::Error>()).unwrap();
core.turn(Some(Duration::from_millis(3)));
assert_eq!(deadline.wait().unwrap(), DeadlineStatus::Timeout);
}
#[test]
fn deadline_result_works() {
let mut core = Core::new().unwrap();

View File

@ -18,7 +18,7 @@ use std::collections::{BTreeMap, BTreeSet};
use key_server_cluster::{Error, NodeId};
use key_server_cluster::jobs::job_session::{JobExecutor, JobTransport, JobPartialRequestAction, JobPartialResponseAction};
/// No-work job to use in generics (TODO: create separate ShareChangeConsensusSession && remove this)
/// No-work job to use in generics (TODO [Refac]: create separate ShareChangeConsensusSession && remove this)
pub struct DummyJob;
impl JobExecutor for DummyJob {
@ -43,7 +43,7 @@ impl JobExecutor for DummyJob {
}
}
/// No-work job transport to use in generics (TODO: create separate ShareChangeConsensusSession && remove this)
/// No-work job transport to use in generics (TODO [Refac]: create separate ShareChangeConsensusSession && remove this)
pub struct DummyJobTransport;
impl JobTransport for DummyJobTransport {

View File

@ -133,7 +133,7 @@ impl JobExecutor for SigningJob {
if Some(&partial_response.request_id) != self.request_id.as_ref() {
return Ok(JobPartialResponseAction::Ignore);
}
// TODO: check_signature_share()
// TODO [Trust]: check_signature_share()
Ok(JobPartialResponseAction::Accept)
}

View File

@ -63,7 +63,7 @@ impl JobExecutor for UnknownSessionsJob {
Ok(JobPartialResponseAction::Accept)
}
// TODO: optimizations:
// TODO [Opt]:
// currently ALL unknown sessions are sent at once - it is better to limit messages by size/len => add partial-partial responses
fn compute_response(&self, partial_responses: &BTreeMap<NodeId, BTreeSet<SessionId>>) -> Result<BTreeMap<SessionId, BTreeSet<NodeId>>, Error> {
let mut result: BTreeMap<SessionId, BTreeSet<NodeId>> = BTreeMap::new();

View File

@ -368,7 +368,7 @@ pub fn compute_signature_share<'a, I>(threshold: usize, combined_hash: &Secret,
/// Check signature share.
pub fn _check_signature_share<'a, I>(_combined_hash: &Secret, _signature_share: &Secret, _public_share: &Public, _one_time_public_share: &Public, _node_numbers: I)
-> Result<bool, Error> where I: Iterator<Item=&'a Secret> {
// TODO: in paper partial signature is checked using comparison:
// TODO [Trust]: in paper partial signature is checked using comparison:
// sig[i] * T = r[i] - c * lagrange_coeff(i) * y[i]
// => (k[i] - c * lagrange_coeff(i) * s[i]) * T = r[i] - c * lagrange_coeff(i) * y[i]
// => k[i] * T - c * lagrange_coeff(i) * s[i] * T = k[i] * T - c * lagrange_coeff(i) * y[i]

View File

@ -378,6 +378,8 @@ pub struct ConfirmConsensusInitialization {
/// Node is asked to be part of servers-set consensus group.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InitializeConsensusSessionWithServersSet {
/// Migration id (if any).
pub migration_id: Option<SerializableH256>,
/// Old nodes set.
pub old_nodes_set: BTreeSet<MessageNodeId>,
/// New nodes set.
@ -878,6 +880,19 @@ impl Message {
}
}
pub fn is_error_message(&self) -> bool {
match *self {
Message::Generation(GenerationMessage::SessionError(_)) => true,
Message::Encryption(EncryptionMessage::EncryptionSessionError(_)) => true,
Message::Decryption(DecryptionMessage::DecryptionSessionError(_)) => true,
Message::Signing(SigningMessage::SigningConsensusMessage(_)) => true,
Message::KeyVersionNegotiation(KeyVersionNegotiationMessage::KeyVersionsError(_)) => true,
Message::ShareAdd(ShareAddMessage::ShareAddError(_)) => true,
Message::ServersSetChange(ServersSetChangeMessage::ServersSetChangeError(_)) => true,
_ => false,
}
}
pub fn is_exclusive_session_message(&self) -> bool {
match *self {
Message::ServersSetChange(_) => true,

View File

@ -24,7 +24,7 @@ pub use super::traits::NodeKeyPair;
pub use super::types::all::{NodeId, EncryptedDocumentKeyShadow};
pub use super::acl_storage::AclStorage;
pub use super::key_storage::{KeyStorage, DocumentKeyShare, DocumentKeyShareVersion};
pub use super::key_server_set::KeyServerSet;
pub use super::key_server_set::{is_migration_required, KeyServerSet, KeyServerSetSnapshot, KeyServerSetMigration};
pub use super::serialization::{SerializableSignature, SerializableH256, SerializableSecret, SerializablePublic, SerializableMessageHash};
pub use self::cluster::{ClusterCore, ClusterConfiguration, ClusterClient};
pub use self::cluster_sessions::{ClusterSession, ClusterSessionsListener};
@ -187,6 +187,8 @@ pub use self::client_sessions::signing_session;
mod cluster;
mod cluster_sessions;
mod cluster_sessions_creator;
mod connection_trigger;
mod connection_trigger_with_migration;
mod io;
mod jobs;
pub mod math;

View File

@ -16,35 +16,78 @@
use std::sync::Arc;
use std::net::SocketAddr;
use std::collections::BTreeMap;
use futures::{future, Future};
use std::collections::{BTreeMap, HashSet};
use futures::{future, Future, IntoFuture};
use parking_lot::Mutex;
use ethcore::filter::Filter;
use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify};
use ethkey::public_to_address;
use native_contracts::KeyServerSet as KeyServerSetContract;
use hash::keccak;
use bigint::hash::H256;
use util::Address;
use bytes::Bytes;
use types::all::{Error, Public, NodeAddress};
use types::all::{Error, Public, NodeAddress, NodeId};
use trusted_client::TrustedClient;
use {NodeKeyPair};
type BoxFuture<A, B> = Box<Future<Item = A, Error = B> + Send>;
/// Name of KeyServerSet contract in registry.
const KEY_SERVER_SET_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_server_set";
/// Number of blocks (since latest new_set change) required before actually starting migration.
const MIGRATION_CONFIRMATIONS_REQUIRED: u64 = 5;
/// Number of blocks before the same-migration transaction (be it start or confirmation) will be retried.
const TRANSACTION_RETRY_INTERVAL_BLOCKS: u64 = 30;
/// Key server has been added to the set.
const ADDED_EVENT_NAME: &'static [u8] = &*b"KeyServerAdded(address)";
/// Key server has been removed from the set.
const REMOVED_EVENT_NAME: &'static [u8] = &*b"KeyServerRemoved(address)";
/// Migration has started.
const MIGRATION_STARTED_EVENT_NAME: &'static [u8] = &*b"MigrationStarted()";
/// Migration has completed.
const MIGRATION_COMPLETED_EVENT_NAME: &'static [u8] = &*b"MigrationCompleted()";
lazy_static! {
static ref ADDED_EVENT_NAME_HASH: H256 = keccak(ADDED_EVENT_NAME);
static ref REMOVED_EVENT_NAME_HASH: H256 = keccak(REMOVED_EVENT_NAME);
static ref MIGRATION_STARTED_EVENT_NAME_HASH: H256 = keccak(MIGRATION_STARTED_EVENT_NAME);
static ref MIGRATION_COMPLETED_EVENT_NAME_HASH: H256 = keccak(MIGRATION_COMPLETED_EVENT_NAME);
}
/// Key Server set
#[derive(Default, Debug, Clone, PartialEq)]
/// Key Server Set state.
pub struct KeyServerSetSnapshot {
/// Current set of key servers.
pub current_set: BTreeMap<NodeId, SocketAddr>,
/// New set of key servers.
pub new_set: BTreeMap<NodeId, SocketAddr>,
/// Current migration data.
pub migration: Option<KeyServerSetMigration>,
}
#[derive(Default, Debug, Clone, PartialEq)]
/// Key server set migration.
pub struct KeyServerSetMigration {
/// Migration id.
pub id: H256,
/// Migration set of key servers. It is the new_set at the moment of migration start.
pub set: BTreeMap<NodeId, SocketAddr>,
/// Master node of the migration process.
pub master: NodeId,
/// Is migration confirmed by this node?
pub is_confirmed: bool,
}
/// Key Server Set
pub trait KeyServerSet: Send + Sync {
/// Get set of configured key servers
fn get(&self) -> BTreeMap<Public, SocketAddr>;
/// Get server set state.
fn snapshot(&self) -> KeyServerSetSnapshot;
/// Start migration.
fn start_migration(&self, migration_id: H256);
/// Confirm migration.
fn confirm_migration(&self, migration_id: H256);
}
/// On-chain Key Server set implementation.
@ -53,21 +96,49 @@ pub struct OnChainKeyServerSet {
contract: Mutex<CachedContract>,
}
#[derive(Default, Debug, Clone, PartialEq)]
/// Non-finalized new_set.
struct FutureNewSet {
/// New servers set.
pub new_set: BTreeMap<NodeId, SocketAddr>,
/// Hash of block, when this set has appeared for first time.
pub block: H256,
}
#[derive(Default, Debug, Clone, PartialEq)]
/// Migration-related transaction information.
struct PreviousMigrationTransaction {
/// Migration id.
pub migration_id: H256,
/// Latest actual block number at the time this transaction has been sent.
pub block: u64,
}
/// Cached on-chain Key Server set contract.
struct CachedContract {
/// Blockchain client.
client: TrustedClient,
/// Contract address.
contract_addr: Option<Address>,
/// Active set of key servers.
key_servers: BTreeMap<Public, SocketAddr>,
contract: Option<KeyServerSetContract>,
/// Is auto-migrate enabled?
auto_migrate_enabled: bool,
/// Current contract state.
snapshot: KeyServerSetSnapshot,
/// Scheduled contract state (if any).
future_new_set: Option<FutureNewSet>,
/// Previous start migration transaction.
start_migration_tx: Option<PreviousMigrationTransaction>,
/// Previous confirm migration transaction.
confirm_migration_tx: Option<PreviousMigrationTransaction>,
/// This node key pair.
self_key_pair: Arc<NodeKeyPair>,
}
impl OnChainKeyServerSet {
pub fn new(trusted_client: TrustedClient, key_servers: BTreeMap<Public, NodeAddress>) -> Result<Arc<Self>, Error> {
pub fn new(trusted_client: TrustedClient, self_key_pair: Arc<NodeKeyPair>, auto_migrate_enabled: bool, key_servers: BTreeMap<Public, NodeAddress>) -> Result<Arc<Self>, Error> {
let client = trusted_client.get_untrusted();
let key_server_set = Arc::new(OnChainKeyServerSet {
contract: Mutex::new(CachedContract::new(trusted_client, key_servers)?),
contract: Mutex::new(CachedContract::new(trusted_client, self_key_pair, auto_migrate_enabled, key_servers)?),
});
client
.ok_or(Error::Internal("Constructing OnChainKeyServerSet without active Client".into()))?
@ -77,8 +148,16 @@ impl OnChainKeyServerSet {
}
impl KeyServerSet for OnChainKeyServerSet {
fn get(&self) -> BTreeMap<Public, SocketAddr> {
self.contract.lock().get()
fn snapshot(&self) -> KeyServerSetSnapshot {
self.contract.lock().snapshot()
}
fn start_migration(&self, migration_id: H256) {
self.contract.lock().start_migration(migration_id)
}
fn confirm_migration(&self, migration_id: H256) {
self.contract.lock().confirm_migration(migration_id);
}
}
@ -91,107 +170,384 @@ impl ChainNotify for OnChainKeyServerSet {
}
impl CachedContract {
pub fn new(client: TrustedClient, key_servers: BTreeMap<Public, NodeAddress>) -> Result<Self, Error> {
let mut cached_contract = CachedContract {
pub fn new(client: TrustedClient, self_key_pair: Arc<NodeKeyPair>, auto_migrate_enabled: bool, key_servers: BTreeMap<Public, NodeAddress>) -> Result<Self, Error> {
let server_set = key_servers.into_iter()
.map(|(p, addr)| {
let addr = format!("{}:{}", addr.address, addr.port).parse()
.map_err(|err| Error::Internal(format!("error parsing node address: {}", err)))?;
Ok((p, addr))
})
.collect::<Result<BTreeMap<_, _>, Error>>()?;
Ok(CachedContract {
client: client,
contract_addr: None,
key_servers: key_servers.into_iter()
.map(|(p, addr)| {
let addr = format!("{}:{}", addr.address, addr.port).parse()
.map_err(|err| Error::Internal(format!("error parsing node address: {}", err)))?;
Ok((p, addr))
})
.collect::<Result<BTreeMap<_, _>, Error>>()?,
};
if let Some(client) = cached_contract.client.get() {
let key_server_contract_address = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned());
// only initialize from contract if it is installed. otherwise - use default nodes
// once the contract is installed, all default nodes are lost (if not in the contract' set)
if key_server_contract_address.is_some() {
cached_contract.read_from_registry(&*client, key_server_contract_address);
}
}
Ok(cached_contract)
contract: None,
auto_migrate_enabled: auto_migrate_enabled,
future_new_set: None,
confirm_migration_tx: None,
start_migration_tx: None,
snapshot: KeyServerSetSnapshot {
current_set: server_set.clone(),
new_set: server_set,
..Default::default()
},
self_key_pair: self_key_pair,
})
}
pub fn update(&mut self, enacted: Vec<H256>, retracted: Vec<H256>) {
if let Some(client) = self.client.get() {
let new_contract_addr = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned());
// read new snapshot from reqistry (if something has chnaged)
self.read_from_registry_if_required(&*client, enacted, retracted);
// new contract installed => read nodes set from the contract
if self.contract_addr.as_ref() != new_contract_addr.as_ref() {
self.read_from_registry(&*client, new_contract_addr);
// update number of confirmations (if there's future new set)
self.update_number_of_confirmations_if_required(&*client);
}
}
fn snapshot(&self) -> KeyServerSetSnapshot {
self.snapshot.clone()
}
fn start_migration(&mut self, migration_id: H256) {
// trust is not needed here, because it is the reaction to the read of the trusted client
if let (Some(client), Some(contract)) = (self.client.get_untrusted(), self.contract.as_ref()) {
// check if we need to send start migration transaction
if !update_last_transaction_block(&*client, &migration_id, &mut self.start_migration_tx) {
return;
}
// check for contract events
let is_set_changed = self.contract_addr.is_some() && enacted.iter()
.chain(retracted.iter())
.any(|block_hash| !client.logs(Filter {
from_block: BlockId::Hash(block_hash.clone()),
to_block: BlockId::Hash(block_hash.clone()),
address: self.contract_addr.clone().map(|a| vec![a]),
topics: vec![
Some(vec![*ADDED_EVENT_NAME_HASH, *REMOVED_EVENT_NAME_HASH]),
None,
None,
None,
],
limit: Some(1),
}).is_empty());
// to simplify processing - just re-read the whole nodes set from the contract
if is_set_changed {
self.read_from_registry(&*client, new_contract_addr);
// prepare transaction data
let transaction_data = match contract.encode_start_migration_input(migration_id) {
Ok(transaction_data) => transaction_data,
Err(error) => {
warn!(target: "secretstore_net", "{}: failed to prepare auto-migration start transaction: {}",
self.self_key_pair.public(), error);
return;
},
};
// send transaction
if let Err(error) = client.transact_contract(contract.address.clone(), transaction_data) {
warn!(target: "secretstore_net", "{}: failed to submit auto-migration start transaction: {}",
self.self_key_pair.public(), error);
} else {
trace!(target: "secretstore_net", "{}: sent auto-migration start transaction",
self.self_key_pair.public(), );
}
}
}
pub fn get(&self) -> BTreeMap<Public, SocketAddr> {
self.key_servers.clone()
fn confirm_migration(&mut self, migration_id: H256) {
// trust is not needed here, because we have already completed the action
if let (Some(client), Some(contract)) = (self.client.get(), self.contract.as_ref()) {
// check if we need to send start migration transaction
if !update_last_transaction_block(&*client, &migration_id, &mut self.confirm_migration_tx) {
return;
}
// prepare transaction data
let transaction_data = match contract.encode_confirm_migration_input(migration_id) {
Ok(transaction_data) => transaction_data,
Err(error) => {
warn!(target: "secretstore_net", "{}: failed to prepare auto-migration confirmation transaction: {}",
self.self_key_pair.public(), error);
return;
},
};
// send transaction
if let Err(error) = client.transact_contract(contract.address.clone(), transaction_data) {
warn!(target: "secretstore_net", "{}: failed to submit auto-migration confirmation transaction: {}",
self.self_key_pair.public(), error);
} else {
trace!(target: "secretstore_net", "{}: sent auto-migration confirm transaction",
self.self_key_pair.public());
}
}
}
fn read_from_registry_if_required(&mut self, client: &Client, enacted: Vec<H256>, retracted: Vec<H256>) {
// read new contract from registry
let new_contract_addr = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned());
// new contract installed => read nodes set from the contract
if self.contract.as_ref().map(|c| &c.address) != new_contract_addr.as_ref() {
self.read_from_registry(&*client, new_contract_addr);
return;
}
// check for contract events
let is_set_changed = self.contract.is_some() && enacted.iter()
.chain(retracted.iter())
.any(|block_hash| !client.logs(Filter {
from_block: BlockId::Hash(block_hash.clone()),
to_block: BlockId::Hash(block_hash.clone()),
address: self.contract.as_ref().map(|c| vec![c.address.clone()]),
topics: vec![
Some(vec![*ADDED_EVENT_NAME_HASH, *REMOVED_EVENT_NAME_HASH,
*MIGRATION_STARTED_EVENT_NAME_HASH, *MIGRATION_COMPLETED_EVENT_NAME_HASH]),
None,
None,
None,
],
limit: Some(1),
}).is_empty());
// to simplify processing - just re-read the whole nodes set from the contract
if is_set_changed {
self.read_from_registry(&*client, new_contract_addr);
}
}
fn read_from_registry(&mut self, client: &Client, new_contract_address: Option<Address>) {
self.key_servers = new_contract_address.map(|contract_addr| {
self.contract = new_contract_address.map(|contract_addr| {
trace!(target: "secretstore", "Configuring for key server set contract from {}", contract_addr);
KeyServerSetContract::new(contract_addr)
})
.map(|contract| {
let mut key_servers = BTreeMap::new();
let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d));
let key_servers_list = contract.get_key_servers(do_call).wait()
.map_err(|err| { trace!(target: "secretstore", "Error {} reading list of key servers from contract", err); err })
.unwrap_or_default();
for key_server in key_servers_list {
let key_server_public = contract.get_key_server_public(
|a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait()
.and_then(|p| if p.len() == 64 { Ok(Public::from_slice(&p)) } else { Err(format!("Invalid public length {}", p.len())) });
let key_server_ip = contract.get_key_server_address(
|a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait()
.and_then(|a| a.parse().map_err(|e| format!("Invalid ip address: {}", e)));
});
// only add successfully parsed nodes
match (key_server_public, key_server_ip) {
(Ok(key_server_public), Ok(key_server_ip)) => { key_servers.insert(key_server_public, key_server_ip); },
(Err(public_err), _) => warn!(target: "secretstore_net", "received invalid public from key server set contract: {}", public_err),
(_, Err(ip_err)) => warn!(target: "secretstore_net", "received invalid IP from key server set contract: {}", ip_err),
}
let contract = match self.contract.as_ref() {
Some(contract) => contract,
None => {
// no contract installed => empty snapshot
// WARNING: after restart current_set will be reset to the set from configuration file
// even though we have reset to empty set here. We are not considerning this as an issue
// because it is actually the issue of administrator.
self.snapshot = Default::default();
self.future_new_set = None;
return;
},
};
let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d));
let current_set = Self::read_key_server_set(&contract, &do_call, &KeyServerSetContract::get_current_key_servers,
&KeyServerSetContract::get_current_key_server_public, &KeyServerSetContract::get_current_key_server_address);
// read migration-related data if auto migration is enabled
let (new_set, migration) = match self.auto_migrate_enabled {
true => {
let new_set = Self::read_key_server_set(&contract, &do_call, &KeyServerSetContract::get_new_key_servers,
&KeyServerSetContract::get_new_key_server_public, &KeyServerSetContract::get_new_key_server_address);
let migration_set = Self::read_key_server_set(&contract, &do_call, &KeyServerSetContract::get_migration_key_servers,
&KeyServerSetContract::get_migration_key_server_public, &KeyServerSetContract::get_migration_key_server_address);
let migration_id = match migration_set.is_empty() {
false => contract.get_migration_id(&do_call).wait()
.map_err(|err| { trace!(target: "secretstore", "Error {} reading migration id from contract", err); err })
.ok(),
true => None,
};
let migration_master = match migration_set.is_empty() {
false => contract.get_migration_master(&do_call).wait()
.map_err(|err| { trace!(target: "secretstore", "Error {} reading migration master from contract", err); err })
.ok()
.and_then(|address| current_set.keys().chain(migration_set.keys())
.find(|public| public_to_address(public) == address)
.cloned()),
true => None,
};
let is_migration_confirmed = match migration_set.is_empty() {
false if current_set.contains_key(self.self_key_pair.public()) || migration_set.contains_key(self.self_key_pair.public()) =>
contract.is_migration_confirmed(&do_call, self.self_key_pair.address()).wait()
.map_err(|err| { trace!(target: "secretstore", "Error {} reading migration confirmation from contract", err); err })
.ok(),
_ => None,
};
let migration = match (migration_set.is_empty(), migration_id, migration_master, is_migration_confirmed) {
(false, Some(migration_id), Some(migration_master), Some(is_migration_confirmed)) =>
Some(KeyServerSetMigration {
id: migration_id,
master: migration_master,
set: migration_set,
is_confirmed: is_migration_confirmed,
}),
_ => None,
};
(new_set, migration)
}
key_servers
})
.unwrap_or_default();
self.contract_addr = new_contract_address;
false => (current_set.clone(), None),
};
let mut new_snapshot = KeyServerSetSnapshot {
current_set: current_set,
new_set: new_set,
migration: migration,
};
// we might want to adjust new_set if auto migration is enabled
if self.auto_migrate_enabled {
let block = client.block_hash(BlockId::Latest).unwrap_or_default();
update_future_set(&mut self.future_new_set, &mut new_snapshot, block);
}
self.snapshot = new_snapshot;
}
fn read_key_server_set<F, U, FL, FP, FA>(contract: &KeyServerSetContract, do_call: F, read_list: FL, read_public: FP, read_address: FA) -> BTreeMap<Public, SocketAddr>
where
F: FnOnce(Address, Vec<u8>) -> U + Copy,
U: IntoFuture<Item=Vec<u8>, Error=String>,
U::Future: Send + 'static,
FL: Fn(&KeyServerSetContract, F) -> BoxFuture<Vec<Address>, String>,
FP: Fn(&KeyServerSetContract, F, Address) -> BoxFuture<Vec<u8>, String>,
FA: Fn(&KeyServerSetContract, F, Address) -> BoxFuture<String, String> {
let mut key_servers = BTreeMap::new();
let mut key_servers_addresses = HashSet::new();
let key_servers_list = read_list(contract, do_call).wait()
.map_err(|err| { warn!(target: "secretstore_net", "error {} reading list of key servers from contract", err); err })
.unwrap_or_default();
for key_server in key_servers_list {
let key_server_public = read_public(contract, do_call, key_server).wait()
.and_then(|p| if p.len() == 64 { Ok(Public::from_slice(&p)) } else { Err(format!("Invalid public length {}", p.len())) });
let key_server_address: Result<SocketAddr, _> = read_address(contract, do_call, key_server).wait()
.and_then(|a| a.parse().map_err(|e| format!("Invalid ip address: {}", e)));
// only add successfully parsed nodes
match (key_server_public, key_server_address) {
(Ok(key_server_public), Ok(key_server_address)) => {
if !key_servers_addresses.insert(key_server_address.clone()) {
warn!(target: "secretstore_net", "the same address ({}) specified twice in list of contracts. Ignoring server {}",
key_server_address, key_server_public);
continue;
}
key_servers.insert(key_server_public, key_server_address);
},
(Err(public_err), _) => warn!(target: "secretstore_net", "received invalid public from key server set contract: {}", public_err),
(_, Err(ip_err)) => warn!(target: "secretstore_net", "received invalid IP from key server set contract: {}", ip_err),
}
}
key_servers
}
fn update_number_of_confirmations_if_required(&mut self, client: &BlockChainClient) {
if !self.auto_migrate_enabled {
return;
}
update_number_of_confirmations(
&|| latest_block_hash(&*client),
&|block| block_confirmations(&*client, block),
&mut self.future_new_set, &mut self.snapshot);
}
}
/// Check if two sets are equal (in terms of migration requirements). We do not need migration if only
/// addresses are changed - simply adjusting connections is enough in this case.
pub fn is_migration_required(current_set: &BTreeMap<NodeId, SocketAddr>, new_set: &BTreeMap<NodeId, SocketAddr>) -> bool {
let no_nodes_removed = current_set.keys().all(|n| new_set.contains_key(n));
let no_nodes_added = new_set.keys().all(|n| current_set.contains_key(n));
!no_nodes_removed || !no_nodes_added
}
fn update_future_set(future_new_set: &mut Option<FutureNewSet>, new_snapshot: &mut KeyServerSetSnapshot, block: H256) {
// migration has already started => no need to delay visibility
if new_snapshot.migration.is_some() {
*future_new_set = None;
return;
}
// new no migration is required => no need to delay visibility
if !is_migration_required(&new_snapshot.current_set, &new_snapshot.new_set) {
*future_new_set = None;
return;
}
// when auto-migrate is enabled, we do not want to start migration right after new_set is changed, because of:
// 1) there could be a fork && we could start migration to forked version (and potentially lose secrets)
// 2) there must be some period for new_set changes finalization (i.e. adding/removing more servers)
let mut new_set = new_snapshot.current_set.clone();
::std::mem::swap(&mut new_set, &mut new_snapshot.new_set);
// if nothing has changed in future_new_set, then we want to preserve previous block hash
let block = match Some(&new_set) == future_new_set.as_ref().map(|f| &f.new_set) {
true => future_new_set.as_ref().map(|f| &f.block).cloned().unwrap_or_else(|| block),
false => block,
};
*future_new_set = Some(FutureNewSet {
new_set: new_set,
block: block,
});
}
fn update_number_of_confirmations<F1: Fn() -> H256, F2: Fn(H256) -> Option<u64>>(latest_block: &F1, confirmations: &F2, future_new_set: &mut Option<FutureNewSet>, snapshot: &mut KeyServerSetSnapshot) {
match future_new_set.as_mut() {
// no future new set is scheduled => do nothing,
None => return,
// else we should calculate number of confirmations for future new set
Some(future_new_set) => match confirmations(future_new_set.block.clone()) {
// we have enough confirmations => should move new_set from future to snapshot
Some(confirmations) if confirmations >= MIGRATION_CONFIRMATIONS_REQUIRED => (),
// not enough confirmations => do nothing
Some(_) => return,
// if number of confirmations is None, then reorg has happened && we need to reset block
// (some more intelligent startegy is possible, but let's stick to simplest one)
None => {
future_new_set.block = latest_block();
return;
}
}
}
let future_new_set = future_new_set.take()
.expect("we only pass through match above when future_new_set is some; qed");
snapshot.new_set = future_new_set.new_set;
}
fn update_last_transaction_block(client: &Client, migration_id: &H256, previous_transaction: &mut Option<PreviousMigrationTransaction>) -> bool {
// TODO [Reliability]: add the same mechanism to the contract listener, if accepted
let last_block = client.block_number(BlockId::Latest).unwrap_or_default();
match previous_transaction.as_ref() {
// no previous transaction => send immideately
None => (),
// previous transaction has been sent for other migration process => send immideately
Some(tx) if tx.migration_id != *migration_id => (),
// if we have sent the same type of transaction recently => do nothing (hope it will be mined eventually)
// if we have sent the same transaction some time ago =>
// assume that our tx queue was full
// or we didn't have enough eth fot this tx
// or the transaction has been removed from the queue (and never reached any miner node)
// if we have restarted after sending tx => assume we have never sent it
Some(tx) => {
let last_block = client.block_number(BlockId::Latest).unwrap_or_default();
if tx.block > last_block || last_block - tx.block < TRANSACTION_RETRY_INTERVAL_BLOCKS {
return false;
}
},
}
*previous_transaction = Some(PreviousMigrationTransaction {
migration_id: migration_id.clone(),
block: last_block,
});
true
}
fn latest_block_hash(client: &BlockChainClient) -> H256 {
client.block_hash(BlockId::Latest).unwrap_or_default()
}
fn block_confirmations(client: &BlockChainClient, block: H256) -> Option<u64> {
client.block_number(BlockId::Hash(block))
.and_then(|block| client.block_number(BlockId::Latest).map(|last_block| (block, last_block)))
.map(|(block, last_block)| last_block - block)
}
#[cfg(test)]
pub mod tests {
use std::collections::BTreeMap;
use std::net::SocketAddr;
use bigint::hash::H256;
use ethkey::Public;
use super::KeyServerSet;
use super::{update_future_set, update_number_of_confirmations, FutureNewSet,
KeyServerSet, KeyServerSetSnapshot, MIGRATION_CONFIRMATIONS_REQUIRED};
#[derive(Default)]
pub struct MapKeyServerSet {
@ -207,8 +563,228 @@ pub mod tests {
}
impl KeyServerSet for MapKeyServerSet {
fn get(&self) -> BTreeMap<Public, SocketAddr> {
self.nodes.clone()
fn snapshot(&self) -> KeyServerSetSnapshot {
KeyServerSetSnapshot {
current_set: self.nodes.clone(),
new_set: self.nodes.clone(),
..Default::default()
}
}
fn start_migration(&self, _migration_id: H256) {
unimplemented!("test-only")
}
fn confirm_migration(&self, _migration_id: H256) {
unimplemented!("test-only")
}
}
#[test]
fn future_set_is_updated_to_none_when_migration_has_already_started() {
let mut future_new_set = Some(Default::default());
let mut new_snapshot = KeyServerSetSnapshot {
migration: Some(Default::default()),
..Default::default()
};
let new_snapshot_copy = new_snapshot.clone();
update_future_set(&mut future_new_set, &mut new_snapshot, Default::default());
assert_eq!(future_new_set, None);
assert_eq!(new_snapshot, new_snapshot_copy);
}
#[test]
fn future_set_is_updated_to_none_when_no_migration_is_required() {
let node_id = Default::default();
let address1 = "127.0.0.1:12000".parse().unwrap();
let address2 = "127.0.0.1:12001".parse().unwrap();
// addresses are different, but node set is the same => no migration is required
let mut future_new_set = Some(Default::default());
let mut new_snapshot = KeyServerSetSnapshot {
current_set: vec![(node_id, address1)].into_iter().collect(),
new_set: vec![(node_id, address2)].into_iter().collect(),
..Default::default()
};
let new_snapshot_copy = new_snapshot.clone();
update_future_set(&mut future_new_set, &mut new_snapshot, Default::default());
assert_eq!(future_new_set, None);
assert_eq!(new_snapshot, new_snapshot_copy);
// everything is the same => no migration is required
let mut future_new_set = Some(Default::default());
let mut new_snapshot = KeyServerSetSnapshot {
current_set: vec![(node_id, address1)].into_iter().collect(),
new_set: vec![(node_id, address1)].into_iter().collect(),
..Default::default()
};
let new_snapshot_copy = new_snapshot.clone();
update_future_set(&mut future_new_set, &mut new_snapshot, Default::default());
assert_eq!(future_new_set, None);
assert_eq!(new_snapshot, new_snapshot_copy);
}
#[test]
fn future_set_is_initialized() {
let address = "127.0.0.1:12000".parse().unwrap();
let mut future_new_set = None;
let mut new_snapshot = KeyServerSetSnapshot {
current_set: vec![(1.into(), address)].into_iter().collect(),
new_set: vec![(2.into(), address)].into_iter().collect(),
..Default::default()
};
update_future_set(&mut future_new_set, &mut new_snapshot, Default::default());
assert_eq!(future_new_set, Some(FutureNewSet {
new_set: vec![(2.into(), address)].into_iter().collect(),
block: Default::default(),
}));
assert_eq!(new_snapshot, KeyServerSetSnapshot {
current_set: vec![(1.into(), address)].into_iter().collect(),
new_set: vec![(1.into(), address)].into_iter().collect(),
..Default::default()
});
}
#[test]
fn future_set_is_updated_when_set_differs() {
let address = "127.0.0.1:12000".parse().unwrap();
let mut future_new_set = Some(FutureNewSet {
new_set: vec![(2.into(), address)].into_iter().collect(),
block: Default::default(),
});
let mut new_snapshot = KeyServerSetSnapshot {
current_set: vec![(1.into(), address)].into_iter().collect(),
new_set: vec![(3.into(), address)].into_iter().collect(),
..Default::default()
};
update_future_set(&mut future_new_set, &mut new_snapshot, 1.into());
assert_eq!(future_new_set, Some(FutureNewSet {
new_set: vec![(3.into(), address)].into_iter().collect(),
block: 1.into(),
}));
assert_eq!(new_snapshot, KeyServerSetSnapshot {
current_set: vec![(1.into(), address)].into_iter().collect(),
new_set: vec![(1.into(), address)].into_iter().collect(),
..Default::default()
});
}
#[test]
fn future_set_is_not_updated_when_set_is_the_same() {
let address = "127.0.0.1:12000".parse().unwrap();
let mut future_new_set = Some(FutureNewSet {
new_set: vec![(2.into(), address)].into_iter().collect(),
block: Default::default(),
});
let mut new_snapshot = KeyServerSetSnapshot {
current_set: vec![(1.into(), address)].into_iter().collect(),
new_set: vec![(2.into(), address)].into_iter().collect(),
..Default::default()
};
update_future_set(&mut future_new_set, &mut new_snapshot, 1.into());
assert_eq!(future_new_set, Some(FutureNewSet {
new_set: vec![(2.into(), address)].into_iter().collect(),
block: Default::default(),
}));
assert_eq!(new_snapshot, KeyServerSetSnapshot {
current_set: vec![(1.into(), address)].into_iter().collect(),
new_set: vec![(1.into(), address)].into_iter().collect(),
..Default::default()
});
}
#[test]
fn when_updating_confirmations_nothing_is_changed_if_no_future_set() {
let address = "127.0.0.1:12000".parse().unwrap();
let mut future_new_set = None;
let mut snapshot = KeyServerSetSnapshot {
current_set: vec![(1.into(), address)].into_iter().collect(),
new_set: vec![(1.into(), address)].into_iter().collect(),
..Default::default()
};
let snapshot_copy = snapshot.clone();
update_number_of_confirmations(
&|| 1.into(),
&|_| Some(MIGRATION_CONFIRMATIONS_REQUIRED),
&mut future_new_set, &mut snapshot);
assert_eq!(future_new_set, None);
assert_eq!(snapshot, snapshot_copy);
}
#[test]
fn when_updating_confirmations_migration_is_scheduled() {
let address = "127.0.0.1:12000".parse().unwrap();
let mut future_new_set = Some(FutureNewSet {
new_set: vec![(2.into(), address)].into_iter().collect(),
block: Default::default(),
});
let mut snapshot = KeyServerSetSnapshot {
current_set: vec![(1.into(), address)].into_iter().collect(),
new_set: vec![(1.into(), address)].into_iter().collect(),
..Default::default()
};
update_number_of_confirmations(
&|| 1.into(),
&|_| Some(MIGRATION_CONFIRMATIONS_REQUIRED),
&mut future_new_set, &mut snapshot);
assert_eq!(future_new_set, None);
assert_eq!(snapshot, KeyServerSetSnapshot {
current_set: vec![(1.into(), address)].into_iter().collect(),
new_set: vec![(2.into(), address)].into_iter().collect(),
..Default::default()
});
}
#[test]
fn when_updating_confirmations_migration_is_not_scheduled_when_not_enough_confirmations() {
let address = "127.0.0.1:12000".parse().unwrap();
let mut future_new_set = Some(FutureNewSet {
new_set: vec![(2.into(), address)].into_iter().collect(),
block: Default::default(),
});
let mut snapshot = KeyServerSetSnapshot {
current_set: vec![(1.into(), address)].into_iter().collect(),
new_set: vec![(1.into(), address)].into_iter().collect(),
..Default::default()
};
let future_new_set_copy = future_new_set.clone();
let snapshot_copy = snapshot.clone();
update_number_of_confirmations(
&|| 1.into(),
&|_| Some(MIGRATION_CONFIRMATIONS_REQUIRED - 1),
&mut future_new_set, &mut snapshot);
assert_eq!(future_new_set, future_new_set_copy);
assert_eq!(snapshot, snapshot_copy);
}
#[test]
fn when_updating_confirmations_migration_is_reset_when_reorganized() {
let address = "127.0.0.1:12000".parse().unwrap();
let mut future_new_set = Some(FutureNewSet {
new_set: vec![(2.into(), address)].into_iter().collect(),
block: 1.into(),
});
let mut snapshot = KeyServerSetSnapshot {
current_set: vec![(1.into(), address)].into_iter().collect(),
new_set: vec![(1.into(), address)].into_iter().collect(),
..Default::default()
};
let snapshot_copy = snapshot.clone();
update_number_of_confirmations(
&|| 2.into(),
&|_| None,
&mut future_new_set, &mut snapshot);
assert_eq!(future_new_set, Some(FutureNewSet {
new_set: vec![(2.into(), address)].into_iter().collect(),
block: 2.into(),
}));
assert_eq!(snapshot, snapshot_copy);
}
}

View File

@ -445,6 +445,7 @@ pub mod tests {
nodes: BTreeMap::new(),
allow_connecting_to_higher_nodes: false,
admin_public: None,
auto_migrate_enabled: false,
},
};

View File

@ -79,7 +79,9 @@ pub fn start(client: Arc<Client>, sync: Arc<SyncProvider>, self_key_pair: Arc<No
} else {
Arc::new(acl_storage::DummyAclStorage::default())
};
let key_server_set = key_server_set::OnChainKeyServerSet::new(trusted_client.clone(), config.cluster_config.nodes.clone())?;
let key_server_set = key_server_set::OnChainKeyServerSet::new(trusted_client.clone(), self_key_pair.clone(),
config.cluster_config.auto_migrate_enabled, config.cluster_config.nodes.clone())?;
let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(&config)?);
let key_server = Arc::new(key_server::KeyServerImpl::new(&config.cluster_config, key_server_set.clone(), self_key_pair.clone(), acl_storage, key_storage.clone())?);
let cluster = key_server.cluster();

View File

@ -362,7 +362,7 @@ impl ClusterSessionsListener<GenerationSession> for ServiceContractListener {
/// Returns true when session, related to `server_key_id` must be started on this KeyServer.
fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool {
let servers = key_server_set.get();
let servers = key_server_set.snapshot().current_set;
let total_servers_count = servers.len();
match total_servers_count {
0 => return false,

View File

@ -16,7 +16,7 @@
use std::sync::Arc;
use ethcrypto::ecdh::agree;
use ethkey::{KeyPair, Public, Signature, Error as EthKeyError, sign};
use ethkey::{KeyPair, Public, Signature, Error as EthKeyError, sign, public_to_address};
use ethcore::account_provider::AccountProvider;
use bigint::hash::H256;
use util::Address;
@ -46,6 +46,10 @@ impl NodeKeyPair for PlainNodeKeyPair {
self.key_pair.public()
}
fn address(&self) -> Address {
public_to_address(self.key_pair.public())
}
fn sign(&self, data: &H256) -> Result<Signature, EthKeyError> {
sign(self.key_pair.secret(), data)
}
@ -73,6 +77,10 @@ impl NodeKeyPair for KeyStoreNodeKeyPair {
&self.public
}
fn address(&self) -> Address {
public_to_address(&self.public)
}
fn sign(&self, data: &H256) -> Result<Signature, EthKeyError> {
self.account_provider.sign(self.address.clone(), Some(self.password.clone()), data.clone())
.map_err(|e| EthKeyError::Custom(format!("{}", e)))

View File

@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::BTreeSet;
use ethkey::{KeyPair, Signature, Error as EthKeyError};
use ethkey::{KeyPair, Signature, Address, Error as EthKeyError};
use bigint::hash::H256;
use types::all::{Error, Public, ServerKeyId, MessageHash, EncryptedMessageSignature, RequestSignature, EncryptedDocumentKey,
EncryptedDocumentKeyShadow, NodeId};
@ -24,6 +24,8 @@ use types::all::{Error, Public, ServerKeyId, MessageHash, EncryptedMessageSignat
pub trait NodeKeyPair: Send + Sync {
/// Public portion of key.
fn public(&self) -> &Public;
/// Address of key owner.
fn address(&self) -> Address;
/// Sign data with node key.
fn sign(&self, data: &H256) -> Result<Signature, EthKeyError>;
/// Compute shared key to encrypt channel between two nodes.

View File

@ -101,6 +101,9 @@ pub struct ClusterConfiguration {
pub allow_connecting_to_higher_nodes: bool,
/// Administrator public key.
pub admin_public: Option<Public>,
/// Should key servers set change session should be started when servers set changes.
/// This will only work when servers set is configured using KeyServerSet contract.
pub auto_migrate_enabled: bool,
}
/// Shadow decryption result.