From 5a7e065e41cde696045366e39eb4b0f644452b37 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 22 Nov 2017 10:05:14 +0300 Subject: [PATCH] SecretStore: Kovan flush3 --- secret_store/src/key_server.rs | 5 +- .../src/key_server_cluster/cluster.rs | 46 +++++++------ .../key_server_cluster/cluster_sessions.rs | 56 ++++++++++++++- secret_store/src/key_server_cluster/mod.rs | 1 + secret_store/src/lib.rs | 3 +- .../src/listener/service_contract_listener.rs | 69 ++++++++++++++----- 6 files changed, 140 insertions(+), 40 deletions(-) diff --git a/secret_store/src/key_server.rs b/secret_store/src/key_server.rs index 0d23b99c8..e9cd382d9 100644 --- a/secret_store/src/key_server.rs +++ b/secret_store/src/key_server.rs @@ -31,6 +31,10 @@ use traits::{AdminSessionsServer, ServerKeyGenerator, DocumentKeyServer, Message use types::all::{Error, Public, RequestSignature, ServerKeyId, EncryptedDocumentKey, EncryptedDocumentKeyShadow, ClusterConfiguration, MessageHash, EncryptedMessageSignature, NodeId}; use key_server_cluster::{ClusterClient, ClusterConfiguration as NetClusterConfiguration}; +use key_server_cluster::generation_session::Session as GenerationSession; +use key_server_cluster::encryption_session::Session as EncryptionSession; +use key_server_cluster::decryption_session::Session as DecryptionSession; +use key_server_cluster::signing_session::Session as SigningSession; /// Secret store key server implementation pub struct KeyServerImpl { @@ -53,7 +57,6 @@ impl KeyServerImpl { } /// Get cluster client reference. - #[cfg(test)] pub fn cluster(&self) -> Arc { self.data.lock().cluster.clone() } diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index 0a975c275..3d3724e13 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -31,17 +31,15 @@ use bigint::hash::H256; use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage, KeyServerSet, NodeKeyPair}; use key_server_cluster::cluster_sessions::{ClusterSession, ClusterSessions, GenerationSessionWrapper, EncryptionSessionWrapper, DecryptionSessionWrapper, SigningSessionWrapper, AdminSessionWrapper, KeyNegotiationSessionWrapper, SessionIdWithSubSession, - ClusterSessionsContainer, SERVERS_SET_CHANGE_SESSION_ID, create_cluster_view, AdminSessionCreationData}; + ClusterSessionsContainer, SERVERS_SET_CHANGE_SESSION_ID, create_cluster_view, AdminSessionCreationData, ClusterSessionsListener}; use key_server_cluster::cluster_sessions_creator::{ClusterSessionCreator, IntoSessionId}; use key_server_cluster::message::{self, Message, ClusterMessage}; -use key_server_cluster::generation_session::{Session as GenerationSession}; -#[cfg(test)] -use key_server_cluster::generation_session::SessionImpl as GenerationSessionImpl; -use key_server_cluster::decryption_session::{Session as DecryptionSession}; -use key_server_cluster::encryption_session::{Session as EncryptionSession}; -use key_server_cluster::signing_session::{Session as SigningSession}; -use key_server_cluster::key_version_negotiation_session::{Session as KeyVersionNegotiationSession, SessionImpl as KeyVersionNegotiationSessionImpl, - IsolatedSessionTransport as KeyVersionNegotiationSessionTransport, ContinueAction}; +use key_server_cluster::generation_session::{SessionImpl as GenerationSession, Session as GenerationSessionTrait}; +use key_server_cluster::decryption_session::{SessionImpl as DecryptionSession, Session as DecryptionSessionTrait}; +use key_server_cluster::encryption_session::{SessionImpl as EncryptionSession, Session as EncryptionSessionTrait}; +use key_server_cluster::signing_session::{SessionImpl as SigningSession, Session as SigningSessionTrait}; +use key_server_cluster::key_version_negotiation_session::{SessionImpl as KeyVersionNegotiationSession, + IsolatedSessionTransport as KeyVersionNegotiationSessionTransport, ContinueAction, Session as KeyVersionNegotiationSessionTrait}; use key_server_cluster::io::{DeadlineStatus, ReadMessage, SharedTcpStream, read_encrypted_message, WriteMessage, write_encrypted_message}; use key_server_cluster::net::{accept_connection as net_accept_connection, connect as net_connect, Connection as NetConnection}; @@ -74,16 +72,19 @@ pub trait ClusterClient: Send + Sync { /// Start new signing session. fn new_signing_session(&self, session_id: SessionId, requestor_signature: Signature, version: Option, message_hash: H256) -> Result, Error>; /// Start new key version negotiation session. - fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result, Error>; + fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result>, Error>; /// Start new servers set change session. fn new_servers_set_change_session(&self, session_id: Option, new_nodes_set: BTreeSet, old_set_signature: Signature, new_set_signature: Signature) -> Result, Error>; + /// Listen for new generation sessions. + fn add_generation_listener(&self, listener: Arc>); + /// Ask node to make 'faulty' generation sessions. #[cfg(test)] fn make_faulty_generation_sessions(&self); /// Get active generation session with given id. #[cfg(test)] - fn generation_session(&self, session_id: &SessionId) -> Option>; + fn generation_session(&self, session_id: &SessionId) -> Option>; /// Try connect to disconnected nodes. #[cfg(test)] fn connect(&self); @@ -446,7 +447,7 @@ impl ClusterCore { } /// Try to contnue session. - fn try_continue_session(data: &Arc, session: Option>>) { + fn try_continue_session(data: &Arc, session: Option>>) { if let Some(session) = session { let meta = session.meta(); let is_master_node = meta.self_node_id == meta.master_node_id; @@ -842,7 +843,7 @@ impl ClusterClientImpl { } } - fn create_key_version_negotiation_session(&self, session_id: SessionId) -> Result>, Error> { + fn create_key_version_negotiation_session(&self, session_id: SessionId) -> Result>, Error> { let mut connected_nodes = self.data.connections.connected_nodes(); connected_nodes.insert(self.data.self_key_pair.public().clone()); @@ -872,7 +873,7 @@ impl ClusterClient for ClusterClientImpl { let cluster = create_cluster_view(&self.data, true)?; let session = self.data.sessions.generation_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, false, None)?; match session.initialize(author, threshold, connected_nodes) { - Ok(()) => Ok(GenerationSessionWrapper::new(Arc::downgrade(&self.data), session_id, session)), + Ok(()) => Ok(session), Err(error) => { self.data.sessions.generation_sessions.remove(&session.id()); Err(error) @@ -887,7 +888,7 @@ impl ClusterClient for ClusterClientImpl { let cluster = create_cluster_view(&self.data, true)?; let session = self.data.sessions.encryption_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, false, None)?; match session.initialize(requestor_signature, common_point, encrypted_point) { - Ok(()) => Ok(EncryptionSessionWrapper::new(Arc::downgrade(&self.data), session_id, session)), + Ok(()) => Ok(session), Err(error) => { self.data.sessions.encryption_sessions.remove(&session.id()); Err(error) @@ -916,7 +917,7 @@ impl ClusterClient for ClusterClientImpl { }; match initialization_result { - Ok(()) => Ok(DecryptionSessionWrapper::new(Arc::downgrade(&self.data), session_id, session)), + Ok(()) => Ok(session), Err(error) => { self.data.sessions.decryption_sessions.remove(&session.id()); Err(error) @@ -945,7 +946,7 @@ impl ClusterClient for ClusterClientImpl { }; match initialization_result { - Ok(()) => Ok(SigningSessionWrapper::new(Arc::downgrade(&self.data), session_id, session)), + Ok(()) => Ok(session), Err(error) => { self.data.sessions.signing_sessions.remove(&session.id()); Err(error) @@ -953,9 +954,9 @@ impl ClusterClient for ClusterClientImpl { } } - fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result, Error> { + fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result>, Error> { let session = self.create_key_version_negotiation_session(session_id)?; - Ok(KeyNegotiationSessionWrapper::new(Arc::downgrade(&self.data), session.id(), session)) + Ok(session) } fn new_servers_set_change_session(&self, session_id: Option, new_nodes_set: BTreeSet, old_set_signature: Signature, new_set_signature: Signature) -> Result, Error> { @@ -982,6 +983,10 @@ impl ClusterClient for ClusterClientImpl { } } + fn add_generation_listener(&self, listener: Arc>) { + self.data.sessions.generation_sessions.add_listener(listener); + } + #[cfg(test)] fn connect(&self) { ClusterCore::connect_disconnected_nodes(self.data.clone()); @@ -993,7 +998,7 @@ impl ClusterClient for ClusterClientImpl { } #[cfg(test)] - fn generation_session(&self, session_id: &SessionId) -> Option> { + fn generation_session(&self, session_id: &SessionId) -> Option> { self.data.sessions.generation_sessions.get(session_id, false) } @@ -1021,6 +1026,7 @@ pub mod tests { use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration}; use key_server_cluster::cluster_sessions::ClusterSession; use key_server_cluster::generation_session::{Session as GenerationSession, SessionState as GenerationSessionState}; + use key_server_cluster::signing_session::Session as SigningSession; #[derive(Debug)] pub struct DummyCluster { diff --git a/secret_store/src/key_server_cluster/cluster_sessions.rs b/secret_store/src/key_server_cluster/cluster_sessions.rs index 254e3ecc6..ceec154da 100644 --- a/secret_store/src/key_server_cluster/cluster_sessions.rs +++ b/secret_store/src/key_server_cluster/cluster_sessions.rs @@ -120,12 +120,22 @@ pub struct ClusterSessions { creator_core: Arc, } +/// Active sessions container listener. +pub trait ClusterSessionsListener: Send + Sync { + /// When new session is inserted to the container. + fn on_session_inserted(&self, session: Arc); + /// When session is removed from the container. + fn on_session_removed(&self, session: Arc); +} + /// Active sessions container. pub struct ClusterSessionsContainer, D> { /// Sessions creator. pub creator: SC, /// Active sessions. sessions: RwLock>>, + /// Listeners. Lock order: sessions -> listeners. + listeners: Mutex>>>, /// Sessions container state. container_state: Arc>, /// Phantom data. @@ -294,11 +304,16 @@ impl ClusterSessionsContainer where S: ClusterSession, SC: C ClusterSessionsContainer { creator: creator, sessions: RwLock::new(BTreeMap::new()), + listeners: Mutex::new(Vec::new()), container_state: container_state, _pd: Default::default(), } } + pub fn add_listener(&self, listener: Arc>) { + self.listeners.lock().push(Arc::downgrade(&listener)); + } + pub fn is_empty(&self) -> bool { self.sessions.read().is_empty() } @@ -342,12 +357,51 @@ impl ClusterSessionsContainer where S: ClusterSession, SC: C queue: VecDeque::new(), }; sessions.insert(session_id, queued_session); + + // notify listeners + let mut listeners = self.listeners.lock(); + let mut listener_index = 0; + loop { + if listener_index >= listeners.len() { + break; + } + + match listeners[listener_index].upgrade() { + Some(listener) => { + listener.on_session_inserted(session.clone()); + listener_index += 1; + }, + None => { + listeners.swap_remove(listener_index); + }, + } + } + Ok(session) } pub fn remove(&self, session_id: &S::Id) { - if self.sessions.write().remove(session_id).is_some() { + if let Some(session) = self.sessions.write().remove(session_id) { self.container_state.lock().on_session_completed(); + + // notify listeners + let mut listeners = self.listeners.lock(); + let mut listener_index = 0; + loop { + if listener_index >= listeners.len() { + break; + } + + match listeners[listener_index].upgrade() { + Some(listener) => { + listener.on_session_removed(session.session.clone()); + listener_index += 1; + }, + None => { + listeners.swap_remove(listener_index); + }, + } + } } } diff --git a/secret_store/src/key_server_cluster/mod.rs b/secret_store/src/key_server_cluster/mod.rs index 81f9be647..8a2f777c0 100644 --- a/secret_store/src/key_server_cluster/mod.rs +++ b/secret_store/src/key_server_cluster/mod.rs @@ -27,6 +27,7 @@ pub use super::key_storage::{KeyStorage, DocumentKeyShare, DocumentKeyShareVersi pub use super::key_server_set::KeyServerSet; pub use super::serialization::{SerializableSignature, SerializableH256, SerializableSecret, SerializablePublic, SerializableMessageHash}; pub use self::cluster::{ClusterCore, ClusterConfiguration, ClusterClient}; +pub use self::cluster_sessions::{ClusterSession, ClusterSessionsListener}; pub use self::generation_session::Session as GenerationSession; pub use self::encryption_session::Session as EncryptionSession; pub use self::decryption_session::Session as DecryptionSession; diff --git a/secret_store/src/lib.rs b/secret_store/src/lib.rs index 34675b9c5..09d4ce774 100644 --- a/secret_store/src/lib.rs +++ b/secret_store/src/lib.rs @@ -80,11 +80,12 @@ pub fn start(client: Arc, sync: Arc, self_key_pair: Arc Some(listener::http_listener::KeyServerHttpListener::start(listener_address, key_server.clone())?), None => None, }; - let contract_listener = listener::service_contract_listener::ServiceContractListener::new(&client, &sync, key_server.clone(), self_key_pair, key_server_set); + let contract_listener = listener::service_contract_listener::ServiceContractListener::new(&client, &sync, key_server.clone(), cluster, self_key_pair, key_server_set); let listener = listener::Listener::new(key_server, http_listener, Some(contract_listener)); Ok(Box::new(listener)) } diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs index 74e76b1c1..e3fc83674 100644 --- a/secret_store/src/listener/service_contract_listener.rs +++ b/secret_store/src/listener/service_contract_listener.rs @@ -31,6 +31,8 @@ use bigint::hash::H256; use bigint::prelude::U256; use util::Address; use key_server_set::KeyServerSet; +use key_server_cluster::{ClusterClient, ClusterSessionsListener, ClusterSession}; +use key_server_cluster::generation_session::{SessionImpl as GenerationSession, Session as GenerationSessionTrait}; use {ServerKeyId, NodeKeyPair, KeyServer}; /// Name of the SecretStore contract in the registry. @@ -108,15 +110,17 @@ enum ServiceTask { Retry, /// Generate server key (server_key_id, threshold). GenerateServerKey(H256, H256), + /// Confirm server key (server_key_id). + ConfirmServerKey(H256), /// Shutdown listener. Shutdown, } impl ServiceContractListener { - pub fn new(client: &Arc, sync: &Arc, key_server: Arc, self_key_pair: Arc, key_servers_set: Arc) -> Arc { + pub fn new(client: &Arc, sync: &Arc, key_server: Arc, cluster: Arc, self_key_pair: Arc, key_servers_set: Arc) -> Arc { let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) .map(|a| { - trace!(target: "secretstore", "Installing service contract from address {}", a); + trace!(target: "secretstore", "{}: installing service contract from address {}", self_key_pair.public(), a); a }) .unwrap_or_default(); @@ -146,6 +150,7 @@ impl ServiceContractListener { service_handle: Some(service_handle), }); client.add_notify(contract.clone()); + cluster.add_generation_listener(contract.clone()); contract } @@ -186,7 +191,7 @@ impl ServiceContractListener { fn run_service_thread(data: Arc) { loop { let task = data.tasks_queue.wait(); - trace!(target: "secretstore", "Processing {:?} task", task); + trace!(target: "secretstore", "{}: processing {:?} task",data.self_key_pair.public(), task); match task { ServiceTask::Shutdown => break, @@ -204,28 +209,29 @@ impl ServiceContractListener { Self::retry_pending_requests(&data) .map(|processed_requests| { if processed_requests != 0 { - trace!(target: "secretstore", "Successfully retried {} pending requests", - processed_requests); + trace!(target: "secretstore", "{}: successfully retried {} pending requests", + data.self_key_pair.public(), processed_requests); } () }) .map_err(|error| { - warn!(target: "secretstore", "Retrying pending requests has failed with: {}", - error); + warn!(target: "secretstore", "{}: retrying pending requests has failed with: {}", + data.self_key_pair.public(), error); error }), + ServiceTask::ConfirmServerKey(_) => Err("not implemented".to_owned()), // TODO ServiceTask::GenerateServerKey(server_key_id, threshold) => { data.retry_data.lock().generated_keys.insert(server_key_id.clone()); Self::generate_server_key(&data, &server_key_id, &threshold) .and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key)) .map(|_| { - trace!(target: "secretstore", "GenerateServerKey({}, {}) request has completed", - server_key_id, threshold); + trace!(target: "secretstore", "{}: started processing GenerateServerKey({}, {}) request", + data.self_key_pair.public(), server_key_id, threshold); () }) .map_err(|error| { - warn!(target: "secretstore", "GenerateServerKey({}, {}) request has failed with: {}", - server_key_id, threshold, error); + warn!(target: "secretstore", "{}: failed to start processing GenerateServerKey({}, {}) request with: {}", + data.self_key_pair.public(), server_key_id, threshold, error); error }) }, @@ -263,19 +269,23 @@ impl ServiceContractListener { if is_confirmed { continue; } + // only process request, which haven't been processed recently // there could be a lag when we've just generated server key && retrying on the same block // (or before our tx is mined) - state is not updated yet if retry_data.generated_keys.contains(&server_key_id){ continue; } - // only process requests that are intended to be processed by this server - if !is_processed_by_this_key_server(&*data.key_servers_set, &*data.self_key_pair, &server_key_id) { - continue; - } // process request - match Self::process_service_task(data, ServiceTask::GenerateServerKey(server_key_id, threshold.into())) { + let is_own_request = is_processed_by_this_key_server(&*data.key_servers_set, &*data.self_key_pair, &server_key_id); + let request_result = Self::process_service_task(data, match is_own_request { + true => ServiceTask::GenerateServerKey(server_key_id, threshold.into()), + false => ServiceTask::ConfirmServerKey(server_key_id), + }); + + // process request result + match request_result { Ok(_) => processed_requests += 1, Err(_) => { failed_requests += 1; @@ -351,7 +361,7 @@ impl ChainNotify for ServiceContractListener { // update contract address from registry if let Some(service_contract_addr) = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) { if self.data.contract.read().address != service_contract_addr { - trace!(target: "secretstore", "Installing service contract from address {}", service_contract_addr); + trace!(target: "secretstore", "{}: installing service contract from address {}", self.data.self_key_pair.public(), service_contract_addr); *self.data.contract.write() = SecretStoreService::new(service_contract_addr.clone()); } @@ -370,6 +380,31 @@ impl ChainNotify for ServiceContractListener { } } +impl ClusterSessionsListener for ServiceContractListener { + fn on_session_inserted(&self, _session: Arc) { + } + + fn on_session_removed(&self, session: Arc) { + // TODO: only start if session started via the contract + // only publish when the session is started by another node + if !is_processed_by_this_key_server(&*self.data.key_servers_set, &*self.data.self_key_pair, &session.id()) { + session.wait(Some(Default::default())) + .map_err(|e| format!("{}", e)) + .and_then(|server_key| Self::publish_server_key(&self.data, &session.id(), &server_key)) + .map(|_| { + trace!(target: "secretstore", "{}: completed foreign GenerateServerKey({}) request", + self.data.self_key_pair.public(), session.id()); + () + }) + .map_err(|error| { + warn!(target: "secretstore", "{}: failed to process GenerateServerKey({}) request with: {}", + self.data.self_key_pair.public(), session.id(), error); + error + }); + } + } +} + impl TasksQueue { pub fn new() -> Self { TasksQueue {