// Copyright 2015-2017 Parity Technologies (UK) Ltd. // This file is part of Parity. // Parity is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity. If not, see . use std::time; use std::sync::{Arc, Weak}; use std::sync::atomic::{AtomicBool, Ordering}; use std::collections::{VecDeque, BTreeSet, BTreeMap}; use parking_lot::RwLock; use ethkey::{Public, Secret, Signature}; use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage, DocumentKeyShare, EncryptedDocumentKeyShadow, SessionMeta}; use key_server_cluster::cluster::{Cluster, ClusterData, ClusterView, ClusterConfiguration}; use key_server_cluster::message::{self, Message, GenerationMessage, EncryptionMessage, DecryptionMessage, SigningMessage}; use key_server_cluster::generation_session::{Session as GenerationSession, SessionImpl as GenerationSessionImpl, SessionParams as GenerationSessionParams, SessionState as GenerationSessionState}; use key_server_cluster::decryption_session::{Session as DecryptionSession, SessionImpl as DecryptionSessionImpl, DecryptionSessionId, SessionParams as DecryptionSessionParams}; use key_server_cluster::encryption_session::{Session as EncryptionSession, SessionImpl as EncryptionSessionImpl, SessionParams as EncryptionSessionParams, SessionState as EncryptionSessionState}; use key_server_cluster::signing_session::{Session as SigningSession, SessionImpl as SigningSessionImpl, SigningSessionId, SessionParams as SigningSessionParams}; /// When there are no session-related messages for SESSION_TIMEOUT_INTERVAL seconds, /// we must treat this session as stalled && finish it with an error. /// This timeout is for cases when node is responding to KeepAlive messages, but intentionally ignores /// session messages. const SESSION_TIMEOUT_INTERVAL: u64 = 60; /// Generic cluster session. pub trait ClusterSession { /// If session is finished (either with succcess or not). fn is_finished(&self) -> bool; /// When it takes too much time to complete session. fn on_session_timeout(&self); /// When it takes too much time to receive response from the node. fn on_node_timeout(&self, node_id: &NodeId); } /// Active sessions on this cluster. pub struct ClusterSessions { /// Key generation sessions. pub generation_sessions: ClusterSessionsContainer, /// Encryption sessions. pub encryption_sessions: ClusterSessionsContainer, /// Decryption sessions. pub decryption_sessions: ClusterSessionsContainer, /// Signing sessions. pub signing_sessions: ClusterSessionsContainer, /// Self node id. self_node_id: NodeId, /// All nodes ids. nodes: BTreeSet, /// Reference to key storage key_storage: Arc, /// Reference to ACL storage acl_storage: Arc, /// Make faulty generation sessions. make_faulty_generation_sessions: AtomicBool, } /// Active sessions container. pub struct ClusterSessionsContainer { /// Active sessions. pub sessions: RwLock>>, } /// Session and its message queue. pub struct QueuedSession { /// Session master. pub master: NodeId, /// Cluster view. pub cluster_view: Arc, /// Last received message time. pub last_message_time: time::Instant, /// Generation session. pub session: Arc, /// Messages queue. pub queue: VecDeque<(NodeId, M)>, } /// Generation session implementation, which removes session from cluster on drop. pub struct GenerationSessionWrapper { /// Wrapped session. session: Arc, /// Session Id. session_id: SessionId, /// Cluster data reference. cluster: Weak, } /// Encryption session implementation, which removes session from cluster on drop. pub struct EncryptionSessionWrapper { /// Wrapped session. session: Arc, /// Session Id. session_id: SessionId, /// Cluster data reference. cluster: Weak, } /// Decryption session implementation, which removes session from cluster on drop. pub struct DecryptionSessionWrapper { /// Wrapped session. session: Arc, /// Session Id. session_id: DecryptionSessionId, /// Cluster data reference. cluster: Weak, } /// Signing session implementation, which removes session from cluster on drop. pub struct SigningSessionWrapper { /// Wrapped session. session: Arc, /// Session Id. session_id: SigningSessionId, /// Cluster data reference. cluster: Weak, } impl ClusterSessions { /// Create new cluster sessions container. pub fn new(config: &ClusterConfiguration) -> Self { ClusterSessions { self_node_id: config.self_key_pair.public().clone(), nodes: config.nodes.keys().cloned().collect(), acl_storage: config.acl_storage.clone(), key_storage: config.key_storage.clone(), generation_sessions: ClusterSessionsContainer::new(), encryption_sessions: ClusterSessionsContainer::new(), decryption_sessions: ClusterSessionsContainer::new(), signing_sessions: ClusterSessionsContainer::new(), make_faulty_generation_sessions: AtomicBool::new(false), } } #[cfg(test)] pub fn make_faulty_generation_sessions(&self) { self.make_faulty_generation_sessions.store(true, Ordering::Relaxed); } /// Create new generation session. pub fn new_generation_session(&self, master: NodeId, session_id: SessionId, cluster: Arc) -> Result, Error> { // check that there's no finished encryption session with the same id if self.key_storage.contains(&session_id) { return Err(Error::DuplicateSessionId); } // communicating to all other nodes is crucial for encryption session // => check that we have connections to all cluster nodes if self.nodes.iter().any(|n| !cluster.is_connected(n)) { return Err(Error::NodeDisconnected); } // check that there's no active encryption session with the same id self.generation_sessions.insert(master, session_id, cluster.clone(), move || Ok(GenerationSessionImpl::new(GenerationSessionParams { id: session_id.clone(), self_node_id: self.self_node_id.clone(), key_storage: Some(self.key_storage.clone()), cluster: cluster, }))) .map(|session| { if self.make_faulty_generation_sessions.load(Ordering::Relaxed) { session.simulate_faulty_behaviour(); } session }) } /// Send generation session error. pub fn respond_with_generation_error(&self, session_id: &SessionId, error: message::SessionError) { self.generation_sessions.sessions.read().get(session_id) .map(|s| { // error in generation session is considered fatal // => broadcast error // do not bother processing send error, as we already processing error let _ = s.cluster_view.broadcast(Message::Generation(GenerationMessage::SessionError(error))); }); } /// Create new encryption session. pub fn new_encryption_session(&self, master: NodeId, session_id: SessionId, cluster: Arc) -> Result, Error> { let encrypted_data = self.read_key_share(&session_id, &cluster)?; self.encryption_sessions.insert(master, session_id, cluster.clone(), move || EncryptionSessionImpl::new(EncryptionSessionParams { id: session_id.clone(), self_node_id: self.self_node_id.clone(), encrypted_data: encrypted_data, key_storage: self.key_storage.clone(), cluster: cluster, })) } /// Send encryption session error. pub fn respond_with_encryption_error(&self, session_id: &SessionId, error: message::EncryptionSessionError) { self.encryption_sessions.sessions.read().get(session_id) .map(|s| { // error in encryption session is considered fatal // => broadcast error // do not bother processing send error, as we already processing error let _ = s.cluster_view.broadcast(Message::Encryption(EncryptionMessage::EncryptionSessionError(error))); }); } /// Create new decryption session. pub fn new_decryption_session(&self, master: NodeId, session_id: SessionId, sub_session_id: Secret, cluster: Arc, requester_signature: Option) -> Result, Error> { let session_id = DecryptionSessionId::new(session_id, sub_session_id); let encrypted_data = self.read_key_share(&session_id.id, &cluster)?; self.decryption_sessions.insert(master, session_id.clone(), cluster.clone(), move || DecryptionSessionImpl::new(DecryptionSessionParams { meta: SessionMeta { id: session_id.id, self_node_id: self.self_node_id.clone(), master_node_id: master, threshold: encrypted_data.threshold, }, access_key: session_id.access_key, key_share: encrypted_data, acl_storage: self.acl_storage.clone(), cluster: cluster, }, requester_signature)) } /// Send decryption session error. pub fn respond_with_decryption_error(&self, session_id: &SessionId, sub_session_id: &Secret, to: &NodeId, error: message::DecryptionSessionError) { let session_id = DecryptionSessionId::new(session_id.clone(), sub_session_id.clone()); self.decryption_sessions.sessions.read().get(&session_id) .map(|s| { // error in decryption session is non-fatal, if occurs on slave node // => either respond with error // => or broadcast error // do not bother processing send error, as we already processing error if s.master == self.self_node_id { let _ = s.cluster_view.broadcast(Message::Decryption(DecryptionMessage::DecryptionSessionError(error))); } else { let _ = s.cluster_view.send(to, Message::Decryption(DecryptionMessage::DecryptionSessionError(error))); } }); } /// Create new signing session. pub fn new_signing_session(&self, master: NodeId, session_id: SessionId, sub_session_id: Secret, cluster: Arc, requester_signature: Option) -> Result, Error> { let session_id = SigningSessionId::new(session_id, sub_session_id); let encrypted_data = self.read_key_share(&session_id.id, &cluster)?; self.signing_sessions.insert(master, session_id.clone(), cluster.clone(), move || SigningSessionImpl::new(SigningSessionParams { meta: SessionMeta { id: session_id.id, self_node_id: self.self_node_id.clone(), master_node_id: master, threshold: encrypted_data.threshold, }, access_key: session_id.access_key, key_share: encrypted_data, acl_storage: self.acl_storage.clone(), cluster: cluster, }, requester_signature)) } /// Send signing session error. pub fn respond_with_signing_error(&self, session_id: &SessionId, sub_session_id: &Secret, to: &NodeId, error: message::SigningSessionError) { let session_id = SigningSessionId::new(session_id.clone(), sub_session_id.clone()); self.signing_sessions.sessions.read().get(&session_id) .map(|s| { // error in signing session is non-fatal, if occurs on slave node // => either respond with error // => or broadcast error // do not bother processing send error, as we already processing error if s.master == self.self_node_id { let _ = s.cluster_view.broadcast(Message::Signing(SigningMessage::SigningSessionError(error))); } else { let _ = s.cluster_view.send(to, Message::Signing(SigningMessage::SigningSessionError(error))); } }); } /// Stop sessions that are stalling. pub fn stop_stalled_sessions(&self) { self.generation_sessions.stop_stalled_sessions(); self.encryption_sessions.stop_stalled_sessions(); self.decryption_sessions.stop_stalled_sessions(); self.signing_sessions.stop_stalled_sessions(); } /// When connection to node is lost. pub fn on_connection_timeout(&self, node_id: &NodeId) { self.generation_sessions.on_connection_timeout(node_id); self.encryption_sessions.on_connection_timeout(node_id); self.decryption_sessions.on_connection_timeout(node_id); self.signing_sessions.on_connection_timeout(node_id); } /// Read key share && remove disconnected nodes. fn read_key_share(&self, key_id: &SessionId, cluster: &Arc) -> Result { let mut encrypted_data = self.key_storage.get(key_id).map_err(|e| Error::KeyStorage(e.into()))?; // some of nodes, which were encrypting secret may be down // => do not use these in session let disconnected_nodes: BTreeSet<_> = encrypted_data.id_numbers.keys().cloned().collect(); for disconnected_node in disconnected_nodes.difference(&cluster.nodes()) { encrypted_data.id_numbers.remove(&disconnected_node); } Ok(encrypted_data) } } impl ClusterSessionsContainer where K: Clone + Ord, V: ClusterSession { pub fn new() -> Self { ClusterSessionsContainer { sessions: RwLock::new(BTreeMap::new()), } } pub fn get(&self, session_id: &K) -> Option> { self.sessions.read().get(session_id).map(|s| s.session.clone()) } pub fn insert Result>(&self, master: NodeId, session_id: K, cluster: Arc, session: F) -> Result, Error> { let mut sessions = self.sessions.write(); if sessions.contains_key(&session_id) { return Err(Error::DuplicateSessionId); } let session = Arc::new(session()?); let queued_session = QueuedSession { master: master, cluster_view: cluster, last_message_time: time::Instant::now(), session: session.clone(), queue: VecDeque::new(), }; sessions.insert(session_id, queued_session); Ok(session) } pub fn remove(&self, session_id: &K) { self.sessions.write().remove(session_id); } pub fn enqueue_message(&self, session_id: &K, sender: NodeId, message: M, is_queued_message: bool) { self.sessions.write().get_mut(session_id) .map(|session| if is_queued_message { session.queue.push_front((sender, message)) } else { session.queue.push_back((sender, message)) }); } pub fn dequeue_message(&self, session_id: &K) -> Option<(NodeId, M)> { self.sessions.write().get_mut(session_id) .and_then(|session| session.queue.pop_front()) } pub fn stop_stalled_sessions(&self) { let mut sessions = self.sessions.write(); for sid in sessions.keys().cloned().collect::>() { let remove_session = { let session = sessions.get(&sid).expect("enumerating only existing sessions; qed"); if time::Instant::now() - session.last_message_time > time::Duration::from_secs(SESSION_TIMEOUT_INTERVAL) { session.session.on_session_timeout(); session.session.is_finished() } else { false } }; if remove_session { sessions.remove(&sid); } } } pub fn on_connection_timeout(&self, node_id: &NodeId) { let mut sessions = self.sessions.write(); for sid in sessions.keys().cloned().collect::>() { let remove_session = { let session = sessions.get(&sid).expect("enumerating only existing sessions; qed"); session.session.on_node_timeout(node_id); session.session.is_finished() }; if remove_session { sessions.remove(&sid); } } } } impl GenerationSessionWrapper { pub fn new(cluster: Weak, session_id: SessionId, session: Arc) -> Arc { Arc::new(GenerationSessionWrapper { session: session, session_id: session_id, cluster: cluster, }) } } impl GenerationSession for GenerationSessionWrapper { fn state(&self) -> GenerationSessionState { self.session.state() } fn wait(&self, timeout: Option) -> Result { self.session.wait(timeout) } fn joint_public_and_secret(&self) -> Option> { self.session.joint_public_and_secret() } } impl Drop for GenerationSessionWrapper { fn drop(&mut self) { if let Some(cluster) = self.cluster.upgrade() { cluster.sessions().generation_sessions.remove(&self.session_id); } } } impl EncryptionSessionWrapper { pub fn new(cluster: Weak, session_id: SessionId, session: Arc) -> Arc { Arc::new(EncryptionSessionWrapper { session: session, session_id: session_id, cluster: cluster, }) } } impl EncryptionSession for EncryptionSessionWrapper { fn state(&self) -> EncryptionSessionState { self.session.state() } fn wait(&self, timeout: Option) -> Result<(), Error> { self.session.wait(timeout) } } impl Drop for EncryptionSessionWrapper { fn drop(&mut self) { if let Some(cluster) = self.cluster.upgrade() { cluster.sessions().encryption_sessions.remove(&self.session_id); } } } impl DecryptionSessionWrapper { pub fn new(cluster: Weak, session_id: DecryptionSessionId, session: Arc) -> Arc { Arc::new(DecryptionSessionWrapper { session: session, session_id: session_id, cluster: cluster, }) } } impl DecryptionSession for DecryptionSessionWrapper { fn wait(&self) -> Result { self.session.wait() } } impl Drop for DecryptionSessionWrapper { fn drop(&mut self) { if let Some(cluster) = self.cluster.upgrade() { cluster.sessions().decryption_sessions.remove(&self.session_id); } } } impl SigningSessionWrapper { pub fn new(cluster: Weak, session_id: SigningSessionId, session: Arc) -> Arc { Arc::new(SigningSessionWrapper { session: session, session_id: session_id, cluster: cluster, }) } } impl SigningSession for SigningSessionWrapper { fn wait(&self) -> Result<(Secret, Secret), Error> { self.session.wait() } } impl Drop for SigningSessionWrapper { fn drop(&mut self) { if let Some(cluster) = self.cluster.upgrade() { cluster.sessions().signing_sessions.remove(&self.session_id); } } }