diff --git a/secret_store/src/key_server_cluster/cluster_connections.rs b/secret_store/src/key_server_cluster/cluster_connections.rs deleted file mode 100644 index b484e6d8e..000000000 --- a/secret_store/src/key_server_cluster/cluster_connections.rs +++ /dev/null @@ -1,176 +0,0 @@ -// Copyright 2015-2018 Parity Technologies (UK) Ltd. -// This file is part of Parity. - -// Parity is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity. If not, see . - -use std::collections::BTreeSet; -use std::sync::Arc; -use key_server_cluster::{Error, NodeId}; -use key_server_cluster::message::Message; - -/// Connection to the single node. Provides basic information about connected node and -/// allows sending messages to this node. -pub trait Connection: Send + Sync { - /// Is this inbound connection? This only matters when both nodes are simultaneously establishing - /// two connections to each other. The agreement is that the inbound connection from the node with - /// lower NodeId is used and the other connection is closed. - fn is_inbound(&self) -> bool; - /// Returns id of the connected node. - fn node_id(&self) -> &NodeId; - /// Returns 'address' of the node to use in traces. - fn node_address(&self) -> String; - /// Send message to the connected node. - fn send_message(&self, message: Message); -} - -/// Connections manager. Responsible for keeping us connected to all required nodes. -pub trait ConnectionManager: 'static + Send + Sync { - /// Returns shared reference to connections provider. - fn provider(&self) -> Arc; - /// Try to reach all disconnected nodes immediately. This method is exposed mostly for - /// tests, where all 'nodes' are starting listening for incoming connections first and - /// only after this, they're actually start connecting to each other. - fn connect(&self); -} - -/// Connections provider. Holds all active connections and the set of nodes that we need to -/// connect to. At any moment connection could be lost and the set of connected/disconnected -/// nodes could change (at behalf of the connection manager). -/// Clone operation should be cheap (Arc). -pub trait ConnectionProvider: Send + Sync { - /// Returns the set of currently connected nodes. Error is returned when our node is - /// not a part of the cluster ('isolated' node). - fn connected_nodes(&self) -> Result, Error>; - /// Returns the set of currently disconnected nodes. - fn disconnected_nodes(&self) -> BTreeSet; - /// Returns the reference to the active node connection or None if the node is not connected. - fn connection(&self, node: &NodeId) -> Option>; -} - -#[cfg(test)] -pub mod tests { - use std::collections::{BTreeSet, VecDeque}; - use std::sync::Arc; - use std::sync::atomic::{AtomicBool, Ordering}; - use parking_lot::Mutex; - use key_server_cluster::{Error, NodeId}; - use key_server_cluster::message::Message; - use super::{ConnectionManager, Connection, ConnectionProvider}; - - /// Shared messages queue. - pub type MessagesQueue = Arc>>; - - /// Single node connections. - pub struct TestConnections { - node: NodeId, - is_isolated: AtomicBool, - connected_nodes: Mutex>, - disconnected_nodes: Mutex>, - messages: MessagesQueue, - } - - /// Single connection. - pub struct TestConnection { - from: NodeId, - to: NodeId, - messages: MessagesQueue, - } - - impl TestConnections { - pub fn isolate(&self) { - let connected_nodes = ::std::mem::replace(&mut *self.connected_nodes.lock(), Default::default()); - self.is_isolated.store(true, Ordering::Relaxed); - self.disconnected_nodes.lock().extend(connected_nodes) - } - - pub fn disconnect(&self, node: NodeId) { - self.connected_nodes.lock().remove(&node); - self.disconnected_nodes.lock().insert(node); - } - - pub fn exclude(&self, node: NodeId) { - self.connected_nodes.lock().remove(&node); - self.disconnected_nodes.lock().remove(&node); - } - - pub fn include(&self, node: NodeId) { - self.connected_nodes.lock().insert(node); - } - } - - impl ConnectionManager for Arc { - fn provider(&self) -> Arc { - self.clone() - } - - fn connect(&self) {} - } - - impl ConnectionProvider for TestConnections { - fn connected_nodes(&self) -> Result, Error> { - match self.is_isolated.load(Ordering::Relaxed) { - false => Ok(self.connected_nodes.lock().clone()), - true => Err(Error::NodeDisconnected), - } - } - - fn disconnected_nodes(&self) -> BTreeSet { - self.disconnected_nodes.lock().clone() - } - - fn connection(&self, node: &NodeId) -> Option> { - match self.connected_nodes.lock().contains(node) { - true => Some(Arc::new(TestConnection { - from: self.node, - to: *node, - messages: self.messages.clone(), - })), - false => None, - } - } - } - - impl Connection for TestConnection { - fn is_inbound(&self) -> bool { - false - } - - fn node_id(&self) -> &NodeId { - &self.to - } - - fn node_address(&self) -> String { - format!("{}", self.to) - } - - fn send_message(&self, message: Message) { - self.messages.lock().push_back((self.from, self.to, message)) - } - } - - pub fn new_test_connections( - messages: MessagesQueue, - node: NodeId, - mut nodes: BTreeSet - ) -> Arc { - let is_isolated = !nodes.remove(&node); - Arc::new(TestConnections { - node, - is_isolated: AtomicBool::new(is_isolated), - connected_nodes: Mutex::new(nodes), - disconnected_nodes: Default::default(), - messages, - }) - } -} diff --git a/secret_store/src/key_server_cluster/cluster_connections_net.rs b/secret_store/src/key_server_cluster/cluster_connections_net.rs deleted file mode 100644 index bda7f7dd2..000000000 --- a/secret_store/src/key_server_cluster/cluster_connections_net.rs +++ /dev/null @@ -1,539 +0,0 @@ -// Copyright 2015-2018 Parity Technologies (UK) Ltd. -// This file is part of Parity. - -// Parity is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity. If not, see . - -use std::collections::{BTreeMap, BTreeSet}; -use std::collections::btree_map::Entry; -use std::io; -use std::net::{SocketAddr, IpAddr}; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use futures::{future, Future, Stream}; -use parking_lot::{Mutex, RwLock}; -use tokio::net::{TcpListener, TcpStream}; -use tokio::timer::{Interval, timeout::Error as TimeoutError}; -use tokio_io::IoFuture; -use ethkey::KeyPair; -use parity_runtime::Executor; -use key_server_cluster::{Error, NodeId, ClusterConfiguration, NodeKeyPair}; -use key_server_cluster::cluster_connections::{ConnectionProvider, Connection, ConnectionManager}; -use key_server_cluster::connection_trigger::{Maintain, ConnectionTrigger}; -use key_server_cluster::cluster_message_processor::MessageProcessor; -use key_server_cluster::io::{DeadlineStatus, ReadMessage, SharedTcpStream, - read_encrypted_message, WriteMessage, write_encrypted_message}; -use key_server_cluster::message::{self, ClusterMessage, Message}; -use key_server_cluster::net::{accept_connection as io_accept_connection, - connect as io_connect, Connection as IoConnection}; - -/// Empty future. -pub type BoxedEmptyFuture = Box + Send>; - -/// Maintain interval (seconds). Every MAINTAIN_INTERVAL seconds node: -/// 1) checks if connected nodes are responding to KeepAlive messages -/// 2) tries to connect to disconnected nodes -/// 3) checks if enc/dec sessions are time-outed -const MAINTAIN_INTERVAL: u64 = 10; - -/// When no messages have been received from node within KEEP_ALIVE_SEND_INTERVAL seconds, -/// we must send KeepAlive message to the node to check if it still responds to messages. -const KEEP_ALIVE_SEND_INTERVAL: Duration = Duration::from_secs(30); -/// When no messages have been received from node within KEEP_ALIVE_DISCONNECT_INTERVAL seconds, -/// we must treat this node as non-responding && disconnect from it. -const KEEP_ALIVE_DISCONNECT_INTERVAL: Duration = Duration::from_secs(60); - -/// Network connection manager configuration. -pub struct NetConnectionsManagerConfig { - /// Allow connecting to 'higher' nodes. - pub allow_connecting_to_higher_nodes: bool, - /// Interface to listen to. - pub listen_address: (String, u16), - /// True if we should autostart key servers set change session when servers set changes? - /// This will only work when servers set is configured using KeyServerSet contract. - pub auto_migrate_enabled: bool, -} - -/// Network connections manager. -pub struct NetConnectionsManager { - /// Address we're listening for incoming connections. - listen_address: SocketAddr, - /// Shared cluster connections data reference. - data: Arc, -} - -/// Network connections data. Shared among NetConnectionsManager and spawned futures. -struct NetConnectionsData { - /// Allow connecting to 'higher' nodes. - allow_connecting_to_higher_nodes: bool, - /// Reference to tokio task executor. - executor: Executor, - /// Key pair of this node. - self_key_pair: Arc, - /// Network messages processor. - message_processor: Arc, - /// Connections trigger. - trigger: Mutex>, - /// Mutable connection data. - container: Arc>, -} - -/// Network connections container. This is the only mutable data of NetConnectionsManager. -/// The set of nodes is mutated by the connection trigger and the connections set is also -/// mutated by spawned futures. -pub struct NetConnectionsContainer { - /// Is this node isolated from cluster? - pub is_isolated: bool, - /// Current key servers set. - pub nodes: BTreeMap, - /// Active connections to key servers. - pub connections: BTreeMap>, -} - -/// Network connection to single key server node. -pub struct NetConnection { - executor: Executor, - /// Id of the peer node. - node_id: NodeId, - /// Address of the peer node. - node_address: SocketAddr, - /// Is this inbound (true) or outbound (false) connection? - is_inbound: bool, - /// Key pair that is used to encrypt connection' messages. - key: KeyPair, - /// Last message time. - last_message_time: RwLock, - /// Underlying TCP stream. - stream: SharedTcpStream, -} - -impl NetConnectionsManager { - /// Create new network connections manager. - pub fn new( - executor: Executor, - message_processor: Arc, - trigger: Box, - container: Arc>, - config: &ClusterConfiguration, - net_config: NetConnectionsManagerConfig, - ) -> Result { - let listen_address = make_socket_address( - &net_config.listen_address.0, - net_config.listen_address.1)?; - - Ok(NetConnectionsManager { - listen_address, - data: Arc::new(NetConnectionsData { - allow_connecting_to_higher_nodes: net_config.allow_connecting_to_higher_nodes, - executor, - message_processor, - self_key_pair: config.self_key_pair.clone(), - trigger: Mutex::new(trigger), - container, - }), - }) - } - - /// Start listening for connections and schedule connections maintenance. - pub fn start(&self) -> Result<(), Error> { - net_listen(&self.listen_address, self.data.clone())?; - net_schedule_maintain(self.data.clone()); - Ok(()) - } -} - -impl ConnectionManager for NetConnectionsManager { - fn provider(&self) -> Arc { - self.data.container.clone() - } - - fn connect(&self) { - net_connect_disconnected(self.data.clone()); - } -} - -impl ConnectionProvider for RwLock { - fn connected_nodes(&self) -> Result, Error> { - let connections = self.read(); - if connections.is_isolated { - return Err(Error::NodeDisconnected); - } - - Ok(connections.connections.keys().cloned().collect()) - } - - fn disconnected_nodes(&self) -> BTreeSet { - let connections = self.read(); - connections.nodes.keys() - .filter(|node_id| !connections.connections.contains_key(node_id)) - .cloned() - .collect() - } - - fn connection(&self, node: &NodeId) -> Option> { - match self.read().connections.get(node).cloned() { - Some(connection) => Some(connection), - None => None, - } - } -} - -impl NetConnection { - /// Create new connection. - pub fn new(executor: Executor, is_inbound: bool, connection: IoConnection) -> NetConnection { - NetConnection { - executor, - node_id: connection.node_id, - node_address: connection.address, - is_inbound: is_inbound, - stream: connection.stream, - key: connection.key, - last_message_time: RwLock::new(Instant::now()), - } - } - - /// Get last message time. - pub fn last_message_time(&self) -> Instant { - *self.last_message_time.read() - } - - /// Update last message time - pub fn set_last_message_time(&self, last_message_time: Instant) { - *self.last_message_time.write() = last_message_time - } - - /// Returns future that sends encrypted message over this connection. - pub fn send_message_future(&self, message: Message) -> WriteMessage { - write_encrypted_message(self.stream.clone(), &self.key, message) - } - - /// Returns future that reads encrypted message from this connection. - pub fn read_message_future(&self) -> ReadMessage { - read_encrypted_message(self.stream.clone(), self.key.clone()) - } -} - -impl Connection for NetConnection { - fn is_inbound(&self) -> bool { - self.is_inbound - } - - fn node_id(&self) -> &NodeId { - &self.node_id - } - - fn node_address(&self) -> String { - format!("{}", self.node_address) - } - - fn send_message(&self, message: Message) { - execute(&self.executor, self.send_message_future(message).then(|_| Ok(()))); - } -} - -impl NetConnectionsData { - /// Executes closure for each active connection. - pub fn active_connections(&self) -> Vec> { - self.container.read().connections.values().cloned().collect() - } - - /// Executes closure for each disconnected node. - pub fn disconnected_nodes(&self) -> Vec<(NodeId, SocketAddr)> { - let container = self.container.read(); - container.nodes.iter() - .filter(|(node_id, _)| !container.connections.contains_key(node_id)) - .map(|(node_id, addr)| (*node_id, *addr)) - .collect() - } - - /// Try to insert new connection. Returns true if connection has been inserted. - /// Returns false (and ignores connections) if: - /// - we do not expect connection from this node - /// - we are already connected to the node and existing connection 'supersede' - /// new connection by agreement - pub fn insert(&self, connection: Arc) -> bool { - let node = *connection.node_id(); - let mut container = self.container.write(); - if !container.nodes.contains_key(&node) { - trace!(target: "secretstore_net", "{}: ignoring unknown connection from {} at {}", - self.self_key_pair.public(), node, connection.node_address()); - return false; - } - - if container.connections.contains_key(&node) { - // we have already connected to the same node - // the agreement is that node with lower id must establish connection to node with higher id - if (*self.self_key_pair.public() < node && connection.is_inbound()) - || (*self.self_key_pair.public() > node && !connection.is_inbound()) { - return false; - } - } - - trace!(target: "secretstore_net", - "{}: inserting connection to {} at {}. Connected to {} of {} nodes", - self.self_key_pair.public(), node, connection.node_address(), - container.connections.len() + 1, container.nodes.len()); - container.connections.insert(node, connection); - - true - } - - /// Tries to remove connection. Returns true if connection has been removed. - /// Returns false if we do not know this connection. - pub fn remove(&self, connection: &NetConnection) -> bool { - let node_id = *connection.node_id(); - let is_inbound = connection.is_inbound(); - let mut container = self.container.write(); - if let Entry::Occupied(entry) = container.connections.entry(node_id) { - if entry.get().is_inbound() != is_inbound { - return false; - } - - trace!(target: "secretstore_net", "{}: removing connection to {} at {}", - self.self_key_pair.public(), node_id, entry.get().node_address()); - entry.remove_entry(); - - true - } else { - false - } - } -} - -/// Listen incoming connections. -fn net_listen( - listen_address: &SocketAddr, - data: Arc, -) -> Result<(), Error> { - execute(&data.executor, net_listen_future(listen_address, data.clone())?); - Ok(()) -} - -/// Listen incoming connections future. -fn net_listen_future( - listen_address: &SocketAddr, - data: Arc, -) -> Result { - Ok(Box::new(TcpListener::bind(listen_address)? - .incoming() - .and_then(move |stream| { - net_accept_connection(data.clone(), stream); - Ok(()) - }) - .for_each(|_| Ok(())) - .then(|_| future::ok(())))) -} - -/// Accept incoming connection. -fn net_accept_connection( - data: Arc, - stream: TcpStream, -) { - execute(&data.executor, net_accept_connection_future(data.clone(), stream)); -} - -/// Accept incoming connection future. -fn net_accept_connection_future(data: Arc, stream: TcpStream) -> BoxedEmptyFuture { - Box::new(io_accept_connection(stream, data.self_key_pair.clone()) - .then(move |result| net_process_connection_result(data, None, result)) - .then(|_| future::ok(()))) -} - -/// Connect to remote node. -fn net_connect( - data: Arc, - remote: SocketAddr, -) { - execute(&data.executor, net_connect_future(data.clone(), remote)); -} - -/// Connect to remote node future. -fn net_connect_future( - data: Arc, - remote: SocketAddr, -) -> BoxedEmptyFuture { - let disconnected_nodes = data.container.disconnected_nodes(); - Box::new(io_connect(&remote, data.self_key_pair.clone(), disconnected_nodes) - .then(move |result| net_process_connection_result(data, Some(remote), result)) - .then(|_| future::ok(()))) -} - -/// Process network connection result. -fn net_process_connection_result( - data: Arc, - outbound_addr: Option, - result: Result>, TimeoutError>, -) -> IoFuture> { - match result { - Ok(DeadlineStatus::Meet(Ok(connection))) => { - let connection = Arc::new(NetConnection::new(data.executor.clone(), outbound_addr.is_none(), connection)); - if data.insert(connection.clone()) { - let maintain_action = data.trigger.lock().on_connection_established(connection.node_id()); - maintain_connection_trigger(data.clone(), maintain_action); - - return net_process_connection_messages(data, connection); - } - }, - Ok(DeadlineStatus::Meet(Err(err))) => { - warn!(target: "secretstore_net", "{}: protocol error '{}' when establishing {} connection{}", - data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" }, - outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default()); - }, - Ok(DeadlineStatus::Timeout) => { - warn!(target: "secretstore_net", "{}: timeout when establishing {} connection{}", - data.self_key_pair.public(), if outbound_addr.is_some() { "outbound" } else { "inbound" }, - outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default()); - }, - Err(err) => { - warn!(target: "secretstore_net", "{}: network error '{}' when establishing {} connection{}", - data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" }, - outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default()); - }, - } - - Box::new(future::ok(Ok(()))) -} - -/// Process connection messages. -fn net_process_connection_messages( - data: Arc, - connection: Arc, -) -> IoFuture> { - Box::new(connection - .read_message_future() - .then(move |result| - match result { - Ok((_, Ok(message))) => { - connection.set_last_message_time(Instant::now()); - data.message_processor.process_connection_message(connection.clone(), message); - // continue serving connection - let process_messages_future = net_process_connection_messages( - data.clone(), connection).then(|_| Ok(())); - execute(&data.executor, process_messages_future); - Box::new(future::ok(Ok(()))) - }, - Ok((_, Err(err))) => { - warn!(target: "secretstore_net", "{}: protocol error '{}' when reading message from node {}", - data.self_key_pair.public(), err, connection.node_id()); - // continue serving connection - let process_messages_future = net_process_connection_messages( - data.clone(), connection).then(|_| Ok(())); - execute(&data.executor, process_messages_future); - Box::new(future::ok(Err(err))) - }, - Err(err) => { - let node_id = *connection.node_id(); - warn!(target: "secretstore_net", "{}: network error '{}' when reading message from node {}", - data.self_key_pair.public(), err, node_id); - // close connection - if data.remove(&*connection) { - let maintain_action = data.trigger.lock().on_connection_closed(&node_id); - maintain_connection_trigger(data, maintain_action); - } - Box::new(future::err(err)) - }, - } - )) -} - -/// Schedule connections. maintain. -fn net_schedule_maintain(data: Arc) { - let closure_data = data.clone(); - execute(&data.executor, Interval::new_interval(Duration::new(MAINTAIN_INTERVAL, 0)) - .and_then(move |_| Ok(net_maintain(closure_data.clone()))) - .for_each(|_| Ok(())) - .then(|_| future::ok(()))); -} - -/// Maintain network connections. -fn net_maintain(data: Arc) { - trace!(target: "secretstore_net", "{}: executing maintain procedures", data.self_key_pair.public()); - - update_nodes_set(data.clone()); - data.message_processor.maintain_sessions(); - net_keep_alive(data.clone()); - net_connect_disconnected(data); -} - -/// Send keep alive messages to remote nodes. -fn net_keep_alive(data: Arc) { - let now = Instant::now(); - let active_connections = data.active_connections(); - for connection in active_connections { - let last_message_diff = now - connection.last_message_time(); - if last_message_diff > KEEP_ALIVE_DISCONNECT_INTERVAL { - warn!(target: "secretstore_net", "{}: keep alive timeout for node {}", - data.self_key_pair.public(), connection.node_id()); - - let node_id = *connection.node_id(); - if data.remove(&*connection) { - let maintain_action = data.trigger.lock().on_connection_closed(&node_id); - maintain_connection_trigger(data.clone(), maintain_action); - } - data.message_processor.process_disconnect(&node_id); - } - else if last_message_diff > KEEP_ALIVE_SEND_INTERVAL { - connection.send_message(Message::Cluster(ClusterMessage::KeepAlive(message::KeepAlive {}))); - } - } -} - -/// Connect disconnected nodes. -fn net_connect_disconnected(data: Arc) { - let disconnected_nodes = data.disconnected_nodes(); - for (node_id, address) in disconnected_nodes { - if data.allow_connecting_to_higher_nodes || *data.self_key_pair.public() < node_id { - net_connect(data.clone(), address); - } - } -} - -/// Schedule future execution. -fn execute + Send + 'static>(executor: &Executor, f: F) { - if let Err(err) = future::Executor::execute(executor, Box::new(f)) { - error!("Secret store runtime unable to spawn task. Runtime is shutting down. ({:?})", err); - } -} - -/// Try to update active nodes set from connection trigger. -fn update_nodes_set(data: Arc) { - let maintain_action = data.trigger.lock().on_maintain(); - maintain_connection_trigger(data, maintain_action); -} - -/// Execute maintain procedures of connections trigger. -fn maintain_connection_trigger(data: Arc, maintain_action: Option) { - if maintain_action == Some(Maintain::SessionAndConnections) || maintain_action == Some(Maintain::Session) { - let session_params = data.trigger.lock().maintain_session(); - if let Some(session_params) = session_params { - let session = data.message_processor.start_servers_set_change_session(session_params); - match session { - Ok(_) => trace!(target: "secretstore_net", "{}: started auto-migrate session", - data.self_key_pair.public()), - Err(err) => trace!(target: "secretstore_net", "{}: failed to start auto-migrate session with: {}", - data.self_key_pair.public(), err), - } - } - } - if maintain_action == Some(Maintain::SessionAndConnections) || maintain_action == Some(Maintain::Connections) { - let mut trigger = data.trigger.lock(); - let mut data = data.container.write(); - trigger.maintain_connections(&mut *data); - } -} - -/// Compose SocketAddr from configuration' address and port. -fn make_socket_address(address: &str, port: u16) -> Result { - let ip_address: IpAddr = address.parse().map_err(|_| Error::InvalidNodeAddress)?; - Ok(SocketAddr::new(ip_address, port)) -} diff --git a/secret_store/src/key_server_cluster/cluster_message_processor.rs b/secret_store/src/key_server_cluster/cluster_message_processor.rs deleted file mode 100644 index b4ba5ef03..000000000 --- a/secret_store/src/key_server_cluster/cluster_message_processor.rs +++ /dev/null @@ -1,357 +0,0 @@ -// Copyright 2015-2018 Parity Technologies (UK) Ltd. -// This file is part of Parity. - -// Parity is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity. If not, see . - -use std::sync::Arc; -use key_server_cluster::{Error, NodeId, NodeKeyPair}; -use key_server_cluster::cluster::{ServersSetChangeParams, new_servers_set_change_session}; -use key_server_cluster::cluster_sessions::{AdminSession}; -use key_server_cluster::cluster_connections::{ConnectionProvider, Connection}; -use key_server_cluster::cluster_sessions::{ClusterSession, ClusterSessions, ClusterSessionsContainer, - create_cluster_view}; -use key_server_cluster::cluster_sessions_creator::{ClusterSessionCreator, IntoSessionId}; -use key_server_cluster::message::{self, Message, ClusterMessage}; -use key_server_cluster::key_version_negotiation_session::{SessionImpl as KeyVersionNegotiationSession, - IsolatedSessionTransport as KeyVersionNegotiationSessionTransport, ContinueAction}; -use key_server_cluster::connection_trigger::ServersSetChangeSessionCreatorConnector; - -/// Something that is able to process signals/messages from other nodes. -pub trait MessageProcessor: Send + Sync { - /// Process disconnect from the remote node. - fn process_disconnect(&self, node: &NodeId); - /// Process single message from the connection. - fn process_connection_message(&self, connection: Arc, message: Message); - - /// Start servers set change session. This is typically used by ConnectionManager when - /// it detects that auto-migration session needs to be started. - fn start_servers_set_change_session(&self, params: ServersSetChangeParams) -> Result, Error>; - /// Try to continue session after key version negotiation session is completed. - fn try_continue_session( - &self, - session: Option>> - ); - /// Maintain active sessions. Typically called by the ConnectionManager at some intervals. - /// Should cancel stalled sessions and send keep-alive messages for sessions that support it. - fn maintain_sessions(&self); -} - -/// Bridge between ConnectionManager and ClusterSessions. -pub struct SessionsMessageProcessor { - self_key_pair: Arc, - servers_set_change_creator_connector: Arc, - sessions: Arc, - connections: Arc, -} - -impl SessionsMessageProcessor { - /// Create new instance of SessionsMessageProcessor. - pub fn new( - self_key_pair: Arc, - servers_set_change_creator_connector: Arc, - sessions: Arc, - connections: Arc, - ) -> Self { - SessionsMessageProcessor { - self_key_pair, - servers_set_change_creator_connector, - sessions, - connections, - } - } - - /// Process single session message from connection. - fn process_message, D>( - &self, - sessions: &ClusterSessionsContainer, - connection: Arc, - mut message: Message, - ) -> Option> - where - Message: IntoSessionId - { - // get or create new session, if required - let mut sender = *connection.node_id(); - let session = self.prepare_session(sessions, &sender, &message); - // send error if session is not found, or failed to create - let session = match session { - Ok(session) => session, - Err(error) => { - // this is new session => it is not yet in container - warn!(target: "secretstore_net", - "{}: {} session read error '{}' when requested for session from node {}", - self.self_key_pair.public(), S::type_name(), error, sender); - if !message.is_error_message() { - let qed = "session_id only fails for cluster messages; - only session messages are passed to process_message; - qed"; - let session_id = message.into_session_id().expect(qed); - let session_nonce = message.session_nonce().expect(qed); - - connection.send_message(SC::make_error_message(session_id, session_nonce, error)); - } - return None; - }, - }; - - let session_id = session.id(); - let mut is_queued_message = false; - loop { - let message_result = session.on_message(&sender, &message); - match message_result { - Ok(_) => { - // if session is completed => stop - if session.is_finished() { - info!(target: "secretstore_net", - "{}: {} session completed", self.self_key_pair.public(), S::type_name()); - sessions.remove(&session_id); - return Some(session); - } - - // try to dequeue message - match sessions.dequeue_message(&session_id) { - Some((msg_sender, msg)) => { - is_queued_message = true; - sender = msg_sender; - message = msg; - }, - None => return Some(session), - } - }, - Err(Error::TooEarlyForRequest) => { - sessions.enqueue_message(&session_id, sender, message, is_queued_message); - return Some(session); - }, - Err(err) => { - warn!( - target: "secretstore_net", - "{}: {} session error '{}' when processing message {} from node {}", - self.self_key_pair.public(), - S::type_name(), - err, - message, - sender); - session.on_session_error(self.self_key_pair.public(), err); - sessions.remove(&session_id); - return Some(session); - }, - } - } - } - - /// Get or insert new session. - fn prepare_session, D>( - &self, - sessions: &ClusterSessionsContainer, - sender: &NodeId, - message: &Message - ) -> Result, Error> - where - Message: IntoSessionId - { - fn requires_all_connections(message: &Message) -> bool { - match *message { - Message::Generation(_) => true, - Message::ShareAdd(_) => true, - Message::ServersSetChange(_) => true, - _ => false, - } - } - - // get or create new session, if required - let session_id = message.into_session_id() - .expect("into_session_id fails for cluster messages only; - only session messages are passed to prepare_session; - qed"); - let is_initialization_message = message.is_initialization_message(); - let is_delegation_message = message.is_delegation_message(); - match is_initialization_message || is_delegation_message { - false => sessions.get(&session_id, true).ok_or(Error::NoActiveSessionWithId), - true => { - let creation_data = SC::creation_data_from_message(&message)?; - let master = if is_initialization_message { - *sender - } else { - *self.self_key_pair.public() - }; - let cluster = create_cluster_view( - self.self_key_pair.clone(), - self.connections.clone(), - requires_all_connections(&message))?; - - let nonce = Some(message.session_nonce().ok_or(Error::InvalidMessage)?); - let exclusive = message.is_exclusive_session_message(); - sessions.insert(cluster, master, session_id, nonce, exclusive, creation_data) - }, - } - } - - /// Process single cluster message from the connection. - fn process_cluster_message(&self, connection: Arc, message: ClusterMessage) { - match message { - ClusterMessage::KeepAlive(_) => { - let msg = Message::Cluster(ClusterMessage::KeepAliveResponse(message::KeepAliveResponse { - session_id: None, - })); - connection.send_message(msg) - }, - ClusterMessage::KeepAliveResponse(msg) => if let Some(session_id) = msg.session_id { - self.sessions.on_session_keep_alive(connection.node_id(), session_id.into()); - }, - _ => warn!(target: "secretstore_net", "{}: received unexpected message {} from node {} at {}", - self.self_key_pair.public(), message, connection.node_id(), connection.node_address()), - } - } -} - -impl MessageProcessor for SessionsMessageProcessor { - fn process_disconnect(&self, node: &NodeId) { - self.sessions.on_connection_timeout(node); - } - - fn process_connection_message(&self, connection: Arc, message: Message) { - trace!(target: "secretstore_net", "{}: received message {} from {}", - self.self_key_pair.public(), message, connection.node_id()); - - // error is ignored as we only process errors on session level - match message { - Message::Generation(message) => self - .process_message(&self.sessions.generation_sessions, connection, Message::Generation(message)) - .map(|_| ()).unwrap_or_default(), - Message::Encryption(message) => self - .process_message(&self.sessions.encryption_sessions, connection, Message::Encryption(message)) - .map(|_| ()).unwrap_or_default(), - Message::Decryption(message) => self - .process_message(&self.sessions.decryption_sessions, connection, Message::Decryption(message)) - .map(|_| ()).unwrap_or_default(), - Message::SchnorrSigning(message) => self - .process_message(&self.sessions.schnorr_signing_sessions, connection, Message::SchnorrSigning(message)) - .map(|_| ()).unwrap_or_default(), - Message::EcdsaSigning(message) => self - .process_message(&self.sessions.ecdsa_signing_sessions, connection, Message::EcdsaSigning(message)) - .map(|_| ()).unwrap_or_default(), - Message::ServersSetChange(message) => { - let message = Message::ServersSetChange(message); - let is_initialization_message = message.is_initialization_message(); - let session = self.process_message(&self.sessions.admin_sessions, connection, message); - if is_initialization_message { - if let Some(session) = session { - self.servers_set_change_creator_connector - .set_key_servers_set_change_session(session.clone()); - } - } - }, - Message::KeyVersionNegotiation(message) => { - let session = self.process_message( - &self.sessions.negotiation_sessions, connection, Message::KeyVersionNegotiation(message)); - self.try_continue_session(session); - }, - Message::ShareAdd(message) => self.process_message( - &self.sessions.admin_sessions, connection, Message::ShareAdd(message)) - .map(|_| ()).unwrap_or_default(), - Message::Cluster(message) => self.process_cluster_message(connection, message), - } - } - - fn try_continue_session( - &self, - session: Option>> - ) { - if let Some(session) = session { - let meta = session.meta(); - let is_master_node = meta.self_node_id == meta.master_node_id; - if is_master_node && session.is_finished() { - self.sessions.negotiation_sessions.remove(&session.id()); - match session.wait() { - Ok(Some((version, master))) => match session.take_continue_action() { - Some(ContinueAction::Decrypt( - session, origin, is_shadow_decryption, is_broadcast_decryption - )) => { - let initialization_error = if self.self_key_pair.public() == &master { - session.initialize( - origin, version, is_shadow_decryption, is_broadcast_decryption) - } else { - session.delegate( - master, origin, version, is_shadow_decryption, is_broadcast_decryption) - }; - - if let Err(error) = initialization_error { - session.on_session_error(&meta.self_node_id, error); - self.sessions.decryption_sessions.remove(&session.id()); - } - }, - Some(ContinueAction::SchnorrSign(session, message_hash)) => { - let initialization_error = if self.self_key_pair.public() == &master { - session.initialize(version, message_hash) - } else { - session.delegate(master, version, message_hash) - }; - - if let Err(error) = initialization_error { - session.on_session_error(&meta.self_node_id, error); - self.sessions.schnorr_signing_sessions.remove(&session.id()); - } - }, - Some(ContinueAction::EcdsaSign(session, message_hash)) => { - let initialization_error = if self.self_key_pair.public() == &master { - session.initialize(version, message_hash) - } else { - session.delegate(master, version, message_hash) - }; - - if let Err(error) = initialization_error { - session.on_session_error(&meta.self_node_id, error); - self.sessions.ecdsa_signing_sessions.remove(&session.id()); - } - }, - None => (), - }, - Ok(None) => unreachable!("is_master_node; session is finished; - negotiation version always finished with result on master; - qed"), - Err(error) => match session.take_continue_action() { - Some(ContinueAction::Decrypt(session, _, _, _)) => { - session.on_session_error(&meta.self_node_id, error); - self.sessions.decryption_sessions.remove(&session.id()); - }, - Some(ContinueAction::SchnorrSign(session, _)) => { - session.on_session_error(&meta.self_node_id, error); - self.sessions.schnorr_signing_sessions.remove(&session.id()); - }, - Some(ContinueAction::EcdsaSign(session, _)) => { - session.on_session_error(&meta.self_node_id, error); - self.sessions.ecdsa_signing_sessions.remove(&session.id()); - }, - None => (), - }, - } - } - } - } - - fn maintain_sessions(&self) { - self.sessions.stop_stalled_sessions(); - self.sessions.sessions_keep_alive(); - } - - fn start_servers_set_change_session(&self, params: ServersSetChangeParams) -> Result, Error> { - new_servers_set_change_session( - self.self_key_pair.clone(), - &*self.sessions, - self.connections.clone(), - self.servers_set_change_creator_connector.clone(), - params, - ) - } -}