diff --git a/Cargo.lock b/Cargo.lock index 93ccf8cc4..10df3d472 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -603,6 +603,7 @@ dependencies = [ "ethcore-ipc 1.7.0", "ethcore-ipc-codegen 1.7.0", "ethcore-ipc-nano 1.7.0", + "ethcore-logger 1.7.0", "ethcore-util 1.7.0", "ethcrypto 0.1.0", "ethkey 0.2.0", diff --git a/parity/secretstore.rs b/parity/secretstore.rs index 5a787b224..f215c937c 100644 --- a/parity/secretstore.rs +++ b/parity/secretstore.rs @@ -96,9 +96,6 @@ mod server { port: port, })).collect(), allow_connecting_to_higher_nodes: true, - encryption_config: ethcore_secretstore::EncryptionConfiguration { - key_check_timeout_ms: 1000, - }, }, }; diff --git a/secret_store/Cargo.toml b/secret_store/Cargo.toml index 062195737..b087358be 100644 --- a/secret_store/Cargo.toml +++ b/secret_store/Cargo.toml @@ -31,6 +31,7 @@ ethcore-devtools = { path = "../devtools" } ethcore-util = { path = "../util" } ethcore-ipc = { path = "../ipc/rpc" } ethcore-ipc-nano = { path = "../ipc/nano" } +ethcore-logger = { path = "../logger" } ethcrypto = { path = "../ethcrypto" } ethkey = { path = "../ethkey" } native-contracts = { path = "../ethcore/native_contracts" } diff --git a/secret_store/src/http_listener.rs b/secret_store/src/http_listener.rs index e7737f160..482fa92f6 100644 --- a/secret_store/src/http_listener.rs +++ b/secret_store/src/http_listener.rs @@ -27,7 +27,7 @@ use url::percent_encoding::percent_decode; use util::ToPretty; use traits::KeyServer; use serialization::SerializableDocumentEncryptedKeyShadow; -use types::all::{Error, ServiceConfiguration, RequestSignature, DocumentAddress, DocumentEncryptedKey, DocumentEncryptedKeyShadow}; +use types::all::{Error, NodeAddress, RequestSignature, DocumentAddress, DocumentEncryptedKey, DocumentEncryptedKeyShadow}; /// Key server http-requests listener pub struct KeyServerHttpListener { @@ -60,7 +60,7 @@ struct KeyServerSharedHttpHandler { impl KeyServerHttpListener where T: KeyServer + 'static { /// Start KeyServer http 
listener - pub fn start(config: ServiceConfiguration, key_server: T) -> Result { + pub fn start(listener_address: &NodeAddress, key_server: T) -> Result { let shared_handler = Arc::new(KeyServerSharedHttpHandler { key_server: key_server, }); @@ -68,7 +68,7 @@ impl KeyServerHttpListener where T: KeyServer + 'static { handler: shared_handler.clone(), }; - let listener_addr: &str = &format!("{}:{}", config.listener_address.address, config.listener_address.port); + let listener_addr: &str = &format!("{}:{}", listener_address.address, listener_address.port); let http_server = HttpServer::http(&listener_addr).expect("cannot start HttpServer"); let http_server = http_server.handle(handler).expect("cannot start HttpServer"); let listener = KeyServerHttpListener { @@ -93,6 +93,13 @@ impl KeyServer for KeyServerHttpListener where T: KeyServer + 'static { } } +impl Drop for KeyServerHttpListener where T: KeyServer + 'static { + fn drop(&mut self) { + // ignore error as we are dropping anyway + let _ = self._http_server.close(); + } +} + impl HttpHandler for KeyServerHttpHandler where T: KeyServer + 'static { fn handle(&self, req: HttpRequest, mut res: HttpResponse) { if req.headers.has::() { @@ -219,7 +226,17 @@ fn parse_request(method: &HttpMethod, uri_path: &str) -> Request { #[cfg(test)] mod tests { use hyper::method::Method as HttpMethod; - use super::{parse_request, Request}; + use key_server::tests::DummyKeyServer; + use types::all::NodeAddress; + use super::{parse_request, Request, KeyServerHttpListener}; + + #[test] + fn http_listener_successfully_drops() { + let key_server = DummyKeyServer; + let address = NodeAddress { address: "127.0.0.1".into(), port: 9000 }; + let listener = KeyServerHttpListener::start(&address, key_server).unwrap(); + drop(listener); + } #[test] fn parse_request_successful() { diff --git a/secret_store/src/key_server.rs b/secret_store/src/key_server.rs index 64dc47a50..49016ac8e 100644 --- a/secret_store/src/key_server.rs +++ 
b/secret_store/src/key_server.rs @@ -64,7 +64,7 @@ impl KeyServer for KeyServerImpl { // generate document key let encryption_session = self.data.lock().cluster.new_encryption_session(document.clone(), threshold)?; - let document_key = encryption_session.wait()?; + let document_key = encryption_session.wait(None)?; // encrypt document key with requestor public key let document_key = ethcrypto::ecies::encrypt_single_message(&public, &document_key) @@ -104,7 +104,6 @@ impl KeyServerCore { .map(|(node_id, node_address)| (node_id.clone(), (node_address.address.clone(), node_address.port))) .collect(), allow_connecting_to_higher_nodes: config.allow_connecting_to_higher_nodes, - encryption_config: config.encryption_config.clone(), acl_storage: acl_storage, key_storage: key_storage, }; @@ -143,54 +142,108 @@ impl Drop for KeyServerCore { } #[cfg(test)] -mod tests { +pub mod tests { use std::time; use std::sync::Arc; use ethcrypto; use ethkey::{self, Random, Generator}; use acl_storage::tests::DummyAclStorage; use key_storage::tests::DummyKeyStorage; - use types::all::{ClusterConfiguration, NodeAddress, EncryptionConfiguration}; + use types::all::{Error, ClusterConfiguration, NodeAddress, RequestSignature, DocumentAddress, DocumentEncryptedKey, DocumentEncryptedKeyShadow}; use super::{KeyServer, KeyServerImpl}; - #[test] - fn document_key_generation_and_retrievement_works_over_network() { - //::util::log::init_log(); + pub struct DummyKeyServer; - let num_nodes = 3; + impl KeyServer for DummyKeyServer { + fn generate_document_key(&self, _signature: &RequestSignature, _document: &DocumentAddress, _threshold: usize) -> Result { + unimplemented!() + } + + fn document_key(&self, _signature: &RequestSignature, _document: &DocumentAddress) -> Result { + unimplemented!() + } + + fn document_key_shadow(&self, _signature: &RequestSignature, _document: &DocumentAddress) -> Result { + unimplemented!() + } + } + + fn make_key_servers(start_port: u16, num_nodes: usize) -> Vec { let 
key_pairs: Vec<_> = (0..num_nodes).map(|_| Random.generate().unwrap()).collect(); let configs: Vec<_> = (0..num_nodes).map(|i| ClusterConfiguration { threads: 1, self_private: (***key_pairs[i].secret()).into(), listener_address: NodeAddress { address: "127.0.0.1".into(), - port: 6060 + (i as u16), + port: start_port + (i as u16), }, nodes: key_pairs.iter().enumerate().map(|(j, kp)| (kp.public().clone(), NodeAddress { address: "127.0.0.1".into(), - port: 6060 + (j as u16), + port: start_port + (j as u16), })).collect(), allow_connecting_to_higher_nodes: false, - encryption_config: EncryptionConfiguration { - key_check_timeout_ms: 10, - }, }).collect(); let key_servers: Vec<_> = configs.into_iter().map(|cfg| KeyServerImpl::new(&cfg, Arc::new(DummyAclStorage::default()), Arc::new(DummyKeyStorage::default())).unwrap() ).collect(); - // wait until connections are established + // wait until connections are established. It is fast => do not bother with events here let start = time::Instant::now(); + let mut tried_reconnections = false; loop { if key_servers.iter().all(|ks| ks.cluster().cluster_state().connected.len() == num_nodes - 1) { break; } - if time::Instant::now() - start > time::Duration::from_millis(30000) { - panic!("connections are not established in 30000ms"); + + let old_tried_reconnections = tried_reconnections; + let mut fully_connected = true; + for key_server in &key_servers { + if key_server.cluster().cluster_state().connected.len() != num_nodes - 1 { + fully_connected = false; + if !old_tried_reconnections { + tried_reconnections = true; + key_server.cluster().connect(); + } + } + } + if fully_connected { + break; + } + if time::Instant::now() - start > time::Duration::from_millis(1000) { + panic!("connections are not established in 1000ms"); } } + key_servers + } + + #[test] + fn document_key_generation_and_retrievement_works_over_network_with_single_node() { + //::logger::init_log(); + let key_servers = make_key_servers(6070, 1); + + // generate 
document key + let threshold = 0; + let document = Random.generate().unwrap().secret().clone(); + let secret = Random.generate().unwrap().secret().clone(); + let signature = ethkey::sign(&secret, &document).unwrap(); + let generated_key = key_servers[0].generate_document_key(&signature, &document, threshold).unwrap(); + let generated_key = ethcrypto::ecies::decrypt_single_message(&secret, &generated_key).unwrap(); + + // now let's try to retrieve key back + for key_server in key_servers.iter() { + let retrieved_key = key_server.document_key(&signature, &document).unwrap(); + let retrieved_key = ethcrypto::ecies::decrypt_single_message(&secret, &retrieved_key).unwrap(); + assert_eq!(retrieved_key, generated_key); + } + } + + #[test] + fn document_key_generation_and_retrievement_works_over_network_with_3_nodes() { + //::logger::init_log(); + let key_servers = make_key_servers(6080, 3); + let test_cases = [0, 1, 2]; for threshold in &test_cases { // generate document key diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index 55b302757..bd857bda8 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -17,6 +17,7 @@ use std::io; use std::time; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::collections::btree_map::Entry; use std::net::{SocketAddr, IpAddr}; @@ -24,19 +25,46 @@ use futures::{finished, failed, Future, Stream, BoxFuture}; use futures_cpupool::CpuPool; use parking_lot::{RwLock, Mutex}; use tokio_io::IoFuture; -use tokio_core::reactor::{Handle, Remote, Timeout, Interval}; +use tokio_core::reactor::{Handle, Remote, Interval}; use tokio_core::net::{TcpListener, TcpStream}; use ethkey::{Secret, KeyPair, Signature, Random, Generator}; -use key_server_cluster::{Error, NodeId, SessionId, EncryptionConfiguration, AclStorage, KeyStorage}; +use 
key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage}; use key_server_cluster::message::{self, Message, ClusterMessage, EncryptionMessage, DecryptionMessage}; -use key_server_cluster::decryption_session::{SessionImpl as DecryptionSessionImpl, DecryptionSessionId, - SessionParams as DecryptionSessionParams, Session as DecryptionSession}; +use key_server_cluster::decryption_session::{SessionImpl as DecryptionSessionImpl, SessionState as DecryptionSessionState, + SessionParams as DecryptionSessionParams, Session as DecryptionSession, DecryptionSessionId}; use key_server_cluster::encryption_session::{SessionImpl as EncryptionSessionImpl, SessionState as EncryptionSessionState, SessionParams as EncryptionSessionParams, Session as EncryptionSession}; use key_server_cluster::io::{DeadlineStatus, ReadMessage, SharedTcpStream, read_encrypted_message, WriteMessage, write_encrypted_message}; use key_server_cluster::net::{accept_connection as net_accept_connection, connect as net_connect, Connection as NetConnection}; -pub type BoxedEmptyFuture = BoxFuture<(), ()>; +/// Maintain interval (seconds). Every MAINTAIN_INTERVAL seconds node: +/// 1) checks if connected nodes are responding to KeepAlive messages +/// 2) tries to connect to disconnected nodes +/// 3) checks if enc/dec sessions are timed out +const MAINTAIN_INTERVAL: u64 = 10; + +/// When no messages have been received from node within KEEP_ALIVE_SEND_INTERVAL seconds, +/// we must send KeepAlive message to the node to check if it still responds to messages. +const KEEP_ALIVE_SEND_INTERVAL: u64 = 30; +/// When no messages have been received from node within KEEP_ALIVE_DISCONNECT_INTERVAL seconds, +/// we must treat this node as non-responding && disconnect from it. +const KEEP_ALIVE_DISCONNECT_INTERVAL: u64 = 60; + +/// When there are no encryption session-related messages for ENCRYPTION_SESSION_TIMEOUT_INTERVAL seconds, +/// we must treat this session as stalled && finish it with an error. 
+/// This timeout is for cases when node is responding to KeepAlive messages, but intentionally ignores +/// session messages. +const ENCRYPTION_SESSION_TIMEOUT_INTERVAL: u64 = 60; + +/// When there are no decryption session-related messages for DECRYPTION_SESSION_TIMEOUT_INTERVAL seconds, +/// we must treat this session as stalled && finish it with an error. +/// This timeout is for cases when node is responding to KeepAlive messages, but intentionally ignores +/// session messages. +const DECRYPTION_SESSION_TIMEOUT_INTERVAL: u64 = 60; + +/// Empty future: a boxed future that resolves to `()` +/// and whose error type is also `()`. +type BoxedEmptyFuture = BoxFuture<(), ()>; /// Cluster interface for external clients. pub trait ClusterClient: Send + Sync { @@ -46,6 +74,16 @@ pub trait ClusterClient: Send + Sync { fn new_encryption_session(&self, session_id: SessionId, threshold: usize) -> Result, Error>; /// Start new decryption session. fn new_decryption_session(&self, session_id: SessionId, requestor_signature: Signature, is_shadow_decryption: bool) -> Result, Error>; + + #[cfg(test)] + /// Ask node to make 'faulty' encryption sessions. + fn make_faulty_encryption_sessions(&self); + #[cfg(test)] + /// Get active encryption session with given id. + fn encryption_session(&self, session_id: &SessionId) -> Option>; + #[cfg(test)] + /// Try connect to disconnected nodes. + fn connect(&self); } /// Cluster access for single encryption/decryption participant. @@ -54,8 +92,6 @@ pub trait Cluster: Send + Sync { fn broadcast(&self, message: Message) -> Result<(), Error>; /// Send message to given node. fn send(&self, to: &NodeId, message: Message) -> Result<(), Error>; - /// Blacklist node, close connection and remove all pending messages. - fn blacklist(&self, node: &NodeId); } #[derive(Clone)] @@ -71,8 +107,6 @@ pub struct ClusterConfiguration { pub listen_address: (String, u16), /// Cluster nodes. pub nodes: BTreeMap, - /// Encryption session configuration. 
- pub encryption_config: EncryptionConfiguration, /// Reference to key storage pub key_storage: Arc, /// Reference to ACL storage @@ -136,6 +170,8 @@ pub struct ClusterConnections { pub struct ClusterSessions { /// Self node id. pub self_node_id: NodeId, + /// All nodes ids. + pub nodes: BTreeSet, /// Reference to key storage pub key_storage: Arc, /// Reference to ACL storage @@ -144,10 +180,18 @@ pub struct ClusterSessions { pub encryption_sessions: RwLock>, /// Active decryption sessions. pub decryption_sessions: RwLock>, + /// Make faulty encryption sessions. + pub make_faulty_encryption_sessions: AtomicBool, } /// Encryption session and its message queue. pub struct QueuedEncryptionSession { + /// Session master. + pub master: NodeId, + /// Cluster view. + pub cluster_view: Arc, + /// Last received message time. + pub last_message_time: time::Instant, /// Encryption session. pub session: Arc, /// Messages queue. @@ -156,6 +200,12 @@ pub struct QueuedEncryptionSession { /// Decryption session and its message queue. pub struct QueuedDecryptionSession { + /// Session master. + pub master: NodeId, + /// Cluster view. + pub cluster_view: Arc, + /// Last received message time. + pub last_message_time: time::Instant, /// Decryption session. pub session: Arc, /// Messages queue. @@ -217,17 +267,28 @@ impl ClusterCore { self.data.connection(node) } - /// Run cluster + /// Run cluster. pub fn run(&self) -> Result<(), Error> { - // try to connect to every other peer - ClusterCore::connect_disconnected_nodes(self.data.clone()); + self.run_listener() + .and_then(|_| self.run_connections())?; // schedule maintain procedures ClusterCore::schedule_maintain(&self.handle, self.data.clone()); - // start listening for incoming connections - self.handle.spawn(ClusterCore::listen(&self.handle, self.data.clone(), self.listen_address.clone())?); + Ok(()) + } + /// Start listening for incoming connections. 
+ pub fn run_listener(&self) -> Result<(), Error> { + // start listening for incoming connections + self.handle.spawn(ClusterCore::listen(&self.handle, self.data.clone(), self.listen_address.clone())?); + Ok(()) + } + + /// Start connecting to other nodes. + pub fn run_connections(&self) -> Result<(), Error> { + // try to connect to every other peer + ClusterCore::connect_disconnected_nodes(self.data.clone()); Ok(()) } @@ -278,18 +339,24 @@ impl ClusterCore { /// Schedule mainatain procedures. fn schedule_maintain(handle: &Handle, data: Arc) { - // TODO: per-session timeouts (node can respond to messages, but ignore sessions messages) - let (d1, d2, d3) = (data.clone(), data.clone(), data.clone()); - let interval: BoxedEmptyFuture = Interval::new(time::Duration::new(10, 0), handle) + let d = data.clone(); + let interval: BoxedEmptyFuture = Interval::new(time::Duration::new(MAINTAIN_INTERVAL, 0), handle) .expect("failed to create interval") - .and_then(move |_| Ok(trace!(target: "secretstore_net", "{}: executing maintain procedures", d1.self_key_pair.public()))) - .and_then(move |_| Ok(ClusterCore::keep_alive(d2.clone()))) - .and_then(move |_| Ok(ClusterCore::connect_disconnected_nodes(d3.clone()))) + .and_then(move |_| Ok(ClusterCore::maintain(data.clone()))) .for_each(|_| Ok(())) .then(|_| finished(())) .boxed(); - data.spawn(interval); + d.spawn(interval); + } + + /// Execute maintain procedures. + fn maintain(data: Arc) { + trace!(target: "secretstore_net", "{}: executing maintain procedures", data.self_key_pair.public()); + + ClusterCore::keep_alive(data.clone()); + ClusterCore::connect_disconnected_nodes(data.clone()); + data.sessions.stop_stalled_sessions(); } /// Called for every incomming mesage. 
@@ -324,11 +391,11 @@ impl ClusterCore { fn keep_alive(data: Arc) { for connection in data.connections.active_connections() { let last_message_diff = time::Instant::now() - connection.last_message_time(); - if last_message_diff > time::Duration::from_secs(60) { + if last_message_diff > time::Duration::from_secs(KEEP_ALIVE_DISCONNECT_INTERVAL) { data.connections.remove(connection.node_id(), connection.is_inbound()); data.sessions.on_connection_timeout(connection.node_id()); } - else if last_message_diff > time::Duration::from_secs(30) { + else if last_message_diff > time::Duration::from_secs(KEEP_ALIVE_SEND_INTERVAL) { data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAlive(message::KeepAlive {})))); } } @@ -370,7 +437,7 @@ impl ClusterCore { /// Process single message from the connection. fn process_connection_message(data: Arc, connection: Arc, message: Message) { connection.set_last_message_time(time::Instant::now()); - trace!(target: "secretstore_net", "{}: processing message {} from {}", data.self_key_pair.public(), message, connection.node_id()); + trace!(target: "secretstore_net", "{}: received message {} from {}", data.self_key_pair.public(), message, connection.node_id()); match message { Message::Encryption(message) => ClusterCore::process_encryption_message(data, connection, message), Message::Decryption(message) => ClusterCore::process_decryption_message(data, connection, message), @@ -380,102 +447,53 @@ impl ClusterCore { /// Process single encryption message from the connection. 
fn process_encryption_message(data: Arc, connection: Arc, mut message: EncryptionMessage) { - let mut sender = connection.node_id().clone(); - let mut is_queued_message = false; let session_id = message.session_id().clone(); - let key_check_timeout_ms = data.config.encryption_config.key_check_timeout_ms; + let mut sender = connection.node_id().clone(); + let session = match message { + EncryptionMessage::InitializeSession(_) => { + let mut connected_nodes = data.connections.connected_nodes(); + connected_nodes.insert(data.self_key_pair.public().clone()); + + let cluster = Arc::new(ClusterView::new(data.clone(), connected_nodes)); + data.sessions.new_encryption_session(sender.clone(), session_id.clone(), cluster) + }, + _ => { + data.sessions.encryption_session(&session_id) + .ok_or(Error::InvalidSessionId) + }, + }; + + let mut is_queued_message = false; loop { - let result = match message { - EncryptionMessage::InitializeSession(ref message) => { - let mut connected_nodes = data.connections.connected_nodes(); - connected_nodes.insert(data.self_key_pair.public().clone()); - - let cluster = Arc::new(ClusterView::new(data.clone(), connected_nodes)); - let session_id: SessionId = message.session.clone().into(); - data.sessions.new_encryption_session(sender.clone(), session_id.clone(), cluster) - .and_then(|s| s.on_initialize_session(sender.clone(), message)) - }, - EncryptionMessage::ConfirmInitialization(ref message) => data.sessions.encryption_session(&*message.session) - .ok_or(Error::InvalidSessionId) - .and_then(|s| s.on_confirm_initialization(sender.clone(), message)), - EncryptionMessage::CompleteInitialization(ref message) => data.sessions.encryption_session(&*message.session) - .ok_or(Error::InvalidSessionId) - .and_then(|s| s.on_complete_initialization(sender.clone(), message)), - EncryptionMessage::KeysDissemination(ref message) => data.sessions.encryption_session(&*message.session) - .ok_or(Error::InvalidSessionId) - .and_then(|s| { - // TODO: move this 
logic to session (or session connector) - let is_in_key_check_state = s.state() == EncryptionSessionState::KeyCheck; - let result = s.on_keys_dissemination(sender.clone(), message); - if !is_in_key_check_state && s.state() == EncryptionSessionState::KeyCheck { - let session = s.clone(); - let d = data.clone(); - data.handle.spawn(move |handle| - Timeout::new(time::Duration::new(key_check_timeout_ms / 1000, 0), handle) - .expect("failed to create timeout") - .and_then(move |_| { - if let Err(error) = session.start_key_generation_phase() { - session.on_session_error(d.self_key_pair.public().clone(), &message::SessionError { - session: session.id().clone().into(), - error: error.into(), - }); - } - Ok(()) - }) - .then(|_| finished(())) - ); - } - - result - }), - EncryptionMessage::Complaint(ref message) => data.sessions.encryption_session(&*message.session) - .ok_or(Error::InvalidSessionId) - .and_then(|s| s.on_complaint(sender.clone(), message)), - EncryptionMessage::ComplaintResponse(ref message) => data.sessions.encryption_session(&*message.session) - .ok_or(Error::InvalidSessionId) - .and_then(|s| s.on_complaint_response(sender.clone(), message)), - EncryptionMessage::PublicKeyShare(ref message) => data.sessions.encryption_session(&*message.session) - .ok_or(Error::InvalidSessionId) - .and_then(|s| s.on_public_key_share(sender.clone(), message)), - EncryptionMessage::SessionError(ref message) => { - if let Some(s) = data.sessions.encryption_session(&*message.session) { - data.sessions.remove_encryption_session(s.id()); - s.on_session_error(sender.clone(), message); - } - Ok(()) - }, - EncryptionMessage::SessionCompleted(ref message) => data.sessions.encryption_session(&*message.session) - .ok_or(Error::InvalidSessionId) - .and_then(|s| { - let result = s.on_session_completed(sender.clone(), message); - if result.is_ok() && s.state() == EncryptionSessionState::Finished { - data.sessions.remove_encryption_session(s.id()); - } - - result - }), - }; - - match result 
{ - Err(Error::TooEarlyForRequest) => { - data.sessions.enqueue_encryption_message(&session_id, sender, message, is_queued_message); - break; - }, - Err(err) => { - warn!(target: "secretstore_net", "{}: error {} when processing message {} from node {}", data.self_key_pair.public(), err, message, sender); - if let Some(connection) = data.connections.get(&sender) { - data.spawn(connection.send_message(Message::Encryption(EncryptionMessage::SessionError(message::SessionError { - session: session_id.clone().into(), - error: format!("{:?}", err), - })))); + match session.clone().and_then(|session| match message { + EncryptionMessage::InitializeSession(ref message) => + session.on_initialize_session(sender.clone(), message), + EncryptionMessage::ConfirmInitialization(ref message) => + session.on_confirm_initialization(sender.clone(), message), + EncryptionMessage::CompleteInitialization(ref message) => + session.on_complete_initialization(sender.clone(), message), + EncryptionMessage::KeysDissemination(ref message) => + session.on_keys_dissemination(sender.clone(), message), + EncryptionMessage::PublicKeyShare(ref message) => + session.on_public_key_share(sender.clone(), message), + EncryptionMessage::SessionError(ref message) => + session.on_session_error(sender.clone(), message), + EncryptionMessage::SessionCompleted(ref message) => + session.on_session_completed(sender.clone(), message), + }) { + Ok(_) => { + // if session is completed => stop + let session = session.clone().expect("session.method() call finished with success; session exists; qed"); + let session_state = session.state(); + if session_state == EncryptionSessionState::Finished { + info!(target: "secretstore_net", "{}: encryption session completed", data.self_key_pair.public()); } - - if err != Error::InvalidSessionId { + if session_state == EncryptionSessionState::Finished || session_state == EncryptionSessionState::Failed { data.sessions.remove_encryption_session(&session_id); + break; } - break; - }, 
- _ => { + + // try to dequeue message match data.sessions.dequeue_encryption_message(&session_id) { Some((msg_sender, msg)) => { is_queued_message = true; @@ -485,64 +503,73 @@ impl ClusterCore { None => break, } }, + Err(Error::TooEarlyForRequest) => { + data.sessions.enqueue_encryption_message(&session_id, sender, message, is_queued_message); + break; + }, + Err(err) => { + warn!(target: "secretstore_net", "{}: encryption session error {} when processing message {} from node {}", data.self_key_pair.public(), err, message, sender); + data.sessions.respond_with_encryption_error(&session_id, message::SessionError { + session: session_id.clone().into(), + error: format!("{:?}", err), + }); + if err != Error::InvalidSessionId { + data.sessions.remove_encryption_session(&session_id); + } + break; + }, } } } /// Process single decryption message from the connection. fn process_decryption_message(data: Arc, connection: Arc, mut message: DecryptionMessage) { - let mut sender = connection.node_id().clone(); - let mut is_queued_message = false; let session_id = message.session_id().clone(); let sub_session_id = message.sub_session_id().clone(); + let mut sender = connection.node_id().clone(); + let session = match message { + DecryptionMessage::InitializeDecryptionSession(_) => { + let mut connected_nodes = data.connections.connected_nodes(); + connected_nodes.insert(data.self_key_pair.public().clone()); + + let cluster = Arc::new(ClusterView::new(data.clone(), connected_nodes)); + data.sessions.new_decryption_session(sender.clone(), session_id.clone(), sub_session_id.clone(), cluster) + }, + _ => { + data.sessions.decryption_session(&session_id, &sub_session_id) + .ok_or(Error::InvalidSessionId) + }, + }; + + let mut is_queued_message = false; loop { - let result = match message { - DecryptionMessage::InitializeDecryptionSession(ref message) => { - let mut connected_nodes = data.connections.connected_nodes(); - connected_nodes.insert(data.self_key_pair.public().clone()); 
- - let cluster = Arc::new(ClusterView::new(data.clone(), connected_nodes)); - data.sessions.new_decryption_session(sender.clone(), session_id.clone(), sub_session_id.clone(), cluster) - .and_then(|s| s.on_initialize_session(sender.clone(), message)) - }, - DecryptionMessage::ConfirmDecryptionInitialization(ref message) => data.sessions.decryption_session(&*message.session, &*message.sub_session) - .ok_or(Error::InvalidSessionId) - .and_then(|s| s.on_confirm_initialization(sender.clone(), message)), - DecryptionMessage::RequestPartialDecryption(ref message) => data.sessions.decryption_session(&*message.session, &*message.sub_session) - .ok_or(Error::InvalidSessionId) - .and_then(|s| s.on_partial_decryption_requested(sender.clone(), message)), - DecryptionMessage::PartialDecryption(ref message) => data.sessions.decryption_session(&*message.session, &*message.sub_session) - .ok_or(Error::InvalidSessionId) - .and_then(|s| s.on_partial_decryption(sender.clone(), message)), - DecryptionMessage::DecryptionSessionError(ref message) => { - if let Some(s) = data.sessions.decryption_session(&*message.session, &*message.sub_session) { - data.sessions.remove_decryption_session(&session_id, &sub_session_id); - s.on_session_error(sender.clone(), message); - } - Ok(()) - }, - }; - - match result { - Err(Error::TooEarlyForRequest) => { - data.sessions.enqueue_decryption_message(&session_id, &sub_session_id, sender, message, is_queued_message); - break; - }, - Err(err) => { - if let Some(connection) = data.connections.get(&sender) { - data.spawn(connection.send_message(Message::Decryption(DecryptionMessage::DecryptionSessionError(message::DecryptionSessionError { - session: session_id.clone().into(), - sub_session: sub_session_id.clone().into(), - error: format!("{:?}", err), - })))); + match session.clone().and_then(|session| match message { + DecryptionMessage::InitializeDecryptionSession(ref message) => + session.on_initialize_session(sender.clone(), message), + 
DecryptionMessage::ConfirmDecryptionInitialization(ref message) => + session.on_confirm_initialization(sender.clone(), message), + DecryptionMessage::RequestPartialDecryption(ref message) => + session.on_partial_decryption_requested(sender.clone(), message), + DecryptionMessage::PartialDecryption(ref message) => + session.on_partial_decryption(sender.clone(), message), + DecryptionMessage::DecryptionSessionError(ref message) => + session.on_session_error(sender.clone(), message), + DecryptionMessage::DecryptionSessionCompleted(ref message) => + session.on_session_completed(sender.clone(), message), + }) { + Ok(_) => { + // if session is completed => stop + let session = session.clone().expect("session.method() call finished with success; session exists; qed"); + let session_state = session.state(); + if session_state == DecryptionSessionState::Finished { + info!(target: "secretstore_net", "{}: decryption session completed", data.self_key_pair.public()); } - - if err != Error::InvalidSessionId { + if session_state == DecryptionSessionState::Finished || session_state == DecryptionSessionState::Failed { data.sessions.remove_decryption_session(&session_id, &sub_session_id); + break; } - break; - }, - _ => { + + // try to dequeue message match data.sessions.dequeue_decryption_message(&session_id, &sub_session_id) { Some((msg_sender, msg)) => { is_queued_message = true; @@ -552,6 +579,22 @@ impl ClusterCore { None => break, } }, + Err(Error::TooEarlyForRequest) => { + data.sessions.enqueue_decryption_message(&session_id, &sub_session_id, sender, message, is_queued_message); + break; + }, + Err(err) => { + warn!(target: "secretstore_net", "{}: decryption session error {} when processing message {} from node {}", data.self_key_pair.public(), err, message, sender); + data.sessions.respond_with_decryption_error(&session_id, &sub_session_id, &sender, message::DecryptionSessionError { + session: session_id.clone().into(), + sub_session: sub_session_id.clone().into(), + error: 
format!("{:?}", err), + }); + if err != Error::InvalidSessionId { + data.sessions.remove_decryption_session(&session_id, &sub_session_id); + } + break; + }, } } } @@ -602,6 +645,7 @@ impl ClusterConnections { return false; } } + trace!(target: "secretstore_net", "{}: inserting connection to {} at {}", self.self_node_id, connection.node_id(), connection.node_address()); connections.insert(connection.node_id().clone(), connection); true @@ -640,14 +684,16 @@ impl ClusterSessions { pub fn new(config: &ClusterConfiguration) -> Self { ClusterSessions { self_node_id: config.self_key_pair.public().clone(), + nodes: config.nodes.keys().cloned().collect(), acl_storage: config.acl_storage.clone(), key_storage: config.key_storage.clone(), encryption_sessions: RwLock::new(BTreeMap::new()), decryption_sessions: RwLock::new(BTreeMap::new()), + make_faulty_encryption_sessions: AtomicBool::new(false), } } - pub fn new_encryption_session(&self, _master: NodeId, session_id: SessionId, cluster: Arc) -> Result, Error> { + pub fn new_encryption_session(&self, master: NodeId, session_id: SessionId, cluster: Arc) -> Result, Error> { let mut encryption_sessions = self.encryption_sessions.write(); // check that there's no active encryption session with the same id if encryption_sessions.contains_key(&session_id) { @@ -658,16 +704,28 @@ impl ClusterSessions { return Err(Error::DuplicateSessionId); } + // communicating to all other nodes is crucial for encryption session + // => check that we have connections to all cluster nodes + if self.nodes.iter().any(|n| !cluster.is_connected(n)) { + return Err(Error::NodeDisconnected); + } + let session = Arc::new(EncryptionSessionImpl::new(EncryptionSessionParams { id: session_id.clone(), self_node_id: self.self_node_id.clone(), key_storage: self.key_storage.clone(), - cluster: cluster, + cluster: cluster.clone(), })); let encryption_session = QueuedEncryptionSession { + master: master, + cluster_view: cluster, + last_message_time: 
time::Instant::now(), session: session.clone(), queue: VecDeque::new() }; + if self.make_faulty_encryption_sessions.load(Ordering::Relaxed) { + encryption_session.session.simulate_faulty_behaviour(); + } encryption_sessions.insert(session_id, encryption_session); Ok(session) } @@ -691,22 +749,50 @@ impl ClusterSessions { .and_then(|session| session.queue.pop_front()) } - pub fn new_decryption_session(&self, _master: NodeId, session_id: SessionId, sub_session_id: Secret, cluster: Arc) -> Result, Error> { + pub fn respond_with_encryption_error(&self, session_id: &SessionId, error: message::SessionError) { + self.encryption_sessions.read().get(session_id) + .map(|s| { + // error in encryption session is considered fatal + // => broadcast error + + // do not bother processing send error, as we already processing error + let _ = s.cluster_view.broadcast(Message::Encryption(EncryptionMessage::SessionError(error))); + }); + } + + #[cfg(test)] + pub fn make_faulty_encryption_sessions(&self) { + self.make_faulty_encryption_sessions.store(true, Ordering::Relaxed); + } + + pub fn new_decryption_session(&self, master: NodeId, session_id: SessionId, sub_session_id: Secret, cluster: Arc) -> Result, Error> { let mut decryption_sessions = self.decryption_sessions.write(); let session_id = DecryptionSessionId::new(session_id, sub_session_id); if decryption_sessions.contains_key(&session_id) { return Err(Error::DuplicateSessionId); } + // some of nodes, which were encrypting secret may be down + // => do not use these in decryption session + let mut encrypted_data = self.key_storage.get(&session_id.id).map_err(|e| Error::KeyStorage(e.into()))?; + let disconnected_nodes: BTreeSet<_> = encrypted_data.id_numbers.keys().cloned().collect(); + let disconnected_nodes: BTreeSet<_> = disconnected_nodes.difference(&cluster.nodes()).cloned().collect(); + for disconnected_node in disconnected_nodes { + encrypted_data.id_numbers.remove(&disconnected_node); + } + let session = 
Arc::new(DecryptionSessionImpl::new(DecryptionSessionParams { id: session_id.id.clone(), access_key: session_id.access_key.clone(), self_node_id: self.self_node_id.clone(), - encrypted_data: self.key_storage.get(&session_id.id).map_err(|e| Error::KeyStorage(e.into()))?, + encrypted_data: encrypted_data, acl_storage: self.acl_storage.clone(), - cluster: cluster, + cluster: cluster.clone(), })?); let decryption_session = QueuedDecryptionSession { + master: master, + cluster_view: cluster, + last_message_time: time::Instant::now(), session: session.clone(), queue: VecDeque::new() }; @@ -737,12 +823,66 @@ impl ClusterSessions { .and_then(|session| session.queue.pop_front()) } - pub fn on_connection_timeout(&self, node_id: &NodeId) { - for encryption_session in self.encryption_sessions.read().values() { - encryption_session.session.on_session_timeout(node_id); + pub fn respond_with_decryption_error(&self, session_id: &SessionId, sub_session_id: &Secret, to: &NodeId, error: message::DecryptionSessionError) { + let session_id = DecryptionSessionId::new(session_id.clone(), sub_session_id.clone()); + self.decryption_sessions.read().get(&session_id) + .map(|s| { + // error in decryption session is non-fatal, if occurs on slave node + // => either respond with error + // => or broadcast error + + // do not bother processing send error, as we already processing error + if &s.master == s.session.node() { + let _ = s.cluster_view.broadcast(Message::Decryption(DecryptionMessage::DecryptionSessionError(error))); + } else { + let _ = s.cluster_view.send(to, Message::Decryption(DecryptionMessage::DecryptionSessionError(error))); + } + }); + } + + fn stop_stalled_sessions(&self) { + { + let sessions = self.encryption_sessions.write(); + for sid in sessions.keys().collect::>() { + let session = sessions.get(&sid).expect("enumerating only existing sessions; qed"); + if time::Instant::now() - session.last_message_time > time::Duration::from_secs(ENCRYPTION_SESSION_TIMEOUT_INTERVAL) { + 
session.session.on_session_timeout(); + if session.session.state() == EncryptionSessionState::Finished + || session.session.state() == EncryptionSessionState::Failed { + self.remove_encryption_session(&sid); + } + } + } } - for decryption_session in self.decryption_sessions.read().values() { - decryption_session.session.on_session_timeout(node_id); + { + let sessions = self.decryption_sessions.write(); + for sid in sessions.keys().collect::>() { + let session = sessions.get(&sid).expect("enumerating only existing sessions; qed"); + if time::Instant::now() - session.last_message_time > time::Duration::from_secs(DECRYPTION_SESSION_TIMEOUT_INTERVAL) { + session.session.on_session_timeout(); + if session.session.state() == DecryptionSessionState::Finished + || session.session.state() == DecryptionSessionState::Failed { + self.remove_decryption_session(&sid.id, &sid.access_key); + } + } + } + } + } + + pub fn on_connection_timeout(&self, node_id: &NodeId) { + for (sid, session) in self.encryption_sessions.read().iter() { + session.session.on_node_timeout(node_id); + if session.session.state() == EncryptionSessionState::Finished + || session.session.state() == EncryptionSessionState::Failed { + self.remove_encryption_session(sid); + } + } + for (sid, session) in self.decryption_sessions.read().iter() { + session.session.on_node_timeout(node_id); + if session.session.state() == DecryptionSessionState::Finished + || session.session.state() == DecryptionSessionState::Failed { + self.remove_decryption_session(&sid.id, &sid.access_key); + } } } } @@ -823,12 +963,21 @@ impl ClusterView { })), } } + + pub fn is_connected(&self, node: &NodeId) -> bool { + self.core.lock().nodes.contains(node) + } + + pub fn nodes(&self) -> BTreeSet { + self.core.lock().nodes.clone() + } } impl Cluster for ClusterView { fn broadcast(&self, message: Message) -> Result<(), Error> { let core = self.core.lock(); for node in core.nodes.iter().filter(|n| *n != core.cluster.self_key_pair.public()) { + 
trace!(target: "secretstore_net", "{}: sent message {} to {}", core.cluster.self_key_pair.public(), message, node); let connection = core.cluster.connection(node).ok_or(Error::NodeDisconnected)?; core.cluster.spawn(connection.send_message(message.clone())) } @@ -837,14 +986,11 @@ impl Cluster for ClusterView { fn send(&self, to: &NodeId, message: Message) -> Result<(), Error> { let core = self.core.lock(); + trace!(target: "secretstore_net", "{}: sent message {} to {}", core.cluster.self_key_pair.public(), message, to); let connection = core.cluster.connection(to).ok_or(Error::NodeDisconnected)?; core.cluster.spawn(connection.send_message(message)); Ok(()) } - - fn blacklist(&self, _node: &NodeId) { - // TODO: unimplemented!() - } } impl ClusterClientImpl { @@ -880,6 +1026,21 @@ impl ClusterClient for ClusterClientImpl { session.initialize(requestor_signature, is_shadow_decryption)?; Ok(session) } + + #[cfg(test)] + fn connect(&self) { + ClusterCore::connect_disconnected_nodes(self.data.clone()); + } + + #[cfg(test)] + fn make_faulty_encryption_sessions(&self) { + self.data.sessions.make_faulty_encryption_sessions(); + } + + #[cfg(test)] + fn encryption_session(&self, session_id: &SessionId) -> Option> { + self.data.sessions.encryption_session(session_id) + } } fn make_socket_address(address: &str, port: u16) -> Result { @@ -895,9 +1056,10 @@ pub mod tests { use parking_lot::Mutex; use tokio_core::reactor::Core; use ethkey::{Random, Generator}; - use key_server_cluster::{NodeId, Error, EncryptionConfiguration, DummyAclStorage, DummyKeyStorage}; + use key_server_cluster::{NodeId, SessionId, Error, DummyAclStorage, DummyKeyStorage}; use key_server_cluster::message::Message; use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration}; + use key_server_cluster::encryption_session::{Session as EncryptionSession, SessionState as EncryptionSessionState}; #[derive(Debug)] pub struct DummyCluster { @@ -947,9 +1109,6 @@ pub mod tests { 
self.data.lock().messages.push_back((to.clone(), message)); Ok(()) } - - fn blacklist(&self, _node: &NodeId) { - } } pub fn loop_until(core: &mut Core, timeout: time::Duration, predicate: F) where F: Fn() -> bool { @@ -982,9 +1141,6 @@ pub mod tests { .map(|(j, kp)| (kp.public().clone(), ("127.0.0.1".into(), ports_begin + j as u16))) .collect(), allow_connecting_to_higher_nodes: false, - encryption_config: EncryptionConfiguration { - key_check_timeout_ms: 10, - }, key_storage: Arc::new(DummyKeyStorage::default()), acl_storage: Arc::new(DummyAclStorage::default()), }).collect(); @@ -997,7 +1153,10 @@ pub mod tests { pub fn run_clusters(clusters: &[Arc]) { for cluster in clusters { - cluster.run().unwrap(); + cluster.run_listener().unwrap(); + } + for cluster in clusters { + cluster.run_connections().unwrap(); } } @@ -1008,4 +1167,65 @@ pub mod tests { run_clusters(&clusters); loop_until(&mut core, time::Duration::from_millis(300), || clusters.iter().all(all_connections_established)); } + + #[test] + fn cluster_wont_start_encryption_session_if_not_fully_connected() { + let core = Core::new().unwrap(); + let clusters = make_clusters(&core, 6013, 3); + clusters[0].run().unwrap(); + match clusters[0].client().new_encryption_session(SessionId::default(), 1) { + Err(Error::NodeDisconnected) => (), + Err(e) => panic!("unexpected error {:?}", e), + _ => panic!("unexpected success"), + } + } + + #[test] + fn error_in_encryption_session_broadcasted_to_all_other_nodes() { + let mut core = Core::new().unwrap(); + let clusters = make_clusters(&core, 6016, 3); + run_clusters(&clusters); + loop_until(&mut core, time::Duration::from_millis(300), || clusters.iter().all(all_connections_established)); + + // ask one of nodes to produce faulty encryption sessions + clusters[1].client().make_faulty_encryption_sessions(); + + // start && wait for encryption session to fail + let session = clusters[0].client().new_encryption_session(SessionId::default(), 1).unwrap(); + loop_until(&mut 
core, time::Duration::from_millis(300), || session.joint_public_key().is_some()); + assert!(session.joint_public_key().unwrap().is_err()); + + // check that faulty session is either removed from all nodes, or nonexistent (already removed) + assert!(clusters[0].client().encryption_session(&SessionId::default()).is_none()); + for i in 1..3 { + if let Some(session) = clusters[i].client().encryption_session(&SessionId::default()) { + loop_until(&mut core, time::Duration::from_millis(300), || session.joint_public_key().is_some()); + assert!(session.joint_public_key().unwrap().is_err()); + assert!(clusters[i].client().encryption_session(&SessionId::default()).is_none()); + } + } + } + + #[test] + fn encryption_session_is_removed_when_succeeded() { + let mut core = Core::new().unwrap(); + let clusters = make_clusters(&core, 6019, 3); + run_clusters(&clusters); + loop_until(&mut core, time::Duration::from_millis(300), || clusters.iter().all(all_connections_established)); + + // start && wait for encryption session to complete + let session = clusters[0].client().new_encryption_session(SessionId::default(), 1).unwrap(); + loop_until(&mut core, time::Duration::from_millis(300), || session.state() == EncryptionSessionState::Finished); + assert!(session.joint_public_key().unwrap().is_ok()); + + // check that session is either removed from all nodes, or nonexistent (already removed) + assert!(clusters[0].client().encryption_session(&SessionId::default()).is_none()); + for i in 1..3 { + if let Some(session) = clusters[i].client().encryption_session(&SessionId::default()) { + loop_until(&mut core, time::Duration::from_millis(300), || session.state() == EncryptionSessionState::Finished); + assert!(session.joint_public_key().unwrap().is_err()); + assert!(clusters[i].client().encryption_session(&SessionId::default()).is_none()); + } + } + } } diff --git a/secret_store/src/key_server_cluster/decryption_session.rs b/secret_store/src/key_server_cluster/decryption_session.rs index 
5a8c136c6..f4d64c246 100644 --- a/secret_store/src/key_server_cluster/decryption_session.rs +++ b/secret_store/src/key_server_cluster/decryption_session.rs @@ -24,7 +24,7 @@ use key_server_cluster::{Error, AclStorage, DocumentKeyShare, NodeId, SessionId, use key_server_cluster::cluster::Cluster; use key_server_cluster::math; use key_server_cluster::message::{Message, DecryptionMessage, InitializeDecryptionSession, ConfirmDecryptionInitialization, - RequestPartialDecryption, PartialDecryption, DecryptionSessionError}; + RequestPartialDecryption, PartialDecryption, DecryptionSessionError, DecryptionSessionCompleted}; /// Decryption session API. pub trait Session: Send + Sync + 'static { @@ -114,8 +114,10 @@ struct SessionData { rejected_nodes: BTreeSet, /// Nodes, which have responded with confirm to initialization request. confirmed_nodes: BTreeSet, - + // === Values, filled during partial decryption === + /// Nodes, which have been asked for partial decryption. + shadow_requests: BTreeSet, /// Shadow points, received from nodes as a response to partial decryption request. shadow_points: BTreeMap, @@ -162,6 +164,7 @@ impl SessionImpl { requested_nodes: BTreeSet::new(), rejected_nodes: BTreeSet::new(), confirmed_nodes: BTreeSet::new(), + shadow_requests: BTreeSet::new(), shadow_points: BTreeMap::new(), decrypted_secret: None, }) @@ -173,22 +176,21 @@ impl SessionImpl { &self.self_node_id } + /// Get current session state. + pub fn state(&self) -> SessionState { + self.data.lock().state.clone() + } + #[cfg(test)] /// Get this session access key. pub fn access_key(&self) -> &Secret { &self.access_key } - #[cfg(test)] - /// Get current session state. 
- pub fn state(&self) -> SessionState { - self.data.lock().state.clone() - } - #[cfg(test)] /// Get decrypted secret - pub fn decrypted_secret(&self) -> Option { - self.data.lock().decrypted_secret.clone().and_then(|r| r.ok()) + pub fn decrypted_secret(&self) -> Option> { + self.data.lock().decrypted_secret.clone() } /// Initialize decryption session. @@ -284,9 +286,17 @@ impl SessionImpl { let mut data = self.data.lock(); // check state - if data.state != SessionState::WaitingForInitializationConfirm { + if data.state == SessionState::WaitingForPartialDecryption { // if there were enough confirmations/rejections before this message // we have already moved to the next state + if !data.requested_nodes.remove(&sender) { + return Err(Error::InvalidMessage); + } + + data.confirmed_nodes.insert(sender); + return Ok(()); + } + if data.state != SessionState::WaitingForInitializationConfirm { return Ok(()); } @@ -321,7 +331,7 @@ impl SessionImpl { return Err(Error::InvalidMessage); } - let mut data = self.data.lock(); + let data = self.data.lock(); // check state if data.master != Some(sender) { @@ -345,8 +355,8 @@ impl SessionImpl { decrypt_shadow: decryption_result.decrypt_shadow, })))?; - // update sate - data.state = SessionState::Finished; + // master could ask us for another partial decryption in case of restart + // => no state change is required Ok(()) } @@ -364,7 +374,7 @@ impl SessionImpl { return Err(Error::InvalidStateForRequest); } - if !data.confirmed_nodes.remove(&sender) { + if !data.shadow_requests.remove(&sender) { return Err(Error::InvalidStateForRequest); } data.shadow_points.insert(sender, PartialDecryptionResult { @@ -377,34 +387,169 @@ impl SessionImpl { return Ok(()); } + // notify all other nodes about session completion + self.cluster.broadcast(Message::Decryption(DecryptionMessage::DecryptionSessionCompleted(DecryptionSessionCompleted { + session: self.id.clone().into(), + sub_session: self.access_key.clone().into(), + })))?; + + // do 
decryption SessionImpl::do_decryption(self.access_key.clone(), &self.encrypted_data, &mut *data)?; self.completed.notify_all(); Ok(()) } - /// When error has occured on another node. - pub fn on_session_error(&self, sender: NodeId, message: &DecryptionSessionError) { - warn!("{}: decryption session error: {:?} from {}", self.node(), message, sender); + /// When session is completed. + pub fn on_session_completed(&self, sender: NodeId, message: &DecryptionSessionCompleted) -> Result<(), Error> { + debug_assert!(self.id == *message.session); + debug_assert!(self.access_key == *message.sub_session); + debug_assert!(&sender != self.node()); + let mut data = self.data.lock(); + + // check state + if data.state != SessionState::WaitingForPartialDecryptionRequest { + return Err(Error::InvalidStateForRequest); + } + if data.master != Some(sender) { + return Err(Error::InvalidMessage); + } + + // update state + data.state = SessionState::Finished; + + Ok(()) + } + + /// When error has occured on another node. + pub fn on_session_error(&self, sender: NodeId, message: &DecryptionSessionError) -> Result<(), Error> { + let mut data = self.data.lock(); + + warn!("{}: decryption session failed with error: {:?} from {}", self.node(), message.error, sender); + data.state = SessionState::Failed; data.decrypted_secret = Some(Err(Error::Io(message.error.clone()))); self.completed.notify_all(); + + Ok(()) + } + + /// When connection to one of cluster nodes has timeouted. 
+ pub fn on_node_timeout(&self, node: &NodeId) { + let mut data = self.data.lock(); + + let is_self_master = data.master.as_ref() == Some(self.node()); + let is_other_master = data.master.as_ref() == Some(node); + // if this is master node, we might have to restart + if is_self_master { + match data.state { + SessionState::WaitingForInitializationConfirm => { + // we will never receive confirmation from this node => treat as reject + if data.requested_nodes.remove(node) || data.confirmed_nodes.remove(node) { + data.rejected_nodes.insert(node.clone()); + } + // check if we still have enough nodes for decryption + if self.encrypted_data.id_numbers.len() - data.rejected_nodes.len() >= self.encrypted_data.threshold + 1 { + return; + } + } + SessionState::WaitingForPartialDecryption => { + if data.rejected_nodes.contains(node) { + // already rejected => does not affect session + return; + } + if data.requested_nodes.remove(node) { + // we have tried to initialize this node, but it has failed + // => no restart required, just mark as rejected + data.rejected_nodes.insert(node.clone()); + return; + } + if data.confirmed_nodes.contains(node) { + if data.shadow_points.contains_key(node) { + // we have already received partial decryption from this node + // => just ignore this connection drop + return; + } + + // the worst case: we have sent partial decryption request to other nodes + // => we have to restart the session + data.confirmed_nodes.remove(node); + data.rejected_nodes.insert(node.clone()); + // check if we still have enough nodes for decryption + if self.encrypted_data.id_numbers.len() - data.rejected_nodes.len() >= self.encrypted_data.threshold + 1 { + // we are going to stop session anyway => ignore error + let _ = SessionImpl::start_waiting_for_partial_decryption(self.node().clone(), self.id.clone(), self.access_key.clone(), &self.cluster, &self.encrypted_data, &mut *data); + return; + } + // not enough nodes + } + } + _ => (), // all other states lead to 
failures + } + } else if !is_other_master { + // disconnected from non-master node on non-master node + // => this does not affect this session + return; + } + // else: disconnecting from master node means failure + + // no more nodes left for decryption => fail + warn!("{}: decryption session failed because {} connection has timeouted", self.node(), node); + + data.state = SessionState::Failed; + data.decrypted_secret = Some(Err(Error::NodeDisconnected)); + self.completed.notify_all(); } /// When session timeout has occured. - pub fn on_session_timeout(&self, _node: &NodeId) { - warn!("{}: decryption session timeout", self.node()); + pub fn on_session_timeout(&self) { let mut data = self.data.lock(); - // TODO: check that node is a part of decryption process + + let is_self_master = data.master.as_ref() == Some(self.node()); + // if this is master node, we might have to restart + if is_self_master { + match data.state { + SessionState::WaitingForInitializationConfirm => + // we have sent initialization requests to all nodes, but haven't received confirmation + // => nodes will never respond => fail + (), + SessionState::WaitingForPartialDecryption => { + // we have requested partial decryption, but some nodes have failed to respond + // => mark these nodes as rejected && restart + for timeouted_node in data.shadow_requests.iter().cloned().collect::>() { + data.confirmed_nodes.remove(&timeouted_node); + data.rejected_nodes.insert(timeouted_node); + } + + // check if we still have enough nodes for decryption + if self.encrypted_data.id_numbers.len() - data.rejected_nodes.len() >= self.encrypted_data.threshold + 1 { + // we are going to stop session anyway => ignore error + let _ = SessionImpl::start_waiting_for_partial_decryption(self.node().clone(), self.id.clone(), self.access_key.clone(), &self.cluster, &self.encrypted_data, &mut *data); + return; + } + }, + // no nodes has responded to our requests => session is failed + _ => return, + } + } + + // no more nodes 
left for decryption => fail + warn!("{}: decryption session failed with timeout", self.node()); + data.state = SessionState::Failed; - data.decrypted_secret = Some(Err(Error::Io("session expired".into()))); + data.decrypted_secret = Some(Err(Error::NodeDisconnected)); self.completed.notify_all(); } fn start_waiting_for_partial_decryption(self_node_id: NodeId, session_id: SessionId, access_key: Secret, cluster: &Arc, encrypted_data: &DocumentKeyShare, data: &mut SessionData) -> Result<(), Error> { let confirmed_nodes: BTreeSet<_> = data.confirmed_nodes.clone(); - for node in data.confirmed_nodes.iter().filter(|n| n != &&self_node_id) { + let confirmed_nodes: BTreeSet<_> = confirmed_nodes.difference(&data.rejected_nodes).cloned().collect(); + + data.shadow_requests.clear(); + data.shadow_points.clear(); + for node in confirmed_nodes.iter().filter(|n| n != &&self_node_id) { + data.shadow_requests.insert(node.clone()); cluster.send(node, Message::Decryption(DecryptionMessage::RequestPartialDecryption(RequestPartialDecryption { session: session_id.clone().into(), sub_session: access_key.clone().into(), @@ -595,7 +740,13 @@ mod tests { encrypted_point: encrypted_point.clone(), }).collect(); let acl_storages: Vec<_> = (0..5).map(|_| Arc::new(DummyAclStorage::default())).collect(); - let clusters: Vec<_> = (0..5).map(|i| Arc::new(DummyCluster::new(id_numbers.iter().nth(i).clone().unwrap().0))).collect(); + let clusters: Vec<_> = (0..5).map(|i| { + let cluster = Arc::new(DummyCluster::new(id_numbers.iter().nth(i).clone().unwrap().0)); + for id_number in &id_numbers { + cluster.add_node(id_number.0.clone()); + } + cluster + }).collect(); let sessions: Vec<_> = (0..5).map(|i| SessionImpl::new(SessionParams { id: session_id.clone(), access_key: access_key.clone(), @@ -624,13 +775,14 @@ mod tests { Message::Decryption(DecryptionMessage::ConfirmDecryptionInitialization(message)) => session.on_confirm_initialization(from, &message).unwrap(), 
Message::Decryption(DecryptionMessage::RequestPartialDecryption(message)) => session.on_partial_decryption_requested(from, &message).unwrap(), Message::Decryption(DecryptionMessage::PartialDecryption(message)) => session.on_partial_decryption(from, &message).unwrap(), + Message::Decryption(DecryptionMessage::DecryptionSessionCompleted(message)) => session.on_session_completed(from, &message).unwrap(), _ => panic!("unexpected"), } } } #[test] - fn fails_to_construct_in_cluster_of_single_node() { + fn constructs_in_cluster_of_single_node() { let mut nodes = BTreeMap::new(); let self_node_id = Random.generate().unwrap().public().clone(); nodes.insert(self_node_id, Random.generate().unwrap().secret().clone()); @@ -648,7 +800,7 @@ mod tests { acl_storage: Arc::new(DummyAclStorage::default()), cluster: Arc::new(DummyCluster::new(self_node_id.clone())), }) { - Err(Error::InvalidNodesCount) => (), + Ok(_) => (), _ => panic!("unexpected"), } } @@ -722,27 +874,6 @@ mod tests { }).unwrap_err(), Error::InvalidStateForRequest); } - #[test] - fn fails_to_partial_decrypt_if_not_waiting() { - let (_, _, sessions) = prepare_decryption_sessions(); - assert_eq!(sessions[1].on_initialize_session(sessions[0].node().clone(), &message::InitializeDecryptionSession { - session: SessionId::default().into(), - sub_session: sessions[0].access_key().clone().into(), - requestor_signature: ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap().into(), - is_shadow_decryption: false, - }).unwrap(), ()); - assert_eq!(sessions[1].on_partial_decryption_requested(sessions[0].node().clone(), &message::RequestPartialDecryption { - session: SessionId::default().into(), - sub_session: sessions[0].access_key().clone().into(), - nodes: sessions.iter().map(|s| s.node().clone().into()).take(4).collect(), - }).unwrap(), ()); - assert_eq!(sessions[1].on_partial_decryption_requested(sessions[0].node().clone(), &message::RequestPartialDecryption { - session: SessionId::default().into(), 
- sub_session: sessions[0].access_key().clone().into(), - nodes: sessions.iter().map(|s| s.node().clone().into()).take(4).collect(), - }).unwrap_err(), Error::InvalidStateForRequest); - } - #[test] fn fails_to_partial_decrypt_if_requested_by_slave() { let (_, _, sessions) = prepare_decryption_sessions(); @@ -806,6 +937,106 @@ mod tests { assert_eq!(sessions[0].on_partial_decryption(pd_from.unwrap(), &pd_msg.unwrap()).unwrap_err(), Error::InvalidStateForRequest); } + #[test] + fn decryption_fails_on_session_timeout() { + let (_, _, sessions) = prepare_decryption_sessions(); + assert!(sessions[0].decrypted_secret().is_none()); + sessions[0].on_session_timeout(); + assert!(sessions[0].decrypted_secret().unwrap().unwrap_err() == Error::NodeDisconnected); + } + + #[test] + fn node_is_marked_rejected_when_timed_out_during_initialization_confirmation() { + let (_, _, sessions) = prepare_decryption_sessions(); + sessions[0].initialize(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap(), false).unwrap(); + + // 1 node disconnects => we still can recover secret + sessions[0].on_node_timeout(sessions[1].node()); + assert!(sessions[0].data.lock().rejected_nodes.contains(sessions[1].node())); + assert!(sessions[0].data.lock().state == SessionState::WaitingForInitializationConfirm); + + // 2 node are disconnected => we can not recover secret + sessions[0].on_node_timeout(sessions[2].node()); + assert!(sessions[0].data.lock().rejected_nodes.contains(sessions[2].node())); + assert!(sessions[0].data.lock().state == SessionState::Failed); + } + + #[test] + fn session_does_not_fail_if_rejected_node_disconnects() { + let (clusters, acl_storages, sessions) = prepare_decryption_sessions(); + let key_pair = Random.generate().unwrap(); + + acl_storages[1].prohibit(key_pair.public().clone(), SessionId::default()); + sessions[0].initialize(ethkey::sign(key_pair.secret(), &SessionId::default()).unwrap(), false).unwrap(); + + do_messages_exchange_until(&clusters, 
&sessions, |_, _, _| sessions[0].state() == SessionState::WaitingForPartialDecryption); + + // 1st node disconnects => ignore this + sessions[0].on_node_timeout(sessions[1].node()); + assert!(sessions[0].data.lock().state == SessionState::WaitingForPartialDecryption); + } + + #[test] + fn session_does_not_fail_if_requested_node_disconnects() { + let (clusters, _, sessions) = prepare_decryption_sessions(); + sessions[0].initialize(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap(), false).unwrap(); + + do_messages_exchange_until(&clusters, &sessions, |_, _, _| sessions[0].state() == SessionState::WaitingForPartialDecryption); + + // 1 node disconnects => we still can recover secret + sessions[0].on_node_timeout(sessions[1].node()); + assert!(sessions[0].data.lock().state == SessionState::WaitingForPartialDecryption); + + // 2 node are disconnected => we can not recover secret + sessions[0].on_node_timeout(sessions[2].node()); + assert!(sessions[0].data.lock().state == SessionState::Failed); + } + + #[test] + fn session_does_not_fail_if_node_with_shadow_point_disconnects() { + let (clusters, _, sessions) = prepare_decryption_sessions(); + sessions[0].initialize(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap(), false).unwrap(); + + do_messages_exchange_until(&clusters, &sessions, |_, _, _| sessions[0].state() == SessionState::WaitingForPartialDecryption + && sessions[0].data.lock().shadow_points.len() == 2); + + // disconnects from the node which has already sent us its own shadow point + let disconnected = sessions[0].data.lock(). 
+ shadow_points.keys() + .filter(|n| *n != sessions[0].node()) + .cloned().nth(0).unwrap(); + sessions[0].on_node_timeout(&disconnected); + assert!(sessions[0].data.lock().state == SessionState::WaitingForPartialDecryption); + } + + #[test] + fn session_restarts_if_confirmed_node_disconnects() { + let (clusters, _, sessions) = prepare_decryption_sessions(); + sessions[0].initialize(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap(), false).unwrap(); + + do_messages_exchange_until(&clusters, &sessions, |_, _, _| sessions[0].state() == SessionState::WaitingForPartialDecryption); + + // disconnects from the node which has already confirmed its participation + let disconnected = sessions[0].data.lock().shadow_requests.iter().cloned().nth(0).unwrap(); + sessions[0].on_node_timeout(&disconnected); + assert!(sessions[0].data.lock().state == SessionState::WaitingForPartialDecryption); + assert!(sessions[0].data.lock().rejected_nodes.contains(&disconnected)); + assert!(!sessions[0].data.lock().shadow_requests.contains(&disconnected)); + } + + #[test] + fn session_does_not_fail_if_non_master_node_disconnects_from_non_master_node() { + let (clusters, _, sessions) = prepare_decryption_sessions(); + sessions[0].initialize(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap(), false).unwrap(); + + do_messages_exchange_until(&clusters, &sessions, |_, _, _| sessions[0].state() == SessionState::WaitingForPartialDecryption); + + // disconnects from the node which has already confirmed its participation + sessions[1].on_node_timeout(sessions[2].node()); + assert!(sessions[0].data.lock().state == SessionState::WaitingForPartialDecryption); + assert!(sessions[1].data.lock().state == SessionState::WaitingForPartialDecryptionRequest); + } + #[test] fn complete_dec_session() { let (clusters, _, sessions) = prepare_decryption_sessions(); @@ -818,17 +1049,16 @@ mod tests { do_messages_exchange(&clusters, &sessions); // now check 
that: - // 1) 4 of 5 sessions are in Finished state - assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::Finished).count(), 4); - // 2) 1 session is in WaitingForPartialDecryptionRequest state - assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::WaitingForPartialDecryptionRequest).count(), 1); - // 3) 1 session has decrypted key value + // 1) 5 of 5 sessions are in Finished state + assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::Finished).count(), 5); + // 2) 1 session has decrypted key value assert!(sessions.iter().skip(1).all(|s| s.decrypted_secret().is_none())); - assert_eq!(sessions[0].decrypted_secret(), Some(DocumentEncryptedKeyShadow { + + assert_eq!(sessions[0].decrypted_secret().unwrap().unwrap(), DocumentEncryptedKeyShadow { decrypted_secret: SECRET_PLAIN.into(), common_point: None, decrypt_shadows: None, - })); + }); } #[test] @@ -843,14 +1073,12 @@ mod tests { do_messages_exchange(&clusters, &sessions); // now check that: - // 1) 4 of 5 sessions are in Finished state - assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::Finished).count(), 4); - // 2) 1 session is in WaitingForPartialDecryptionRequest state - assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::WaitingForPartialDecryptionRequest).count(), 1); - // 3) 1 session has decrypted key value + // 1) 5 of 5 sessions are in Finished state + assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::Finished).count(), 5); + // 2) 1 session has decrypted key value assert!(sessions.iter().skip(1).all(|s| s.decrypted_secret().is_none())); - let decrypted_secret = sessions[0].decrypted_secret().unwrap(); + let decrypted_secret = sessions[0].decrypted_secret().unwrap().unwrap(); // check that decrypted_secret != SECRET_PLAIN assert!(decrypted_secret.decrypted_secret != SECRET_PLAIN.into()); // check that common point && shadow coefficients are returned @@ -879,7 +1107,8 @@ mod tests { 
acl_storages[1].prohibit(key_pair.public().clone(), SessionId::default()); acl_storages[2].prohibit(key_pair.public().clone(), SessionId::default()); - do_messages_exchange(&clusters, &sessions); + let node3 = sessions[3].node().clone(); + do_messages_exchange_until(&clusters, &sessions, |from, _, _msg| from == &node3); // now check that: // 1) 3 of 5 sessions are in Failed state @@ -887,7 +1116,7 @@ mod tests { // 2) 2 of 5 sessions are in WaitingForPartialDecryptionRequest state assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::WaitingForPartialDecryptionRequest).count(), 2); // 3) 0 sessions have decrypted key value - assert!(sessions.iter().all(|s| s.decrypted_secret().is_none())); + assert!(sessions.iter().all(|s| s.decrypted_secret().is_none() || s.decrypted_secret().unwrap().is_err())); } #[test] @@ -910,11 +1139,11 @@ mod tests { assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::Finished).count(), 5); // 2) 1 session has decrypted key value assert!(sessions.iter().skip(1).all(|s| s.decrypted_secret().is_none())); - assert_eq!(sessions[0].decrypted_secret(), Some(DocumentEncryptedKeyShadow { + assert_eq!(sessions[0].decrypted_secret().unwrap().unwrap(), DocumentEncryptedKeyShadow { decrypted_secret: SECRET_PLAIN.into(), common_point: None, decrypt_shadows: None, - })); + }); } #[test] diff --git a/secret_store/src/key_server_cluster/encryption_session.rs b/secret_store/src/key_server_cluster/encryption_session.rs index beca00443..0268bf596 100644 --- a/secret_store/src/key_server_cluster/encryption_session.rs +++ b/secret_store/src/key_server_cluster/encryption_session.rs @@ -16,6 +16,7 @@ use std::collections::{BTreeSet, BTreeMap, VecDeque}; use std::fmt::{Debug, Formatter, Error as FmtError}; +use std::time; use std::sync::Arc; use parking_lot::{Condvar, Mutex}; use ethkey::{Public, Secret}; @@ -23,15 +24,18 @@ use key_server_cluster::{Error, NodeId, SessionId, KeyStorage, DocumentKeyShare} use key_server_cluster::math; use 
key_server_cluster::cluster::Cluster; use key_server_cluster::message::{Message, EncryptionMessage, InitializeSession, ConfirmInitialization, CompleteInitialization, - KeysDissemination, Complaint, ComplaintResponse, PublicKeyShare, SessionError, SessionCompleted}; + KeysDissemination, PublicKeyShare, SessionError, SessionCompleted}; /// Encryption session API. pub trait Session: Send + Sync + 'static { + /// Get encryption session state. + fn state(&self) -> SessionState; + /// Wait until session is completed. Returns distributely generated secret key. + fn wait(&self, timeout: Option) -> Result; + #[cfg(test)] /// Get joint public key (if it is known). - fn joint_public_key(&self) -> Option; - /// Wait until session is completed. Returns distributely generated secret key. - fn wait(&self) -> Result; + fn joint_public_key(&self) -> Option>; } /// Encryption (distributed key generation) session. @@ -40,10 +44,9 @@ pub trait Session: Send + Sync + 'static { /// Brief overview: /// 1) initialization: master node (which has received request for generating joint public + secret) initializes the session on all other nodes /// 2) key dissemination (KD): all nodes are generating secret + public values and send these to appropriate nodes -/// 3) key verification (KV): all nodes are checking values, received for other nodes and complaining if keys are wrong -/// 4) key check phase (KC): nodes are processing complaints, received from another nodes -/// 5) key generation phase (KG): nodes are exchanging with information, enough to generate joint public key -/// 6) encryption phase: master node generates secret key, encrypts it using joint public && broadcasts encryption result +/// 3) key verification (KV): all nodes are checking values, received for other nodes +/// 4) key generation phase (KG): nodes are exchanging with information, enough to generate joint public key +/// 5) encryption phase: master node generates secret key, encrypts it using joint public && broadcasts 
encryption result pub struct SessionImpl { /// Unique session id. id: SessionId, @@ -76,6 +79,8 @@ pub struct SessionParams { struct SessionData { /// Current state of the session. state: SessionState, + /// Simulate faulty behaviour? + simulate_faulty_behaviour: bool, // === Values, filled when session initialization just starts === /// Reference to the node, which has started this session. @@ -123,10 +128,6 @@ struct NodeData { /// Public values, which have been received from this node. pub publics: Option>, - // === Values, filled during KC phase === - /// Nodes, complaining against this node. - pub complaints: BTreeSet, - // === Values, filled during KG phase === /// Public share, which has been received from this node. pub public_share: Option, @@ -163,10 +164,6 @@ pub enum SessionState { /// Node is waiting for generated keys from every other node. WaitingForKeysDissemination, - // === KC phase states === - /// Keys check currently occurs. - KeyCheck, - // === KG phase states === /// Node is waiting for joint public key share to be received from every other node. WaitingForPublicKeyShare, @@ -193,6 +190,7 @@ impl SessionImpl { completed: Condvar::new(), data: Mutex::new(SessionData { state: SessionState::WaitingForInitialization, + simulate_faulty_behaviour: false, master: None, threshold: None, derived_point: None, @@ -205,31 +203,20 @@ impl SessionImpl { } } - /// Get this session Id. - pub fn id(&self) -> &SessionId { - &self.id - } - /// Get this node Id. pub fn node(&self) -> &NodeId { &self.self_node_id } - /// Get current session state. - pub fn state(&self) -> SessionState { - self.data.lock().state.clone() - } - #[cfg(test)] /// Get derived point. pub fn derived_point(&self) -> Option { self.data.lock().derived_point.clone() } - #[cfg(test)] - /// Get qualified nodes. - pub fn qualified_nodes(&self) -> BTreeSet { - self.data.lock().nodes.keys().cloned().collect() + /// Simulate faulty encryption session behaviour. 
+ pub fn simulate_faulty_behaviour(&self) { + self.data.lock().simulate_faulty_behaviour = true; } /// Start new session initialization. This must be called on master node. @@ -254,15 +241,25 @@ impl SessionImpl { } let mut visit_policy = EveryOtherNodeVisitor::new(self.node(), data.nodes.keys().cloned()); - let next_node = visit_policy.next_node().expect("at least two nodes are in cluster"); - data.state = SessionState::WaitingForInitializationConfirm(visit_policy); - - // start initialization let derived_point = math::generate_random_point()?; - self.cluster.send(&next_node, Message::Encryption(EncryptionMessage::InitializeSession(InitializeSession { - session: self.id.clone().into(), - derived_point: derived_point.into(), - }))) + match visit_policy.next_node() { + Some(next_node) => { + data.state = SessionState::WaitingForInitializationConfirm(visit_policy); + + // start initialization + self.cluster.send(&next_node, Message::Encryption(EncryptionMessage::InitializeSession(InitializeSession { + session: self.id.clone().into(), + derived_point: derived_point.into(), + }))) + }, + None => { + drop(data); + self.complete_initialization(derived_point)?; + self.disseminate_keys()?; + self.verify_keys()?; + self.complete_encryption() + } + } } /// When session initialization message is received. 
@@ -315,33 +312,16 @@ impl SessionImpl { }; // proceed message - match next_receiver { - Some(next_receiver) => { - return self.cluster.send(&next_receiver, Message::Encryption(EncryptionMessage::InitializeSession(InitializeSession { - session: self.id.clone().into(), - derived_point: message.derived_point.clone().into(), - }))); - }, - None => { - // update point once again to make sure that derived point is not generated by last node - let mut derived_point = message.derived_point.clone().into(); - math::update_random_point(&mut derived_point)?; - - // remember derived point - data.derived_point = Some(derived_point.clone().into()); - - // broadcast derived point && other session paraeters to every other node - self.cluster.broadcast(Message::Encryption(EncryptionMessage::CompleteInitialization(CompleteInitialization { + if let Some(next_receiver) = next_receiver { + return self.cluster.send(&next_receiver, Message::Encryption(EncryptionMessage::InitializeSession(InitializeSession { session: self.id.clone().into(), - nodes: data.nodes.iter().map(|(id, data)| (id.clone().into(), data.id_number.clone().into())).collect(), - threshold: data.threshold.expect("threshold is filled in initialization phase; KD phase follows initialization phase; qed"), - derived_point: derived_point.into(), - })))?; - }, + derived_point: message.derived_point.clone().into(), + }))); } // now it is time for keys dissemination (KD) phase drop(data); + self.complete_initialization(message.derived_point.clone().into())?; self.disseminate_keys() } @@ -382,6 +362,11 @@ impl SessionImpl { let mut data = self.data.lock(); + // simulate failure, if required + if data.simulate_faulty_behaviour { + return Err(Error::Io("simulated error".into())); + } + // check state if data.state != SessionState::WaitingForKeysDissemination { match data.state { @@ -414,150 +399,8 @@ impl SessionImpl { return Ok(()) } - // key verification (KV) phase: check that other nodes have passed correct secrets - let 
derived_point = data.derived_point.clone().expect("derived point generated on initialization phase; KV phase follows initialization phase; qed"); - let number_id = data.nodes[self.node()].id_number.clone(); - for (node_id, node_data) in data.nodes.iter_mut().filter(|&(node_id, _)| node_id != self.node()) { - let secret1 = node_data.secret1.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed"); - let secret2 = node_data.secret2.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed"); - let publics = node_data.publics.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed"); - let is_key_verification_ok = math::keys_verification(threshold, &derived_point, &number_id, - secret1, secret2, publics)?; - - if !is_key_verification_ok { - node_data.complaints.insert(self.node().clone()); - self.cluster.broadcast(Message::Encryption(EncryptionMessage::Complaint(Complaint { - session: self.id.clone().into(), - against: node_id.clone().into(), - })))?; - } - } - - // update state - data.state = SessionState::KeyCheck; - - Ok(()) - } - - /// When complaint is received. 
- pub fn on_complaint(&self, sender: NodeId, message: &Complaint) -> Result<(), Error> { - debug_assert!(self.id == *message.session); - debug_assert!(&sender != self.node()); - - let mut data = self.data.lock(); - debug_assert!(data.nodes.contains_key(&sender)); - - // check state - if data.state != SessionState::KeyCheck && data.state != SessionState::WaitingForKeysDissemination { - // message can be received after timeout is passed - // => encryption is already in progress - // => we can not treat it as an error - return Ok(()); - } - - // respond to complaint - if &*message.against == self.node() { - let secret1_sent = data.nodes[&sender].secret1_sent.clone().expect("secrets were sent on KD phase; KC phase follows KD phase; qed"); - let secret2_sent = data.nodes[&sender].secret2_sent.clone().expect("secrets were sent on KD phase; KC phase follows KD phase; qed"); - - // someone is complaining against us => let's respond - return self.cluster.broadcast(Message::Encryption(EncryptionMessage::ComplaintResponse(ComplaintResponse { - session: self.id.clone().into(), - secret1: secret1_sent.into(), - secret2: secret2_sent.into(), - }))); - } - - // someone is complaining against someone else => let's remember this - let threshold = data.threshold.expect("threshold is filled in initialization phase; KD phase follows initialization phase; qed"); - let is_critical_complaints_num = { - let node = data.nodes.get_mut(&message.against).ok_or(Error::InvalidMessage)?; - node.complaints.insert(sender); - node.complaints.len() >= threshold + 1 - }; - - if is_critical_complaints_num { - // too many complaints => exclude from session - SessionImpl::disqualify_node(&message.against, &*self.cluster, &mut *data); - } - - Ok(()) - } - - /// When complaint response is received - pub fn on_complaint_response(&self, sender: NodeId, message: &ComplaintResponse) -> Result<(), Error> { - debug_assert!(self.id == *message.session); - debug_assert!(&sender != self.node()); - - let mut data = 
self.data.lock(); - debug_assert!(data.nodes.contains_key(&sender)); - - // check state - if data.state != SessionState::KeyCheck { - // in theory this message can be received before KeyCheck (in WaitingForKeysDissemination state) - // => but the fact that someone is complaining about keys means that sender has already sent its keys - // => complaint response can't reach us before keys, as the cluster guarantees that messages are received in FIFO order - - // message can be received after timeout is passed - // => encryption is already in progress - // => we can not treat it as an error - return Ok(()); - } - - // check keys again - let is_key_verification_ok = { - let threshold = data.threshold.expect("threshold is filled in initialization phase; KD phase follows initialization phase; qed"); - let derived_point = data.derived_point.as_ref().expect("derived point generated on initialization phase; KV phase follows initialization phase; qed"); - let number_id = &data.nodes[self.node()].id_number; - let node_data = data.nodes.get(&sender).expect("cluster guarantees to deliver messages from qualified nodes only; qed"); - let publics = node_data.publics.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed"); - math::keys_verification(threshold, derived_point, number_id, &message.secret1, &message.secret2, publics)? - }; - - if !is_key_verification_ok { - SessionImpl::disqualify_node(&sender, &*self.cluster, &mut *data); - } else { - let node_data = data.nodes.get_mut(&sender).expect("cluster guarantees to deliver messages from qualified nodes only; qed"); - node_data.secret1 = Some(message.secret1.clone().into()); - node_data.secret2 = Some(message.secret2.clone().into()); - node_data.complaints.remove(self.node()); - } - - Ok(()) - } - - /// When KC-phase timeout is expired, it is time to start KG phase. 
- pub fn start_key_generation_phase(&self) -> Result<(), Error> { - let mut data = self.data.lock(); - - if data.state != SessionState::KeyCheck { - return Err(Error::InvalidStateForRequest); - } - - // calculate public share - let self_public_share = { - let self_secret_coeff = data.secret_coeff.as_ref().expect("secret_coeff is generated on KD phase; KG phase follows KD phase; qed"); - math::compute_public_share(self_secret_coeff)? - }; - - // calculate self secret + public shares - let self_secret_share = { - let secret_values_iter = data.nodes.values() - .map(|n| n.secret1.as_ref().expect("keys received on KD phase; KG phase follows KD phase; qed")); - math::compute_secret_share(secret_values_iter)? - }; - - // update state - data.state = SessionState::WaitingForPublicKeyShare; - data.secret_share = Some(self_secret_share); - let self_node = data.nodes.get_mut(self.node()).expect("node is always qualified by himself; qed"); - self_node.public_share = Some(self_public_share.clone()); - - // broadcast self public key share - self.cluster.broadcast(Message::Encryption(EncryptionMessage::PublicKeyShare(PublicKeyShare { - session: self.id.clone().into(), - public_share: self_public_share.into(), - }))) + drop(data); + self.verify_keys() } /// When public key share is received. 
@@ -568,8 +411,7 @@ impl SessionImpl { if data.state != SessionState::WaitingForPublicKeyShare { match data.state { SessionState::WaitingForInitializationComplete | - SessionState::WaitingForKeysDissemination | - SessionState::KeyCheck => return Err(Error::TooEarlyForRequest), + SessionState::WaitingForKeysDissemination => return Err(Error::TooEarlyForRequest), _ => return Err(Error::InvalidStateForRequest), } } @@ -589,52 +431,8 @@ impl SessionImpl { return Ok(()); } - // else - calculate joint public key - let joint_public = { - let public_shares = data.nodes.values().map(|n| n.public_share.as_ref().expect("keys received on KD phase; KG phase follows KD phase; qed")); - math::compute_joint_public(public_shares)? - }; - - // if we are at the slave node - wait for session completion - if data.master.as_ref() != Some(self.node()) { - data.joint_public = Some(Ok(joint_public)); - data.state = SessionState::WaitingForEncryptionConfirmation; - return Ok(()); - } - - // then generate secret point - // then encrypt secret point with joint public key - let secret_point = math::generate_random_point()?; - let encrypted_secret_point = math::encrypt_secret(&secret_point, &joint_public)?; - - // then save encrypted data to the key storage - let encrypted_data = DocumentKeyShare { - threshold: data.threshold.expect("threshold is filled in initialization phase; KG phase follows initialization phase; qed"), - id_numbers: data.nodes.iter().map(|(node_id, node_data)| (node_id.clone(), node_data.id_number.clone())).collect(), - secret_share: data.secret_share.as_ref().expect("secret_share is filled in KG phase; we are at the end of KG phase; qed").clone(), - common_point: encrypted_secret_point.common_point, - encrypted_point: encrypted_secret_point.encrypted_point, - }; - self.key_storage.insert(self.id.clone(), encrypted_data.clone()) - .map_err(|e| Error::KeyStorage(e.into()))?; - - // then distribute encrypted data to every other node - 
self.cluster.broadcast(Message::Encryption(EncryptionMessage::SessionCompleted(SessionCompleted { - session: self.id.clone().into(), - common_point: encrypted_data.common_point.clone().into(), - encrypted_point: encrypted_data.encrypted_point.clone().into(), - })))?; - - // then wait for confirmation from all other nodes - { - let self_node = data.nodes.get_mut(self.node()).expect("node is always qualified by himself; qed"); - self_node.completion_confirmed = true; - } - data.joint_public = Some(Ok(joint_public)); - data.secret_point = Some(Ok(secret_point)); - data.state = SessionState::WaitingForEncryptionConfirmation; - - Ok(()) + drop(data); + self.complete_encryption() } /// When session completion message is received. @@ -703,35 +501,63 @@ impl SessionImpl { } /// When error has occured on another node. - pub fn on_session_error(&self, sender: NodeId, message: &SessionError) { - warn!("{}: encryption session error: {:?} from {}", self.node(), message, sender); + pub fn on_session_error(&self, sender: NodeId, message: &SessionError) -> Result<(), Error> { let mut data = self.data.lock(); + + warn!("{}: encryption session failed with error: {} from {}", self.node(), message.error, sender); + data.state = SessionState::Failed; data.joint_public = Some(Err(Error::Io(message.error.clone()))); data.secret_point = Some(Err(Error::Io(message.error.clone()))); self.completed.notify_all(); + + Ok(()) + } + + /// When connection to one of cluster nodes has timeouted. + pub fn on_node_timeout(&self, node: &NodeId) { + let mut data = self.data.lock(); + + // all nodes are required for encryption session + // => fail without check + warn!("{}: encryption session failed because {} connection has timeouted", self.node(), node); + + data.state = SessionState::Failed; + data.joint_public = Some(Err(Error::NodeDisconnected)); + data.secret_point = Some(Err(Error::NodeDisconnected)); + self.completed.notify_all(); } /// When session timeout has occured. 
- pub fn on_session_timeout(&self, node: &NodeId) { - warn!("{}: encryption session timeout", self.node()); + pub fn on_session_timeout(&self) { let mut data = self.data.lock(); - match data.state { - SessionState::WaitingForInitialization | - SessionState::WaitingForInitializationConfirm(_) | - SessionState::WaitingForInitializationComplete => (), - _ => if !data.nodes.contains_key(node) { - return; - }, - } + warn!("{}: encryption session failed with timeout", self.node()); data.state = SessionState::Failed; - data.joint_public = Some(Err(Error::Io("session expired".into()))); - data.secret_point = Some(Err(Error::Io("session expired".into()))); + data.joint_public = Some(Err(Error::NodeDisconnected)); + data.secret_point = Some(Err(Error::NodeDisconnected)); self.completed.notify_all(); } + /// Complete initialization (when all other nodex has responded with confirmation) + fn complete_initialization(&self, mut derived_point: Public) -> Result<(), Error> { + // update point once again to make sure that derived point is not generated by last node + math::update_random_point(&mut derived_point)?; + + // remember derived point + let mut data = self.data.lock(); + data.derived_point = Some(derived_point.clone().into()); + + // broadcast derived point && other session paraeters to every other node + self.cluster.broadcast(Message::Encryption(EncryptionMessage::CompleteInitialization(CompleteInitialization { + session: self.id.clone().into(), + nodes: data.nodes.iter().map(|(id, data)| (id.clone().into(), data.id_number.clone().into())).collect(), + threshold: data.threshold.expect("threshold is filled in initialization phase; KD phase follows initialization phase; qed"), + derived_point: derived_point.into(), + }))) + } + /// Keys dissemination (KD) phase fn disseminate_keys(&self) -> Result<(), Error> { let mut data = self.data.lock(); @@ -777,34 +603,124 @@ impl SessionImpl { Ok(()) } - /// Disqualify node - fn disqualify_node(node: &NodeId, cluster: &Cluster, 
data: &mut SessionData) { - let threshold = data.threshold - .expect("threshold is filled on initialization phase; node can only be disqualified during KC phase; KC phase follows initialization phase; qed"); + /// Keys verification (KV) phase + fn verify_keys(&self) -> Result<(), Error> { + let mut data = self.data.lock(); + + // key verification (KV) phase: check that other nodes have passed correct secrets + let threshold = data.threshold.expect("threshold is filled in initialization phase; KV phase follows initialization phase; qed"); + let derived_point = data.derived_point.clone().expect("derived point generated on initialization phase; KV phase follows initialization phase; qed"); + let number_id = data.nodes[self.node()].id_number.clone(); + for (_ , node_data) in data.nodes.iter_mut().filter(|&(node_id, _)| node_id != self.node()) { + let secret1 = node_data.secret1.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed"); + let secret2 = node_data.secret2.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed"); + let publics = node_data.publics.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed"); + let is_key_verification_ok = math::keys_verification(threshold, &derived_point, &number_id, + secret1, secret2, publics)?; - // blacklist node - cluster.blacklist(&node); - // too many complaints => exclude from session - data.nodes.remove(&node); - // check if secret still can be reconstructed - if data.nodes.len() < threshold + 1 { - // not enough nodes => session is failed - data.state = SessionState::Failed; + if !is_key_verification_ok { + // node has sent us incorrect values. In original ECDKG protocol we should have sent complaint here. 
+ return Err(Error::InvalidMessage); + } } + + // calculate public share + let self_public_share = { + let self_secret_coeff = data.secret_coeff.as_ref().expect("secret_coeff is generated on KD phase; KG phase follows KD phase; qed"); + math::compute_public_share(self_secret_coeff)? + }; + + // calculate self secret + public shares + let self_secret_share = { + let secret_values_iter = data.nodes.values() + .map(|n| n.secret1.as_ref().expect("keys received on KD phase; KG phase follows KD phase; qed")); + math::compute_secret_share(secret_values_iter)? + }; + + // update state + data.state = SessionState::WaitingForPublicKeyShare; + data.secret_share = Some(self_secret_share); + let self_node = data.nodes.get_mut(self.node()).expect("node is always qualified by himself; qed"); + self_node.public_share = Some(self_public_share.clone()); + + // broadcast self public key share + self.cluster.broadcast(Message::Encryption(EncryptionMessage::PublicKeyShare(PublicKeyShare { + session: self.id.clone().into(), + public_share: self_public_share.into(), + }))) + } + + /// Complete encryption + fn complete_encryption(&self) -> Result<(), Error> { + let mut data = self.data.lock(); + + // else - calculate joint public key + let joint_public = { + let public_shares = data.nodes.values().map(|n| n.public_share.as_ref().expect("keys received on KD phase; KG phase follows KD phase; qed")); + math::compute_joint_public(public_shares)? 
+ }; + + // if we are at the slave node - wait for session completion + if data.master.as_ref() != Some(self.node()) { + data.joint_public = Some(Ok(joint_public)); + data.state = SessionState::WaitingForEncryptionConfirmation; + return Ok(()); + } + + // then generate secret point + // then encrypt secret point with joint public key + // TODO: secret is revealed to KeyServer here + let secret_point = math::generate_random_point()?; + let encrypted_secret_point = math::encrypt_secret(&secret_point, &joint_public)?; + + // then save encrypted data to the key storage + let encrypted_data = DocumentKeyShare { + threshold: data.threshold.expect("threshold is filled in initialization phase; KG phase follows initialization phase; qed"), + id_numbers: data.nodes.iter().map(|(node_id, node_data)| (node_id.clone(), node_data.id_number.clone())).collect(), + secret_share: data.secret_share.as_ref().expect("secret_share is filled in KG phase; we are at the end of KG phase; qed").clone(), + common_point: encrypted_secret_point.common_point, + encrypted_point: encrypted_secret_point.encrypted_point, + }; + self.key_storage.insert(self.id.clone(), encrypted_data.clone()) + .map_err(|e| Error::KeyStorage(e.into()))?; + + // then distribute encrypted data to every other node + self.cluster.broadcast(Message::Encryption(EncryptionMessage::SessionCompleted(SessionCompleted { + session: self.id.clone().into(), + common_point: encrypted_data.common_point.clone().into(), + encrypted_point: encrypted_data.encrypted_point.clone().into(), + })))?; + + // then wait for confirmation from all other nodes + { + let self_node = data.nodes.get_mut(self.node()).expect("node is always qualified by himself; qed"); + self_node.completion_confirmed = true; + } + data.joint_public = Some(Ok(joint_public)); + data.secret_point = Some(Ok(secret_point)); + data.state = SessionState::WaitingForEncryptionConfirmation; + + Ok(()) } } impl Session for SessionImpl { #[cfg(test)] - fn joint_public_key(&self) 
-> Option { - self.data.lock().joint_public.clone().and_then(|r| r.ok()) + fn joint_public_key(&self) -> Option> { + self.data.lock().joint_public.clone() } + fn state(&self) -> SessionState { + self.data.lock().state.clone() + } - fn wait(&self) -> Result { + fn wait(&self, timeout: Option) -> Result { let mut data = self.data.lock(); if !data.secret_point.is_some() { - self.completed.wait(&mut data); + match timeout { + None => self.completed.wait(&mut data), + Some(timeout) => { self.completed.wait_for(&mut data, timeout); }, + } } data.secret_point.as_ref() @@ -842,7 +758,6 @@ impl NodeData { fn with_id_number(node_id_number: Secret) -> Self { NodeData { id_number: node_id_number, - complaints: BTreeSet::new(), secret1_sent: None, secret2_sent: None, secret1: None, @@ -862,7 +777,7 @@ impl Debug for SessionImpl { pub fn check_cluster_nodes(self_node_id: &NodeId, nodes: &BTreeSet) -> Result<(), Error> { // at least two nodes must be in cluster - if nodes.len() < 2 { + if nodes.len() < 1 { return Err(Error::InvalidNodesCount); } // this node must be a part of cluster @@ -952,10 +867,6 @@ mod tests { &self.nodes.values().nth(2).unwrap().session } - pub fn third_slave(&self) -> &SessionImpl { - &self.nodes.values().nth(3).unwrap().session - } - pub fn take_message(&mut self) -> Option<(NodeId, NodeId, Message)> { self.nodes.values() .filter_map(|n| n.cluster.take_message().map(|m| (n.session.node().clone(), m.0, m.1))) @@ -970,8 +881,6 @@ mod tests { Message::Encryption(EncryptionMessage::ConfirmInitialization(ref message)) => self.nodes[&msg.1].session.on_confirm_initialization(msg.0.clone(), &message), Message::Encryption(EncryptionMessage::CompleteInitialization(ref message)) => self.nodes[&msg.1].session.on_complete_initialization(msg.0.clone(), &message), Message::Encryption(EncryptionMessage::KeysDissemination(ref message)) => self.nodes[&msg.1].session.on_keys_dissemination(msg.0.clone(), &message), - Message::Encryption(EncryptionMessage::Complaint(ref 
message)) => self.nodes[&msg.1].session.on_complaint(msg.0.clone(), &message), - Message::Encryption(EncryptionMessage::ComplaintResponse(ref message)) => self.nodes[&msg.1].session.on_complaint_response(msg.0.clone(), &message), Message::Encryption(EncryptionMessage::PublicKeyShare(ref message)) => self.nodes[&msg.1].session.on_public_key_share(msg.0.clone(), &message), Message::Encryption(EncryptionMessage::SessionCompleted(ref message)) => self.nodes[&msg.1].session.on_session_completed(msg.0.clone(), &message), _ => panic!("unexpected"), @@ -990,13 +899,6 @@ mod tests { let msg = self.take_message().unwrap(); self.process_message(msg) } - - pub fn take_and_process_all_messages(&mut self) -> Result<(), Error> { - while let Some(msg) = self.take_message() { - self.process_message(msg)?; - } - Ok(()) - } } fn make_simple_cluster(threshold: usize, num_nodes: usize) -> Result<(SessionId, NodeId, NodeId, MessageLoop), Error> { @@ -1010,8 +912,9 @@ mod tests { } #[test] - fn fails_to_initialize_in_cluster_of_single_node() { - assert_eq!(make_simple_cluster(0, 1).unwrap_err(), Error::InvalidNodesCount); + fn initializes_in_cluster_of_single_node() { + let l = MessageLoop::new(1); + assert!(l.master().initialize(0, l.nodes.keys().cloned().collect()).is_ok()); } #[test] @@ -1106,19 +1009,6 @@ mod tests { assert!(l.master().derived_point().unwrap() != passed_point.into()); } - #[test] - fn fails_to_complete_initialization_in_cluster_of_single_node() { - let (sid, m, s, l) = make_simple_cluster(0, 2).unwrap(); - let mut nodes = BTreeMap::new(); - nodes.insert(s, math::generate_random_scalar().unwrap()); - assert_eq!(l.first_slave().on_complete_initialization(m, &message::CompleteInitialization { - session: sid.into(), - nodes: nodes.into_iter().map(|(k, v)| (k.into(), v.into())).collect(), - threshold: 0, - derived_point: math::generate_random_point().unwrap().into(), - }).unwrap_err(), Error::InvalidNodesCount); - } - #[test] fn 
fails_to_complete_initialization_if_not_a_part_of_cluster() { let (sid, m, _, l) = make_simple_cluster(0, 2).unwrap(); @@ -1227,119 +1117,6 @@ mod tests { }).unwrap_err(), Error::InvalidStateForRequest); } - #[test] - fn defends_if_receives_complain_on_himself() { - let (sid, m, s, mut l) = make_simple_cluster(1, 3).unwrap(); - l.take_and_process_all_messages().unwrap(); - l.master().on_complaint(s, &message::Complaint { - session: sid.into(), - against: m.into(), - }).unwrap(); - match l.take_message().unwrap() { - (_, _, Message::Encryption(EncryptionMessage::ComplaintResponse(_))) => (), - _ => panic!("unexpected"), - } - } - - #[test] - fn node_is_disqualified_if_enough_complaints_received() { - let (sid, _, s, mut l) = make_simple_cluster(1, 4).unwrap(); - l.take_and_process_all_messages().unwrap(); - l.master().on_complaint(l.second_slave().node().clone(), &message::Complaint { - session: sid.into(), - against: s.clone().into(), - }).unwrap(); - l.master().on_complaint(l.third_slave().node().clone(), &message::Complaint { - session: sid.into(), - against: s.into(), - }).unwrap(); - assert_eq!(l.master().qualified_nodes().len(), 3); - } - - #[test] - fn node_is_not_disqualified_if_enough_complaints_received_from_the_same_node() { - let (sid, _, s, mut l) = make_simple_cluster(1, 4).unwrap(); - l.take_and_process_all_messages().unwrap(); - l.master().on_complaint(l.second_slave().node().clone(), &message::Complaint { - session: sid.into(), - against: s.clone().into(), - }).unwrap(); - l.master().on_complaint(l.second_slave().node().clone(), &message::Complaint { - session: sid.into(), - against: s.into(), - }).unwrap(); - assert_eq!(l.master().qualified_nodes().len(), 4); - } - - #[test] - fn node_is_disqualified_if_responds_to_complain_with_invalid_data() { - let (sid, _, _, mut l) = make_simple_cluster(1, 3).unwrap(); - l.take_and_process_message().unwrap(); // m -> s1: InitializeSession - l.take_and_process_message().unwrap(); // m -> s2: InitializeSession - 
l.take_and_process_message().unwrap(); // s1 -> m: ConfirmInitialization - l.take_and_process_message().unwrap(); // s2 -> m: ConfirmInitialization - l.take_and_process_message().unwrap(); // m -> s1: CompleteInitialization - l.take_and_process_message().unwrap(); // m -> s2: CompleteInitialization - l.take_and_process_message().unwrap(); // m -> s1: KeysDissemination - l.take_and_process_message().unwrap(); // m -> s2: KeysDissemination - l.take_and_process_message().unwrap(); // s1 -> m: KeysDissemination - l.take_and_process_message().unwrap(); // s1 -> s2: KeysDissemination - let s2 = l.second_slave().node().clone(); - l.master().on_keys_dissemination(s2.clone(), &message::KeysDissemination { - session: sid.clone().into(), - secret1: math::generate_random_scalar().unwrap().into(), - secret2: math::generate_random_scalar().unwrap().into(), - publics: vec![math::generate_random_point().unwrap().into(), math::generate_random_point().unwrap().into()], - }).unwrap(); - assert_eq!(l.master().qualified_nodes().len(), 3); - l.master().on_complaint_response(s2, &message::ComplaintResponse { - session: sid.into(), - secret1: math::generate_random_scalar().unwrap().into(), - secret2: math::generate_random_scalar().unwrap().into(), - }).unwrap(); - assert_eq!(l.master().qualified_nodes().len(), 2); - } - - #[test] - fn node_is_not_disqualified_if_responds_to_complain_with_valid_data() { - let (sid, _, _, mut l) = make_simple_cluster(1, 3).unwrap(); - l.take_and_process_message().unwrap(); // m -> s1: InitializeSession - l.take_and_process_message().unwrap(); // m -> s2: InitializeSession - l.take_and_process_message().unwrap(); // s1 -> m: ConfirmInitialization - l.take_and_process_message().unwrap(); // s2 -> m: ConfirmInitialization - l.take_and_process_message().unwrap(); // m -> s1: CompleteInitialization - l.take_and_process_message().unwrap(); // m -> s2: CompleteInitialization - l.take_and_process_message().unwrap(); // m -> s1: KeysDissemination - 
l.take_and_process_message().unwrap(); // m -> s2: KeysDissemination - l.take_and_process_message().unwrap(); // s1 -> m: KeysDissemination - l.take_and_process_message().unwrap(); // s1 -> s2: KeysDissemination - let (f, t, msg) = match l.take_message() { - Some((f, t, Message::Encryption(EncryptionMessage::KeysDissemination(msg)))) => (f, t, msg), - _ => panic!("unexpected"), - }; - assert_eq!(&f, l.second_slave().node()); - assert_eq!(&t, l.master().node()); - l.master().on_keys_dissemination(f.clone(), &message::KeysDissemination { - session: sid.clone().into(), - secret1: math::generate_random_scalar().unwrap().into(), - secret2: math::generate_random_scalar().unwrap().into(), - publics: msg.publics.clone().into(), - }).unwrap(); - assert_eq!(l.master().qualified_nodes().len(), 3); - l.master().on_complaint_response(f, &message::ComplaintResponse { - session: sid.into(), - secret1: msg.secret1.into(), - secret2: msg.secret2.into(), - }).unwrap(); - assert_eq!(l.master().qualified_nodes().len(), 3); - } - - #[test] - fn should_not_start_key_generation_when_not_in_key_check_state() { - let (_, _, _, l) = make_simple_cluster(1, 3).unwrap(); - assert_eq!(l.master().start_key_generation_phase().unwrap_err(), Error::InvalidStateForRequest); - } - #[test] fn should_not_accept_public_key_share_when_is_not_waiting_for_it() { let (sid, _, s, l) = make_simple_cluster(1, 3).unwrap(); @@ -1352,22 +1129,48 @@ mod tests { #[test] fn should_not_accept_public_key_share_when_receiving_twice() { let (sid, m, _, mut l) = make_simple_cluster(0, 3).unwrap(); - l.take_and_process_all_messages().unwrap(); - l.master().start_key_generation_phase().unwrap(); - l.first_slave().start_key_generation_phase().unwrap(); + l.take_and_process_message().unwrap(); // m -> s1: InitializeSession + l.take_and_process_message().unwrap(); // m -> s2: InitializeSession + l.take_and_process_message().unwrap(); // s1 -> m: ConfirmInitialization + l.take_and_process_message().unwrap(); // s2 -> m: 
ConfirmInitialization + l.take_and_process_message().unwrap(); // m -> s1: CompleteInitialization + l.take_and_process_message().unwrap(); // m -> s2: CompleteInitialization + l.take_and_process_message().unwrap(); // m -> s1: KeysDissemination + l.take_and_process_message().unwrap(); // m -> s2: KeysDissemination + l.take_and_process_message().unwrap(); // s1 -> m: KeysDissemination + l.take_and_process_message().unwrap(); // s1 -> s2: KeysDissemination + l.take_and_process_message().unwrap(); // s2 -> m: KeysDissemination + l.take_and_process_message().unwrap(); // s2 -> s1: KeysDissemination let (f, t, msg) = match l.take_message() { Some((f, t, Message::Encryption(EncryptionMessage::PublicKeyShare(msg)))) => (f, t, msg), _ => panic!("unexpected"), }; assert_eq!(&f, l.master().node()); - assert_eq!(&t, l.first_slave().node()); + assert_eq!(&t, l.second_slave().node()); l.process_message((f, t, Message::Encryption(EncryptionMessage::PublicKeyShare(msg.clone())))).unwrap(); - assert_eq!(l.first_slave().on_public_key_share(m, &message::PublicKeyShare { + assert_eq!(l.second_slave().on_public_key_share(m, &message::PublicKeyShare { session: sid.into(), public_share: math::generate_random_point().unwrap().into(), }).unwrap_err(), Error::InvalidMessage); } + + #[test] + fn encryption_fails_on_session_timeout() { + let (_, _, _, l) = make_simple_cluster(0, 2).unwrap(); + assert!(l.master().joint_public_key().is_none()); + l.master().on_session_timeout(); + assert!(l.master().joint_public_key().unwrap().unwrap_err() == Error::NodeDisconnected); + } + + #[test] + fn encryption_fails_on_node_timeout() { + let (_, _, _, l) = make_simple_cluster(0, 2).unwrap(); + assert!(l.master().joint_public_key().is_none()); + l.master().on_node_timeout(l.first_slave().node()); + assert!(l.master().joint_public_key().unwrap().unwrap_err() == Error::NodeDisconnected); + } + #[test] fn complete_enc_dec_session() { let test_cases = [(0, 5), (2, 5), (3, 5)]; @@ -1381,26 +1184,12 @@ mod 
tests { l.process_message((from, to, message)).unwrap(); } - // check that all nodes are waiting for complaint timeout is passed - for node in l.nodes.values() { - let state = node.session.state(); - assert_eq!(state, SessionState::KeyCheck); - - // simulate timeout pass - node.session.start_key_generation_phase().unwrap(); - } - - // let nodes do joint public generation - while let Some((from, to, message)) = l.take_message() { - l.process_message((from, to, message)).unwrap(); - } - // check that all nodes has finished joint public generation - let joint_public_key = l.master().joint_public_key().unwrap(); + let joint_public_key = l.master().joint_public_key().unwrap().unwrap(); for node in l.nodes.values() { let state = node.session.state(); assert_eq!(state, SessionState::Finished); - assert_eq!(node.session.joint_public_key().as_ref(), Some(&joint_public_key)); + assert_eq!(node.session.joint_public_key().as_ref(), Some(&Ok(joint_public_key))); } // now let's encrypt some secret (which is a point on EC) @@ -1417,8 +1206,6 @@ mod tests { } } - // TODO: add test where some nodes are disqualified from session - #[test] fn encryption_session_works_over_network() { //::util::log::init_log(); @@ -1428,7 +1215,7 @@ mod tests { let mut core = Core::new().unwrap(); // prepare cluster objects for each node - let clusters = make_clusters(&core, 6020, num_nodes); + let clusters = make_clusters(&core, 6022, num_nodes); run_clusters(&clusters); // establish connections diff --git a/secret_store/src/key_server_cluster/io/message.rs b/secret_store/src/key_server_cluster/io/message.rs index fcc605a8e..95d3a54cb 100644 --- a/secret_store/src/key_server_cluster/io/message.rs +++ b/secret_store/src/key_server_cluster/io/message.rs @@ -71,17 +71,16 @@ pub fn serialize_message(message: Message) -> Result { Message::Encryption(EncryptionMessage::ConfirmInitialization(payload)) => (51, serde_json::to_vec(&payload)), 
Message::Encryption(EncryptionMessage::CompleteInitialization(payload)) => (52, serde_json::to_vec(&payload)), Message::Encryption(EncryptionMessage::KeysDissemination(payload)) => (53, serde_json::to_vec(&payload)), - Message::Encryption(EncryptionMessage::Complaint(payload)) => (54, serde_json::to_vec(&payload)), - Message::Encryption(EncryptionMessage::ComplaintResponse(payload)) => (55, serde_json::to_vec(&payload)), - Message::Encryption(EncryptionMessage::PublicKeyShare(payload)) => (56, serde_json::to_vec(&payload)), - Message::Encryption(EncryptionMessage::SessionError(payload)) => (57, serde_json::to_vec(&payload)), - Message::Encryption(EncryptionMessage::SessionCompleted(payload)) => (58, serde_json::to_vec(&payload)), + Message::Encryption(EncryptionMessage::PublicKeyShare(payload)) => (54, serde_json::to_vec(&payload)), + Message::Encryption(EncryptionMessage::SessionError(payload)) => (55, serde_json::to_vec(&payload)), + Message::Encryption(EncryptionMessage::SessionCompleted(payload)) => (56, serde_json::to_vec(&payload)), Message::Decryption(DecryptionMessage::InitializeDecryptionSession(payload)) => (100, serde_json::to_vec(&payload)), Message::Decryption(DecryptionMessage::ConfirmDecryptionInitialization(payload)) => (101, serde_json::to_vec(&payload)), Message::Decryption(DecryptionMessage::RequestPartialDecryption(payload)) => (102, serde_json::to_vec(&payload)), Message::Decryption(DecryptionMessage::PartialDecryption(payload)) => (103, serde_json::to_vec(&payload)), Message::Decryption(DecryptionMessage::DecryptionSessionError(payload)) => (104, serde_json::to_vec(&payload)), + Message::Decryption(DecryptionMessage::DecryptionSessionCompleted(payload)) => (105, serde_json::to_vec(&payload)), }; let payload = payload.map_err(|err| Error::Serde(err.to_string()))?; @@ -104,17 +103,16 @@ pub fn deserialize_message(header: &MessageHeader, payload: Vec) -> Result 
Message::Encryption(EncryptionMessage::ConfirmInitialization(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), 52 => Message::Encryption(EncryptionMessage::CompleteInitialization(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), 53 => Message::Encryption(EncryptionMessage::KeysDissemination(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), - 54 => Message::Encryption(EncryptionMessage::Complaint(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), - 55 => Message::Encryption(EncryptionMessage::ComplaintResponse(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), - 56 => Message::Encryption(EncryptionMessage::PublicKeyShare(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), - 57 => Message::Encryption(EncryptionMessage::SessionError(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), - 58 => Message::Encryption(EncryptionMessage::SessionCompleted(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), + 54 => Message::Encryption(EncryptionMessage::PublicKeyShare(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), + 55 => Message::Encryption(EncryptionMessage::SessionError(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), + 56 => Message::Encryption(EncryptionMessage::SessionCompleted(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), 100 => Message::Decryption(DecryptionMessage::InitializeDecryptionSession(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), 101 => Message::Decryption(DecryptionMessage::ConfirmDecryptionInitialization(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), 102 => 
Message::Decryption(DecryptionMessage::RequestPartialDecryption(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), 103 => Message::Decryption(DecryptionMessage::PartialDecryption(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), 104 => Message::Decryption(DecryptionMessage::DecryptionSessionError(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), + 105 => Message::Decryption(DecryptionMessage::DecryptionSessionCompleted(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)), _ => return Err(Error::Serde(format!("unknown message type {}", header.kind))), }) diff --git a/secret_store/src/key_server_cluster/message.rs b/secret_store/src/key_server_cluster/message.rs index d817c88a4..af3a113fe 100644 --- a/secret_store/src/key_server_cluster/message.rs +++ b/secret_store/src/key_server_cluster/message.rs @@ -58,10 +58,6 @@ pub enum EncryptionMessage { CompleteInitialization(CompleteInitialization), /// Generated keys are sent to every node. KeysDissemination(KeysDissemination), - /// Complaint against another node is broadcasted. - Complaint(Complaint), - /// Complaint response is broadcasted. - ComplaintResponse(ComplaintResponse), /// Broadcast self public key portion. PublicKeyShare(PublicKeyShare), /// When session error has occured. @@ -83,6 +79,8 @@ pub enum DecryptionMessage { PartialDecryption(PartialDecryption), /// When decryption session error has occured. DecryptionSessionError(DecryptionSessionError), + /// When decryption session is completed. + DecryptionSessionCompleted(DecryptionSessionCompleted), } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -160,26 +158,6 @@ pub struct KeysDissemination { pub publics: Vec, } -#[derive(Clone, Debug, Serialize, Deserialize)] -/// Complaint against node is broadcasted. -pub struct Complaint { - /// Session Id. - pub session: MessageSessionId, - /// Public values. 
- pub against: MessageNodeId, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -/// Node is responding to complaint. -pub struct ComplaintResponse { - /// Session Id. - pub session: MessageSessionId, - /// Secret 1. - pub secret1: SerializableSecret, - /// Secret 2. - pub secret2: SerializableSecret, -} - #[derive(Clone, Debug, Serialize, Deserialize)] /// Node is sharing its public key share. pub struct PublicKeyShare { @@ -269,6 +247,15 @@ pub struct DecryptionSessionError { pub error: String, } +#[derive(Clone, Debug, Serialize, Deserialize)] +/// When decryption session is completed. +pub struct DecryptionSessionCompleted { + /// Encryption session Id. + pub session: MessageSessionId, + /// Decryption session Id. + pub sub_session: SerializableSecret, +} + impl EncryptionMessage { pub fn session_id(&self) -> &SessionId { match *self { @@ -276,8 +263,6 @@ impl EncryptionMessage { EncryptionMessage::ConfirmInitialization(ref msg) => &msg.session, EncryptionMessage::CompleteInitialization(ref msg) => &msg.session, EncryptionMessage::KeysDissemination(ref msg) => &msg.session, - EncryptionMessage::Complaint(ref msg) => &msg.session, - EncryptionMessage::ComplaintResponse(ref msg) => &msg.session, EncryptionMessage::PublicKeyShare(ref msg) => &msg.session, EncryptionMessage::SessionError(ref msg) => &msg.session, EncryptionMessage::SessionCompleted(ref msg) => &msg.session, @@ -293,6 +278,7 @@ impl DecryptionMessage { DecryptionMessage::RequestPartialDecryption(ref msg) => &msg.session, DecryptionMessage::PartialDecryption(ref msg) => &msg.session, DecryptionMessage::DecryptionSessionError(ref msg) => &msg.session, + DecryptionMessage::DecryptionSessionCompleted(ref msg) => &msg.session, } } @@ -303,6 +289,7 @@ impl DecryptionMessage { DecryptionMessage::RequestPartialDecryption(ref msg) => &msg.sub_session, DecryptionMessage::PartialDecryption(ref msg) => &msg.sub_session, DecryptionMessage::DecryptionSessionError(ref msg) => &msg.sub_session, + 
DecryptionMessage::DecryptionSessionCompleted(ref msg) => &msg.sub_session, } } } @@ -335,8 +322,6 @@ impl fmt::Display for EncryptionMessage { EncryptionMessage::ConfirmInitialization(_) => write!(f, "ConfirmInitialization"), EncryptionMessage::CompleteInitialization(_) => write!(f, "CompleteInitialization"), EncryptionMessage::KeysDissemination(_) => write!(f, "KeysDissemination"), - EncryptionMessage::Complaint(_) => write!(f, "Complaint"), - EncryptionMessage::ComplaintResponse(_) => write!(f, "ComplaintResponse"), EncryptionMessage::PublicKeyShare(_) => write!(f, "PublicKeyShare"), EncryptionMessage::SessionError(ref msg) => write!(f, "SessionError({})", msg.error), EncryptionMessage::SessionCompleted(_) => write!(f, "SessionCompleted"), @@ -352,6 +337,7 @@ impl fmt::Display for DecryptionMessage { DecryptionMessage::RequestPartialDecryption(_) => write!(f, "RequestPartialDecryption"), DecryptionMessage::PartialDecryption(_) => write!(f, "PartialDecryption"), DecryptionMessage::DecryptionSessionError(_) => write!(f, "DecryptionSessionError"), + DecryptionMessage::DecryptionSessionCompleted(_) => write!(f, "DecryptionSessionCompleted"), } } } diff --git a/secret_store/src/key_server_cluster/mod.rs b/secret_store/src/key_server_cluster/mod.rs index 5c7bc0d2f..bdaa868ee 100644 --- a/secret_store/src/key_server_cluster/mod.rs +++ b/secret_store/src/key_server_cluster/mod.rs @@ -20,7 +20,7 @@ use ethkey; use ethcrypto; use super::types::all::DocumentAddress; -pub use super::types::all::{NodeId, EncryptionConfiguration, DocumentEncryptedKeyShadow}; +pub use super::types::all::{NodeId, DocumentEncryptedKeyShadow}; pub use super::acl_storage::AclStorage; pub use super::key_storage::{KeyStorage, DocumentKeyShare}; pub use super::serialization::{SerializableSignature, SerializableH256, SerializableSecret, SerializablePublic}; diff --git a/secret_store/src/key_storage.rs b/secret_store/src/key_storage.rs index ac5e3bf8f..b324dd90a 100644 --- 
a/secret_store/src/key_storage.rs +++ b/secret_store/src/key_storage.rs @@ -135,8 +135,7 @@ pub mod tests { use parking_lot::RwLock; use devtools::RandomTempPath; use ethkey::{Random, Generator}; - use super::super::types::all::{Error, NodeAddress, ServiceConfiguration, ClusterConfiguration, - DocumentAddress, EncryptionConfiguration}; + use super::super::types::all::{Error, NodeAddress, ServiceConfiguration, ClusterConfiguration, DocumentAddress}; use super::{KeyStorage, PersistentKeyStorage, DocumentKeyShare}; #[derive(Default)] @@ -178,9 +177,6 @@ pub mod tests { }, nodes: BTreeMap::new(), allow_connecting_to_higher_nodes: false, - encryption_config: EncryptionConfiguration { - key_check_timeout_ms: 10, - }, }, }; diff --git a/secret_store/src/lib.rs b/secret_store/src/lib.rs index cd6116848..209a32cc2 100644 --- a/secret_store/src/lib.rs +++ b/secret_store/src/lib.rs @@ -38,6 +38,7 @@ extern crate ethcore; extern crate ethcore_devtools as devtools; extern crate ethcore_util as util; extern crate ethcore_ipc as ipc; +extern crate ethcore_logger as logger; extern crate ethcrypto; extern crate ethkey; extern crate native_contracts; @@ -60,7 +61,7 @@ use std::sync::Arc; use ethcore::client::Client; pub use types::all::{DocumentAddress, DocumentKey, DocumentEncryptedKey, RequestSignature, Public, - Error, NodeAddress, ServiceConfiguration, ClusterConfiguration, EncryptionConfiguration}; + Error, NodeAddress, ServiceConfiguration, ClusterConfiguration}; pub use traits::{KeyServer}; /// Start new key server instance @@ -70,6 +71,6 @@ pub fn start(client: Arc, config: ServiceConfiguration) -> Result