removed secret_store folder (#10722)
This commit is contained in:
parent
6be45367e9
commit
eed630a002
@ -1,176 +0,0 @@
|
|||||||
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
|
|
||||||
// This file is part of Parity.
|
|
||||||
|
|
||||||
// Parity is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
|
|
||||||
// Parity is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU General Public License for more details.
|
|
||||||
|
|
||||||
// You should have received a copy of the GNU General Public License
|
|
||||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
use std::collections::BTreeSet;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use key_server_cluster::{Error, NodeId};
|
|
||||||
use key_server_cluster::message::Message;
|
|
||||||
|
|
||||||
/// Connection to the single node. Provides basic information about connected node and
|
|
||||||
/// allows sending messages to this node.
|
|
||||||
pub trait Connection: Send + Sync {
|
|
||||||
/// Is this inbound connection? This only matters when both nodes are simultaneously establishing
|
|
||||||
/// two connections to each other. The agreement is that the inbound connection from the node with
|
|
||||||
/// lower NodeId is used and the other connection is closed.
|
|
||||||
fn is_inbound(&self) -> bool;
|
|
||||||
/// Returns id of the connected node.
|
|
||||||
fn node_id(&self) -> &NodeId;
|
|
||||||
/// Returns 'address' of the node to use in traces.
|
|
||||||
fn node_address(&self) -> String;
|
|
||||||
/// Send message to the connected node.
|
|
||||||
fn send_message(&self, message: Message);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Connections manager. Responsible for keeping us connected to all required nodes.
|
|
||||||
pub trait ConnectionManager: 'static + Send + Sync {
|
|
||||||
/// Returns shared reference to connections provider.
|
|
||||||
fn provider(&self) -> Arc<ConnectionProvider>;
|
|
||||||
/// Try to reach all disconnected nodes immediately. This method is exposed mostly for
|
|
||||||
/// tests, where all 'nodes' are starting listening for incoming connections first and
|
|
||||||
/// only after this, they're actually start connecting to each other.
|
|
||||||
fn connect(&self);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Connections provider. Holds all active connections and the set of nodes that we need to
|
|
||||||
/// connect to. At any moment connection could be lost and the set of connected/disconnected
|
|
||||||
/// nodes could change (at behalf of the connection manager).
|
|
||||||
/// Clone operation should be cheap (Arc).
|
|
||||||
pub trait ConnectionProvider: Send + Sync {
|
|
||||||
/// Returns the set of currently connected nodes. Error is returned when our node is
|
|
||||||
/// not a part of the cluster ('isolated' node).
|
|
||||||
fn connected_nodes(&self) -> Result<BTreeSet<NodeId>, Error>;
|
|
||||||
/// Returns the set of currently disconnected nodes.
|
|
||||||
fn disconnected_nodes(&self) -> BTreeSet<NodeId>;
|
|
||||||
/// Returns the reference to the active node connection or None if the node is not connected.
|
|
||||||
fn connection(&self, node: &NodeId) -> Option<Arc<Connection>>;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
pub mod tests {
|
|
||||||
use std::collections::{BTreeSet, VecDeque};
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
use parking_lot::Mutex;
|
|
||||||
use key_server_cluster::{Error, NodeId};
|
|
||||||
use key_server_cluster::message::Message;
|
|
||||||
use super::{ConnectionManager, Connection, ConnectionProvider};
|
|
||||||
|
|
||||||
/// Shared messages queue.
|
|
||||||
pub type MessagesQueue = Arc<Mutex<VecDeque<(NodeId, NodeId, Message)>>>;
|
|
||||||
|
|
||||||
/// Single node connections.
|
|
||||||
pub struct TestConnections {
|
|
||||||
node: NodeId,
|
|
||||||
is_isolated: AtomicBool,
|
|
||||||
connected_nodes: Mutex<BTreeSet<NodeId>>,
|
|
||||||
disconnected_nodes: Mutex<BTreeSet<NodeId>>,
|
|
||||||
messages: MessagesQueue,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Single connection.
|
|
||||||
pub struct TestConnection {
|
|
||||||
from: NodeId,
|
|
||||||
to: NodeId,
|
|
||||||
messages: MessagesQueue,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TestConnections {
|
|
||||||
pub fn isolate(&self) {
|
|
||||||
let connected_nodes = ::std::mem::replace(&mut *self.connected_nodes.lock(), Default::default());
|
|
||||||
self.is_isolated.store(true, Ordering::Relaxed);
|
|
||||||
self.disconnected_nodes.lock().extend(connected_nodes)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn disconnect(&self, node: NodeId) {
|
|
||||||
self.connected_nodes.lock().remove(&node);
|
|
||||||
self.disconnected_nodes.lock().insert(node);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn exclude(&self, node: NodeId) {
|
|
||||||
self.connected_nodes.lock().remove(&node);
|
|
||||||
self.disconnected_nodes.lock().remove(&node);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn include(&self, node: NodeId) {
|
|
||||||
self.connected_nodes.lock().insert(node);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConnectionManager for Arc<TestConnections> {
|
|
||||||
fn provider(&self) -> Arc<ConnectionProvider> {
|
|
||||||
self.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn connect(&self) {}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConnectionProvider for TestConnections {
|
|
||||||
fn connected_nodes(&self) -> Result<BTreeSet<NodeId>, Error> {
|
|
||||||
match self.is_isolated.load(Ordering::Relaxed) {
|
|
||||||
false => Ok(self.connected_nodes.lock().clone()),
|
|
||||||
true => Err(Error::NodeDisconnected),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn disconnected_nodes(&self) -> BTreeSet<NodeId> {
|
|
||||||
self.disconnected_nodes.lock().clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn connection(&self, node: &NodeId) -> Option<Arc<Connection>> {
|
|
||||||
match self.connected_nodes.lock().contains(node) {
|
|
||||||
true => Some(Arc::new(TestConnection {
|
|
||||||
from: self.node,
|
|
||||||
to: *node,
|
|
||||||
messages: self.messages.clone(),
|
|
||||||
})),
|
|
||||||
false => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Connection for TestConnection {
|
|
||||||
fn is_inbound(&self) -> bool {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
fn node_id(&self) -> &NodeId {
|
|
||||||
&self.to
|
|
||||||
}
|
|
||||||
|
|
||||||
fn node_address(&self) -> String {
|
|
||||||
format!("{}", self.to)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn send_message(&self, message: Message) {
|
|
||||||
self.messages.lock().push_back((self.from, self.to, message))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new_test_connections(
|
|
||||||
messages: MessagesQueue,
|
|
||||||
node: NodeId,
|
|
||||||
mut nodes: BTreeSet<NodeId>
|
|
||||||
) -> Arc<TestConnections> {
|
|
||||||
let is_isolated = !nodes.remove(&node);
|
|
||||||
Arc::new(TestConnections {
|
|
||||||
node,
|
|
||||||
is_isolated: AtomicBool::new(is_isolated),
|
|
||||||
connected_nodes: Mutex::new(nodes),
|
|
||||||
disconnected_nodes: Default::default(),
|
|
||||||
messages,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,539 +0,0 @@
|
|||||||
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
|
|
||||||
// This file is part of Parity.
|
|
||||||
|
|
||||||
// Parity is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
|
|
||||||
// Parity is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU General Public License for more details.
|
|
||||||
|
|
||||||
// You should have received a copy of the GNU General Public License
|
|
||||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
use std::collections::{BTreeMap, BTreeSet};
|
|
||||||
use std::collections::btree_map::Entry;
|
|
||||||
use std::io;
|
|
||||||
use std::net::{SocketAddr, IpAddr};
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::time::{Duration, Instant};
|
|
||||||
use futures::{future, Future, Stream};
|
|
||||||
use parking_lot::{Mutex, RwLock};
|
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
|
||||||
use tokio::timer::{Interval, timeout::Error as TimeoutError};
|
|
||||||
use tokio_io::IoFuture;
|
|
||||||
use ethkey::KeyPair;
|
|
||||||
use parity_runtime::Executor;
|
|
||||||
use key_server_cluster::{Error, NodeId, ClusterConfiguration, NodeKeyPair};
|
|
||||||
use key_server_cluster::cluster_connections::{ConnectionProvider, Connection, ConnectionManager};
|
|
||||||
use key_server_cluster::connection_trigger::{Maintain, ConnectionTrigger};
|
|
||||||
use key_server_cluster::cluster_message_processor::MessageProcessor;
|
|
||||||
use key_server_cluster::io::{DeadlineStatus, ReadMessage, SharedTcpStream,
|
|
||||||
read_encrypted_message, WriteMessage, write_encrypted_message};
|
|
||||||
use key_server_cluster::message::{self, ClusterMessage, Message};
|
|
||||||
use key_server_cluster::net::{accept_connection as io_accept_connection,
|
|
||||||
connect as io_connect, Connection as IoConnection};
|
|
||||||
|
|
||||||
/// Empty future.
|
|
||||||
pub type BoxedEmptyFuture = Box<Future<Item = (), Error = ()> + Send>;
|
|
||||||
|
|
||||||
/// Maintain interval (seconds). Every MAINTAIN_INTERVAL seconds node:
|
|
||||||
/// 1) checks if connected nodes are responding to KeepAlive messages
|
|
||||||
/// 2) tries to connect to disconnected nodes
|
|
||||||
/// 3) checks if enc/dec sessions are time-outed
|
|
||||||
const MAINTAIN_INTERVAL: u64 = 10;
|
|
||||||
|
|
||||||
/// When no messages have been received from node within KEEP_ALIVE_SEND_INTERVAL seconds,
|
|
||||||
/// we must send KeepAlive message to the node to check if it still responds to messages.
|
|
||||||
const KEEP_ALIVE_SEND_INTERVAL: Duration = Duration::from_secs(30);
|
|
||||||
/// When no messages have been received from node within KEEP_ALIVE_DISCONNECT_INTERVAL seconds,
|
|
||||||
/// we must treat this node as non-responding && disconnect from it.
|
|
||||||
const KEEP_ALIVE_DISCONNECT_INTERVAL: Duration = Duration::from_secs(60);
|
|
||||||
|
|
||||||
/// Network connection manager configuration.
|
|
||||||
pub struct NetConnectionsManagerConfig {
|
|
||||||
/// Allow connecting to 'higher' nodes.
|
|
||||||
pub allow_connecting_to_higher_nodes: bool,
|
|
||||||
/// Interface to listen to.
|
|
||||||
pub listen_address: (String, u16),
|
|
||||||
/// True if we should autostart key servers set change session when servers set changes?
|
|
||||||
/// This will only work when servers set is configured using KeyServerSet contract.
|
|
||||||
pub auto_migrate_enabled: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Network connections manager.
|
|
||||||
pub struct NetConnectionsManager {
|
|
||||||
/// Address we're listening for incoming connections.
|
|
||||||
listen_address: SocketAddr,
|
|
||||||
/// Shared cluster connections data reference.
|
|
||||||
data: Arc<NetConnectionsData>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Network connections data. Shared among NetConnectionsManager and spawned futures.
|
|
||||||
struct NetConnectionsData {
|
|
||||||
/// Allow connecting to 'higher' nodes.
|
|
||||||
allow_connecting_to_higher_nodes: bool,
|
|
||||||
/// Reference to tokio task executor.
|
|
||||||
executor: Executor,
|
|
||||||
/// Key pair of this node.
|
|
||||||
self_key_pair: Arc<NodeKeyPair>,
|
|
||||||
/// Network messages processor.
|
|
||||||
message_processor: Arc<MessageProcessor>,
|
|
||||||
/// Connections trigger.
|
|
||||||
trigger: Mutex<Box<ConnectionTrigger>>,
|
|
||||||
/// Mutable connection data.
|
|
||||||
container: Arc<RwLock<NetConnectionsContainer>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Network connections container. This is the only mutable data of NetConnectionsManager.
|
|
||||||
/// The set of nodes is mutated by the connection trigger and the connections set is also
|
|
||||||
/// mutated by spawned futures.
|
|
||||||
pub struct NetConnectionsContainer {
|
|
||||||
/// Is this node isolated from cluster?
|
|
||||||
pub is_isolated: bool,
|
|
||||||
/// Current key servers set.
|
|
||||||
pub nodes: BTreeMap<NodeId, SocketAddr>,
|
|
||||||
/// Active connections to key servers.
|
|
||||||
pub connections: BTreeMap<NodeId, Arc<NetConnection>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Network connection to single key server node.
|
|
||||||
pub struct NetConnection {
|
|
||||||
executor: Executor,
|
|
||||||
/// Id of the peer node.
|
|
||||||
node_id: NodeId,
|
|
||||||
/// Address of the peer node.
|
|
||||||
node_address: SocketAddr,
|
|
||||||
/// Is this inbound (true) or outbound (false) connection?
|
|
||||||
is_inbound: bool,
|
|
||||||
/// Key pair that is used to encrypt connection' messages.
|
|
||||||
key: KeyPair,
|
|
||||||
/// Last message time.
|
|
||||||
last_message_time: RwLock<Instant>,
|
|
||||||
/// Underlying TCP stream.
|
|
||||||
stream: SharedTcpStream,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl NetConnectionsManager {
|
|
||||||
/// Create new network connections manager.
|
|
||||||
pub fn new(
|
|
||||||
executor: Executor,
|
|
||||||
message_processor: Arc<MessageProcessor>,
|
|
||||||
trigger: Box<ConnectionTrigger>,
|
|
||||||
container: Arc<RwLock<NetConnectionsContainer>>,
|
|
||||||
config: &ClusterConfiguration,
|
|
||||||
net_config: NetConnectionsManagerConfig,
|
|
||||||
) -> Result<Self, Error> {
|
|
||||||
let listen_address = make_socket_address(
|
|
||||||
&net_config.listen_address.0,
|
|
||||||
net_config.listen_address.1)?;
|
|
||||||
|
|
||||||
Ok(NetConnectionsManager {
|
|
||||||
listen_address,
|
|
||||||
data: Arc::new(NetConnectionsData {
|
|
||||||
allow_connecting_to_higher_nodes: net_config.allow_connecting_to_higher_nodes,
|
|
||||||
executor,
|
|
||||||
message_processor,
|
|
||||||
self_key_pair: config.self_key_pair.clone(),
|
|
||||||
trigger: Mutex::new(trigger),
|
|
||||||
container,
|
|
||||||
}),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Start listening for connections and schedule connections maintenance.
|
|
||||||
pub fn start(&self) -> Result<(), Error> {
|
|
||||||
net_listen(&self.listen_address, self.data.clone())?;
|
|
||||||
net_schedule_maintain(self.data.clone());
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConnectionManager for NetConnectionsManager {
|
|
||||||
fn provider(&self) -> Arc<ConnectionProvider> {
|
|
||||||
self.data.container.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn connect(&self) {
|
|
||||||
net_connect_disconnected(self.data.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConnectionProvider for RwLock<NetConnectionsContainer> {
|
|
||||||
fn connected_nodes(&self) -> Result<BTreeSet<NodeId>, Error> {
|
|
||||||
let connections = self.read();
|
|
||||||
if connections.is_isolated {
|
|
||||||
return Err(Error::NodeDisconnected);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(connections.connections.keys().cloned().collect())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn disconnected_nodes(&self) -> BTreeSet<NodeId> {
|
|
||||||
let connections = self.read();
|
|
||||||
connections.nodes.keys()
|
|
||||||
.filter(|node_id| !connections.connections.contains_key(node_id))
|
|
||||||
.cloned()
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn connection(&self, node: &NodeId) -> Option<Arc<Connection>> {
|
|
||||||
match self.read().connections.get(node).cloned() {
|
|
||||||
Some(connection) => Some(connection),
|
|
||||||
None => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl NetConnection {
|
|
||||||
/// Create new connection.
|
|
||||||
pub fn new(executor: Executor, is_inbound: bool, connection: IoConnection) -> NetConnection {
|
|
||||||
NetConnection {
|
|
||||||
executor,
|
|
||||||
node_id: connection.node_id,
|
|
||||||
node_address: connection.address,
|
|
||||||
is_inbound: is_inbound,
|
|
||||||
stream: connection.stream,
|
|
||||||
key: connection.key,
|
|
||||||
last_message_time: RwLock::new(Instant::now()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get last message time.
|
|
||||||
pub fn last_message_time(&self) -> Instant {
|
|
||||||
*self.last_message_time.read()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Update last message time
|
|
||||||
pub fn set_last_message_time(&self, last_message_time: Instant) {
|
|
||||||
*self.last_message_time.write() = last_message_time
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns future that sends encrypted message over this connection.
|
|
||||||
pub fn send_message_future(&self, message: Message) -> WriteMessage<SharedTcpStream> {
|
|
||||||
write_encrypted_message(self.stream.clone(), &self.key, message)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns future that reads encrypted message from this connection.
|
|
||||||
pub fn read_message_future(&self) -> ReadMessage<SharedTcpStream> {
|
|
||||||
read_encrypted_message(self.stream.clone(), self.key.clone())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Connection for NetConnection {
|
|
||||||
fn is_inbound(&self) -> bool {
|
|
||||||
self.is_inbound
|
|
||||||
}
|
|
||||||
|
|
||||||
fn node_id(&self) -> &NodeId {
|
|
||||||
&self.node_id
|
|
||||||
}
|
|
||||||
|
|
||||||
fn node_address(&self) -> String {
|
|
||||||
format!("{}", self.node_address)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn send_message(&self, message: Message) {
|
|
||||||
execute(&self.executor, self.send_message_future(message).then(|_| Ok(())));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl NetConnectionsData {
|
|
||||||
/// Executes closure for each active connection.
|
|
||||||
pub fn active_connections(&self) -> Vec<Arc<NetConnection>> {
|
|
||||||
self.container.read().connections.values().cloned().collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Executes closure for each disconnected node.
|
|
||||||
pub fn disconnected_nodes(&self) -> Vec<(NodeId, SocketAddr)> {
|
|
||||||
let container = self.container.read();
|
|
||||||
container.nodes.iter()
|
|
||||||
.filter(|(node_id, _)| !container.connections.contains_key(node_id))
|
|
||||||
.map(|(node_id, addr)| (*node_id, *addr))
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to insert new connection. Returns true if connection has been inserted.
|
|
||||||
/// Returns false (and ignores connections) if:
|
|
||||||
/// - we do not expect connection from this node
|
|
||||||
/// - we are already connected to the node and existing connection 'supersede'
|
|
||||||
/// new connection by agreement
|
|
||||||
pub fn insert(&self, connection: Arc<NetConnection>) -> bool {
|
|
||||||
let node = *connection.node_id();
|
|
||||||
let mut container = self.container.write();
|
|
||||||
if !container.nodes.contains_key(&node) {
|
|
||||||
trace!(target: "secretstore_net", "{}: ignoring unknown connection from {} at {}",
|
|
||||||
self.self_key_pair.public(), node, connection.node_address());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if container.connections.contains_key(&node) {
|
|
||||||
// we have already connected to the same node
|
|
||||||
// the agreement is that node with lower id must establish connection to node with higher id
|
|
||||||
if (*self.self_key_pair.public() < node && connection.is_inbound())
|
|
||||||
|| (*self.self_key_pair.public() > node && !connection.is_inbound()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!(target: "secretstore_net",
|
|
||||||
"{}: inserting connection to {} at {}. Connected to {} of {} nodes",
|
|
||||||
self.self_key_pair.public(), node, connection.node_address(),
|
|
||||||
container.connections.len() + 1, container.nodes.len());
|
|
||||||
container.connections.insert(node, connection);
|
|
||||||
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Tries to remove connection. Returns true if connection has been removed.
|
|
||||||
/// Returns false if we do not know this connection.
|
|
||||||
pub fn remove(&self, connection: &NetConnection) -> bool {
|
|
||||||
let node_id = *connection.node_id();
|
|
||||||
let is_inbound = connection.is_inbound();
|
|
||||||
let mut container = self.container.write();
|
|
||||||
if let Entry::Occupied(entry) = container.connections.entry(node_id) {
|
|
||||||
if entry.get().is_inbound() != is_inbound {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!(target: "secretstore_net", "{}: removing connection to {} at {}",
|
|
||||||
self.self_key_pair.public(), node_id, entry.get().node_address());
|
|
||||||
entry.remove_entry();
|
|
||||||
|
|
||||||
true
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Listen incoming connections.
|
|
||||||
fn net_listen(
|
|
||||||
listen_address: &SocketAddr,
|
|
||||||
data: Arc<NetConnectionsData>,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
execute(&data.executor, net_listen_future(listen_address, data.clone())?);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Listen incoming connections future.
|
|
||||||
fn net_listen_future(
|
|
||||||
listen_address: &SocketAddr,
|
|
||||||
data: Arc<NetConnectionsData>,
|
|
||||||
) -> Result<BoxedEmptyFuture, Error> {
|
|
||||||
Ok(Box::new(TcpListener::bind(listen_address)?
|
|
||||||
.incoming()
|
|
||||||
.and_then(move |stream| {
|
|
||||||
net_accept_connection(data.clone(), stream);
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
.for_each(|_| Ok(()))
|
|
||||||
.then(|_| future::ok(()))))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Accept incoming connection.
|
|
||||||
fn net_accept_connection(
|
|
||||||
data: Arc<NetConnectionsData>,
|
|
||||||
stream: TcpStream,
|
|
||||||
) {
|
|
||||||
execute(&data.executor, net_accept_connection_future(data.clone(), stream));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Accept incoming connection future.
|
|
||||||
fn net_accept_connection_future(data: Arc<NetConnectionsData>, stream: TcpStream) -> BoxedEmptyFuture {
|
|
||||||
Box::new(io_accept_connection(stream, data.self_key_pair.clone())
|
|
||||||
.then(move |result| net_process_connection_result(data, None, result))
|
|
||||||
.then(|_| future::ok(())))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Connect to remote node.
|
|
||||||
fn net_connect(
|
|
||||||
data: Arc<NetConnectionsData>,
|
|
||||||
remote: SocketAddr,
|
|
||||||
) {
|
|
||||||
execute(&data.executor, net_connect_future(data.clone(), remote));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Connect to remote node future.
|
|
||||||
fn net_connect_future(
|
|
||||||
data: Arc<NetConnectionsData>,
|
|
||||||
remote: SocketAddr,
|
|
||||||
) -> BoxedEmptyFuture {
|
|
||||||
let disconnected_nodes = data.container.disconnected_nodes();
|
|
||||||
Box::new(io_connect(&remote, data.self_key_pair.clone(), disconnected_nodes)
|
|
||||||
.then(move |result| net_process_connection_result(data, Some(remote), result))
|
|
||||||
.then(|_| future::ok(())))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Process network connection result.
|
|
||||||
fn net_process_connection_result(
|
|
||||||
data: Arc<NetConnectionsData>,
|
|
||||||
outbound_addr: Option<SocketAddr>,
|
|
||||||
result: Result<DeadlineStatus<Result<IoConnection, Error>>, TimeoutError<io::Error>>,
|
|
||||||
) -> IoFuture<Result<(), Error>> {
|
|
||||||
match result {
|
|
||||||
Ok(DeadlineStatus::Meet(Ok(connection))) => {
|
|
||||||
let connection = Arc::new(NetConnection::new(data.executor.clone(), outbound_addr.is_none(), connection));
|
|
||||||
if data.insert(connection.clone()) {
|
|
||||||
let maintain_action = data.trigger.lock().on_connection_established(connection.node_id());
|
|
||||||
maintain_connection_trigger(data.clone(), maintain_action);
|
|
||||||
|
|
||||||
return net_process_connection_messages(data, connection);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Ok(DeadlineStatus::Meet(Err(err))) => {
|
|
||||||
warn!(target: "secretstore_net", "{}: protocol error '{}' when establishing {} connection{}",
|
|
||||||
data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" },
|
|
||||||
outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default());
|
|
||||||
},
|
|
||||||
Ok(DeadlineStatus::Timeout) => {
|
|
||||||
warn!(target: "secretstore_net", "{}: timeout when establishing {} connection{}",
|
|
||||||
data.self_key_pair.public(), if outbound_addr.is_some() { "outbound" } else { "inbound" },
|
|
||||||
outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default());
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
warn!(target: "secretstore_net", "{}: network error '{}' when establishing {} connection{}",
|
|
||||||
data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" },
|
|
||||||
outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default());
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
Box::new(future::ok(Ok(())))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Process connection messages.
|
|
||||||
fn net_process_connection_messages(
|
|
||||||
data: Arc<NetConnectionsData>,
|
|
||||||
connection: Arc<NetConnection>,
|
|
||||||
) -> IoFuture<Result<(), Error>> {
|
|
||||||
Box::new(connection
|
|
||||||
.read_message_future()
|
|
||||||
.then(move |result|
|
|
||||||
match result {
|
|
||||||
Ok((_, Ok(message))) => {
|
|
||||||
connection.set_last_message_time(Instant::now());
|
|
||||||
data.message_processor.process_connection_message(connection.clone(), message);
|
|
||||||
// continue serving connection
|
|
||||||
let process_messages_future = net_process_connection_messages(
|
|
||||||
data.clone(), connection).then(|_| Ok(()));
|
|
||||||
execute(&data.executor, process_messages_future);
|
|
||||||
Box::new(future::ok(Ok(())))
|
|
||||||
},
|
|
||||||
Ok((_, Err(err))) => {
|
|
||||||
warn!(target: "secretstore_net", "{}: protocol error '{}' when reading message from node {}",
|
|
||||||
data.self_key_pair.public(), err, connection.node_id());
|
|
||||||
// continue serving connection
|
|
||||||
let process_messages_future = net_process_connection_messages(
|
|
||||||
data.clone(), connection).then(|_| Ok(()));
|
|
||||||
execute(&data.executor, process_messages_future);
|
|
||||||
Box::new(future::ok(Err(err)))
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
let node_id = *connection.node_id();
|
|
||||||
warn!(target: "secretstore_net", "{}: network error '{}' when reading message from node {}",
|
|
||||||
data.self_key_pair.public(), err, node_id);
|
|
||||||
// close connection
|
|
||||||
if data.remove(&*connection) {
|
|
||||||
let maintain_action = data.trigger.lock().on_connection_closed(&node_id);
|
|
||||||
maintain_connection_trigger(data, maintain_action);
|
|
||||||
}
|
|
||||||
Box::new(future::err(err))
|
|
||||||
},
|
|
||||||
}
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedule connections. maintain.
|
|
||||||
fn net_schedule_maintain(data: Arc<NetConnectionsData>) {
|
|
||||||
let closure_data = data.clone();
|
|
||||||
execute(&data.executor, Interval::new_interval(Duration::new(MAINTAIN_INTERVAL, 0))
|
|
||||||
.and_then(move |_| Ok(net_maintain(closure_data.clone())))
|
|
||||||
.for_each(|_| Ok(()))
|
|
||||||
.then(|_| future::ok(())));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Maintain network connections.
|
|
||||||
fn net_maintain(data: Arc<NetConnectionsData>) {
|
|
||||||
trace!(target: "secretstore_net", "{}: executing maintain procedures", data.self_key_pair.public());
|
|
||||||
|
|
||||||
update_nodes_set(data.clone());
|
|
||||||
data.message_processor.maintain_sessions();
|
|
||||||
net_keep_alive(data.clone());
|
|
||||||
net_connect_disconnected(data);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Send keep alive messages to remote nodes.
|
|
||||||
fn net_keep_alive(data: Arc<NetConnectionsData>) {
|
|
||||||
let now = Instant::now();
|
|
||||||
let active_connections = data.active_connections();
|
|
||||||
for connection in active_connections {
|
|
||||||
let last_message_diff = now - connection.last_message_time();
|
|
||||||
if last_message_diff > KEEP_ALIVE_DISCONNECT_INTERVAL {
|
|
||||||
warn!(target: "secretstore_net", "{}: keep alive timeout for node {}",
|
|
||||||
data.self_key_pair.public(), connection.node_id());
|
|
||||||
|
|
||||||
let node_id = *connection.node_id();
|
|
||||||
if data.remove(&*connection) {
|
|
||||||
let maintain_action = data.trigger.lock().on_connection_closed(&node_id);
|
|
||||||
maintain_connection_trigger(data.clone(), maintain_action);
|
|
||||||
}
|
|
||||||
data.message_processor.process_disconnect(&node_id);
|
|
||||||
}
|
|
||||||
else if last_message_diff > KEEP_ALIVE_SEND_INTERVAL {
|
|
||||||
connection.send_message(Message::Cluster(ClusterMessage::KeepAlive(message::KeepAlive {})));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Connect disconnected nodes.
|
|
||||||
fn net_connect_disconnected(data: Arc<NetConnectionsData>) {
|
|
||||||
let disconnected_nodes = data.disconnected_nodes();
|
|
||||||
for (node_id, address) in disconnected_nodes {
|
|
||||||
if data.allow_connecting_to_higher_nodes || *data.self_key_pair.public() < node_id {
|
|
||||||
net_connect(data.clone(), address);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedule future execution.
|
|
||||||
fn execute<F: Future<Item = (), Error = ()> + Send + 'static>(executor: &Executor, f: F) {
|
|
||||||
if let Err(err) = future::Executor::execute(executor, Box::new(f)) {
|
|
||||||
error!("Secret store runtime unable to spawn task. Runtime is shutting down. ({:?})", err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to update active nodes set from connection trigger.
|
|
||||||
fn update_nodes_set(data: Arc<NetConnectionsData>) {
|
|
||||||
let maintain_action = data.trigger.lock().on_maintain();
|
|
||||||
maintain_connection_trigger(data, maintain_action);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Execute maintain procedures of connections trigger.
|
|
||||||
fn maintain_connection_trigger(data: Arc<NetConnectionsData>, maintain_action: Option<Maintain>) {
|
|
||||||
if maintain_action == Some(Maintain::SessionAndConnections) || maintain_action == Some(Maintain::Session) {
|
|
||||||
let session_params = data.trigger.lock().maintain_session();
|
|
||||||
if let Some(session_params) = session_params {
|
|
||||||
let session = data.message_processor.start_servers_set_change_session(session_params);
|
|
||||||
match session {
|
|
||||||
Ok(_) => trace!(target: "secretstore_net", "{}: started auto-migrate session",
|
|
||||||
data.self_key_pair.public()),
|
|
||||||
Err(err) => trace!(target: "secretstore_net", "{}: failed to start auto-migrate session with: {}",
|
|
||||||
data.self_key_pair.public(), err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if maintain_action == Some(Maintain::SessionAndConnections) || maintain_action == Some(Maintain::Connections) {
|
|
||||||
let mut trigger = data.trigger.lock();
|
|
||||||
let mut data = data.container.write();
|
|
||||||
trigger.maintain_connections(&mut *data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Compose SocketAddr from configuration' address and port.
|
|
||||||
fn make_socket_address(address: &str, port: u16) -> Result<SocketAddr, Error> {
|
|
||||||
let ip_address: IpAddr = address.parse().map_err(|_| Error::InvalidNodeAddress)?;
|
|
||||||
Ok(SocketAddr::new(ip_address, port))
|
|
||||||
}
|
|
@ -1,357 +0,0 @@
|
|||||||
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
|
|
||||||
// This file is part of Parity.
|
|
||||||
|
|
||||||
// Parity is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
|
|
||||||
// Parity is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU General Public License for more details.
|
|
||||||
|
|
||||||
// You should have received a copy of the GNU General Public License
|
|
||||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
use std::sync::Arc;
|
|
||||||
use key_server_cluster::{Error, NodeId, NodeKeyPair};
|
|
||||||
use key_server_cluster::cluster::{ServersSetChangeParams, new_servers_set_change_session};
|
|
||||||
use key_server_cluster::cluster_sessions::{AdminSession};
|
|
||||||
use key_server_cluster::cluster_connections::{ConnectionProvider, Connection};
|
|
||||||
use key_server_cluster::cluster_sessions::{ClusterSession, ClusterSessions, ClusterSessionsContainer,
|
|
||||||
create_cluster_view};
|
|
||||||
use key_server_cluster::cluster_sessions_creator::{ClusterSessionCreator, IntoSessionId};
|
|
||||||
use key_server_cluster::message::{self, Message, ClusterMessage};
|
|
||||||
use key_server_cluster::key_version_negotiation_session::{SessionImpl as KeyVersionNegotiationSession,
|
|
||||||
IsolatedSessionTransport as KeyVersionNegotiationSessionTransport, ContinueAction};
|
|
||||||
use key_server_cluster::connection_trigger::ServersSetChangeSessionCreatorConnector;
|
|
||||||
|
|
||||||
/// Something that is able to process signals/messages from other nodes.
|
|
||||||
pub trait MessageProcessor: Send + Sync {
|
|
||||||
/// Process disconnect from the remote node.
|
|
||||||
fn process_disconnect(&self, node: &NodeId);
|
|
||||||
/// Process single message from the connection.
|
|
||||||
fn process_connection_message(&self, connection: Arc<Connection>, message: Message);
|
|
||||||
|
|
||||||
/// Start servers set change session. This is typically used by ConnectionManager when
|
|
||||||
/// it detects that auto-migration session needs to be started.
|
|
||||||
fn start_servers_set_change_session(&self, params: ServersSetChangeParams) -> Result<Arc<AdminSession>, Error>;
|
|
||||||
/// Try to continue session after key version negotiation session is completed.
|
|
||||||
fn try_continue_session(
|
|
||||||
&self,
|
|
||||||
session: Option<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>>
|
|
||||||
);
|
|
||||||
/// Maintain active sessions. Typically called by the ConnectionManager at some intervals.
|
|
||||||
/// Should cancel stalled sessions and send keep-alive messages for sessions that support it.
|
|
||||||
fn maintain_sessions(&self);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Bridge between ConnectionManager and ClusterSessions.
|
|
||||||
pub struct SessionsMessageProcessor {
|
|
||||||
self_key_pair: Arc<NodeKeyPair>,
|
|
||||||
servers_set_change_creator_connector: Arc<ServersSetChangeSessionCreatorConnector>,
|
|
||||||
sessions: Arc<ClusterSessions>,
|
|
||||||
connections: Arc<ConnectionProvider>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SessionsMessageProcessor {
|
|
||||||
/// Create new instance of SessionsMessageProcessor.
|
|
||||||
pub fn new(
|
|
||||||
self_key_pair: Arc<NodeKeyPair>,
|
|
||||||
servers_set_change_creator_connector: Arc<ServersSetChangeSessionCreatorConnector>,
|
|
||||||
sessions: Arc<ClusterSessions>,
|
|
||||||
connections: Arc<ConnectionProvider>,
|
|
||||||
) -> Self {
|
|
||||||
SessionsMessageProcessor {
|
|
||||||
self_key_pair,
|
|
||||||
servers_set_change_creator_connector,
|
|
||||||
sessions,
|
|
||||||
connections,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Process single session message from connection.
|
|
||||||
fn process_message<S: ClusterSession, SC: ClusterSessionCreator<S, D>, D>(
|
|
||||||
&self,
|
|
||||||
sessions: &ClusterSessionsContainer<S, SC, D>,
|
|
||||||
connection: Arc<Connection>,
|
|
||||||
mut message: Message,
|
|
||||||
) -> Option<Arc<S>>
|
|
||||||
where
|
|
||||||
Message: IntoSessionId<S::Id>
|
|
||||||
{
|
|
||||||
// get or create new session, if required
|
|
||||||
let mut sender = *connection.node_id();
|
|
||||||
let session = self.prepare_session(sessions, &sender, &message);
|
|
||||||
// send error if session is not found, or failed to create
|
|
||||||
let session = match session {
|
|
||||||
Ok(session) => session,
|
|
||||||
Err(error) => {
|
|
||||||
// this is new session => it is not yet in container
|
|
||||||
warn!(target: "secretstore_net",
|
|
||||||
"{}: {} session read error '{}' when requested for session from node {}",
|
|
||||||
self.self_key_pair.public(), S::type_name(), error, sender);
|
|
||||||
if !message.is_error_message() {
|
|
||||||
let qed = "session_id only fails for cluster messages;
|
|
||||||
only session messages are passed to process_message;
|
|
||||||
qed";
|
|
||||||
let session_id = message.into_session_id().expect(qed);
|
|
||||||
let session_nonce = message.session_nonce().expect(qed);
|
|
||||||
|
|
||||||
connection.send_message(SC::make_error_message(session_id, session_nonce, error));
|
|
||||||
}
|
|
||||||
return None;
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
let session_id = session.id();
|
|
||||||
let mut is_queued_message = false;
|
|
||||||
loop {
|
|
||||||
let message_result = session.on_message(&sender, &message);
|
|
||||||
match message_result {
|
|
||||||
Ok(_) => {
|
|
||||||
// if session is completed => stop
|
|
||||||
if session.is_finished() {
|
|
||||||
info!(target: "secretstore_net",
|
|
||||||
"{}: {} session completed", self.self_key_pair.public(), S::type_name());
|
|
||||||
sessions.remove(&session_id);
|
|
||||||
return Some(session);
|
|
||||||
}
|
|
||||||
|
|
||||||
// try to dequeue message
|
|
||||||
match sessions.dequeue_message(&session_id) {
|
|
||||||
Some((msg_sender, msg)) => {
|
|
||||||
is_queued_message = true;
|
|
||||||
sender = msg_sender;
|
|
||||||
message = msg;
|
|
||||||
},
|
|
||||||
None => return Some(session),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(Error::TooEarlyForRequest) => {
|
|
||||||
sessions.enqueue_message(&session_id, sender, message, is_queued_message);
|
|
||||||
return Some(session);
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
warn!(
|
|
||||||
target: "secretstore_net",
|
|
||||||
"{}: {} session error '{}' when processing message {} from node {}",
|
|
||||||
self.self_key_pair.public(),
|
|
||||||
S::type_name(),
|
|
||||||
err,
|
|
||||||
message,
|
|
||||||
sender);
|
|
||||||
session.on_session_error(self.self_key_pair.public(), err);
|
|
||||||
sessions.remove(&session_id);
|
|
||||||
return Some(session);
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get or insert new session.
|
|
||||||
fn prepare_session<S: ClusterSession, SC: ClusterSessionCreator<S, D>, D>(
|
|
||||||
&self,
|
|
||||||
sessions: &ClusterSessionsContainer<S, SC, D>,
|
|
||||||
sender: &NodeId,
|
|
||||||
message: &Message
|
|
||||||
) -> Result<Arc<S>, Error>
|
|
||||||
where
|
|
||||||
Message: IntoSessionId<S::Id>
|
|
||||||
{
|
|
||||||
fn requires_all_connections(message: &Message) -> bool {
|
|
||||||
match *message {
|
|
||||||
Message::Generation(_) => true,
|
|
||||||
Message::ShareAdd(_) => true,
|
|
||||||
Message::ServersSetChange(_) => true,
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// get or create new session, if required
|
|
||||||
let session_id = message.into_session_id()
|
|
||||||
.expect("into_session_id fails for cluster messages only;
|
|
||||||
only session messages are passed to prepare_session;
|
|
||||||
qed");
|
|
||||||
let is_initialization_message = message.is_initialization_message();
|
|
||||||
let is_delegation_message = message.is_delegation_message();
|
|
||||||
match is_initialization_message || is_delegation_message {
|
|
||||||
false => sessions.get(&session_id, true).ok_or(Error::NoActiveSessionWithId),
|
|
||||||
true => {
|
|
||||||
let creation_data = SC::creation_data_from_message(&message)?;
|
|
||||||
let master = if is_initialization_message {
|
|
||||||
*sender
|
|
||||||
} else {
|
|
||||||
*self.self_key_pair.public()
|
|
||||||
};
|
|
||||||
let cluster = create_cluster_view(
|
|
||||||
self.self_key_pair.clone(),
|
|
||||||
self.connections.clone(),
|
|
||||||
requires_all_connections(&message))?;
|
|
||||||
|
|
||||||
let nonce = Some(message.session_nonce().ok_or(Error::InvalidMessage)?);
|
|
||||||
let exclusive = message.is_exclusive_session_message();
|
|
||||||
sessions.insert(cluster, master, session_id, nonce, exclusive, creation_data)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Process single cluster message from the connection.
|
|
||||||
fn process_cluster_message(&self, connection: Arc<Connection>, message: ClusterMessage) {
|
|
||||||
match message {
|
|
||||||
ClusterMessage::KeepAlive(_) => {
|
|
||||||
let msg = Message::Cluster(ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {
|
|
||||||
session_id: None,
|
|
||||||
}));
|
|
||||||
connection.send_message(msg)
|
|
||||||
},
|
|
||||||
ClusterMessage::KeepAliveResponse(msg) => if let Some(session_id) = msg.session_id {
|
|
||||||
self.sessions.on_session_keep_alive(connection.node_id(), session_id.into());
|
|
||||||
},
|
|
||||||
_ => warn!(target: "secretstore_net", "{}: received unexpected message {} from node {} at {}",
|
|
||||||
self.self_key_pair.public(), message, connection.node_id(), connection.node_address()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MessageProcessor for SessionsMessageProcessor {
|
|
||||||
fn process_disconnect(&self, node: &NodeId) {
|
|
||||||
self.sessions.on_connection_timeout(node);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn process_connection_message(&self, connection: Arc<Connection>, message: Message) {
|
|
||||||
trace!(target: "secretstore_net", "{}: received message {} from {}",
|
|
||||||
self.self_key_pair.public(), message, connection.node_id());
|
|
||||||
|
|
||||||
// error is ignored as we only process errors on session level
|
|
||||||
match message {
|
|
||||||
Message::Generation(message) => self
|
|
||||||
.process_message(&self.sessions.generation_sessions, connection, Message::Generation(message))
|
|
||||||
.map(|_| ()).unwrap_or_default(),
|
|
||||||
Message::Encryption(message) => self
|
|
||||||
.process_message(&self.sessions.encryption_sessions, connection, Message::Encryption(message))
|
|
||||||
.map(|_| ()).unwrap_or_default(),
|
|
||||||
Message::Decryption(message) => self
|
|
||||||
.process_message(&self.sessions.decryption_sessions, connection, Message::Decryption(message))
|
|
||||||
.map(|_| ()).unwrap_or_default(),
|
|
||||||
Message::SchnorrSigning(message) => self
|
|
||||||
.process_message(&self.sessions.schnorr_signing_sessions, connection, Message::SchnorrSigning(message))
|
|
||||||
.map(|_| ()).unwrap_or_default(),
|
|
||||||
Message::EcdsaSigning(message) => self
|
|
||||||
.process_message(&self.sessions.ecdsa_signing_sessions, connection, Message::EcdsaSigning(message))
|
|
||||||
.map(|_| ()).unwrap_or_default(),
|
|
||||||
Message::ServersSetChange(message) => {
|
|
||||||
let message = Message::ServersSetChange(message);
|
|
||||||
let is_initialization_message = message.is_initialization_message();
|
|
||||||
let session = self.process_message(&self.sessions.admin_sessions, connection, message);
|
|
||||||
if is_initialization_message {
|
|
||||||
if let Some(session) = session {
|
|
||||||
self.servers_set_change_creator_connector
|
|
||||||
.set_key_servers_set_change_session(session.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Message::KeyVersionNegotiation(message) => {
|
|
||||||
let session = self.process_message(
|
|
||||||
&self.sessions.negotiation_sessions, connection, Message::KeyVersionNegotiation(message));
|
|
||||||
self.try_continue_session(session);
|
|
||||||
},
|
|
||||||
Message::ShareAdd(message) => self.process_message(
|
|
||||||
&self.sessions.admin_sessions, connection, Message::ShareAdd(message))
|
|
||||||
.map(|_| ()).unwrap_or_default(),
|
|
||||||
Message::Cluster(message) => self.process_cluster_message(connection, message),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn try_continue_session(
|
|
||||||
&self,
|
|
||||||
session: Option<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>>
|
|
||||||
) {
|
|
||||||
if let Some(session) = session {
|
|
||||||
let meta = session.meta();
|
|
||||||
let is_master_node = meta.self_node_id == meta.master_node_id;
|
|
||||||
if is_master_node && session.is_finished() {
|
|
||||||
self.sessions.negotiation_sessions.remove(&session.id());
|
|
||||||
match session.wait() {
|
|
||||||
Ok(Some((version, master))) => match session.take_continue_action() {
|
|
||||||
Some(ContinueAction::Decrypt(
|
|
||||||
session, origin, is_shadow_decryption, is_broadcast_decryption
|
|
||||||
)) => {
|
|
||||||
let initialization_error = if self.self_key_pair.public() == &master {
|
|
||||||
session.initialize(
|
|
||||||
origin, version, is_shadow_decryption, is_broadcast_decryption)
|
|
||||||
} else {
|
|
||||||
session.delegate(
|
|
||||||
master, origin, version, is_shadow_decryption, is_broadcast_decryption)
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(error) = initialization_error {
|
|
||||||
session.on_session_error(&meta.self_node_id, error);
|
|
||||||
self.sessions.decryption_sessions.remove(&session.id());
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Some(ContinueAction::SchnorrSign(session, message_hash)) => {
|
|
||||||
let initialization_error = if self.self_key_pair.public() == &master {
|
|
||||||
session.initialize(version, message_hash)
|
|
||||||
} else {
|
|
||||||
session.delegate(master, version, message_hash)
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(error) = initialization_error {
|
|
||||||
session.on_session_error(&meta.self_node_id, error);
|
|
||||||
self.sessions.schnorr_signing_sessions.remove(&session.id());
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Some(ContinueAction::EcdsaSign(session, message_hash)) => {
|
|
||||||
let initialization_error = if self.self_key_pair.public() == &master {
|
|
||||||
session.initialize(version, message_hash)
|
|
||||||
} else {
|
|
||||||
session.delegate(master, version, message_hash)
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(error) = initialization_error {
|
|
||||||
session.on_session_error(&meta.self_node_id, error);
|
|
||||||
self.sessions.ecdsa_signing_sessions.remove(&session.id());
|
|
||||||
}
|
|
||||||
},
|
|
||||||
None => (),
|
|
||||||
},
|
|
||||||
Ok(None) => unreachable!("is_master_node; session is finished;
|
|
||||||
negotiation version always finished with result on master;
|
|
||||||
qed"),
|
|
||||||
Err(error) => match session.take_continue_action() {
|
|
||||||
Some(ContinueAction::Decrypt(session, _, _, _)) => {
|
|
||||||
session.on_session_error(&meta.self_node_id, error);
|
|
||||||
self.sessions.decryption_sessions.remove(&session.id());
|
|
||||||
},
|
|
||||||
Some(ContinueAction::SchnorrSign(session, _)) => {
|
|
||||||
session.on_session_error(&meta.self_node_id, error);
|
|
||||||
self.sessions.schnorr_signing_sessions.remove(&session.id());
|
|
||||||
},
|
|
||||||
Some(ContinueAction::EcdsaSign(session, _)) => {
|
|
||||||
session.on_session_error(&meta.self_node_id, error);
|
|
||||||
self.sessions.ecdsa_signing_sessions.remove(&session.id());
|
|
||||||
},
|
|
||||||
None => (),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn maintain_sessions(&self) {
|
|
||||||
self.sessions.stop_stalled_sessions();
|
|
||||||
self.sessions.sessions_keep_alive();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn start_servers_set_change_session(&self, params: ServersSetChangeParams) -> Result<Arc<AdminSession>, Error> {
|
|
||||||
new_servers_set_change_session(
|
|
||||||
self.self_key_pair.clone(),
|
|
||||||
&*self.sessions,
|
|
||||||
self.connections.clone(),
|
|
||||||
self.servers_set_change_creator_connector.clone(),
|
|
||||||
params,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user