// Copyright 2015-2017 Parity Technologies (UK) Ltd. // This file is part of Parity. // Parity is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity. If not, see . use std::io; use std::time; use std::sync::Arc; use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::collections::btree_map::Entry; use std::net::{SocketAddr, IpAddr}; use futures::{finished, failed, Future, Stream, BoxFuture}; use futures_cpupool::CpuPool; use parking_lot::{RwLock, Mutex}; use tokio_io::IoFuture; use tokio_core::reactor::{Handle, Remote, Timeout, Interval}; use tokio_core::net::{TcpListener, TcpStream}; use ethkey::{Secret, KeyPair, Signature, Random, Generator}; use key_server_cluster::{Error, NodeId, SessionId, EncryptionConfiguration, AclStorage, KeyStorage}; use key_server_cluster::message::{self, Message, ClusterMessage, EncryptionMessage, DecryptionMessage}; use key_server_cluster::decryption_session::{SessionImpl as DecryptionSessionImpl, DecryptionSessionId, SessionParams as DecryptionSessionParams, Session as DecryptionSession}; use key_server_cluster::encryption_session::{SessionImpl as EncryptionSessionImpl, SessionState as EncryptionSessionState, SessionParams as EncryptionSessionParams, Session as EncryptionSession}; use key_server_cluster::io::{DeadlineStatus, ReadMessage, SharedTcpStream, read_encrypted_message, WriteMessage, write_encrypted_message}; use key_server_cluster::net::{accept_connection as net_accept_connection, connect as net_connect, Connection as 
NetConnection}; pub type BoxedEmptyFuture = BoxFuture<(), ()>; /// Cluster interface for external clients. pub trait ClusterClient: Send + Sync { /// Get cluster state. fn cluster_state(&self) -> ClusterState; /// Start new encryption session. fn new_encryption_session(&self, session_id: SessionId, threshold: usize) -> Result, Error>; /// Start new decryption session. fn new_decryption_session(&self, session_id: SessionId, requestor_signature: Signature, is_shadow_decryption: bool) -> Result, Error>; } /// Cluster access for single encryption/decryption participant. pub trait Cluster: Send + Sync { /// Broadcast message to all other nodes. fn broadcast(&self, message: Message) -> Result<(), Error>; /// Send message to given node. fn send(&self, to: &NodeId, message: Message) -> Result<(), Error>; /// Blacklist node, close connection and remove all pending messages. fn blacklist(&self, node: &NodeId); } #[derive(Clone)] /// Cluster initialization parameters. pub struct ClusterConfiguration { /// Number of threads reserved by cluster. pub threads: usize, /// Allow connecting to 'higher' nodes. pub allow_connecting_to_higher_nodes: bool, /// KeyPair this node holds. pub self_key_pair: KeyPair, /// Interface to listen to. pub listen_address: (String, u16), /// Cluster nodes. pub nodes: BTreeMap, /// Encryption session configuration. pub encryption_config: EncryptionConfiguration, /// Reference to key storage pub key_storage: Arc, /// Reference to ACL storage pub acl_storage: Arc, } /// Cluster state. pub struct ClusterState { /// Nodes, to which connections are established. pub connected: BTreeSet, } /// Network cluster implementation. pub struct ClusterCore { /// Handle to the event loop. handle: Handle, /// Listen address. listen_address: SocketAddr, /// Cluster data. data: Arc, } /// Network cluster client interface implementation. pub struct ClusterClientImpl { /// Cluster data. data: Arc, } /// Network cluster view. 
It is a communication channel, required in single session. pub struct ClusterView { core: Arc>, } /// Cross-thread shareable cluster data. pub struct ClusterData { /// Cluster configuration. config: ClusterConfiguration, /// Handle to the event loop. handle: Remote, /// Handle to the cpu thread pool. pool: CpuPool, /// KeyPair this node holds. self_key_pair: KeyPair, /// Connections data. connections: ClusterConnections, /// Active sessions data. sessions: ClusterSessions, } /// Connections that are forming the cluster. pub struct ClusterConnections { /// Self node id. pub self_node_id: NodeId, /// All known other key servers. pub nodes: BTreeMap, /// Active connections to key servers. pub connections: RwLock>>, } /// Active sessions on this cluster. pub struct ClusterSessions { /// Self node id. pub self_node_id: NodeId, /// Reference to key storage pub key_storage: Arc, /// Reference to ACL storage pub acl_storage: Arc, /// Active encryption sessions. pub encryption_sessions: RwLock>, /// Active decryption sessions. pub decryption_sessions: RwLock>, } /// Encryption session and its message queue. pub struct QueuedEncryptionSession { /// Encryption session. pub session: Arc, /// Messages queue. pub queue: VecDeque<(NodeId, EncryptionMessage)>, } /// Decryption session and its message queue. pub struct QueuedDecryptionSession { /// Decryption session. pub session: Arc, /// Messages queue. pub queue: VecDeque<(NodeId, DecryptionMessage)>, } /// Cluster view core. struct ClusterViewCore { /// Cluster reference. cluster: Arc, /// Subset of nodes, required for this session. nodes: BTreeSet, } /// Connection to single node. pub struct Connection { /// Node id. node_id: NodeId, /// Node address. node_address: SocketAddr, /// Is inbound connection? is_inbound: bool, /// Tcp stream. stream: SharedTcpStream, /// Connection key. key: KeyPair, /// Last message time. 
last_message_time: Mutex, } impl ClusterCore { pub fn new(handle: Handle, config: ClusterConfiguration) -> Result, Error> { let listen_address = make_socket_address(&config.listen_address.0, config.listen_address.1)?; let connections = ClusterConnections::new(&config)?; let sessions = ClusterSessions::new(&config); let data = ClusterData::new(&handle, config, connections, sessions); Ok(Arc::new(ClusterCore { handle: handle, listen_address: listen_address, data: data, })) } /// Create new client interface. pub fn client(&self) -> Arc { Arc::new(ClusterClientImpl::new(self.data.clone())) } #[cfg(test)] /// Get cluster configuration. pub fn config(&self) -> &ClusterConfiguration { &self.data.config } #[cfg(test)] /// Get connection to given node. pub fn connection(&self, node: &NodeId) -> Option> { self.data.connection(node) } /// Run cluster pub fn run(&self) -> Result<(), Error> { // try to connect to every other peer ClusterCore::connect_disconnected_nodes(self.data.clone()); // schedule maintain procedures ClusterCore::schedule_maintain(&self.handle, self.data.clone()); // start listening for incoming connections self.handle.spawn(ClusterCore::listen(&self.handle, self.data.clone(), self.listen_address.clone())?); Ok(()) } /// Connect to peer. fn connect(data: Arc, node_address: SocketAddr) { data.handle.clone().spawn(move |handle| { data.pool.clone().spawn(ClusterCore::connect_future(handle, data, node_address)) }) } /// Connect to socket using given context and handle. fn connect_future(handle: &Handle, data: Arc, node_address: SocketAddr) -> BoxedEmptyFuture { let disconnected_nodes = data.connections.disconnected_nodes().keys().cloned().collect(); net_connect(&node_address, handle, data.self_key_pair.clone(), disconnected_nodes) .then(move |result| ClusterCore::process_connection_result(data, false, result)) .then(|_| finished(())) .boxed() } /// Start listening for incoming connections. 
fn listen(handle: &Handle, data: Arc<ClusterData>, listen_address: SocketAddr) -> Result<BoxedEmptyFuture, Error> {
    // Binds the listener right away (a bind failure is returned to the caller);
    // the returned future hands every accepted stream to `accept_connection`.
    Ok(TcpListener::bind(&listen_address, handle)?
        .incoming()
        .and_then(move |(stream, node_address)| {
            ClusterCore::accept_connection(data.clone(), stream, node_address);
            Ok(())
        })
        .for_each(|_| Ok(()))
        .then(|_| finished(()))
        .boxed())
}

/// Accept connection.
fn accept_connection(data: Arc<ClusterData>, stream: TcpStream, node_address: SocketAddr) {
    data.handle.clone().spawn(move |handle| {
        data.pool.clone().spawn(ClusterCore::accept_connection_future(handle, data, stream, node_address))
    })
}

/// Accept connection future.
fn accept_connection_future(handle: &Handle, data: Arc<ClusterData>, stream: TcpStream, node_address: SocketAddr) -> BoxedEmptyFuture {
    // ids of the nodes we are not connected to yet are passed to the accept routine
    let disconnected_nodes = data.connections.disconnected_nodes().keys().cloned().collect();
    net_accept_connection(node_address, stream, handle, data.self_key_pair.clone(), disconnected_nodes)
        .then(move |result| ClusterCore::process_connection_result(data, true, result))
        .then(|_| finished(()))
        .boxed()
}

/// Schedule maintain procedures.
///
/// Every 10 seconds: sends keep-alives / drops timed-out connections and
/// retries connecting to disconnected nodes.
fn schedule_maintain(handle: &Handle, data: Arc<ClusterData>) {
    // TODO: per-session timeouts (node can respond to messages, but ignore sessions messages)
    let (d1, d2, d3) = (data.clone(), data.clone(), data.clone());
    let interval: BoxedEmptyFuture = Interval::new(time::Duration::new(10, 0), handle)
        .expect("failed to create interval")
        .and_then(move |_| Ok(trace!(target: "secretstore_net", "{}: executing maintain procedures", d1.self_key_pair.public())))
        .and_then(move |_| Ok(ClusterCore::keep_alive(d2.clone())))
        .and_then(move |_| Ok(ClusterCore::connect_disconnected_nodes(d3.clone())))
        .for_each(|_| Ok(()))
        .then(|_| finished(()))
        .boxed();

    data.spawn(interval);
}

/// Called for every incoming message.
fn process_connection_messages(data: Arc, connection: Arc) -> IoFuture> { connection .read_message() .then(move |result| match result { Ok((_, Ok(message))) => { ClusterCore::process_connection_message(data.clone(), connection.clone(), message); // continue serving connection data.spawn(ClusterCore::process_connection_messages(data.clone(), connection)); finished(Ok(())).boxed() }, Ok((_, Err(err))) => { warn!(target: "secretstore_net", "{}: protocol error {} when reading message from node {}", data.self_key_pair.public(), err, connection.node_id()); // continue serving connection data.spawn(ClusterCore::process_connection_messages(data.clone(), connection)); finished(Err(err)).boxed() }, Err(err) => { warn!(target: "secretstore_net", "{}: network error {} when reading message from node {}", data.self_key_pair.public(), err, connection.node_id()); // close connection data.connections.remove(connection.node_id(), connection.is_inbound()); failed(err).boxed() }, } ).boxed() } /// Send keepalive messages to every othe node. fn keep_alive(data: Arc) { for connection in data.connections.active_connections() { let last_message_diff = time::Instant::now() - connection.last_message_time(); if last_message_diff > time::Duration::from_secs(60) { data.connections.remove(connection.node_id(), connection.is_inbound()); data.sessions.on_connection_timeout(connection.node_id()); } else if last_message_diff > time::Duration::from_secs(30) { data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAlive(message::KeepAlive {})))); } } } /// Try to connect to every disconnected node. fn connect_disconnected_nodes(data: Arc) { for (node_id, node_address) in data.connections.disconnected_nodes() { if data.config.allow_connecting_to_higher_nodes || data.self_key_pair.public() < &node_id { ClusterCore::connect(data.clone(), node_address); } } } /// Process connection future result. 
fn process_connection_result(data: Arc, is_inbound: bool, result: Result>, io::Error>) -> IoFuture> { match result { Ok(DeadlineStatus::Meet(Ok(connection))) => { let connection = Connection::new(is_inbound, connection); if data.connections.insert(connection.clone()) { ClusterCore::process_connection_messages(data.clone(), connection) } else { finished(Ok(())).boxed() } }, Ok(DeadlineStatus::Meet(Err(_))) => { finished(Ok(())).boxed() }, Ok(DeadlineStatus::Timeout) => { finished(Ok(())).boxed() }, Err(_) => { // network error finished(Ok(())).boxed() }, } } /// Process single message from the connection. fn process_connection_message(data: Arc, connection: Arc, message: Message) { connection.set_last_message_time(time::Instant::now()); trace!(target: "secretstore_net", "{}: processing message {} from {}", data.self_key_pair.public(), message, connection.node_id()); match message { Message::Encryption(message) => ClusterCore::process_encryption_message(data, connection, message), Message::Decryption(message) => ClusterCore::process_decryption_message(data, connection, message), Message::Cluster(message) => ClusterCore::process_cluster_message(data, connection, message), } } /// Process single encryption message from the connection. 
fn process_encryption_message(data: Arc, connection: Arc, mut message: EncryptionMessage) { let mut sender = connection.node_id().clone(); let mut is_queued_message = false; let session_id = message.session_id().clone(); let key_check_timeout_ms = data.config.encryption_config.key_check_timeout_ms; loop { let result = match message { EncryptionMessage::InitializeSession(ref message) => { let mut connected_nodes = data.connections.connected_nodes(); connected_nodes.insert(data.self_key_pair.public().clone()); let cluster = Arc::new(ClusterView::new(data.clone(), connected_nodes)); let session_id: SessionId = message.session.clone().into(); data.sessions.new_encryption_session(sender.clone(), session_id.clone(), cluster) .and_then(|s| s.on_initialize_session(sender.clone(), message)) }, EncryptionMessage::ConfirmInitialization(ref message) => data.sessions.encryption_session(&*message.session) .ok_or(Error::InvalidSessionId) .and_then(|s| s.on_confirm_initialization(sender.clone(), message)), EncryptionMessage::CompleteInitialization(ref message) => data.sessions.encryption_session(&*message.session) .ok_or(Error::InvalidSessionId) .and_then(|s| s.on_complete_initialization(sender.clone(), message)), EncryptionMessage::KeysDissemination(ref message) => data.sessions.encryption_session(&*message.session) .ok_or(Error::InvalidSessionId) .and_then(|s| { // TODO: move this logic to session (or session connector) let is_in_key_check_state = s.state() == EncryptionSessionState::KeyCheck; let result = s.on_keys_dissemination(sender.clone(), message); if !is_in_key_check_state && s.state() == EncryptionSessionState::KeyCheck { let session = s.clone(); let d = data.clone(); data.handle.spawn(move |handle| Timeout::new(time::Duration::new(key_check_timeout_ms / 1000, 0), handle) .expect("failed to create timeout") .and_then(move |_| { if let Err(error) = session.start_key_generation_phase() { session.on_session_error(d.self_key_pair.public().clone(), &message::SessionError { 
session: session.id().clone().into(), error: error.into(), }); } Ok(()) }) .then(|_| finished(())) ); } result }), EncryptionMessage::Complaint(ref message) => data.sessions.encryption_session(&*message.session) .ok_or(Error::InvalidSessionId) .and_then(|s| s.on_complaint(sender.clone(), message)), EncryptionMessage::ComplaintResponse(ref message) => data.sessions.encryption_session(&*message.session) .ok_or(Error::InvalidSessionId) .and_then(|s| s.on_complaint_response(sender.clone(), message)), EncryptionMessage::PublicKeyShare(ref message) => data.sessions.encryption_session(&*message.session) .ok_or(Error::InvalidSessionId) .and_then(|s| s.on_public_key_share(sender.clone(), message)), EncryptionMessage::SessionError(ref message) => { if let Some(s) = data.sessions.encryption_session(&*message.session) { data.sessions.remove_encryption_session(s.id()); s.on_session_error(sender.clone(), message); } Ok(()) }, EncryptionMessage::SessionCompleted(ref message) => data.sessions.encryption_session(&*message.session) .ok_or(Error::InvalidSessionId) .and_then(|s| { let result = s.on_session_completed(sender.clone(), message); if result.is_ok() && s.state() == EncryptionSessionState::Finished { data.sessions.remove_encryption_session(s.id()); } result }), }; match result { Err(Error::TooEarlyForRequest) => { data.sessions.enqueue_encryption_message(&session_id, sender, message, is_queued_message); break; }, Err(err) => { warn!(target: "secretstore_net", "{}: error {} when processing message {} from node {}", data.self_key_pair.public(), err, message, sender); if let Some(connection) = data.connections.get(&sender) { data.spawn(connection.send_message(Message::Encryption(EncryptionMessage::SessionError(message::SessionError { session: session_id.clone().into(), error: format!("{:?}", err), })))); } if err != Error::InvalidSessionId { data.sessions.remove_encryption_session(&session_id); } break; }, _ => { match data.sessions.dequeue_encryption_message(&session_id) { 
Some((msg_sender, msg)) => { is_queued_message = true; sender = msg_sender; message = msg; }, None => break, } }, } } } /// Process single decryption message from the connection. fn process_decryption_message(data: Arc, connection: Arc, mut message: DecryptionMessage) { let mut sender = connection.node_id().clone(); let mut is_queued_message = false; let session_id = message.session_id().clone(); let sub_session_id = message.sub_session_id().clone(); loop { let result = match message { DecryptionMessage::InitializeDecryptionSession(ref message) => { let mut connected_nodes = data.connections.connected_nodes(); connected_nodes.insert(data.self_key_pair.public().clone()); let cluster = Arc::new(ClusterView::new(data.clone(), connected_nodes)); data.sessions.new_decryption_session(sender.clone(), session_id.clone(), sub_session_id.clone(), cluster) .and_then(|s| s.on_initialize_session(sender.clone(), message)) }, DecryptionMessage::ConfirmDecryptionInitialization(ref message) => data.sessions.decryption_session(&*message.session, &*message.sub_session) .ok_or(Error::InvalidSessionId) .and_then(|s| s.on_confirm_initialization(sender.clone(), message)), DecryptionMessage::RequestPartialDecryption(ref message) => data.sessions.decryption_session(&*message.session, &*message.sub_session) .ok_or(Error::InvalidSessionId) .and_then(|s| s.on_partial_decryption_requested(sender.clone(), message)), DecryptionMessage::PartialDecryption(ref message) => data.sessions.decryption_session(&*message.session, &*message.sub_session) .ok_or(Error::InvalidSessionId) .and_then(|s| s.on_partial_decryption(sender.clone(), message)), DecryptionMessage::DecryptionSessionError(ref message) => { if let Some(s) = data.sessions.decryption_session(&*message.session, &*message.sub_session) { data.sessions.remove_decryption_session(&session_id, &sub_session_id); s.on_session_error(sender.clone(), message); } Ok(()) }, }; match result { Err(Error::TooEarlyForRequest) => { 
data.sessions.enqueue_decryption_message(&session_id, &sub_session_id, sender, message, is_queued_message); break; }, Err(err) => { if let Some(connection) = data.connections.get(&sender) { data.spawn(connection.send_message(Message::Decryption(DecryptionMessage::DecryptionSessionError(message::DecryptionSessionError { session: session_id.clone().into(), sub_session: sub_session_id.clone().into(), error: format!("{:?}", err), })))); } if err != Error::InvalidSessionId { data.sessions.remove_decryption_session(&session_id, &sub_session_id); } break; }, _ => { match data.sessions.dequeue_decryption_message(&session_id, &sub_session_id) { Some((msg_sender, msg)) => { is_queued_message = true; sender = msg_sender; message = msg; }, None => break, } }, } } } /// Process single cluster message from the connection. fn process_cluster_message(data: Arc, connection: Arc, message: ClusterMessage) { match message { ClusterMessage::KeepAlive(_) => data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {})))), ClusterMessage::KeepAliveResponse(_) => (), _ => warn!(target: "secretstore_net", "{}: received unexpected message {} from node {} at {}", data.self_key_pair.public(), message, connection.node_id(), connection.node_address()), } } } impl ClusterConnections { pub fn new(config: &ClusterConfiguration) -> Result { let mut connections = ClusterConnections { self_node_id: config.self_key_pair.public().clone(), nodes: BTreeMap::new(), connections: RwLock::new(BTreeMap::new()), }; for (node_id, &(ref node_addr, node_port)) in config.nodes.iter().filter(|&(node_id, _)| node_id != config.self_key_pair.public()) { let socket_address = make_socket_address(&node_addr, node_port)?; connections.nodes.insert(node_id.clone(), socket_address); } Ok(connections) } pub fn cluster_state(&self) -> ClusterState { ClusterState { connected: self.connections.read().keys().cloned().collect(), } } pub fn get(&self, node: &NodeId) -> Option> 
{ self.connections.read().get(node).cloned() } pub fn insert(&self, connection: Arc) -> bool { let mut connections = self.connections.write(); if connections.contains_key(connection.node_id()) { // we have already connected to the same node // the agreement is that node with lower id must establish connection to node with higher id if (&self.self_node_id < connection.node_id() && connection.is_inbound()) || (&self.self_node_id > connection.node_id() && !connection.is_inbound()) { return false; } } trace!(target: "secretstore_net", "{}: inserting connection to {} at {}", self.self_node_id, connection.node_id(), connection.node_address()); connections.insert(connection.node_id().clone(), connection); true } pub fn remove(&self, node: &NodeId, is_inbound: bool) { let mut connections = self.connections.write(); if let Entry::Occupied(entry) = connections.entry(node.clone()) { if entry.get().is_inbound() != is_inbound { return; } trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address()); entry.remove_entry(); } } pub fn connected_nodes(&self) -> BTreeSet { self.connections.read().keys().cloned().collect() } pub fn active_connections(&self)-> Vec> { self.connections.read().values().cloned().collect() } pub fn disconnected_nodes(&self) -> BTreeMap { let connections = self.connections.read(); self.nodes.iter() .filter(|&(node_id, _)| !connections.contains_key(node_id)) .map(|(node_id, node_address)| (node_id.clone(), node_address.clone())) .collect() } } impl ClusterSessions { pub fn new(config: &ClusterConfiguration) -> Self { ClusterSessions { self_node_id: config.self_key_pair.public().clone(), acl_storage: config.acl_storage.clone(), key_storage: config.key_storage.clone(), encryption_sessions: RwLock::new(BTreeMap::new()), decryption_sessions: RwLock::new(BTreeMap::new()), } } pub fn new_encryption_session(&self, _master: NodeId, session_id: SessionId, cluster: Arc) -> Result, Error> 
{ let mut encryption_sessions = self.encryption_sessions.write(); // check that there's no active encryption session with the same id if encryption_sessions.contains_key(&session_id) { return Err(Error::DuplicateSessionId); } // check that there's no finished encryption session with the same id if self.key_storage.contains(&session_id) { return Err(Error::DuplicateSessionId); } let session = Arc::new(EncryptionSessionImpl::new(EncryptionSessionParams { id: session_id.clone(), self_node_id: self.self_node_id.clone(), key_storage: self.key_storage.clone(), cluster: cluster, })); let encryption_session = QueuedEncryptionSession { session: session.clone(), queue: VecDeque::new() }; encryption_sessions.insert(session_id, encryption_session); Ok(session) } pub fn remove_encryption_session(&self, session_id: &SessionId) { self.encryption_sessions.write().remove(session_id); } pub fn encryption_session(&self, session_id: &SessionId) -> Option> { self.encryption_sessions.read().get(session_id).map(|s| s.session.clone()) } pub fn enqueue_encryption_message(&self, session_id: &SessionId, sender: NodeId, message: EncryptionMessage, is_queued_message: bool) { self.encryption_sessions.write().get_mut(session_id) .map(|session| if is_queued_message { session.queue.push_front((sender, message)) } else { session.queue.push_back((sender, message)) }); } pub fn dequeue_encryption_message(&self, session_id: &SessionId) -> Option<(NodeId, EncryptionMessage)> { self.encryption_sessions.write().get_mut(session_id) .and_then(|session| session.queue.pop_front()) } pub fn new_decryption_session(&self, _master: NodeId, session_id: SessionId, sub_session_id: Secret, cluster: Arc) -> Result, Error> { let mut decryption_sessions = self.decryption_sessions.write(); let session_id = DecryptionSessionId::new(session_id, sub_session_id); if decryption_sessions.contains_key(&session_id) { return Err(Error::DuplicateSessionId); } let session = 
Arc::new(DecryptionSessionImpl::new(DecryptionSessionParams { id: session_id.id.clone(), access_key: session_id.access_key.clone(), self_node_id: self.self_node_id.clone(), encrypted_data: self.key_storage.get(&session_id.id).map_err(|e| Error::KeyStorage(e.into()))?, acl_storage: self.acl_storage.clone(), cluster: cluster, })?); let decryption_session = QueuedDecryptionSession { session: session.clone(), queue: VecDeque::new() }; decryption_sessions.insert(session_id, decryption_session); Ok(session) } pub fn remove_decryption_session(&self, session_id: &SessionId, sub_session_id: &Secret) { let session_id = DecryptionSessionId::new(session_id.clone(), sub_session_id.clone()); self.decryption_sessions.write().remove(&session_id); } pub fn decryption_session(&self, session_id: &SessionId, sub_session_id: &Secret) -> Option> { let session_id = DecryptionSessionId::new(session_id.clone(), sub_session_id.clone()); self.decryption_sessions.read().get(&session_id).map(|s| s.session.clone()) } pub fn enqueue_decryption_message(&self, session_id: &SessionId, sub_session_id: &Secret, sender: NodeId, message: DecryptionMessage, is_queued_message: bool) { let session_id = DecryptionSessionId::new(session_id.clone(), sub_session_id.clone()); self.decryption_sessions.write().get_mut(&session_id) .map(|session| if is_queued_message { session.queue.push_front((sender, message)) } else { session.queue.push_back((sender, message)) }); } pub fn dequeue_decryption_message(&self, session_id: &SessionId, sub_session_id: &Secret) -> Option<(NodeId, DecryptionMessage)> { let session_id = DecryptionSessionId::new(session_id.clone(), sub_session_id.clone()); self.decryption_sessions.write().get_mut(&session_id) .and_then(|session| session.queue.pop_front()) } pub fn on_connection_timeout(&self, node_id: &NodeId) { for encryption_session in self.encryption_sessions.read().values() { encryption_session.session.on_session_timeout(node_id); } for decryption_session in 
self.decryption_sessions.read().values() { decryption_session.session.on_session_timeout(node_id); } } } impl ClusterData { pub fn new(handle: &Handle, config: ClusterConfiguration, connections: ClusterConnections, sessions: ClusterSessions) -> Arc { Arc::new(ClusterData { handle: handle.remote().clone(), pool: CpuPool::new(config.threads), self_key_pair: config.self_key_pair.clone(), connections: connections, sessions: sessions, config: config, }) } /// Get connection to given node. pub fn connection(&self, node: &NodeId) -> Option> { self.connections.get(node) } /// Spawns a future using thread pool and schedules execution of it with event loop handle. pub fn spawn(&self, f: F) where F: Future + Send + 'static, F::Item: Send + 'static, F::Error: Send + 'static { let pool_work = self.pool.spawn(f); self.handle.spawn(move |_handle| { pool_work.then(|_| finished(())) }) } } impl Connection { pub fn new(is_inbound: bool, connection: NetConnection) -> Arc { Arc::new(Connection { node_id: connection.node_id, node_address: connection.address, is_inbound: is_inbound, stream: connection.stream, key: connection.key, last_message_time: Mutex::new(time::Instant::now()), }) } pub fn is_inbound(&self) -> bool { self.is_inbound } pub fn node_id(&self) -> &NodeId { &self.node_id } pub fn last_message_time(&self) -> time::Instant { *self.last_message_time.lock() } pub fn set_last_message_time(&self, last_message_time: time::Instant) { *self.last_message_time.lock() = last_message_time; } pub fn node_address(&self) -> &SocketAddr { &self.node_address } pub fn send_message(&self, message: Message) -> WriteMessage { write_encrypted_message(self.stream.clone(), &self.key, message) } pub fn read_message(&self) -> ReadMessage { read_encrypted_message(self.stream.clone(), self.key.clone()) } } impl ClusterView { pub fn new(cluster: Arc, nodes: BTreeSet) -> Self { ClusterView { core: Arc::new(Mutex::new(ClusterViewCore { cluster: cluster, nodes: nodes, })), } } } impl Cluster for 
ClusterView { fn broadcast(&self, message: Message) -> Result<(), Error> { let core = self.core.lock(); for node in core.nodes.iter().filter(|n| *n != core.cluster.self_key_pair.public()) { let connection = core.cluster.connection(node).ok_or(Error::NodeDisconnected)?; core.cluster.spawn(connection.send_message(message.clone())) } Ok(()) } fn send(&self, to: &NodeId, message: Message) -> Result<(), Error> { let core = self.core.lock(); let connection = core.cluster.connection(to).ok_or(Error::NodeDisconnected)?; core.cluster.spawn(connection.send_message(message)); Ok(()) } fn blacklist(&self, _node: &NodeId) { // TODO: unimplemented!() } } impl ClusterClientImpl { pub fn new(data: Arc) -> Self { ClusterClientImpl { data: data, } } } impl ClusterClient for ClusterClientImpl { fn cluster_state(&self) -> ClusterState { self.data.connections.cluster_state() } fn new_encryption_session(&self, session_id: SessionId, threshold: usize) -> Result, Error> { let mut connected_nodes = self.data.connections.connected_nodes(); connected_nodes.insert(self.data.self_key_pair.public().clone()); let cluster = Arc::new(ClusterView::new(self.data.clone(), connected_nodes.clone())); let session = self.data.sessions.new_encryption_session(self.data.self_key_pair.public().clone(), session_id, cluster)?; session.initialize(threshold, connected_nodes)?; Ok(session) } fn new_decryption_session(&self, session_id: SessionId, requestor_signature: Signature, is_shadow_decryption: bool) -> Result, Error> { let mut connected_nodes = self.data.connections.connected_nodes(); connected_nodes.insert(self.data.self_key_pair.public().clone()); let access_key = Random.generate()?.secret().clone(); let cluster = Arc::new(ClusterView::new(self.data.clone(), connected_nodes.clone())); let session = self.data.sessions.new_decryption_session(self.data.self_key_pair.public().clone(), session_id, access_key, cluster)?; session.initialize(requestor_signature, is_shadow_decryption)?; Ok(session) } } fn 
make_socket_address(address: &str, port: u16) -> Result { let ip_address: IpAddr = address.parse().map_err(|_| Error::InvalidNodeAddress)?; Ok(SocketAddr::new(ip_address, port)) } #[cfg(test)] pub mod tests { use std::sync::Arc; use std::time; use std::collections::VecDeque; use parking_lot::Mutex; use tokio_core::reactor::Core; use ethkey::{Random, Generator}; use key_server_cluster::{NodeId, Error, EncryptionConfiguration, DummyAclStorage, DummyKeyStorage}; use key_server_cluster::message::Message; use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration}; #[derive(Debug)] pub struct DummyCluster { id: NodeId, data: Mutex, } #[derive(Debug, Default)] struct DummyClusterData { nodes: Vec, messages: VecDeque<(NodeId, Message)>, } impl DummyCluster { pub fn new(id: NodeId) -> Self { DummyCluster { id: id, data: Mutex::new(DummyClusterData::default()) } } pub fn node(&self) -> NodeId { self.id.clone() } pub fn add_node(&self, node: NodeId) { self.data.lock().nodes.push(node); } pub fn take_message(&self) -> Option<(NodeId, Message)> { self.data.lock().messages.pop_front() } } impl Cluster for DummyCluster { fn broadcast(&self, message: Message) -> Result<(), Error> { let mut data = self.data.lock(); let all_nodes: Vec<_> = data.nodes.iter().cloned().filter(|n| n != &self.id).collect(); for node in all_nodes { data.messages.push_back((node, message.clone())); } Ok(()) } fn send(&self, to: &NodeId, message: Message) -> Result<(), Error> { debug_assert!(&self.id != to); self.data.lock().messages.push_back((to.clone(), message)); Ok(()) } fn blacklist(&self, _node: &NodeId) { } } pub fn loop_until(core: &mut Core, timeout: time::Duration, predicate: F) where F: Fn() -> bool { let start = time::Instant::now(); loop { core.turn(Some(time::Duration::from_millis(1))); if predicate() { break; } if time::Instant::now() - start > timeout { panic!("no result in {:?}", timeout); } } } pub fn all_connections_established(cluster: &Arc) -> bool { 
cluster.config().nodes.keys() .filter(|p| *p != cluster.config().self_key_pair.public()) .all(|p| cluster.connection(p).is_some()) } pub fn make_clusters(core: &Core, ports_begin: u16, num_nodes: usize) -> Vec> { let key_pairs: Vec<_> = (0..num_nodes).map(|_| Random.generate().unwrap()).collect(); let cluster_params: Vec<_> = (0..num_nodes).map(|i| ClusterConfiguration { threads: 1, self_key_pair: key_pairs[i].clone(), listen_address: ("127.0.0.1".to_owned(), ports_begin + i as u16), nodes: key_pairs.iter().enumerate() .map(|(j, kp)| (kp.public().clone(), ("127.0.0.1".into(), ports_begin + j as u16))) .collect(), allow_connecting_to_higher_nodes: false, encryption_config: EncryptionConfiguration { key_check_timeout_ms: 10, }, key_storage: Arc::new(DummyKeyStorage::default()), acl_storage: Arc::new(DummyAclStorage::default()), }).collect(); let clusters: Vec<_> = cluster_params.into_iter().enumerate() .map(|(_, params)| ClusterCore::new(core.handle(), params).unwrap()) .collect(); clusters } pub fn run_clusters(clusters: &[Arc]) { for cluster in clusters { cluster.run().unwrap(); } } #[test] fn cluster_connects_to_other_nodes() { let mut core = Core::new().unwrap(); let clusters = make_clusters(&core, 6010, 3); run_clusters(&clusters); loop_until(&mut core, time::Duration::from_millis(300), || clusters.iter().all(all_connections_established)); } }