SecretStore: Kovan flush3
This commit is contained in:
parent
76e693240d
commit
5a7e065e41
@ -31,6 +31,10 @@ use traits::{AdminSessionsServer, ServerKeyGenerator, DocumentKeyServer, Message
|
|||||||
use types::all::{Error, Public, RequestSignature, ServerKeyId, EncryptedDocumentKey, EncryptedDocumentKeyShadow,
|
use types::all::{Error, Public, RequestSignature, ServerKeyId, EncryptedDocumentKey, EncryptedDocumentKeyShadow,
|
||||||
ClusterConfiguration, MessageHash, EncryptedMessageSignature, NodeId};
|
ClusterConfiguration, MessageHash, EncryptedMessageSignature, NodeId};
|
||||||
use key_server_cluster::{ClusterClient, ClusterConfiguration as NetClusterConfiguration};
|
use key_server_cluster::{ClusterClient, ClusterConfiguration as NetClusterConfiguration};
|
||||||
|
use key_server_cluster::generation_session::Session as GenerationSession;
|
||||||
|
use key_server_cluster::encryption_session::Session as EncryptionSession;
|
||||||
|
use key_server_cluster::decryption_session::Session as DecryptionSession;
|
||||||
|
use key_server_cluster::signing_session::Session as SigningSession;
|
||||||
|
|
||||||
/// Secret store key server implementation
|
/// Secret store key server implementation
|
||||||
pub struct KeyServerImpl {
|
pub struct KeyServerImpl {
|
||||||
@ -53,7 +57,6 @@ impl KeyServerImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get cluster client reference.
|
/// Get cluster client reference.
|
||||||
#[cfg(test)]
|
|
||||||
pub fn cluster(&self) -> Arc<ClusterClient> {
|
pub fn cluster(&self) -> Arc<ClusterClient> {
|
||||||
self.data.lock().cluster.clone()
|
self.data.lock().cluster.clone()
|
||||||
}
|
}
|
||||||
|
@ -31,17 +31,15 @@ use bigint::hash::H256;
|
|||||||
use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage, KeyServerSet, NodeKeyPair};
|
use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage, KeyServerSet, NodeKeyPair};
|
||||||
use key_server_cluster::cluster_sessions::{ClusterSession, ClusterSessions, GenerationSessionWrapper, EncryptionSessionWrapper,
|
use key_server_cluster::cluster_sessions::{ClusterSession, ClusterSessions, GenerationSessionWrapper, EncryptionSessionWrapper,
|
||||||
DecryptionSessionWrapper, SigningSessionWrapper, AdminSessionWrapper, KeyNegotiationSessionWrapper, SessionIdWithSubSession,
|
DecryptionSessionWrapper, SigningSessionWrapper, AdminSessionWrapper, KeyNegotiationSessionWrapper, SessionIdWithSubSession,
|
||||||
ClusterSessionsContainer, SERVERS_SET_CHANGE_SESSION_ID, create_cluster_view, AdminSessionCreationData};
|
ClusterSessionsContainer, SERVERS_SET_CHANGE_SESSION_ID, create_cluster_view, AdminSessionCreationData, ClusterSessionsListener};
|
||||||
use key_server_cluster::cluster_sessions_creator::{ClusterSessionCreator, IntoSessionId};
|
use key_server_cluster::cluster_sessions_creator::{ClusterSessionCreator, IntoSessionId};
|
||||||
use key_server_cluster::message::{self, Message, ClusterMessage};
|
use key_server_cluster::message::{self, Message, ClusterMessage};
|
||||||
use key_server_cluster::generation_session::{Session as GenerationSession};
|
use key_server_cluster::generation_session::{SessionImpl as GenerationSession, Session as GenerationSessionTrait};
|
||||||
#[cfg(test)]
|
use key_server_cluster::decryption_session::{SessionImpl as DecryptionSession, Session as DecryptionSessionTrait};
|
||||||
use key_server_cluster::generation_session::SessionImpl as GenerationSessionImpl;
|
use key_server_cluster::encryption_session::{SessionImpl as EncryptionSession, Session as EncryptionSessionTrait};
|
||||||
use key_server_cluster::decryption_session::{Session as DecryptionSession};
|
use key_server_cluster::signing_session::{SessionImpl as SigningSession, Session as SigningSessionTrait};
|
||||||
use key_server_cluster::encryption_session::{Session as EncryptionSession};
|
use key_server_cluster::key_version_negotiation_session::{SessionImpl as KeyVersionNegotiationSession,
|
||||||
use key_server_cluster::signing_session::{Session as SigningSession};
|
IsolatedSessionTransport as KeyVersionNegotiationSessionTransport, ContinueAction, Session as KeyVersionNegotiationSessionTrait};
|
||||||
use key_server_cluster::key_version_negotiation_session::{Session as KeyVersionNegotiationSession, SessionImpl as KeyVersionNegotiationSessionImpl,
|
|
||||||
IsolatedSessionTransport as KeyVersionNegotiationSessionTransport, ContinueAction};
|
|
||||||
use key_server_cluster::io::{DeadlineStatus, ReadMessage, SharedTcpStream, read_encrypted_message, WriteMessage, write_encrypted_message};
|
use key_server_cluster::io::{DeadlineStatus, ReadMessage, SharedTcpStream, read_encrypted_message, WriteMessage, write_encrypted_message};
|
||||||
use key_server_cluster::net::{accept_connection as net_accept_connection, connect as net_connect, Connection as NetConnection};
|
use key_server_cluster::net::{accept_connection as net_accept_connection, connect as net_connect, Connection as NetConnection};
|
||||||
|
|
||||||
@ -74,16 +72,19 @@ pub trait ClusterClient: Send + Sync {
|
|||||||
/// Start new signing session.
|
/// Start new signing session.
|
||||||
fn new_signing_session(&self, session_id: SessionId, requestor_signature: Signature, version: Option<H256>, message_hash: H256) -> Result<Arc<SigningSession>, Error>;
|
fn new_signing_session(&self, session_id: SessionId, requestor_signature: Signature, version: Option<H256>, message_hash: H256) -> Result<Arc<SigningSession>, Error>;
|
||||||
/// Start new key version negotiation session.
|
/// Start new key version negotiation session.
|
||||||
fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession>, Error>;
|
fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error>;
|
||||||
/// Start new servers set change session.
|
/// Start new servers set change session.
|
||||||
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSessionWrapper>, Error>;
|
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSessionWrapper>, Error>;
|
||||||
|
|
||||||
|
/// Listen for new generation sessions.
|
||||||
|
fn add_generation_listener(&self, listener: Arc<ClusterSessionsListener<GenerationSession>>);
|
||||||
|
|
||||||
/// Ask node to make 'faulty' generation sessions.
|
/// Ask node to make 'faulty' generation sessions.
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
fn make_faulty_generation_sessions(&self);
|
fn make_faulty_generation_sessions(&self);
|
||||||
/// Get active generation session with given id.
|
/// Get active generation session with given id.
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
fn generation_session(&self, session_id: &SessionId) -> Option<Arc<GenerationSessionImpl>>;
|
fn generation_session(&self, session_id: &SessionId) -> Option<Arc<GenerationSession>>;
|
||||||
/// Try connect to disconnected nodes.
|
/// Try connect to disconnected nodes.
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
fn connect(&self);
|
fn connect(&self);
|
||||||
@ -446,7 +447,7 @@ impl ClusterCore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Try to contnue session.
|
/// Try to contnue session.
|
||||||
fn try_continue_session(data: &Arc<ClusterData>, session: Option<Arc<KeyVersionNegotiationSessionImpl<KeyVersionNegotiationSessionTransport>>>) {
|
fn try_continue_session(data: &Arc<ClusterData>, session: Option<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>>) {
|
||||||
if let Some(session) = session {
|
if let Some(session) = session {
|
||||||
let meta = session.meta();
|
let meta = session.meta();
|
||||||
let is_master_node = meta.self_node_id == meta.master_node_id;
|
let is_master_node = meta.self_node_id == meta.master_node_id;
|
||||||
@ -842,7 +843,7 @@ impl ClusterClientImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSessionImpl<KeyVersionNegotiationSessionTransport>>, Error> {
|
fn create_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> {
|
||||||
let mut connected_nodes = self.data.connections.connected_nodes();
|
let mut connected_nodes = self.data.connections.connected_nodes();
|
||||||
connected_nodes.insert(self.data.self_key_pair.public().clone());
|
connected_nodes.insert(self.data.self_key_pair.public().clone());
|
||||||
|
|
||||||
@ -872,7 +873,7 @@ impl ClusterClient for ClusterClientImpl {
|
|||||||
let cluster = create_cluster_view(&self.data, true)?;
|
let cluster = create_cluster_view(&self.data, true)?;
|
||||||
let session = self.data.sessions.generation_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, false, None)?;
|
let session = self.data.sessions.generation_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, false, None)?;
|
||||||
match session.initialize(author, threshold, connected_nodes) {
|
match session.initialize(author, threshold, connected_nodes) {
|
||||||
Ok(()) => Ok(GenerationSessionWrapper::new(Arc::downgrade(&self.data), session_id, session)),
|
Ok(()) => Ok(session),
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
self.data.sessions.generation_sessions.remove(&session.id());
|
self.data.sessions.generation_sessions.remove(&session.id());
|
||||||
Err(error)
|
Err(error)
|
||||||
@ -887,7 +888,7 @@ impl ClusterClient for ClusterClientImpl {
|
|||||||
let cluster = create_cluster_view(&self.data, true)?;
|
let cluster = create_cluster_view(&self.data, true)?;
|
||||||
let session = self.data.sessions.encryption_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, false, None)?;
|
let session = self.data.sessions.encryption_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, false, None)?;
|
||||||
match session.initialize(requestor_signature, common_point, encrypted_point) {
|
match session.initialize(requestor_signature, common_point, encrypted_point) {
|
||||||
Ok(()) => Ok(EncryptionSessionWrapper::new(Arc::downgrade(&self.data), session_id, session)),
|
Ok(()) => Ok(session),
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
self.data.sessions.encryption_sessions.remove(&session.id());
|
self.data.sessions.encryption_sessions.remove(&session.id());
|
||||||
Err(error)
|
Err(error)
|
||||||
@ -916,7 +917,7 @@ impl ClusterClient for ClusterClientImpl {
|
|||||||
};
|
};
|
||||||
|
|
||||||
match initialization_result {
|
match initialization_result {
|
||||||
Ok(()) => Ok(DecryptionSessionWrapper::new(Arc::downgrade(&self.data), session_id, session)),
|
Ok(()) => Ok(session),
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
self.data.sessions.decryption_sessions.remove(&session.id());
|
self.data.sessions.decryption_sessions.remove(&session.id());
|
||||||
Err(error)
|
Err(error)
|
||||||
@ -945,7 +946,7 @@ impl ClusterClient for ClusterClientImpl {
|
|||||||
};
|
};
|
||||||
|
|
||||||
match initialization_result {
|
match initialization_result {
|
||||||
Ok(()) => Ok(SigningSessionWrapper::new(Arc::downgrade(&self.data), session_id, session)),
|
Ok(()) => Ok(session),
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
self.data.sessions.signing_sessions.remove(&session.id());
|
self.data.sessions.signing_sessions.remove(&session.id());
|
||||||
Err(error)
|
Err(error)
|
||||||
@ -953,9 +954,9 @@ impl ClusterClient for ClusterClientImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession>, Error> {
|
fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> {
|
||||||
let session = self.create_key_version_negotiation_session(session_id)?;
|
let session = self.create_key_version_negotiation_session(session_id)?;
|
||||||
Ok(KeyNegotiationSessionWrapper::new(Arc::downgrade(&self.data), session.id(), session))
|
Ok(session)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSessionWrapper>, Error> {
|
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSessionWrapper>, Error> {
|
||||||
@ -982,6 +983,10 @@ impl ClusterClient for ClusterClientImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn add_generation_listener(&self, listener: Arc<ClusterSessionsListener<GenerationSession>>) {
|
||||||
|
self.data.sessions.generation_sessions.add_listener(listener);
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
fn connect(&self) {
|
fn connect(&self) {
|
||||||
ClusterCore::connect_disconnected_nodes(self.data.clone());
|
ClusterCore::connect_disconnected_nodes(self.data.clone());
|
||||||
@ -993,7 +998,7 @@ impl ClusterClient for ClusterClientImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
fn generation_session(&self, session_id: &SessionId) -> Option<Arc<GenerationSessionImpl>> {
|
fn generation_session(&self, session_id: &SessionId) -> Option<Arc<GenerationSession>> {
|
||||||
self.data.sessions.generation_sessions.get(session_id, false)
|
self.data.sessions.generation_sessions.get(session_id, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1021,6 +1026,7 @@ pub mod tests {
|
|||||||
use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration};
|
use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration};
|
||||||
use key_server_cluster::cluster_sessions::ClusterSession;
|
use key_server_cluster::cluster_sessions::ClusterSession;
|
||||||
use key_server_cluster::generation_session::{Session as GenerationSession, SessionState as GenerationSessionState};
|
use key_server_cluster::generation_session::{Session as GenerationSession, SessionState as GenerationSessionState};
|
||||||
|
use key_server_cluster::signing_session::Session as SigningSession;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct DummyCluster {
|
pub struct DummyCluster {
|
||||||
|
@ -120,12 +120,22 @@ pub struct ClusterSessions {
|
|||||||
creator_core: Arc<SessionCreatorCore>,
|
creator_core: Arc<SessionCreatorCore>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Active sessions container listener.
|
||||||
|
pub trait ClusterSessionsListener<S: ClusterSession>: Send + Sync {
|
||||||
|
/// When new session is inserted to the container.
|
||||||
|
fn on_session_inserted(&self, session: Arc<S>);
|
||||||
|
/// When session is removed from the container.
|
||||||
|
fn on_session_removed(&self, session: Arc<S>);
|
||||||
|
}
|
||||||
|
|
||||||
/// Active sessions container.
|
/// Active sessions container.
|
||||||
pub struct ClusterSessionsContainer<S: ClusterSession, SC: ClusterSessionCreator<S, D>, D> {
|
pub struct ClusterSessionsContainer<S: ClusterSession, SC: ClusterSessionCreator<S, D>, D> {
|
||||||
/// Sessions creator.
|
/// Sessions creator.
|
||||||
pub creator: SC,
|
pub creator: SC,
|
||||||
/// Active sessions.
|
/// Active sessions.
|
||||||
sessions: RwLock<BTreeMap<S::Id, QueuedSession<S>>>,
|
sessions: RwLock<BTreeMap<S::Id, QueuedSession<S>>>,
|
||||||
|
/// Listeners. Lock order: sessions -> listeners.
|
||||||
|
listeners: Mutex<Vec<Weak<ClusterSessionsListener<S>>>>,
|
||||||
/// Sessions container state.
|
/// Sessions container state.
|
||||||
container_state: Arc<Mutex<ClusterSessionsContainerState>>,
|
container_state: Arc<Mutex<ClusterSessionsContainerState>>,
|
||||||
/// Phantom data.
|
/// Phantom data.
|
||||||
@ -294,11 +304,16 @@ impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: C
|
|||||||
ClusterSessionsContainer {
|
ClusterSessionsContainer {
|
||||||
creator: creator,
|
creator: creator,
|
||||||
sessions: RwLock::new(BTreeMap::new()),
|
sessions: RwLock::new(BTreeMap::new()),
|
||||||
|
listeners: Mutex::new(Vec::new()),
|
||||||
container_state: container_state,
|
container_state: container_state,
|
||||||
_pd: Default::default(),
|
_pd: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn add_listener(&self, listener: Arc<ClusterSessionsListener<S>>) {
|
||||||
|
self.listeners.lock().push(Arc::downgrade(&listener));
|
||||||
|
}
|
||||||
|
|
||||||
pub fn is_empty(&self) -> bool {
|
pub fn is_empty(&self) -> bool {
|
||||||
self.sessions.read().is_empty()
|
self.sessions.read().is_empty()
|
||||||
}
|
}
|
||||||
@ -342,12 +357,51 @@ impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: C
|
|||||||
queue: VecDeque::new(),
|
queue: VecDeque::new(),
|
||||||
};
|
};
|
||||||
sessions.insert(session_id, queued_session);
|
sessions.insert(session_id, queued_session);
|
||||||
|
|
||||||
|
// notify listeners
|
||||||
|
let mut listeners = self.listeners.lock();
|
||||||
|
let mut listener_index = 0;
|
||||||
|
loop {
|
||||||
|
if listener_index >= listeners.len() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
match listeners[listener_index].upgrade() {
|
||||||
|
Some(listener) => {
|
||||||
|
listener.on_session_inserted(session.clone());
|
||||||
|
listener_index += 1;
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
listeners.swap_remove(listener_index);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(session)
|
Ok(session)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove(&self, session_id: &S::Id) {
|
pub fn remove(&self, session_id: &S::Id) {
|
||||||
if self.sessions.write().remove(session_id).is_some() {
|
if let Some(session) = self.sessions.write().remove(session_id) {
|
||||||
self.container_state.lock().on_session_completed();
|
self.container_state.lock().on_session_completed();
|
||||||
|
|
||||||
|
// notify listeners
|
||||||
|
let mut listeners = self.listeners.lock();
|
||||||
|
let mut listener_index = 0;
|
||||||
|
loop {
|
||||||
|
if listener_index >= listeners.len() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
match listeners[listener_index].upgrade() {
|
||||||
|
Some(listener) => {
|
||||||
|
listener.on_session_removed(session.session.clone());
|
||||||
|
listener_index += 1;
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
listeners.swap_remove(listener_index);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,6 +27,7 @@ pub use super::key_storage::{KeyStorage, DocumentKeyShare, DocumentKeyShareVersi
|
|||||||
pub use super::key_server_set::KeyServerSet;
|
pub use super::key_server_set::KeyServerSet;
|
||||||
pub use super::serialization::{SerializableSignature, SerializableH256, SerializableSecret, SerializablePublic, SerializableMessageHash};
|
pub use super::serialization::{SerializableSignature, SerializableH256, SerializableSecret, SerializablePublic, SerializableMessageHash};
|
||||||
pub use self::cluster::{ClusterCore, ClusterConfiguration, ClusterClient};
|
pub use self::cluster::{ClusterCore, ClusterConfiguration, ClusterClient};
|
||||||
|
pub use self::cluster_sessions::{ClusterSession, ClusterSessionsListener};
|
||||||
pub use self::generation_session::Session as GenerationSession;
|
pub use self::generation_session::Session as GenerationSession;
|
||||||
pub use self::encryption_session::Session as EncryptionSession;
|
pub use self::encryption_session::Session as EncryptionSession;
|
||||||
pub use self::decryption_session::Session as DecryptionSession;
|
pub use self::decryption_session::Session as DecryptionSession;
|
||||||
|
@ -80,11 +80,12 @@ pub fn start(client: Arc<Client>, sync: Arc<SyncProvider>, self_key_pair: Arc<No
|
|||||||
let key_server_set = key_server_set::OnChainKeyServerSet::new(&client, /*&sync, */config.cluster_config.nodes.clone())?; // TODO: return empty set until fully synced
|
let key_server_set = key_server_set::OnChainKeyServerSet::new(&client, /*&sync, */config.cluster_config.nodes.clone())?; // TODO: return empty set until fully synced
|
||||||
let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(&config)?);
|
let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(&config)?);
|
||||||
let key_server = Arc::new(key_server::KeyServerImpl::new(&config.cluster_config, key_server_set.clone(), self_key_pair.clone(), acl_storage, key_storage)?);
|
let key_server = Arc::new(key_server::KeyServerImpl::new(&config.cluster_config, key_server_set.clone(), self_key_pair.clone(), acl_storage, key_storage)?);
|
||||||
|
let cluster = key_server.cluster();
|
||||||
let http_listener = match config.listener_address {
|
let http_listener = match config.listener_address {
|
||||||
Some(listener_address) => Some(listener::http_listener::KeyServerHttpListener::start(listener_address, key_server.clone())?),
|
Some(listener_address) => Some(listener::http_listener::KeyServerHttpListener::start(listener_address, key_server.clone())?),
|
||||||
None => None,
|
None => None,
|
||||||
};
|
};
|
||||||
let contract_listener = listener::service_contract_listener::ServiceContractListener::new(&client, &sync, key_server.clone(), self_key_pair, key_server_set);
|
let contract_listener = listener::service_contract_listener::ServiceContractListener::new(&client, &sync, key_server.clone(), cluster, self_key_pair, key_server_set);
|
||||||
let listener = listener::Listener::new(key_server, http_listener, Some(contract_listener));
|
let listener = listener::Listener::new(key_server, http_listener, Some(contract_listener));
|
||||||
Ok(Box::new(listener))
|
Ok(Box::new(listener))
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,8 @@ use bigint::hash::H256;
|
|||||||
use bigint::prelude::U256;
|
use bigint::prelude::U256;
|
||||||
use util::Address;
|
use util::Address;
|
||||||
use key_server_set::KeyServerSet;
|
use key_server_set::KeyServerSet;
|
||||||
|
use key_server_cluster::{ClusterClient, ClusterSessionsListener, ClusterSession};
|
||||||
|
use key_server_cluster::generation_session::{SessionImpl as GenerationSession, Session as GenerationSessionTrait};
|
||||||
use {ServerKeyId, NodeKeyPair, KeyServer};
|
use {ServerKeyId, NodeKeyPair, KeyServer};
|
||||||
|
|
||||||
/// Name of the SecretStore contract in the registry.
|
/// Name of the SecretStore contract in the registry.
|
||||||
@ -108,15 +110,17 @@ enum ServiceTask {
|
|||||||
Retry,
|
Retry,
|
||||||
/// Generate server key (server_key_id, threshold).
|
/// Generate server key (server_key_id, threshold).
|
||||||
GenerateServerKey(H256, H256),
|
GenerateServerKey(H256, H256),
|
||||||
|
/// Confirm server key (server_key_id).
|
||||||
|
ConfirmServerKey(H256),
|
||||||
/// Shutdown listener.
|
/// Shutdown listener.
|
||||||
Shutdown,
|
Shutdown,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServiceContractListener {
|
impl ServiceContractListener {
|
||||||
pub fn new(client: &Arc<Client>, sync: &Arc<SyncProvider>, key_server: Arc<KeyServer>, self_key_pair: Arc<NodeKeyPair>, key_servers_set: Arc<KeyServerSet>) -> Arc<ServiceContractListener> {
|
pub fn new(client: &Arc<Client>, sync: &Arc<SyncProvider>, key_server: Arc<KeyServer>, cluster: Arc<ClusterClient>, self_key_pair: Arc<NodeKeyPair>, key_servers_set: Arc<KeyServerSet>) -> Arc<ServiceContractListener> {
|
||||||
let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned())
|
let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned())
|
||||||
.map(|a| {
|
.map(|a| {
|
||||||
trace!(target: "secretstore", "Installing service contract from address {}", a);
|
trace!(target: "secretstore", "{}: installing service contract from address {}", self_key_pair.public(), a);
|
||||||
a
|
a
|
||||||
})
|
})
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
@ -146,6 +150,7 @@ impl ServiceContractListener {
|
|||||||
service_handle: Some(service_handle),
|
service_handle: Some(service_handle),
|
||||||
});
|
});
|
||||||
client.add_notify(contract.clone());
|
client.add_notify(contract.clone());
|
||||||
|
cluster.add_generation_listener(contract.clone());
|
||||||
contract
|
contract
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -186,7 +191,7 @@ impl ServiceContractListener {
|
|||||||
fn run_service_thread(data: Arc<ServiceContractListenerData>) {
|
fn run_service_thread(data: Arc<ServiceContractListenerData>) {
|
||||||
loop {
|
loop {
|
||||||
let task = data.tasks_queue.wait();
|
let task = data.tasks_queue.wait();
|
||||||
trace!(target: "secretstore", "Processing {:?} task", task);
|
trace!(target: "secretstore", "{}: processing {:?} task",data.self_key_pair.public(), task);
|
||||||
|
|
||||||
match task {
|
match task {
|
||||||
ServiceTask::Shutdown => break,
|
ServiceTask::Shutdown => break,
|
||||||
@ -204,28 +209,29 @@ impl ServiceContractListener {
|
|||||||
Self::retry_pending_requests(&data)
|
Self::retry_pending_requests(&data)
|
||||||
.map(|processed_requests| {
|
.map(|processed_requests| {
|
||||||
if processed_requests != 0 {
|
if processed_requests != 0 {
|
||||||
trace!(target: "secretstore", "Successfully retried {} pending requests",
|
trace!(target: "secretstore", "{}: successfully retried {} pending requests",
|
||||||
processed_requests);
|
data.self_key_pair.public(), processed_requests);
|
||||||
}
|
}
|
||||||
()
|
()
|
||||||
})
|
})
|
||||||
.map_err(|error| {
|
.map_err(|error| {
|
||||||
warn!(target: "secretstore", "Retrying pending requests has failed with: {}",
|
warn!(target: "secretstore", "{}: retrying pending requests has failed with: {}",
|
||||||
error);
|
data.self_key_pair.public(), error);
|
||||||
error
|
error
|
||||||
}),
|
}),
|
||||||
|
ServiceTask::ConfirmServerKey(_) => Err("not implemented".to_owned()), // TODO
|
||||||
ServiceTask::GenerateServerKey(server_key_id, threshold) => {
|
ServiceTask::GenerateServerKey(server_key_id, threshold) => {
|
||||||
data.retry_data.lock().generated_keys.insert(server_key_id.clone());
|
data.retry_data.lock().generated_keys.insert(server_key_id.clone());
|
||||||
Self::generate_server_key(&data, &server_key_id, &threshold)
|
Self::generate_server_key(&data, &server_key_id, &threshold)
|
||||||
.and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key))
|
.and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key))
|
||||||
.map(|_| {
|
.map(|_| {
|
||||||
trace!(target: "secretstore", "GenerateServerKey({}, {}) request has completed",
|
trace!(target: "secretstore", "{}: started processing GenerateServerKey({}, {}) request",
|
||||||
server_key_id, threshold);
|
data.self_key_pair.public(), server_key_id, threshold);
|
||||||
()
|
()
|
||||||
})
|
})
|
||||||
.map_err(|error| {
|
.map_err(|error| {
|
||||||
warn!(target: "secretstore", "GenerateServerKey({}, {}) request has failed with: {}",
|
warn!(target: "secretstore", "{}: failed to start processing GenerateServerKey({}, {}) request with: {}",
|
||||||
server_key_id, threshold, error);
|
data.self_key_pair.public(), server_key_id, threshold, error);
|
||||||
error
|
error
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
@ -263,19 +269,23 @@ impl ServiceContractListener {
|
|||||||
if is_confirmed {
|
if is_confirmed {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// only process request, which haven't been processed recently
|
// only process request, which haven't been processed recently
|
||||||
// there could be a lag when we've just generated server key && retrying on the same block
|
// there could be a lag when we've just generated server key && retrying on the same block
|
||||||
// (or before our tx is mined) - state is not updated yet
|
// (or before our tx is mined) - state is not updated yet
|
||||||
if retry_data.generated_keys.contains(&server_key_id){
|
if retry_data.generated_keys.contains(&server_key_id){
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// only process requests that are intended to be processed by this server
|
|
||||||
if !is_processed_by_this_key_server(&*data.key_servers_set, &*data.self_key_pair, &server_key_id) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// process request
|
// process request
|
||||||
match Self::process_service_task(data, ServiceTask::GenerateServerKey(server_key_id, threshold.into())) {
|
let is_own_request = is_processed_by_this_key_server(&*data.key_servers_set, &*data.self_key_pair, &server_key_id);
|
||||||
|
let request_result = Self::process_service_task(data, match is_own_request {
|
||||||
|
true => ServiceTask::GenerateServerKey(server_key_id, threshold.into()),
|
||||||
|
false => ServiceTask::ConfirmServerKey(server_key_id),
|
||||||
|
});
|
||||||
|
|
||||||
|
// process request result
|
||||||
|
match request_result {
|
||||||
Ok(_) => processed_requests += 1,
|
Ok(_) => processed_requests += 1,
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
failed_requests += 1;
|
failed_requests += 1;
|
||||||
@ -351,7 +361,7 @@ impl ChainNotify for ServiceContractListener {
|
|||||||
// update contract address from registry
|
// update contract address from registry
|
||||||
if let Some(service_contract_addr) = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) {
|
if let Some(service_contract_addr) = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) {
|
||||||
if self.data.contract.read().address != service_contract_addr {
|
if self.data.contract.read().address != service_contract_addr {
|
||||||
trace!(target: "secretstore", "Installing service contract from address {}", service_contract_addr);
|
trace!(target: "secretstore", "{}: installing service contract from address {}", self.data.self_key_pair.public(), service_contract_addr);
|
||||||
*self.data.contract.write() = SecretStoreService::new(service_contract_addr.clone());
|
*self.data.contract.write() = SecretStoreService::new(service_contract_addr.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -370,6 +380,31 @@ impl ChainNotify for ServiceContractListener {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ClusterSessionsListener<GenerationSession> for ServiceContractListener {
|
||||||
|
fn on_session_inserted(&self, _session: Arc<GenerationSession>) {
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_session_removed(&self, session: Arc<GenerationSession>) {
|
||||||
|
// TODO: only start if session started via the contract
|
||||||
|
// only publish when the session is started by another node
|
||||||
|
if !is_processed_by_this_key_server(&*self.data.key_servers_set, &*self.data.self_key_pair, &session.id()) {
|
||||||
|
session.wait(Some(Default::default()))
|
||||||
|
.map_err(|e| format!("{}", e))
|
||||||
|
.and_then(|server_key| Self::publish_server_key(&self.data, &session.id(), &server_key))
|
||||||
|
.map(|_| {
|
||||||
|
trace!(target: "secretstore", "{}: completed foreign GenerateServerKey({}) request",
|
||||||
|
self.data.self_key_pair.public(), session.id());
|
||||||
|
()
|
||||||
|
})
|
||||||
|
.map_err(|error| {
|
||||||
|
warn!(target: "secretstore", "{}: failed to process GenerateServerKey({}) request with: {}",
|
||||||
|
self.data.self_key_pair.public(), session.id(), error);
|
||||||
|
error
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl TasksQueue {
|
impl TasksQueue {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
TasksQueue {
|
TasksQueue {
|
||||||
|
Loading…
Reference in New Issue
Block a user