From df3a8a92348d18b5e28d472d6330cc35ef50501d Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 22 Nov 2017 10:43:16 +0300 Subject: [PATCH] SecretStore: default ClusterSessionsListener impl --- .../src/key_server_cluster/cluster_sessions.rs | 4 ++-- .../src/listener/service_contract_listener.rs | 18 +++++++++++++++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/secret_store/src/key_server_cluster/cluster_sessions.rs b/secret_store/src/key_server_cluster/cluster_sessions.rs index 7b1a9cfe6..2c1081c3b 100644 --- a/secret_store/src/key_server_cluster/cluster_sessions.rs +++ b/secret_store/src/key_server_cluster/cluster_sessions.rs @@ -120,9 +120,9 @@ pub struct ClusterSessions { /// Active sessions container listener. pub trait ClusterSessionsListener: Send + Sync { /// When new session is inserted to the container. - fn on_session_inserted(&self, session: Arc); + fn on_session_inserted(&self, _session: Arc) {} /// When session is removed from the container. - fn on_session_removed(&self, session: Arc); + fn on_session_removed(&self, _session: Arc) {} } /// Active sessions container. diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs index dcf615744..d53be7daa 100644 --- a/secret_store/src/listener/service_contract_listener.rs +++ b/secret_store/src/listener/service_contract_listener.rs @@ -117,6 +117,7 @@ enum ServiceTask { } impl ServiceContractListener { + /// Create new service contract listener. pub fn new(client: &Arc, sync: &Arc, key_server: Arc, cluster: Arc, self_key_pair: Arc, key_servers_set: Arc) -> Arc { let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) .map(|a| { @@ -154,6 +155,7 @@ impl ServiceContractListener { contract } + /// Process incoming events of service contract. fn process_service_contract_events(&self, client: &Client, service_contract: Address, blocks: Vec) { debug_assert!(!blocks.is_empty()); @@ -188,6 +190,7 @@ impl ServiceContractListener { })); } + /// Service thread procedure. fn run_service_thread(data: Arc) { loop { let task = data.tasks_queue.wait(); @@ -203,6 +206,7 @@ impl ServiceContractListener { } } + /// Process single service task. fn process_service_task(data: &Arc, task: ServiceTask) -> Result<(), String> { match task { ServiceTask::Retry => @@ -239,6 +243,7 @@ impl ServiceContractListener { } } + /// Retry processing pending requests. fn retry_pending_requests(data: &Arc) -> Result { let client = data.client.upgrade().ok_or("client is required".to_owned())?; let retry_data = ::std::mem::replace(&mut *data.retry_data.lock(), Default::default()); @@ -298,6 +303,7 @@ impl ServiceContractListener { Ok(processed_requests) } + /// Generate server key. fn generate_server_key(data: &Arc, server_key_id: &ServerKeyId, threshold: &H256) -> Result { let threshold_num = threshold.low_u64(); if threshold != &threshold_num.into() || threshold_num >= ::std::usize::MAX as u64 { @@ -313,6 +319,7 @@ impl ServiceContractListener { .map_err(Into::into) } + /// Publish server key. fn publish_server_key(data: &Arc, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String> { let server_key_hash = keccak(server_key); let signed_server_key = data.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?; @@ -381,13 +388,14 @@ impl ChainNotify for ServiceContractListener { } impl ClusterSessionsListener for ServiceContractListener { - fn on_session_inserted(&self, _session: Arc) { - } - fn on_session_removed(&self, session: Arc) { // TODO: only start if session started via the contract + // only publish when the session is started by another node + // when it is started by this node, it is published from process_service_task if !is_processed_by_this_key_server(&*self.data.key_servers_set, &*self.data.self_key_pair, &session.id()) { + // by this time sesion must already be completed - either successfully, or not + debug_assert!(session.is_finished()); session.wait(Some(Default::default())) .map_err(|e| format!("{}", e)) .and_then(|server_key| Self::publish_server_key(&self.data, &session.id(), &server_key)) @@ -406,6 +414,7 @@ impl ClusterSessionsListener for ServiceContractListener { } impl TasksQueue { + /// Create new tasks queue. pub fn new() -> Self { TasksQueue { service_event: Condvar::new(), @@ -413,12 +422,14 @@ impl TasksQueue { } } + /// Shutdown tasks queue. pub fn shutdown(&self) { let mut service_tasks = self.service_tasks.lock(); service_tasks.push_front(ServiceTask::Shutdown); self.service_event.notify_all(); } + //// Push new tasks to the queue. pub fn push(&self, tasks: I) where I: Iterator { let mut service_tasks = self.service_tasks.lock(); service_tasks.extend(tasks); @@ -427,6 +438,7 @@ impl TasksQueue { } } + /// Wait for new task. pub fn wait(&self) -> ServiceTask { let mut service_tasks = self.service_tasks.lock(); if service_tasks.is_empty() {