From ee1ce425468db8020c95ea0eba6b23a9588894e2 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 20 Dec 2017 19:27:47 +0300 Subject: [PATCH] SecretStore: extracted TasksQueue to separate file --- secret_store/src/listener/mod.rs | 1 + .../src/listener/service_contract_listener.rs | 58 +++-------------- secret_store/src/listener/tasks_queue.rs | 62 +++++++++++++++++++ 3 files changed, 70 insertions(+), 51 deletions(-) create mode 100644 secret_store/src/listener/tasks_queue.rs diff --git a/secret_store/src/listener/mod.rs b/secret_store/src/listener/mod.rs index 403eaf549..a29ee5cfd 100644 --- a/secret_store/src/listener/mod.rs +++ b/secret_store/src/listener/mod.rs @@ -17,6 +17,7 @@ pub mod http_listener; pub mod service_contract; pub mod service_contract_listener; +mod tasks_queue; use std::collections::BTreeSet; use std::sync::Arc; diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs index 9637756dc..a6327ef3c 100644 --- a/secret_store/src/listener/service_contract_listener.rs +++ b/secret_store/src/listener/service_contract_listener.rs @@ -14,11 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::collections::{VecDeque, HashSet}; +use std::collections::HashSet; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::thread; -use parking_lot::{Mutex, Condvar}; +use parking_lot::Mutex; use ethcore::client::ChainNotify; use ethkey::{Random, Generator, Public, sign}; use bytes::Bytes; @@ -29,6 +29,7 @@ use key_server_cluster::{ClusterClient, ClusterSessionsListener, ClusterSession} use key_server_cluster::generation_session::SessionImpl as GenerationSession; use key_storage::KeyStorage; use listener::service_contract::ServiceContract; +use listener::tasks_queue::TasksQueue; use {ServerKeyId, NodeKeyPair, KeyServer}; /// Retry interval (in blocks). Every RETRY_INTERVAL_BLOCKS blocks each KeyServer reads pending requests from @@ -75,7 +76,7 @@ struct ServiceContractListenerData { /// Retry-related data. pub retry_data: Mutex, /// Service tasks queue. - pub tasks_queue: Arc, + pub tasks_queue: Arc>, /// Service contract. pub contract: Arc, /// Key server reference. @@ -96,14 +97,6 @@ struct ServiceContractRetryData { pub generated_keys: HashSet, } -/// Service tasks queue. -struct TasksQueue { - /// Service event. - service_event: Condvar, - /// Service tasks queue. - service_tasks: Mutex>, -} - /// Service task. #[derive(Debug, Clone, PartialEq)] pub enum ServiceTask { @@ -130,7 +123,7 @@ impl ServiceContractListener { key_server_set: params.key_server_set, key_storage: params.key_storage, }); - data.tasks_queue.push(::std::iter::once(ServiceTask::Retry)); + data.tasks_queue.push(ServiceTask::Retry); // we are not starting thread when in test mode let service_handle = if cfg!(test) { @@ -292,7 +285,7 @@ impl ServiceContractListener { impl Drop for ServiceContractListener { fn drop(&mut self) { if let Some(service_handle) = self.service_handle.take() { - self.data.tasks_queue.shutdown(); + self.data.tasks_queue.push_front(ServiceTask::Shutdown); // ignore error as we are already closing let _ = service_handle.join(); } @@ -310,7 +303,7 @@ impl ChainNotify for ServiceContractListener { // it maybe inaccurate when switching syncing/synced states, but that's ok let enacted_len = enacted.len(); if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTERVAL_BLOCKS { - self.data.tasks_queue.push(::std::iter::once(ServiceTask::Retry)); + self.data.tasks_queue.push(ServiceTask::Retry); self.data.last_retry.store(0, Ordering::Relaxed); } } @@ -337,43 +330,6 @@ impl ClusterSessionsListener for ServiceContractListener { } } -impl TasksQueue { - /// Create new tasks queue. - pub fn new() -> Self { - TasksQueue { - service_event: Condvar::new(), - service_tasks: Mutex::new(VecDeque::new()), - } - } - - /// Shutdown tasks queue. - pub fn shutdown(&self) { - let mut service_tasks = self.service_tasks.lock(); - service_tasks.push_front(ServiceTask::Shutdown); - self.service_event.notify_all(); - } - - //// Push new tasks to the queue. - pub fn push(&self, tasks: I) where I: Iterator { - let mut service_tasks = self.service_tasks.lock(); - service_tasks.extend(tasks); - if !service_tasks.is_empty() { - self.service_event.notify_all(); - } - } - - /// Wait for new task. - pub fn wait(&self) -> ServiceTask { - let mut service_tasks = self.service_tasks.lock(); - if service_tasks.is_empty() { - self.service_event.wait(&mut service_tasks); - } - - service_tasks.pop_front() - .expect("service_event is only fired when there are new tasks or is_shutdown == true; is_shutdown == false; qed") - } -} - /// Returns true when session, related to `server_key_id` must be started on this KeyServer. fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool { let servers = key_server_set.get(); diff --git a/secret_store/src/listener/tasks_queue.rs b/secret_store/src/listener/tasks_queue.rs new file mode 100644 index 000000000..f313c8431 --- /dev/null +++ b/secret_store/src/listener/tasks_queue.rs @@ -0,0 +1,62 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::collections::VecDeque; +use parking_lot::{Mutex, Condvar}; + +#[derive(Default)] +/// Service tasks queue. +pub struct TasksQueue { + /// Service event. + service_event: Condvar, + /// Service tasks queue. + service_tasks: Mutex>, +} + +impl TasksQueue { + /// Create new tasks queue. + pub fn new() -> Self { + TasksQueue { + service_event: Condvar::new(), + service_tasks: Mutex::new(VecDeque::new()), + } + } + + /// Push task to the front of queue. + pub fn push_front(&self, task: Task) { + let mut service_tasks = self.service_tasks.lock(); + service_tasks.push_front(task); + self.service_event.notify_all(); + } + + /// Push task to the back of queue. + pub fn push(&self, task: Task) { + let mut service_tasks = self.service_tasks.lock(); + service_tasks.push_back(task); + self.service_event.notify_all(); + } + + /// Wait for new task. + pub fn wait(&self) -> Task { + let mut service_tasks = self.service_tasks.lock(); + if service_tasks.is_empty() { + self.service_event.wait(&mut service_tasks); + } + + service_tasks.pop_front() + .expect("service_event is only fired when there are new tasks or is_shutdown == true; is_shutdown == false; qed") + } +}