Fixing secretstore TODOs - part 2 (#5416)

* ECDKG protocol prototype

* added test for enc/dec math

* get rid of decryption_session

* added licenses

* fix after merge

* get rid of unused serde dependency

* doc

* decryption session [without commutative enc]

* failed_dec_session

* fixed tests

* added commen

* added more decryption session tests

* helper to localize an issue

* more computations to localize error

* decryption_session::SessionParams

* added tests for EC math to localize problem

* secretstore network transport

* encryption_session_works_over_network

* network errors processing

* connecting to KeyServer

* licenses

* get rid of debug println-s

* fixed secretstore args

* encryption results are stored in KS database

* decryption protocol works over network

* enc/dec Session traits

* fixing warnings

* fix after merge

* on-chain ACL checker proto

* fixed compilation

* fixed compilation

* finally fixed <odd>-of-N-scheme

* temporary commented test

* 1-of-N works in math

* scheme 1-of-N works

* updated AclStorage with real contract ABI

* remove unnecessary unsafety

* fixed grumbles

* wakeup on access denied

* encrypt secretstore messages

* 'shadow' decryption

* fix grumbles

* lost files

* secretstore cli-options

* decryption seccion when ACL check failed on master

* disallow regenerating key for existing document

* removed obsolete TODO

* fix after merge

* switched to tokio_io

* fix after merge

* fix after merge

* fix after merge

* fix after merge

* fix after merge

* fixed test

* fix after merge

* encryption session errors are now fatal

* session timeouts

* autorestart decryption session

* remove sessions on completion

* exclude disconnected nodes from decryption session

* test for enc/dec session over network with 1 node

* remove debug printlns

* fixed 1-of-1 scheme

* drop for KeyServerHttpListener

* added some tests

* fixed typo
This commit is contained in:
Svyatoslav Nikolsky 2017-04-25 22:34:03 +03:00 committed by Gav Wood
parent 87ce264926
commit 1a262048a6
14 changed files with 1051 additions and 775 deletions

1
Cargo.lock generated
View File

@ -603,6 +603,7 @@ dependencies = [
"ethcore-ipc 1.7.0",
"ethcore-ipc-codegen 1.7.0",
"ethcore-ipc-nano 1.7.0",
"ethcore-logger 1.7.0",
"ethcore-util 1.7.0",
"ethcrypto 0.1.0",
"ethkey 0.2.0",

View File

@ -96,9 +96,6 @@ mod server {
port: port,
})).collect(),
allow_connecting_to_higher_nodes: true,
encryption_config: ethcore_secretstore::EncryptionConfiguration {
key_check_timeout_ms: 1000,
},
},
};

View File

@ -31,6 +31,7 @@ ethcore-devtools = { path = "../devtools" }
ethcore-util = { path = "../util" }
ethcore-ipc = { path = "../ipc/rpc" }
ethcore-ipc-nano = { path = "../ipc/nano" }
ethcore-logger = { path = "../logger" }
ethcrypto = { path = "../ethcrypto" }
ethkey = { path = "../ethkey" }
native-contracts = { path = "../ethcore/native_contracts" }

View File

@ -27,7 +27,7 @@ use url::percent_encoding::percent_decode;
use util::ToPretty;
use traits::KeyServer;
use serialization::SerializableDocumentEncryptedKeyShadow;
use types::all::{Error, ServiceConfiguration, RequestSignature, DocumentAddress, DocumentEncryptedKey, DocumentEncryptedKeyShadow};
use types::all::{Error, NodeAddress, RequestSignature, DocumentAddress, DocumentEncryptedKey, DocumentEncryptedKeyShadow};
/// Key server http-requests listener
pub struct KeyServerHttpListener<T: KeyServer + 'static> {
@ -60,7 +60,7 @@ struct KeyServerSharedHttpHandler<T: KeyServer + 'static> {
impl<T> KeyServerHttpListener<T> where T: KeyServer + 'static {
/// Start KeyServer http listener
pub fn start(config: ServiceConfiguration, key_server: T) -> Result<Self, Error> {
pub fn start(listener_address: &NodeAddress, key_server: T) -> Result<Self, Error> {
let shared_handler = Arc::new(KeyServerSharedHttpHandler {
key_server: key_server,
});
@ -68,7 +68,7 @@ impl<T> KeyServerHttpListener<T> where T: KeyServer + 'static {
handler: shared_handler.clone(),
};
let listener_addr: &str = &format!("{}:{}", config.listener_address.address, config.listener_address.port);
let listener_addr: &str = &format!("{}:{}", listener_address.address, listener_address.port);
let http_server = HttpServer::http(&listener_addr).expect("cannot start HttpServer");
let http_server = http_server.handle(handler).expect("cannot start HttpServer");
let listener = KeyServerHttpListener {
@ -93,6 +93,13 @@ impl<T> KeyServer for KeyServerHttpListener<T> where T: KeyServer + 'static {
}
}
impl<T> Drop for KeyServerHttpListener<T> where T: KeyServer + 'static {
fn drop(&mut self) {
// ignore error as we are dropping anyway
let _ = self._http_server.close();
}
}
impl<T> HttpHandler for KeyServerHttpHandler<T> where T: KeyServer + 'static {
fn handle(&self, req: HttpRequest, mut res: HttpResponse) {
if req.headers.has::<header::Origin>() {
@ -219,7 +226,17 @@ fn parse_request(method: &HttpMethod, uri_path: &str) -> Request {
#[cfg(test)]
mod tests {
use hyper::method::Method as HttpMethod;
use super::{parse_request, Request};
use key_server::tests::DummyKeyServer;
use types::all::NodeAddress;
use super::{parse_request, Request, KeyServerHttpListener};
#[test]
fn http_listener_successfully_drops() {
let key_server = DummyKeyServer;
let address = NodeAddress { address: "127.0.0.1".into(), port: 9000 };
let listener = KeyServerHttpListener::start(&address, key_server).unwrap();
drop(listener);
}
#[test]
fn parse_request_successful() {

View File

@ -64,7 +64,7 @@ impl KeyServer for KeyServerImpl {
// generate document key
let encryption_session = self.data.lock().cluster.new_encryption_session(document.clone(), threshold)?;
let document_key = encryption_session.wait()?;
let document_key = encryption_session.wait(None)?;
// encrypt document key with requestor public key
let document_key = ethcrypto::ecies::encrypt_single_message(&public, &document_key)
@ -104,7 +104,6 @@ impl KeyServerCore {
.map(|(node_id, node_address)| (node_id.clone(), (node_address.address.clone(), node_address.port)))
.collect(),
allow_connecting_to_higher_nodes: config.allow_connecting_to_higher_nodes,
encryption_config: config.encryption_config.clone(),
acl_storage: acl_storage,
key_storage: key_storage,
};
@ -143,54 +142,108 @@ impl Drop for KeyServerCore {
}
#[cfg(test)]
mod tests {
pub mod tests {
use std::time;
use std::sync::Arc;
use ethcrypto;
use ethkey::{self, Random, Generator};
use acl_storage::tests::DummyAclStorage;
use key_storage::tests::DummyKeyStorage;
use types::all::{ClusterConfiguration, NodeAddress, EncryptionConfiguration};
use types::all::{Error, ClusterConfiguration, NodeAddress, RequestSignature, DocumentAddress, DocumentEncryptedKey, DocumentEncryptedKeyShadow};
use super::{KeyServer, KeyServerImpl};
#[test]
fn document_key_generation_and_retrievement_works_over_network() {
//::util::log::init_log();
pub struct DummyKeyServer;
let num_nodes = 3;
impl KeyServer for DummyKeyServer {
fn generate_document_key(&self, _signature: &RequestSignature, _document: &DocumentAddress, _threshold: usize) -> Result<DocumentEncryptedKey, Error> {
unimplemented!()
}
fn document_key(&self, _signature: &RequestSignature, _document: &DocumentAddress) -> Result<DocumentEncryptedKey, Error> {
unimplemented!()
}
fn document_key_shadow(&self, _signature: &RequestSignature, _document: &DocumentAddress) -> Result<DocumentEncryptedKeyShadow, Error> {
unimplemented!()
}
}
fn make_key_servers(start_port: u16, num_nodes: usize) -> Vec<KeyServerImpl> {
let key_pairs: Vec<_> = (0..num_nodes).map(|_| Random.generate().unwrap()).collect();
let configs: Vec<_> = (0..num_nodes).map(|i| ClusterConfiguration {
threads: 1,
self_private: (***key_pairs[i].secret()).into(),
listener_address: NodeAddress {
address: "127.0.0.1".into(),
port: 6060 + (i as u16),
port: start_port + (i as u16),
},
nodes: key_pairs.iter().enumerate().map(|(j, kp)| (kp.public().clone(),
NodeAddress {
address: "127.0.0.1".into(),
port: 6060 + (j as u16),
port: start_port + (j as u16),
})).collect(),
allow_connecting_to_higher_nodes: false,
encryption_config: EncryptionConfiguration {
key_check_timeout_ms: 10,
},
}).collect();
let key_servers: Vec<_> = configs.into_iter().map(|cfg|
KeyServerImpl::new(&cfg, Arc::new(DummyAclStorage::default()), Arc::new(DummyKeyStorage::default())).unwrap()
).collect();
// wait until connections are established
// wait until connections are established. It is fast => do not bother with events here
let start = time::Instant::now();
let mut tried_reconnections = false;
loop {
if key_servers.iter().all(|ks| ks.cluster().cluster_state().connected.len() == num_nodes - 1) {
break;
}
if time::Instant::now() - start > time::Duration::from_millis(30000) {
panic!("connections are not established in 30000ms");
let old_tried_reconnections = tried_reconnections;
let mut fully_connected = true;
for key_server in &key_servers {
if key_server.cluster().cluster_state().connected.len() != num_nodes - 1 {
fully_connected = false;
if !old_tried_reconnections {
tried_reconnections = true;
key_server.cluster().connect();
}
}
}
if fully_connected {
break;
}
if time::Instant::now() - start > time::Duration::from_millis(1000) {
panic!("connections are not established in 1000ms");
}
}
key_servers
}
#[test]
fn document_key_generation_and_retrievement_works_over_network_with_single_node() {
//::logger::init_log();
let key_servers = make_key_servers(6070, 1);
// generate document key
let threshold = 0;
let document = Random.generate().unwrap().secret().clone();
let secret = Random.generate().unwrap().secret().clone();
let signature = ethkey::sign(&secret, &document).unwrap();
let generated_key = key_servers[0].generate_document_key(&signature, &document, threshold).unwrap();
let generated_key = ethcrypto::ecies::decrypt_single_message(&secret, &generated_key).unwrap();
// now let's try to retrieve key back
for key_server in key_servers.iter() {
let retrieved_key = key_server.document_key(&signature, &document).unwrap();
let retrieved_key = ethcrypto::ecies::decrypt_single_message(&secret, &retrieved_key).unwrap();
assert_eq!(retrieved_key, generated_key);
}
}
#[test]
fn document_key_generation_and_retrievement_works_over_network_with_3_nodes() {
//::logger::init_log();
let key_servers = make_key_servers(6080, 3);
let test_cases = [0, 1, 2];
for threshold in &test_cases {
// generate document key

View File

@ -17,6 +17,7 @@
use std::io;
use std::time;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::collections::btree_map::Entry;
use std::net::{SocketAddr, IpAddr};
@ -24,19 +25,46 @@ use futures::{finished, failed, Future, Stream, BoxFuture};
use futures_cpupool::CpuPool;
use parking_lot::{RwLock, Mutex};
use tokio_io::IoFuture;
use tokio_core::reactor::{Handle, Remote, Timeout, Interval};
use tokio_core::reactor::{Handle, Remote, Interval};
use tokio_core::net::{TcpListener, TcpStream};
use ethkey::{Secret, KeyPair, Signature, Random, Generator};
use key_server_cluster::{Error, NodeId, SessionId, EncryptionConfiguration, AclStorage, KeyStorage};
use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage};
use key_server_cluster::message::{self, Message, ClusterMessage, EncryptionMessage, DecryptionMessage};
use key_server_cluster::decryption_session::{SessionImpl as DecryptionSessionImpl, DecryptionSessionId,
SessionParams as DecryptionSessionParams, Session as DecryptionSession};
use key_server_cluster::decryption_session::{SessionImpl as DecryptionSessionImpl, SessionState as DecryptionSessionState,
SessionParams as DecryptionSessionParams, Session as DecryptionSession, DecryptionSessionId};
use key_server_cluster::encryption_session::{SessionImpl as EncryptionSessionImpl, SessionState as EncryptionSessionState,
SessionParams as EncryptionSessionParams, Session as EncryptionSession};
use key_server_cluster::io::{DeadlineStatus, ReadMessage, SharedTcpStream, read_encrypted_message, WriteMessage, write_encrypted_message};
use key_server_cluster::net::{accept_connection as net_accept_connection, connect as net_connect, Connection as NetConnection};
pub type BoxedEmptyFuture = BoxFuture<(), ()>;
/// Maintain interval (seconds). Every MAINTAIN_INTERVAL seconds node:
/// 1) checks if connected nodes are responding to KeepAlive messages
/// 2) tries to connect to disconnected nodes
/// 3) checks if enc/dec sessions are time-outed
const MAINTAIN_INTERVAL: u64 = 10;
/// When no messages have been received from node within KEEP_ALIVE_SEND_INTERVAL seconds,
/// we must send KeepAlive message to the node to check if it still responds to messages.
const KEEP_ALIVE_SEND_INTERVAL: u64 = 30;
/// When no messages have been received from node within KEEP_ALIVE_DISCONNECT_INTERVAL seconds,
/// we must treat this node as non-responding && disconnect from it.
const KEEP_ALIVE_DISCONNECT_INTERVAL: u64 = 60;
/// When there are no encryption session-related messages for ENCRYPTION_SESSION_TIMEOUT_INTERVAL seconds,
/// we must treat this session as stalled && finish it with an error.
/// This timeout is for cases when node is responding to KeepAlive messages, but intentionally ignores
/// session messages.
const ENCRYPTION_SESSION_TIMEOUT_INTERVAL: u64 = 60;
/// When there are no decryption session-related messages for DECRYPTION_SESSION_TIMEOUT_INTERVAL seconds,
/// we must treat this session as stalled && finish it with an error.
/// This timeout is for cases when node is responding to KeepAlive messages, but intentionally ignores
/// session messages.
const DECRYPTION_SESSION_TIMEOUT_INTERVAL: u64 = 60;
/// Encryption sesion timeout interval. It works
/// Empty future.
type BoxedEmptyFuture = BoxFuture<(), ()>;
/// Cluster interface for external clients.
pub trait ClusterClient: Send + Sync {
@ -46,6 +74,16 @@ pub trait ClusterClient: Send + Sync {
fn new_encryption_session(&self, session_id: SessionId, threshold: usize) -> Result<Arc<EncryptionSession>, Error>;
/// Start new decryption session.
fn new_decryption_session(&self, session_id: SessionId, requestor_signature: Signature, is_shadow_decryption: bool) -> Result<Arc<DecryptionSession>, Error>;
#[cfg(test)]
/// Ask node to make 'faulty' encryption sessions.
fn make_faulty_encryption_sessions(&self);
#[cfg(test)]
/// Get active encryption session with given id.
fn encryption_session(&self, session_id: &SessionId) -> Option<Arc<EncryptionSessionImpl>>;
#[cfg(test)]
/// Try connect to disconnected nodes.
fn connect(&self);
}
/// Cluster access for single encryption/decryption participant.
@ -54,8 +92,6 @@ pub trait Cluster: Send + Sync {
fn broadcast(&self, message: Message) -> Result<(), Error>;
/// Send message to given node.
fn send(&self, to: &NodeId, message: Message) -> Result<(), Error>;
/// Blacklist node, close connection and remove all pending messages.
fn blacklist(&self, node: &NodeId);
}
#[derive(Clone)]
@ -71,8 +107,6 @@ pub struct ClusterConfiguration {
pub listen_address: (String, u16),
/// Cluster nodes.
pub nodes: BTreeMap<NodeId, (String, u16)>,
/// Encryption session configuration.
pub encryption_config: EncryptionConfiguration,
/// Reference to key storage
pub key_storage: Arc<KeyStorage>,
/// Reference to ACL storage
@ -136,6 +170,8 @@ pub struct ClusterConnections {
pub struct ClusterSessions {
/// Self node id.
pub self_node_id: NodeId,
/// All nodes ids.
pub nodes: BTreeSet<NodeId>,
/// Reference to key storage
pub key_storage: Arc<KeyStorage>,
/// Reference to ACL storage
@ -144,10 +180,18 @@ pub struct ClusterSessions {
pub encryption_sessions: RwLock<BTreeMap<SessionId, QueuedEncryptionSession>>,
/// Active decryption sessions.
pub decryption_sessions: RwLock<BTreeMap<DecryptionSessionId, QueuedDecryptionSession>>,
/// Make faulty encryption sessions.
pub make_faulty_encryption_sessions: AtomicBool,
}
/// Encryption session and its message queue.
pub struct QueuedEncryptionSession {
/// Session master.
pub master: NodeId,
/// Cluster view.
pub cluster_view: Arc<ClusterView>,
/// Last received message time.
pub last_message_time: time::Instant,
/// Encryption session.
pub session: Arc<EncryptionSessionImpl>,
/// Messages queue.
@ -156,6 +200,12 @@ pub struct QueuedEncryptionSession {
/// Decryption session and its message queue.
pub struct QueuedDecryptionSession {
/// Session master.
pub master: NodeId,
/// Cluster view.
pub cluster_view: Arc<ClusterView>,
/// Last received message time.
pub last_message_time: time::Instant,
/// Decryption session.
pub session: Arc<DecryptionSessionImpl>,
/// Messages queue.
@ -217,17 +267,28 @@ impl ClusterCore {
self.data.connection(node)
}
/// Run cluster
/// Run cluster.
pub fn run(&self) -> Result<(), Error> {
// try to connect to every other peer
ClusterCore::connect_disconnected_nodes(self.data.clone());
self.run_listener()
.and_then(|_| self.run_connections())?;
// schedule maintain procedures
ClusterCore::schedule_maintain(&self.handle, self.data.clone());
// start listening for incoming connections
self.handle.spawn(ClusterCore::listen(&self.handle, self.data.clone(), self.listen_address.clone())?);
Ok(())
}
/// Start listening for incoming connections.
pub fn run_listener(&self) -> Result<(), Error> {
// start listeining for incoming connections
self.handle.spawn(ClusterCore::listen(&self.handle, self.data.clone(), self.listen_address.clone())?);
Ok(())
}
/// Start connecting to other nodes.
pub fn run_connections(&self) -> Result<(), Error> {
// try to connect to every other peer
ClusterCore::connect_disconnected_nodes(self.data.clone());
Ok(())
}
@ -278,18 +339,24 @@ impl ClusterCore {
/// Schedule mainatain procedures.
fn schedule_maintain(handle: &Handle, data: Arc<ClusterData>) {
// TODO: per-session timeouts (node can respond to messages, but ignore sessions messages)
let (d1, d2, d3) = (data.clone(), data.clone(), data.clone());
let interval: BoxedEmptyFuture = Interval::new(time::Duration::new(10, 0), handle)
let d = data.clone();
let interval: BoxedEmptyFuture = Interval::new(time::Duration::new(MAINTAIN_INTERVAL, 0), handle)
.expect("failed to create interval")
.and_then(move |_| Ok(trace!(target: "secretstore_net", "{}: executing maintain procedures", d1.self_key_pair.public())))
.and_then(move |_| Ok(ClusterCore::keep_alive(d2.clone())))
.and_then(move |_| Ok(ClusterCore::connect_disconnected_nodes(d3.clone())))
.and_then(move |_| Ok(ClusterCore::maintain(data.clone())))
.for_each(|_| Ok(()))
.then(|_| finished(()))
.boxed();
data.spawn(interval);
d.spawn(interval);
}
/// Execute maintain procedures.
fn maintain(data: Arc<ClusterData>) {
trace!(target: "secretstore_net", "{}: executing maintain procedures", data.self_key_pair.public());
ClusterCore::keep_alive(data.clone());
ClusterCore::connect_disconnected_nodes(data.clone());
data.sessions.stop_stalled_sessions();
}
/// Called for every incomming mesage.
@ -324,11 +391,11 @@ impl ClusterCore {
fn keep_alive(data: Arc<ClusterData>) {
for connection in data.connections.active_connections() {
let last_message_diff = time::Instant::now() - connection.last_message_time();
if last_message_diff > time::Duration::from_secs(60) {
if last_message_diff > time::Duration::from_secs(KEEP_ALIVE_DISCONNECT_INTERVAL) {
data.connections.remove(connection.node_id(), connection.is_inbound());
data.sessions.on_connection_timeout(connection.node_id());
}
else if last_message_diff > time::Duration::from_secs(30) {
else if last_message_diff > time::Duration::from_secs(KEEP_ALIVE_SEND_INTERVAL) {
data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAlive(message::KeepAlive {}))));
}
}
@ -370,7 +437,7 @@ impl ClusterCore {
/// Process single message from the connection.
fn process_connection_message(data: Arc<ClusterData>, connection: Arc<Connection>, message: Message) {
connection.set_last_message_time(time::Instant::now());
trace!(target: "secretstore_net", "{}: processing message {} from {}", data.self_key_pair.public(), message, connection.node_id());
trace!(target: "secretstore_net", "{}: received message {} from {}", data.self_key_pair.public(), message, connection.node_id());
match message {
Message::Encryption(message) => ClusterCore::process_encryption_message(data, connection, message),
Message::Decryption(message) => ClusterCore::process_decryption_message(data, connection, message),
@ -380,102 +447,53 @@ impl ClusterCore {
/// Process single encryption message from the connection.
fn process_encryption_message(data: Arc<ClusterData>, connection: Arc<Connection>, mut message: EncryptionMessage) {
let mut sender = connection.node_id().clone();
let mut is_queued_message = false;
let session_id = message.session_id().clone();
let key_check_timeout_ms = data.config.encryption_config.key_check_timeout_ms;
let mut sender = connection.node_id().clone();
let session = match message {
EncryptionMessage::InitializeSession(_) => {
let mut connected_nodes = data.connections.connected_nodes();
connected_nodes.insert(data.self_key_pair.public().clone());
let cluster = Arc::new(ClusterView::new(data.clone(), connected_nodes));
data.sessions.new_encryption_session(sender.clone(), session_id.clone(), cluster)
},
_ => {
data.sessions.encryption_session(&session_id)
.ok_or(Error::InvalidSessionId)
},
};
let mut is_queued_message = false;
loop {
let result = match message {
EncryptionMessage::InitializeSession(ref message) => {
let mut connected_nodes = data.connections.connected_nodes();
connected_nodes.insert(data.self_key_pair.public().clone());
let cluster = Arc::new(ClusterView::new(data.clone(), connected_nodes));
let session_id: SessionId = message.session.clone().into();
data.sessions.new_encryption_session(sender.clone(), session_id.clone(), cluster)
.and_then(|s| s.on_initialize_session(sender.clone(), message))
},
EncryptionMessage::ConfirmInitialization(ref message) => data.sessions.encryption_session(&*message.session)
.ok_or(Error::InvalidSessionId)
.and_then(|s| s.on_confirm_initialization(sender.clone(), message)),
EncryptionMessage::CompleteInitialization(ref message) => data.sessions.encryption_session(&*message.session)
.ok_or(Error::InvalidSessionId)
.and_then(|s| s.on_complete_initialization(sender.clone(), message)),
EncryptionMessage::KeysDissemination(ref message) => data.sessions.encryption_session(&*message.session)
.ok_or(Error::InvalidSessionId)
.and_then(|s| {
// TODO: move this logic to session (or session connector)
let is_in_key_check_state = s.state() == EncryptionSessionState::KeyCheck;
let result = s.on_keys_dissemination(sender.clone(), message);
if !is_in_key_check_state && s.state() == EncryptionSessionState::KeyCheck {
let session = s.clone();
let d = data.clone();
data.handle.spawn(move |handle|
Timeout::new(time::Duration::new(key_check_timeout_ms / 1000, 0), handle)
.expect("failed to create timeout")
.and_then(move |_| {
if let Err(error) = session.start_key_generation_phase() {
session.on_session_error(d.self_key_pair.public().clone(), &message::SessionError {
session: session.id().clone().into(),
error: error.into(),
});
}
Ok(())
})
.then(|_| finished(()))
);
}
result
}),
EncryptionMessage::Complaint(ref message) => data.sessions.encryption_session(&*message.session)
.ok_or(Error::InvalidSessionId)
.and_then(|s| s.on_complaint(sender.clone(), message)),
EncryptionMessage::ComplaintResponse(ref message) => data.sessions.encryption_session(&*message.session)
.ok_or(Error::InvalidSessionId)
.and_then(|s| s.on_complaint_response(sender.clone(), message)),
EncryptionMessage::PublicKeyShare(ref message) => data.sessions.encryption_session(&*message.session)
.ok_or(Error::InvalidSessionId)
.and_then(|s| s.on_public_key_share(sender.clone(), message)),
EncryptionMessage::SessionError(ref message) => {
if let Some(s) = data.sessions.encryption_session(&*message.session) {
data.sessions.remove_encryption_session(s.id());
s.on_session_error(sender.clone(), message);
}
Ok(())
},
EncryptionMessage::SessionCompleted(ref message) => data.sessions.encryption_session(&*message.session)
.ok_or(Error::InvalidSessionId)
.and_then(|s| {
let result = s.on_session_completed(sender.clone(), message);
if result.is_ok() && s.state() == EncryptionSessionState::Finished {
data.sessions.remove_encryption_session(s.id());
}
result
}),
};
match result {
Err(Error::TooEarlyForRequest) => {
data.sessions.enqueue_encryption_message(&session_id, sender, message, is_queued_message);
break;
},
Err(err) => {
warn!(target: "secretstore_net", "{}: error {} when processing message {} from node {}", data.self_key_pair.public(), err, message, sender);
if let Some(connection) = data.connections.get(&sender) {
data.spawn(connection.send_message(Message::Encryption(EncryptionMessage::SessionError(message::SessionError {
session: session_id.clone().into(),
error: format!("{:?}", err),
}))));
match session.clone().and_then(|session| match message {
EncryptionMessage::InitializeSession(ref message) =>
session.on_initialize_session(sender.clone(), message),
EncryptionMessage::ConfirmInitialization(ref message) =>
session.on_confirm_initialization(sender.clone(), message),
EncryptionMessage::CompleteInitialization(ref message) =>
session.on_complete_initialization(sender.clone(), message),
EncryptionMessage::KeysDissemination(ref message) =>
session.on_keys_dissemination(sender.clone(), message),
EncryptionMessage::PublicKeyShare(ref message) =>
session.on_public_key_share(sender.clone(), message),
EncryptionMessage::SessionError(ref message) =>
session.on_session_error(sender.clone(), message),
EncryptionMessage::SessionCompleted(ref message) =>
session.on_session_completed(sender.clone(), message),
}) {
Ok(_) => {
// if session is completed => stop
let session = session.clone().expect("session.method() call finished with success; session exists; qed");
let session_state = session.state();
if session_state == EncryptionSessionState::Finished {
info!(target: "secretstore_net", "{}: encryption session completed", data.self_key_pair.public());
}
if err != Error::InvalidSessionId {
if session_state == EncryptionSessionState::Finished || session_state == EncryptionSessionState::Failed {
data.sessions.remove_encryption_session(&session_id);
break;
}
break;
},
_ => {
// try to dequeue message
match data.sessions.dequeue_encryption_message(&session_id) {
Some((msg_sender, msg)) => {
is_queued_message = true;
@ -485,64 +503,73 @@ impl ClusterCore {
None => break,
}
},
Err(Error::TooEarlyForRequest) => {
data.sessions.enqueue_encryption_message(&session_id, sender, message, is_queued_message);
break;
},
Err(err) => {
warn!(target: "secretstore_net", "{}: encryption session error {} when processing message {} from node {}", data.self_key_pair.public(), err, message, sender);
data.sessions.respond_with_encryption_error(&session_id, message::SessionError {
session: session_id.clone().into(),
error: format!("{:?}", err),
});
if err != Error::InvalidSessionId {
data.sessions.remove_encryption_session(&session_id);
}
break;
},
}
}
}
/// Process single decryption message from the connection.
fn process_decryption_message(data: Arc<ClusterData>, connection: Arc<Connection>, mut message: DecryptionMessage) {
let mut sender = connection.node_id().clone();
let mut is_queued_message = false;
let session_id = message.session_id().clone();
let sub_session_id = message.sub_session_id().clone();
let mut sender = connection.node_id().clone();
let session = match message {
DecryptionMessage::InitializeDecryptionSession(_) => {
let mut connected_nodes = data.connections.connected_nodes();
connected_nodes.insert(data.self_key_pair.public().clone());
let cluster = Arc::new(ClusterView::new(data.clone(), connected_nodes));
data.sessions.new_decryption_session(sender.clone(), session_id.clone(), sub_session_id.clone(), cluster)
},
_ => {
data.sessions.decryption_session(&session_id, &sub_session_id)
.ok_or(Error::InvalidSessionId)
},
};
let mut is_queued_message = false;
loop {
let result = match message {
DecryptionMessage::InitializeDecryptionSession(ref message) => {
let mut connected_nodes = data.connections.connected_nodes();
connected_nodes.insert(data.self_key_pair.public().clone());
let cluster = Arc::new(ClusterView::new(data.clone(), connected_nodes));
data.sessions.new_decryption_session(sender.clone(), session_id.clone(), sub_session_id.clone(), cluster)
.and_then(|s| s.on_initialize_session(sender.clone(), message))
},
DecryptionMessage::ConfirmDecryptionInitialization(ref message) => data.sessions.decryption_session(&*message.session, &*message.sub_session)
.ok_or(Error::InvalidSessionId)
.and_then(|s| s.on_confirm_initialization(sender.clone(), message)),
DecryptionMessage::RequestPartialDecryption(ref message) => data.sessions.decryption_session(&*message.session, &*message.sub_session)
.ok_or(Error::InvalidSessionId)
.and_then(|s| s.on_partial_decryption_requested(sender.clone(), message)),
DecryptionMessage::PartialDecryption(ref message) => data.sessions.decryption_session(&*message.session, &*message.sub_session)
.ok_or(Error::InvalidSessionId)
.and_then(|s| s.on_partial_decryption(sender.clone(), message)),
DecryptionMessage::DecryptionSessionError(ref message) => {
if let Some(s) = data.sessions.decryption_session(&*message.session, &*message.sub_session) {
data.sessions.remove_decryption_session(&session_id, &sub_session_id);
s.on_session_error(sender.clone(), message);
}
Ok(())
},
};
match result {
Err(Error::TooEarlyForRequest) => {
data.sessions.enqueue_decryption_message(&session_id, &sub_session_id, sender, message, is_queued_message);
break;
},
Err(err) => {
if let Some(connection) = data.connections.get(&sender) {
data.spawn(connection.send_message(Message::Decryption(DecryptionMessage::DecryptionSessionError(message::DecryptionSessionError {
session: session_id.clone().into(),
sub_session: sub_session_id.clone().into(),
error: format!("{:?}", err),
}))));
match session.clone().and_then(|session| match message {
DecryptionMessage::InitializeDecryptionSession(ref message) =>
session.on_initialize_session(sender.clone(), message),
DecryptionMessage::ConfirmDecryptionInitialization(ref message) =>
session.on_confirm_initialization(sender.clone(), message),
DecryptionMessage::RequestPartialDecryption(ref message) =>
session.on_partial_decryption_requested(sender.clone(), message),
DecryptionMessage::PartialDecryption(ref message) =>
session.on_partial_decryption(sender.clone(), message),
DecryptionMessage::DecryptionSessionError(ref message) =>
session.on_session_error(sender.clone(), message),
DecryptionMessage::DecryptionSessionCompleted(ref message) =>
session.on_session_completed(sender.clone(), message),
}) {
Ok(_) => {
// if session is completed => stop
let session = session.clone().expect("session.method() call finished with success; session exists; qed");
let session_state = session.state();
if session_state == DecryptionSessionState::Finished {
info!(target: "secretstore_net", "{}: decryption session completed", data.self_key_pair.public());
}
if err != Error::InvalidSessionId {
if session_state == DecryptionSessionState::Finished || session_state == DecryptionSessionState::Failed {
data.sessions.remove_decryption_session(&session_id, &sub_session_id);
break;
}
break;
},
_ => {
// try to dequeue message
match data.sessions.dequeue_decryption_message(&session_id, &sub_session_id) {
Some((msg_sender, msg)) => {
is_queued_message = true;
@ -552,6 +579,22 @@ impl ClusterCore {
None => break,
}
},
Err(Error::TooEarlyForRequest) => {
data.sessions.enqueue_decryption_message(&session_id, &sub_session_id, sender, message, is_queued_message);
break;
},
Err(err) => {
warn!(target: "secretstore_net", "{}: decryption session error {} when processing message {} from node {}", data.self_key_pair.public(), err, message, sender);
data.sessions.respond_with_decryption_error(&session_id, &sub_session_id, &sender, message::DecryptionSessionError {
session: session_id.clone().into(),
sub_session: sub_session_id.clone().into(),
error: format!("{:?}", err),
});
if err != Error::InvalidSessionId {
data.sessions.remove_decryption_session(&session_id, &sub_session_id);
}
break;
},
}
}
}
@ -602,6 +645,7 @@ impl ClusterConnections {
return false;
}
}
trace!(target: "secretstore_net", "{}: inserting connection to {} at {}", self.self_node_id, connection.node_id(), connection.node_address());
connections.insert(connection.node_id().clone(), connection);
true
@ -640,14 +684,16 @@ impl ClusterSessions {
pub fn new(config: &ClusterConfiguration) -> Self {
ClusterSessions {
self_node_id: config.self_key_pair.public().clone(),
nodes: config.nodes.keys().cloned().collect(),
acl_storage: config.acl_storage.clone(),
key_storage: config.key_storage.clone(),
encryption_sessions: RwLock::new(BTreeMap::new()),
decryption_sessions: RwLock::new(BTreeMap::new()),
make_faulty_encryption_sessions: AtomicBool::new(false),
}
}
pub fn new_encryption_session(&self, _master: NodeId, session_id: SessionId, cluster: Arc<Cluster>) -> Result<Arc<EncryptionSessionImpl>, Error> {
pub fn new_encryption_session(&self, master: NodeId, session_id: SessionId, cluster: Arc<ClusterView>) -> Result<Arc<EncryptionSessionImpl>, Error> {
let mut encryption_sessions = self.encryption_sessions.write();
// check that there's no active encryption session with the same id
if encryption_sessions.contains_key(&session_id) {
@ -658,16 +704,28 @@ impl ClusterSessions {
return Err(Error::DuplicateSessionId);
}
// communicating to all other nodes is crucial for encryption session
// => check that we have connections to all cluster nodes
if self.nodes.iter().any(|n| !cluster.is_connected(n)) {
return Err(Error::NodeDisconnected);
}
let session = Arc::new(EncryptionSessionImpl::new(EncryptionSessionParams {
id: session_id.clone(),
self_node_id: self.self_node_id.clone(),
key_storage: self.key_storage.clone(),
cluster: cluster,
cluster: cluster.clone(),
}));
let encryption_session = QueuedEncryptionSession {
master: master,
cluster_view: cluster,
last_message_time: time::Instant::now(),
session: session.clone(),
queue: VecDeque::new()
};
if self.make_faulty_encryption_sessions.load(Ordering::Relaxed) {
encryption_session.session.simulate_faulty_behaviour();
}
encryption_sessions.insert(session_id, encryption_session);
Ok(session)
}
@ -691,22 +749,50 @@ impl ClusterSessions {
.and_then(|session| session.queue.pop_front())
}
pub fn new_decryption_session(&self, _master: NodeId, session_id: SessionId, sub_session_id: Secret, cluster: Arc<Cluster>) -> Result<Arc<DecryptionSessionImpl>, Error> {
pub fn respond_with_encryption_error(&self, session_id: &SessionId, error: message::SessionError) {
self.encryption_sessions.read().get(session_id)
.map(|s| {
// error in encryption session is considered fatal
// => broadcast error
// do not bother processing send error, as we already processing error
let _ = s.cluster_view.broadcast(Message::Encryption(EncryptionMessage::SessionError(error)));
});
}
#[cfg(test)]
pub fn make_faulty_encryption_sessions(&self) {
self.make_faulty_encryption_sessions.store(true, Ordering::Relaxed);
}
pub fn new_decryption_session(&self, master: NodeId, session_id: SessionId, sub_session_id: Secret, cluster: Arc<ClusterView>) -> Result<Arc<DecryptionSessionImpl>, Error> {
let mut decryption_sessions = self.decryption_sessions.write();
let session_id = DecryptionSessionId::new(session_id, sub_session_id);
if decryption_sessions.contains_key(&session_id) {
return Err(Error::DuplicateSessionId);
}
// some of nodes, which were encrypting secret may be down
// => do not use these in decryption session
let mut encrypted_data = self.key_storage.get(&session_id.id).map_err(|e| Error::KeyStorage(e.into()))?;
let disconnected_nodes: BTreeSet<_> = encrypted_data.id_numbers.keys().cloned().collect();
let disconnected_nodes: BTreeSet<_> = disconnected_nodes.difference(&cluster.nodes()).cloned().collect();
for disconnected_node in disconnected_nodes {
encrypted_data.id_numbers.remove(&disconnected_node);
}
let session = Arc::new(DecryptionSessionImpl::new(DecryptionSessionParams {
id: session_id.id.clone(),
access_key: session_id.access_key.clone(),
self_node_id: self.self_node_id.clone(),
encrypted_data: self.key_storage.get(&session_id.id).map_err(|e| Error::KeyStorage(e.into()))?,
encrypted_data: encrypted_data,
acl_storage: self.acl_storage.clone(),
cluster: cluster,
cluster: cluster.clone(),
})?);
let decryption_session = QueuedDecryptionSession {
master: master,
cluster_view: cluster,
last_message_time: time::Instant::now(),
session: session.clone(),
queue: VecDeque::new()
};
@ -737,12 +823,66 @@ impl ClusterSessions {
.and_then(|session| session.queue.pop_front())
}
pub fn on_connection_timeout(&self, node_id: &NodeId) {
for encryption_session in self.encryption_sessions.read().values() {
encryption_session.session.on_session_timeout(node_id);
pub fn respond_with_decryption_error(&self, session_id: &SessionId, sub_session_id: &Secret, to: &NodeId, error: message::DecryptionSessionError) {
let session_id = DecryptionSessionId::new(session_id.clone(), sub_session_id.clone());
self.decryption_sessions.read().get(&session_id)
.map(|s| {
// error in decryption session is non-fatal, if occurs on slave node
// => either respond with error
// => or broadcast error
// do not bother processing send error, as we already processing error
if &s.master == s.session.node() {
let _ = s.cluster_view.broadcast(Message::Decryption(DecryptionMessage::DecryptionSessionError(error)));
} else {
let _ = s.cluster_view.send(to, Message::Decryption(DecryptionMessage::DecryptionSessionError(error)));
}
});
}
fn stop_stalled_sessions(&self) {
{
let sessions = self.encryption_sessions.write();
for sid in sessions.keys().collect::<Vec<_>>() {
let session = sessions.get(&sid).expect("enumerating only existing sessions; qed");
if time::Instant::now() - session.last_message_time > time::Duration::from_secs(ENCRYPTION_SESSION_TIMEOUT_INTERVAL) {
session.session.on_session_timeout();
if session.session.state() == EncryptionSessionState::Finished
|| session.session.state() == EncryptionSessionState::Failed {
self.remove_encryption_session(&sid);
}
}
}
}
for decryption_session in self.decryption_sessions.read().values() {
decryption_session.session.on_session_timeout(node_id);
{
let sessions = self.decryption_sessions.write();
for sid in sessions.keys().collect::<Vec<_>>() {
let session = sessions.get(&sid).expect("enumerating only existing sessions; qed");
if time::Instant::now() - session.last_message_time > time::Duration::from_secs(DECRYPTION_SESSION_TIMEOUT_INTERVAL) {
session.session.on_session_timeout();
if session.session.state() == DecryptionSessionState::Finished
|| session.session.state() == DecryptionSessionState::Failed {
self.remove_decryption_session(&sid.id, &sid.access_key);
}
}
}
}
}
pub fn on_connection_timeout(&self, node_id: &NodeId) {
for (sid, session) in self.encryption_sessions.read().iter() {
session.session.on_node_timeout(node_id);
if session.session.state() == EncryptionSessionState::Finished
|| session.session.state() == EncryptionSessionState::Failed {
self.remove_encryption_session(sid);
}
}
for (sid, session) in self.decryption_sessions.read().iter() {
session.session.on_node_timeout(node_id);
if session.session.state() == DecryptionSessionState::Finished
|| session.session.state() == DecryptionSessionState::Failed {
self.remove_decryption_session(&sid.id, &sid.access_key);
}
}
}
}
@ -823,12 +963,21 @@ impl ClusterView {
})),
}
}
pub fn is_connected(&self, node: &NodeId) -> bool {
self.core.lock().nodes.contains(node)
}
pub fn nodes(&self) -> BTreeSet<NodeId> {
self.core.lock().nodes.clone()
}
}
impl Cluster for ClusterView {
fn broadcast(&self, message: Message) -> Result<(), Error> {
let core = self.core.lock();
for node in core.nodes.iter().filter(|n| *n != core.cluster.self_key_pair.public()) {
trace!(target: "secretstore_net", "{}: sent message {} to {}", core.cluster.self_key_pair.public(), message, node);
let connection = core.cluster.connection(node).ok_or(Error::NodeDisconnected)?;
core.cluster.spawn(connection.send_message(message.clone()))
}
@ -837,14 +986,11 @@ impl Cluster for ClusterView {
fn send(&self, to: &NodeId, message: Message) -> Result<(), Error> {
let core = self.core.lock();
trace!(target: "secretstore_net", "{}: sent message {} to {}", core.cluster.self_key_pair.public(), message, to);
let connection = core.cluster.connection(to).ok_or(Error::NodeDisconnected)?;
core.cluster.spawn(connection.send_message(message));
Ok(())
}
fn blacklist(&self, _node: &NodeId) {
// TODO: unimplemented!()
}
}
impl ClusterClientImpl {
@ -880,6 +1026,21 @@ impl ClusterClient for ClusterClientImpl {
session.initialize(requestor_signature, is_shadow_decryption)?;
Ok(session)
}
#[cfg(test)]
fn connect(&self) {
ClusterCore::connect_disconnected_nodes(self.data.clone());
}
#[cfg(test)]
fn make_faulty_encryption_sessions(&self) {
self.data.sessions.make_faulty_encryption_sessions();
}
#[cfg(test)]
fn encryption_session(&self, session_id: &SessionId) -> Option<Arc<EncryptionSessionImpl>> {
self.data.sessions.encryption_session(session_id)
}
}
fn make_socket_address(address: &str, port: u16) -> Result<SocketAddr, Error> {
@ -895,9 +1056,10 @@ pub mod tests {
use parking_lot::Mutex;
use tokio_core::reactor::Core;
use ethkey::{Random, Generator};
use key_server_cluster::{NodeId, Error, EncryptionConfiguration, DummyAclStorage, DummyKeyStorage};
use key_server_cluster::{NodeId, SessionId, Error, DummyAclStorage, DummyKeyStorage};
use key_server_cluster::message::Message;
use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration};
use key_server_cluster::encryption_session::{Session as EncryptionSession, SessionState as EncryptionSessionState};
#[derive(Debug)]
pub struct DummyCluster {
@ -947,9 +1109,6 @@ pub mod tests {
self.data.lock().messages.push_back((to.clone(), message));
Ok(())
}
fn blacklist(&self, _node: &NodeId) {
}
}
pub fn loop_until<F>(core: &mut Core, timeout: time::Duration, predicate: F) where F: Fn() -> bool {
@ -982,9 +1141,6 @@ pub mod tests {
.map(|(j, kp)| (kp.public().clone(), ("127.0.0.1".into(), ports_begin + j as u16)))
.collect(),
allow_connecting_to_higher_nodes: false,
encryption_config: EncryptionConfiguration {
key_check_timeout_ms: 10,
},
key_storage: Arc::new(DummyKeyStorage::default()),
acl_storage: Arc::new(DummyAclStorage::default()),
}).collect();
@ -997,7 +1153,10 @@ pub mod tests {
pub fn run_clusters(clusters: &[Arc<ClusterCore>]) {
for cluster in clusters {
cluster.run().unwrap();
cluster.run_listener().unwrap();
}
for cluster in clusters {
cluster.run_connections().unwrap();
}
}
@ -1008,4 +1167,65 @@ pub mod tests {
run_clusters(&clusters);
loop_until(&mut core, time::Duration::from_millis(300), || clusters.iter().all(all_connections_established));
}
#[test]
fn cluster_wont_start_encryption_session_if_not_fully_connected() {
let core = Core::new().unwrap();
let clusters = make_clusters(&core, 6013, 3);
clusters[0].run().unwrap();
match clusters[0].client().new_encryption_session(SessionId::default(), 1) {
Err(Error::NodeDisconnected) => (),
Err(e) => panic!("unexpected error {:?}", e),
_ => panic!("unexpected success"),
}
}
#[test]
fn error_in_encryption_session_broadcasted_to_all_other_nodes() {
let mut core = Core::new().unwrap();
let clusters = make_clusters(&core, 6016, 3);
run_clusters(&clusters);
loop_until(&mut core, time::Duration::from_millis(300), || clusters.iter().all(all_connections_established));
// ask one of nodes to produce faulty encryption sessions
clusters[1].client().make_faulty_encryption_sessions();
// start && wait for encryption session to fail
let session = clusters[0].client().new_encryption_session(SessionId::default(), 1).unwrap();
loop_until(&mut core, time::Duration::from_millis(300), || session.joint_public_key().is_some());
assert!(session.joint_public_key().unwrap().is_err());
// check that faulty session is either removed from all nodes, or nonexistent (already removed)
assert!(clusters[0].client().encryption_session(&SessionId::default()).is_none());
for i in 1..3 {
if let Some(session) = clusters[i].client().encryption_session(&SessionId::default()) {
loop_until(&mut core, time::Duration::from_millis(300), || session.joint_public_key().is_some());
assert!(session.joint_public_key().unwrap().is_err());
assert!(clusters[i].client().encryption_session(&SessionId::default()).is_none());
}
}
}
#[test]
fn encryption_session_is_removed_when_succeeded() {
let mut core = Core::new().unwrap();
let clusters = make_clusters(&core, 6019, 3);
run_clusters(&clusters);
loop_until(&mut core, time::Duration::from_millis(300), || clusters.iter().all(all_connections_established));
// start && wait for encryption session to complete
let session = clusters[0].client().new_encryption_session(SessionId::default(), 1).unwrap();
loop_until(&mut core, time::Duration::from_millis(300), || session.state() == EncryptionSessionState::Finished);
assert!(session.joint_public_key().unwrap().is_ok());
// check that session is either removed from all nodes, or nonexistent (already removed)
assert!(clusters[0].client().encryption_session(&SessionId::default()).is_none());
for i in 1..3 {
if let Some(session) = clusters[i].client().encryption_session(&SessionId::default()) {
loop_until(&mut core, time::Duration::from_millis(300), || session.state() == EncryptionSessionState::Finished);
assert!(session.joint_public_key().unwrap().is_err());
assert!(clusters[i].client().encryption_session(&SessionId::default()).is_none());
}
}
}
}

View File

@ -24,7 +24,7 @@ use key_server_cluster::{Error, AclStorage, DocumentKeyShare, NodeId, SessionId,
use key_server_cluster::cluster::Cluster;
use key_server_cluster::math;
use key_server_cluster::message::{Message, DecryptionMessage, InitializeDecryptionSession, ConfirmDecryptionInitialization,
RequestPartialDecryption, PartialDecryption, DecryptionSessionError};
RequestPartialDecryption, PartialDecryption, DecryptionSessionError, DecryptionSessionCompleted};
/// Decryption session API.
pub trait Session: Send + Sync + 'static {
@ -114,8 +114,10 @@ struct SessionData {
rejected_nodes: BTreeSet<NodeId>,
/// Nodes, which have responded with confirm to initialization request.
confirmed_nodes: BTreeSet<NodeId>,
// === Values, filled during partial decryption ===
/// Nodes, which have been asked for partial decryption.
shadow_requests: BTreeSet<NodeId>,
/// Shadow points, received from nodes as a response to partial decryption request.
shadow_points: BTreeMap<NodeId, PartialDecryptionResult>,
@ -162,6 +164,7 @@ impl SessionImpl {
requested_nodes: BTreeSet::new(),
rejected_nodes: BTreeSet::new(),
confirmed_nodes: BTreeSet::new(),
shadow_requests: BTreeSet::new(),
shadow_points: BTreeMap::new(),
decrypted_secret: None,
})
@ -173,22 +176,21 @@ impl SessionImpl {
&self.self_node_id
}
/// Get current session state.
pub fn state(&self) -> SessionState {
self.data.lock().state.clone()
}
#[cfg(test)]
/// Get this session access key.
pub fn access_key(&self) -> &Secret {
&self.access_key
}
#[cfg(test)]
/// Get current session state.
pub fn state(&self) -> SessionState {
self.data.lock().state.clone()
}
#[cfg(test)]
/// Get decrypted secret
pub fn decrypted_secret(&self) -> Option<DocumentEncryptedKeyShadow> {
self.data.lock().decrypted_secret.clone().and_then(|r| r.ok())
pub fn decrypted_secret(&self) -> Option<Result<DocumentEncryptedKeyShadow, Error>> {
self.data.lock().decrypted_secret.clone()
}
/// Initialize decryption session.
@ -284,9 +286,17 @@ impl SessionImpl {
let mut data = self.data.lock();
// check state
if data.state != SessionState::WaitingForInitializationConfirm {
if data.state == SessionState::WaitingForPartialDecryption {
// if there were enough confirmations/rejections before this message
// we have already moved to the next state
if !data.requested_nodes.remove(&sender) {
return Err(Error::InvalidMessage);
}
data.confirmed_nodes.insert(sender);
return Ok(());
}
if data.state != SessionState::WaitingForInitializationConfirm {
return Ok(());
}
@ -321,7 +331,7 @@ impl SessionImpl {
return Err(Error::InvalidMessage);
}
let mut data = self.data.lock();
let data = self.data.lock();
// check state
if data.master != Some(sender) {
@ -345,8 +355,8 @@ impl SessionImpl {
decrypt_shadow: decryption_result.decrypt_shadow,
})))?;
// update sate
data.state = SessionState::Finished;
// master could ask us for another partial decryption in case of restart
// => no state change is required
Ok(())
}
@ -364,7 +374,7 @@ impl SessionImpl {
return Err(Error::InvalidStateForRequest);
}
if !data.confirmed_nodes.remove(&sender) {
if !data.shadow_requests.remove(&sender) {
return Err(Error::InvalidStateForRequest);
}
data.shadow_points.insert(sender, PartialDecryptionResult {
@ -377,34 +387,169 @@ impl SessionImpl {
return Ok(());
}
// notify all other nodes about session completion
self.cluster.broadcast(Message::Decryption(DecryptionMessage::DecryptionSessionCompleted(DecryptionSessionCompleted {
session: self.id.clone().into(),
sub_session: self.access_key.clone().into(),
})))?;
// do decryption
SessionImpl::do_decryption(self.access_key.clone(), &self.encrypted_data, &mut *data)?;
self.completed.notify_all();
Ok(())
}
/// When error has occured on another node.
pub fn on_session_error(&self, sender: NodeId, message: &DecryptionSessionError) {
warn!("{}: decryption session error: {:?} from {}", self.node(), message, sender);
/// When session is completed.
pub fn on_session_completed(&self, sender: NodeId, message: &DecryptionSessionCompleted) -> Result<(), Error> {
debug_assert!(self.id == *message.session);
debug_assert!(self.access_key == *message.sub_session);
debug_assert!(&sender != self.node());
let mut data = self.data.lock();
// check state
if data.state != SessionState::WaitingForPartialDecryptionRequest {
return Err(Error::InvalidStateForRequest);
}
if data.master != Some(sender) {
return Err(Error::InvalidMessage);
}
// update state
data.state = SessionState::Finished;
Ok(())
}
/// When error has occured on another node.
pub fn on_session_error(&self, sender: NodeId, message: &DecryptionSessionError) -> Result<(), Error> {
let mut data = self.data.lock();
warn!("{}: decryption session failed with error: {:?} from {}", self.node(), message.error, sender);
data.state = SessionState::Failed;
data.decrypted_secret = Some(Err(Error::Io(message.error.clone())));
self.completed.notify_all();
Ok(())
}
/// When connection to one of cluster nodes has timeouted.
pub fn on_node_timeout(&self, node: &NodeId) {
let mut data = self.data.lock();
let is_self_master = data.master.as_ref() == Some(self.node());
let is_other_master = data.master.as_ref() == Some(node);
// if this is master node, we might have to restart
if is_self_master {
match data.state {
SessionState::WaitingForInitializationConfirm => {
// we will never receive confirmation from this node => treat as reject
if data.requested_nodes.remove(node) || data.confirmed_nodes.remove(node) {
data.rejected_nodes.insert(node.clone());
}
// check if we still have enough nodes for decryption
if self.encrypted_data.id_numbers.len() - data.rejected_nodes.len() >= self.encrypted_data.threshold + 1 {
return;
}
}
SessionState::WaitingForPartialDecryption => {
if data.rejected_nodes.contains(node) {
// already rejected => does not affect session
return;
}
if data.requested_nodes.remove(node) {
// we have tried to initialize this node, but it has failed
// => no restart required, just mark as rejected
data.rejected_nodes.insert(node.clone());
return;
}
if data.confirmed_nodes.contains(node) {
if data.shadow_points.contains_key(node) {
// we have already received partial decryption from this node
// => just ignore this connection drop
return;
}
// the worst case: we have sent partial decryption request to other nodes
// => we have to restart the session
data.confirmed_nodes.remove(node);
data.rejected_nodes.insert(node.clone());
// check if we still have enough nodes for decryption
if self.encrypted_data.id_numbers.len() - data.rejected_nodes.len() >= self.encrypted_data.threshold + 1 {
// we are going to stop session anyway => ignore error
let _ = SessionImpl::start_waiting_for_partial_decryption(self.node().clone(), self.id.clone(), self.access_key.clone(), &self.cluster, &self.encrypted_data, &mut *data);
return;
}
// not enough nodes
}
}
_ => (), // all other states lead to failures
}
} else if !is_other_master {
// disconnected from non-master node on non-master node
// => this does not affect this session
return;
}
// else: disconnecting from master node means failure
// no more nodes left for decryption => fail
warn!("{}: decryption session failed because {} connection has timeouted", self.node(), node);
data.state = SessionState::Failed;
data.decrypted_secret = Some(Err(Error::NodeDisconnected));
self.completed.notify_all();
}
/// When session timeout has occured.
pub fn on_session_timeout(&self, _node: &NodeId) {
warn!("{}: decryption session timeout", self.node());
pub fn on_session_timeout(&self) {
let mut data = self.data.lock();
// TODO: check that node is a part of decryption process
let is_self_master = data.master.as_ref() == Some(self.node());
// if this is master node, we might have to restart
if is_self_master {
match data.state {
SessionState::WaitingForInitializationConfirm =>
// we have sent initialization requests to all nodes, but haven't received confirmation
// => nodes will never respond => fail
(),
SessionState::WaitingForPartialDecryption => {
// we have requested partial decryption, but some nodes have failed to respond
// => mark these nodes as rejected && restart
for timeouted_node in data.shadow_requests.iter().cloned().collect::<Vec<_>>() {
data.confirmed_nodes.remove(&timeouted_node);
data.rejected_nodes.insert(timeouted_node);
}
// check if we still have enough nodes for decryption
if self.encrypted_data.id_numbers.len() - data.rejected_nodes.len() >= self.encrypted_data.threshold + 1 {
// we are going to stop session anyway => ignore error
let _ = SessionImpl::start_waiting_for_partial_decryption(self.node().clone(), self.id.clone(), self.access_key.clone(), &self.cluster, &self.encrypted_data, &mut *data);
return;
}
},
// no nodes has responded to our requests => session is failed
_ => return,
}
}
// no more nodes left for decryption => fail
warn!("{}: decryption session failed with timeout", self.node());
data.state = SessionState::Failed;
data.decrypted_secret = Some(Err(Error::Io("session expired".into())));
data.decrypted_secret = Some(Err(Error::NodeDisconnected));
self.completed.notify_all();
}
fn start_waiting_for_partial_decryption(self_node_id: NodeId, session_id: SessionId, access_key: Secret, cluster: &Arc<Cluster>, encrypted_data: &DocumentKeyShare, data: &mut SessionData) -> Result<(), Error> {
let confirmed_nodes: BTreeSet<_> = data.confirmed_nodes.clone();
for node in data.confirmed_nodes.iter().filter(|n| n != &&self_node_id) {
let confirmed_nodes: BTreeSet<_> = confirmed_nodes.difference(&data.rejected_nodes).cloned().collect();
data.shadow_requests.clear();
data.shadow_points.clear();
for node in confirmed_nodes.iter().filter(|n| n != &&self_node_id) {
data.shadow_requests.insert(node.clone());
cluster.send(node, Message::Decryption(DecryptionMessage::RequestPartialDecryption(RequestPartialDecryption {
session: session_id.clone().into(),
sub_session: access_key.clone().into(),
@ -595,7 +740,13 @@ mod tests {
encrypted_point: encrypted_point.clone(),
}).collect();
let acl_storages: Vec<_> = (0..5).map(|_| Arc::new(DummyAclStorage::default())).collect();
let clusters: Vec<_> = (0..5).map(|i| Arc::new(DummyCluster::new(id_numbers.iter().nth(i).clone().unwrap().0))).collect();
let clusters: Vec<_> = (0..5).map(|i| {
let cluster = Arc::new(DummyCluster::new(id_numbers.iter().nth(i).clone().unwrap().0));
for id_number in &id_numbers {
cluster.add_node(id_number.0.clone());
}
cluster
}).collect();
let sessions: Vec<_> = (0..5).map(|i| SessionImpl::new(SessionParams {
id: session_id.clone(),
access_key: access_key.clone(),
@ -624,13 +775,14 @@ mod tests {
Message::Decryption(DecryptionMessage::ConfirmDecryptionInitialization(message)) => session.on_confirm_initialization(from, &message).unwrap(),
Message::Decryption(DecryptionMessage::RequestPartialDecryption(message)) => session.on_partial_decryption_requested(from, &message).unwrap(),
Message::Decryption(DecryptionMessage::PartialDecryption(message)) => session.on_partial_decryption(from, &message).unwrap(),
Message::Decryption(DecryptionMessage::DecryptionSessionCompleted(message)) => session.on_session_completed(from, &message).unwrap(),
_ => panic!("unexpected"),
}
}
}
#[test]
fn fails_to_construct_in_cluster_of_single_node() {
fn constructs_in_cluster_of_single_node() {
let mut nodes = BTreeMap::new();
let self_node_id = Random.generate().unwrap().public().clone();
nodes.insert(self_node_id, Random.generate().unwrap().secret().clone());
@ -648,7 +800,7 @@ mod tests {
acl_storage: Arc::new(DummyAclStorage::default()),
cluster: Arc::new(DummyCluster::new(self_node_id.clone())),
}) {
Err(Error::InvalidNodesCount) => (),
Ok(_) => (),
_ => panic!("unexpected"),
}
}
@ -722,27 +874,6 @@ mod tests {
}).unwrap_err(), Error::InvalidStateForRequest);
}
#[test]
fn fails_to_partial_decrypt_if_not_waiting() {
let (_, _, sessions) = prepare_decryption_sessions();
assert_eq!(sessions[1].on_initialize_session(sessions[0].node().clone(), &message::InitializeDecryptionSession {
session: SessionId::default().into(),
sub_session: sessions[0].access_key().clone().into(),
requestor_signature: ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap().into(),
is_shadow_decryption: false,
}).unwrap(), ());
assert_eq!(sessions[1].on_partial_decryption_requested(sessions[0].node().clone(), &message::RequestPartialDecryption {
session: SessionId::default().into(),
sub_session: sessions[0].access_key().clone().into(),
nodes: sessions.iter().map(|s| s.node().clone().into()).take(4).collect(),
}).unwrap(), ());
assert_eq!(sessions[1].on_partial_decryption_requested(sessions[0].node().clone(), &message::RequestPartialDecryption {
session: SessionId::default().into(),
sub_session: sessions[0].access_key().clone().into(),
nodes: sessions.iter().map(|s| s.node().clone().into()).take(4).collect(),
}).unwrap_err(), Error::InvalidStateForRequest);
}
#[test]
fn fails_to_partial_decrypt_if_requested_by_slave() {
let (_, _, sessions) = prepare_decryption_sessions();
@ -806,6 +937,106 @@ mod tests {
assert_eq!(sessions[0].on_partial_decryption(pd_from.unwrap(), &pd_msg.unwrap()).unwrap_err(), Error::InvalidStateForRequest);
}
#[test]
fn decryption_fails_on_session_timeout() {
let (_, _, sessions) = prepare_decryption_sessions();
assert!(sessions[0].decrypted_secret().is_none());
sessions[0].on_session_timeout();
assert!(sessions[0].decrypted_secret().unwrap().unwrap_err() == Error::NodeDisconnected);
}
#[test]
fn node_is_marked_rejected_when_timed_out_during_initialization_confirmation() {
let (_, _, sessions) = prepare_decryption_sessions();
sessions[0].initialize(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap(), false).unwrap();
// 1 node disconnects => we still can recover secret
sessions[0].on_node_timeout(sessions[1].node());
assert!(sessions[0].data.lock().rejected_nodes.contains(sessions[1].node()));
assert!(sessions[0].data.lock().state == SessionState::WaitingForInitializationConfirm);
// 2 node are disconnected => we can not recover secret
sessions[0].on_node_timeout(sessions[2].node());
assert!(sessions[0].data.lock().rejected_nodes.contains(sessions[2].node()));
assert!(sessions[0].data.lock().state == SessionState::Failed);
}
#[test]
fn session_does_not_fail_if_rejected_node_disconnects() {
let (clusters, acl_storages, sessions) = prepare_decryption_sessions();
let key_pair = Random.generate().unwrap();
acl_storages[1].prohibit(key_pair.public().clone(), SessionId::default());
sessions[0].initialize(ethkey::sign(key_pair.secret(), &SessionId::default()).unwrap(), false).unwrap();
do_messages_exchange_until(&clusters, &sessions, |_, _, _| sessions[0].state() == SessionState::WaitingForPartialDecryption);
// 1st node disconnects => ignore this
sessions[0].on_node_timeout(sessions[1].node());
assert!(sessions[0].data.lock().state == SessionState::WaitingForPartialDecryption);
}
#[test]
fn session_does_not_fail_if_requested_node_disconnects() {
let (clusters, _, sessions) = prepare_decryption_sessions();
sessions[0].initialize(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap(), false).unwrap();
do_messages_exchange_until(&clusters, &sessions, |_, _, _| sessions[0].state() == SessionState::WaitingForPartialDecryption);
// 1 node disconnects => we still can recover secret
sessions[0].on_node_timeout(sessions[1].node());
assert!(sessions[0].data.lock().state == SessionState::WaitingForPartialDecryption);
// 2 node are disconnected => we can not recover secret
sessions[0].on_node_timeout(sessions[2].node());
assert!(sessions[0].data.lock().state == SessionState::Failed);
}
#[test]
fn session_does_not_fail_if_node_with_shadow_point_disconnects() {
let (clusters, _, sessions) = prepare_decryption_sessions();
sessions[0].initialize(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap(), false).unwrap();
do_messages_exchange_until(&clusters, &sessions, |_, _, _| sessions[0].state() == SessionState::WaitingForPartialDecryption
&& sessions[0].data.lock().shadow_points.len() == 2);
// disconnects from the node which has already sent us its own shadow point
let disconnected = sessions[0].data.lock().
shadow_points.keys()
.filter(|n| *n != sessions[0].node())
.cloned().nth(0).unwrap();
sessions[0].on_node_timeout(&disconnected);
assert!(sessions[0].data.lock().state == SessionState::WaitingForPartialDecryption);
}
#[test]
fn session_restarts_if_confirmed_node_disconnects() {
let (clusters, _, sessions) = prepare_decryption_sessions();
sessions[0].initialize(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap(), false).unwrap();
do_messages_exchange_until(&clusters, &sessions, |_, _, _| sessions[0].state() == SessionState::WaitingForPartialDecryption);
// disconnects from the node which has already confirmed its participation
let disconnected = sessions[0].data.lock().shadow_requests.iter().cloned().nth(0).unwrap();
sessions[0].on_node_timeout(&disconnected);
assert!(sessions[0].data.lock().state == SessionState::WaitingForPartialDecryption);
assert!(sessions[0].data.lock().rejected_nodes.contains(&disconnected));
assert!(!sessions[0].data.lock().shadow_requests.contains(&disconnected));
}
#[test]
fn session_does_not_fail_if_non_master_node_disconnects_from_non_master_node() {
let (clusters, _, sessions) = prepare_decryption_sessions();
sessions[0].initialize(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap(), false).unwrap();
do_messages_exchange_until(&clusters, &sessions, |_, _, _| sessions[0].state() == SessionState::WaitingForPartialDecryption);
// disconnects from the node which has already confirmed its participation
sessions[1].on_node_timeout(sessions[2].node());
assert!(sessions[0].data.lock().state == SessionState::WaitingForPartialDecryption);
assert!(sessions[1].data.lock().state == SessionState::WaitingForPartialDecryptionRequest);
}
#[test]
fn complete_dec_session() {
let (clusters, _, sessions) = prepare_decryption_sessions();
@ -818,17 +1049,16 @@ mod tests {
do_messages_exchange(&clusters, &sessions);
// now check that:
// 1) 4 of 5 sessions are in Finished state
assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::Finished).count(), 4);
// 2) 1 session is in WaitingForPartialDecryptionRequest state
assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::WaitingForPartialDecryptionRequest).count(), 1);
// 3) 1 session has decrypted key value
// 1) 5 of 5 sessions are in Finished state
assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::Finished).count(), 5);
// 2) 1 session has decrypted key value
assert!(sessions.iter().skip(1).all(|s| s.decrypted_secret().is_none()));
assert_eq!(sessions[0].decrypted_secret(), Some(DocumentEncryptedKeyShadow {
assert_eq!(sessions[0].decrypted_secret().unwrap().unwrap(), DocumentEncryptedKeyShadow {
decrypted_secret: SECRET_PLAIN.into(),
common_point: None,
decrypt_shadows: None,
}));
});
}
#[test]
@ -843,14 +1073,12 @@ mod tests {
do_messages_exchange(&clusters, &sessions);
// now check that:
// 1) 4 of 5 sessions are in Finished state
assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::Finished).count(), 4);
// 2) 1 session is in WaitingForPartialDecryptionRequest state
assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::WaitingForPartialDecryptionRequest).count(), 1);
// 3) 1 session has decrypted key value
// 1) 5 of 5 sessions are in Finished state
assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::Finished).count(), 5);
// 2) 1 session has decrypted key value
assert!(sessions.iter().skip(1).all(|s| s.decrypted_secret().is_none()));
let decrypted_secret = sessions[0].decrypted_secret().unwrap();
let decrypted_secret = sessions[0].decrypted_secret().unwrap().unwrap();
// check that decrypted_secret != SECRET_PLAIN
assert!(decrypted_secret.decrypted_secret != SECRET_PLAIN.into());
// check that common point && shadow coefficients are returned
@ -879,7 +1107,8 @@ mod tests {
acl_storages[1].prohibit(key_pair.public().clone(), SessionId::default());
acl_storages[2].prohibit(key_pair.public().clone(), SessionId::default());
do_messages_exchange(&clusters, &sessions);
let node3 = sessions[3].node().clone();
do_messages_exchange_until(&clusters, &sessions, |from, _, _msg| from == &node3);
// now check that:
// 1) 3 of 5 sessions are in Failed state
@ -887,7 +1116,7 @@ mod tests {
// 2) 2 of 5 sessions are in WaitingForPartialDecryptionRequest state
assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::WaitingForPartialDecryptionRequest).count(), 2);
// 3) 0 sessions have decrypted key value
assert!(sessions.iter().all(|s| s.decrypted_secret().is_none()));
assert!(sessions.iter().all(|s| s.decrypted_secret().is_none() || s.decrypted_secret().unwrap().is_err()));
}
#[test]
@ -910,11 +1139,11 @@ mod tests {
assert_eq!(sessions.iter().filter(|s| s.state() == SessionState::Finished).count(), 5);
// 2) 1 session has decrypted key value
assert!(sessions.iter().skip(1).all(|s| s.decrypted_secret().is_none()));
assert_eq!(sessions[0].decrypted_secret(), Some(DocumentEncryptedKeyShadow {
assert_eq!(sessions[0].decrypted_secret().unwrap().unwrap(), DocumentEncryptedKeyShadow {
decrypted_secret: SECRET_PLAIN.into(),
common_point: None,
decrypt_shadows: None,
}));
});
}
#[test]

View File

@ -16,6 +16,7 @@
use std::collections::{BTreeSet, BTreeMap, VecDeque};
use std::fmt::{Debug, Formatter, Error as FmtError};
use std::time;
use std::sync::Arc;
use parking_lot::{Condvar, Mutex};
use ethkey::{Public, Secret};
@ -23,15 +24,18 @@ use key_server_cluster::{Error, NodeId, SessionId, KeyStorage, DocumentKeyShare}
use key_server_cluster::math;
use key_server_cluster::cluster::Cluster;
use key_server_cluster::message::{Message, EncryptionMessage, InitializeSession, ConfirmInitialization, CompleteInitialization,
KeysDissemination, Complaint, ComplaintResponse, PublicKeyShare, SessionError, SessionCompleted};
KeysDissemination, PublicKeyShare, SessionError, SessionCompleted};
/// Encryption session API.
pub trait Session: Send + Sync + 'static {
/// Get encryption session state.
fn state(&self) -> SessionState;
/// Wait until session is completed. Returns distributely generated secret key.
fn wait(&self, timeout: Option<time::Duration>) -> Result<Public, Error>;
#[cfg(test)]
/// Get joint public key (if it is known).
fn joint_public_key(&self) -> Option<Public>;
/// Wait until session is completed. Returns distributely generated secret key.
fn wait(&self) -> Result<Public, Error>;
fn joint_public_key(&self) -> Option<Result<Public, Error>>;
}
/// Encryption (distributed key generation) session.
@ -40,10 +44,9 @@ pub trait Session: Send + Sync + 'static {
/// Brief overview:
/// 1) initialization: master node (which has received request for generating joint public + secret) initializes the session on all other nodes
/// 2) key dissemination (KD): all nodes are generating secret + public values and send these to appropriate nodes
/// 3) key verification (KV): all nodes are checking values, received for other nodes and complaining if keys are wrong
/// 4) key check phase (KC): nodes are processing complaints, received from another nodes
/// 5) key generation phase (KG): nodes are exchanging with information, enough to generate joint public key
/// 6) encryption phase: master node generates secret key, encrypts it using joint public && broadcasts encryption result
/// 3) key verification (KV): all nodes are checking values, received for other nodes
/// 4) key generation phase (KG): nodes are exchanging with information, enough to generate joint public key
/// 5) encryption phase: master node generates secret key, encrypts it using joint public && broadcasts encryption result
pub struct SessionImpl {
/// Unique session id.
id: SessionId,
@ -76,6 +79,8 @@ pub struct SessionParams {
struct SessionData {
/// Current state of the session.
state: SessionState,
/// Simulate faulty behaviour?
simulate_faulty_behaviour: bool,
// === Values, filled when session initialization just starts ===
/// Reference to the node, which has started this session.
@ -123,10 +128,6 @@ struct NodeData {
/// Public values, which have been received from this node.
pub publics: Option<Vec<Public>>,
// === Values, filled during KC phase ===
/// Nodes, complaining against this node.
pub complaints: BTreeSet<NodeId>,
// === Values, filled during KG phase ===
/// Public share, which has been received from this node.
pub public_share: Option<Public>,
@ -163,10 +164,6 @@ pub enum SessionState {
/// Node is waiting for generated keys from every other node.
WaitingForKeysDissemination,
// === KC phase states ===
/// Keys check currently occurs.
KeyCheck,
// === KG phase states ===
/// Node is waiting for joint public key share to be received from every other node.
WaitingForPublicKeyShare,
@ -193,6 +190,7 @@ impl SessionImpl {
completed: Condvar::new(),
data: Mutex::new(SessionData {
state: SessionState::WaitingForInitialization,
simulate_faulty_behaviour: false,
master: None,
threshold: None,
derived_point: None,
@ -205,31 +203,20 @@ impl SessionImpl {
}
}
/// Get this session Id.
pub fn id(&self) -> &SessionId {
&self.id
}
/// Get this node Id.
pub fn node(&self) -> &NodeId {
&self.self_node_id
}
/// Get current session state.
pub fn state(&self) -> SessionState {
self.data.lock().state.clone()
}
#[cfg(test)]
/// Get derived point.
pub fn derived_point(&self) -> Option<Public> {
self.data.lock().derived_point.clone()
}
#[cfg(test)]
/// Get qualified nodes.
pub fn qualified_nodes(&self) -> BTreeSet<NodeId> {
self.data.lock().nodes.keys().cloned().collect()
/// Simulate faulty encryption session behaviour.
pub fn simulate_faulty_behaviour(&self) {
self.data.lock().simulate_faulty_behaviour = true;
}
/// Start new session initialization. This must be called on master node.
@ -254,15 +241,25 @@ impl SessionImpl {
}
let mut visit_policy = EveryOtherNodeVisitor::new(self.node(), data.nodes.keys().cloned());
let next_node = visit_policy.next_node().expect("at least two nodes are in cluster");
data.state = SessionState::WaitingForInitializationConfirm(visit_policy);
// start initialization
let derived_point = math::generate_random_point()?;
self.cluster.send(&next_node, Message::Encryption(EncryptionMessage::InitializeSession(InitializeSession {
session: self.id.clone().into(),
derived_point: derived_point.into(),
})))
match visit_policy.next_node() {
Some(next_node) => {
data.state = SessionState::WaitingForInitializationConfirm(visit_policy);
// start initialization
self.cluster.send(&next_node, Message::Encryption(EncryptionMessage::InitializeSession(InitializeSession {
session: self.id.clone().into(),
derived_point: derived_point.into(),
})))
},
None => {
drop(data);
self.complete_initialization(derived_point)?;
self.disseminate_keys()?;
self.verify_keys()?;
self.complete_encryption()
}
}
}
/// When session initialization message is received.
@ -315,33 +312,16 @@ impl SessionImpl {
};
// proceed message
match next_receiver {
Some(next_receiver) => {
return self.cluster.send(&next_receiver, Message::Encryption(EncryptionMessage::InitializeSession(InitializeSession {
session: self.id.clone().into(),
derived_point: message.derived_point.clone().into(),
})));
},
None => {
// update point once again to make sure that derived point is not generated by last node
let mut derived_point = message.derived_point.clone().into();
math::update_random_point(&mut derived_point)?;
// remember derived point
data.derived_point = Some(derived_point.clone().into());
// broadcast derived point && other session paraeters to every other node
self.cluster.broadcast(Message::Encryption(EncryptionMessage::CompleteInitialization(CompleteInitialization {
if let Some(next_receiver) = next_receiver {
return self.cluster.send(&next_receiver, Message::Encryption(EncryptionMessage::InitializeSession(InitializeSession {
session: self.id.clone().into(),
nodes: data.nodes.iter().map(|(id, data)| (id.clone().into(), data.id_number.clone().into())).collect(),
threshold: data.threshold.expect("threshold is filled in initialization phase; KD phase follows initialization phase; qed"),
derived_point: derived_point.into(),
})))?;
},
derived_point: message.derived_point.clone().into(),
})));
}
// now it is time for keys dissemination (KD) phase
drop(data);
self.complete_initialization(message.derived_point.clone().into())?;
self.disseminate_keys()
}
@ -382,6 +362,11 @@ impl SessionImpl {
let mut data = self.data.lock();
// simulate failure, if required
if data.simulate_faulty_behaviour {
return Err(Error::Io("simulated error".into()));
}
// check state
if data.state != SessionState::WaitingForKeysDissemination {
match data.state {
@ -414,150 +399,8 @@ impl SessionImpl {
return Ok(())
}
// key verification (KV) phase: check that other nodes have passed correct secrets
let derived_point = data.derived_point.clone().expect("derived point generated on initialization phase; KV phase follows initialization phase; qed");
let number_id = data.nodes[self.node()].id_number.clone();
for (node_id, node_data) in data.nodes.iter_mut().filter(|&(node_id, _)| node_id != self.node()) {
let secret1 = node_data.secret1.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed");
let secret2 = node_data.secret2.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed");
let publics = node_data.publics.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed");
let is_key_verification_ok = math::keys_verification(threshold, &derived_point, &number_id,
secret1, secret2, publics)?;
if !is_key_verification_ok {
node_data.complaints.insert(self.node().clone());
self.cluster.broadcast(Message::Encryption(EncryptionMessage::Complaint(Complaint {
session: self.id.clone().into(),
against: node_id.clone().into(),
})))?;
}
}
// update state
data.state = SessionState::KeyCheck;
Ok(())
}
/// When complaint is received.
pub fn on_complaint(&self, sender: NodeId, message: &Complaint) -> Result<(), Error> {
debug_assert!(self.id == *message.session);
debug_assert!(&sender != self.node());
let mut data = self.data.lock();
debug_assert!(data.nodes.contains_key(&sender));
// check state
if data.state != SessionState::KeyCheck && data.state != SessionState::WaitingForKeysDissemination {
// message can be received after timeout is passed
// => encryption is already in progress
// => we can not treat it as an error
return Ok(());
}
// respond to complaint
if &*message.against == self.node() {
let secret1_sent = data.nodes[&sender].secret1_sent.clone().expect("secrets were sent on KD phase; KC phase follows KD phase; qed");
let secret2_sent = data.nodes[&sender].secret2_sent.clone().expect("secrets were sent on KD phase; KC phase follows KD phase; qed");
// someone is complaining against us => let's respond
return self.cluster.broadcast(Message::Encryption(EncryptionMessage::ComplaintResponse(ComplaintResponse {
session: self.id.clone().into(),
secret1: secret1_sent.into(),
secret2: secret2_sent.into(),
})));
}
// someone is complaining against someone else => let's remember this
let threshold = data.threshold.expect("threshold is filled in initialization phase; KD phase follows initialization phase; qed");
let is_critical_complaints_num = {
let node = data.nodes.get_mut(&message.against).ok_or(Error::InvalidMessage)?;
node.complaints.insert(sender);
node.complaints.len() >= threshold + 1
};
if is_critical_complaints_num {
// too many complaints => exclude from session
SessionImpl::disqualify_node(&message.against, &*self.cluster, &mut *data);
}
Ok(())
}
/// When complaint response is received
pub fn on_complaint_response(&self, sender: NodeId, message: &ComplaintResponse) -> Result<(), Error> {
debug_assert!(self.id == *message.session);
debug_assert!(&sender != self.node());
let mut data = self.data.lock();
debug_assert!(data.nodes.contains_key(&sender));
// check state
if data.state != SessionState::KeyCheck {
// in theory this message can be received before KeyCheck (in WaitingForKeysDissemination state)
// => but the fact that someone is complaining about keys means that sender has already sent its keys
// => complaint response can't reach us before keys, as the cluster guarantees that messages are received in FIFO order
// message can be received after timeout is passed
// => encryption is already in progress
// => we can not treat it as an error
return Ok(());
}
// check keys again
let is_key_verification_ok = {
let threshold = data.threshold.expect("threshold is filled in initialization phase; KD phase follows initialization phase; qed");
let derived_point = data.derived_point.as_ref().expect("derived point generated on initialization phase; KV phase follows initialization phase; qed");
let number_id = &data.nodes[self.node()].id_number;
let node_data = data.nodes.get(&sender).expect("cluster guarantees to deliver messages from qualified nodes only; qed");
let publics = node_data.publics.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed");
math::keys_verification(threshold, derived_point, number_id, &message.secret1, &message.secret2, publics)?
};
if !is_key_verification_ok {
SessionImpl::disqualify_node(&sender, &*self.cluster, &mut *data);
} else {
let node_data = data.nodes.get_mut(&sender).expect("cluster guarantees to deliver messages from qualified nodes only; qed");
node_data.secret1 = Some(message.secret1.clone().into());
node_data.secret2 = Some(message.secret2.clone().into());
node_data.complaints.remove(self.node());
}
Ok(())
}
/// When KC-phase timeout is expired, it is time to start KG phase.
pub fn start_key_generation_phase(&self) -> Result<(), Error> {
let mut data = self.data.lock();
if data.state != SessionState::KeyCheck {
return Err(Error::InvalidStateForRequest);
}
// calculate public share
let self_public_share = {
let self_secret_coeff = data.secret_coeff.as_ref().expect("secret_coeff is generated on KD phase; KG phase follows KD phase; qed");
math::compute_public_share(self_secret_coeff)?
};
// calculate self secret + public shares
let self_secret_share = {
let secret_values_iter = data.nodes.values()
.map(|n| n.secret1.as_ref().expect("keys received on KD phase; KG phase follows KD phase; qed"));
math::compute_secret_share(secret_values_iter)?
};
// update state
data.state = SessionState::WaitingForPublicKeyShare;
data.secret_share = Some(self_secret_share);
let self_node = data.nodes.get_mut(self.node()).expect("node is always qualified by himself; qed");
self_node.public_share = Some(self_public_share.clone());
// broadcast self public key share
self.cluster.broadcast(Message::Encryption(EncryptionMessage::PublicKeyShare(PublicKeyShare {
session: self.id.clone().into(),
public_share: self_public_share.into(),
})))
drop(data);
self.verify_keys()
}
/// When public key share is received.
@ -568,8 +411,7 @@ impl SessionImpl {
if data.state != SessionState::WaitingForPublicKeyShare {
match data.state {
SessionState::WaitingForInitializationComplete |
SessionState::WaitingForKeysDissemination |
SessionState::KeyCheck => return Err(Error::TooEarlyForRequest),
SessionState::WaitingForKeysDissemination => return Err(Error::TooEarlyForRequest),
_ => return Err(Error::InvalidStateForRequest),
}
}
@ -589,52 +431,8 @@ impl SessionImpl {
return Ok(());
}
// else - calculate joint public key
let joint_public = {
let public_shares = data.nodes.values().map(|n| n.public_share.as_ref().expect("keys received on KD phase; KG phase follows KD phase; qed"));
math::compute_joint_public(public_shares)?
};
// if we are at the slave node - wait for session completion
if data.master.as_ref() != Some(self.node()) {
data.joint_public = Some(Ok(joint_public));
data.state = SessionState::WaitingForEncryptionConfirmation;
return Ok(());
}
// then generate secret point
// then encrypt secret point with joint public key
let secret_point = math::generate_random_point()?;
let encrypted_secret_point = math::encrypt_secret(&secret_point, &joint_public)?;
// then save encrypted data to the key storage
let encrypted_data = DocumentKeyShare {
threshold: data.threshold.expect("threshold is filled in initialization phase; KG phase follows initialization phase; qed"),
id_numbers: data.nodes.iter().map(|(node_id, node_data)| (node_id.clone(), node_data.id_number.clone())).collect(),
secret_share: data.secret_share.as_ref().expect("secret_share is filled in KG phase; we are at the end of KG phase; qed").clone(),
common_point: encrypted_secret_point.common_point,
encrypted_point: encrypted_secret_point.encrypted_point,
};
self.key_storage.insert(self.id.clone(), encrypted_data.clone())
.map_err(|e| Error::KeyStorage(e.into()))?;
// then distribute encrypted data to every other node
self.cluster.broadcast(Message::Encryption(EncryptionMessage::SessionCompleted(SessionCompleted {
session: self.id.clone().into(),
common_point: encrypted_data.common_point.clone().into(),
encrypted_point: encrypted_data.encrypted_point.clone().into(),
})))?;
// then wait for confirmation from all other nodes
{
let self_node = data.nodes.get_mut(self.node()).expect("node is always qualified by himself; qed");
self_node.completion_confirmed = true;
}
data.joint_public = Some(Ok(joint_public));
data.secret_point = Some(Ok(secret_point));
data.state = SessionState::WaitingForEncryptionConfirmation;
Ok(())
drop(data);
self.complete_encryption()
}
/// When session completion message is received.
@ -703,35 +501,63 @@ impl SessionImpl {
}
/// When error has occured on another node.
pub fn on_session_error(&self, sender: NodeId, message: &SessionError) {
warn!("{}: encryption session error: {:?} from {}", self.node(), message, sender);
pub fn on_session_error(&self, sender: NodeId, message: &SessionError) -> Result<(), Error> {
let mut data = self.data.lock();
warn!("{}: encryption session failed with error: {} from {}", self.node(), message.error, sender);
data.state = SessionState::Failed;
data.joint_public = Some(Err(Error::Io(message.error.clone())));
data.secret_point = Some(Err(Error::Io(message.error.clone())));
self.completed.notify_all();
Ok(())
}
/// When connection to one of cluster nodes has timeouted.
pub fn on_node_timeout(&self, node: &NodeId) {
let mut data = self.data.lock();
// all nodes are required for encryption session
// => fail without check
warn!("{}: encryption session failed because {} connection has timeouted", self.node(), node);
data.state = SessionState::Failed;
data.joint_public = Some(Err(Error::NodeDisconnected));
data.secret_point = Some(Err(Error::NodeDisconnected));
self.completed.notify_all();
}
/// When session timeout has occured.
pub fn on_session_timeout(&self, node: &NodeId) {
warn!("{}: encryption session timeout", self.node());
pub fn on_session_timeout(&self) {
let mut data = self.data.lock();
match data.state {
SessionState::WaitingForInitialization |
SessionState::WaitingForInitializationConfirm(_) |
SessionState::WaitingForInitializationComplete => (),
_ => if !data.nodes.contains_key(node) {
return;
},
}
warn!("{}: encryption session failed with timeout", self.node());
data.state = SessionState::Failed;
data.joint_public = Some(Err(Error::Io("session expired".into())));
data.secret_point = Some(Err(Error::Io("session expired".into())));
data.joint_public = Some(Err(Error::NodeDisconnected));
data.secret_point = Some(Err(Error::NodeDisconnected));
self.completed.notify_all();
}
/// Complete initialization (when all other nodex has responded with confirmation)
fn complete_initialization(&self, mut derived_point: Public) -> Result<(), Error> {
// update point once again to make sure that derived point is not generated by last node
math::update_random_point(&mut derived_point)?;
// remember derived point
let mut data = self.data.lock();
data.derived_point = Some(derived_point.clone().into());
// broadcast derived point && other session paraeters to every other node
self.cluster.broadcast(Message::Encryption(EncryptionMessage::CompleteInitialization(CompleteInitialization {
session: self.id.clone().into(),
nodes: data.nodes.iter().map(|(id, data)| (id.clone().into(), data.id_number.clone().into())).collect(),
threshold: data.threshold.expect("threshold is filled in initialization phase; KD phase follows initialization phase; qed"),
derived_point: derived_point.into(),
})))
}
/// Keys dissemination (KD) phase
fn disseminate_keys(&self) -> Result<(), Error> {
let mut data = self.data.lock();
@ -777,34 +603,124 @@ impl SessionImpl {
Ok(())
}
/// Disqualify node
fn disqualify_node(node: &NodeId, cluster: &Cluster, data: &mut SessionData) {
let threshold = data.threshold
.expect("threshold is filled on initialization phase; node can only be disqualified during KC phase; KC phase follows initialization phase; qed");
/// Keys verification (KV) phase
fn verify_keys(&self) -> Result<(), Error> {
let mut data = self.data.lock();
// key verification (KV) phase: check that other nodes have passed correct secrets
let threshold = data.threshold.expect("threshold is filled in initialization phase; KV phase follows initialization phase; qed");
let derived_point = data.derived_point.clone().expect("derived point generated on initialization phase; KV phase follows initialization phase; qed");
let number_id = data.nodes[self.node()].id_number.clone();
for (_ , node_data) in data.nodes.iter_mut().filter(|&(node_id, _)| node_id != self.node()) {
let secret1 = node_data.secret1.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed");
let secret2 = node_data.secret2.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed");
let publics = node_data.publics.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed");
let is_key_verification_ok = math::keys_verification(threshold, &derived_point, &number_id,
secret1, secret2, publics)?;
// blacklist node
cluster.blacklist(&node);
// too many complaints => exclude from session
data.nodes.remove(&node);
// check if secret still can be reconstructed
if data.nodes.len() < threshold + 1 {
// not enough nodes => session is failed
data.state = SessionState::Failed;
if !is_key_verification_ok {
// node has sent us incorrect values. In original ECDKG protocol we should have sent complaint here.
return Err(Error::InvalidMessage);
}
}
// calculate public share
let self_public_share = {
let self_secret_coeff = data.secret_coeff.as_ref().expect("secret_coeff is generated on KD phase; KG phase follows KD phase; qed");
math::compute_public_share(self_secret_coeff)?
};
// calculate self secret + public shares
let self_secret_share = {
let secret_values_iter = data.nodes.values()
.map(|n| n.secret1.as_ref().expect("keys received on KD phase; KG phase follows KD phase; qed"));
math::compute_secret_share(secret_values_iter)?
};
// update state
data.state = SessionState::WaitingForPublicKeyShare;
data.secret_share = Some(self_secret_share);
let self_node = data.nodes.get_mut(self.node()).expect("node is always qualified by himself; qed");
self_node.public_share = Some(self_public_share.clone());
// broadcast self public key share
self.cluster.broadcast(Message::Encryption(EncryptionMessage::PublicKeyShare(PublicKeyShare {
session: self.id.clone().into(),
public_share: self_public_share.into(),
})))
}
/// Complete encryption
fn complete_encryption(&self) -> Result<(), Error> {
let mut data = self.data.lock();
// else - calculate joint public key
let joint_public = {
let public_shares = data.nodes.values().map(|n| n.public_share.as_ref().expect("keys received on KD phase; KG phase follows KD phase; qed"));
math::compute_joint_public(public_shares)?
};
// if we are at the slave node - wait for session completion
if data.master.as_ref() != Some(self.node()) {
data.joint_public = Some(Ok(joint_public));
data.state = SessionState::WaitingForEncryptionConfirmation;
return Ok(());
}
// then generate secret point
// then encrypt secret point with joint public key
// TODO: secret is revealed to KeyServer here
let secret_point = math::generate_random_point()?;
let encrypted_secret_point = math::encrypt_secret(&secret_point, &joint_public)?;
// then save encrypted data to the key storage
let encrypted_data = DocumentKeyShare {
threshold: data.threshold.expect("threshold is filled in initialization phase; KG phase follows initialization phase; qed"),
id_numbers: data.nodes.iter().map(|(node_id, node_data)| (node_id.clone(), node_data.id_number.clone())).collect(),
secret_share: data.secret_share.as_ref().expect("secret_share is filled in KG phase; we are at the end of KG phase; qed").clone(),
common_point: encrypted_secret_point.common_point,
encrypted_point: encrypted_secret_point.encrypted_point,
};
self.key_storage.insert(self.id.clone(), encrypted_data.clone())
.map_err(|e| Error::KeyStorage(e.into()))?;
// then distribute encrypted data to every other node
self.cluster.broadcast(Message::Encryption(EncryptionMessage::SessionCompleted(SessionCompleted {
session: self.id.clone().into(),
common_point: encrypted_data.common_point.clone().into(),
encrypted_point: encrypted_data.encrypted_point.clone().into(),
})))?;
// then wait for confirmation from all other nodes
{
let self_node = data.nodes.get_mut(self.node()).expect("node is always qualified by himself; qed");
self_node.completion_confirmed = true;
}
data.joint_public = Some(Ok(joint_public));
data.secret_point = Some(Ok(secret_point));
data.state = SessionState::WaitingForEncryptionConfirmation;
Ok(())
}
}
impl Session for SessionImpl {
#[cfg(test)]
fn joint_public_key(&self) -> Option<Public> {
self.data.lock().joint_public.clone().and_then(|r| r.ok())
fn joint_public_key(&self) -> Option<Result<Public, Error>> {
self.data.lock().joint_public.clone()
}
fn state(&self) -> SessionState {
self.data.lock().state.clone()
}
fn wait(&self) -> Result<Public, Error> {
fn wait(&self, timeout: Option<time::Duration>) -> Result<Public, Error> {
let mut data = self.data.lock();
if !data.secret_point.is_some() {
self.completed.wait(&mut data);
match timeout {
None => self.completed.wait(&mut data),
Some(timeout) => { self.completed.wait_for(&mut data, timeout); },
}
}
data.secret_point.as_ref()
@ -842,7 +758,6 @@ impl NodeData {
fn with_id_number(node_id_number: Secret) -> Self {
NodeData {
id_number: node_id_number,
complaints: BTreeSet::new(),
secret1_sent: None,
secret2_sent: None,
secret1: None,
@ -862,7 +777,7 @@ impl Debug for SessionImpl {
pub fn check_cluster_nodes(self_node_id: &NodeId, nodes: &BTreeSet<NodeId>) -> Result<(), Error> {
// at least two nodes must be in cluster
if nodes.len() < 2 {
if nodes.len() < 1 {
return Err(Error::InvalidNodesCount);
}
// this node must be a part of cluster
@ -952,10 +867,6 @@ mod tests {
&self.nodes.values().nth(2).unwrap().session
}
pub fn third_slave(&self) -> &SessionImpl {
&self.nodes.values().nth(3).unwrap().session
}
pub fn take_message(&mut self) -> Option<(NodeId, NodeId, Message)> {
self.nodes.values()
.filter_map(|n| n.cluster.take_message().map(|m| (n.session.node().clone(), m.0, m.1)))
@ -970,8 +881,6 @@ mod tests {
Message::Encryption(EncryptionMessage::ConfirmInitialization(ref message)) => self.nodes[&msg.1].session.on_confirm_initialization(msg.0.clone(), &message),
Message::Encryption(EncryptionMessage::CompleteInitialization(ref message)) => self.nodes[&msg.1].session.on_complete_initialization(msg.0.clone(), &message),
Message::Encryption(EncryptionMessage::KeysDissemination(ref message)) => self.nodes[&msg.1].session.on_keys_dissemination(msg.0.clone(), &message),
Message::Encryption(EncryptionMessage::Complaint(ref message)) => self.nodes[&msg.1].session.on_complaint(msg.0.clone(), &message),
Message::Encryption(EncryptionMessage::ComplaintResponse(ref message)) => self.nodes[&msg.1].session.on_complaint_response(msg.0.clone(), &message),
Message::Encryption(EncryptionMessage::PublicKeyShare(ref message)) => self.nodes[&msg.1].session.on_public_key_share(msg.0.clone(), &message),
Message::Encryption(EncryptionMessage::SessionCompleted(ref message)) => self.nodes[&msg.1].session.on_session_completed(msg.0.clone(), &message),
_ => panic!("unexpected"),
@ -990,13 +899,6 @@ mod tests {
let msg = self.take_message().unwrap();
self.process_message(msg)
}
pub fn take_and_process_all_messages(&mut self) -> Result<(), Error> {
while let Some(msg) = self.take_message() {
self.process_message(msg)?;
}
Ok(())
}
}
fn make_simple_cluster(threshold: usize, num_nodes: usize) -> Result<(SessionId, NodeId, NodeId, MessageLoop), Error> {
@ -1010,8 +912,9 @@ mod tests {
}
#[test]
fn fails_to_initialize_in_cluster_of_single_node() {
assert_eq!(make_simple_cluster(0, 1).unwrap_err(), Error::InvalidNodesCount);
fn initializes_in_cluster_of_single_node() {
let l = MessageLoop::new(1);
assert!(l.master().initialize(0, l.nodes.keys().cloned().collect()).is_ok());
}
#[test]
@ -1106,19 +1009,6 @@ mod tests {
assert!(l.master().derived_point().unwrap() != passed_point.into());
}
#[test]
fn fails_to_complete_initialization_in_cluster_of_single_node() {
let (sid, m, s, l) = make_simple_cluster(0, 2).unwrap();
let mut nodes = BTreeMap::new();
nodes.insert(s, math::generate_random_scalar().unwrap());
assert_eq!(l.first_slave().on_complete_initialization(m, &message::CompleteInitialization {
session: sid.into(),
nodes: nodes.into_iter().map(|(k, v)| (k.into(), v.into())).collect(),
threshold: 0,
derived_point: math::generate_random_point().unwrap().into(),
}).unwrap_err(), Error::InvalidNodesCount);
}
#[test]
fn fails_to_complete_initialization_if_not_a_part_of_cluster() {
let (sid, m, _, l) = make_simple_cluster(0, 2).unwrap();
@ -1227,119 +1117,6 @@ mod tests {
}).unwrap_err(), Error::InvalidStateForRequest);
}
#[test]
fn defends_if_receives_complain_on_himself() {
let (sid, m, s, mut l) = make_simple_cluster(1, 3).unwrap();
l.take_and_process_all_messages().unwrap();
l.master().on_complaint(s, &message::Complaint {
session: sid.into(),
against: m.into(),
}).unwrap();
match l.take_message().unwrap() {
(_, _, Message::Encryption(EncryptionMessage::ComplaintResponse(_))) => (),
_ => panic!("unexpected"),
}
}
#[test]
fn node_is_disqualified_if_enough_complaints_received() {
let (sid, _, s, mut l) = make_simple_cluster(1, 4).unwrap();
l.take_and_process_all_messages().unwrap();
l.master().on_complaint(l.second_slave().node().clone(), &message::Complaint {
session: sid.into(),
against: s.clone().into(),
}).unwrap();
l.master().on_complaint(l.third_slave().node().clone(), &message::Complaint {
session: sid.into(),
against: s.into(),
}).unwrap();
assert_eq!(l.master().qualified_nodes().len(), 3);
}
#[test]
fn node_is_not_disqualified_if_enough_complaints_received_from_the_same_node() {
let (sid, _, s, mut l) = make_simple_cluster(1, 4).unwrap();
l.take_and_process_all_messages().unwrap();
l.master().on_complaint(l.second_slave().node().clone(), &message::Complaint {
session: sid.into(),
against: s.clone().into(),
}).unwrap();
l.master().on_complaint(l.second_slave().node().clone(), &message::Complaint {
session: sid.into(),
against: s.into(),
}).unwrap();
assert_eq!(l.master().qualified_nodes().len(), 4);
}
#[test]
fn node_is_disqualified_if_responds_to_complain_with_invalid_data() {
let (sid, _, _, mut l) = make_simple_cluster(1, 3).unwrap();
l.take_and_process_message().unwrap(); // m -> s1: InitializeSession
l.take_and_process_message().unwrap(); // m -> s2: InitializeSession
l.take_and_process_message().unwrap(); // s1 -> m: ConfirmInitialization
l.take_and_process_message().unwrap(); // s2 -> m: ConfirmInitialization
l.take_and_process_message().unwrap(); // m -> s1: CompleteInitialization
l.take_and_process_message().unwrap(); // m -> s2: CompleteInitialization
l.take_and_process_message().unwrap(); // m -> s1: KeysDissemination
l.take_and_process_message().unwrap(); // m -> s2: KeysDissemination
l.take_and_process_message().unwrap(); // s1 -> m: KeysDissemination
l.take_and_process_message().unwrap(); // s1 -> s2: KeysDissemination
let s2 = l.second_slave().node().clone();
l.master().on_keys_dissemination(s2.clone(), &message::KeysDissemination {
session: sid.clone().into(),
secret1: math::generate_random_scalar().unwrap().into(),
secret2: math::generate_random_scalar().unwrap().into(),
publics: vec![math::generate_random_point().unwrap().into(), math::generate_random_point().unwrap().into()],
}).unwrap();
assert_eq!(l.master().qualified_nodes().len(), 3);
l.master().on_complaint_response(s2, &message::ComplaintResponse {
session: sid.into(),
secret1: math::generate_random_scalar().unwrap().into(),
secret2: math::generate_random_scalar().unwrap().into(),
}).unwrap();
assert_eq!(l.master().qualified_nodes().len(), 2);
}
#[test]
fn node_is_not_disqualified_if_responds_to_complain_with_valid_data() {
let (sid, _, _, mut l) = make_simple_cluster(1, 3).unwrap();
l.take_and_process_message().unwrap(); // m -> s1: InitializeSession
l.take_and_process_message().unwrap(); // m -> s2: InitializeSession
l.take_and_process_message().unwrap(); // s1 -> m: ConfirmInitialization
l.take_and_process_message().unwrap(); // s2 -> m: ConfirmInitialization
l.take_and_process_message().unwrap(); // m -> s1: CompleteInitialization
l.take_and_process_message().unwrap(); // m -> s2: CompleteInitialization
l.take_and_process_message().unwrap(); // m -> s1: KeysDissemination
l.take_and_process_message().unwrap(); // m -> s2: KeysDissemination
l.take_and_process_message().unwrap(); // s1 -> m: KeysDissemination
l.take_and_process_message().unwrap(); // s1 -> s2: KeysDissemination
let (f, t, msg) = match l.take_message() {
Some((f, t, Message::Encryption(EncryptionMessage::KeysDissemination(msg)))) => (f, t, msg),
_ => panic!("unexpected"),
};
assert_eq!(&f, l.second_slave().node());
assert_eq!(&t, l.master().node());
l.master().on_keys_dissemination(f.clone(), &message::KeysDissemination {
session: sid.clone().into(),
secret1: math::generate_random_scalar().unwrap().into(),
secret2: math::generate_random_scalar().unwrap().into(),
publics: msg.publics.clone().into(),
}).unwrap();
assert_eq!(l.master().qualified_nodes().len(), 3);
l.master().on_complaint_response(f, &message::ComplaintResponse {
session: sid.into(),
secret1: msg.secret1.into(),
secret2: msg.secret2.into(),
}).unwrap();
assert_eq!(l.master().qualified_nodes().len(), 3);
}
#[test]
fn should_not_start_key_generation_when_not_in_key_check_state() {
let (_, _, _, l) = make_simple_cluster(1, 3).unwrap();
assert_eq!(l.master().start_key_generation_phase().unwrap_err(), Error::InvalidStateForRequest);
}
#[test]
fn should_not_accept_public_key_share_when_is_not_waiting_for_it() {
let (sid, _, s, l) = make_simple_cluster(1, 3).unwrap();
@ -1352,22 +1129,48 @@ mod tests {
#[test]
fn should_not_accept_public_key_share_when_receiving_twice() {
let (sid, m, _, mut l) = make_simple_cluster(0, 3).unwrap();
l.take_and_process_all_messages().unwrap();
l.master().start_key_generation_phase().unwrap();
l.first_slave().start_key_generation_phase().unwrap();
l.take_and_process_message().unwrap(); // m -> s1: InitializeSession
l.take_and_process_message().unwrap(); // m -> s2: InitializeSession
l.take_and_process_message().unwrap(); // s1 -> m: ConfirmInitialization
l.take_and_process_message().unwrap(); // s2 -> m: ConfirmInitialization
l.take_and_process_message().unwrap(); // m -> s1: CompleteInitialization
l.take_and_process_message().unwrap(); // m -> s2: CompleteInitialization
l.take_and_process_message().unwrap(); // m -> s1: KeysDissemination
l.take_and_process_message().unwrap(); // m -> s2: KeysDissemination
l.take_and_process_message().unwrap(); // s1 -> m: KeysDissemination
l.take_and_process_message().unwrap(); // s1 -> s2: KeysDissemination
l.take_and_process_message().unwrap(); // s2 -> m: KeysDissemination
l.take_and_process_message().unwrap(); // s2 -> s1: KeysDissemination
let (f, t, msg) = match l.take_message() {
Some((f, t, Message::Encryption(EncryptionMessage::PublicKeyShare(msg)))) => (f, t, msg),
_ => panic!("unexpected"),
};
assert_eq!(&f, l.master().node());
assert_eq!(&t, l.first_slave().node());
assert_eq!(&t, l.second_slave().node());
l.process_message((f, t, Message::Encryption(EncryptionMessage::PublicKeyShare(msg.clone())))).unwrap();
assert_eq!(l.first_slave().on_public_key_share(m, &message::PublicKeyShare {
assert_eq!(l.second_slave().on_public_key_share(m, &message::PublicKeyShare {
session: sid.into(),
public_share: math::generate_random_point().unwrap().into(),
}).unwrap_err(), Error::InvalidMessage);
}
#[test]
fn encryption_fails_on_session_timeout() {
let (_, _, _, l) = make_simple_cluster(0, 2).unwrap();
assert!(l.master().joint_public_key().is_none());
l.master().on_session_timeout();
assert!(l.master().joint_public_key().unwrap().unwrap_err() == Error::NodeDisconnected);
}
#[test]
fn encryption_fails_on_node_timeout() {
let (_, _, _, l) = make_simple_cluster(0, 2).unwrap();
assert!(l.master().joint_public_key().is_none());
l.master().on_node_timeout(l.first_slave().node());
assert!(l.master().joint_public_key().unwrap().unwrap_err() == Error::NodeDisconnected);
}
#[test]
fn complete_enc_dec_session() {
let test_cases = [(0, 5), (2, 5), (3, 5)];
@ -1381,26 +1184,12 @@ mod tests {
l.process_message((from, to, message)).unwrap();
}
// check that all nodes are waiting for complaint timeout is passed
for node in l.nodes.values() {
let state = node.session.state();
assert_eq!(state, SessionState::KeyCheck);
// simulate timeout pass
node.session.start_key_generation_phase().unwrap();
}
// let nodes do joint public generation
while let Some((from, to, message)) = l.take_message() {
l.process_message((from, to, message)).unwrap();
}
// check that all nodes has finished joint public generation
let joint_public_key = l.master().joint_public_key().unwrap();
let joint_public_key = l.master().joint_public_key().unwrap().unwrap();
for node in l.nodes.values() {
let state = node.session.state();
assert_eq!(state, SessionState::Finished);
assert_eq!(node.session.joint_public_key().as_ref(), Some(&joint_public_key));
assert_eq!(node.session.joint_public_key().as_ref(), Some(&Ok(joint_public_key)));
}
// now let's encrypt some secret (which is a point on EC)
@ -1417,8 +1206,6 @@ mod tests {
}
}
// TODO: add test where some nodes are disqualified from session
#[test]
fn encryption_session_works_over_network() {
//::util::log::init_log();
@ -1428,7 +1215,7 @@ mod tests {
let mut core = Core::new().unwrap();
// prepare cluster objects for each node
let clusters = make_clusters(&core, 6020, num_nodes);
let clusters = make_clusters(&core, 6022, num_nodes);
run_clusters(&clusters);
// establish connections

View File

@ -71,17 +71,16 @@ pub fn serialize_message(message: Message) -> Result<SerializedMessage, Error> {
Message::Encryption(EncryptionMessage::ConfirmInitialization(payload)) => (51, serde_json::to_vec(&payload)),
Message::Encryption(EncryptionMessage::CompleteInitialization(payload)) => (52, serde_json::to_vec(&payload)),
Message::Encryption(EncryptionMessage::KeysDissemination(payload)) => (53, serde_json::to_vec(&payload)),
Message::Encryption(EncryptionMessage::Complaint(payload)) => (54, serde_json::to_vec(&payload)),
Message::Encryption(EncryptionMessage::ComplaintResponse(payload)) => (55, serde_json::to_vec(&payload)),
Message::Encryption(EncryptionMessage::PublicKeyShare(payload)) => (56, serde_json::to_vec(&payload)),
Message::Encryption(EncryptionMessage::SessionError(payload)) => (57, serde_json::to_vec(&payload)),
Message::Encryption(EncryptionMessage::SessionCompleted(payload)) => (58, serde_json::to_vec(&payload)),
Message::Encryption(EncryptionMessage::PublicKeyShare(payload)) => (54, serde_json::to_vec(&payload)),
Message::Encryption(EncryptionMessage::SessionError(payload)) => (55, serde_json::to_vec(&payload)),
Message::Encryption(EncryptionMessage::SessionCompleted(payload)) => (56, serde_json::to_vec(&payload)),
Message::Decryption(DecryptionMessage::InitializeDecryptionSession(payload)) => (100, serde_json::to_vec(&payload)),
Message::Decryption(DecryptionMessage::ConfirmDecryptionInitialization(payload)) => (101, serde_json::to_vec(&payload)),
Message::Decryption(DecryptionMessage::RequestPartialDecryption(payload)) => (102, serde_json::to_vec(&payload)),
Message::Decryption(DecryptionMessage::PartialDecryption(payload)) => (103, serde_json::to_vec(&payload)),
Message::Decryption(DecryptionMessage::DecryptionSessionError(payload)) => (104, serde_json::to_vec(&payload)),
Message::Decryption(DecryptionMessage::DecryptionSessionCompleted(payload)) => (105, serde_json::to_vec(&payload)),
};
let payload = payload.map_err(|err| Error::Serde(err.to_string()))?;
@ -104,17 +103,16 @@ pub fn deserialize_message(header: &MessageHeader, payload: Vec<u8>) -> Result<M
51 => Message::Encryption(EncryptionMessage::ConfirmInitialization(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
52 => Message::Encryption(EncryptionMessage::CompleteInitialization(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
53 => Message::Encryption(EncryptionMessage::KeysDissemination(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
54 => Message::Encryption(EncryptionMessage::Complaint(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
55 => Message::Encryption(EncryptionMessage::ComplaintResponse(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
56 => Message::Encryption(EncryptionMessage::PublicKeyShare(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
57 => Message::Encryption(EncryptionMessage::SessionError(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
58 => Message::Encryption(EncryptionMessage::SessionCompleted(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
54 => Message::Encryption(EncryptionMessage::PublicKeyShare(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
55 => Message::Encryption(EncryptionMessage::SessionError(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
56 => Message::Encryption(EncryptionMessage::SessionCompleted(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
100 => Message::Decryption(DecryptionMessage::InitializeDecryptionSession(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
101 => Message::Decryption(DecryptionMessage::ConfirmDecryptionInitialization(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
102 => Message::Decryption(DecryptionMessage::RequestPartialDecryption(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
103 => Message::Decryption(DecryptionMessage::PartialDecryption(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
104 => Message::Decryption(DecryptionMessage::DecryptionSessionError(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
105 => Message::Decryption(DecryptionMessage::DecryptionSessionCompleted(serde_json::from_slice(&payload).map_err(|err| Error::Serde(err.to_string()))?)),
_ => return Err(Error::Serde(format!("unknown message type {}", header.kind))),
})

View File

@ -58,10 +58,6 @@ pub enum EncryptionMessage {
CompleteInitialization(CompleteInitialization),
/// Generated keys are sent to every node.
KeysDissemination(KeysDissemination),
/// Complaint against another node is broadcasted.
Complaint(Complaint),
/// Complaint response is broadcasted.
ComplaintResponse(ComplaintResponse),
/// Broadcast self public key portion.
PublicKeyShare(PublicKeyShare),
/// When session error has occured.
@ -83,6 +79,8 @@ pub enum DecryptionMessage {
PartialDecryption(PartialDecryption),
/// When decryption session error has occured.
DecryptionSessionError(DecryptionSessionError),
/// When decryption session is completed.
DecryptionSessionCompleted(DecryptionSessionCompleted),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -160,26 +158,6 @@ pub struct KeysDissemination {
pub publics: Vec<SerializablePublic>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
/// Complaint against node is broadcasted.
pub struct Complaint {
/// Session Id.
pub session: MessageSessionId,
/// Public values.
pub against: MessageNodeId,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
/// Node is responding to complaint.
pub struct ComplaintResponse {
/// Session Id.
pub session: MessageSessionId,
/// Secret 1.
pub secret1: SerializableSecret,
/// Secret 2.
pub secret2: SerializableSecret,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
/// Node is sharing its public key share.
pub struct PublicKeyShare {
@ -269,6 +247,15 @@ pub struct DecryptionSessionError {
pub error: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
/// When decryption session is completed.
pub struct DecryptionSessionCompleted {
/// Encryption session Id.
pub session: MessageSessionId,
/// Decryption session Id.
pub sub_session: SerializableSecret,
}
impl EncryptionMessage {
pub fn session_id(&self) -> &SessionId {
match *self {
@ -276,8 +263,6 @@ impl EncryptionMessage {
EncryptionMessage::ConfirmInitialization(ref msg) => &msg.session,
EncryptionMessage::CompleteInitialization(ref msg) => &msg.session,
EncryptionMessage::KeysDissemination(ref msg) => &msg.session,
EncryptionMessage::Complaint(ref msg) => &msg.session,
EncryptionMessage::ComplaintResponse(ref msg) => &msg.session,
EncryptionMessage::PublicKeyShare(ref msg) => &msg.session,
EncryptionMessage::SessionError(ref msg) => &msg.session,
EncryptionMessage::SessionCompleted(ref msg) => &msg.session,
@ -293,6 +278,7 @@ impl DecryptionMessage {
DecryptionMessage::RequestPartialDecryption(ref msg) => &msg.session,
DecryptionMessage::PartialDecryption(ref msg) => &msg.session,
DecryptionMessage::DecryptionSessionError(ref msg) => &msg.session,
DecryptionMessage::DecryptionSessionCompleted(ref msg) => &msg.session,
}
}
@ -303,6 +289,7 @@ impl DecryptionMessage {
DecryptionMessage::RequestPartialDecryption(ref msg) => &msg.sub_session,
DecryptionMessage::PartialDecryption(ref msg) => &msg.sub_session,
DecryptionMessage::DecryptionSessionError(ref msg) => &msg.sub_session,
DecryptionMessage::DecryptionSessionCompleted(ref msg) => &msg.sub_session,
}
}
}
@ -335,8 +322,6 @@ impl fmt::Display for EncryptionMessage {
EncryptionMessage::ConfirmInitialization(_) => write!(f, "ConfirmInitialization"),
EncryptionMessage::CompleteInitialization(_) => write!(f, "CompleteInitialization"),
EncryptionMessage::KeysDissemination(_) => write!(f, "KeysDissemination"),
EncryptionMessage::Complaint(_) => write!(f, "Complaint"),
EncryptionMessage::ComplaintResponse(_) => write!(f, "ComplaintResponse"),
EncryptionMessage::PublicKeyShare(_) => write!(f, "PublicKeyShare"),
EncryptionMessage::SessionError(ref msg) => write!(f, "SessionError({})", msg.error),
EncryptionMessage::SessionCompleted(_) => write!(f, "SessionCompleted"),
@ -352,6 +337,7 @@ impl fmt::Display for DecryptionMessage {
DecryptionMessage::RequestPartialDecryption(_) => write!(f, "RequestPartialDecryption"),
DecryptionMessage::PartialDecryption(_) => write!(f, "PartialDecryption"),
DecryptionMessage::DecryptionSessionError(_) => write!(f, "DecryptionSessionError"),
DecryptionMessage::DecryptionSessionCompleted(_) => write!(f, "DecryptionSessionCompleted"),
}
}
}

View File

@ -20,7 +20,7 @@ use ethkey;
use ethcrypto;
use super::types::all::DocumentAddress;
pub use super::types::all::{NodeId, EncryptionConfiguration, DocumentEncryptedKeyShadow};
pub use super::types::all::{NodeId, DocumentEncryptedKeyShadow};
pub use super::acl_storage::AclStorage;
pub use super::key_storage::{KeyStorage, DocumentKeyShare};
pub use super::serialization::{SerializableSignature, SerializableH256, SerializableSecret, SerializablePublic};

View File

@ -135,8 +135,7 @@ pub mod tests {
use parking_lot::RwLock;
use devtools::RandomTempPath;
use ethkey::{Random, Generator};
use super::super::types::all::{Error, NodeAddress, ServiceConfiguration, ClusterConfiguration,
DocumentAddress, EncryptionConfiguration};
use super::super::types::all::{Error, NodeAddress, ServiceConfiguration, ClusterConfiguration, DocumentAddress};
use super::{KeyStorage, PersistentKeyStorage, DocumentKeyShare};
#[derive(Default)]
@ -178,9 +177,6 @@ pub mod tests {
},
nodes: BTreeMap::new(),
allow_connecting_to_higher_nodes: false,
encryption_config: EncryptionConfiguration {
key_check_timeout_ms: 10,
},
},
};

View File

@ -38,6 +38,7 @@ extern crate ethcore;
extern crate ethcore_devtools as devtools;
extern crate ethcore_util as util;
extern crate ethcore_ipc as ipc;
extern crate ethcore_logger as logger;
extern crate ethcrypto;
extern crate ethkey;
extern crate native_contracts;
@ -60,7 +61,7 @@ use std::sync::Arc;
use ethcore::client::Client;
pub use types::all::{DocumentAddress, DocumentKey, DocumentEncryptedKey, RequestSignature, Public,
Error, NodeAddress, ServiceConfiguration, ClusterConfiguration, EncryptionConfiguration};
Error, NodeAddress, ServiceConfiguration, ClusterConfiguration};
pub use traits::{KeyServer};
/// Start new key server instance
@ -70,6 +71,6 @@ pub fn start(client: Arc<Client>, config: ServiceConfiguration) -> Result<Box<Ke
let acl_storage = Arc::new(acl_storage::OnChainAclStorage::new(client));
let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(&config)?);
let key_server = key_server::KeyServerImpl::new(&config.cluster_config, acl_storage, key_storage)?;
let listener = http_listener::KeyServerHttpListener::start(config, key_server)?;
let listener = http_listener::KeyServerHttpListener::start(&config.listener_address, key_server)?;
Ok(Box::new(listener))
}

View File

@ -87,16 +87,6 @@ pub struct ClusterConfiguration {
/// Allow outbound connections to 'higher' nodes.
/// This is useful for tests, but slower a bit for production.
pub allow_connecting_to_higher_nodes: bool,
/// Encryption session configuration.
pub encryption_config: EncryptionConfiguration,
}
#[derive(Clone, Debug)]
#[binary]
/// Encryption parameters.
pub struct EncryptionConfiguration {
/// Key check timeout.
pub key_check_timeout_ms: u64,
}
#[derive(Clone, Debug, PartialEq)]