Merge pull request #6624 from paritytech/secretstore_exclusive_sessions
SecretStore: exclusive sessions
This commit is contained in:
commit
d8094e0629
@ -532,6 +532,14 @@ impl Cluster for SessionKeyGenerationTransport {
|
|||||||
debug_assert!(self.other_nodes_ids.contains(to));
|
debug_assert!(self.other_nodes_ids.contains(to));
|
||||||
self.cluster.send(to, self.map_message(message)?)
|
self.cluster.send(to, self.map_message(message)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn is_connected(&self, node: &NodeId) -> bool {
|
||||||
|
self.cluster.is_connected(node)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn nodes(&self) -> BTreeSet<NodeId> {
|
||||||
|
self.cluster.nodes()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SessionCore {
|
impl SessionCore {
|
||||||
|
@ -97,6 +97,10 @@ pub trait Cluster: Send + Sync {
|
|||||||
fn broadcast(&self, message: Message) -> Result<(), Error>;
|
fn broadcast(&self, message: Message) -> Result<(), Error>;
|
||||||
/// Send message to given node.
|
/// Send message to given node.
|
||||||
fn send(&self, to: &NodeId, message: Message) -> Result<(), Error>;
|
fn send(&self, to: &NodeId, message: Message) -> Result<(), Error>;
|
||||||
|
/// Is connected to given node?
|
||||||
|
fn is_connected(&self, node: &NodeId) -> bool;
|
||||||
|
/// Get a set of connected nodes.
|
||||||
|
fn nodes(&self) -> BTreeSet<NodeId>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Cluster initialization parameters.
|
/// Cluster initialization parameters.
|
||||||
@ -1287,14 +1291,6 @@ impl ClusterView {
|
|||||||
})),
|
})),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_connected(&self, node: &NodeId) -> bool {
|
|
||||||
self.core.lock().nodes.contains(node)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn nodes(&self) -> BTreeSet<NodeId> {
|
|
||||||
self.core.lock().nodes.clone()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Cluster for ClusterView {
|
impl Cluster for ClusterView {
|
||||||
@ -1315,6 +1311,14 @@ impl Cluster for ClusterView {
|
|||||||
core.cluster.spawn(connection.send_message(message));
|
core.cluster.spawn(connection.send_message(message));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn is_connected(&self, node: &NodeId) -> bool {
|
||||||
|
self.core.lock().nodes.contains(node)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn nodes(&self) -> BTreeSet<NodeId> {
|
||||||
|
self.core.lock().nodes.clone()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ClusterClientImpl {
|
impl ClusterClientImpl {
|
||||||
@ -1460,7 +1464,7 @@ fn make_socket_address(address: &str, port: u16) -> Result<SocketAddr, Error> {
|
|||||||
pub mod tests {
|
pub mod tests {
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time;
|
use std::time;
|
||||||
use std::collections::VecDeque;
|
use std::collections::{BTreeSet, VecDeque};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use tokio_core::reactor::Core;
|
use tokio_core::reactor::Core;
|
||||||
use ethkey::{Random, Generator, Public};
|
use ethkey::{Random, Generator, Public};
|
||||||
@ -1517,6 +1521,15 @@ pub mod tests {
|
|||||||
self.data.lock().messages.push_back((to.clone(), message));
|
self.data.lock().messages.push_back((to.clone(), message));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn is_connected(&self, node: &NodeId) -> bool {
|
||||||
|
let data = self.data.lock();
|
||||||
|
&self.id == node || data.nodes.contains(node)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn nodes(&self) -> BTreeSet<NodeId> {
|
||||||
|
self.data.lock().nodes.iter().cloned().collect()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn loop_until<F>(core: &mut Core, timeout: time::Duration, predicate: F) where F: Fn() -> bool {
|
pub fn loop_until<F>(core: &mut Core, timeout: time::Duration, predicate: F) where F: Fn() -> bool {
|
||||||
@ -1621,7 +1634,7 @@ pub mod tests {
|
|||||||
fn generation_session_completion_signalled_if_failed_on_master() {
|
fn generation_session_completion_signalled_if_failed_on_master() {
|
||||||
//::logger::init_log();
|
//::logger::init_log();
|
||||||
let mut core = Core::new().unwrap();
|
let mut core = Core::new().unwrap();
|
||||||
let clusters = make_clusters(&core, 6023, 3);
|
let clusters = make_clusters(&core, 6025, 3);
|
||||||
run_clusters(&clusters);
|
run_clusters(&clusters);
|
||||||
loop_until(&mut core, time::Duration::from_millis(300), || clusters.iter().all(all_connections_established));
|
loop_until(&mut core, time::Duration::from_millis(300), || clusters.iter().all(all_connections_established));
|
||||||
|
|
||||||
|
@ -18,10 +18,10 @@ use std::time;
|
|||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak};
|
||||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||||
use std::collections::{VecDeque, BTreeSet, BTreeMap};
|
use std::collections::{VecDeque, BTreeSet, BTreeMap};
|
||||||
use parking_lot::RwLock;
|
use parking_lot::{Mutex, RwLock};
|
||||||
use ethkey::{Public, Secret, Signature};
|
use ethkey::{Public, Secret, Signature};
|
||||||
use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage, DocumentKeyShare, EncryptedDocumentKeyShadow, SessionMeta};
|
use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage, DocumentKeyShare, EncryptedDocumentKeyShadow, SessionMeta};
|
||||||
use key_server_cluster::cluster::{Cluster, ClusterData, ClusterView, ClusterConfiguration};
|
use key_server_cluster::cluster::{Cluster, ClusterData, ClusterConfiguration};
|
||||||
use key_server_cluster::message::{self, Message, GenerationMessage, EncryptionMessage, DecryptionMessage, SigningMessage,
|
use key_server_cluster::message::{self, Message, GenerationMessage, EncryptionMessage, DecryptionMessage, SigningMessage,
|
||||||
ShareAddMessage, ShareMoveMessage, ShareRemoveMessage, ServersSetChangeMessage};
|
ShareAddMessage, ShareMoveMessage, ShareRemoveMessage, ServersSetChangeMessage};
|
||||||
use key_server_cluster::generation_session::{Session as GenerationSession, SessionImpl as GenerationSessionImpl,
|
use key_server_cluster::generation_session::{Session as GenerationSession, SessionImpl as GenerationSessionImpl,
|
||||||
@ -50,7 +50,7 @@ const SESSION_TIMEOUT_INTERVAL: u64 = 60;
|
|||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
/// Servers set change session id (there could be at most 1 session => hardcoded id).
|
/// Servers set change session id (there could be at most 1 session => hardcoded id).
|
||||||
static ref SERVERS_SET_CHANGE_SESSION_ID: SessionId = "10b7af423bb551d5dc8645db754163a2145d37d78d468fa7330435ed77064c1c206f4b71d62491dfb9f7dbeccc42a6c112c8bb507de7b4fcad8d646272b2c363"
|
static ref SERVERS_SET_CHANGE_SESSION_ID: SessionId = "10b7af423bb551d5dc8645db754163a2145d37d78d468fa7330435ed77064c1c"
|
||||||
.parse()
|
.parse()
|
||||||
.expect("hardcoded id should parse without errors; qed");
|
.expect("hardcoded id should parse without errors; qed");
|
||||||
}
|
}
|
||||||
@ -119,6 +119,8 @@ pub struct ClusterSessions {
|
|||||||
pub struct ClusterSessionsContainer<K, V, M> {
|
pub struct ClusterSessionsContainer<K, V, M> {
|
||||||
/// Active sessions.
|
/// Active sessions.
|
||||||
pub sessions: RwLock<BTreeMap<K, QueuedSession<V, M>>>,
|
pub sessions: RwLock<BTreeMap<K, QueuedSession<V, M>>>,
|
||||||
|
/// Sessions container state.
|
||||||
|
container_state: Arc<Mutex<ClusterSessionsContainerState>>
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Session and its message queue.
|
/// Session and its message queue.
|
||||||
@ -126,7 +128,7 @@ pub struct QueuedSession<V, M> {
|
|||||||
/// Session master.
|
/// Session master.
|
||||||
pub master: NodeId,
|
pub master: NodeId,
|
||||||
/// Cluster view.
|
/// Cluster view.
|
||||||
pub cluster_view: Arc<ClusterView>,
|
pub cluster_view: Arc<Cluster>,
|
||||||
/// Last received message time.
|
/// Last received message time.
|
||||||
pub last_message_time: time::Instant,
|
pub last_message_time: time::Instant,
|
||||||
/// Generation session.
|
/// Generation session.
|
||||||
@ -135,6 +137,17 @@ pub struct QueuedSession<V, M> {
|
|||||||
pub queue: VecDeque<(NodeId, M)>,
|
pub queue: VecDeque<(NodeId, M)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Cluster sessions container state.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||||
|
pub enum ClusterSessionsContainerState {
|
||||||
|
/// There's no active sessions => any session can be started.
|
||||||
|
Idle,
|
||||||
|
/// There are active sessions => exclusive session can't be started right now.
|
||||||
|
Active(usize),
|
||||||
|
/// Exclusive session is active => can't start any other sessions.
|
||||||
|
Exclusive,
|
||||||
|
}
|
||||||
|
|
||||||
/// Generation session implementation, which removes session from cluster on drop.
|
/// Generation session implementation, which removes session from cluster on drop.
|
||||||
pub struct GenerationSessionWrapper {
|
pub struct GenerationSessionWrapper {
|
||||||
/// Wrapped session.
|
/// Wrapped session.
|
||||||
@ -188,17 +201,18 @@ pub struct AdminSessionWrapper {
|
|||||||
impl ClusterSessions {
|
impl ClusterSessions {
|
||||||
/// Create new cluster sessions container.
|
/// Create new cluster sessions container.
|
||||||
pub fn new(config: &ClusterConfiguration) -> Self {
|
pub fn new(config: &ClusterConfiguration) -> Self {
|
||||||
|
let container_state = Arc::new(Mutex::new(ClusterSessionsContainerState::Idle));
|
||||||
ClusterSessions {
|
ClusterSessions {
|
||||||
self_node_id: config.self_key_pair.public().clone(),
|
self_node_id: config.self_key_pair.public().clone(),
|
||||||
nodes: config.key_server_set.get().keys().cloned().collect(),
|
nodes: config.key_server_set.get().keys().cloned().collect(),
|
||||||
acl_storage: config.acl_storage.clone(),
|
acl_storage: config.acl_storage.clone(),
|
||||||
key_storage: config.key_storage.clone(),
|
key_storage: config.key_storage.clone(),
|
||||||
admin_public: config.admin_public.clone(),
|
admin_public: config.admin_public.clone(),
|
||||||
generation_sessions: ClusterSessionsContainer::new(),
|
generation_sessions: ClusterSessionsContainer::new(container_state.clone()),
|
||||||
encryption_sessions: ClusterSessionsContainer::new(),
|
encryption_sessions: ClusterSessionsContainer::new(container_state.clone()),
|
||||||
decryption_sessions: ClusterSessionsContainer::new(),
|
decryption_sessions: ClusterSessionsContainer::new(container_state.clone()),
|
||||||
signing_sessions: ClusterSessionsContainer::new(),
|
signing_sessions: ClusterSessionsContainer::new(container_state.clone()),
|
||||||
admin_sessions: ClusterSessionsContainer::new(),
|
admin_sessions: ClusterSessionsContainer::new(container_state),
|
||||||
make_faulty_generation_sessions: AtomicBool::new(false),
|
make_faulty_generation_sessions: AtomicBool::new(false),
|
||||||
session_counter: AtomicUsize::new(0),
|
session_counter: AtomicUsize::new(0),
|
||||||
max_nonce: RwLock::new(BTreeMap::new()),
|
max_nonce: RwLock::new(BTreeMap::new()),
|
||||||
@ -211,7 +225,7 @@ impl ClusterSessions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Create new generation session.
|
/// Create new generation session.
|
||||||
pub fn new_generation_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<ClusterView>) -> Result<Arc<GenerationSessionImpl>, Error> {
|
pub fn new_generation_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<Cluster>) -> Result<Arc<GenerationSessionImpl>, Error> {
|
||||||
// check that there's no finished encryption session with the same id
|
// check that there's no finished encryption session with the same id
|
||||||
if self.key_storage.contains(&session_id) {
|
if self.key_storage.contains(&session_id) {
|
||||||
return Err(Error::DuplicateSessionId);
|
return Err(Error::DuplicateSessionId);
|
||||||
@ -225,7 +239,7 @@ impl ClusterSessions {
|
|||||||
|
|
||||||
// check that there's no active encryption session with the same id
|
// check that there's no active encryption session with the same id
|
||||||
let nonce = self.check_session_nonce(&master, nonce)?;
|
let nonce = self.check_session_nonce(&master, nonce)?;
|
||||||
self.generation_sessions.insert(master, session_id, cluster.clone(), move ||
|
self.generation_sessions.insert(master, session_id, cluster.clone(), false, move ||
|
||||||
Ok(GenerationSessionImpl::new(GenerationSessionParams {
|
Ok(GenerationSessionImpl::new(GenerationSessionParams {
|
||||||
id: session_id.clone(),
|
id: session_id.clone(),
|
||||||
self_node_id: self.self_node_id.clone(),
|
self_node_id: self.self_node_id.clone(),
|
||||||
@ -254,11 +268,11 @@ impl ClusterSessions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Create new encryption session.
|
/// Create new encryption session.
|
||||||
pub fn new_encryption_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<ClusterView>) -> Result<Arc<EncryptionSessionImpl>, Error> {
|
pub fn new_encryption_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<Cluster>) -> Result<Arc<EncryptionSessionImpl>, Error> {
|
||||||
let encrypted_data = self.read_key_share(&session_id, &cluster)?;
|
let encrypted_data = self.read_key_share(&session_id, &cluster)?;
|
||||||
let nonce = self.check_session_nonce(&master, nonce)?;
|
let nonce = self.check_session_nonce(&master, nonce)?;
|
||||||
|
|
||||||
self.encryption_sessions.insert(master, session_id, cluster.clone(), move || EncryptionSessionImpl::new(EncryptionSessionParams {
|
self.encryption_sessions.insert(master, session_id, cluster.clone(), false, move || EncryptionSessionImpl::new(EncryptionSessionParams {
|
||||||
id: session_id.clone(),
|
id: session_id.clone(),
|
||||||
self_node_id: self.self_node_id.clone(),
|
self_node_id: self.self_node_id.clone(),
|
||||||
encrypted_data: encrypted_data,
|
encrypted_data: encrypted_data,
|
||||||
@ -281,12 +295,12 @@ impl ClusterSessions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Create new decryption session.
|
/// Create new decryption session.
|
||||||
pub fn new_decryption_session(&self, master: NodeId, session_id: SessionId, sub_session_id: Secret, nonce: Option<u64>, cluster: Arc<ClusterView>, requester_signature: Option<Signature>) -> Result<Arc<DecryptionSessionImpl>, Error> {
|
pub fn new_decryption_session(&self, master: NodeId, session_id: SessionId, sub_session_id: Secret, nonce: Option<u64>, cluster: Arc<Cluster>, requester_signature: Option<Signature>) -> Result<Arc<DecryptionSessionImpl>, Error> {
|
||||||
let session_id = DecryptionSessionId::new(session_id, sub_session_id);
|
let session_id = DecryptionSessionId::new(session_id, sub_session_id);
|
||||||
let encrypted_data = self.read_key_share(&session_id.id, &cluster)?;
|
let encrypted_data = self.read_key_share(&session_id.id, &cluster)?;
|
||||||
let nonce = self.check_session_nonce(&master, nonce)?;
|
let nonce = self.check_session_nonce(&master, nonce)?;
|
||||||
|
|
||||||
self.decryption_sessions.insert(master, session_id.clone(), cluster.clone(), move || DecryptionSessionImpl::new(DecryptionSessionParams {
|
self.decryption_sessions.insert(master, session_id.clone(), cluster.clone(), false, move || DecryptionSessionImpl::new(DecryptionSessionParams {
|
||||||
meta: SessionMeta {
|
meta: SessionMeta {
|
||||||
id: session_id.id,
|
id: session_id.id,
|
||||||
self_node_id: self.self_node_id.clone(),
|
self_node_id: self.self_node_id.clone(),
|
||||||
@ -320,12 +334,12 @@ impl ClusterSessions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Create new signing session.
|
/// Create new signing session.
|
||||||
pub fn new_signing_session(&self, master: NodeId, session_id: SessionId, sub_session_id: Secret, nonce: Option<u64>, cluster: Arc<ClusterView>, requester_signature: Option<Signature>) -> Result<Arc<SigningSessionImpl>, Error> {
|
pub fn new_signing_session(&self, master: NodeId, session_id: SessionId, sub_session_id: Secret, nonce: Option<u64>, cluster: Arc<Cluster>, requester_signature: Option<Signature>) -> Result<Arc<SigningSessionImpl>, Error> {
|
||||||
let session_id = SigningSessionId::new(session_id, sub_session_id);
|
let session_id = SigningSessionId::new(session_id, sub_session_id);
|
||||||
let encrypted_data = self.read_key_share(&session_id.id, &cluster)?;
|
let encrypted_data = self.read_key_share(&session_id.id, &cluster)?;
|
||||||
let nonce = self.check_session_nonce(&master, nonce)?;
|
let nonce = self.check_session_nonce(&master, nonce)?;
|
||||||
|
|
||||||
self.signing_sessions.insert(master, session_id.clone(), cluster.clone(), move || SigningSessionImpl::new(SigningSessionParams {
|
self.signing_sessions.insert(master, session_id.clone(), cluster.clone(), false, move || SigningSessionImpl::new(SigningSessionParams {
|
||||||
meta: SessionMeta {
|
meta: SessionMeta {
|
||||||
id: session_id.id,
|
id: session_id.id,
|
||||||
self_node_id: self.self_node_id.clone(),
|
self_node_id: self.self_node_id.clone(),
|
||||||
@ -359,11 +373,11 @@ impl ClusterSessions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Create new share add session.
|
/// Create new share add session.
|
||||||
pub fn new_share_add_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<ClusterView>) -> Result<Arc<AdminSession>, Error> {
|
pub fn new_share_add_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<Cluster>) -> Result<Arc<AdminSession>, Error> {
|
||||||
let nonce = self.check_session_nonce(&master, nonce)?;
|
let nonce = self.check_session_nonce(&master, nonce)?;
|
||||||
let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?;
|
let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?;
|
||||||
|
|
||||||
self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), move || ShareAddSessionImpl::new(ShareAddSessionParams {
|
self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), false, move || ShareAddSessionImpl::new(ShareAddSessionParams {
|
||||||
meta: ShareChangeSessionMeta {
|
meta: ShareChangeSessionMeta {
|
||||||
id: session_id,
|
id: session_id,
|
||||||
self_node_id: self.self_node_id.clone(),
|
self_node_id: self.self_node_id.clone(),
|
||||||
@ -389,11 +403,11 @@ impl ClusterSessions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Create new share move session.
|
/// Create new share move session.
|
||||||
pub fn new_share_move_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<ClusterView>) -> Result<Arc<AdminSession>, Error> {
|
pub fn new_share_move_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<Cluster>) -> Result<Arc<AdminSession>, Error> {
|
||||||
let nonce = self.check_session_nonce(&master, nonce)?;
|
let nonce = self.check_session_nonce(&master, nonce)?;
|
||||||
let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?;
|
let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?;
|
||||||
|
|
||||||
self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), move || ShareMoveSessionImpl::new(ShareMoveSessionParams {
|
self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), false, move || ShareMoveSessionImpl::new(ShareMoveSessionParams {
|
||||||
meta: ShareChangeSessionMeta {
|
meta: ShareChangeSessionMeta {
|
||||||
id: session_id,
|
id: session_id,
|
||||||
self_node_id: self.self_node_id.clone(),
|
self_node_id: self.self_node_id.clone(),
|
||||||
@ -419,11 +433,11 @@ impl ClusterSessions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Create new share remove session.
|
/// Create new share remove session.
|
||||||
pub fn new_share_remove_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<ClusterView>) -> Result<Arc<AdminSession>, Error> {
|
pub fn new_share_remove_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<Cluster>) -> Result<Arc<AdminSession>, Error> {
|
||||||
let nonce = self.check_session_nonce(&master, nonce)?;
|
let nonce = self.check_session_nonce(&master, nonce)?;
|
||||||
let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?;
|
let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?;
|
||||||
|
|
||||||
self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), move || ShareRemoveSessionImpl::new(ShareRemoveSessionParams {
|
self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), false, move || ShareRemoveSessionImpl::new(ShareRemoveSessionParams {
|
||||||
meta: ShareChangeSessionMeta {
|
meta: ShareChangeSessionMeta {
|
||||||
id: session_id,
|
id: session_id,
|
||||||
self_node_id: self.self_node_id.clone(),
|
self_node_id: self.self_node_id.clone(),
|
||||||
@ -449,8 +463,7 @@ impl ClusterSessions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Create new servers set change session.
|
/// Create new servers set change session.
|
||||||
pub fn new_servers_set_change_session(&self, master: NodeId, session_id: Option<SessionId>, nonce: Option<u64>, cluster: Arc<ClusterView>, all_nodes_set: BTreeSet<NodeId>) -> Result<Arc<AdminSession>, Error> {
|
pub fn new_servers_set_change_session(&self, master: NodeId, session_id: Option<SessionId>, nonce: Option<u64>, cluster: Arc<Cluster>, all_nodes_set: BTreeSet<NodeId>) -> Result<Arc<AdminSession>, Error> {
|
||||||
// TODO: check if there's no other active sessions + do not allow to start other sessions when this session is active
|
|
||||||
let session_id = match session_id {
|
let session_id = match session_id {
|
||||||
Some(session_id) => if session_id == *SERVERS_SET_CHANGE_SESSION_ID {
|
Some(session_id) => if session_id == *SERVERS_SET_CHANGE_SESSION_ID {
|
||||||
session_id
|
session_id
|
||||||
@ -462,7 +475,7 @@ impl ClusterSessions {
|
|||||||
let nonce = self.check_session_nonce(&master, nonce)?;
|
let nonce = self.check_session_nonce(&master, nonce)?;
|
||||||
let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?;
|
let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?;
|
||||||
|
|
||||||
self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), move || ServersSetChangeSessionImpl::new(ServersSetChangeSessionParams {
|
self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), true, move || ServersSetChangeSessionImpl::new(ServersSetChangeSessionParams {
|
||||||
meta: ShareChangeSessionMeta {
|
meta: ShareChangeSessionMeta {
|
||||||
id: session_id,
|
id: session_id,
|
||||||
self_node_id: self.self_node_id.clone(),
|
self_node_id: self.self_node_id.clone(),
|
||||||
@ -511,7 +524,7 @@ impl ClusterSessions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Read key share && remove disconnected nodes.
|
/// Read key share && remove disconnected nodes.
|
||||||
fn read_key_share(&self, key_id: &SessionId, cluster: &Arc<ClusterView>) -> Result<DocumentKeyShare, Error> {
|
fn read_key_share(&self, key_id: &SessionId, cluster: &Arc<Cluster>) -> Result<DocumentKeyShare, Error> {
|
||||||
let mut encrypted_data = self.key_storage.get(key_id).map_err(|e| Error::KeyStorage(e.into()))?;
|
let mut encrypted_data = self.key_storage.get(key_id).map_err(|e| Error::KeyStorage(e.into()))?;
|
||||||
|
|
||||||
// some of nodes, which were encrypting secret may be down
|
// some of nodes, which were encrypting secret may be down
|
||||||
@ -540,9 +553,10 @@ impl ClusterSessions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: ClusterSession {
|
impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: ClusterSession {
|
||||||
pub fn new() -> Self {
|
pub fn new(container_state: Arc<Mutex<ClusterSessionsContainerState>>) -> Self {
|
||||||
ClusterSessionsContainer {
|
ClusterSessionsContainer {
|
||||||
sessions: RwLock::new(BTreeMap::new()),
|
sessions: RwLock::new(BTreeMap::new()),
|
||||||
|
container_state: container_state,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -554,13 +568,18 @@ impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: Cluster
|
|||||||
self.sessions.read().get(session_id).map(|s| s.session.clone())
|
self.sessions.read().get(session_id).map(|s| s.session.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert<F: FnOnce() -> Result<V, Error>>(&self, master: NodeId, session_id: K, cluster: Arc<ClusterView>, session: F) -> Result<Arc<V>, Error> {
|
pub fn insert<F: FnOnce() -> Result<V, Error>>(&self, master: NodeId, session_id: K, cluster: Arc<Cluster>, is_exclusive_session: bool, session: F) -> Result<Arc<V>, Error> {
|
||||||
let mut sessions = self.sessions.write();
|
let mut sessions = self.sessions.write();
|
||||||
if sessions.contains_key(&session_id) {
|
if sessions.contains_key(&session_id) {
|
||||||
return Err(Error::DuplicateSessionId);
|
return Err(Error::DuplicateSessionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create session
|
||||||
let session = Arc::new(session()?);
|
let session = Arc::new(session()?);
|
||||||
|
// check if session can be started
|
||||||
|
self.container_state.lock().on_session_starting(is_exclusive_session)?;
|
||||||
|
|
||||||
|
// insert session
|
||||||
let queued_session = QueuedSession {
|
let queued_session = QueuedSession {
|
||||||
master: master,
|
master: master,
|
||||||
cluster_view: cluster,
|
cluster_view: cluster,
|
||||||
@ -573,7 +592,9 @@ impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: Cluster
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove(&self, session_id: &K) {
|
pub fn remove(&self, session_id: &K) {
|
||||||
self.sessions.write().remove(session_id);
|
if self.sessions.write().remove(session_id).is_some() {
|
||||||
|
self.container_state.lock().on_session_completed();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn enqueue_message(&self, session_id: &K, sender: NodeId, message: M, is_queued_message: bool) {
|
pub fn enqueue_message(&self, session_id: &K, sender: NodeId, message: M, is_queued_message: bool) {
|
||||||
@ -621,6 +642,45 @@ impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: Cluster
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ClusterSessionsContainerState {
|
||||||
|
/// When session is starting.
|
||||||
|
pub fn on_session_starting(&mut self, is_exclusive_session: bool) -> Result<(), Error> {
|
||||||
|
match *self {
|
||||||
|
ClusterSessionsContainerState::Idle if is_exclusive_session => {
|
||||||
|
::std::mem::replace(self, ClusterSessionsContainerState::Exclusive);
|
||||||
|
},
|
||||||
|
ClusterSessionsContainerState::Idle => {
|
||||||
|
::std::mem::replace(self, ClusterSessionsContainerState::Active(1));
|
||||||
|
},
|
||||||
|
ClusterSessionsContainerState::Active(_) if is_exclusive_session =>
|
||||||
|
return Err(Error::HasActiveSessions),
|
||||||
|
ClusterSessionsContainerState::Active(sessions_count) => {
|
||||||
|
::std::mem::replace(self, ClusterSessionsContainerState::Active(sessions_count + 1));
|
||||||
|
},
|
||||||
|
ClusterSessionsContainerState::Exclusive =>
|
||||||
|
return Err(Error::ExclusiveSessionActive),
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// When session is completed.
|
||||||
|
pub fn on_session_completed(&mut self) {
|
||||||
|
match *self {
|
||||||
|
ClusterSessionsContainerState::Idle =>
|
||||||
|
unreachable!("idle means that there are no active sessions; on_session_completed is only called once after active session is completed; qed"),
|
||||||
|
ClusterSessionsContainerState::Active(sessions_count) if sessions_count == 1 => {
|
||||||
|
::std::mem::replace(self, ClusterSessionsContainerState::Idle);
|
||||||
|
},
|
||||||
|
ClusterSessionsContainerState::Active(sessions_count) => {
|
||||||
|
::std::mem::replace(self, ClusterSessionsContainerState::Active(sessions_count - 1));
|
||||||
|
}
|
||||||
|
ClusterSessionsContainerState::Exclusive => {
|
||||||
|
::std::mem::replace(self, ClusterSessionsContainerState::Idle);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl AdminSession {
|
impl AdminSession {
|
||||||
pub fn as_share_add(&self) -> Option<&ShareAddSessionImpl<ShareAddTransport>> {
|
pub fn as_share_add(&self) -> Option<&ShareAddSessionImpl<ShareAddTransport>> {
|
||||||
match *self {
|
match *self {
|
||||||
@ -841,3 +901,53 @@ impl Drop for AdminSessionWrapper {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::collections::BTreeSet;
|
||||||
|
use ethkey::{Random, Generator};
|
||||||
|
use key_server_cluster::{Error, DummyAclStorage, DummyKeyStorage, MapKeyServerSet, PlainNodeKeyPair};
|
||||||
|
use key_server_cluster::cluster::ClusterConfiguration;
|
||||||
|
use key_server_cluster::cluster::tests::DummyCluster;
|
||||||
|
use super::ClusterSessions;
|
||||||
|
|
||||||
|
pub fn make_cluster_sessions() -> ClusterSessions {
|
||||||
|
let key_pair = Random.generate().unwrap();
|
||||||
|
let config = ClusterConfiguration {
|
||||||
|
threads: 1,
|
||||||
|
self_key_pair: Arc::new(PlainNodeKeyPair::new(key_pair.clone())),
|
||||||
|
listen_address: ("127.0.0.1".to_owned(), 100_u16),
|
||||||
|
key_server_set: Arc::new(MapKeyServerSet::new(vec![(key_pair.public().clone(), format!("127.0.0.1:{}", 100).parse().unwrap())].into_iter().collect())),
|
||||||
|
allow_connecting_to_higher_nodes: false,
|
||||||
|
key_storage: Arc::new(DummyKeyStorage::default()),
|
||||||
|
acl_storage: Arc::new(DummyAclStorage::default()),
|
||||||
|
admin_public: Some(Random.generate().unwrap().public().clone()),
|
||||||
|
};
|
||||||
|
ClusterSessions::new(&config)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cluster_session_cannot_be_started_if_exclusive_session_is_active() {
|
||||||
|
let sessions = make_cluster_sessions();
|
||||||
|
|
||||||
|
sessions.new_generation_session(Default::default(), Default::default(), Some(1), Arc::new(DummyCluster::new(sessions.self_node_id.clone()))).unwrap();
|
||||||
|
match sessions.new_servers_set_change_session(Default::default(), None, Some(1), Arc::new(DummyCluster::new(sessions.self_node_id.clone())), BTreeSet::new()) {
|
||||||
|
Err(Error::HasActiveSessions) => (),
|
||||||
|
Err(e) => unreachable!(format!("{}", e)),
|
||||||
|
Ok(_) => unreachable!("OK"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn exclusive_session_cannot_be_started_if_other_session_is_active() {
|
||||||
|
let sessions = make_cluster_sessions();
|
||||||
|
|
||||||
|
sessions.new_servers_set_change_session(Default::default(), None, Some(1), Arc::new(DummyCluster::new(sessions.self_node_id.clone())), BTreeSet::new()).unwrap();
|
||||||
|
match sessions.new_generation_session(Default::default(), Default::default(), Some(1), Arc::new(DummyCluster::new(sessions.self_node_id.clone()))) {
|
||||||
|
Err(Error::ExclusiveSessionActive) => (),
|
||||||
|
Err(e) => unreachable!(format!("{}", e)),
|
||||||
|
Ok(_) => unreachable!("OK"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -107,6 +107,10 @@ pub enum Error {
|
|||||||
ConsensusUnreachable,
|
ConsensusUnreachable,
|
||||||
/// Acl storage error.
|
/// Acl storage error.
|
||||||
AccessDenied,
|
AccessDenied,
|
||||||
|
/// Can't start session, because exclusive session is active.
|
||||||
|
ExclusiveSessionActive,
|
||||||
|
/// Can't start exclusive session, because there are other active sessions.
|
||||||
|
HasActiveSessions,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<ethkey::Error> for Error {
|
impl From<ethkey::Error> for Error {
|
||||||
@ -152,6 +156,8 @@ impl fmt::Display for Error {
|
|||||||
Error::KeyStorage(ref e) => write!(f, "key storage error {}", e),
|
Error::KeyStorage(ref e) => write!(f, "key storage error {}", e),
|
||||||
Error::ConsensusUnreachable => write!(f, "Consensus unreachable"),
|
Error::ConsensusUnreachable => write!(f, "Consensus unreachable"),
|
||||||
Error::AccessDenied => write!(f, "Access denied"),
|
Error::AccessDenied => write!(f, "Access denied"),
|
||||||
|
Error::ExclusiveSessionActive => write!(f, "Exclusive session active"),
|
||||||
|
Error::HasActiveSessions => write!(f, "Unable to start exclusive session"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -175,7 +181,6 @@ pub use self::client_sessions::decryption_session;
|
|||||||
pub use self::client_sessions::encryption_session;
|
pub use self::client_sessions::encryption_session;
|
||||||
pub use self::client_sessions::generation_session;
|
pub use self::client_sessions::generation_session;
|
||||||
pub use self::client_sessions::signing_session;
|
pub use self::client_sessions::signing_session;
|
||||||
|
|
||||||
mod cluster;
|
mod cluster;
|
||||||
mod cluster_sessions;
|
mod cluster_sessions;
|
||||||
mod io;
|
mod io;
|
||||||
|
Loading…
Reference in New Issue
Block a user