938 lines
32 KiB
Rust
938 lines
32 KiB
Rust
// Copyright 2015-2020 Parity Technologies (UK) Ltd.
|
|
// This file is part of OpenEthereum.
|
|
|
|
// OpenEthereum is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
|
|
// OpenEthereum is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with OpenEthereum. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
use ethereum_types::H256;
|
|
use ethkey::Secret;
|
|
use key_server_cluster::{
|
|
cluster::{Cluster, ClusterConfiguration, ClusterView},
|
|
cluster_connections::ConnectionProvider,
|
|
connection_trigger::ServersSetChangeSessionCreatorConnector,
|
|
decryption_session::SessionImpl as DecryptionSessionImpl,
|
|
encryption_session::SessionImpl as EncryptionSessionImpl,
|
|
generation_session::SessionImpl as GenerationSessionImpl,
|
|
key_version_negotiation_session::{
|
|
IsolatedSessionTransport as VersionNegotiationTransport,
|
|
SessionImpl as KeyVersionNegotiationSessionImpl,
|
|
},
|
|
message::{self, Message},
|
|
servers_set_change_session::SessionImpl as ServersSetChangeSessionImpl,
|
|
share_add_session::{
|
|
IsolatedSessionTransport as ShareAddTransport, SessionImpl as ShareAddSessionImpl,
|
|
},
|
|
signing_session_ecdsa::SessionImpl as EcdsaSigningSessionImpl,
|
|
signing_session_schnorr::SessionImpl as SchnorrSigningSessionImpl,
|
|
Error, NodeId, NodeKeyPair, Requester, SessionId,
|
|
};
|
|
use parking_lot::{Condvar, Mutex, RwLock};
|
|
use std::{
|
|
collections::{BTreeMap, BTreeSet, VecDeque},
|
|
sync::{atomic::AtomicBool, Arc, Weak},
|
|
time::{Duration, Instant},
|
|
};
|
|
|
|
use key_server_cluster::cluster_sessions_creator::{
|
|
AdminSessionCreator, ClusterSessionCreator, DecryptionSessionCreator,
|
|
EcdsaSigningSessionCreator, EncryptionSessionCreator, GenerationSessionCreator,
|
|
KeyVersionNegotiationSessionCreator, SchnorrSigningSessionCreator, SessionCreatorCore,
|
|
};
|
|
|
|
/// When there are no session-related messages for SESSION_TIMEOUT_INTERVAL seconds,
/// we must treat this session as stalled && finish it with an error.
/// This timeout is for cases when node is responding to KeepAlive messages, but intentionally ignores
/// session messages.
const SESSION_TIMEOUT_INTERVAL: Duration = Duration::from_secs(60);

/// Interval to send session-level KeepAlive-messages (only sent by the session master;
/// see `ClusterSessionsContainer::send_keep_alive`).
const SESSION_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(30);
|
|
|
|
lazy_static! {
    /// Servers set change session id (there could be at most 1 session => hardcoded id).
    pub static ref SERVERS_SET_CHANGE_SESSION_ID: SessionId = "10b7af423bb551d5dc8645db754163a2145d37d78d468fa7330435ed77064c1c"
        .parse()
        .expect("hardcoded id should parse without errors; qed");
}
|
|
|
|
/// Session id with sub session.
///
/// Identifies one sub-session (e.g. a single decryption or signing request)
/// within the scope of a key id; ordering is by key id first, then access key
/// (see the manual `Ord` impl below).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionIdWithSubSession {
    /// Key id.
    pub id: SessionId,
    /// Sub session id.
    pub access_key: Secret,
}
|
|
|
|
/// Generic cluster session.
pub trait ClusterSession {
    /// Session identifier type.
    type Id: ::std::fmt::Debug + Ord + Clone;

    /// Session type name.
    fn type_name() -> &'static str;
    /// Get session id.
    fn id(&self) -> Self::Id;
    /// If session is finished (either with success or not).
    fn is_finished(&self) -> bool;
    /// When it takes too much time to complete session.
    fn on_session_timeout(&self);
    /// When it takes too much time to receive response from the node.
    fn on_node_timeout(&self, node_id: &NodeId);
    /// Process error that has occurred during session + propagate this error to required nodes.
    fn on_session_error(&self, sender: &NodeId, error: Error);
    /// Process session message.
    fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error>;

    /// 'Wait for session completion' helper.
    ///
    /// Locks `session_data`, returns immediately if `result_reader` already yields a
    /// result, otherwise blocks on `completion_event` (bounded by `timeout` when given)
    /// and re-reads the result once after waking. Returns `None` when no result is
    /// available after the wait.
    fn wait_session<T, U, F: Fn(&U) -> Option<Result<T, Error>>>(
        completion_event: &Condvar,
        session_data: &Mutex<U>,
        timeout: Option<Duration>,
        result_reader: F,
    ) -> Option<Result<T, Error>> {
        let mut locked_data = session_data.lock();
        match result_reader(&locked_data) {
            Some(result) => Some(result),
            None => {
                // NOTE(review): only a single wait is performed, so a spurious wakeup
                // may return `None` early — callers appear to treat `None` as "no result
                // yet"; confirm before relying on the timeout being fully honoured.
                match timeout {
                    None => completion_event.wait(&mut locked_data),
                    Some(timeout) => {
                        completion_event.wait_for(&mut locked_data, timeout);
                    }
                }

                result_reader(&locked_data)
            }
        }
    }
}
|
|
|
|
/// Administrative session.
///
/// At most one administrative session runs at a time (see
/// `SERVERS_SET_CHANGE_SESSION_ID` and the exclusive container state).
pub enum AdminSession {
    /// Share add session.
    ShareAdd(ShareAddSessionImpl<ShareAddTransport>),
    /// Servers set change session.
    ServersSetChange(ServersSetChangeSessionImpl),
}
|
|
|
|
/// Administrative session creation data.
pub enum AdminSessionCreationData {
    /// Share add session (key id).
    ShareAdd(H256),
    /// Servers set change session (block id, new_server_set).
    ServersSetChange(Option<H256>, BTreeSet<NodeId>),
}
|
|
|
|
/// Active sessions on this cluster.
///
/// One typed container per session kind; all containers share a single
/// `ClusterSessionsContainerState` (wired up in `ClusterSessions::new`), so an
/// exclusive administrative session blocks creation of every other session type.
pub struct ClusterSessions {
    /// Key generation sessions.
    pub generation_sessions:
        ClusterSessionsContainer<GenerationSessionImpl, GenerationSessionCreator, ()>,
    /// Encryption sessions.
    pub encryption_sessions:
        ClusterSessionsContainer<EncryptionSessionImpl, EncryptionSessionCreator, ()>,
    /// Decryption sessions.
    pub decryption_sessions:
        ClusterSessionsContainer<DecryptionSessionImpl, DecryptionSessionCreator, Requester>,
    /// Schnorr signing sessions.
    pub schnorr_signing_sessions: ClusterSessionsContainer<
        SchnorrSigningSessionImpl,
        SchnorrSigningSessionCreator,
        Requester,
    >,
    /// ECDSA signing sessions.
    pub ecdsa_signing_sessions:
        ClusterSessionsContainer<EcdsaSigningSessionImpl, EcdsaSigningSessionCreator, Requester>,
    /// Key version negotiation sessions.
    pub negotiation_sessions: ClusterSessionsContainer<
        KeyVersionNegotiationSessionImpl<VersionNegotiationTransport>,
        KeyVersionNegotiationSessionCreator,
        (),
    >,
    /// Administrative sessions.
    pub admin_sessions:
        ClusterSessionsContainer<AdminSession, AdminSessionCreator, AdminSessionCreationData>,
    /// Self node id.
    self_node_id: NodeId,
    /// Creator core, shared by all session creators above.
    creator_core: Arc<SessionCreatorCore>,
}
|
|
|
|
/// Active sessions container listener.
///
/// Both callbacks default to no-ops, so implementors only override what they need.
pub trait ClusterSessionsListener<S: ClusterSession>: Send + Sync {
    /// When new session is inserted to the container.
    fn on_session_inserted(&self, _session: Arc<S>) {}
    /// When session is removed from the container.
    fn on_session_removed(&self, _session: Arc<S>) {}
}
|
|
|
|
/// Active sessions container.
///
/// `S` is the session type, `SC` its creator, and `D` the creator-specific
/// creation data passed to `insert`.
pub struct ClusterSessionsContainer<S: ClusterSession, SC: ClusterSessionCreator<S, D>, D> {
    /// Sessions creator.
    pub creator: SC,
    /// Active sessions, keyed by session id.
    sessions: RwLock<BTreeMap<S::Id, QueuedSession<S>>>,
    /// Listeners. Lock order: sessions -> listeners.
    listeners: Mutex<Vec<Weak<dyn ClusterSessionsListener<S>>>>,
    /// Sessions container state, shared with the sibling containers.
    container_state: Arc<Mutex<ClusterSessionsContainerState>>,
    /// Do not actually remove sessions (test-only flag, see `preserve_sessions`).
    preserve_sessions: bool,
    /// Phantom data tying the unused `D` type parameter to the struct.
    _pd: ::std::marker::PhantomData<D>,
}
|
|
|
|
/// Session and its message queue.
pub struct QueuedSession<S> {
    /// Session master.
    pub master: NodeId,
    /// Cluster view.
    pub cluster_view: Arc<dyn Cluster>,
    /// Last keep alive time.
    pub last_keep_alive_time: Instant,
    /// Last received message time (used by `stop_stalled_sessions`).
    pub last_message_time: Instant,
    /// The session itself.
    pub session: Arc<S>,
    /// Messages queue (messages deferred until the session is ready for them).
    pub queue: VecDeque<(NodeId, Message)>,
}
|
|
|
|
/// Cluster sessions container state.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ClusterSessionsContainerState {
    /// There's no active sessions => any session can be started.
    Idle,
    /// There are active sessions (the count is stored) => exclusive session can't be started right now.
    Active(usize),
    /// Exclusive session is active => can't start any other sessions.
    Exclusive,
}
|
|
|
|
impl ClusterSessions {
|
|
/// Create new cluster sessions container.
|
|
pub fn new(
|
|
config: &ClusterConfiguration,
|
|
servers_set_change_session_creator_connector: Arc<
|
|
dyn ServersSetChangeSessionCreatorConnector,
|
|
>,
|
|
) -> Self {
|
|
let container_state = Arc::new(Mutex::new(ClusterSessionsContainerState::Idle));
|
|
let creator_core = Arc::new(SessionCreatorCore::new(config));
|
|
ClusterSessions {
|
|
self_node_id: config.self_key_pair.public().clone(),
|
|
generation_sessions: ClusterSessionsContainer::new(
|
|
GenerationSessionCreator {
|
|
core: creator_core.clone(),
|
|
make_faulty_generation_sessions: AtomicBool::new(false),
|
|
},
|
|
container_state.clone(),
|
|
),
|
|
encryption_sessions: ClusterSessionsContainer::new(
|
|
EncryptionSessionCreator {
|
|
core: creator_core.clone(),
|
|
},
|
|
container_state.clone(),
|
|
),
|
|
decryption_sessions: ClusterSessionsContainer::new(
|
|
DecryptionSessionCreator {
|
|
core: creator_core.clone(),
|
|
},
|
|
container_state.clone(),
|
|
),
|
|
schnorr_signing_sessions: ClusterSessionsContainer::new(
|
|
SchnorrSigningSessionCreator {
|
|
core: creator_core.clone(),
|
|
},
|
|
container_state.clone(),
|
|
),
|
|
ecdsa_signing_sessions: ClusterSessionsContainer::new(
|
|
EcdsaSigningSessionCreator {
|
|
core: creator_core.clone(),
|
|
},
|
|
container_state.clone(),
|
|
),
|
|
negotiation_sessions: ClusterSessionsContainer::new(
|
|
KeyVersionNegotiationSessionCreator {
|
|
core: creator_core.clone(),
|
|
},
|
|
container_state.clone(),
|
|
),
|
|
admin_sessions: ClusterSessionsContainer::new(
|
|
AdminSessionCreator {
|
|
core: creator_core.clone(),
|
|
servers_set_change_session_creator_connector:
|
|
servers_set_change_session_creator_connector,
|
|
admin_public: config.admin_public.clone(),
|
|
},
|
|
container_state,
|
|
),
|
|
creator_core: creator_core,
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
pub fn make_faulty_generation_sessions(&self) {
|
|
self.generation_sessions
|
|
.creator
|
|
.make_faulty_generation_sessions();
|
|
}
|
|
|
|
#[cfg(test)]
|
|
pub fn preserve_sessions(&mut self) {
|
|
self.generation_sessions.preserve_sessions = true;
|
|
self.encryption_sessions.preserve_sessions = true;
|
|
self.decryption_sessions.preserve_sessions = true;
|
|
self.schnorr_signing_sessions.preserve_sessions = true;
|
|
self.ecdsa_signing_sessions.preserve_sessions = true;
|
|
self.negotiation_sessions.preserve_sessions = true;
|
|
self.admin_sessions.preserve_sessions = true;
|
|
}
|
|
|
|
/// Send session-level keep-alive messages.
|
|
pub fn sessions_keep_alive(&self) {
|
|
self.admin_sessions
|
|
.send_keep_alive(&*SERVERS_SET_CHANGE_SESSION_ID, &self.self_node_id);
|
|
}
|
|
|
|
/// When session-level keep-alive response is received.
|
|
pub fn on_session_keep_alive(&self, sender: &NodeId, session_id: SessionId) {
|
|
if session_id == *SERVERS_SET_CHANGE_SESSION_ID {
|
|
self.admin_sessions.on_keep_alive(&session_id, sender);
|
|
}
|
|
}
|
|
|
|
/// Stop sessions that are stalling.
|
|
pub fn stop_stalled_sessions(&self) {
|
|
self.generation_sessions.stop_stalled_sessions();
|
|
self.encryption_sessions.stop_stalled_sessions();
|
|
self.decryption_sessions.stop_stalled_sessions();
|
|
self.schnorr_signing_sessions.stop_stalled_sessions();
|
|
self.ecdsa_signing_sessions.stop_stalled_sessions();
|
|
self.negotiation_sessions.stop_stalled_sessions();
|
|
self.admin_sessions.stop_stalled_sessions();
|
|
}
|
|
|
|
/// When connection to node is lost.
|
|
pub fn on_connection_timeout(&self, node_id: &NodeId) {
|
|
self.generation_sessions.on_connection_timeout(node_id);
|
|
self.encryption_sessions.on_connection_timeout(node_id);
|
|
self.decryption_sessions.on_connection_timeout(node_id);
|
|
self.schnorr_signing_sessions.on_connection_timeout(node_id);
|
|
self.ecdsa_signing_sessions.on_connection_timeout(node_id);
|
|
self.negotiation_sessions.on_connection_timeout(node_id);
|
|
self.admin_sessions.on_connection_timeout(node_id);
|
|
self.creator_core.on_connection_timeout(node_id);
|
|
}
|
|
}
|
|
|
|
impl<S, SC, D> ClusterSessionsContainer<S, SC, D>
|
|
where
|
|
S: ClusterSession,
|
|
SC: ClusterSessionCreator<S, D>,
|
|
{
|
|
pub fn new(creator: SC, container_state: Arc<Mutex<ClusterSessionsContainerState>>) -> Self {
|
|
ClusterSessionsContainer {
|
|
creator: creator,
|
|
sessions: RwLock::new(BTreeMap::new()),
|
|
listeners: Mutex::new(Vec::new()),
|
|
container_state: container_state,
|
|
preserve_sessions: false,
|
|
_pd: Default::default(),
|
|
}
|
|
}
|
|
|
|
pub fn add_listener(&self, listener: Arc<dyn ClusterSessionsListener<S>>) {
|
|
self.listeners.lock().push(Arc::downgrade(&listener));
|
|
}
|
|
|
|
#[cfg(test)]
|
|
pub fn is_empty(&self) -> bool {
|
|
self.sessions.read().is_empty()
|
|
}
|
|
|
|
pub fn get(&self, session_id: &S::Id, update_last_message_time: bool) -> Option<Arc<S>> {
|
|
let mut sessions = self.sessions.write();
|
|
sessions.get_mut(session_id).map(|s| {
|
|
if update_last_message_time {
|
|
s.last_message_time = Instant::now();
|
|
}
|
|
s.session.clone()
|
|
})
|
|
}
|
|
|
|
#[cfg(test)]
|
|
pub fn first(&self) -> Option<Arc<S>> {
|
|
self.sessions
|
|
.read()
|
|
.values()
|
|
.nth(0)
|
|
.map(|s| s.session.clone())
|
|
}
|
|
|
|
pub fn insert(
|
|
&self,
|
|
cluster: Arc<dyn Cluster>,
|
|
master: NodeId,
|
|
session_id: S::Id,
|
|
session_nonce: Option<u64>,
|
|
is_exclusive_session: bool,
|
|
creation_data: Option<D>,
|
|
) -> Result<Arc<S>, Error> {
|
|
let mut sessions = self.sessions.write();
|
|
if sessions.contains_key(&session_id) {
|
|
return Err(Error::DuplicateSessionId);
|
|
}
|
|
|
|
// create cluster
|
|
// let cluster = create_cluster_view(data, requires_all_connections)?;
|
|
// create session
|
|
let session = self.creator.create(
|
|
cluster.clone(),
|
|
master.clone(),
|
|
session_nonce,
|
|
session_id.clone(),
|
|
creation_data,
|
|
)?;
|
|
// check if session can be started
|
|
self.container_state
|
|
.lock()
|
|
.on_session_starting(is_exclusive_session)?;
|
|
|
|
// insert session
|
|
let queued_session = QueuedSession {
|
|
master: master,
|
|
cluster_view: cluster,
|
|
last_keep_alive_time: Instant::now(),
|
|
last_message_time: Instant::now(),
|
|
session: session.clone(),
|
|
queue: VecDeque::new(),
|
|
};
|
|
sessions.insert(session_id, queued_session);
|
|
self.notify_listeners(|l| l.on_session_inserted(session.clone()));
|
|
|
|
Ok(session)
|
|
}
|
|
|
|
pub fn remove(&self, session_id: &S::Id) {
|
|
self.do_remove(session_id, &mut *self.sessions.write());
|
|
}
|
|
|
|
pub fn enqueue_message(
|
|
&self,
|
|
session_id: &S::Id,
|
|
sender: NodeId,
|
|
message: Message,
|
|
is_queued_message: bool,
|
|
) {
|
|
self.sessions.write().get_mut(session_id).map(|session| {
|
|
if is_queued_message {
|
|
session.queue.push_front((sender, message))
|
|
} else {
|
|
session.queue.push_back((sender, message))
|
|
}
|
|
});
|
|
}
|
|
|
|
pub fn dequeue_message(&self, session_id: &S::Id) -> Option<(NodeId, Message)> {
|
|
self.sessions
|
|
.write()
|
|
.get_mut(session_id)
|
|
.and_then(|session| session.queue.pop_front())
|
|
}
|
|
|
|
pub fn stop_stalled_sessions(&self) {
|
|
let mut sessions = self.sessions.write();
|
|
for sid in sessions.keys().cloned().collect::<Vec<_>>() {
|
|
let remove_session = {
|
|
let session = sessions
|
|
.get(&sid)
|
|
.expect("enumerating only existing sessions; qed");
|
|
if Instant::now() - session.last_message_time > SESSION_TIMEOUT_INTERVAL {
|
|
session.session.on_session_timeout();
|
|
session.session.is_finished()
|
|
} else {
|
|
false
|
|
}
|
|
};
|
|
|
|
if remove_session {
|
|
self.do_remove(&sid, &mut *sessions);
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn on_connection_timeout(&self, node_id: &NodeId) {
|
|
let mut sessions = self.sessions.write();
|
|
for sid in sessions.keys().cloned().collect::<Vec<_>>() {
|
|
let remove_session = {
|
|
let session = sessions
|
|
.get(&sid)
|
|
.expect("enumerating only existing sessions; qed");
|
|
session.session.on_node_timeout(node_id);
|
|
session.session.is_finished()
|
|
};
|
|
|
|
if remove_session {
|
|
self.do_remove(&sid, &mut *sessions);
|
|
}
|
|
}
|
|
}
|
|
|
|
fn do_remove(&self, session_id: &S::Id, sessions: &mut BTreeMap<S::Id, QueuedSession<S>>) {
|
|
if !self.preserve_sessions {
|
|
if let Some(session) = sessions.remove(session_id) {
|
|
self.container_state.lock().on_session_completed();
|
|
self.notify_listeners(|l| l.on_session_removed(session.session.clone()));
|
|
}
|
|
}
|
|
}
|
|
|
|
fn notify_listeners<F: Fn(&dyn ClusterSessionsListener<S>) -> ()>(&self, callback: F) {
|
|
let mut listeners = self.listeners.lock();
|
|
let mut listener_index = 0;
|
|
while listener_index < listeners.len() {
|
|
match listeners[listener_index].upgrade() {
|
|
Some(listener) => {
|
|
callback(&*listener);
|
|
listener_index += 1;
|
|
}
|
|
None => {
|
|
listeners.swap_remove(listener_index);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<S, SC, D> ClusterSessionsContainer<S, SC, D>
where
    S: ClusterSession,
    SC: ClusterSessionCreator<S, D>,
    SessionId: From<S::Id>,
{
    /// Broadcast a session-level keep-alive for `session_id`, but only when this
    /// node is the session master and `SESSION_KEEP_ALIVE_INTERVAL` has elapsed
    /// since the last keep-alive.
    pub fn send_keep_alive(&self, session_id: &S::Id, self_node_id: &NodeId) {
        if let Some(session) = self.sessions.write().get_mut(session_id) {
            let now = Instant::now();
            if self_node_id == &session.master
                && now - session.last_keep_alive_time > SESSION_KEEP_ALIVE_INTERVAL
            {
                session.last_keep_alive_time = now;
                // since we send KeepAlive message to prevent nodes from disconnecting
                // && worst thing that can happen if node is disconnected is that session is failed
                // => ignore error here, because probably this node is not need for the rest of the session at all
                // NOTE(review): a `KeepAliveResponse` is broadcast here even though this
                // is the initiating side — confirm against the message handler that
                // responses double as pings for session-level keep-alive.
                let _ = session.cluster_view.broadcast(Message::Cluster(
                    message::ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {
                        session_id: Some(session_id.clone().into()),
                    }),
                ));
            }
        }
    }

    /// Record a session-level keep-alive from `sender`.
    pub fn on_keep_alive(&self, session_id: &S::Id, sender: &NodeId) {
        if let Some(session) = self.sessions.write().get_mut(session_id) {
            let now = Instant::now();
            // we only accept keep alive from master node of ServersSetChange session
            if sender == &session.master {
                session.last_keep_alive_time = now;
            }
        }
    }
}
|
|
|
|
impl ClusterSessionsContainerState {
|
|
/// When session is starting.
|
|
pub fn on_session_starting(&mut self, is_exclusive_session: bool) -> Result<(), Error> {
|
|
match *self {
|
|
ClusterSessionsContainerState::Idle if is_exclusive_session => {
|
|
*self = ClusterSessionsContainerState::Exclusive;
|
|
}
|
|
ClusterSessionsContainerState::Idle => {
|
|
*self = ClusterSessionsContainerState::Active(1);
|
|
}
|
|
ClusterSessionsContainerState::Active(_) if is_exclusive_session => {
|
|
return Err(Error::HasActiveSessions)
|
|
}
|
|
ClusterSessionsContainerState::Active(sessions_count) => {
|
|
*self = ClusterSessionsContainerState::Active(sessions_count + 1);
|
|
}
|
|
ClusterSessionsContainerState::Exclusive => return Err(Error::ExclusiveSessionActive),
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// When session is completed.
|
|
pub fn on_session_completed(&mut self) {
|
|
match *self {
|
|
ClusterSessionsContainerState::Idle =>
|
|
unreachable!("idle means that there are no active sessions; on_session_completed is only called once after active session is completed; qed"),
|
|
ClusterSessionsContainerState::Active(sessions_count) if sessions_count == 1 => {
|
|
*self = ClusterSessionsContainerState::Idle;
|
|
},
|
|
ClusterSessionsContainerState::Active(sessions_count) => {
|
|
*self = ClusterSessionsContainerState::Active(sessions_count - 1);
|
|
}
|
|
ClusterSessionsContainerState::Exclusive => {
|
|
*self = ClusterSessionsContainerState::Idle;
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
impl SessionIdWithSubSession {
|
|
/// Create new decryption session Id.
|
|
pub fn new(session_id: SessionId, sub_session_id: Secret) -> Self {
|
|
SessionIdWithSubSession {
|
|
id: session_id,
|
|
access_key: sub_session_id,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl PartialOrd for SessionIdWithSubSession {
    /// Delegates to the total order defined by `Ord` below.
    fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
|
|
|
|
impl Ord for SessionIdWithSubSession {
|
|
fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
|
|
match self.id.cmp(&other.id) {
|
|
::std::cmp::Ordering::Equal => self.access_key.cmp(&other.access_key),
|
|
r @ _ => r,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl AdminSession {
|
|
pub fn as_servers_set_change(&self) -> Option<&ServersSetChangeSessionImpl> {
|
|
match *self {
|
|
AdminSession::ServersSetChange(ref session) => Some(session),
|
|
_ => None,
|
|
}
|
|
}
|
|
}
|
|
|
|
// Pure delegation: every `ClusterSession` method forwards to the wrapped
// `ShareAdd` or `ServersSetChange` session implementation.
impl ClusterSession for AdminSession {
    type Id = SessionId;

    fn type_name() -> &'static str {
        "admin"
    }

    fn id(&self) -> SessionId {
        match *self {
            AdminSession::ShareAdd(ref session) => session.id().clone(),
            AdminSession::ServersSetChange(ref session) => session.id().clone(),
        }
    }

    fn is_finished(&self) -> bool {
        match *self {
            AdminSession::ShareAdd(ref session) => session.is_finished(),
            AdminSession::ServersSetChange(ref session) => session.is_finished(),
        }
    }

    fn on_session_timeout(&self) {
        match *self {
            AdminSession::ShareAdd(ref session) => session.on_session_timeout(),
            AdminSession::ServersSetChange(ref session) => session.on_session_timeout(),
        }
    }

    fn on_node_timeout(&self, node_id: &NodeId) {
        match *self {
            AdminSession::ShareAdd(ref session) => session.on_node_timeout(node_id),
            AdminSession::ServersSetChange(ref session) => session.on_node_timeout(node_id),
        }
    }

    fn on_session_error(&self, node: &NodeId, error: Error) {
        match *self {
            AdminSession::ShareAdd(ref session) => session.on_session_error(node, error),
            AdminSession::ServersSetChange(ref session) => session.on_session_error(node, error),
        }
    }

    fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> {
        match *self {
            AdminSession::ShareAdd(ref session) => session.on_message(sender, message),
            AdminSession::ServersSetChange(ref session) => session.on_message(sender, message),
        }
    }
}
|
|
|
|
pub fn create_cluster_view(
|
|
self_key_pair: Arc<dyn NodeKeyPair>,
|
|
connections: Arc<dyn ConnectionProvider>,
|
|
requires_all_connections: bool,
|
|
) -> Result<Arc<dyn Cluster>, Error> {
|
|
let mut connected_nodes = connections.connected_nodes()?;
|
|
let disconnected_nodes = connections.disconnected_nodes();
|
|
|
|
let disconnected_nodes_count = disconnected_nodes.len();
|
|
if requires_all_connections {
|
|
if disconnected_nodes_count != 0 {
|
|
return Err(Error::NodeDisconnected);
|
|
}
|
|
}
|
|
|
|
connected_nodes.insert(self_key_pair.public().clone());
|
|
|
|
let connected_nodes_count = connected_nodes.len();
|
|
Ok(Arc::new(ClusterView::new(
|
|
self_key_pair,
|
|
connections,
|
|
connected_nodes,
|
|
connected_nodes_count + disconnected_nodes_count,
|
|
)))
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::{
        AdminSessionCreationData, ClusterSessions, ClusterSessionsContainerState,
        ClusterSessionsListener, SESSION_TIMEOUT_INTERVAL,
    };
    use ethkey::{Generator, Random};
    use key_server_cluster::{
        cluster::{tests::DummyCluster, ClusterConfiguration},
        connection_trigger::SimpleServersSetChangeSessionCreatorConnector,
        generation_session::SessionImpl as GenerationSession,
        DummyAclStorage, DummyKeyStorage, Error, MapKeyServerSet, PlainNodeKeyPair,
    };
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    /// Build a single-node `ClusterSessions` fixture backed by dummy storages.
    pub fn make_cluster_sessions() -> ClusterSessions {
        let key_pair = Random.generate().unwrap();
        let config = ClusterConfiguration {
            self_key_pair: Arc::new(PlainNodeKeyPair::new(key_pair.clone())),
            key_server_set: Arc::new(MapKeyServerSet::new(
                false,
                vec![(
                    key_pair.public().clone(),
                    format!("127.0.0.1:{}", 100).parse().unwrap(),
                )]
                .into_iter()
                .collect(),
            )),
            key_storage: Arc::new(DummyKeyStorage::default()),
            acl_storage: Arc::new(DummyAclStorage::default()),
            admin_public: Some(Random.generate().unwrap().public().clone()),
            preserve_sessions: false,
        };
        ClusterSessions::new(
            &config,
            Arc::new(SimpleServersSetChangeSessionCreatorConnector {
                admin_public: Some(Random.generate().unwrap().public().clone()),
            }),
        )
    }

    #[test]
    fn cluster_session_cannot_be_started_if_exclusive_session_is_active() {
        let sessions = make_cluster_sessions();
        // a regular (non-exclusive) session moves the shared state to Active(1)...
        sessions
            .generation_sessions
            .insert(
                Arc::new(DummyCluster::new(Default::default())),
                Default::default(),
                Default::default(),
                None,
                false,
                None,
            )
            .unwrap();
        // ...so starting an exclusive admin session must now fail.
        match sessions.admin_sessions.insert(
            Arc::new(DummyCluster::new(Default::default())),
            Default::default(),
            Default::default(),
            None,
            true,
            Some(AdminSessionCreationData::ShareAdd(Default::default())),
        ) {
            Err(Error::HasActiveSessions) => (),
            Err(e) => unreachable!(format!("{}", e)),
            Ok(_) => unreachable!("OK"),
        }
    }

    #[test]
    fn exclusive_session_cannot_be_started_if_other_session_is_active() {
        let sessions = make_cluster_sessions();

        // an exclusive admin session moves the shared state to Exclusive...
        sessions
            .admin_sessions
            .insert(
                Arc::new(DummyCluster::new(Default::default())),
                Default::default(),
                Default::default(),
                None,
                true,
                Some(AdminSessionCreationData::ShareAdd(Default::default())),
            )
            .unwrap();
        // ...so any other session type must now be rejected.
        match sessions.generation_sessions.insert(
            Arc::new(DummyCluster::new(Default::default())),
            Default::default(),
            Default::default(),
            None,
            false,
            None,
        ) {
            Err(Error::ExclusiveSessionActive) => (),
            Err(e) => unreachable!(format!("{}", e)),
            Ok(_) => unreachable!("OK"),
        }
    }

    #[test]
    fn session_listener_works() {
        // Counting listener: tracks how many insert/remove notifications fire.
        #[derive(Default)]
        struct GenerationSessionListener {
            inserted: AtomicUsize,
            removed: AtomicUsize,
        }

        impl ClusterSessionsListener<GenerationSession> for GenerationSessionListener {
            fn on_session_inserted(&self, _session: Arc<GenerationSession>) {
                self.inserted.fetch_add(1, Ordering::Relaxed);
            }

            fn on_session_removed(&self, _session: Arc<GenerationSession>) {
                self.removed.fetch_add(1, Ordering::Relaxed);
            }
        }

        let listener = Arc::new(GenerationSessionListener::default());
        let sessions = make_cluster_sessions();
        sessions.generation_sessions.add_listener(listener.clone());

        sessions
            .generation_sessions
            .insert(
                Arc::new(DummyCluster::new(Default::default())),
                Default::default(),
                Default::default(),
                None,
                false,
                None,
            )
            .unwrap();
        assert_eq!(listener.inserted.load(Ordering::Relaxed), 1);
        assert_eq!(listener.removed.load(Ordering::Relaxed), 0);

        sessions.generation_sessions.remove(&Default::default());
        assert_eq!(listener.inserted.load(Ordering::Relaxed), 1);
        assert_eq!(listener.removed.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn last_session_removal_sets_container_state_to_idle() {
        let sessions = make_cluster_sessions();

        sessions
            .generation_sessions
            .insert(
                Arc::new(DummyCluster::new(Default::default())),
                Default::default(),
                Default::default(),
                None,
                false,
                None,
            )
            .unwrap();
        assert_eq!(
            *sessions.generation_sessions.container_state.lock(),
            ClusterSessionsContainerState::Active(1)
        );

        sessions.generation_sessions.remove(&Default::default());
        assert_eq!(
            *sessions.generation_sessions.container_state.lock(),
            ClusterSessionsContainerState::Idle
        );
    }

    #[test]
    fn last_session_removal_by_timeout_sets_container_state_to_idle() {
        let sessions = make_cluster_sessions();

        sessions
            .generation_sessions
            .insert(
                Arc::new(DummyCluster::new(Default::default())),
                Default::default(),
                Default::default(),
                None,
                false,
                None,
            )
            .unwrap();
        assert_eq!(
            *sessions.generation_sessions.container_state.lock(),
            ClusterSessionsContainerState::Active(1)
        );

        // backdate the session so it appears stalled to stop_stalled_sessions
        sessions
            .generation_sessions
            .sessions
            .write()
            .get_mut(&Default::default())
            .unwrap()
            .last_message_time -= SESSION_TIMEOUT_INTERVAL * 2;

        sessions.generation_sessions.stop_stalled_sessions();
        assert_eq!(sessions.generation_sessions.sessions.read().len(), 0);
        assert_eq!(
            *sessions.generation_sessions.container_state.lock(),
            ClusterSessionsContainerState::Idle
        );
    }

    #[test]
    fn last_session_removal_by_node_timeout_sets_container_state_to_idle() {
        let sessions = make_cluster_sessions();

        sessions
            .generation_sessions
            .insert(
                Arc::new(DummyCluster::new(Default::default())),
                Default::default(),
                Default::default(),
                None,
                false,
                None,
            )
            .unwrap();
        assert_eq!(
            *sessions.generation_sessions.container_state.lock(),
            ClusterSessionsContainerState::Active(1)
        );

        // losing the only other node finishes (and thus removes) the session
        sessions
            .generation_sessions
            .on_connection_timeout(&Default::default());
        assert_eq!(sessions.generation_sessions.sessions.read().len(), 0);
        assert_eq!(
            *sessions.generation_sessions.container_state.lock(),
            ClusterSessionsContainerState::Idle
        );
    }
}
|