SecretStore: default ClusterSessionsListener impl

This commit is contained in:
Svyatoslav Nikolsky 2017-11-22 10:43:16 +03:00
parent fc7f3433b7
commit df3a8a9234
2 changed files with 17 additions and 5 deletions

View File

@ -120,9 +120,9 @@ pub struct ClusterSessions {
/// Active sessions container listener. /// Active sessions container listener.
pub trait ClusterSessionsListener<S: ClusterSession>: Send + Sync { pub trait ClusterSessionsListener<S: ClusterSession>: Send + Sync {
/// When new session is inserted to the container. /// When new session is inserted to the container.
fn on_session_inserted(&self, session: Arc<S>); fn on_session_inserted(&self, _session: Arc<S>) {}
/// When session is removed from the container. /// When session is removed from the container.
fn on_session_removed(&self, session: Arc<S>); fn on_session_removed(&self, _session: Arc<S>) {}
} }
/// Active sessions container. /// Active sessions container.

View File

@ -117,6 +117,7 @@ enum ServiceTask {
} }
impl ServiceContractListener { impl ServiceContractListener {
/// Create new service contract listener.
pub fn new(client: &Arc<Client>, sync: &Arc<SyncProvider>, key_server: Arc<KeyServer>, cluster: Arc<ClusterClient>, self_key_pair: Arc<NodeKeyPair>, key_servers_set: Arc<KeyServerSet>) -> Arc<ServiceContractListener> { pub fn new(client: &Arc<Client>, sync: &Arc<SyncProvider>, key_server: Arc<KeyServer>, cluster: Arc<ClusterClient>, self_key_pair: Arc<NodeKeyPair>, key_servers_set: Arc<KeyServerSet>) -> Arc<ServiceContractListener> {
let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned())
.map(|a| { .map(|a| {
@ -154,6 +155,7 @@ impl ServiceContractListener {
contract contract
} }
/// Process incoming events of service contract.
fn process_service_contract_events(&self, client: &Client, service_contract: Address, blocks: Vec<H256>) { fn process_service_contract_events(&self, client: &Client, service_contract: Address, blocks: Vec<H256>) {
debug_assert!(!blocks.is_empty()); debug_assert!(!blocks.is_empty());
@ -188,6 +190,7 @@ impl ServiceContractListener {
})); }));
} }
/// Service thread procedure.
fn run_service_thread(data: Arc<ServiceContractListenerData>) { fn run_service_thread(data: Arc<ServiceContractListenerData>) {
loop { loop {
let task = data.tasks_queue.wait(); let task = data.tasks_queue.wait();
@ -203,6 +206,7 @@ impl ServiceContractListener {
} }
} }
/// Process single service task.
fn process_service_task(data: &Arc<ServiceContractListenerData>, task: ServiceTask) -> Result<(), String> { fn process_service_task(data: &Arc<ServiceContractListenerData>, task: ServiceTask) -> Result<(), String> {
match task { match task {
ServiceTask::Retry => ServiceTask::Retry =>
@ -239,6 +243,7 @@ impl ServiceContractListener {
} }
} }
/// Retry processing pending requests.
fn retry_pending_requests(data: &Arc<ServiceContractListenerData>) -> Result<usize, String> { fn retry_pending_requests(data: &Arc<ServiceContractListenerData>) -> Result<usize, String> {
let client = data.client.upgrade().ok_or("client is required".to_owned())?; let client = data.client.upgrade().ok_or("client is required".to_owned())?;
let retry_data = ::std::mem::replace(&mut *data.retry_data.lock(), Default::default()); let retry_data = ::std::mem::replace(&mut *data.retry_data.lock(), Default::default());
@ -298,6 +303,7 @@ impl ServiceContractListener {
Ok(processed_requests) Ok(processed_requests)
} }
/// Generate server key.
fn generate_server_key(data: &Arc<ServiceContractListenerData>, server_key_id: &ServerKeyId, threshold: &H256) -> Result<Public, String> { fn generate_server_key(data: &Arc<ServiceContractListenerData>, server_key_id: &ServerKeyId, threshold: &H256) -> Result<Public, String> {
let threshold_num = threshold.low_u64(); let threshold_num = threshold.low_u64();
if threshold != &threshold_num.into() || threshold_num >= ::std::usize::MAX as u64 { if threshold != &threshold_num.into() || threshold_num >= ::std::usize::MAX as u64 {
@ -313,6 +319,7 @@ impl ServiceContractListener {
.map_err(Into::into) .map_err(Into::into)
} }
/// Publish server key.
fn publish_server_key(data: &Arc<ServiceContractListenerData>, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String> { fn publish_server_key(data: &Arc<ServiceContractListenerData>, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String> {
let server_key_hash = keccak(server_key); let server_key_hash = keccak(server_key);
let signed_server_key = data.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?; let signed_server_key = data.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?;
@ -381,13 +388,14 @@ impl ChainNotify for ServiceContractListener {
} }
impl ClusterSessionsListener<GenerationSession> for ServiceContractListener { impl ClusterSessionsListener<GenerationSession> for ServiceContractListener {
fn on_session_inserted(&self, _session: Arc<GenerationSession>) {
}
fn on_session_removed(&self, session: Arc<GenerationSession>) { fn on_session_removed(&self, session: Arc<GenerationSession>) {
// TODO: only start if session started via the contract // TODO: only start if session started via the contract
// only publish when the session is started by another node // only publish when the session is started by another node
// when it is started by this node, it is published from process_service_task
if !is_processed_by_this_key_server(&*self.data.key_servers_set, &*self.data.self_key_pair, &session.id()) { if !is_processed_by_this_key_server(&*self.data.key_servers_set, &*self.data.self_key_pair, &session.id()) {
// by this time sesion must already be completed - either successfully, or not
debug_assert!(session.is_finished());
session.wait(Some(Default::default())) session.wait(Some(Default::default()))
.map_err(|e| format!("{}", e)) .map_err(|e| format!("{}", e))
.and_then(|server_key| Self::publish_server_key(&self.data, &session.id(), &server_key)) .and_then(|server_key| Self::publish_server_key(&self.data, &session.id(), &server_key))
@ -406,6 +414,7 @@ impl ClusterSessionsListener<GenerationSession> for ServiceContractListener {
} }
impl TasksQueue { impl TasksQueue {
/// Create new tasks queue.
pub fn new() -> Self { pub fn new() -> Self {
TasksQueue { TasksQueue {
service_event: Condvar::new(), service_event: Condvar::new(),
@ -413,12 +422,14 @@ impl TasksQueue {
} }
} }
/// Shutdown tasks queue.
pub fn shutdown(&self) { pub fn shutdown(&self) {
let mut service_tasks = self.service_tasks.lock(); let mut service_tasks = self.service_tasks.lock();
service_tasks.push_front(ServiceTask::Shutdown); service_tasks.push_front(ServiceTask::Shutdown);
self.service_event.notify_all(); self.service_event.notify_all();
} }
//// Push new tasks to the queue.
pub fn push<I>(&self, tasks: I) where I: Iterator<Item=ServiceTask> { pub fn push<I>(&self, tasks: I) where I: Iterator<Item=ServiceTask> {
let mut service_tasks = self.service_tasks.lock(); let mut service_tasks = self.service_tasks.lock();
service_tasks.extend(tasks); service_tasks.extend(tasks);
@ -427,6 +438,7 @@ impl TasksQueue {
} }
} }
/// Wait for new task.
pub fn wait(&self) -> ServiceTask { pub fn wait(&self) -> ServiceTask {
let mut service_tasks = self.service_tasks.lock(); let mut service_tasks = self.service_tasks.lock();
if service_tasks.is_empty() { if service_tasks.is_empty() {