SecretStore: ClusterSession::wait_session helper

This commit is contained in:
Svyatoslav Nikolsky 2017-12-20 19:11:37 +03:00
parent 6efca8860a
commit b10d567386
7 changed files with 27 additions and 54 deletions

View File

@ -198,13 +198,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
/// Wait for session completion. /// Wait for session completion.
pub fn wait(&self) -> Result<(H256, NodeId), Error> { pub fn wait(&self) -> Result<(H256, NodeId), Error> {
let mut data = self.data.lock(); Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone())
if !data.result.is_some() {
self.core.completed.wait(&mut data);
}
data.result.clone()
.expect("checked above or waited for completed; completed is only signaled when result.is_some(); qed")
} }
/// Initialize session. /// Initialize session.

View File

@ -207,13 +207,7 @@ impl SessionImpl {
/// Wait for session completion. /// Wait for session completion.
pub fn wait(&self) -> Result<(), Error> { pub fn wait(&self) -> Result<(), Error> {
let mut data = self.data.lock(); Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone())
if !data.result.is_some() {
self.core.completed.wait(&mut data);
}
data.result.clone()
.expect("checked above or waited for completed; completed is only signaled when result.is_some(); qed")
} }
/// Initialize servers set change session on master node. /// Initialize servers set change session on master node.

View File

@ -202,14 +202,7 @@ impl SessionImpl {
/// Wait for session completion. /// Wait for session completion.
pub fn wait(&self) -> Result<EncryptedDocumentKeyShadow, Error> { pub fn wait(&self) -> Result<EncryptedDocumentKeyShadow, Error> {
let mut data = self.data.lock(); Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone())
if !data.result.is_some() {
self.core.completed.wait(&mut data);
}
data.result.as_ref()
.expect("checked above or waited for completed; completed is only signaled when result.is_some(); qed")
.clone()
} }
/// Delegate session to other node. /// Delegate session to other node.

View File

@ -132,17 +132,7 @@ impl SessionImpl {
/// Wait for session completion. /// Wait for session completion.
pub fn wait(&self, timeout: Option<time::Duration>) -> Result<(), Error> { pub fn wait(&self, timeout: Option<time::Duration>) -> Result<(), Error> {
let mut data = self.data.lock(); Self::wait_session(&self.completed, &self.data, timeout, |data| data.result.clone())
if !data.result.is_some() {
match timeout {
None => self.completed.wait(&mut data),
Some(timeout) => { self.completed.wait_for(&mut data, timeout); },
}
}
data.result.as_ref()
.expect("checked above or waited for completed; completed is only signaled when result.is_some(); qed")
.clone()
} }

View File

@ -223,17 +223,8 @@ impl SessionImpl {
/// Wait for session completion. /// Wait for session completion.
pub fn wait(&self, timeout: Option<time::Duration>) -> Result<Public, Error> { pub fn wait(&self, timeout: Option<time::Duration>) -> Result<Public, Error> {
let mut data = self.data.lock(); Self::wait_session(&self.completed, &self.data, timeout, |data| data.joint_public_and_secret.clone()
if !data.joint_public_and_secret.is_some() { .map(|r| r.map(|r| r.0.clone())))
match timeout {
None => self.completed.wait(&mut data),
Some(timeout) => { self.completed.wait_for(&mut data, timeout); },
}
}
data.joint_public_and_secret.clone()
.expect("checked above or waited for completed; completed is only signaled when joint_public.is_some(); qed")
.map(|p| p.0)
} }
/// Get generated public and secret (if any). /// Get generated public and secret (if any).

View File

@ -207,14 +207,7 @@ impl SessionImpl {
/// Wait for session completion. /// Wait for session completion.
pub fn wait(&self) -> Result<(Secret, Secret), Error> { pub fn wait(&self) -> Result<(Secret, Secret), Error> {
let mut data = self.data.lock(); Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone())
if !data.result.is_some() {
self.core.completed.wait(&mut data);
}
data.result.as_ref()
.expect("checked above or waited for completed; completed is only signaled when result.is_some(); qed")
.clone()
} }
/// Delegate session to other node. /// Delegate session to other node.

View File

@ -18,7 +18,7 @@ use std::time;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::collections::{VecDeque, BTreeMap}; use std::collections::{VecDeque, BTreeMap};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock, Condvar};
use bigint::hash::H256; use bigint::hash::H256;
use ethkey::{Secret, Signature}; use ethkey::{Secret, Signature};
use key_server_cluster::{Error, NodeId, SessionId}; use key_server_cluster::{Error, NodeId, SessionId};
@ -79,6 +79,25 @@ pub trait ClusterSession {
fn on_session_error(&self, sender: &NodeId, error: Error); fn on_session_error(&self, sender: &NodeId, error: Error);
/// Process session message. /// Process session message.
fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error>; fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error>;
/// 'Wait for session completion' helper.
fn wait_session<T, U, F: Fn(&U) -> Option<Result<T, Error>>>(completion_event: &Condvar, session_data: &Mutex<U>, timeout: Option<time::Duration>, result_reader: F) -> Result<T, Error> {
let mut locked_data = session_data.lock();
match result_reader(&locked_data) {
Some(result) => result,
None => {
match timeout {
None => completion_event.wait(&mut locked_data),
Some(timeout) => {
completion_event.wait_for(&mut locked_data, timeout);
},
}
result_reader(&locked_data)
.expect("waited for completion; completion is only signaled when result.is_some(); qed")
},
}
}
} }
/// Administrative session. /// Administrative session.
@ -529,7 +548,6 @@ pub fn create_cluster_view(data: &Arc<ClusterData>, requires_all_connections: bo
Ok(Arc::new(ClusterView::new(data.clone(), connected_nodes))) Ok(Arc::new(ClusterView::new(data.clone(), connected_nodes)))
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::Arc; use std::sync::Arc;