openethereum/secret-store/src/key_server_cluster/cluster_sessions.rs

938 lines
32 KiB
Rust

// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
use ethereum_types::H256;
use ethkey::Secret;
use key_server_cluster::{
cluster::{Cluster, ClusterConfiguration, ClusterView},
cluster_connections::ConnectionProvider,
connection_trigger::ServersSetChangeSessionCreatorConnector,
decryption_session::SessionImpl as DecryptionSessionImpl,
encryption_session::SessionImpl as EncryptionSessionImpl,
generation_session::SessionImpl as GenerationSessionImpl,
key_version_negotiation_session::{
IsolatedSessionTransport as VersionNegotiationTransport,
SessionImpl as KeyVersionNegotiationSessionImpl,
},
message::{self, Message},
servers_set_change_session::SessionImpl as ServersSetChangeSessionImpl,
share_add_session::{
IsolatedSessionTransport as ShareAddTransport, SessionImpl as ShareAddSessionImpl,
},
signing_session_ecdsa::SessionImpl as EcdsaSigningSessionImpl,
signing_session_schnorr::SessionImpl as SchnorrSigningSessionImpl,
Error, NodeId, NodeKeyPair, Requester, SessionId,
};
use parking_lot::{Condvar, Mutex, RwLock};
use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
sync::{atomic::AtomicBool, Arc, Weak},
time::{Duration, Instant},
};
use key_server_cluster::cluster_sessions_creator::{
AdminSessionCreator, ClusterSessionCreator, DecryptionSessionCreator,
EcdsaSigningSessionCreator, EncryptionSessionCreator, GenerationSessionCreator,
KeyVersionNegotiationSessionCreator, SchnorrSigningSessionCreator, SessionCreatorCore,
};
/// When there are no session-related messages for SESSION_TIMEOUT_INTERVAL seconds,
/// we must treat this session as stalled && finish it with an error.
/// This timeout is for cases when node is responding to KeepAlive messages, but intentionally ignores
/// session messages.
const SESSION_TIMEOUT_INTERVAL: Duration = Duration::from_secs(60);
/// Interval to send session-level KeepAlive-messages.
const SESSION_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(30);
lazy_static! {
/// Servers set change session id (there could be at most 1 session => hardcoded id).
pub static ref SERVERS_SET_CHANGE_SESSION_ID: SessionId = "10b7af423bb551d5dc8645db754163a2145d37d78d468fa7330435ed77064c1c"
.parse()
.expect("hardcoded id should parse without errors; qed");
}
/// Session id with sub session.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionIdWithSubSession {
/// Key id.
pub id: SessionId,
/// Sub session id.
pub access_key: Secret,
}
/// Generic cluster session.
pub trait ClusterSession {
/// Session identifier type.
type Id: ::std::fmt::Debug + Ord + Clone;
/// Session type name.
fn type_name() -> &'static str;
/// Get session id.
fn id(&self) -> Self::Id;
/// If session is finished (either with succcess or not).
fn is_finished(&self) -> bool;
/// When it takes too much time to complete session.
fn on_session_timeout(&self);
/// When it takes too much time to receive response from the node.
fn on_node_timeout(&self, node_id: &NodeId);
/// Process error that has occured during session + propagate this error to required nodes.
fn on_session_error(&self, sender: &NodeId, error: Error);
/// Process session message.
fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error>;
/// 'Wait for session completion' helper.
fn wait_session<T, U, F: Fn(&U) -> Option<Result<T, Error>>>(
completion_event: &Condvar,
session_data: &Mutex<U>,
timeout: Option<Duration>,
result_reader: F,
) -> Option<Result<T, Error>> {
let mut locked_data = session_data.lock();
match result_reader(&locked_data) {
Some(result) => Some(result),
None => {
match timeout {
None => completion_event.wait(&mut locked_data),
Some(timeout) => {
completion_event.wait_for(&mut locked_data, timeout);
}
}
result_reader(&locked_data)
}
}
}
}
/// Administrative session.
pub enum AdminSession {
/// Share add session.
ShareAdd(ShareAddSessionImpl<ShareAddTransport>),
/// Servers set change session.
ServersSetChange(ServersSetChangeSessionImpl),
}
/// Administrative session creation data.
pub enum AdminSessionCreationData {
/// Share add session (key id).
ShareAdd(H256),
/// Servers set change session (block id, new_server_set).
ServersSetChange(Option<H256>, BTreeSet<NodeId>),
}
/// Active sessions on this cluster.
pub struct ClusterSessions {
/// Key generation sessions.
pub generation_sessions:
ClusterSessionsContainer<GenerationSessionImpl, GenerationSessionCreator, ()>,
/// Encryption sessions.
pub encryption_sessions:
ClusterSessionsContainer<EncryptionSessionImpl, EncryptionSessionCreator, ()>,
/// Decryption sessions.
pub decryption_sessions:
ClusterSessionsContainer<DecryptionSessionImpl, DecryptionSessionCreator, Requester>,
/// Schnorr signing sessions.
pub schnorr_signing_sessions: ClusterSessionsContainer<
SchnorrSigningSessionImpl,
SchnorrSigningSessionCreator,
Requester,
>,
/// ECDSA signing sessions.
pub ecdsa_signing_sessions:
ClusterSessionsContainer<EcdsaSigningSessionImpl, EcdsaSigningSessionCreator, Requester>,
/// Key version negotiation sessions.
pub negotiation_sessions: ClusterSessionsContainer<
KeyVersionNegotiationSessionImpl<VersionNegotiationTransport>,
KeyVersionNegotiationSessionCreator,
(),
>,
/// Administrative sessions.
pub admin_sessions:
ClusterSessionsContainer<AdminSession, AdminSessionCreator, AdminSessionCreationData>,
/// Self node id.
self_node_id: NodeId,
/// Creator core.
creator_core: Arc<SessionCreatorCore>,
}
/// Active sessions container listener.
pub trait ClusterSessionsListener<S: ClusterSession>: Send + Sync {
/// When new session is inserted to the container.
fn on_session_inserted(&self, _session: Arc<S>) {}
/// When session is removed from the container.
fn on_session_removed(&self, _session: Arc<S>) {}
}
/// Active sessions container.
pub struct ClusterSessionsContainer<S: ClusterSession, SC: ClusterSessionCreator<S, D>, D> {
/// Sessions creator.
pub creator: SC,
/// Active sessions.
sessions: RwLock<BTreeMap<S::Id, QueuedSession<S>>>,
/// Listeners. Lock order: sessions -> listeners.
listeners: Mutex<Vec<Weak<dyn ClusterSessionsListener<S>>>>,
/// Sessions container state.
container_state: Arc<Mutex<ClusterSessionsContainerState>>,
/// Do not actually remove sessions.
preserve_sessions: bool,
/// Phantom data.
_pd: ::std::marker::PhantomData<D>,
}
/// Session and its message queue.
pub struct QueuedSession<S> {
/// Session master.
pub master: NodeId,
/// Cluster view.
pub cluster_view: Arc<dyn Cluster>,
/// Last keep alive time.
pub last_keep_alive_time: Instant,
/// Last received message time.
pub last_message_time: Instant,
/// Generation session.
pub session: Arc<S>,
/// Messages queue.
pub queue: VecDeque<(NodeId, Message)>,
}
/// Cluster sessions container state.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ClusterSessionsContainerState {
/// There's no active sessions => any session can be started.
Idle,
/// There are active sessions => exclusive session can't be started right now.
Active(usize),
/// Exclusive session is active => can't start any other sessions.
Exclusive,
}
impl ClusterSessions {
/// Create new cluster sessions container.
pub fn new(
config: &ClusterConfiguration,
servers_set_change_session_creator_connector: Arc<
dyn ServersSetChangeSessionCreatorConnector,
>,
) -> Self {
let container_state = Arc::new(Mutex::new(ClusterSessionsContainerState::Idle));
let creator_core = Arc::new(SessionCreatorCore::new(config));
ClusterSessions {
self_node_id: config.self_key_pair.public().clone(),
generation_sessions: ClusterSessionsContainer::new(
GenerationSessionCreator {
core: creator_core.clone(),
make_faulty_generation_sessions: AtomicBool::new(false),
},
container_state.clone(),
),
encryption_sessions: ClusterSessionsContainer::new(
EncryptionSessionCreator {
core: creator_core.clone(),
},
container_state.clone(),
),
decryption_sessions: ClusterSessionsContainer::new(
DecryptionSessionCreator {
core: creator_core.clone(),
},
container_state.clone(),
),
schnorr_signing_sessions: ClusterSessionsContainer::new(
SchnorrSigningSessionCreator {
core: creator_core.clone(),
},
container_state.clone(),
),
ecdsa_signing_sessions: ClusterSessionsContainer::new(
EcdsaSigningSessionCreator {
core: creator_core.clone(),
},
container_state.clone(),
),
negotiation_sessions: ClusterSessionsContainer::new(
KeyVersionNegotiationSessionCreator {
core: creator_core.clone(),
},
container_state.clone(),
),
admin_sessions: ClusterSessionsContainer::new(
AdminSessionCreator {
core: creator_core.clone(),
servers_set_change_session_creator_connector:
servers_set_change_session_creator_connector,
admin_public: config.admin_public.clone(),
},
container_state,
),
creator_core: creator_core,
}
}
#[cfg(test)]
pub fn make_faulty_generation_sessions(&self) {
self.generation_sessions
.creator
.make_faulty_generation_sessions();
}
#[cfg(test)]
pub fn preserve_sessions(&mut self) {
self.generation_sessions.preserve_sessions = true;
self.encryption_sessions.preserve_sessions = true;
self.decryption_sessions.preserve_sessions = true;
self.schnorr_signing_sessions.preserve_sessions = true;
self.ecdsa_signing_sessions.preserve_sessions = true;
self.negotiation_sessions.preserve_sessions = true;
self.admin_sessions.preserve_sessions = true;
}
/// Send session-level keep-alive messages.
pub fn sessions_keep_alive(&self) {
self.admin_sessions
.send_keep_alive(&*SERVERS_SET_CHANGE_SESSION_ID, &self.self_node_id);
}
/// When session-level keep-alive response is received.
pub fn on_session_keep_alive(&self, sender: &NodeId, session_id: SessionId) {
if session_id == *SERVERS_SET_CHANGE_SESSION_ID {
self.admin_sessions.on_keep_alive(&session_id, sender);
}
}
/// Stop sessions that are stalling.
pub fn stop_stalled_sessions(&self) {
self.generation_sessions.stop_stalled_sessions();
self.encryption_sessions.stop_stalled_sessions();
self.decryption_sessions.stop_stalled_sessions();
self.schnorr_signing_sessions.stop_stalled_sessions();
self.ecdsa_signing_sessions.stop_stalled_sessions();
self.negotiation_sessions.stop_stalled_sessions();
self.admin_sessions.stop_stalled_sessions();
}
/// When connection to node is lost.
pub fn on_connection_timeout(&self, node_id: &NodeId) {
self.generation_sessions.on_connection_timeout(node_id);
self.encryption_sessions.on_connection_timeout(node_id);
self.decryption_sessions.on_connection_timeout(node_id);
self.schnorr_signing_sessions.on_connection_timeout(node_id);
self.ecdsa_signing_sessions.on_connection_timeout(node_id);
self.negotiation_sessions.on_connection_timeout(node_id);
self.admin_sessions.on_connection_timeout(node_id);
self.creator_core.on_connection_timeout(node_id);
}
}
impl<S, SC, D> ClusterSessionsContainer<S, SC, D>
where
S: ClusterSession,
SC: ClusterSessionCreator<S, D>,
{
pub fn new(creator: SC, container_state: Arc<Mutex<ClusterSessionsContainerState>>) -> Self {
ClusterSessionsContainer {
creator: creator,
sessions: RwLock::new(BTreeMap::new()),
listeners: Mutex::new(Vec::new()),
container_state: container_state,
preserve_sessions: false,
_pd: Default::default(),
}
}
pub fn add_listener(&self, listener: Arc<dyn ClusterSessionsListener<S>>) {
self.listeners.lock().push(Arc::downgrade(&listener));
}
#[cfg(test)]
pub fn is_empty(&self) -> bool {
self.sessions.read().is_empty()
}
pub fn get(&self, session_id: &S::Id, update_last_message_time: bool) -> Option<Arc<S>> {
let mut sessions = self.sessions.write();
sessions.get_mut(session_id).map(|s| {
if update_last_message_time {
s.last_message_time = Instant::now();
}
s.session.clone()
})
}
#[cfg(test)]
pub fn first(&self) -> Option<Arc<S>> {
self.sessions
.read()
.values()
.nth(0)
.map(|s| s.session.clone())
}
pub fn insert(
&self,
cluster: Arc<dyn Cluster>,
master: NodeId,
session_id: S::Id,
session_nonce: Option<u64>,
is_exclusive_session: bool,
creation_data: Option<D>,
) -> Result<Arc<S>, Error> {
let mut sessions = self.sessions.write();
if sessions.contains_key(&session_id) {
return Err(Error::DuplicateSessionId);
}
// create cluster
// let cluster = create_cluster_view(data, requires_all_connections)?;
// create session
let session = self.creator.create(
cluster.clone(),
master.clone(),
session_nonce,
session_id.clone(),
creation_data,
)?;
// check if session can be started
self.container_state
.lock()
.on_session_starting(is_exclusive_session)?;
// insert session
let queued_session = QueuedSession {
master: master,
cluster_view: cluster,
last_keep_alive_time: Instant::now(),
last_message_time: Instant::now(),
session: session.clone(),
queue: VecDeque::new(),
};
sessions.insert(session_id, queued_session);
self.notify_listeners(|l| l.on_session_inserted(session.clone()));
Ok(session)
}
pub fn remove(&self, session_id: &S::Id) {
self.do_remove(session_id, &mut *self.sessions.write());
}
pub fn enqueue_message(
&self,
session_id: &S::Id,
sender: NodeId,
message: Message,
is_queued_message: bool,
) {
self.sessions.write().get_mut(session_id).map(|session| {
if is_queued_message {
session.queue.push_front((sender, message))
} else {
session.queue.push_back((sender, message))
}
});
}
pub fn dequeue_message(&self, session_id: &S::Id) -> Option<(NodeId, Message)> {
self.sessions
.write()
.get_mut(session_id)
.and_then(|session| session.queue.pop_front())
}
pub fn stop_stalled_sessions(&self) {
let mut sessions = self.sessions.write();
for sid in sessions.keys().cloned().collect::<Vec<_>>() {
let remove_session = {
let session = sessions
.get(&sid)
.expect("enumerating only existing sessions; qed");
if Instant::now() - session.last_message_time > SESSION_TIMEOUT_INTERVAL {
session.session.on_session_timeout();
session.session.is_finished()
} else {
false
}
};
if remove_session {
self.do_remove(&sid, &mut *sessions);
}
}
}
pub fn on_connection_timeout(&self, node_id: &NodeId) {
let mut sessions = self.sessions.write();
for sid in sessions.keys().cloned().collect::<Vec<_>>() {
let remove_session = {
let session = sessions
.get(&sid)
.expect("enumerating only existing sessions; qed");
session.session.on_node_timeout(node_id);
session.session.is_finished()
};
if remove_session {
self.do_remove(&sid, &mut *sessions);
}
}
}
fn do_remove(&self, session_id: &S::Id, sessions: &mut BTreeMap<S::Id, QueuedSession<S>>) {
if !self.preserve_sessions {
if let Some(session) = sessions.remove(session_id) {
self.container_state.lock().on_session_completed();
self.notify_listeners(|l| l.on_session_removed(session.session.clone()));
}
}
}
fn notify_listeners<F: Fn(&dyn ClusterSessionsListener<S>) -> ()>(&self, callback: F) {
let mut listeners = self.listeners.lock();
let mut listener_index = 0;
while listener_index < listeners.len() {
match listeners[listener_index].upgrade() {
Some(listener) => {
callback(&*listener);
listener_index += 1;
}
None => {
listeners.swap_remove(listener_index);
}
}
}
}
}
impl<S, SC, D> ClusterSessionsContainer<S, SC, D>
where
S: ClusterSession,
SC: ClusterSessionCreator<S, D>,
SessionId: From<S::Id>,
{
pub fn send_keep_alive(&self, session_id: &S::Id, self_node_id: &NodeId) {
if let Some(session) = self.sessions.write().get_mut(session_id) {
let now = Instant::now();
if self_node_id == &session.master
&& now - session.last_keep_alive_time > SESSION_KEEP_ALIVE_INTERVAL
{
session.last_keep_alive_time = now;
// since we send KeepAlive message to prevent nodes from disconnecting
// && worst thing that can happen if node is disconnected is that session is failed
// => ignore error here, because probably this node is not need for the rest of the session at all
let _ = session.cluster_view.broadcast(Message::Cluster(
message::ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {
session_id: Some(session_id.clone().into()),
}),
));
}
}
}
pub fn on_keep_alive(&self, session_id: &S::Id, sender: &NodeId) {
if let Some(session) = self.sessions.write().get_mut(session_id) {
let now = Instant::now();
// we only accept keep alive from master node of ServersSetChange session
if sender == &session.master {
session.last_keep_alive_time = now;
}
}
}
}
impl ClusterSessionsContainerState {
/// When session is starting.
pub fn on_session_starting(&mut self, is_exclusive_session: bool) -> Result<(), Error> {
match *self {
ClusterSessionsContainerState::Idle if is_exclusive_session => {
*self = ClusterSessionsContainerState::Exclusive;
}
ClusterSessionsContainerState::Idle => {
*self = ClusterSessionsContainerState::Active(1);
}
ClusterSessionsContainerState::Active(_) if is_exclusive_session => {
return Err(Error::HasActiveSessions)
}
ClusterSessionsContainerState::Active(sessions_count) => {
*self = ClusterSessionsContainerState::Active(sessions_count + 1);
}
ClusterSessionsContainerState::Exclusive => return Err(Error::ExclusiveSessionActive),
}
Ok(())
}
/// When session is completed.
pub fn on_session_completed(&mut self) {
match *self {
ClusterSessionsContainerState::Idle =>
unreachable!("idle means that there are no active sessions; on_session_completed is only called once after active session is completed; qed"),
ClusterSessionsContainerState::Active(sessions_count) if sessions_count == 1 => {
*self = ClusterSessionsContainerState::Idle;
},
ClusterSessionsContainerState::Active(sessions_count) => {
*self = ClusterSessionsContainerState::Active(sessions_count - 1);
}
ClusterSessionsContainerState::Exclusive => {
*self = ClusterSessionsContainerState::Idle;
},
}
}
}
impl SessionIdWithSubSession {
/// Create new decryption session Id.
pub fn new(session_id: SessionId, sub_session_id: Secret) -> Self {
SessionIdWithSubSession {
id: session_id,
access_key: sub_session_id,
}
}
}
impl PartialOrd for SessionIdWithSubSession {
fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for SessionIdWithSubSession {
fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
match self.id.cmp(&other.id) {
::std::cmp::Ordering::Equal => self.access_key.cmp(&other.access_key),
r @ _ => r,
}
}
}
impl AdminSession {
pub fn as_servers_set_change(&self) -> Option<&ServersSetChangeSessionImpl> {
match *self {
AdminSession::ServersSetChange(ref session) => Some(session),
_ => None,
}
}
}
impl ClusterSession for AdminSession {
type Id = SessionId;
fn type_name() -> &'static str {
"admin"
}
fn id(&self) -> SessionId {
match *self {
AdminSession::ShareAdd(ref session) => session.id().clone(),
AdminSession::ServersSetChange(ref session) => session.id().clone(),
}
}
fn is_finished(&self) -> bool {
match *self {
AdminSession::ShareAdd(ref session) => session.is_finished(),
AdminSession::ServersSetChange(ref session) => session.is_finished(),
}
}
fn on_session_timeout(&self) {
match *self {
AdminSession::ShareAdd(ref session) => session.on_session_timeout(),
AdminSession::ServersSetChange(ref session) => session.on_session_timeout(),
}
}
fn on_node_timeout(&self, node_id: &NodeId) {
match *self {
AdminSession::ShareAdd(ref session) => session.on_node_timeout(node_id),
AdminSession::ServersSetChange(ref session) => session.on_node_timeout(node_id),
}
}
fn on_session_error(&self, node: &NodeId, error: Error) {
match *self {
AdminSession::ShareAdd(ref session) => session.on_session_error(node, error),
AdminSession::ServersSetChange(ref session) => session.on_session_error(node, error),
}
}
fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> {
match *self {
AdminSession::ShareAdd(ref session) => session.on_message(sender, message),
AdminSession::ServersSetChange(ref session) => session.on_message(sender, message),
}
}
}
pub fn create_cluster_view(
self_key_pair: Arc<dyn NodeKeyPair>,
connections: Arc<dyn ConnectionProvider>,
requires_all_connections: bool,
) -> Result<Arc<dyn Cluster>, Error> {
let mut connected_nodes = connections.connected_nodes()?;
let disconnected_nodes = connections.disconnected_nodes();
let disconnected_nodes_count = disconnected_nodes.len();
if requires_all_connections {
if disconnected_nodes_count != 0 {
return Err(Error::NodeDisconnected);
}
}
connected_nodes.insert(self_key_pair.public().clone());
let connected_nodes_count = connected_nodes.len();
Ok(Arc::new(ClusterView::new(
self_key_pair,
connections,
connected_nodes,
connected_nodes_count + disconnected_nodes_count,
)))
}
#[cfg(test)]
mod tests {
use super::{
AdminSessionCreationData, ClusterSessions, ClusterSessionsContainerState,
ClusterSessionsListener, SESSION_TIMEOUT_INTERVAL,
};
use ethkey::{Generator, Random};
use key_server_cluster::{
cluster::{tests::DummyCluster, ClusterConfiguration},
connection_trigger::SimpleServersSetChangeSessionCreatorConnector,
generation_session::SessionImpl as GenerationSession,
DummyAclStorage, DummyKeyStorage, Error, MapKeyServerSet, PlainNodeKeyPair,
};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
pub fn make_cluster_sessions() -> ClusterSessions {
let key_pair = Random.generate().unwrap();
let config = ClusterConfiguration {
self_key_pair: Arc::new(PlainNodeKeyPair::new(key_pair.clone())),
key_server_set: Arc::new(MapKeyServerSet::new(
false,
vec![(
key_pair.public().clone(),
format!("127.0.0.1:{}", 100).parse().unwrap(),
)]
.into_iter()
.collect(),
)),
key_storage: Arc::new(DummyKeyStorage::default()),
acl_storage: Arc::new(DummyAclStorage::default()),
admin_public: Some(Random.generate().unwrap().public().clone()),
preserve_sessions: false,
};
ClusterSessions::new(
&config,
Arc::new(SimpleServersSetChangeSessionCreatorConnector {
admin_public: Some(Random.generate().unwrap().public().clone()),
}),
)
}
#[test]
fn cluster_session_cannot_be_started_if_exclusive_session_is_active() {
let sessions = make_cluster_sessions();
sessions
.generation_sessions
.insert(
Arc::new(DummyCluster::new(Default::default())),
Default::default(),
Default::default(),
None,
false,
None,
)
.unwrap();
match sessions.admin_sessions.insert(
Arc::new(DummyCluster::new(Default::default())),
Default::default(),
Default::default(),
None,
true,
Some(AdminSessionCreationData::ShareAdd(Default::default())),
) {
Err(Error::HasActiveSessions) => (),
Err(e) => unreachable!(format!("{}", e)),
Ok(_) => unreachable!("OK"),
}
}
#[test]
fn exclusive_session_cannot_be_started_if_other_session_is_active() {
let sessions = make_cluster_sessions();
sessions
.admin_sessions
.insert(
Arc::new(DummyCluster::new(Default::default())),
Default::default(),
Default::default(),
None,
true,
Some(AdminSessionCreationData::ShareAdd(Default::default())),
)
.unwrap();
match sessions.generation_sessions.insert(
Arc::new(DummyCluster::new(Default::default())),
Default::default(),
Default::default(),
None,
false,
None,
) {
Err(Error::ExclusiveSessionActive) => (),
Err(e) => unreachable!(format!("{}", e)),
Ok(_) => unreachable!("OK"),
}
}
#[test]
fn session_listener_works() {
#[derive(Default)]
struct GenerationSessionListener {
inserted: AtomicUsize,
removed: AtomicUsize,
}
impl ClusterSessionsListener<GenerationSession> for GenerationSessionListener {
fn on_session_inserted(&self, _session: Arc<GenerationSession>) {
self.inserted.fetch_add(1, Ordering::Relaxed);
}
fn on_session_removed(&self, _session: Arc<GenerationSession>) {
self.removed.fetch_add(1, Ordering::Relaxed);
}
}
let listener = Arc::new(GenerationSessionListener::default());
let sessions = make_cluster_sessions();
sessions.generation_sessions.add_listener(listener.clone());
sessions
.generation_sessions
.insert(
Arc::new(DummyCluster::new(Default::default())),
Default::default(),
Default::default(),
None,
false,
None,
)
.unwrap();
assert_eq!(listener.inserted.load(Ordering::Relaxed), 1);
assert_eq!(listener.removed.load(Ordering::Relaxed), 0);
sessions.generation_sessions.remove(&Default::default());
assert_eq!(listener.inserted.load(Ordering::Relaxed), 1);
assert_eq!(listener.removed.load(Ordering::Relaxed), 1);
}
#[test]
fn last_session_removal_sets_container_state_to_idle() {
let sessions = make_cluster_sessions();
sessions
.generation_sessions
.insert(
Arc::new(DummyCluster::new(Default::default())),
Default::default(),
Default::default(),
None,
false,
None,
)
.unwrap();
assert_eq!(
*sessions.generation_sessions.container_state.lock(),
ClusterSessionsContainerState::Active(1)
);
sessions.generation_sessions.remove(&Default::default());
assert_eq!(
*sessions.generation_sessions.container_state.lock(),
ClusterSessionsContainerState::Idle
);
}
#[test]
fn last_session_removal_by_timeout_sets_container_state_to_idle() {
let sessions = make_cluster_sessions();
sessions
.generation_sessions
.insert(
Arc::new(DummyCluster::new(Default::default())),
Default::default(),
Default::default(),
None,
false,
None,
)
.unwrap();
assert_eq!(
*sessions.generation_sessions.container_state.lock(),
ClusterSessionsContainerState::Active(1)
);
sessions
.generation_sessions
.sessions
.write()
.get_mut(&Default::default())
.unwrap()
.last_message_time -= SESSION_TIMEOUT_INTERVAL * 2;
sessions.generation_sessions.stop_stalled_sessions();
assert_eq!(sessions.generation_sessions.sessions.read().len(), 0);
assert_eq!(
*sessions.generation_sessions.container_state.lock(),
ClusterSessionsContainerState::Idle
);
}
#[test]
fn last_session_removal_by_node_timeout_sets_container_state_to_idle() {
let sessions = make_cluster_sessions();
sessions
.generation_sessions
.insert(
Arc::new(DummyCluster::new(Default::default())),
Default::default(),
Default::default(),
None,
false,
None,
)
.unwrap();
assert_eq!(
*sessions.generation_sessions.container_state.lock(),
ClusterSessionsContainerState::Active(1)
);
sessions
.generation_sessions
.on_connection_timeout(&Default::default());
assert_eq!(sessions.generation_sessions.sessions.read().len(), 0);
assert_eq!(
*sessions.generation_sessions.container_state.lock(),
ClusterSessionsContainerState::Idle
);
}
}