diff --git a/secret_store/src/listener/mod.rs b/secret_store/src/listener/mod.rs
index 403eaf549..a29ee5cfd 100644
--- a/secret_store/src/listener/mod.rs
+++ b/secret_store/src/listener/mod.rs
@@ -17,6 +17,7 @@
pub mod http_listener;
pub mod service_contract;
pub mod service_contract_listener;
+mod tasks_queue;

use std::collections::BTreeSet;
use std::sync::Arc;
diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs
index 9637756dc..a6327ef3c 100644
--- a/secret_store/src/listener/service_contract_listener.rs
+++ b/secret_store/src/listener/service_contract_listener.rs
@@ -14,11 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

-use std::collections::{VecDeque, HashSet};
+use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
-use parking_lot::{Mutex, Condvar};
+use parking_lot::Mutex;
use ethcore::client::ChainNotify;
use ethkey::{Random, Generator, Public, sign};
use bytes::Bytes;
@@ -29,6 +29,7 @@ use key_server_cluster::{ClusterClient, ClusterSessionsListener, ClusterSession}
use key_server_cluster::generation_session::SessionImpl as GenerationSession;
use key_storage::KeyStorage;
use listener::service_contract::ServiceContract;
+use listener::tasks_queue::TasksQueue;
use {ServerKeyId, NodeKeyPair, KeyServer};

/// Retry interval (in blocks). Every RETRY_INTERVAL_BLOCKS blocks each KeyServer reads pending requests from
@@ -75,7 +76,7 @@ struct ServiceContractListenerData {
/// Retry-related data.
pub retry_data: Mutex<ServiceContractRetryData>,
/// Service tasks queue.
- pub tasks_queue: Arc<TasksQueue>,
+ pub tasks_queue: Arc<TasksQueue<ServiceTask>>,
/// Service contract.
pub contract: Arc<ServiceContract>,
/// Key server reference.
@@ -96,14 +97,6 @@ struct ServiceContractRetryData {
pub generated_keys: HashSet<ServerKeyId>,
}

-/// Service tasks queue.
-struct TasksQueue {
- /// Service event.
- service_event: Condvar,
- /// Service tasks queue.
- service_tasks: Mutex<VecDeque<ServiceTask>>,
-}
-
/// Service task.
#[derive(Debug, Clone, PartialEq)]
pub enum ServiceTask {
@@ -130,7 +123,7 @@ impl ServiceContractListener {
key_server_set: params.key_server_set,
key_storage: params.key_storage,
});
- data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
+ data.tasks_queue.push(ServiceTask::Retry);

// we are not starting thread when in test mode
let service_handle = if cfg!(test) {
@@ -292,7 +285,7 @@ impl ServiceContractListener {
impl Drop for ServiceContractListener {
fn drop(&mut self) {
if let Some(service_handle) = self.service_handle.take() {
- self.data.tasks_queue.shutdown();
+ self.data.tasks_queue.push_front(ServiceTask::Shutdown);
// ignore error as we are already closing
let _ = service_handle.join();
}
@@ -310,7 +303,7 @@ impl ChainNotify for ServiceContractListener {
// it maybe inaccurate when switching syncing/synced states, but that's ok
let enacted_len = enacted.len();
if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTERVAL_BLOCKS {
- self.data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
+ self.data.tasks_queue.push(ServiceTask::Retry);
self.data.last_retry.store(0, Ordering::Relaxed);
}
}
@@ -337,43 +330,6 @@ impl ClusterSessionsListener<GenerationSession> for ServiceContractListener {
}
}

-impl TasksQueue {
- /// Create new tasks queue.
- pub fn new() -> Self {
- TasksQueue {
- service_event: Condvar::new(),
- service_tasks: Mutex::new(VecDeque::new()),
- }
- }
-
- /// Shutdown tasks queue.
- pub fn shutdown(&self) {
- let mut service_tasks = self.service_tasks.lock();
- service_tasks.push_front(ServiceTask::Shutdown);
- self.service_event.notify_all();
- }
-
- //// Push new tasks to the queue.
- pub fn push<I>(&self, tasks: I) where I: Iterator<Item=ServiceTask> {
- let mut service_tasks = self.service_tasks.lock();
- service_tasks.extend(tasks);
- if !service_tasks.is_empty() {
- self.service_event.notify_all();
- }
- }
-
- /// Wait for new task.
- pub fn wait(&self) -> ServiceTask {
- let mut service_tasks = self.service_tasks.lock();
- if service_tasks.is_empty() {
- self.service_event.wait(&mut service_tasks);
- }
-
- service_tasks.pop_front()
- .expect("service_event is only fired when there are new tasks or is_shutdown == true; is_shutdown == false; qed")
- }
-}
-
/// Returns true when session, related to `server_key_id` must be started on this KeyServer.
fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool {
let servers = key_server_set.get();
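
Note: the dedicated shutdown() method is gone. Drop now pushes
ServiceTask::Shutdown to the *front* of the queue, so the worker thread sees
it before any pending work. The worker loop itself is outside this diff; a
minimal sketch of the expected consumer side (the function name and the
process_task helper are illustrative assumptions, not part of the patch):

    fn run_service_thread(data: Arc<ServiceContractListenerData>) {
        loop {
            // wait() blocks on the queue's condvar until a task arrives.
            let task = data.tasks_queue.wait();
            match task {
                // Shutdown is pushed to the front, so it is dequeued before
                // any regular tasks still queued behind it.
                ServiceTask::Shutdown => return,
                // hypothetical helper; real task processing is elided here
                task => process_task(&data, task),
            }
        }
    }
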
diff --git a/secret_store/src/listener/tasks_queue.rs b/secret_store/src/listener/tasks_queue.rs
new file mode 100644
index 000000000..f313c8431
--- /dev/null
+++ b/secret_store/src/listener/tasks_queue.rs
@@ -0,0 +1,62 @@
+// Copyright 2015-2017 Parity Technologies (UK) Ltd.
+// This file is part of Parity.
+
+// Parity is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity. If not, see <http://www.gnu.org/licenses/>.
+
+use std::collections::VecDeque;
+use parking_lot::{Mutex, Condvar};
+
+#[derive(Default)]
+/// Service tasks queue.
+pub struct TasksQueue<Task> {
+ /// Service event.
+ service_event: Condvar,
+ /// Service tasks queue.
+ service_tasks: Mutex<VecDeque<Task>>,
+}
+
+impl<Task> TasksQueue<Task> {
+ /// Create new tasks queue.
+ pub fn new() -> Self {
+ TasksQueue {
+ service_event: Condvar::new(),
+ service_tasks: Mutex::new(VecDeque::new()),
+ }
+ }
+
+ /// Push task to the front of queue.
+ pub fn push_front(&self, task: Task) {
+ let mut service_tasks = self.service_tasks.lock();
+ service_tasks.push_front(task);
+ self.service_event.notify_all();
+ }
+
+ /// Push task to the back of queue.
+ pub fn push(&self, task: Task) {
+ let mut service_tasks = self.service_tasks.lock();
+ service_tasks.push_back(task);
+ self.service_event.notify_all();
+ }
+
+ /// Wait for new task.
+ pub fn wait(&self) -> Task {
+ let mut service_tasks = self.service_tasks.lock();
+ if service_tasks.is_empty() {
+ self.service_event.wait(&mut service_tasks);
+ }
+
+ service_tasks.pop_front()
+ .expect("service_event is only fired when there are new tasks or is_shutdown == true; is_shutdown == false; qed")
+ }
+}
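
Since TasksQueue is now generic over its task type, it can be exercised on its
own. A self-contained usage sketch, assuming the queue is in scope via
use listener::tasks_queue::TasksQueue (the Job enum and the threading here are
illustrative only, not part of the patch):

    use std::sync::Arc;
    use std::thread;

    #[derive(Debug, Clone, PartialEq)]
    enum Job {
        Work(u64),
        Stop,
    }

    fn main() {
        let queue = Arc::new(TasksQueue::new());
        let worker_queue = queue.clone();
        let worker = thread::spawn(move || loop {
            match worker_queue.wait() {
                // a front-pushed sentinel is dequeued before older jobs
                Job::Stop => return,
                Job::Work(n) => println!("processing job {}", n),
            }
        });

        queue.push(Job::Work(1));
        queue.push(Job::Work(2));
        // jumps ahead of any jobs the worker has not yet dequeued
        queue.push_front(Job::Stop);

        // with a single consumer, the expect() inside wait() cannot trip
        worker.join().unwrap();
    }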