SecretStore: session level timeout (#6631)

* SecretStore: session level timeout

* removed obsolete TODO
This commit is contained in:
Svyatoslav Nikolsky 2017-10-05 23:38:23 +03:00 committed by Nikolay Volf
parent 6431459bcf
commit 4e9d439f39
3 changed files with 71 additions and 16 deletions

View File

@ -357,6 +357,7 @@ impl ClusterCore {
/// Send keepalive messages to every othe node.
fn keep_alive(data: Arc<ClusterData>) {
data.sessions.sessions_keep_alive();
for connection in data.connections.active_connections() {
let last_message_diff = time::Instant::now() - connection.last_message_time();
if last_message_diff > time::Duration::from_secs(KEEP_ALIVE_DISCONNECT_INTERVAL) {
@ -460,7 +461,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.generation_sessions.get(&session_id)
data.sessions.generation_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@ -538,7 +539,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.encryption_sessions.get(&session_id)
data.sessions.encryption_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@ -629,7 +630,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.decryption_sessions.get(&decryption_session_id)
data.sessions.decryption_sessions.get(&decryption_session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@ -705,7 +706,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.signing_sessions.get(&signing_session_id)
data.sessions.signing_sessions.get(&signing_session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@ -784,7 +785,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.admin_sessions.get(&session_id)
data.sessions.admin_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@ -865,7 +866,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.admin_sessions.get(&session_id)
data.sessions.admin_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@ -946,7 +947,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.admin_sessions.get(&session_id)
data.sessions.admin_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@ -1029,7 +1030,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.admin_sessions.get(&session_id)
data.sessions.admin_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@ -1084,8 +1085,12 @@ impl ClusterCore {
/// Process single cluster message from the connection.
fn process_cluster_message(data: Arc<ClusterData>, connection: Arc<Connection>, message: ClusterMessage) {
match message {
ClusterMessage::KeepAlive(_) => data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {})))),
ClusterMessage::KeepAliveResponse(_) => (),
ClusterMessage::KeepAlive(_) => data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {
session_id: None,
})))),
ClusterMessage::KeepAliveResponse(msg) => if let Some(session_id) = msg.session_id {
data.sessions.on_session_keep_alive(connection.node_id(), session_id.into());
},
_ => warn!(target: "secretstore_net", "{}: received unexpected message {} from node {} at {}", data.self_key_pair.public(), message, connection.node_id(), connection.node_address()),
}
}
@ -1459,7 +1464,7 @@ impl ClusterClient for ClusterClientImpl {
#[cfg(test)]
fn generation_session(&self, session_id: &SessionId) -> Option<Arc<GenerationSessionImpl>> {
self.data.sessions.generation_sessions.get(session_id)
self.data.sessions.generation_sessions.get(session_id, false)
}
}

View File

@ -47,6 +47,8 @@ use key_server_cluster::admin_sessions::ShareChangeSessionMeta;
/// This timeout is for cases when node is responding to KeepAlive messages, but intentionally ignores
/// session messages.
const SESSION_TIMEOUT_INTERVAL: u64 = 60;
/// Interval to send session-level KeepAlive-messages.
const SESSION_KEEP_ALIVE_INTERVAL: u64 = 30;
lazy_static! {
/// Servers set change session id (there could be at most 1 session => hardcoded id).
@ -129,6 +131,8 @@ pub struct QueuedSession<V, M> {
pub master: NodeId,
/// Cluster view.
pub cluster_view: Arc<Cluster>,
/// Last keep alive time.
pub last_keep_alive_time: time::Instant,
/// Last received message time.
pub last_message_time: time::Instant,
/// Generation session.
@ -224,6 +228,18 @@ impl ClusterSessions {
self.make_faulty_generation_sessions.store(true, Ordering::Relaxed);
}
/// Send session-level keep-alive messages.
pub fn sessions_keep_alive(&self) {
self.admin_sessions.send_keep_alive(&*SERVERS_SET_CHANGE_SESSION_ID, &self.self_node_id);
}
/// When session-level keep-alive response is received.
pub fn on_session_keep_alive(&self, sender: &NodeId, session_id: SessionId) {
if session_id == *SERVERS_SET_CHANGE_SESSION_ID {
self.admin_sessions.on_keep_alive(&session_id, sender);
}
}
/// Create new generation session.
pub fn new_generation_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<Cluster>) -> Result<Arc<GenerationSessionImpl>, Error> {
// check that there's no finished encryption session with the same id
@ -514,9 +530,6 @@ impl ClusterSessions {
self.encryption_sessions.stop_stalled_sessions();
self.decryption_sessions.stop_stalled_sessions();
self.signing_sessions.stop_stalled_sessions();
// TODO: servers set change session could take a lot of time
// && during that session some nodes could not receive messages
// => they could stop session as stalled. This must be handled
self.admin_sessions.stop_stalled_sessions();
}
@ -571,8 +584,15 @@ impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: Cluster
self.sessions.read().is_empty()
}
pub fn get(&self, session_id: &K) -> Option<Arc<V>> {
self.sessions.read().get(session_id).map(|s| s.session.clone())
pub fn get(&self, session_id: &K, update_last_message_time: bool) -> Option<Arc<V>> {
let mut sessions = self.sessions.write();
sessions.get_mut(session_id)
.map(|s| {
if update_last_message_time {
s.last_message_time = time::Instant::now();
}
s.session.clone()
})
}
pub fn insert<F: FnOnce() -> Result<V, Error>>(&self, master: NodeId, session_id: K, cluster: Arc<Cluster>, is_exclusive_session: bool, session: F) -> Result<Arc<V>, Error> {
@ -590,6 +610,7 @@ impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: Cluster
let queued_session = QueuedSession {
master: master,
cluster_view: cluster,
last_keep_alive_time: time::Instant::now(),
last_message_time: time::Instant::now(),
session: session.clone(),
queue: VecDeque::new(),
@ -649,6 +670,33 @@ impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: Cluster
}
}
impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: ClusterSession, SessionId: From<K> {
pub fn send_keep_alive(&self, session_id: &K, self_node_id: &NodeId) {
if let Some(session) = self.sessions.write().get_mut(session_id) {
let now = time::Instant::now();
if self_node_id == &session.master && now - session.last_keep_alive_time > time::Duration::from_secs(SESSION_KEEP_ALIVE_INTERVAL) {
session.last_keep_alive_time = now;
// since we send KeepAlive message to prevent nodes from disconnecting
// && worst thing that can happen if node is disconnected is that session is failed
// => ignore error here, because probably this node is not need for the rest of the session at all
let _ = session.cluster_view.broadcast(Message::Cluster(message::ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {
session_id: Some(session_id.clone().into()),
})));
}
}
}
pub fn on_keep_alive(&self, session_id: &K, sender: &NodeId) {
if let Some(session) = self.sessions.write().get_mut(session_id) {
let now = time::Instant::now();
// we only accept keep alive from master node of ServersSetChange session
if sender == &session.master {
session.last_keep_alive_time = now;
}
}
}
}
impl ClusterSessionsContainerState {
/// When session is starting.
pub fn on_session_starting(&mut self, is_exclusive_session: bool) -> Result<(), Error> {

View File

@ -255,6 +255,8 @@ pub struct KeepAlive {
/// Confirm that the node is still alive.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KeepAliveResponse {
/// Session id, if used for session-level keep alive.
pub session_id: Option<MessageSessionId>,
}
/// Initialize new DKG session.