diff --git a/secret_store/src/key_server_cluster/client_sessions/signing_session.rs b/secret_store/src/key_server_cluster/client_sessions/signing_session.rs index d094e6516..a111d4472 100644 --- a/secret_store/src/key_server_cluster/client_sessions/signing_session.rs +++ b/secret_store/src/key_server_cluster/client_sessions/signing_session.rs @@ -532,6 +532,14 @@ impl Cluster for SessionKeyGenerationTransport { debug_assert!(self.other_nodes_ids.contains(to)); self.cluster.send(to, self.map_message(message)?) } + + fn is_connected(&self, node: &NodeId) -> bool { + self.cluster.is_connected(node) + } + + fn nodes(&self) -> BTreeSet { + self.cluster.nodes() + } } impl SessionCore { diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index fab5c16fd..74464a6a9 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -97,6 +97,10 @@ pub trait Cluster: Send + Sync { fn broadcast(&self, message: Message) -> Result<(), Error>; /// Send message to given node. fn send(&self, to: &NodeId, message: Message) -> Result<(), Error>; + /// Is connected to given node? + fn is_connected(&self, node: &NodeId) -> bool; + /// Get a set of connected nodes. + fn nodes(&self) -> BTreeSet; } /// Cluster initialization parameters. @@ -1287,14 +1291,6 @@ impl ClusterView { })), } } - - pub fn is_connected(&self, node: &NodeId) -> bool { - self.core.lock().nodes.contains(node) - } - - pub fn nodes(&self) -> BTreeSet { - self.core.lock().nodes.clone() - } } impl Cluster for ClusterView { @@ -1315,6 +1311,14 @@ impl Cluster for ClusterView { core.cluster.spawn(connection.send_message(message)); Ok(()) } + + fn is_connected(&self, node: &NodeId) -> bool { + self.core.lock().nodes.contains(node) + } + + fn nodes(&self) -> BTreeSet { + self.core.lock().nodes.clone() + } } impl ClusterClientImpl { @@ -1460,7 +1464,7 @@ fn make_socket_address(address: &str, port: u16) -> Result { pub mod tests { use std::sync::Arc; use std::time; - use std::collections::VecDeque; + use std::collections::{BTreeSet, VecDeque}; use parking_lot::Mutex; use tokio_core::reactor::Core; use ethkey::{Random, Generator, Public}; @@ -1517,6 +1521,15 @@ pub mod tests { self.data.lock().messages.push_back((to.clone(), message)); Ok(()) } + + fn is_connected(&self, node: &NodeId) -> bool { + let data = self.data.lock(); + &self.id == node || data.nodes.contains(node) + } + + fn nodes(&self) -> BTreeSet { + self.data.lock().nodes.iter().cloned().collect() + } } pub fn loop_until(core: &mut Core, timeout: time::Duration, predicate: F) where F: Fn() -> bool { @@ -1621,7 +1634,7 @@ pub mod tests { fn generation_session_completion_signalled_if_failed_on_master() { //::logger::init_log(); let mut core = Core::new().unwrap(); - let clusters = make_clusters(&core, 6023, 3); + let clusters = make_clusters(&core, 6025, 3); run_clusters(&clusters); loop_until(&mut core, time::Duration::from_millis(300), || clusters.iter().all(all_connections_established)); diff --git a/secret_store/src/key_server_cluster/cluster_sessions.rs b/secret_store/src/key_server_cluster/cluster_sessions.rs index cfc00241d..aa6244dbc 100644 --- a/secret_store/src/key_server_cluster/cluster_sessions.rs +++ b/secret_store/src/key_server_cluster/cluster_sessions.rs @@ -18,10 +18,10 @@ use std::time; use std::sync::{Arc, Weak}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::collections::{VecDeque, BTreeSet, BTreeMap}; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use ethkey::{Public, Secret, Signature}; use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage, DocumentKeyShare, EncryptedDocumentKeyShadow, SessionMeta}; -use key_server_cluster::cluster::{Cluster, ClusterData, ClusterView, ClusterConfiguration}; +use key_server_cluster::cluster::{Cluster, ClusterData, ClusterConfiguration}; use key_server_cluster::message::{self, Message, GenerationMessage, EncryptionMessage, DecryptionMessage, SigningMessage, ShareAddMessage, ShareMoveMessage, ShareRemoveMessage, ServersSetChangeMessage}; use key_server_cluster::generation_session::{Session as GenerationSession, SessionImpl as GenerationSessionImpl, @@ -50,7 +50,7 @@ const SESSION_TIMEOUT_INTERVAL: u64 = 60; lazy_static! { /// Servers set change session id (there could be at most 1 session => hardcoded id). - static ref SERVERS_SET_CHANGE_SESSION_ID: SessionId = "10b7af423bb551d5dc8645db754163a2145d37d78d468fa7330435ed77064c1c206f4b71d62491dfb9f7dbeccc42a6c112c8bb507de7b4fcad8d646272b2c363" + static ref SERVERS_SET_CHANGE_SESSION_ID: SessionId = "10b7af423bb551d5dc8645db754163a2145d37d78d468fa7330435ed77064c1c" .parse() .expect("hardcoded id should parse without errors; qed"); } @@ -119,6 +119,8 @@ pub struct ClusterSessions { pub struct ClusterSessionsContainer { /// Active sessions. pub sessions: RwLock>>, + /// Sessions container state. + container_state: Arc> } /// Session and its message queue. @@ -126,7 +128,7 @@ pub struct QueuedSession { /// Session master. pub master: NodeId, /// Cluster view. - pub cluster_view: Arc, + pub cluster_view: Arc, /// Last received message time. pub last_message_time: time::Instant, /// Generation session. @@ -135,6 +137,17 @@ pub struct QueuedSession { pub queue: VecDeque<(NodeId, M)>, } +/// Cluster sessions container state. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum ClusterSessionsContainerState { + /// There's no active sessions => any session can be started. + Idle, + /// There are active sessions => exclusive session can't be started right now. + Active(usize), + /// Exclusive session is active => can't start any other sessions. + Exclusive, +} + /// Generation session implementation, which removes session from cluster on drop. pub struct GenerationSessionWrapper { /// Wrapped session. @@ -188,17 +201,18 @@ pub struct AdminSessionWrapper { impl ClusterSessions { /// Create new cluster sessions container. pub fn new(config: &ClusterConfiguration) -> Self { + let container_state = Arc::new(Mutex::new(ClusterSessionsContainerState::Idle)); ClusterSessions { self_node_id: config.self_key_pair.public().clone(), nodes: config.key_server_set.get().keys().cloned().collect(), acl_storage: config.acl_storage.clone(), key_storage: config.key_storage.clone(), admin_public: config.admin_public.clone(), - generation_sessions: ClusterSessionsContainer::new(), - encryption_sessions: ClusterSessionsContainer::new(), - decryption_sessions: ClusterSessionsContainer::new(), - signing_sessions: ClusterSessionsContainer::new(), - admin_sessions: ClusterSessionsContainer::new(), + generation_sessions: ClusterSessionsContainer::new(container_state.clone()), + encryption_sessions: ClusterSessionsContainer::new(container_state.clone()), + decryption_sessions: ClusterSessionsContainer::new(container_state.clone()), + signing_sessions: ClusterSessionsContainer::new(container_state.clone()), + admin_sessions: ClusterSessionsContainer::new(container_state), make_faulty_generation_sessions: AtomicBool::new(false), session_counter: AtomicUsize::new(0), max_nonce: RwLock::new(BTreeMap::new()), @@ -211,7 +225,7 @@ impl ClusterSessions { } /// Create new generation session. - pub fn new_generation_session(&self, master: NodeId, session_id: SessionId, nonce: Option, cluster: Arc) -> Result, Error> { + pub fn new_generation_session(&self, master: NodeId, session_id: SessionId, nonce: Option, cluster: Arc) -> Result, Error> { // check that there's no finished encryption session with the same id if self.key_storage.contains(&session_id) { return Err(Error::DuplicateSessionId); @@ -225,7 +239,7 @@ impl ClusterSessions { // check that there's no active encryption session with the same id let nonce = self.check_session_nonce(&master, nonce)?; - self.generation_sessions.insert(master, session_id, cluster.clone(), move || + self.generation_sessions.insert(master, session_id, cluster.clone(), false, move || Ok(GenerationSessionImpl::new(GenerationSessionParams { id: session_id.clone(), self_node_id: self.self_node_id.clone(), @@ -254,11 +268,11 @@ impl ClusterSessions { } /// Create new encryption session. - pub fn new_encryption_session(&self, master: NodeId, session_id: SessionId, nonce: Option, cluster: Arc) -> Result, Error> { + pub fn new_encryption_session(&self, master: NodeId, session_id: SessionId, nonce: Option, cluster: Arc) -> Result, Error> { let encrypted_data = self.read_key_share(&session_id, &cluster)?; let nonce = self.check_session_nonce(&master, nonce)?; - self.encryption_sessions.insert(master, session_id, cluster.clone(), move || EncryptionSessionImpl::new(EncryptionSessionParams { + self.encryption_sessions.insert(master, session_id, cluster.clone(), false, move || EncryptionSessionImpl::new(EncryptionSessionParams { id: session_id.clone(), self_node_id: self.self_node_id.clone(), encrypted_data: encrypted_data, @@ -281,12 +295,12 @@ impl ClusterSessions { } /// Create new decryption session. - pub fn new_decryption_session(&self, master: NodeId, session_id: SessionId, sub_session_id: Secret, nonce: Option, cluster: Arc, requester_signature: Option) -> Result, Error> { + pub fn new_decryption_session(&self, master: NodeId, session_id: SessionId, sub_session_id: Secret, nonce: Option, cluster: Arc, requester_signature: Option) -> Result, Error> { let session_id = DecryptionSessionId::new(session_id, sub_session_id); let encrypted_data = self.read_key_share(&session_id.id, &cluster)?; let nonce = self.check_session_nonce(&master, nonce)?; - self.decryption_sessions.insert(master, session_id.clone(), cluster.clone(), move || DecryptionSessionImpl::new(DecryptionSessionParams { + self.decryption_sessions.insert(master, session_id.clone(), cluster.clone(), false, move || DecryptionSessionImpl::new(DecryptionSessionParams { meta: SessionMeta { id: session_id.id, self_node_id: self.self_node_id.clone(), @@ -320,12 +334,12 @@ impl ClusterSessions { } /// Create new signing session. - pub fn new_signing_session(&self, master: NodeId, session_id: SessionId, sub_session_id: Secret, nonce: Option, cluster: Arc, requester_signature: Option) -> Result, Error> { + pub fn new_signing_session(&self, master: NodeId, session_id: SessionId, sub_session_id: Secret, nonce: Option, cluster: Arc, requester_signature: Option) -> Result, Error> { let session_id = SigningSessionId::new(session_id, sub_session_id); let encrypted_data = self.read_key_share(&session_id.id, &cluster)?; let nonce = self.check_session_nonce(&master, nonce)?; - self.signing_sessions.insert(master, session_id.clone(), cluster.clone(), move || SigningSessionImpl::new(SigningSessionParams { + self.signing_sessions.insert(master, session_id.clone(), cluster.clone(), false, move || SigningSessionImpl::new(SigningSessionParams { meta: SessionMeta { id: session_id.id, self_node_id: self.self_node_id.clone(), @@ -359,11 +373,11 @@ impl ClusterSessions { } /// Create new share add session. - pub fn new_share_add_session(&self, master: NodeId, session_id: SessionId, nonce: Option, cluster: Arc) -> Result, Error> { + pub fn new_share_add_session(&self, master: NodeId, session_id: SessionId, nonce: Option, cluster: Arc) -> Result, Error> { let nonce = self.check_session_nonce(&master, nonce)?; let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?; - self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), move || ShareAddSessionImpl::new(ShareAddSessionParams { + self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), false, move || ShareAddSessionImpl::new(ShareAddSessionParams { meta: ShareChangeSessionMeta { id: session_id, self_node_id: self.self_node_id.clone(), @@ -389,11 +403,11 @@ impl ClusterSessions { } /// Create new share move session. - pub fn new_share_move_session(&self, master: NodeId, session_id: SessionId, nonce: Option, cluster: Arc) -> Result, Error> { + pub fn new_share_move_session(&self, master: NodeId, session_id: SessionId, nonce: Option, cluster: Arc) -> Result, Error> { let nonce = self.check_session_nonce(&master, nonce)?; let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?; - self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), move || ShareMoveSessionImpl::new(ShareMoveSessionParams { + self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), false, move || ShareMoveSessionImpl::new(ShareMoveSessionParams { meta: ShareChangeSessionMeta { id: session_id, self_node_id: self.self_node_id.clone(), @@ -419,11 +433,11 @@ impl ClusterSessions { } /// Create new share remove session. - pub fn new_share_remove_session(&self, master: NodeId, session_id: SessionId, nonce: Option, cluster: Arc) -> Result, Error> { + pub fn new_share_remove_session(&self, master: NodeId, session_id: SessionId, nonce: Option, cluster: Arc) -> Result, Error> { let nonce = self.check_session_nonce(&master, nonce)?; let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?; - self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), move || ShareRemoveSessionImpl::new(ShareRemoveSessionParams { + self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), false, move || ShareRemoveSessionImpl::new(ShareRemoveSessionParams { meta: ShareChangeSessionMeta { id: session_id, self_node_id: self.self_node_id.clone(), @@ -449,8 +463,7 @@ impl ClusterSessions { } /// Create new servers set change session. - pub fn new_servers_set_change_session(&self, master: NodeId, session_id: Option, nonce: Option, cluster: Arc, all_nodes_set: BTreeSet) -> Result, Error> { - // TODO: check if there's no other active sessions + do not allow to start other sessions when this session is active + pub fn new_servers_set_change_session(&self, master: NodeId, session_id: Option, nonce: Option, cluster: Arc, all_nodes_set: BTreeSet) -> Result, Error> { let session_id = match session_id { Some(session_id) => if session_id == *SERVERS_SET_CHANGE_SESSION_ID { session_id @@ -462,7 +475,7 @@ impl ClusterSessions { let nonce = self.check_session_nonce(&master, nonce)?; let admin_public = self.admin_public.clone().ok_or(Error::AccessDenied)?; - self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), move || ServersSetChangeSessionImpl::new(ServersSetChangeSessionParams { + self.admin_sessions.insert(master, session_id.clone(), cluster.clone(), true, move || ServersSetChangeSessionImpl::new(ServersSetChangeSessionParams { meta: ShareChangeSessionMeta { id: session_id, self_node_id: self.self_node_id.clone(), @@ -511,7 +524,7 @@ impl ClusterSessions { } /// Read key share && remove disconnected nodes. - fn read_key_share(&self, key_id: &SessionId, cluster: &Arc) -> Result { + fn read_key_share(&self, key_id: &SessionId, cluster: &Arc) -> Result { let mut encrypted_data = self.key_storage.get(key_id).map_err(|e| Error::KeyStorage(e.into()))?; // some of nodes, which were encrypting secret may be down @@ -540,9 +553,10 @@ impl ClusterSessions { } impl ClusterSessionsContainer where K: Clone + Ord, V: ClusterSession { - pub fn new() -> Self { + pub fn new(container_state: Arc>) -> Self { ClusterSessionsContainer { sessions: RwLock::new(BTreeMap::new()), + container_state: container_state, } } @@ -554,13 +568,18 @@ impl ClusterSessionsContainer where K: Clone + Ord, V: Cluster self.sessions.read().get(session_id).map(|s| s.session.clone()) } - pub fn insert Result>(&self, master: NodeId, session_id: K, cluster: Arc, session: F) -> Result, Error> { + pub fn insert Result>(&self, master: NodeId, session_id: K, cluster: Arc, is_exclusive_session: bool, session: F) -> Result, Error> { let mut sessions = self.sessions.write(); if sessions.contains_key(&session_id) { return Err(Error::DuplicateSessionId); } + // create session let session = Arc::new(session()?); + // check if session can be started + self.container_state.lock().on_session_starting(is_exclusive_session)?; + + // insert session let queued_session = QueuedSession { master: master, cluster_view: cluster, @@ -573,7 +592,9 @@ impl ClusterSessionsContainer where K: Clone + Ord, V: Cluster } pub fn remove(&self, session_id: &K) { - self.sessions.write().remove(session_id); + if self.sessions.write().remove(session_id).is_some() { + self.container_state.lock().on_session_completed(); + } } pub fn enqueue_message(&self, session_id: &K, sender: NodeId, message: M, is_queued_message: bool) { @@ -621,6 +642,45 @@ impl ClusterSessionsContainer where K: Clone + Ord, V: Cluster } } +impl ClusterSessionsContainerState { + /// When session is starting. + pub fn on_session_starting(&mut self, is_exclusive_session: bool) -> Result<(), Error> { + match *self { + ClusterSessionsContainerState::Idle if is_exclusive_session => { + ::std::mem::replace(self, ClusterSessionsContainerState::Exclusive); + }, + ClusterSessionsContainerState::Idle => { + ::std::mem::replace(self, ClusterSessionsContainerState::Active(1)); + }, + ClusterSessionsContainerState::Active(_) if is_exclusive_session => + return Err(Error::HasActiveSessions), + ClusterSessionsContainerState::Active(sessions_count) => { + ::std::mem::replace(self, ClusterSessionsContainerState::Active(sessions_count + 1)); + }, + ClusterSessionsContainerState::Exclusive => + return Err(Error::ExclusiveSessionActive), + } + Ok(()) + } + + /// When session is completed. + pub fn on_session_completed(&mut self) { + match *self { + ClusterSessionsContainerState::Idle => + unreachable!("idle means that there are no active sessions; on_session_completed is only called once after active session is completed; qed"), + ClusterSessionsContainerState::Active(sessions_count) if sessions_count == 1 => { + ::std::mem::replace(self, ClusterSessionsContainerState::Idle); + }, + ClusterSessionsContainerState::Active(sessions_count) => { + ::std::mem::replace(self, ClusterSessionsContainerState::Active(sessions_count - 1)); + } + ClusterSessionsContainerState::Exclusive => { + ::std::mem::replace(self, ClusterSessionsContainerState::Idle); + }, + } + } +} + impl AdminSession { pub fn as_share_add(&self) -> Option<&ShareAddSessionImpl> { match *self { @@ -841,3 +901,53 @@ impl Drop for AdminSessionWrapper { } } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::collections::BTreeSet; + use ethkey::{Random, Generator}; + use key_server_cluster::{Error, DummyAclStorage, DummyKeyStorage, MapKeyServerSet, PlainNodeKeyPair}; + use key_server_cluster::cluster::ClusterConfiguration; + use key_server_cluster::cluster::tests::DummyCluster; + use super::ClusterSessions; + + pub fn make_cluster_sessions() -> ClusterSessions { + let key_pair = Random.generate().unwrap(); + let config = ClusterConfiguration { + threads: 1, + self_key_pair: Arc::new(PlainNodeKeyPair::new(key_pair.clone())), + listen_address: ("127.0.0.1".to_owned(), 100_u16), + key_server_set: Arc::new(MapKeyServerSet::new(vec![(key_pair.public().clone(), format!("127.0.0.1:{}", 100).parse().unwrap())].into_iter().collect())), + allow_connecting_to_higher_nodes: false, + key_storage: Arc::new(DummyKeyStorage::default()), + acl_storage: Arc::new(DummyAclStorage::default()), + admin_public: Some(Random.generate().unwrap().public().clone()), + }; + ClusterSessions::new(&config) + } + + #[test] + fn cluster_session_cannot_be_started_if_exclusive_session_is_active() { + let sessions = make_cluster_sessions(); + + sessions.new_generation_session(Default::default(), Default::default(), Some(1), Arc::new(DummyCluster::new(sessions.self_node_id.clone()))).unwrap(); + match sessions.new_servers_set_change_session(Default::default(), None, Some(1), Arc::new(DummyCluster::new(sessions.self_node_id.clone())), BTreeSet::new()) { + Err(Error::HasActiveSessions) => (), + Err(e) => unreachable!(format!("{}", e)), + Ok(_) => unreachable!("OK"), + } + } + + #[test] + fn exclusive_session_cannot_be_started_if_other_session_is_active() { + let sessions = make_cluster_sessions(); + + sessions.new_servers_set_change_session(Default::default(), None, Some(1), Arc::new(DummyCluster::new(sessions.self_node_id.clone())), BTreeSet::new()).unwrap(); + match sessions.new_generation_session(Default::default(), Default::default(), Some(1), Arc::new(DummyCluster::new(sessions.self_node_id.clone()))) { + Err(Error::ExclusiveSessionActive) => (), + Err(e) => unreachable!(format!("{}", e)), + Ok(_) => unreachable!("OK"), + } + } +} diff --git a/secret_store/src/key_server_cluster/mod.rs b/secret_store/src/key_server_cluster/mod.rs index f83677830..99c53b248 100644 --- a/secret_store/src/key_server_cluster/mod.rs +++ b/secret_store/src/key_server_cluster/mod.rs @@ -107,6 +107,10 @@ pub enum Error { ConsensusUnreachable, /// Acl storage error. AccessDenied, + /// Can't start session, because exclusive session is active. + ExclusiveSessionActive, + /// Can't start exclusive session, because there are other active sessions. + HasActiveSessions, } impl From for Error { @@ -152,6 +156,8 @@ impl fmt::Display for Error { Error::KeyStorage(ref e) => write!(f, "key storage error {}", e), Error::ConsensusUnreachable => write!(f, "Consensus unreachable"), Error::AccessDenied => write!(f, "Access denied"), + Error::ExclusiveSessionActive => write!(f, "Exclusive session active"), + Error::HasActiveSessions => write!(f, "Unable to start exclusive session"), } } } @@ -175,7 +181,6 @@ pub use self::client_sessions::decryption_session; pub use self::client_sessions::encryption_session; pub use self::client_sessions::generation_session; pub use self::client_sessions::signing_session; - mod cluster; mod cluster_sessions; mod io;