SecretStore: extracted TasksQueue to separate file

Svyatoslav Nikolsky 2017-12-20 19:27:47 +03:00
parent b10d567386
commit ee1ce42546
3 changed files with 70 additions and 51 deletions

listener/mod.rs

@@ -17,6 +17,7 @@
 pub mod http_listener;
 pub mod service_contract;
 pub mod service_contract_listener;
+mod tasks_queue;

 use std::collections::BTreeSet;
 use std::sync::Arc;

View File

@@ -14,11 +14,11 @@
 // You should have received a copy of the GNU General Public License
 // along with Parity. If not, see <http://www.gnu.org/licenses/>.

-use std::collections::{VecDeque, HashSet};
+use std::collections::HashSet;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::thread;
-use parking_lot::{Mutex, Condvar};
+use parking_lot::Mutex;
 use ethcore::client::ChainNotify;
 use ethkey::{Random, Generator, Public, sign};
 use bytes::Bytes;
@@ -29,6 +29,7 @@ use key_server_cluster::{ClusterClient, ClusterSessionsListener, ClusterSession}
 use key_server_cluster::generation_session::SessionImpl as GenerationSession;
 use key_storage::KeyStorage;
 use listener::service_contract::ServiceContract;
+use listener::tasks_queue::TasksQueue;
 use {ServerKeyId, NodeKeyPair, KeyServer};

 /// Retry interval (in blocks). Every RETRY_INTERVAL_BLOCKS blocks each KeyServer reads pending requests from
@@ -75,7 +76,7 @@ struct ServiceContractListenerData
 	/// Retry-related data.
 	pub retry_data: Mutex<ServiceContractRetryData>,
 	/// Service tasks queue.
-	pub tasks_queue: Arc<TasksQueue>,
+	pub tasks_queue: Arc<TasksQueue<ServiceTask>>,
 	/// Service contract.
 	pub contract: Arc<ServiceContract>,
 	/// Key server reference.
@@ -96,14 +97,6 @@ struct ServiceContractRetryData
 	pub generated_keys: HashSet<ServerKeyId>,
 }

-/// Service tasks queue.
-struct TasksQueue {
-	/// Service event.
-	service_event: Condvar,
-	/// Service tasks queue.
-	service_tasks: Mutex<VecDeque<ServiceTask>>,
-}
-
 /// Service task.
 #[derive(Debug, Clone, PartialEq)]
 pub enum ServiceTask {
@@ -130,7 +123,7 @@ impl ServiceContractListener
 			key_server_set: params.key_server_set,
 			key_storage: params.key_storage,
 		});
-		data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
+		data.tasks_queue.push(ServiceTask::Retry);

 		// we are not starting thread when in test mode
 		let service_handle = if cfg!(test) {
@@ -292,7 +285,7 @@ impl ServiceContractListener
 impl Drop for ServiceContractListener {
 	fn drop(&mut self) {
 		if let Some(service_handle) = self.service_handle.take() {
-			self.data.tasks_queue.shutdown();
+			self.data.tasks_queue.push_front(ServiceTask::Shutdown);
 			// ignore error as we are already closing
 			let _ = service_handle.join();
 		}
@@ -310,7 +303,7 @@ impl ChainNotify for ServiceContractListener
 		// it maybe inaccurate when switching syncing/synced states, but that's ok
 		let enacted_len = enacted.len();
 		if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTERVAL_BLOCKS {
-			self.data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
+			self.data.tasks_queue.push(ServiceTask::Retry);
 			self.data.last_retry.store(0, Ordering::Relaxed);
 		}
 	}
@@ -337,43 +330,6 @@ impl ClusterSessionsListener<GenerationSession> for ServiceContractListener
 	}
 }

-impl TasksQueue {
-	/// Create new tasks queue.
-	pub fn new() -> Self {
-		TasksQueue {
-			service_event: Condvar::new(),
-			service_tasks: Mutex::new(VecDeque::new()),
-		}
-	}
-
-	/// Shutdown tasks queue.
-	pub fn shutdown(&self) {
-		let mut service_tasks = self.service_tasks.lock();
-		service_tasks.push_front(ServiceTask::Shutdown);
-		self.service_event.notify_all();
-	}
-
-	//// Push new tasks to the queue.
-	pub fn push<I>(&self, tasks: I) where I: Iterator<Item=ServiceTask> {
-		let mut service_tasks = self.service_tasks.lock();
-		service_tasks.extend(tasks);
-		if !service_tasks.is_empty() {
-			self.service_event.notify_all();
-		}
-	}
-
-	/// Wait for new task.
-	pub fn wait(&self) -> ServiceTask {
-		let mut service_tasks = self.service_tasks.lock();
-		if service_tasks.is_empty() {
-			self.service_event.wait(&mut service_tasks);
-		}
-
-		service_tasks.pop_front()
-			.expect("service_event is only fired when there are new tasks or is_shutdown == true; is_shutdown == false; qed")
-	}
-}
-
 /// Returns true when session, related to `server_key_id` must be started on this KeyServer.
 fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool {
 	let servers = key_server_set.get();

listener/tasks_queue.rs (new file)

@@ -0,0 +1,62 @@
+// Copyright 2015-2017 Parity Technologies (UK) Ltd.
+// This file is part of Parity.
+
+// Parity is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity. If not, see <http://www.gnu.org/licenses/>.
+
+use std::collections::VecDeque;
+use parking_lot::{Mutex, Condvar};
+
+#[derive(Default)]
+/// Service tasks queue.
+pub struct TasksQueue<Task> {
+	/// Service event.
+	service_event: Condvar,
+	/// Service tasks queue.
+	service_tasks: Mutex<VecDeque<Task>>,
+}
+
+impl<Task> TasksQueue<Task> {
+	/// Create new tasks queue.
+	pub fn new() -> Self {
+		TasksQueue {
+			service_event: Condvar::new(),
+			service_tasks: Mutex::new(VecDeque::new()),
+		}
+	}
+
+	/// Push task to the front of queue.
+	pub fn push_front(&self, task: Task) {
+		let mut service_tasks = self.service_tasks.lock();
+		service_tasks.push_front(task);
+		self.service_event.notify_all();
+	}
+
+	/// Push task to the back of queue.
+	pub fn push(&self, task: Task) {
+		let mut service_tasks = self.service_tasks.lock();
+		service_tasks.push_back(task);
+		self.service_event.notify_all();
+	}
+
+	/// Wait for new task.
+	pub fn wait(&self) -> Task {
+		let mut service_tasks = self.service_tasks.lock();
+		if service_tasks.is_empty() {
+			self.service_event.wait(&mut service_tasks);
+		}
+
+		service_tasks.pop_front()
+			.expect("service_event is only fired when there are new tasks or is_shutdown == true; is_shutdown == false; qed")
+	}
+}
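
For orientation, here is a minimal, hypothetical sketch of how a consumer can drive the now-generic queue, mirroring the push/push_front/wait calls that ServiceContractListener makes above; the Task enum, the worker thread and the main function below are illustrative only and are not part of this commit.

// Illustrative only: not part of the commit. Assumes the TasksQueue<Task>
// defined above is in scope, e.g. via `use listener::tasks_queue::TasksQueue;`.
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for ServiceTask.
#[derive(Debug, Clone, PartialEq)]
enum Task {
	Retry,
	Shutdown,
}

fn main() {
	let queue: Arc<TasksQueue<Task>> = Arc::new(TasksQueue::new());

	// Worker loop, similar in spirit to the listener's service thread.
	let worker = {
		let queue = Arc::clone(&queue);
		thread::spawn(move || loop {
			// wait() blocks on the condvar until a task is available.
			match queue.wait() {
				Task::Shutdown => break,
				task => println!("processing {:?}", task),
			}
		})
	};

	// Regular work goes to the back of the queue...
	queue.push(Task::Retry);
	// ...while shutdown jumps to the front, as ServiceContractListener::drop now does.
	queue.push_front(Task::Shutdown);
	let _ = worker.join();
}

Pushing the shutdown task to the front of the queue is what lets Drop wake a blocked worker promptly without a dedicated shutdown flag on the queue itself.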