SecretStore: non-blocking wait of session completion (#10303)

* make SS sessions return future

* fix grumbles

* do not create unused Condvar in production mode
This commit is contained in:
Svyatoslav Nikolsky 2019-06-06 13:35:06 +03:00 committed by GitHub
parent eed630a002
commit 9de1afeeb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1132 additions and 528 deletions

View File

@ -16,14 +16,15 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use futures::{future::{err, result}, Future};
use parking_lot::Mutex;
use crypto::DEFAULT_MAC;
use ethkey::crypto;
use ethkey::{crypto, public_to_address};
use parity_runtime::Executor;
use super::acl_storage::AclStorage;
use super::key_storage::KeyStorage;
use super::key_server_set::KeyServerSet;
use key_server_cluster::{math, new_network_cluster};
use key_server_cluster::{math, new_network_cluster, ClusterSession, WaitableSession};
use traits::{AdminSessionsServer, ServerKeyGenerator, DocumentKeyServer, MessageSigner, KeyServer, NodeKeyPair};
use types::{Error, Public, RequestSignature, Requester, ServerKeyId, EncryptedDocumentKey, EncryptedDocumentKeyShadow,
ClusterConfiguration, MessageHash, EncryptedMessageSignature, NodeId};
@ -58,132 +59,212 @@ impl KeyServerImpl {
impl KeyServer for KeyServerImpl {}
impl AdminSessionsServer for KeyServerImpl {
fn change_servers_set(&self, old_set_signature: RequestSignature, new_set_signature: RequestSignature, new_servers_set: BTreeSet<NodeId>) -> Result<(), Error> {
let servers_set_change_session = self.data.lock().cluster
.new_servers_set_change_session(None, None, new_servers_set, old_set_signature, new_set_signature)?;
servers_set_change_session.as_servers_set_change()
.expect("new_servers_set_change_session creates servers_set_change_session; qed")
.wait().map_err(Into::into)
fn change_servers_set(
&self,
old_set_signature: RequestSignature,
new_set_signature: RequestSignature,
new_servers_set: BTreeSet<NodeId>,
) -> Box<Future<Item=(), Error=Error> + Send> {
return_session(self.data.lock().cluster
.new_servers_set_change_session(None, None, new_servers_set, old_set_signature, new_set_signature))
}
}
impl ServerKeyGenerator for KeyServerImpl {
fn generate_key(&self, key_id: &ServerKeyId, author: &Requester, threshold: usize) -> Result<Public, Error> {
// recover requestor' public key from signature
let address = author.address(key_id).map_err(Error::InsufficientRequesterData)?;
fn generate_key(
&self,
key_id: ServerKeyId,
author: Requester,
threshold: usize,
) -> Box<Future<Item=Public, Error=Error> + Send> {
// recover requestor' address key from signature
let address = author.address(&key_id).map_err(Error::InsufficientRequesterData);
// generate server key
let generation_session = self.data.lock().cluster.new_generation_session(key_id.clone(), None, address, threshold)?;
generation_session.wait(None)
.expect("when wait is called without timeout it always returns Some; qed")
.map_err(Into::into)
return_session(address.and_then(|address| self.data.lock().cluster
.new_generation_session(key_id, None, address, threshold)))
}
fn restore_key_public(&self, key_id: &ServerKeyId, author: &Requester) -> Result<Public, Error> {
fn restore_key_public(
&self,
key_id: ServerKeyId,
author: Requester,
) -> Box<Future<Item=Public, Error=Error> + Send> {
// recover requestor' public key from signature
let address = author.address(key_id).map_err(Error::InsufficientRequesterData)?;
let session_and_address = author
.address(&key_id)
.map_err(Error::InsufficientRequesterData)
.and_then(|address| self.data.lock().cluster.new_key_version_negotiation_session(key_id)
.map(|session| (session, address)));
let (session, address) = match session_and_address {
Ok((session, address)) => (session, address),
Err(error) => return Box::new(err(error)),
};
// negotiate key version && retrieve common key data
let negotiation_session = self.data.lock().cluster.new_key_version_negotiation_session(*key_id)?;
negotiation_session.wait()
.and_then(|_| negotiation_session.common_key_data())
.and_then(|key_share| if key_share.author == address {
let core_session = session.session.clone();
Box::new(session.into_wait_future()
.and_then(move |_| core_session.common_key_data()
.map(|key_share| (key_share, address)))
.and_then(|(key_share, address)| if key_share.author == address {
Ok(key_share.public)
} else {
Err(Error::AccessDenied)
})
.map_err(Into::into)
}))
}
}
impl DocumentKeyServer for KeyServerImpl {
fn store_document_key(&self, key_id: &ServerKeyId, author: &Requester, common_point: Public, encrypted_document_key: Public) -> Result<(), Error> {
fn store_document_key(
&self,
key_id: ServerKeyId,
author: Requester,
common_point: Public,
encrypted_document_key: Public,
) -> Box<Future<Item=(), Error=Error> + Send> {
// store encrypted key
let encryption_session = self.data.lock().cluster.new_encryption_session(key_id.clone(),
author.clone(), common_point, encrypted_document_key)?;
encryption_session.wait(None).map_err(Into::into)
return_session(self.data.lock().cluster.new_encryption_session(key_id,
author.clone(), common_point, encrypted_document_key))
}
fn generate_document_key(&self, key_id: &ServerKeyId, author: &Requester, threshold: usize) -> Result<EncryptedDocumentKey, Error> {
fn generate_document_key(
&self,
key_id: ServerKeyId,
author: Requester,
threshold: usize,
) -> Box<Future<Item=EncryptedDocumentKey, Error=Error> + Send> {
// recover requestor' public key from signature
let public = author.public(key_id).map_err(Error::InsufficientRequesterData)?;
let public = result(author.public(&key_id).map_err(Error::InsufficientRequesterData));
// generate server key
let server_key = self.generate_key(key_id, author, threshold)?;
let data = self.data.clone();
let server_key = public.and_then(move |public| {
let data = data.lock();
let session = data.cluster.new_generation_session(key_id, None, public_to_address(&public), threshold);
result(session.map(|session| (public, session)))
})
.and_then(|(public, session)| session.into_wait_future().map(move |server_key| (public, server_key)));
// generate random document key
let document_key = math::generate_random_point()?;
let encrypted_document_key = math::encrypt_secret(&document_key, &server_key)?;
let document_key = server_key.and_then(|(public, server_key)|
result(math::generate_random_point()
.and_then(|document_key| math::encrypt_secret(&document_key, &server_key)
.map(|encrypted_document_key| (public, document_key, encrypted_document_key))))
);
// store document key in the storage
self.store_document_key(key_id, author, encrypted_document_key.common_point, encrypted_document_key.encrypted_point)?;
let data = self.data.clone();
let stored_document_key = document_key.and_then(move |(public, document_key, encrypted_document_key)| {
let data = data.lock();
let session = data.cluster.new_encryption_session(key_id,
author.clone(), encrypted_document_key.common_point, encrypted_document_key.encrypted_point);
result(session.map(|session| (public, document_key, session)))
})
.and_then(|(public, document_key, session)| session.into_wait_future().map(move |_| (public, document_key)));
// encrypt document key with requestor public key
let document_key = crypto::ecies::encrypt(&public, &DEFAULT_MAC, document_key.as_bytes())
.map_err(|err| Error::Internal(format!("Error encrypting document key: {}", err)))?;
Ok(document_key)
let encrypted_document_key = stored_document_key
.and_then(|(public, document_key)| crypto::ecies::encrypt(&public, &DEFAULT_MAC, document_key.as_bytes())
.map_err(|err| Error::Internal(format!("Error encrypting document key: {}", err))));
Box::new(encrypted_document_key)
}
fn restore_document_key(&self, key_id: &ServerKeyId, requester: &Requester) -> Result<EncryptedDocumentKey, Error> {
fn restore_document_key(
&self,
key_id: ServerKeyId,
requester: Requester,
) -> Box<Future<Item=EncryptedDocumentKey, Error=Error> + Send> {
// recover requestor' public key from signature
let public = requester.public(key_id).map_err(Error::InsufficientRequesterData)?;
let public = result(requester.public(&key_id).map_err(Error::InsufficientRequesterData));
// decrypt document key
let decryption_session = self.data.lock().cluster.new_decryption_session(key_id.clone(),
None, requester.clone(), None, false, false)?;
let document_key = decryption_session.wait(None)
.expect("when wait is called without timeout it always returns Some; qed")?
.decrypted_secret;
let data = self.data.clone();
let stored_document_key = public.and_then(move |public| {
let data = data.lock();
let session = data.cluster.new_decryption_session(key_id, None, requester.clone(), None, false, false);
result(session.map(|session| (public, session)))
})
.and_then(|(public, session)| session.into_wait_future().map(move |document_key| (public, document_key)));
// encrypt document key with requestor public key
let document_key = crypto::ecies::encrypt(&public, &DEFAULT_MAC, document_key.as_bytes())
.map_err(|err| Error::Internal(format!("Error encrypting document key: {}", err)))?;
Ok(document_key)
let encrypted_document_key = stored_document_key
.and_then(|(public, document_key)|
crypto::ecies::encrypt(&public, &DEFAULT_MAC, document_key.decrypted_secret.as_bytes())
.map_err(|err| Error::Internal(format!("Error encrypting document key: {}", err))));
Box::new(encrypted_document_key)
}
fn restore_document_key_shadow(&self, key_id: &ServerKeyId, requester: &Requester) -> Result<EncryptedDocumentKeyShadow, Error> {
let decryption_session = self.data.lock().cluster.new_decryption_session(key_id.clone(),
None, requester.clone(), None, true, false)?;
decryption_session.wait(None)
.expect("when wait is called without timeout it always returns Some; qed")
.map_err(Into::into)
fn restore_document_key_shadow(
&self,
key_id: ServerKeyId,
requester: Requester,
) -> Box<Future<Item=EncryptedDocumentKeyShadow, Error=Error> + Send> {
return_session(self.data.lock().cluster.new_decryption_session(key_id,
None, requester.clone(), None, true, false))
}
}
impl MessageSigner for KeyServerImpl {
fn sign_message_schnorr(&self, key_id: &ServerKeyId, requester: &Requester, message: MessageHash) -> Result<EncryptedMessageSignature, Error> {
fn sign_message_schnorr(
&self,
key_id: ServerKeyId,
requester: Requester,
message: MessageHash,
) -> Box<Future<Item=EncryptedMessageSignature, Error=Error> + Send> {
// recover requestor' public key from signature
let public = requester.public(key_id).map_err(Error::InsufficientRequesterData)?;
let public = result(requester.public(&key_id).map_err(Error::InsufficientRequesterData));
// sign message
let signing_session = self.data.lock().cluster.new_schnorr_signing_session(key_id.clone(),
requester.clone().into(), None, message)?;
let message_signature = signing_session.wait()?;
let data = self.data.clone();
let signature = public.and_then(move |public| {
let data = data.lock();
let session = data.cluster.new_schnorr_signing_session(key_id, requester.clone().into(), None, message);
result(session.map(|session| (public, session)))
})
.and_then(|(public, session)| session.into_wait_future().map(move |signature| (public, signature)));
// compose two message signature components into single one
let combined_signature = signature.map(|(public, signature)| {
let mut combined_signature = [0; 64];
combined_signature[..32].clone_from_slice(message_signature.0.as_bytes());
combined_signature[32..].clone_from_slice(message_signature.1.as_bytes());
combined_signature[..32].clone_from_slice(signature.0.as_bytes());
combined_signature[32..].clone_from_slice(signature.1.as_bytes());
(public, combined_signature)
});
// encrypt combined signature with requestor public key
let message_signature = crypto::ecies::encrypt(&public, &DEFAULT_MAC, &combined_signature)
.map_err(|err| Error::Internal(format!("Error encrypting message signature: {}", err)))?;
Ok(message_signature)
// encrypt signature with requestor public key
let encrypted_signature = combined_signature
.and_then(|(public, combined_signature)| crypto::ecies::encrypt(&public, &DEFAULT_MAC, &combined_signature)
.map_err(|err| Error::Internal(format!("Error encrypting message signature: {}", err))));
Box::new(encrypted_signature)
}
fn sign_message_ecdsa(&self, key_id: &ServerKeyId, requester: &Requester, message: MessageHash) -> Result<EncryptedMessageSignature, Error> {
fn sign_message_ecdsa(
&self,
key_id: ServerKeyId,
requester: Requester,
message: MessageHash,
) -> Box<Future<Item=EncryptedMessageSignature, Error=Error> + Send> {
// recover requestor' public key from signature
let public = requester.public(key_id).map_err(Error::InsufficientRequesterData)?;
let public = result(requester.public(&key_id).map_err(Error::InsufficientRequesterData));
// sign message
let signing_session = self.data.lock().cluster.new_ecdsa_signing_session(key_id.clone(),
requester.clone().into(), None, message)?;
let message_signature = signing_session.wait()?;
let data = self.data.clone();
let signature = public.and_then(move |public| {
let data = data.lock();
let session = data.cluster.new_ecdsa_signing_session(key_id, requester.clone().into(), None, message);
result(session.map(|session| (public, session)))
})
.and_then(|(public, session)| session.into_wait_future().map(move |signature| (public, signature)));
// encrypt combined signature with requestor public key
let message_signature = crypto::ecies::encrypt(&public, &DEFAULT_MAC, &*message_signature)
.map_err(|err| Error::Internal(format!("Error encrypting message signature: {}", err)))?;
Ok(message_signature)
let encrypted_signature = signature
.and_then(|(public, signature)| crypto::ecies::encrypt(&public, &DEFAULT_MAC, &*signature)
.map_err(|err| Error::Internal(format!("Error encrypting message signature: {}", err))));
Box::new(encrypted_signature)
}
}
@ -215,6 +296,15 @@ impl KeyServerCore {
}
}
fn return_session<S: ClusterSession>(
session: Result<WaitableSession<S>, Error>,
) -> Box<Future<Item=S::SuccessfulResult, Error=Error> + Send> {
match session {
Ok(session) => Box::new(session.into_wait_future()),
Err(error) => Box::new(err(error))
}
}
#[cfg(test)]
pub mod tests {
use std::collections::BTreeSet;
@ -222,6 +312,7 @@ pub mod tests {
use std::sync::Arc;
use std::net::SocketAddr;
use std::collections::BTreeMap;
use futures::Future;
use crypto::DEFAULT_MAC;
use ethkey::{self, crypto, Secret, Random, Generator, verify_public};
use acl_storage::DummyAclStorage;
@ -244,45 +335,88 @@ pub mod tests {
impl KeyServer for DummyKeyServer {}
impl AdminSessionsServer for DummyKeyServer {
fn change_servers_set(&self, _old_set_signature: RequestSignature, _new_set_signature: RequestSignature, _new_servers_set: BTreeSet<NodeId>) -> Result<(), Error> {
fn change_servers_set(
&self,
_old_set_signature: RequestSignature,
_new_set_signature: RequestSignature,
_new_servers_set: BTreeSet<NodeId>,
) -> Box<Future<Item=(), Error=Error> + Send> {
unimplemented!("test-only")
}
}
impl ServerKeyGenerator for DummyKeyServer {
fn generate_key(&self, _key_id: &ServerKeyId, _author: &Requester, _threshold: usize) -> Result<Public, Error> {
fn generate_key(
&self,
_key_id: ServerKeyId,
_author: Requester,
_threshold: usize,
) -> Box<Future<Item=Public, Error=Error> + Send> {
unimplemented!("test-only")
}
fn restore_key_public(&self, _key_id: &ServerKeyId, _author: &Requester) -> Result<Public, Error> {
fn restore_key_public(
&self,
_key_id: ServerKeyId,
_author: Requester,
) -> Box<Future<Item=Public, Error=Error> + Send> {
unimplemented!("test-only")
}
}
impl DocumentKeyServer for DummyKeyServer {
fn store_document_key(&self, _key_id: &ServerKeyId, _author: &Requester, _common_point: Public, _encrypted_document_key: Public) -> Result<(), Error> {
fn store_document_key(
&self,
_key_id: ServerKeyId,
_author: Requester,
_common_point: Public,
_encrypted_document_key: Public,
) -> Box<Future<Item=(), Error=Error> + Send> {
unimplemented!("test-only")
}
fn generate_document_key(&self, _key_id: &ServerKeyId, _author: &Requester, _threshold: usize) -> Result<EncryptedDocumentKey, Error> {
fn generate_document_key(
&self,
_key_id: ServerKeyId,
_author: Requester,
_threshold: usize,
) -> Box<Future<Item=EncryptedDocumentKey, Error=Error> + Send> {
unimplemented!("test-only")
}
fn restore_document_key(&self, _key_id: &ServerKeyId, _requester: &Requester) -> Result<EncryptedDocumentKey, Error> {
fn restore_document_key(
&self,
_key_id: ServerKeyId,
_requester: Requester,
) -> Box<Future<Item=EncryptedDocumentKey, Error=Error> + Send> {
unimplemented!("test-only")
}
fn restore_document_key_shadow(&self, _key_id: &ServerKeyId, _requester: &Requester) -> Result<EncryptedDocumentKeyShadow, Error> {
fn restore_document_key_shadow(
&self,
_key_id: ServerKeyId,
_requester: Requester,
) -> Box<Future<Item=EncryptedDocumentKeyShadow, Error=Error> + Send> {
unimplemented!("test-only")
}
}
impl MessageSigner for DummyKeyServer {
fn sign_message_schnorr(&self, _key_id: &ServerKeyId, _requester: &Requester, _message: MessageHash) -> Result<EncryptedMessageSignature, Error> {
fn sign_message_schnorr(
&self,
_key_id: ServerKeyId,
_requester: Requester,
_message: MessageHash,
) -> Box<Future<Item=EncryptedMessageSignature, Error=Error> + Send> {
unimplemented!("test-only")
}
fn sign_message_ecdsa(&self, _key_id: &ServerKeyId, _requester: &Requester, _message: MessageHash) -> Result<EncryptedMessageSignature, Error> {
fn sign_message_ecdsa(
&self,
_key_id: ServerKeyId,
_requester: Requester,
_message: MessageHash,
) -> Box<Future<Item=EncryptedMessageSignature, Error=Error> + Send> {
unimplemented!("test-only")
}
}
@ -355,13 +489,20 @@ pub mod tests {
let threshold = 0;
let document = Random.generate().unwrap().secret().clone();
let secret = Random.generate().unwrap().secret().clone();
let signature = ethkey::sign(&secret, &document).unwrap();
let generated_key = key_servers[0].generate_document_key(&document, &signature.clone().into(), threshold).unwrap();
let signature: Requester = ethkey::sign(&secret, &document).unwrap().into();
let generated_key = key_servers[0].generate_document_key(
*document,
signature.clone(),
threshold,
).wait().unwrap();
let generated_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &generated_key).unwrap();
// now let's try to retrieve key back
for key_server in key_servers.iter() {
let retrieved_key = key_server.restore_document_key(&document, &signature.clone().into()).unwrap();
let retrieved_key = key_server.restore_document_key(
*document,
signature.clone(),
).wait().unwrap();
let retrieved_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &retrieved_key).unwrap();
assert_eq!(retrieved_key, generated_key);
}
@ -378,13 +519,20 @@ pub mod tests {
// generate document key
let document = Random.generate().unwrap().secret().clone();
let secret = Random.generate().unwrap().secret().clone();
let signature = ethkey::sign(&secret, &document).unwrap();
let generated_key = key_servers[0].generate_document_key(&document, &signature.clone().into(), *threshold).unwrap();
let signature: Requester = ethkey::sign(&secret, &document).unwrap().into();
let generated_key = key_servers[0].generate_document_key(
*document,
signature.clone(),
*threshold,
).wait().unwrap();
let generated_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &generated_key).unwrap();
// now let's try to retrieve key back
for (i, key_server) in key_servers.iter().enumerate() {
let retrieved_key = key_server.restore_document_key(&document, &signature.clone().into()).unwrap();
let retrieved_key = key_server.restore_document_key(
*document,
signature.clone(),
).wait().unwrap();
let retrieved_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &retrieved_key).unwrap();
assert_eq!(retrieved_key, generated_key);
@ -406,20 +554,24 @@ pub mod tests {
// generate server key
let server_key_id = Random.generate().unwrap().secret().clone();
let requestor_secret = Random.generate().unwrap().secret().clone();
let signature = ethkey::sign(&requestor_secret, &server_key_id).unwrap();
let server_public = key_servers[0].generate_key(&server_key_id, &signature.clone().into(), *threshold).unwrap();
let signature: Requester = ethkey::sign(&requestor_secret, &server_key_id).unwrap().into();
let server_public = key_servers[0].generate_key(
*server_key_id,
signature.clone(),
*threshold,
).wait().unwrap();
// generate document key (this is done by KS client so that document key is unknown to any KS)
let generated_key = Random.generate().unwrap().public().clone();
let encrypted_document_key = math::encrypt_secret(&generated_key, &server_public).unwrap();
// store document key
key_servers[0].store_document_key(&server_key_id, &signature.clone().into(),
encrypted_document_key.common_point, encrypted_document_key.encrypted_point).unwrap();
key_servers[0].store_document_key(*server_key_id, signature.clone(),
encrypted_document_key.common_point, encrypted_document_key.encrypted_point).wait().unwrap();
// now let's try to retrieve key back
for key_server in key_servers.iter() {
let retrieved_key = key_server.restore_document_key(&server_key_id, &signature.clone().into()).unwrap();
let retrieved_key = key_server.restore_document_key(*server_key_id, signature.clone()).wait().unwrap();
let retrieved_key = crypto::ecies::decrypt(&requestor_secret, &DEFAULT_MAC, &retrieved_key).unwrap();
let retrieved_key = Public::from_slice(&retrieved_key);
assert_eq!(retrieved_key, generated_key);
@ -438,12 +590,20 @@ pub mod tests {
// generate server key
let server_key_id = Random.generate().unwrap().secret().clone();
let requestor_secret = Random.generate().unwrap().secret().clone();
let signature = ethkey::sign(&requestor_secret, &server_key_id).unwrap();
let server_public = key_servers[0].generate_key(&server_key_id, &signature.clone().into(), *threshold).unwrap();
let signature: Requester = ethkey::sign(&requestor_secret, &server_key_id).unwrap().into();
let server_public = key_servers[0].generate_key(
*server_key_id,
signature.clone(),
*threshold,
).wait().unwrap();
// sign message
let message_hash = H256::from_low_u64_be(42);
let combined_signature = key_servers[0].sign_message_schnorr(&server_key_id, &signature.into(), message_hash.clone()).unwrap();
let combined_signature = key_servers[0].sign_message_schnorr(
*server_key_id,
signature,
message_hash,
).wait().unwrap();
let combined_signature = crypto::ecies::decrypt(&requestor_secret, &DEFAULT_MAC, &combined_signature).unwrap();
let signature_c = Secret::from_slice(&combined_signature[..32]).unwrap();
let signature_s = Secret::from_slice(&combined_signature[32..]).unwrap();
@ -463,15 +623,19 @@ pub mod tests {
let threshold = 0;
let document = Random.generate().unwrap().secret().clone();
let secret = Random.generate().unwrap().secret().clone();
let signature = ethkey::sign(&secret, &document).unwrap();
let generated_key = key_servers[0].generate_document_key(&document, &signature.clone().into(), threshold).unwrap();
let signature: Requester = ethkey::sign(&secret, &document).unwrap().into();
let generated_key = key_servers[0].generate_document_key(
*document,
signature.clone(),
threshold,
).wait().unwrap();
let generated_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &generated_key).unwrap();
// remove key from node0
key_storages[0].remove(&document).unwrap();
// now let's try to retrieve key back by requesting it from node0, so that session must be delegated
let retrieved_key = key_servers[0].restore_document_key(&document, &signature.into()).unwrap();
let retrieved_key = key_servers[0].restore_document_key(*document, signature).wait().unwrap();
let retrieved_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &retrieved_key).unwrap();
assert_eq!(retrieved_key, generated_key);
drop(runtime);
@ -486,15 +650,19 @@ pub mod tests {
// generate server key
let server_key_id = Random.generate().unwrap().secret().clone();
let requestor_secret = Random.generate().unwrap().secret().clone();
let signature = ethkey::sign(&requestor_secret, &server_key_id).unwrap();
let server_public = key_servers[0].generate_key(&server_key_id, &signature.clone().into(), threshold).unwrap();
let signature: Requester = ethkey::sign(&requestor_secret, &server_key_id).unwrap().into();
let server_public = key_servers[0].generate_key(*server_key_id, signature.clone(), threshold).wait().unwrap();
// remove key from node0
key_storages[0].remove(&server_key_id).unwrap();
// sign message
let message_hash = H256::from_low_u64_be(42);
let combined_signature = key_servers[0].sign_message_schnorr(&server_key_id, &signature.into(), message_hash.clone()).unwrap();
let combined_signature = key_servers[0].sign_message_schnorr(
*server_key_id,
signature,
message_hash,
).wait().unwrap();
let combined_signature = crypto::ecies::decrypt(&requestor_secret, &DEFAULT_MAC, &combined_signature).unwrap();
let signature_c = Secret::from_slice(&combined_signature[..32]).unwrap();
let signature_s = Secret::from_slice(&combined_signature[32..]).unwrap();
@ -514,14 +682,22 @@ pub mod tests {
let server_key_id = Random.generate().unwrap().secret().clone();
let requestor_secret = Random.generate().unwrap().secret().clone();
let signature = ethkey::sign(&requestor_secret, &server_key_id).unwrap();
let server_public = key_servers[0].generate_key(&server_key_id, &signature.clone().into(), threshold).unwrap();
let server_public = key_servers[0].generate_key(
*server_key_id,
signature.clone().into(),
threshold,
).wait().unwrap();
// remove key from node0
key_storages[0].remove(&server_key_id).unwrap();
// sign message
let message_hash = H256::random();
let signature = key_servers[0].sign_message_ecdsa(&server_key_id, &signature.into(), message_hash.clone()).unwrap();
let signature = key_servers[0].sign_message_ecdsa(
*server_key_id,
signature.clone().into(),
message_hash,
).wait().unwrap();
let signature = crypto::ecies::decrypt(&requestor_secret, &DEFAULT_MAC, &signature).unwrap();
let signature = H520::from_slice(&signature[0..65]);

View File

@ -18,10 +18,11 @@ use std::sync::Arc;
use std::collections::{BTreeSet, BTreeMap};
use ethereum_types::{Address, H256};
use ethkey::Secret;
use parking_lot::{Mutex, Condvar};
use futures::Oneshot;
use parking_lot::Mutex;
use key_server_cluster::{Error, SessionId, NodeId, DocumentKeyShare};
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession};
use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession, CompletionSignal};
use key_server_cluster::decryption_session::SessionImpl as DecryptionSession;
use key_server_cluster::signing_session_ecdsa::SessionImpl as EcdsaSigningSession;
use key_server_cluster::signing_session_schnorr::SessionImpl as SchnorrSigningSession;
@ -87,8 +88,8 @@ struct SessionCore<T: SessionTransport> {
pub transport: T,
/// Session nonce.
pub nonce: u64,
/// SessionImpl completion condvar.
pub completed: Condvar,
/// Session completion signal.
pub completed: CompletionSignal<Option<(H256, NodeId)>>,
}
/// Mutable session data.
@ -166,8 +167,9 @@ pub struct LargestSupportResultComputer;
impl<T> SessionImpl<T> where T: SessionTransport {
/// Create new session.
pub fn new(params: SessionParams<T>) -> Self {
SessionImpl {
pub fn new(params: SessionParams<T>) -> (Self, Oneshot<Result<Option<(H256, NodeId)>, Error>>) {
let (completed, oneshot) = CompletionSignal::new();
(SessionImpl {
core: SessionCore {
meta: params.meta,
sub_session: params.sub_session,
@ -175,7 +177,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
result_computer: params.result_computer,
transport: params.transport,
nonce: params.nonce,
completed: Condvar::new(),
completed,
},
data: Mutex::new(SessionData {
state: SessionState::WaitingForInitialization,
@ -191,7 +193,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
continue_with: None,
failed_continue_with: None,
})
}
}, oneshot)
}
/// Return session meta.
@ -221,10 +223,9 @@ impl<T> SessionImpl<T> where T: SessionTransport {
self.data.lock().failed_continue_with.take()
}
/// Wait for session completion.
pub fn wait(&self) -> Result<Option<(H256, NodeId)>, Error> {
Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone())
.expect("wait_session returns Some if called without timeout; qed")
/// Return session completion result (if available).
pub fn result(&self) -> Option<Result<Option<(H256, NodeId)>, Error>> {
self.data.lock().result.clone()
}
/// Retrieve common key data (author, threshold, public), if available.
@ -344,7 +345,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
// update state
data.state = SessionState::Finished;
data.result = Some(Ok(None));
self.core.completed.notify_all();
self.core.completed.send(Ok(None));
Ok(())
}
@ -450,15 +451,18 @@ impl<T> SessionImpl<T> where T: SessionTransport {
}
}
let result = result.map(Some);
data.state = SessionState::Finished;
data.result = Some(result.map(Some));
core.completed.notify_all();
data.result = Some(result.clone());
core.completed.send(result);
}
}
}
impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
type Id = SessionIdWithSubSession;
type CreationData = ();
type SuccessfulResult = Option<(H256, NodeId)>;
fn type_name() -> &'static str {
"version negotiation"
@ -482,7 +486,7 @@ impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
warn!(target: "secretstore_net", "{}: key version negotiation session failed with timeout", self.core.meta.self_node_id);
data.result = Some(Err(Error::ConsensusTemporaryUnreachable));
self.core.completed.notify_all();
self.core.completed.send(Err(Error::ConsensusTemporaryUnreachable));
}
}
}
@ -510,8 +514,8 @@ impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
self.core.meta.self_node_id, error, node);
data.state = SessionState::Finished;
data.result = Some(Err(error));
self.core.completed.notify_all();
data.result = Some(Err(error.clone()));
self.core.completed.send(Err(error));
}
fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> {
@ -698,7 +702,7 @@ mod tests {
cluster: cluster,
},
nonce: 0,
}),
}).0,
})
}).collect(),
queue: VecDeque::new(),

View File

@ -17,13 +17,14 @@
use std::sync::Arc;
use std::collections::{BTreeSet, BTreeMap};
use std::collections::btree_map::Entry;
use parking_lot::{Mutex, Condvar};
use futures::Oneshot;
use parking_lot::Mutex;
use ethereum_types::H256;
use ethkey::{Public, Signature};
use key_server_cluster::{Error, NodeId, SessionId, KeyStorage};
use key_server_cluster::math;
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster_sessions::ClusterSession;
use key_server_cluster::cluster_sessions::{ClusterSession, CompletionSignal};
use key_server_cluster::message::{Message, ServersSetChangeMessage,
ConsensusMessageWithServersSet, InitializeConsensusSessionWithServersSet,
ServersSetChangeConsensusMessage, ConfirmConsensusInitialization, UnknownSessionsRequest, UnknownSessions,
@ -93,8 +94,8 @@ struct SessionCore {
pub admin_public: Public,
/// Migration id (if this session is a part of auto-migration process).
pub migration_id: Option<H256>,
/// SessionImpl completion condvar.
pub completed: Condvar,
/// Session completion signal.
pub completed: CompletionSignal<()>,
}
/// Servers set change consensus session type.
@ -182,8 +183,9 @@ struct ServersSetChangeKeyVersionNegotiationTransport {
impl SessionImpl {
/// Create new servers set change session.
pub fn new(params: SessionParams) -> Result<Self, Error> {
Ok(SessionImpl {
pub fn new(params: SessionParams) -> Result<(Self, Oneshot<Result<(), Error>>), Error> {
let (completed, oneshot) = CompletionSignal::new();
Ok((SessionImpl {
core: SessionCore {
meta: params.meta,
cluster: params.cluster,
@ -192,7 +194,7 @@ impl SessionImpl {
all_nodes_set: params.all_nodes_set,
admin_public: params.admin_public,
migration_id: params.migration_id,
completed: Condvar::new(),
completed,
},
data: Mutex::new(SessionData {
state: SessionState::EstablishingConsensus,
@ -205,7 +207,7 @@ impl SessionImpl {
active_key_sessions: BTreeMap::new(),
result: None,
}),
})
}, oneshot))
}
/// Get session id.
@ -218,10 +220,9 @@ impl SessionImpl {
self.core.migration_id.as_ref()
}
/// Wait for session completion.
pub fn wait(&self) -> Result<(), Error> {
Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone())
.expect("wait_session returns Some if called without timeout; qed")
/// Return session completion result (if available).
pub fn result(&self) -> Option<Result<(), Error>> {
self.data.lock().result.clone()
}
/// Initialize servers set change session on master node.
@ -423,7 +424,7 @@ impl SessionImpl {
&KeyVersionNegotiationMessage::RequestKeyVersions(ref message) if sender == &self.core.meta.master_node_id => {
let key_id = message.session.clone().into();
let key_share = self.core.key_storage.get(&key_id)?;
let negotiation_session = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
let (negotiation_session, _) = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
meta: ShareChangeSessionMeta {
id: key_id.clone(),
self_node_id: self.core.meta.self_node_id.clone(),
@ -671,7 +672,7 @@ impl SessionImpl {
}
data.state = SessionState::Finished;
self.core.completed.notify_all();
self.core.completed.send(Ok(()));
Ok(())
}
@ -741,7 +742,7 @@ impl SessionImpl {
};
let key_share = core.key_storage.get(&key_id)?;
let negotiation_session = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
let (negotiation_session, _) = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
meta: ShareChangeSessionMeta {
id: key_id,
self_node_id: core.meta.self_node_id.clone(),
@ -797,7 +798,8 @@ impl SessionImpl {
let negotiation_session = data.negotiation_sessions.remove(&key_id)
.expect("share change session is only initialized when negotiation is completed; qed");
let (selected_version, selected_master) = negotiation_session
.wait()?
.result()
.expect("share change session is only initialized when negotiation is completed; qed")?
.expect("initialize_share_change_session is only called on share change master; negotiation session completes with some on master; qed");
let selected_version_holders = negotiation_session.version_holders(&selected_version)?;
let selected_version_threshold = negotiation_session.common_key_data()?.threshold;
@ -882,7 +884,7 @@ impl SessionImpl {
if data.result.is_some() && data.active_key_sessions.len() == 0 {
data.state = SessionState::Finished;
core.completed.notify_all();
core.completed.send(Ok(()));
}
Ok(())
@ -907,7 +909,7 @@ impl SessionImpl {
data.state = SessionState::Finished;
data.result = Some(Ok(()));
core.completed.notify_all();
core.completed.send(Ok(()));
Ok(())
}
@ -915,6 +917,8 @@ impl SessionImpl {
impl ClusterSession for SessionImpl {
type Id = SessionId;
type CreationData = (); // never used directly
type SuccessfulResult = ();
fn type_name() -> &'static str {
"servers set change"
@ -954,8 +958,8 @@ impl ClusterSession for SessionImpl {
self.core.meta.self_node_id, error, node);
data.state = SessionState::Finished;
data.result = Some(Err(error));
self.core.completed.notify_all();
data.result = Some(Err(error.clone()));
self.core.completed.send(Err(error));
}
fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> {
@ -1109,7 +1113,7 @@ pub mod tests {
nonce: 1,
admin_public: admin_public,
migration_id: None,
}).unwrap()
}).unwrap().0
}
}

View File

@ -18,10 +18,11 @@ use std::sync::Arc;
use std::collections::{BTreeSet, BTreeMap};
use ethereum_types::{H256, Address};
use ethkey::{Public, Secret, Signature};
use parking_lot::{Mutex, Condvar};
use futures::Oneshot;
use parking_lot::Mutex;
use key_server_cluster::{Error, SessionId, NodeId, DocumentKeyShare, DocumentKeyShareVersion, KeyStorage};
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster_sessions::ClusterSession;
use key_server_cluster::cluster_sessions::{ClusterSession, CompletionSignal};
use key_server_cluster::math;
use key_server_cluster::message::{Message, ShareAddMessage, ShareAddConsensusMessage, ConsensusMessageOfShareAdd,
InitializeConsensusSessionOfShareAdd, KeyShareCommon, NewKeysDissemination, ShareAddError,
@ -71,8 +72,8 @@ struct SessionCore<T: SessionTransport> {
pub key_storage: Arc<KeyStorage>,
/// Administrator public key.
pub admin_public: Option<Public>,
/// SessionImpl completion condvar.
pub completed: Condvar,
/// Session completion signal.
pub completed: CompletionSignal<()>,
}
/// Share add consensus session type.
@ -158,10 +159,10 @@ pub struct IsolatedSessionTransport {
impl<T> SessionImpl<T> where T: SessionTransport {
/// Create new share addition session.
pub fn new(params: SessionParams<T>) -> Result<Self, Error> {
pub fn new(params: SessionParams<T>) -> Result<(Self, Oneshot<Result<(), Error>>), Error> {
let key_share = params.key_storage.get(&params.meta.id)?;
Ok(SessionImpl {
let (completed, oneshot) = CompletionSignal::new();
Ok((SessionImpl {
core: SessionCore {
meta: params.meta,
nonce: params.nonce,
@ -169,7 +170,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
transport: params.transport,
key_storage: params.key_storage,
admin_public: params.admin_public,
completed: Condvar::new(),
completed,
},
data: Mutex::new(SessionData {
state: SessionState::ConsensusEstablishing,
@ -181,7 +182,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
secret_subshares: None,
result: None,
}),
})
}, oneshot))
}
/// Set pre-established consensus data.
@ -752,7 +753,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
// signal session completion
data.state = SessionState::Finished;
data.result = Some(Ok(()));
core.completed.notify_all();
core.completed.send(Ok(()));
Ok(())
}
@ -760,6 +761,8 @@ impl<T> SessionImpl<T> where T: SessionTransport {
impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
type Id = SessionId;
type CreationData = (); // never used directly
type SuccessfulResult = ();
fn type_name() -> &'static str {
"share add"
@ -801,8 +804,8 @@ impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
self.core.meta.self_node_id, error, node);
data.state = SessionState::Finished;
data.result = Some(Err(error));
self.core.completed.notify_all();
data.result = Some(Err(error.clone()));
self.core.completed.send(Err(error));
}
fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> {
@ -914,7 +917,7 @@ pub mod tests {
key_storage,
admin_public: Some(admin_public),
nonce: 1,
}).unwrap()
}).unwrap().0
}
}

View File

@ -166,7 +166,7 @@ impl ShareChangeSession {
let consensus_group = self.consensus_group.take().ok_or(Error::InvalidStateForRequest)?;
let version_holders = self.version_holders.take().ok_or(Error::InvalidStateForRequest)?;
let new_nodes_map = self.new_nodes_map.take().ok_or(Error::InvalidStateForRequest)?;
let share_add_session = ShareAddSessionImpl::new(ShareAddSessionParams {
let (share_add_session, _) = ShareAddSessionImpl::new(ShareAddSessionParams {
meta: self.meta.clone(),
nonce: self.nonce,
transport: ShareChangeTransport::new(self.session_id, self.nonce, self.cluster.clone()),

View File

@ -16,14 +16,14 @@
use std::collections::{BTreeSet, BTreeMap};
use std::sync::Arc;
use std::time;
use parking_lot::{Mutex, Condvar};
use futures::Oneshot;
use parking_lot::Mutex;
use ethereum_types::{Address, H256};
use ethkey::Secret;
use key_server_cluster::{Error, AclStorage, DocumentKeyShare, NodeId, SessionId, Requester,
EncryptedDocumentKeyShadow, SessionMeta};
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession};
use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession, CompletionSignal};
use key_server_cluster::message::{Message, DecryptionMessage, DecryptionConsensusMessage, RequestPartialDecryption,
PartialDecryption, DecryptionSessionError, DecryptionSessionCompleted, ConsensusMessage, InitializeConsensusSession,
ConfirmConsensusInitialization, DecryptionSessionDelegation, DecryptionSessionDelegationCompleted};
@ -59,8 +59,8 @@ struct SessionCore {
pub cluster: Arc<Cluster>,
/// Session-level nonce.
pub nonce: u64,
/// SessionImpl completion condvar.
pub completed: Condvar,
/// Session completion signal.
pub completed: CompletionSignal<EncryptedDocumentKeyShadow>,
}
/// Decryption consensus session type.
@ -147,7 +147,10 @@ enum DelegationStatus {
impl SessionImpl {
/// Create new decryption session.
pub fn new(params: SessionParams, requester: Option<Requester>) -> Result<Self, Error> {
pub fn new(
params: SessionParams,
requester: Option<Requester>,
) -> Result<(Self, Oneshot<Result<EncryptedDocumentKeyShadow, Error>>), Error> {
debug_assert_eq!(params.meta.threshold, params.key_share.as_ref().map(|ks| ks.threshold).unwrap_or_default());
// check that common_point and encrypted_point are already set
@ -175,14 +178,15 @@ impl SessionImpl {
consensus_transport: consensus_transport,
})?;
Ok(SessionImpl {
let (completed, oneshot) = CompletionSignal::new();
Ok((SessionImpl {
core: SessionCore {
meta: params.meta,
access_key: params.access_key,
key_share: params.key_share,
cluster: params.cluster,
nonce: params.nonce,
completed: Condvar::new(),
completed,
},
data: Mutex::new(SessionData {
version: None,
@ -194,7 +198,7 @@ impl SessionImpl {
delegation_status: None,
result: None,
}),
})
}, oneshot))
}
/// Get this node id.
@ -209,7 +213,7 @@ impl SessionImpl {
&self.core.access_key
}
/// Get session state.
/// Get session state (tests only).
#[cfg(test)]
pub fn state(&self) -> ConsensusSessionState {
self.data.lock().consensus_session.state()
@ -231,9 +235,9 @@ impl SessionImpl {
self.data.lock().origin.clone()
}
/// Wait for session completion.
pub fn wait(&self, timeout: Option<time::Duration>) -> Option<Result<EncryptedDocumentKeyShadow, Error>> {
Self::wait_session(&self.core.completed, &self.data, timeout, |data| data.result.clone())
/// Get session completion result (if available).
pub fn result(&self) -> Option<Result<EncryptedDocumentKeyShadow, Error>> {
self.data.lock().result.clone()
}
/// Get broadcasted shadows.
@ -667,13 +671,15 @@ impl SessionImpl {
};
}
data.result = Some(result);
core.completed.notify_all();
data.result = Some(result.clone());
core.completed.send(result);
}
}
impl ClusterSession for SessionImpl {
type Id = SessionIdWithSubSession;
type CreationData = Requester;
type SuccessfulResult = EncryptedDocumentKeyShadow;
fn type_name() -> &'static str {
"decryption"
@ -832,7 +838,7 @@ pub fn create_default_decryption_session() -> Arc<SessionImpl> {
acl_storage: Arc::new(DummyAclStorage::default()),
cluster: Arc::new(DummyCluster::new(Default::default())),
nonce: 0,
}, Some(Requester::Public(H512::from_low_u64_be(2)))).unwrap())
}, Some(Requester::Public(H512::from_low_u64_be(2)))).unwrap().0)
}
#[cfg(test)]
@ -915,7 +921,7 @@ mod tests {
acl_storage: acl_storages[i].clone(),
cluster: clusters[i].clone(),
nonce: 0,
}, if i == 0 { signature.clone().map(Into::into) } else { None }).unwrap()).collect();
}, if i == 0 { signature.clone().map(Into::into) } else { None }).unwrap().0).collect();
(requester, clusters, acl_storages, sessions)
}
@ -1014,7 +1020,9 @@ mod tests {
acl_storage: Arc::new(DummyAclStorage::default()),
cluster: Arc::new(DummyCluster::new(self_node_id.clone())),
nonce: 0,
}, Some(Requester::Signature(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap()))).unwrap();
}, Some(Requester::Signature(
ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap()
))).unwrap().0;
assert_eq!(session.initialize(Default::default(), Default::default(), false, false), Err(Error::InvalidMessage));
}
@ -1049,7 +1057,9 @@ mod tests {
acl_storage: Arc::new(DummyAclStorage::default()),
cluster: Arc::new(DummyCluster::new(self_node_id.clone())),
nonce: 0,
}, Some(Requester::Signature(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap()))).unwrap();
}, Some(Requester::Signature(
ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap()
))).unwrap().0;
assert_eq!(session.initialize(Default::default(), Default::default(), false, false), Err(Error::ConsensusUnreachable));
}

View File

@ -16,15 +16,15 @@
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter, Error as FmtError};
use std::time;
use std::sync::Arc;
use parking_lot::{Condvar, Mutex};
use futures::Oneshot;
use parking_lot::Mutex;
use ethereum_types::Address;
use ethkey::Public;
use key_server_cluster::{Error, NodeId, SessionId, Requester, KeyStorage,
DocumentKeyShare, ServerKeyId};
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster_sessions::ClusterSession;
use key_server_cluster::cluster_sessions::{ClusterSession, CompletionSignal};
use key_server_cluster::message::{Message, EncryptionMessage, InitializeEncryptionSession,
ConfirmEncryptionInitialization, EncryptionSessionError};
@ -49,8 +49,8 @@ pub struct SessionImpl {
cluster: Arc<Cluster>,
/// Session nonce.
nonce: u64,
/// SessionImpl completion condvar.
completed: Condvar,
/// Session completion signal.
completed: CompletionSignal<()>,
/// Mutable session data.
data: Mutex<SessionData>,
}
@ -108,23 +108,24 @@ pub enum SessionState {
impl SessionImpl {
/// Create new encryption session.
pub fn new(params: SessionParams) -> Result<Self, Error> {
pub fn new(params: SessionParams) -> Result<(Self, Oneshot<Result<(), Error>>), Error> {
check_encrypted_data(params.encrypted_data.as_ref())?;
Ok(SessionImpl {
let (completed, oneshot) = CompletionSignal::new();
Ok((SessionImpl {
id: params.id,
self_node_id: params.self_node_id,
encrypted_data: params.encrypted_data,
key_storage: params.key_storage,
cluster: params.cluster,
nonce: params.nonce,
completed: Condvar::new(),
completed,
data: Mutex::new(SessionData {
state: SessionState::WaitingForInitialization,
nodes: BTreeMap::new(),
result: None,
}),
})
}, oneshot))
}
/// Get this node Id.
@ -132,12 +133,6 @@ impl SessionImpl {
&self.self_node_id
}
/// Wait for session completion.
pub fn wait(&self, timeout: Option<time::Duration>) -> Result<(), Error> {
Self::wait_session(&self.completed, &self.data, timeout, |data| data.result.clone())
.expect("wait_session returns Some if called without timeout; qed")
}
/// Start new session initialization. This must be called on master node.
pub fn initialize(&self, requester: Requester, common_point: Public, encrypted_point: Public) -> Result<(), Error> {
let mut data = self.data.lock();
@ -175,7 +170,7 @@ impl SessionImpl {
} else {
data.state = SessionState::Finished;
data.result = Some(Ok(()));
self.completed.notify_all();
self.completed.send(Ok(()));
Ok(())
}
@ -230,7 +225,7 @@ impl SessionImpl {
// update state
data.state = SessionState::Finished;
data.result = Some(Ok(()));
self.completed.notify_all();
self.completed.send(Ok(()));
Ok(())
}
@ -238,6 +233,8 @@ impl SessionImpl {
impl ClusterSession for SessionImpl {
type Id = SessionId;
type CreationData = ();
type SuccessfulResult = ();
fn type_name() -> &'static str {
"encryption"
@ -260,7 +257,7 @@ impl ClusterSession for SessionImpl {
data.state = SessionState::Failed;
data.result = Some(Err(Error::NodeDisconnected));
self.completed.notify_all();
self.completed.send(Err(Error::NodeDisconnected));
}
fn on_session_timeout(&self) {
@ -270,7 +267,7 @@ impl ClusterSession for SessionImpl {
data.state = SessionState::Failed;
data.result = Some(Err(Error::NodeDisconnected));
self.completed.notify_all();
self.completed.send(Err(Error::NodeDisconnected));
}
fn on_session_error(&self, node: &NodeId, error: Error) {
@ -290,8 +287,8 @@ impl ClusterSession for SessionImpl {
warn!("{}: encryption session failed with error: {} from {}", self.node(), error, node);
data.state = SessionState::Failed;
data.result = Some(Err(error));
self.completed.notify_all();
data.result = Some(Err(error.clone()));
self.completed.send(Err(error));
}
fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> {

View File

@ -16,15 +16,15 @@
use std::collections::{BTreeSet, BTreeMap, VecDeque};
use std::fmt::{Debug, Formatter, Error as FmtError};
use std::time::Duration;
use std::sync::Arc;
use parking_lot::{Condvar, Mutex};
use futures::Oneshot;
use parking_lot::Mutex;
use ethereum_types::Address;
use ethkey::{Public, Secret};
use key_server_cluster::{Error, NodeId, SessionId, KeyStorage, DocumentKeyShare, DocumentKeyShareVersion};
use key_server_cluster::math;
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster_sessions::ClusterSession;
use key_server_cluster::cluster_sessions::{ClusterSession, CompletionSignal};
use key_server_cluster::message::{Message, GenerationMessage, InitializeSession, ConfirmInitialization, CompleteInitialization,
KeysDissemination, PublicKeyShare, SessionError, SessionCompleted};
@ -47,10 +47,10 @@ pub struct SessionImpl {
cluster: Arc<Cluster>,
/// Session-level nonce.
nonce: u64,
/// SessionImpl completion condvar.
completed: Condvar,
/// Mutable session data.
data: Mutex<SessionData>,
/// Session completion signal.
completed: CompletionSignal<Public>,
}
/// SessionImpl creation parameters
@ -204,8 +204,9 @@ impl From<BTreeMap<NodeId, Secret>> for InitializationNodes {
impl SessionImpl {
/// Create new generation session.
pub fn new(params: SessionParams) -> Self {
SessionImpl {
pub fn new(params: SessionParams) -> (Self, Oneshot<Result<Public, Error>>) {
let (completed, oneshot) = CompletionSignal::new();
(SessionImpl {
id: params.id,
self_node_id: params.self_node_id,
key_storage: params.key_storage,
@ -213,7 +214,7 @@ impl SessionImpl {
// when nonce.is_nonce(), generation session is wrapped
// => nonce is checked somewhere else && we can pass any value
nonce: params.nonce.unwrap_or_default(),
completed: Condvar::new(),
completed,
data: Mutex::new(SessionData {
state: SessionState::WaitingForInitialization,
simulate_faulty_behaviour: false,
@ -230,7 +231,7 @@ impl SessionImpl {
key_share: None,
joint_public_and_secret: None,
}),
}
}, oneshot)
}
/// Get this node Id.
@ -259,10 +260,10 @@ impl SessionImpl {
self.data.lock().origin.clone()
}
/// Wait for session completion.
pub fn wait(&self, timeout: Option<Duration>) -> Option<Result<Public, Error>> {
Self::wait_session(&self.completed, &self.data, timeout, |data| data.joint_public_and_secret.clone()
.map(|r| r.map(|r| r.0.clone())))
/// Get session completion result (if available).
pub fn result(&self) -> Option<Result<Public, Error>> {
self.data.lock().joint_public_and_secret.clone()
.map(|r| r.map(|r| r.0.clone()))
}
/// Get generated public and secret (if any).
@ -328,8 +329,12 @@ impl SessionImpl {
self.verify_keys()?;
self.complete_generation()?;
self.data.lock().state = SessionState::Finished;
self.completed.notify_all();
let mut data = self.data.lock();
let result = data.joint_public_and_secret.clone()
.expect("session is instantly completed on a single node; qed")
.map(|(p, _, _)| p);
data.state = SessionState::Finished;
self.completed.send(result);
Ok(())
}
@ -619,8 +624,11 @@ impl SessionImpl {
}
// we have received enough confirmations => complete session
let result = data.joint_public_and_secret.clone()
.expect("we're on master node; we have received last completion confirmation; qed")
.map(|(p, _, _)| p);
data.state = SessionState::Finished;
self.completed.notify_all();
self.completed.send(result);
Ok(())
}
@ -813,6 +821,8 @@ impl SessionImpl {
impl ClusterSession for SessionImpl {
type Id = SessionId;
type CreationData = ();
type SuccessfulResult = Public;
fn type_name() -> &'static str {
"generation"
@ -838,7 +848,7 @@ impl ClusterSession for SessionImpl {
data.state = SessionState::Failed;
data.key_share = Some(Err(Error::NodeDisconnected));
data.joint_public_and_secret = Some(Err(Error::NodeDisconnected));
self.completed.notify_all();
self.completed.send(Err(Error::NodeDisconnected));
}
fn on_session_timeout(&self) {
@ -849,7 +859,7 @@ impl ClusterSession for SessionImpl {
data.state = SessionState::Failed;
data.key_share = Some(Err(Error::NodeDisconnected));
data.joint_public_and_secret = Some(Err(Error::NodeDisconnected));
self.completed.notify_all();
self.completed.send(Err(Error::NodeDisconnected));
}
fn on_session_error(&self, node: &NodeId, error: Error) {
@ -867,8 +877,8 @@ impl ClusterSession for SessionImpl {
let mut data = self.data.lock();
data.state = SessionState::Failed;
data.key_share = Some(Err(error.clone()));
data.joint_public_and_secret = Some(Err(error));
self.completed.notify_all();
data.joint_public_and_secret = Some(Err(error.clone()));
self.completed.send(Err(error));
}
fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> {

View File

@ -17,12 +17,13 @@
use std::collections::{BTreeSet, BTreeMap};
use std::collections::btree_map::Entry;
use std::sync::Arc;
use parking_lot::{Mutex, Condvar};
use futures::Oneshot;
use parking_lot::Mutex;
use ethkey::{Public, Secret, Signature, sign};
use ethereum_types::H256;
use key_server_cluster::{Error, NodeId, SessionId, SessionMeta, AclStorage, DocumentKeyShare, Requester};
use key_server_cluster::cluster::{Cluster};
use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession};
use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession, CompletionSignal};
use key_server_cluster::generation_session::{SessionImpl as GenerationSession, SessionParams as GenerationSessionParams,
SessionState as GenerationSessionState};
use key_server_cluster::math;
@ -58,8 +59,8 @@ struct SessionCore {
pub cluster: Arc<Cluster>,
/// Session-level nonce.
pub nonce: u64,
/// SessionImpl completion condvar.
pub completed: Condvar,
/// Session completion signal.
pub completed: CompletionSignal<Signature>,
}
/// Signing consensus session type.
@ -170,7 +171,10 @@ enum DelegationStatus {
impl SessionImpl {
/// Create new signing session.
pub fn new(params: SessionParams, requester: Option<Requester>) -> Result<Self, Error> {
pub fn new(
params: SessionParams,
requester: Option<Requester>,
) -> Result<(Self, Oneshot<Result<Signature, Error>>), Error> {
debug_assert_eq!(params.meta.threshold, params.key_share.as_ref().map(|ks| ks.threshold).unwrap_or_default());
let consensus_transport = SigningConsensusTransport {
@ -197,14 +201,15 @@ impl SessionImpl {
consensus_transport: consensus_transport,
})?;
Ok(SessionImpl {
let (completed, oneshot) = CompletionSignal::new();
Ok((SessionImpl {
core: SessionCore {
meta: params.meta,
access_key: params.access_key,
key_share: params.key_share,
cluster: params.cluster,
nonce: params.nonce,
completed: Condvar::new(),
completed,
},
data: Mutex::new(SessionData {
state: SessionState::ConsensusEstablishing,
@ -218,10 +223,11 @@ impl SessionImpl {
delegation_status: None,
result: None,
}),
})
}, oneshot))
}
/// Wait for session completion.
#[cfg(test)]
pub fn wait(&self) -> Result<Signature, Error> {
Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone())
.expect("wait_session returns Some if called without timeout; qed")
@ -251,7 +257,6 @@ impl SessionImpl {
})))?;
data.delegation_status = Some(DelegationStatus::DelegatedTo(master));
Ok(())
}
/// Initialize signing session on master node.
@ -284,8 +289,9 @@ impl SessionImpl {
// consensus established => threshold is 0 => we can generate signature on this node
if data.consensus_session.state() == ConsensusSessionState::ConsensusEstablished {
data.result = Some(sign(&key_version.secret_share, &message_hash).map_err(Into::into));
self.core.completed.notify_all();
let result = sign(&key_version.secret_share, &message_hash).map_err(Into::into);
data.result = Some(result.clone());
self.core.completed.send(result);
}
Ok(())
@ -797,7 +803,7 @@ impl SessionImpl {
map: map_message,
}),
nonce: None,
})
}).0
}
/// Set signing session result.
@ -820,8 +826,8 @@ impl SessionImpl {
};
}
data.result = Some(result);
core.completed.notify_all();
data.result = Some(result.clone());
core.completed.send(result);
}
/// Check if all nonces are generated.
@ -883,6 +889,8 @@ impl SessionImpl {
impl ClusterSession for SessionImpl {
type Id = SessionIdWithSubSession;
type CreationData = Requester;
type SuccessfulResult = Signature;
fn type_name() -> &'static str {
"ecdsa_signing"

View File

@ -16,12 +16,13 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use parking_lot::{Mutex, Condvar};
use futures::Oneshot;
use parking_lot::Mutex;
use ethkey::{Public, Secret};
use ethereum_types::H256;
use key_server_cluster::{Error, NodeId, SessionId, Requester, SessionMeta, AclStorage, DocumentKeyShare};
use key_server_cluster::cluster::{Cluster};
use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession};
use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession, CompletionSignal};
use key_server_cluster::generation_session::{SessionImpl as GenerationSession, SessionParams as GenerationSessionParams,
SessionState as GenerationSessionState};
use key_server_cluster::message::{Message, SchnorrSigningMessage, SchnorrSigningConsensusMessage, SchnorrSigningGenerationMessage,
@ -59,8 +60,8 @@ struct SessionCore {
pub cluster: Arc<Cluster>,
/// Session-level nonce.
pub nonce: u64,
/// SessionImpl completion condvar.
pub completed: Condvar,
/// SessionImpl completion signal.
pub completed: CompletionSignal<(Secret, Secret)>,
}
/// Signing consensus session type.
@ -160,7 +161,10 @@ enum DelegationStatus {
impl SessionImpl {
/// Create new signing session.
pub fn new(params: SessionParams, requester: Option<Requester>) -> Result<Self, Error> {
pub fn new(
params: SessionParams,
requester: Option<Requester>,
) -> Result<(Self, Oneshot<Result<(Secret, Secret), Error>>), Error> {
debug_assert_eq!(params.meta.threshold, params.key_share.as_ref().map(|ks| ks.threshold).unwrap_or_default());
let consensus_transport = SigningConsensusTransport {
@ -179,14 +183,15 @@ impl SessionImpl {
consensus_transport: consensus_transport,
})?;
Ok(SessionImpl {
let (completed, oneshot) = CompletionSignal::new();
Ok((SessionImpl {
core: SessionCore {
meta: params.meta,
access_key: params.access_key,
key_share: params.key_share,
cluster: params.cluster,
nonce: params.nonce,
completed: Condvar::new(),
completed,
},
data: Mutex::new(SessionData {
state: SessionState::ConsensusEstablishing,
@ -197,21 +202,22 @@ impl SessionImpl {
delegation_status: None,
result: None,
}),
})
}
/// Get session state.
#[cfg(test)]
pub fn state(&self) -> SessionState {
self.data.lock().state
}, oneshot))
}
/// Wait for session completion.
#[cfg(test)]
pub fn wait(&self) -> Result<(Secret, Secret), Error> {
Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone())
.expect("wait_session returns Some if called without timeout; qed")
}
/// Get session state (tests only).
#[cfg(test)]
pub fn state(&self) -> SessionState {
self.data.lock().state
}
/// Delegate session to other node.
pub fn delegate(&self, master: NodeId, version: H256, message_hash: H256) -> Result<(), Error> {
if self.core.meta.master_node_id != self.core.meta.self_node_id {
@ -277,7 +283,7 @@ impl SessionImpl {
other_nodes_ids: BTreeSet::new()
}),
nonce: None,
});
}).0;
generation_session.initialize(Default::default(), Default::default(), false, 0, vec![self.core.meta.self_node_id.clone()].into_iter().collect::<BTreeSet<_>>().into())?;
debug_assert_eq!(generation_session.state(), GenerationSessionState::Finished);
@ -405,7 +411,7 @@ impl SessionImpl {
other_nodes_ids: other_consensus_group_nodes,
}),
nonce: None,
});
}).0;
generation_session.initialize(Default::default(), Default::default(), false, key_share.threshold, consensus_group.into())?;
data.generation_session = Some(generation_session);
@ -445,7 +451,7 @@ impl SessionImpl {
other_nodes_ids: other_consensus_group_nodes
}),
nonce: None,
});
}).0;
data.generation_session = Some(generation_session);
data.state = SessionState::SessionKeyGeneration;
}
@ -617,13 +623,15 @@ impl SessionImpl {
};
}
data.result = Some(result);
core.completed.notify_all();
data.result = Some(result.clone());
core.completed.send(result);
}
}
impl ClusterSession for SessionImpl {
type Id = SessionIdWithSubSession;
type CreationData = Requester;
type SuccessfulResult = (Secret, Secret);
fn type_name() -> &'static str {
"signing"
@ -850,7 +858,7 @@ mod tests {
acl_storage: Arc::new(DummyAclStorage::default()),
cluster: self.0.cluster(0).view().unwrap(),
nonce: 0,
}, requester).unwrap()
}, requester).unwrap().0
}
pub fn init_with_version(self, key_version: Option<H256>) -> Result<(Self, Public, H256), Error> {

View File

@ -21,8 +21,8 @@ use ethkey::{Public, Signature, Random, Generator};
use ethereum_types::{Address, H256};
use parity_runtime::Executor;
use key_server_cluster::{Error, NodeId, SessionId, Requester, AclStorage, KeyStorage, KeyServerSet, NodeKeyPair};
use key_server_cluster::cluster_sessions::{ClusterSession, AdminSession, ClusterSessions, SessionIdWithSubSession,
ClusterSessionsContainer, SERVERS_SET_CHANGE_SESSION_ID, create_cluster_view,
use key_server_cluster::cluster_sessions::{WaitableSession, ClusterSession, AdminSession, ClusterSessions,
SessionIdWithSubSession, ClusterSessionsContainer, SERVERS_SET_CHANGE_SESSION_ID, create_cluster_view,
AdminSessionCreationData, ClusterSessionsListener};
use key_server_cluster::cluster_sessions_creator::ClusterSessionCreator;
use key_server_cluster::cluster_connections::{ConnectionProvider, ConnectionManager};
@ -47,19 +47,61 @@ use key_server_cluster::cluster_connections::tests::{MessagesQueue, TestConnecti
/// Cluster interface for external clients.
pub trait ClusterClient: Send + Sync {
/// Start new generation session.
fn new_generation_session(&self, session_id: SessionId, origin: Option<Address>, author: Address, threshold: usize) -> Result<Arc<GenerationSession>, Error>;
fn new_generation_session(
&self,
session_id: SessionId,
origin: Option<Address>,
author: Address,
threshold: usize,
) -> Result<WaitableSession<GenerationSession>, Error>;
/// Start new encryption session.
fn new_encryption_session(&self, session_id: SessionId, author: Requester, common_point: Public, encrypted_point: Public) -> Result<Arc<EncryptionSession>, Error>;
fn new_encryption_session(
&self,
session_id: SessionId,
author: Requester,
common_point: Public,
encrypted_point: Public,
) -> Result<WaitableSession<EncryptionSession>, Error>;
/// Start new decryption session.
fn new_decryption_session(&self, session_id: SessionId, origin: Option<Address>, requester: Requester, version: Option<H256>, is_shadow_decryption: bool, is_broadcast_decryption: bool) -> Result<Arc<DecryptionSession>, Error>;
fn new_decryption_session(
&self,
session_id: SessionId,
origin: Option<Address>,
requester: Requester,
version: Option<H256>,
is_shadow_decryption: bool,
is_broadcast_decryption: bool,
) -> Result<WaitableSession<DecryptionSession>, Error>;
/// Start new Schnorr signing session.
fn new_schnorr_signing_session(&self, session_id: SessionId, requester: Requester, version: Option<H256>, message_hash: H256) -> Result<Arc<SchnorrSigningSession>, Error>;
fn new_schnorr_signing_session(
&self,
session_id: SessionId,
requester: Requester,
version: Option<H256>,
message_hash: H256,
) -> Result<WaitableSession<SchnorrSigningSession>, Error>;
/// Start new ECDSA session.
fn new_ecdsa_signing_session(&self, session_id: SessionId, requester: Requester, version: Option<H256>, message_hash: H256) -> Result<Arc<EcdsaSigningSession>, Error>;
fn new_ecdsa_signing_session(
&self,
session_id: SessionId,
requester: Requester,
version: Option<H256>,
message_hash: H256,
) -> Result<WaitableSession<EcdsaSigningSession>, Error>;
/// Start new key version negotiation session.
fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error>;
fn new_key_version_negotiation_session(
&self,
session_id: SessionId,
) -> Result<WaitableSession<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error>;
/// Start new servers set change session.
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, migration_id: Option<H256>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSession>, Error>;
fn new_servers_set_change_session(
&self,
session_id: Option<SessionId>,
migration_id: Option<H256>,
new_nodes_set: BTreeSet<NodeId>,
old_set_signature: Signature,
new_set_signature: Signature,
) -> Result<WaitableSession<AdminSession>, Error>;
/// Listen for new generation sessions.
fn add_generation_listener(&self, listener: Arc<ClusterSessionsListener<GenerationSession>>);
@ -324,7 +366,10 @@ impl<C: ConnectionManager> ClusterClientImpl<C> {
}
}
fn create_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> {
fn create_key_version_negotiation_session(
&self,
session_id: SessionId,
) -> Result<WaitableSession<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> {
let mut connected_nodes = self.data.connections.provider().connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
@ -332,10 +377,10 @@ impl<C: ConnectionManager> ClusterClientImpl<C> {
let session_id = SessionIdWithSubSession::new(session_id, access_key);
let cluster = create_cluster_view(self.data.self_key_pair.clone(), self.data.connections.provider(), false)?;
let session = self.data.sessions.negotiation_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id.clone(), None, false, None)?;
match session.initialize(connected_nodes) {
match session.session.initialize(connected_nodes) {
Ok(()) => Ok(session),
Err(error) => {
self.data.sessions.negotiation_sessions.remove(&session.id());
self.data.sessions.negotiation_sessions.remove(&session.session.id());
Err(error)
}
}
@ -343,29 +388,49 @@ impl<C: ConnectionManager> ClusterClientImpl<C> {
}
impl<C: ConnectionManager> ClusterClient for ClusterClientImpl<C> {
fn new_generation_session(&self, session_id: SessionId, origin: Option<Address>, author: Address, threshold: usize) -> Result<Arc<GenerationSession>, Error> {
fn new_generation_session(
&self,
session_id: SessionId,
origin: Option<Address>,
author: Address,
threshold: usize,
) -> Result<WaitableSession<GenerationSession>, Error> {
let mut connected_nodes = self.data.connections.provider().connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let cluster = create_cluster_view(self.data.self_key_pair.clone(), self.data.connections.provider(), true)?;
let session = self.data.sessions.generation_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, false, None)?;
process_initialization_result(
session.initialize(origin, author, false, threshold, connected_nodes.into()),
session.session.initialize(origin, author, false, threshold, connected_nodes.into()),
session, &self.data.sessions.generation_sessions)
}
fn new_encryption_session(&self, session_id: SessionId, requester: Requester, common_point: Public, encrypted_point: Public) -> Result<Arc<EncryptionSession>, Error> {
fn new_encryption_session(
&self,
session_id: SessionId,
requester: Requester,
common_point: Public,
encrypted_point: Public,
) -> Result<WaitableSession<EncryptionSession>, Error> {
let mut connected_nodes = self.data.connections.provider().connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let cluster = create_cluster_view(self.data.self_key_pair.clone(), self.data.connections.provider(), true)?;
let session = self.data.sessions.encryption_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, false, None)?;
process_initialization_result(
session.initialize(requester, common_point, encrypted_point),
session.session.initialize(requester, common_point, encrypted_point),
session, &self.data.sessions.encryption_sessions)
}
fn new_decryption_session(&self, session_id: SessionId, origin: Option<Address>, requester: Requester, version: Option<H256>, is_shadow_decryption: bool, is_broadcast_decryption: bool) -> Result<Arc<DecryptionSession>, Error> {
fn new_decryption_session(
&self,
session_id: SessionId,
origin: Option<Address>,
requester: Requester,
version: Option<H256>,
is_shadow_decryption: bool,
is_broadcast_decryption: bool,
) -> Result<WaitableSession<DecryptionSession>, Error> {
let mut connected_nodes = self.data.connections.provider().connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
@ -376,12 +441,18 @@ impl<C: ConnectionManager> ClusterClient for ClusterClientImpl<C> {
session_id.clone(), None, false, Some(requester))?;
let initialization_result = match version {
Some(version) => session.initialize(origin, version, is_shadow_decryption, is_broadcast_decryption),
Some(version) => session.session.initialize(origin, version, is_shadow_decryption, is_broadcast_decryption),
None => {
self.create_key_version_negotiation_session(session_id.id.clone())
.map(|version_session| {
version_session.set_continue_action(ContinueAction::Decrypt(session.clone(), origin, is_shadow_decryption, is_broadcast_decryption));
self.data.message_processor.try_continue_session(Some(version_session));
let continue_action = ContinueAction::Decrypt(
session.session.clone(),
origin,
is_shadow_decryption,
is_broadcast_decryption,
);
version_session.session.set_continue_action(continue_action);
self.data.message_processor.try_continue_session(Some(version_session.session));
})
},
};
@ -391,7 +462,13 @@ impl<C: ConnectionManager> ClusterClient for ClusterClientImpl<C> {
session, &self.data.sessions.decryption_sessions)
}
fn new_schnorr_signing_session(&self, session_id: SessionId, requester: Requester, version: Option<H256>, message_hash: H256) -> Result<Arc<SchnorrSigningSession>, Error> {
fn new_schnorr_signing_session(
&self,
session_id: SessionId,
requester: Requester,
version: Option<H256>,
message_hash: H256,
) -> Result<WaitableSession<SchnorrSigningSession>, Error> {
let mut connected_nodes = self.data.connections.provider().connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
@ -401,12 +478,13 @@ impl<C: ConnectionManager> ClusterClient for ClusterClientImpl<C> {
let session = self.data.sessions.schnorr_signing_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id.clone(), None, false, Some(requester))?;
let initialization_result = match version {
Some(version) => session.initialize(version, message_hash),
Some(version) => session.session.initialize(version, message_hash),
None => {
self.create_key_version_negotiation_session(session_id.id.clone())
.map(|version_session| {
version_session.set_continue_action(ContinueAction::SchnorrSign(session.clone(), message_hash));
self.data.message_processor.try_continue_session(Some(version_session));
let continue_action = ContinueAction::SchnorrSign(session.session.clone(), message_hash);
version_session.session.set_continue_action(continue_action);
self.data.message_processor.try_continue_session(Some(version_session.session));
})
},
};
@ -416,7 +494,13 @@ impl<C: ConnectionManager> ClusterClient for ClusterClientImpl<C> {
session, &self.data.sessions.schnorr_signing_sessions)
}
fn new_ecdsa_signing_session(&self, session_id: SessionId, requester: Requester, version: Option<H256>, message_hash: H256) -> Result<Arc<EcdsaSigningSession>, Error> {
fn new_ecdsa_signing_session(
&self,
session_id: SessionId,
requester: Requester,
version: Option<H256>,
message_hash: H256,
) -> Result<WaitableSession<EcdsaSigningSession>, Error> {
let mut connected_nodes = self.data.connections.provider().connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
@ -426,12 +510,13 @@ impl<C: ConnectionManager> ClusterClient for ClusterClientImpl<C> {
let session = self.data.sessions.ecdsa_signing_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id.clone(), None, false, Some(requester))?;
let initialization_result = match version {
Some(version) => session.initialize(version, message_hash),
Some(version) => session.session.initialize(version, message_hash),
None => {
self.create_key_version_negotiation_session(session_id.id.clone())
.map(|version_session| {
version_session.set_continue_action(ContinueAction::EcdsaSign(session.clone(), message_hash));
self.data.message_processor.try_continue_session(Some(version_session));
let continue_action = ContinueAction::EcdsaSign(session.session.clone(), message_hash);
version_session.session.set_continue_action(continue_action);
self.data.message_processor.try_continue_session(Some(version_session.session));
})
},
};
@ -441,12 +526,21 @@ impl<C: ConnectionManager> ClusterClient for ClusterClientImpl<C> {
session, &self.data.sessions.ecdsa_signing_sessions)
}
fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> {
let session = self.create_key_version_negotiation_session(session_id)?;
Ok(session)
fn new_key_version_negotiation_session(
&self,
session_id: SessionId,
) -> Result<WaitableSession<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> {
self.create_key_version_negotiation_session(session_id)
}
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, migration_id: Option<H256>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSession>, Error> {
fn new_servers_set_change_session(
&self,
session_id: Option<SessionId>,
migration_id: Option<H256>,
new_nodes_set: BTreeSet<NodeId>,
old_set_signature: Signature,
new_set_signature: Signature,
) -> Result<WaitableSession<AdminSession>, Error> {
new_servers_set_change_session(
self.data.self_key_pair.clone(),
&self.data.sessions,
@ -508,7 +602,7 @@ pub fn new_servers_set_change_session(
connections: Arc<ConnectionProvider>,
servers_set_change_creator_connector: Arc<ServersSetChangeSessionCreatorConnector>,
params: ServersSetChangeParams,
) -> Result<Arc<AdminSession>, Error> {
) -> Result<WaitableSession<AdminSession>, Error> {
let session_id = match params.session_id {
Some(session_id) if session_id == *SERVERS_SET_CHANGE_SESSION_ID => session_id,
Some(_) => return Err(Error::InvalidMessage),
@ -519,11 +613,11 @@ pub fn new_servers_set_change_session(
let creation_data = AdminSessionCreationData::ServersSetChange(params.migration_id, params.new_nodes_set.clone());
let session = sessions.admin_sessions
.insert(cluster, *self_key_pair.public(), session_id, None, true, Some(creation_data))?;
let initialization_result = session.as_servers_set_change().expect("servers set change session is created; qed")
let initialization_result = session.session.as_servers_set_change().expect("servers set change session is created; qed")
.initialize(params.new_nodes_set, params.old_set_signature, params.new_set_signature);
if initialization_result.is_ok() {
servers_set_change_creator_connector.set_key_servers_set_change_session(session.clone());
servers_set_change_creator_connector.set_key_servers_set_change_session(session.session.clone());
}
process_initialization_result(
@ -531,23 +625,23 @@ pub fn new_servers_set_change_session(
session, &sessions.admin_sessions)
}
fn process_initialization_result<S, SC, D>(
fn process_initialization_result<S, SC>(
result: Result<(), Error>,
session: Arc<S>,
sessions: &ClusterSessionsContainer<S, SC, D>
) -> Result<Arc<S>, Error>
session: WaitableSession<S>,
sessions: &ClusterSessionsContainer<S, SC>
) -> Result<WaitableSession<S>, Error>
where
S: ClusterSession,
SC: ClusterSessionCreator<S, D>
SC: ClusterSessionCreator<S>
{
match result {
Ok(()) if session.is_finished() => {
sessions.remove(&session.id());
Ok(()) if session.session.is_finished() => {
sessions.remove(&session.session.id());
Ok(session)
},
Ok(()) => Ok(session),
Err(error) => {
sessions.remove(&session.id());
sessions.remove(&session.session.id());
Err(error)
},
}
@ -558,6 +652,7 @@ pub mod tests {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use futures::Future;
use parking_lot::{Mutex, RwLock};
use ethereum_types::{Address, H256};
use ethkey::{Random, Generator, Public, Signature, sign};
@ -567,7 +662,8 @@ pub mod tests {
use key_server_cluster::cluster::{new_test_cluster, Cluster, ClusterCore, ClusterConfiguration, ClusterClient};
use key_server_cluster::cluster_connections::ConnectionManager;
use key_server_cluster::cluster_connections::tests::{MessagesQueue, TestConnections};
use key_server_cluster::cluster_sessions::{ClusterSession, ClusterSessions, AdminSession, ClusterSessionsListener};
use key_server_cluster::cluster_sessions::{WaitableSession, ClusterSession, ClusterSessions, AdminSession,
ClusterSessionsListener};
use key_server_cluster::generation_session::{SessionImpl as GenerationSession,
SessionState as GenerationSessionState};
use key_server_cluster::decryption_session::{SessionImpl as DecryptionSession};
@ -595,17 +691,71 @@ pub mod tests {
}
impl ClusterClient for DummyClusterClient {
fn new_generation_session(&self, _session_id: SessionId, _origin: Option<Address>, _author: Address, _threshold: usize) -> Result<Arc<GenerationSession>, Error> {
fn new_generation_session(
&self,
_session_id: SessionId,
_origin: Option<Address>,
_author: Address,
_threshold: usize,
) -> Result<WaitableSession<GenerationSession>, Error> {
self.generation_requests_count.fetch_add(1, Ordering::Relaxed);
Err(Error::Internal("test-error".into()))
}
fn new_encryption_session(&self, _session_id: SessionId, _requester: Requester, _common_point: Public, _encrypted_point: Public) -> Result<Arc<EncryptionSession>, Error> { unimplemented!("test-only") }
fn new_decryption_session(&self, _session_id: SessionId, _origin: Option<Address>, _requester: Requester, _version: Option<H256>, _is_shadow_decryption: bool, _is_broadcast_session: bool) -> Result<Arc<DecryptionSession>, Error> { unimplemented!("test-only") }
fn new_schnorr_signing_session(&self, _session_id: SessionId, _requester: Requester, _version: Option<H256>, _message_hash: H256) -> Result<Arc<SchnorrSigningSession>, Error> { unimplemented!("test-only") }
fn new_ecdsa_signing_session(&self, _session_id: SessionId, _requester: Requester, _version: Option<H256>, _message_hash: H256) -> Result<Arc<EcdsaSigningSession>, Error> { unimplemented!("test-only") }
fn new_encryption_session(
&self,
_session_id: SessionId,
_requester: Requester,
_common_point: Public,
_encrypted_point: Public,
) -> Result<WaitableSession<EncryptionSession>, Error> {
unimplemented!("test-only")
}
fn new_decryption_session(
&self,
_session_id: SessionId,
_origin: Option<Address>,
_requester: Requester,
_version: Option<H256>,
_is_shadow_decryption: bool,
_is_broadcast_session: bool,
) -> Result<WaitableSession<DecryptionSession>, Error> {
unimplemented!("test-only")
}
fn new_schnorr_signing_session(
&self,
_session_id: SessionId,
_requester: Requester,
_version: Option<H256>,
_message_hash: H256,
) -> Result<WaitableSession<SchnorrSigningSession>, Error> {
unimplemented!("test-only")
}
fn new_ecdsa_signing_session(
&self,
_session_id: SessionId,
_requester: Requester,
_version: Option<H256>,
_message_hash: H256,
) -> Result<WaitableSession<EcdsaSigningSession>, Error> {
unimplemented!("test-only")
}
fn new_key_version_negotiation_session(&self, _session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> { unimplemented!("test-only") }
fn new_servers_set_change_session(&self, _session_id: Option<SessionId>, _migration_id: Option<H256>, _new_nodes_set: BTreeSet<NodeId>, _old_set_signature: Signature, _new_set_signature: Signature) -> Result<Arc<AdminSession>, Error> { unimplemented!("test-only") }
fn new_key_version_negotiation_session(
&self,
_session_id: SessionId,
) -> Result<WaitableSession<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> {
unimplemented!("test-only")
}
fn new_servers_set_change_session(
&self,
_session_id: Option<SessionId>,
_migration_id: Option<H256>,
_new_nodes_set: BTreeSet<NodeId>,
_old_set_signature: Signature,
_new_set_signature: Signature,
) -> Result<WaitableSession<AdminSession>, Error> {
unimplemented!("test-only")
}
fn add_generation_listener(&self, _listener: Arc<ClusterSessionsListener<GenerationSession>>) {}
fn add_decryption_listener(&self, _listener: Arc<ClusterSessionsListener<DecryptionSession>>) {}
@ -897,7 +1047,7 @@ pub mod tests {
// start && wait for generation session to fail
let session = ml.cluster(0).client()
.new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
.new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap().session;
ml.loop_until(|| session.joint_public_and_secret().is_some()
&& ml.cluster(0).client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_err());
@ -924,7 +1074,7 @@ pub mod tests {
// start && wait for generation session to fail
let session = ml.cluster(0).client()
.new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
.new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap().session;
ml.loop_until(|| session.joint_public_and_secret().is_some()
&& ml.cluster(0).client().generation_session(&SessionId::default()).is_none());
assert!(session.joint_public_and_secret().unwrap().is_err());
@ -949,7 +1099,7 @@ pub mod tests {
// start && wait for generation session to complete
let session = ml.cluster(0).client()
.new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
.new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap().session;
ml.loop_until(|| (session.state() == GenerationSessionState::Finished
|| session.state() == GenerationSessionState::Failed)
&& ml.cluster(0).client().generation_session(&SessionId::default()).is_none());
@ -1017,7 +1167,7 @@ pub mod tests {
// start && wait for generation session to complete
let session = ml.cluster(0).client().
new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap().session;
ml.loop_until(|| (session.state() == GenerationSessionState::Finished
|| session.state() == GenerationSessionState::Failed)
&& ml.cluster(0).client().generation_session(&SessionId::default()).is_none());
@ -1035,7 +1185,7 @@ pub mod tests {
ml.loop_until(|| session.is_finished() && (0..3).all(|i|
ml.cluster(i).data.sessions.schnorr_signing_sessions.is_empty()));
session0.wait().unwrap();
session0.into_wait_future().wait().unwrap();
// and try to sign message with generated key using node that has no key share
let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap();
@ -1045,7 +1195,7 @@ pub mod tests {
ml.loop_until(|| session.is_finished() && (0..3).all(|i|
ml.cluster(i).data.sessions.schnorr_signing_sessions.is_empty()));
session2.wait().unwrap();
session2.into_wait_future().wait().unwrap();
// now remove share from node1
ml.cluster(1).data.config.key_storage.remove(&Default::default()).unwrap();
@ -1057,7 +1207,7 @@ pub mod tests {
let session = ml.cluster(0).data.sessions.schnorr_signing_sessions.first().unwrap();
ml.loop_until(|| session.is_finished());
session1.wait().unwrap_err();
session1.into_wait_future().wait().unwrap_err();
}
#[test]
@ -1067,7 +1217,7 @@ pub mod tests {
// start && wait for generation session to complete
let session = ml.cluster(0).client()
.new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap();
.new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap().session;
ml.loop_until(|| (session.state() == GenerationSessionState::Finished
|| session.state() == GenerationSessionState::Failed)
&& ml.cluster(0).client().generation_session(&SessionId::default()).is_none());
@ -1085,7 +1235,7 @@ pub mod tests {
ml.loop_until(|| session.is_finished() && (0..3).all(|i|
ml.cluster(i).data.sessions.ecdsa_signing_sessions.is_empty()));
session0.wait().unwrap();
session0.into_wait_future().wait().unwrap();
// and try to sign message with generated key using node that has no key share
let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap();
@ -1094,7 +1244,7 @@ pub mod tests {
let session = ml.cluster(2).data.sessions.ecdsa_signing_sessions.first().unwrap();
ml.loop_until(|| session.is_finished() && (0..3).all(|i|
ml.cluster(i).data.sessions.ecdsa_signing_sessions.is_empty()));
session2.wait().unwrap();
session2.into_wait_future().wait().unwrap();
// now remove share from node1
ml.cluster(1).data.config.key_storage.remove(&Default::default()).unwrap();
@ -1105,6 +1255,6 @@ pub mod tests {
.new_ecdsa_signing_session(Default::default(), signature.into(), None, H256::random()).unwrap();
let session = ml.cluster(0).data.sessions.ecdsa_signing_sessions.first().unwrap();
ml.loop_until(|| session.is_finished());
session1.wait().unwrap_err();
session1.into_wait_future().wait().unwrap_err();
}
}

View File

@ -72,9 +72,9 @@ impl SessionsMessageProcessor {
}
/// Process single session message from connection.
fn process_message<S: ClusterSession, SC: ClusterSessionCreator<S, D>, D>(
fn process_message<S: ClusterSession, SC: ClusterSessionCreator<S>>(
&self,
sessions: &ClusterSessionsContainer<S, SC, D>,
sessions: &ClusterSessionsContainer<S, SC>,
connection: Arc<Connection>,
mut message: Message,
) -> Option<Arc<S>>
@ -151,9 +151,9 @@ impl SessionsMessageProcessor {
}
/// Get or insert new session.
fn prepare_session<S: ClusterSession, SC: ClusterSessionCreator<S, D>, D>(
fn prepare_session<S: ClusterSession, SC: ClusterSessionCreator<S>>(
&self,
sessions: &ClusterSessionsContainer<S, SC, D>,
sessions: &ClusterSessionsContainer<S, SC>,
sender: &NodeId,
message: &Message
) -> Result<Arc<S>, Error>
@ -192,7 +192,7 @@ impl SessionsMessageProcessor {
let nonce = Some(message.session_nonce().ok_or(Error::InvalidMessage)?);
let exclusive = message.is_exclusive_session_message();
sessions.insert(cluster, master, session_id, nonce, exclusive, creation_data)
sessions.insert(cluster, master, session_id, nonce, exclusive, creation_data).map(|s| s.session)
},
}
}
@ -273,8 +273,8 @@ impl MessageProcessor for SessionsMessageProcessor {
let is_master_node = meta.self_node_id == meta.master_node_id;
if is_master_node && session.is_finished() {
self.sessions.negotiation_sessions.remove(&session.id());
match session.wait() {
Ok(Some((version, master))) => match session.take_continue_action() {
match session.result() {
Some(Ok(Some((version, master)))) => match session.take_continue_action() {
Some(ContinueAction::Decrypt(
session, origin, is_shadow_decryption, is_broadcast_decryption
)) => {
@ -317,10 +317,7 @@ impl MessageProcessor for SessionsMessageProcessor {
},
None => (),
},
Ok(None) => unreachable!("is_master_node; session is finished;
negotiation version always finished with result on master;
qed"),
Err(error) => match session.take_continue_action() {
Some(Err(error)) => match session.take_continue_action() {
Some(ContinueAction::Decrypt(session, _, _, _)) => {
session.on_session_error(&meta.self_node_id, error);
self.sessions.decryption_sessions.remove(&session.id());
@ -335,6 +332,9 @@ impl MessageProcessor for SessionsMessageProcessor {
},
None => (),
},
None | Some(Ok(None)) => unreachable!("is_master_node; session is finished;
negotiation version always finished with result on master;
qed"),
}
}
}
@ -352,6 +352,6 @@ impl MessageProcessor for SessionsMessageProcessor {
self.connections.clone(),
self.servers_set_change_creator_connector.clone(),
params,
)
).map(|s| s.session)
}
}

View File

@ -18,10 +18,11 @@ use std::time::{Duration, Instant};
use std::sync::{Arc, Weak};
use std::sync::atomic::AtomicBool;
use std::collections::{VecDeque, BTreeMap, BTreeSet};
use futures::{oneshot, Oneshot, Complete, Future};
use parking_lot::{Mutex, RwLock, Condvar};
use ethereum_types::H256;
use ethkey::Secret;
use key_server_cluster::{Error, NodeId, SessionId, Requester, NodeKeyPair};
use key_server_cluster::{Error, NodeId, SessionId, NodeKeyPair};
use key_server_cluster::cluster::{Cluster, ClusterConfiguration, ClusterView};
use key_server_cluster::cluster_connections::ConnectionProvider;
use key_server_cluster::connection_trigger::ServersSetChangeSessionCreatorConnector;
@ -68,6 +69,10 @@ pub struct SessionIdWithSubSession {
pub trait ClusterSession {
/// Session identifier type.
type Id: ::std::fmt::Debug + Ord + Clone;
/// Session creation data type.
type CreationData;
/// Session (successful) result type.
type SuccessfulResult: Send + 'static;
/// Session type name.
fn type_name() -> &'static str;
@ -85,15 +90,22 @@ pub trait ClusterSession {
fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error>;
/// 'Wait for session completion' helper.
fn wait_session<T, U, F: Fn(&U) -> Option<Result<T, Error>>>(completion_event: &Condvar, session_data: &Mutex<U>, timeout: Option<Duration>, result_reader: F) -> Option<Result<T, Error>> {
#[cfg(test)]
fn wait_session<T, U, F: Fn(&U) -> Option<Result<T, Error>>>(
completion: &CompletionSignal<T>,
session_data: &Mutex<U>,
timeout: Option<Duration>,
result_reader: F
) -> Option<Result<T, Error>> {
let mut locked_data = session_data.lock();
match result_reader(&locked_data) {
Some(result) => Some(result),
None => {
let completion_condvar = completion.completion_condvar.as_ref().expect("created in test mode");
match timeout {
None => completion_event.wait(&mut locked_data),
None => completion_condvar.wait(&mut locked_data),
Some(timeout) => {
completion_event.wait_for(&mut locked_data, timeout);
completion_condvar.wait_for(&mut locked_data, timeout);
},
}
@ -103,6 +115,23 @@ pub trait ClusterSession {
}
}
/// Waitable cluster session.
pub struct WaitableSession<S: ClusterSession> {
/// Session handle.
pub session: Arc<S>,
/// Session result oneshot.
pub oneshot: Oneshot<Result<S::SuccessfulResult, Error>>,
}
/// Session completion signal.
pub struct CompletionSignal<T> {
/// Completion future.
pub completion_future: Mutex<Option<Complete<Result<T, Error>>>>,
/// Completion condvar.
pub completion_condvar: Option<Condvar>,
}
/// Administrative session.
pub enum AdminSession {
/// Share add session.
@ -122,19 +151,22 @@ pub enum AdminSessionCreationData {
/// Active sessions on this cluster.
pub struct ClusterSessions {
/// Key generation sessions.
pub generation_sessions: ClusterSessionsContainer<GenerationSessionImpl, GenerationSessionCreator, ()>,
pub generation_sessions: ClusterSessionsContainer<GenerationSessionImpl, GenerationSessionCreator>,
/// Encryption sessions.
pub encryption_sessions: ClusterSessionsContainer<EncryptionSessionImpl, EncryptionSessionCreator, ()>,
pub encryption_sessions: ClusterSessionsContainer<EncryptionSessionImpl, EncryptionSessionCreator>,
/// Decryption sessions.
pub decryption_sessions: ClusterSessionsContainer<DecryptionSessionImpl, DecryptionSessionCreator, Requester>,
pub decryption_sessions: ClusterSessionsContainer<DecryptionSessionImpl, DecryptionSessionCreator>,
/// Schnorr signing sessions.
pub schnorr_signing_sessions: ClusterSessionsContainer<SchnorrSigningSessionImpl, SchnorrSigningSessionCreator, Requester>,
pub schnorr_signing_sessions: ClusterSessionsContainer<SchnorrSigningSessionImpl, SchnorrSigningSessionCreator>,
/// ECDSA signing sessions.
pub ecdsa_signing_sessions: ClusterSessionsContainer<EcdsaSigningSessionImpl, EcdsaSigningSessionCreator, Requester>,
pub ecdsa_signing_sessions: ClusterSessionsContainer<EcdsaSigningSessionImpl, EcdsaSigningSessionCreator>,
/// Key version negotiation sessions.
pub negotiation_sessions: ClusterSessionsContainer<KeyVersionNegotiationSessionImpl<VersionNegotiationTransport>, KeyVersionNegotiationSessionCreator, ()>,
pub negotiation_sessions: ClusterSessionsContainer<
KeyVersionNegotiationSessionImpl<VersionNegotiationTransport>,
KeyVersionNegotiationSessionCreator
>,
/// Administrative sessions.
pub admin_sessions: ClusterSessionsContainer<AdminSession, AdminSessionCreator, AdminSessionCreationData>,
pub admin_sessions: ClusterSessionsContainer<AdminSession, AdminSessionCreator>,
/// Self node id.
self_node_id: NodeId,
/// Creator core.
@ -150,7 +182,7 @@ pub trait ClusterSessionsListener<S: ClusterSession>: Send + Sync {
}
/// Active sessions container.
pub struct ClusterSessionsContainer<S: ClusterSession, SC: ClusterSessionCreator<S, D>, D> {
pub struct ClusterSessionsContainer<S: ClusterSession, SC: ClusterSessionCreator<S>> {
/// Sessions creator.
pub creator: SC,
/// Active sessions.
@ -161,8 +193,6 @@ pub struct ClusterSessionsContainer<S: ClusterSession, SC: ClusterSessionCreator
container_state: Arc<Mutex<ClusterSessionsContainerState>>,
/// Do not actually remove sessions.
preserve_sessions: bool,
/// Phantom data.
_pd: ::std::marker::PhantomData<D>,
}
/// Session and its message queue.
@ -279,7 +309,7 @@ impl ClusterSessions {
}
}
impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: ClusterSessionCreator<S, D> {
impl<S, SC> ClusterSessionsContainer<S, SC> where S: ClusterSession, SC: ClusterSessionCreator<S> {
pub fn new(creator: SC, container_state: Arc<Mutex<ClusterSessionsContainerState>>) -> Self {
ClusterSessionsContainer {
creator: creator,
@ -287,7 +317,6 @@ impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: C
listeners: Mutex::new(Vec::new()),
container_state: container_state,
preserve_sessions: false,
_pd: Default::default(),
}
}
@ -316,7 +345,15 @@ impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: C
self.sessions.read().values().nth(0).map(|s| s.session.clone())
}
pub fn insert(&self, cluster: Arc<Cluster>, master: NodeId, session_id: S::Id, session_nonce: Option<u64>, is_exclusive_session: bool, creation_data: Option<D>) -> Result<Arc<S>, Error> {
pub fn insert(
&self,
cluster: Arc<Cluster>,
master: NodeId,
session_id: S::Id,
session_nonce: Option<u64>,
is_exclusive_session: bool,
creation_data: Option<S::CreationData>,
) -> Result<WaitableSession<S>, Error> {
let mut sessions = self.sessions.write();
if sessions.contains_key(&session_id) {
return Err(Error::DuplicateSessionId);
@ -335,11 +372,11 @@ impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: C
cluster_view: cluster,
last_keep_alive_time: Instant::now(),
last_message_time: Instant::now(),
session: session.clone(),
session: session.session.clone(),
queue: VecDeque::new(),
};
sessions.insert(session_id, queued_session);
self.notify_listeners(|l| l.on_session_inserted(session.clone()));
self.notify_listeners(|l| l.on_session_inserted(session.session.clone()));
Ok(session)
}
@ -419,7 +456,12 @@ impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: C
}
}
impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: ClusterSessionCreator<S, D>, SessionId: From<S::Id> {
impl<S, SC> ClusterSessionsContainer<S, SC>
where
S: ClusterSession,
SC: ClusterSessionCreator<S>,
SessionId: From<S::Id>,
{
pub fn send_keep_alive(&self, session_id: &S::Id, self_node_id: &NodeId) {
if let Some(session) = self.sessions.write().get_mut(session_id) {
let now = Instant::now();
@ -521,6 +563,8 @@ impl AdminSession {
impl ClusterSession for AdminSession {
type Id = SessionId;
type CreationData = AdminSessionCreationData;
type SuccessfulResult = ();
fn type_name() -> &'static str {
"admin"
@ -569,6 +613,40 @@ impl ClusterSession for AdminSession {
}
}
impl<S: ClusterSession> WaitableSession<S> {
pub fn new(session: S, oneshot: Oneshot<Result<S::SuccessfulResult, Error>>) -> Self {
WaitableSession {
session: Arc::new(session),
oneshot,
}
}
pub fn into_wait_future(self) -> Box<Future<Item=S::SuccessfulResult, Error=Error> + Send> {
Box::new(self.oneshot
.map_err(|e| Error::Internal(e.to_string()))
.and_then(|res| res))
}
}
impl<T> CompletionSignal<T> {
pub fn new() -> (Self, Oneshot<Result<T, Error>>) {
let (complete, oneshot) = oneshot();
let completion_condvar = if cfg!(test) { Some(Condvar::new()) } else { None };
(CompletionSignal {
completion_future: Mutex::new(Some(complete)),
completion_condvar,
}, oneshot)
}
pub fn send(&self, result: Result<T, Error>) {
let completion_future = ::std::mem::replace(&mut *self.completion_future.lock(), None);
completion_future.map(|c| c.send(result));
if let Some(ref completion_condvar) = self.completion_condvar {
completion_condvar.notify_all();
}
}
}
pub fn create_cluster_view(self_key_pair: Arc<NodeKeyPair>, connections: Arc<ConnectionProvider>, requires_all_connections: bool) -> Result<Arc<Cluster>, Error> {
let mut connected_nodes = connections.connected_nodes()?;
let disconnected_nodes = connections.disconnected_nodes();

View File

@ -22,7 +22,8 @@ use ethkey::Public;
use key_server_cluster::{Error, NodeId, SessionId, Requester, AclStorage, KeyStorage, DocumentKeyShare, SessionMeta};
use key_server_cluster::cluster::{Cluster, ClusterConfiguration};
use key_server_cluster::connection_trigger::ServersSetChangeSessionCreatorConnector;
use key_server_cluster::cluster_sessions::{ClusterSession, SessionIdWithSubSession, AdminSession, AdminSessionCreationData};
use key_server_cluster::cluster_sessions::{WaitableSession, ClusterSession, SessionIdWithSubSession,
AdminSession, AdminSessionCreationData};
use key_server_cluster::message::{self, Message, DecryptionMessage, SchnorrSigningMessage, ConsensusMessageOfShareAdd,
ShareAddMessage, ServersSetChangeMessage, ConsensusMessage, ConsensusMessageWithServersSet, EcdsaSigningMessage};
use key_server_cluster::generation_session::{SessionImpl as GenerationSessionImpl, SessionParams as GenerationSessionParams};
@ -43,9 +44,9 @@ use key_server_cluster::key_version_negotiation_session::{SessionImpl as KeyVers
use key_server_cluster::admin_sessions::ShareChangeSessionMeta;
/// Generic cluster session creator.
pub trait ClusterSessionCreator<S: ClusterSession, D> {
pub trait ClusterSessionCreator<S: ClusterSession> {
/// Get creation data from message.
fn creation_data_from_message(_message: &Message) -> Result<Option<D>, Error> {
fn creation_data_from_message(_message: &Message) -> Result<Option<S::CreationData>, Error> {
Ok(None)
}
@ -53,7 +54,14 @@ pub trait ClusterSessionCreator<S: ClusterSession, D> {
fn make_error_message(sid: S::Id, nonce: u64, err: Error) -> Message;
/// Create cluster session.
fn create(&self, cluster: Arc<Cluster>, master: NodeId, nonce: Option<u64>, id: S::Id, creation_data: Option<D>) -> Result<Arc<S>, Error>;
fn create(
&self,
cluster: Arc<Cluster>,
master: NodeId,
nonce: Option<u64>,
id: S::Id,
creation_data: Option<S::CreationData>,
) -> Result<WaitableSession<S>, Error>;
}
/// Message with session id.
@ -134,7 +142,7 @@ impl GenerationSessionCreator {
}
}
impl ClusterSessionCreator<GenerationSessionImpl, ()> for GenerationSessionCreator {
impl ClusterSessionCreator<GenerationSessionImpl> for GenerationSessionCreator {
fn make_error_message(sid: SessionId, nonce: u64, err: Error) -> Message {
message::Message::Generation(message::GenerationMessage::SessionError(message::SessionError {
session: sid.into(),
@ -143,27 +151,33 @@ impl ClusterSessionCreator<GenerationSessionImpl, ()> for GenerationSessionCreat
}))
}
fn create(&self, cluster: Arc<Cluster>, master: NodeId, nonce: Option<u64>, id: SessionId, _creation_data: Option<()>) -> Result<Arc<GenerationSessionImpl>, Error> {
fn create(
&self,
cluster: Arc<Cluster>,
master: NodeId,
nonce: Option<u64>,
id: SessionId,
_creation_data: Option<()>,
) -> Result<WaitableSession<GenerationSessionImpl>, Error> {
// check that there's no finished encryption session with the same id
if self.core.key_storage.contains(&id) {
return Err(Error::ServerKeyAlreadyGenerated);
}
let nonce = self.core.check_session_nonce(&master, nonce)?;
Ok(GenerationSessionImpl::new(GenerationSessionParams {
let (session, oneshot) = GenerationSessionImpl::new(GenerationSessionParams {
id: id.clone(),
self_node_id: self.core.self_node_id.clone(),
key_storage: Some(self.core.key_storage.clone()),
cluster: cluster,
nonce: Some(nonce),
}))
.map(|session| {
});
if self.make_faulty_generation_sessions.load(Ordering::Relaxed) {
session.simulate_faulty_behaviour();
}
session
})
.map(Arc::new)
Ok(WaitableSession::new(session, oneshot))
}
}
@ -173,7 +187,7 @@ pub struct EncryptionSessionCreator {
pub core: Arc<SessionCreatorCore>,
}
impl ClusterSessionCreator<EncryptionSessionImpl, ()> for EncryptionSessionCreator {
impl ClusterSessionCreator<EncryptionSessionImpl> for EncryptionSessionCreator {
fn make_error_message(sid: SessionId, nonce: u64, err: Error) -> Message {
message::Message::Encryption(message::EncryptionMessage::EncryptionSessionError(message::EncryptionSessionError {
session: sid.into(),
@ -182,17 +196,26 @@ impl ClusterSessionCreator<EncryptionSessionImpl, ()> for EncryptionSessionCreat
}))
}
fn create(&self, cluster: Arc<Cluster>, master: NodeId, nonce: Option<u64>, id: SessionId, _creation_data: Option<()>) -> Result<Arc<EncryptionSessionImpl>, Error> {
fn create(
&self,
cluster: Arc<Cluster>,
master: NodeId,
nonce: Option<u64>,
id: SessionId,
_creation_data: Option<()>,
) -> Result<WaitableSession<EncryptionSessionImpl>, Error> {
let encrypted_data = self.core.read_key_share(&id)?;
let nonce = self.core.check_session_nonce(&master, nonce)?;
Ok(Arc::new(EncryptionSessionImpl::new(EncryptionSessionParams {
let (session, oneshot) = EncryptionSessionImpl::new(EncryptionSessionParams {
id: id,
self_node_id: self.core.self_node_id.clone(),
encrypted_data: encrypted_data,
key_storage: self.core.key_storage.clone(),
cluster: cluster,
nonce: nonce,
})?))
})?;
Ok(WaitableSession::new(session, oneshot))
}
}
@ -202,7 +225,7 @@ pub struct DecryptionSessionCreator {
pub core: Arc<SessionCreatorCore>,
}
impl ClusterSessionCreator<DecryptionSessionImpl, Requester> for DecryptionSessionCreator {
impl ClusterSessionCreator<DecryptionSessionImpl> for DecryptionSessionCreator {
fn creation_data_from_message(message: &Message) -> Result<Option<Requester>, Error> {
match *message {
Message::Decryption(DecryptionMessage::DecryptionConsensusMessage(ref message)) => match &message.message {
@ -223,10 +246,17 @@ impl ClusterSessionCreator<DecryptionSessionImpl, Requester> for DecryptionSessi
}))
}
fn create(&self, cluster: Arc<Cluster>, master: NodeId, nonce: Option<u64>, id: SessionIdWithSubSession, requester: Option<Requester>) -> Result<Arc<DecryptionSessionImpl>, Error> {
fn create(
&self,
cluster: Arc<Cluster>,
master: NodeId,
nonce: Option<u64>,
id: SessionIdWithSubSession,
requester: Option<Requester>,
) -> Result<WaitableSession<DecryptionSessionImpl>, Error> {
let encrypted_data = self.core.read_key_share(&id.id)?;
let nonce = self.core.check_session_nonce(&master, nonce)?;
Ok(Arc::new(DecryptionSessionImpl::new(DecryptionSessionParams {
let (session, oneshot) = DecryptionSessionImpl::new(DecryptionSessionParams {
meta: SessionMeta {
id: id.id,
self_node_id: self.core.self_node_id.clone(),
@ -240,7 +270,9 @@ impl ClusterSessionCreator<DecryptionSessionImpl, Requester> for DecryptionSessi
acl_storage: self.core.acl_storage.clone(),
cluster: cluster,
nonce: nonce,
}, requester)?))
}, requester)?;
Ok(WaitableSession::new(session, oneshot))
}
}
@ -250,7 +282,7 @@ pub struct SchnorrSigningSessionCreator {
pub core: Arc<SessionCreatorCore>,
}
impl ClusterSessionCreator<SchnorrSigningSessionImpl, Requester> for SchnorrSigningSessionCreator {
impl ClusterSessionCreator<SchnorrSigningSessionImpl> for SchnorrSigningSessionCreator {
fn creation_data_from_message(message: &Message) -> Result<Option<Requester>, Error> {
match *message {
Message::SchnorrSigning(SchnorrSigningMessage::SchnorrSigningConsensusMessage(ref message)) => match &message.message {
@ -271,10 +303,17 @@ impl ClusterSessionCreator<SchnorrSigningSessionImpl, Requester> for SchnorrSign
}))
}
fn create(&self, cluster: Arc<Cluster>, master: NodeId, nonce: Option<u64>, id: SessionIdWithSubSession, requester: Option<Requester>) -> Result<Arc<SchnorrSigningSessionImpl>, Error> {
fn create(
&self,
cluster: Arc<Cluster>,
master: NodeId,
nonce: Option<u64>,
id: SessionIdWithSubSession,
requester: Option<Requester>,
) -> Result<WaitableSession<SchnorrSigningSessionImpl>, Error> {
let encrypted_data = self.core.read_key_share(&id.id)?;
let nonce = self.core.check_session_nonce(&master, nonce)?;
Ok(Arc::new(SchnorrSigningSessionImpl::new(SchnorrSigningSessionParams {
let (session, oneshot) = SchnorrSigningSessionImpl::new(SchnorrSigningSessionParams {
meta: SessionMeta {
id: id.id,
self_node_id: self.core.self_node_id.clone(),
@ -288,7 +327,8 @@ impl ClusterSessionCreator<SchnorrSigningSessionImpl, Requester> for SchnorrSign
acl_storage: self.core.acl_storage.clone(),
cluster: cluster,
nonce: nonce,
}, requester)?))
}, requester)?;
Ok(WaitableSession::new(session, oneshot))
}
}
@ -298,7 +338,7 @@ pub struct EcdsaSigningSessionCreator {
pub core: Arc<SessionCreatorCore>,
}
impl ClusterSessionCreator<EcdsaSigningSessionImpl, Requester> for EcdsaSigningSessionCreator {
impl ClusterSessionCreator<EcdsaSigningSessionImpl> for EcdsaSigningSessionCreator {
fn creation_data_from_message(message: &Message) -> Result<Option<Requester>, Error> {
match *message {
Message::EcdsaSigning(EcdsaSigningMessage::EcdsaSigningConsensusMessage(ref message)) => match &message.message {
@ -319,10 +359,10 @@ impl ClusterSessionCreator<EcdsaSigningSessionImpl, Requester> for EcdsaSigningS
}))
}
fn create(&self, cluster: Arc<Cluster>, master: NodeId, nonce: Option<u64>, id: SessionIdWithSubSession, requester: Option<Requester>) -> Result<Arc<EcdsaSigningSessionImpl>, Error> {
fn create(&self, cluster: Arc<Cluster>, master: NodeId, nonce: Option<u64>, id: SessionIdWithSubSession, requester: Option<Requester>) -> Result<WaitableSession<EcdsaSigningSessionImpl>, Error> {
let encrypted_data = self.core.read_key_share(&id.id)?;
let nonce = self.core.check_session_nonce(&master, nonce)?;
Ok(Arc::new(EcdsaSigningSessionImpl::new(EcdsaSigningSessionParams {
let (session, oneshot) = EcdsaSigningSessionImpl::new(EcdsaSigningSessionParams {
meta: SessionMeta {
id: id.id,
self_node_id: self.core.self_node_id.clone(),
@ -336,7 +376,9 @@ impl ClusterSessionCreator<EcdsaSigningSessionImpl, Requester> for EcdsaSigningS
acl_storage: self.core.acl_storage.clone(),
cluster: cluster,
nonce: nonce,
}, requester)?))
}, requester)?;
Ok(WaitableSession::new(session, oneshot))
}
}
@ -346,7 +388,7 @@ pub struct KeyVersionNegotiationSessionCreator {
pub core: Arc<SessionCreatorCore>,
}
impl ClusterSessionCreator<KeyVersionNegotiationSessionImpl<VersionNegotiationTransport>, ()> for KeyVersionNegotiationSessionCreator {
impl ClusterSessionCreator<KeyVersionNegotiationSessionImpl<VersionNegotiationTransport>> for KeyVersionNegotiationSessionCreator {
fn make_error_message(sid: SessionIdWithSubSession, nonce: u64, err: Error) -> Message {
message::Message::KeyVersionNegotiation(message::KeyVersionNegotiationMessage::KeyVersionsError(message::KeyVersionsError {
session: sid.id.into(),
@ -359,14 +401,21 @@ impl ClusterSessionCreator<KeyVersionNegotiationSessionImpl<VersionNegotiationTr
}))
}
fn create(&self, cluster: Arc<Cluster>, master: NodeId, nonce: Option<u64>, id: SessionIdWithSubSession, _creation_data: Option<()>) -> Result<Arc<KeyVersionNegotiationSessionImpl<VersionNegotiationTransport>>, Error> {
fn create(
&self,
cluster: Arc<Cluster>,
master: NodeId,
nonce: Option<u64>,
id: SessionIdWithSubSession,
_creation_data: Option<()>,
) -> Result<WaitableSession<KeyVersionNegotiationSessionImpl<VersionNegotiationTransport>>, Error> {
let configured_nodes_count = cluster.configured_nodes_count();
let connected_nodes_count = cluster.connected_nodes_count();
let encrypted_data = self.core.read_key_share(&id.id)?;
let nonce = self.core.check_session_nonce(&master, nonce)?;
let computer = Arc::new(FastestResultKeyVersionsResultComputer::new(self.core.self_node_id.clone(), encrypted_data.as_ref(),
configured_nodes_count, configured_nodes_count));
Ok(Arc::new(KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
let (session, oneshot) = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
meta: ShareChangeSessionMeta {
id: id.id.clone(),
self_node_id: self.core.self_node_id.clone(),
@ -384,7 +433,8 @@ impl ClusterSessionCreator<KeyVersionNegotiationSessionImpl<VersionNegotiationTr
nonce: nonce,
},
nonce: nonce,
})))
});
Ok(WaitableSession::new(session, oneshot))
}
}
@ -398,7 +448,7 @@ pub struct AdminSessionCreator {
pub servers_set_change_session_creator_connector: Arc<ServersSetChangeSessionCreatorConnector>,
}
impl ClusterSessionCreator<AdminSession, AdminSessionCreationData> for AdminSessionCreator {
impl ClusterSessionCreator<AdminSession> for AdminSessionCreator {
fn creation_data_from_message(message: &Message) -> Result<Option<AdminSessionCreationData>, Error> {
match *message {
Message::ServersSetChange(ServersSetChangeMessage::ServersSetChangeConsensusMessage(ref message)) => match &message.message {
@ -424,11 +474,18 @@ impl ClusterSessionCreator<AdminSession, AdminSessionCreationData> for AdminSess
}))
}
fn create(&self, cluster: Arc<Cluster>, master: NodeId, nonce: Option<u64>, id: SessionId, creation_data: Option<AdminSessionCreationData>) -> Result<Arc<AdminSession>, Error> {
fn create(
&self,
cluster: Arc<Cluster>,
master: NodeId,
nonce: Option<u64>,
id: SessionId,
creation_data: Option<AdminSessionCreationData>,
) -> Result<WaitableSession<AdminSession>, Error> {
let nonce = self.core.check_session_nonce(&master, nonce)?;
Ok(Arc::new(match creation_data {
match creation_data {
Some(AdminSessionCreationData::ShareAdd(version)) => {
AdminSession::ShareAdd(ShareAddSessionImpl::new(ShareAddSessionParams {
let (session, oneshot) = ShareAddSessionImpl::new(ShareAddSessionParams {
meta: ShareChangeSessionMeta {
id: id.clone(),
self_node_id: self.core.self_node_id.clone(),
@ -440,13 +497,14 @@ impl ClusterSessionCreator<AdminSession, AdminSessionCreationData> for AdminSess
key_storage: self.core.key_storage.clone(),
nonce: nonce,
admin_public: Some(self.admin_public.clone().ok_or(Error::AccessDenied)?),
})?)
})?;
Ok(WaitableSession::new(AdminSession::ShareAdd(session), oneshot))
},
Some(AdminSessionCreationData::ServersSetChange(migration_id, new_nodes_set)) => {
let admin_public = self.servers_set_change_session_creator_connector.admin_public(migration_id.as_ref(), new_nodes_set)
.map_err(|_| Error::AccessDenied)?;
AdminSession::ServersSetChange(ServersSetChangeSessionImpl::new(ServersSetChangeSessionParams {
let (session, oneshot) = ServersSetChangeSessionImpl::new(ServersSetChangeSessionParams {
meta: ShareChangeSessionMeta {
id: id.clone(),
self_node_id: self.core.self_node_id.clone(),
@ -460,10 +518,11 @@ impl ClusterSessionCreator<AdminSession, AdminSessionCreationData> for AdminSess
all_nodes_set: cluster.nodes(),
admin_public: admin_public,
migration_id: migration_id,
})?)
})?;
Ok(WaitableSession::new(AdminSession::ServersSetChange(session), oneshot))
},
None => unreachable!("expected to call with non-empty creation data; qed"),
}))
}
}
}

View File

@ -324,9 +324,10 @@ fn session_state(session: Option<Arc<AdminSession>>) -> SessionState {
session
.and_then(|s| match s.as_servers_set_change() {
Some(s) if !s.is_finished() => Some(SessionState::Active(s.migration_id().cloned())),
Some(s) => match s.wait() {
Ok(_) => Some(SessionState::Finished(s.migration_id().cloned())),
Err(_) => Some(SessionState::Failed(s.migration_id().cloned())),
Some(s) => match s.result() {
Some(Ok(_)) => Some(SessionState::Finished(s.migration_id().cloned())),
Some(Err(_)) => Some(SessionState::Failed(s.migration_id().cloned())),
None => unreachable!("s.is_finished() == true; when session is finished, result is available; qed"),
},
None => None,
})

View File

@ -25,7 +25,7 @@ pub use super::serialization::{SerializableSignature, SerializableH256, Serializ
SerializableRequester, SerializableMessageHash, SerializableAddress};
pub use self::cluster::{new_network_cluster, ClusterCore, ClusterConfiguration, ClusterClient};
pub use self::cluster_connections_net::NetConnectionsManagerConfig;
pub use self::cluster_sessions::{ClusterSession, ClusterSessionsListener};
pub use self::cluster_sessions::{ClusterSession, ClusterSessionsListener, WaitableSession};
#[cfg(test)]
pub use self::cluster::tests::DummyClusterClient;

View File

@ -16,6 +16,7 @@
use std::collections::BTreeSet;
use std::sync::{Arc, Weak};
use futures::future::{ok, result};
use hyper::{self, Uri, Request as HttpRequest, Response as HttpResponse, Method as HttpMethod,
StatusCode as HttpStatusCode, Body,
header::{self, HeaderValue},
@ -129,95 +130,86 @@ impl KeyServerHttpListener {
}
impl KeyServerHttpHandler {
fn process(self, req_method: HttpMethod, req_uri: Uri, path: &str, req_body: &[u8], cors: AllowCors<AccessControlAllowOrigin>) -> HttpResponse<Body> {
fn key_server(&self) -> Result<Arc<KeyServer>, Error> {
self.handler.key_server.upgrade()
.ok_or_else(|| Error::Internal("KeyServer is already destroyed".into()))
}
fn process(
self,
req_method: HttpMethod,
req_uri: Uri,
path: &str,
req_body: &[u8],
cors: AllowCors<AccessControlAllowOrigin>,
) -> Box<Future<Item=HttpResponse<Body>, Error=hyper::Error> + Send> {
match parse_request(&req_method, &path, &req_body) {
Request::GenerateServerKey(document, signature, threshold) => {
return_server_public_key(&req_uri, cors, self.handler.key_server.upgrade()
.map(|key_server| key_server.generate_key(&document, &signature.into(), threshold))
.unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into())))
.map_err(|err| {
warn!(target: "secretstore", "GenerateServerKey request {} has failed with: {}", req_uri, err);
err
}))
},
Request::StoreDocumentKey(document, signature, common_point, encrypted_document_key) => {
return_empty(&req_uri, cors, self.handler.key_server.upgrade()
.map(|key_server| key_server.store_document_key(&document, &signature.into(), common_point, encrypted_document_key))
.unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into())))
.map_err(|err| {
warn!(target: "secretstore", "StoreDocumentKey request {} has failed with: {}", req_uri, err);
err
}))
},
Request::GenerateDocumentKey(document, signature, threshold) => {
return_document_key(&req_uri, cors, self.handler.key_server.upgrade()
.map(|key_server| key_server.generate_document_key(&document, &signature.into(), threshold))
.unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into())))
.map_err(|err| {
warn!(target: "secretstore", "GenerateDocumentKey request {} has failed with: {}", req_uri, err);
err
}))
},
Request::GetServerKey(document, signature) => {
return_server_public_key(&req_uri, cors, self.handler.key_server.upgrade()
.map(|key_server| key_server.restore_key_public(&document, &signature.into()))
.unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into())))
.map_err(|err| {
warn!(target: "secretstore", "GetServerKey request {} has failed with: {}", req_uri, err);
err
}))
},
Request::GetDocumentKey(document, signature) => {
return_document_key(&req_uri, cors, self.handler.key_server.upgrade()
.map(|key_server| key_server.restore_document_key(&document, &signature.into()))
.unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into())))
.map_err(|err| {
warn!(target: "secretstore", "GetDocumentKey request {} has failed with: {}", req_uri, err);
err
}))
},
Request::GetDocumentKeyShadow(document, signature) => {
return_document_key_shadow(&req_uri, cors, self.handler.key_server.upgrade()
.map(|key_server| key_server.restore_document_key_shadow(&document, &signature.into()))
.unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into())))
.map_err(|err| {
warn!(target: "secretstore", "GetDocumentKeyShadow request {} has failed with: {}", req_uri, err);
err
}))
},
Request::SchnorrSignMessage(document, signature, message_hash) => {
return_message_signature(&req_uri, cors, self.handler.key_server.upgrade()
.map(|key_server| key_server.sign_message_schnorr(&document, &signature.into(), message_hash))
.unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into())))
.map_err(|err| {
warn!(target: "secretstore", "SchnorrSignMessage request {} has failed with: {}", req_uri, err);
err
}))
},
Request::EcdsaSignMessage(document, signature, message_hash) => {
return_message_signature(&req_uri, cors, self.handler.key_server.upgrade()
.map(|key_server| key_server.sign_message_ecdsa(&document, &signature.into(), message_hash))
.unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into())))
.map_err(|err| {
warn!(target: "secretstore", "EcdsaSignMessage request {} has failed with: {}", req_uri, err);
err
}))
},
Request::ChangeServersSet(old_set_signature, new_set_signature, new_servers_set) => {
return_empty(&req_uri, cors, self.handler.key_server.upgrade()
.map(|key_server| key_server.change_servers_set(old_set_signature, new_set_signature, new_servers_set))
.unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into())))
.map_err(|err| {
warn!(target: "secretstore", "ChangeServersSet request {} has failed with: {}", req_uri, err);
err
}))
},
Request::GenerateServerKey(document, signature, threshold) =>
Box::new(result(self.key_server())
.and_then(move |key_server| key_server.generate_key(document, signature.into(), threshold))
.then(move |result| ok(return_server_public_key("GenerateServerKey", &req_uri, cors, result)))),
Request::StoreDocumentKey(document, signature, common_point, encrypted_document_key) =>
Box::new(result(self.key_server())
.and_then(move |key_server| key_server.store_document_key(
document,
signature.into(),
common_point,
encrypted_document_key,
))
.then(move |result| ok(return_empty("StoreDocumentKey", &req_uri, cors, result)))),
Request::GenerateDocumentKey(document, signature, threshold) =>
Box::new(result(self.key_server())
.and_then(move |key_server| key_server.generate_document_key(
document,
signature.into(),
threshold,
))
.then(move |result| ok(return_document_key("GenerateDocumentKey", &req_uri, cors, result)))),
Request::GetServerKey(document, signature) =>
Box::new(result(self.key_server())
.and_then(move |key_server| key_server.restore_key_public(
document,
signature.into(),
))
.then(move |result| ok(return_server_public_key("GetServerKey", &req_uri, cors, result)))),
Request::GetDocumentKey(document, signature) =>
Box::new(result(self.key_server())
.and_then(move |key_server| key_server.restore_document_key(document, signature.into()))
.then(move |result| ok(return_document_key("GetDocumentKey", &req_uri, cors, result)))),
Request::GetDocumentKeyShadow(document, signature) =>
Box::new(result(self.key_server())
.and_then(move |key_server| key_server.restore_document_key_shadow(document, signature.into()))
.then(move |result| ok(return_document_key_shadow("GetDocumentKeyShadow", &req_uri, cors, result)))),
Request::SchnorrSignMessage(document, signature, message_hash) =>
Box::new(result(self.key_server())
.and_then(move |key_server| key_server.sign_message_schnorr(
document,
signature.into(),
message_hash,
))
.then(move |result| ok(return_message_signature("SchnorrSignMessage", &req_uri, cors, result)))),
Request::EcdsaSignMessage(document, signature, message_hash) =>
Box::new(result(self.key_server())
.and_then(move |key_server| key_server.sign_message_ecdsa(
document,
signature.into(),
message_hash,
))
.then(move |result| ok(return_message_signature("EcdsaSignMessage", &req_uri, cors, result)))),
Request::ChangeServersSet(old_set_signature, new_set_signature, new_servers_set) =>
Box::new(result(self.key_server())
.and_then(move |key_server| key_server.change_servers_set(
old_set_signature,
new_set_signature,
new_servers_set,
))
.then(move |result| ok(return_empty("ChangeServersSet", &req_uri, cors, result)))),
Request::Invalid => {
warn!(target: "secretstore", "Ignoring invalid {}-request {}", req_method, req_uri);
HttpResponse::builder()
Box::new(ok(HttpResponse::builder()
.status(HttpStatusCode::BAD_REQUEST)
.body(Body::empty())
.expect("Nothing to parse, cannot fail; qed")
.expect("Nothing to parse, cannot fail; qed")))
},
}
}
@ -241,59 +233,72 @@ impl Service for KeyServerHttpHandler {
Box::new(future::ok(HttpResponse::builder()
.status(HttpStatusCode::NOT_FOUND)
.body(Body::empty())
.expect("Nothing to parse, cannot fail; qed")
))
.expect("Nothing to parse, cannot fail; qed")))
},
_ => {
let req_method = req.method().clone();
let req_uri = req.uri().clone();
let path = req_uri.path().to_string();
// We cannot consume Self because of the Service trait requirement.
let this = self.clone();
Box::new(req.into_body().concat2().map(move |body| {
let path = req_uri.path().to_string();
if path.starts_with("/") {
this.process(req_method, req_uri, &path, &body, cors)
} else {
warn!(target: "secretstore", "Ignoring invalid {}-request {}", req_method, req_uri);
HttpResponse::builder()
.status(HttpStatusCode::NOT_FOUND)
.body(Body::empty())
.expect("Nothing to parse, cannot fail; qed")
}
}))
Box::new(req.into_body().concat2()
.and_then(move |body| this.process(req_method, req_uri, &path, &body, cors)))
}
}
}
}
fn return_empty(req_uri: &Uri, cors: AllowCors<AccessControlAllowOrigin>, empty: Result<(), Error>) -> HttpResponse<Body> {
return_bytes::<i32>(req_uri, cors, empty.map(|_| None))
fn return_empty(req_type: &str, req_uri: &Uri, cors: AllowCors<AccessControlAllowOrigin>, empty: Result<(), Error>) -> HttpResponse<Body> {
return_bytes::<i32>(req_type, req_uri, cors, empty.map(|_| None))
}
fn return_server_public_key(req_uri: &Uri, cors: AllowCors<AccessControlAllowOrigin>, server_public: Result<Public, Error>) -> HttpResponse<Body> {
return_bytes(req_uri, cors, server_public.map(|k| Some(SerializablePublic(k))))
fn return_server_public_key(
req_type: &str,
req_uri: &Uri,
cors: AllowCors<AccessControlAllowOrigin>,
server_public: Result<Public, Error>,
) -> HttpResponse<Body> {
return_bytes(req_type, req_uri, cors, server_public.map(|k| Some(SerializablePublic(k))))
}
fn return_message_signature(req_uri: &Uri, cors: AllowCors<AccessControlAllowOrigin>, signature: Result<EncryptedDocumentKey, Error>) -> HttpResponse<Body> {
return_bytes(req_uri, cors, signature.map(|s| Some(SerializableBytes(s))))
fn return_message_signature(
req_type: &str,
req_uri: &Uri,
cors: AllowCors<AccessControlAllowOrigin>,
signature: Result<EncryptedDocumentKey, Error>,
) -> HttpResponse<Body> {
return_bytes(req_type, req_uri, cors, signature.map(|s| Some(SerializableBytes(s))))
}
fn return_document_key(req_uri: &Uri, cors: AllowCors<AccessControlAllowOrigin>, document_key: Result<EncryptedDocumentKey, Error>) -> HttpResponse<Body> {
return_bytes(req_uri, cors, document_key.map(|k| Some(SerializableBytes(k))))
fn return_document_key(
req_type: &str,
req_uri: &Uri,
cors: AllowCors<AccessControlAllowOrigin>,
document_key: Result<EncryptedDocumentKey, Error>,
) -> HttpResponse<Body> {
return_bytes(req_type, req_uri, cors, document_key.map(|k| Some(SerializableBytes(k))))
}
fn return_document_key_shadow(req_uri: &Uri, cors: AllowCors<AccessControlAllowOrigin>, document_key_shadow: Result<EncryptedDocumentKeyShadow, Error>)
-> HttpResponse<Body>
{
return_bytes(req_uri, cors, document_key_shadow.map(|k| Some(SerializableEncryptedDocumentKeyShadow {
fn return_document_key_shadow(
req_type: &str,
req_uri: &Uri,
cors: AllowCors<AccessControlAllowOrigin>,
document_key_shadow: Result<EncryptedDocumentKeyShadow, Error>,
) -> HttpResponse<Body> {
return_bytes(req_type, req_uri, cors, document_key_shadow.map(|k| Some(SerializableEncryptedDocumentKeyShadow {
decrypted_secret: k.decrypted_secret.into(),
common_point: k.common_point.expect("always filled when requesting document_key_shadow; qed").into(),
decrypt_shadows: k.decrypt_shadows.expect("always filled when requesting document_key_shadow; qed").into_iter().map(Into::into).collect()
})))
}
fn return_bytes<T: Serialize>(req_uri: &Uri, cors: AllowCors<AccessControlAllowOrigin>, result: Result<Option<T>, Error>) -> HttpResponse<Body> {
fn return_bytes<T: Serialize>(
req_type: &str,
req_uri: &Uri,
cors: AllowCors<AccessControlAllowOrigin>,
result: Result<Option<T>, Error>,
) -> HttpResponse<Body> {
match result {
Ok(Some(result)) => match serde_json::to_vec(&result) {
Ok(result) => {
@ -321,7 +326,10 @@ fn return_bytes<T: Serialize>(req_uri: &Uri, cors: AllowCors<AccessControlAllowO
}
builder.body(Body::empty()).expect("Nothing to parse, cannot fail; qed")
},
Err(err) => return_error(err),
Err(err) => {
warn!(target: "secretstore", "{} request {} has failed with: {}", req_type, req_uri, err);
return_error(err)
},
}
}

View File

@ -22,6 +22,7 @@ mod tasks_queue;
use std::collections::BTreeSet;
use std::sync::Arc;
use futures::Future;
use traits::{ServerKeyGenerator, DocumentKeyServer, MessageSigner, AdminSessionsServer, KeyServer};
use types::{Error, Public, MessageHash, EncryptedMessageSignature, RequestSignature, ServerKeyId,
EncryptedDocumentKey, EncryptedDocumentKeyShadow, NodeId, Requester};
@ -72,45 +73,88 @@ impl Listener {
impl KeyServer for Listener {}
impl ServerKeyGenerator for Listener {
fn generate_key(&self, key_id: &ServerKeyId, author: &Requester, threshold: usize) -> Result<Public, Error> {
fn generate_key(
&self,
key_id: ServerKeyId,
author: Requester,
threshold: usize,
) -> Box<Future<Item=Public, Error=Error> + Send> {
self.key_server.generate_key(key_id, author, threshold)
}
fn restore_key_public(&self, key_id: &ServerKeyId, author: &Requester) -> Result<Public, Error> {
fn restore_key_public(
&self,
key_id: ServerKeyId,
author: Requester,
) -> Box<Future<Item=Public, Error=Error> + Send> {
self.key_server.restore_key_public(key_id, author)
}
}
impl DocumentKeyServer for Listener {
fn store_document_key(&self, key_id: &ServerKeyId, author: &Requester, common_point: Public, encrypted_document_key: Public) -> Result<(), Error> {
fn store_document_key(
&self,
key_id: ServerKeyId,
author: Requester,
common_point: Public,
encrypted_document_key: Public,
) -> Box<Future<Item=(), Error=Error> + Send> {
self.key_server.store_document_key(key_id, author, common_point, encrypted_document_key)
}
fn generate_document_key(&self, key_id: &ServerKeyId, author: &Requester, threshold: usize) -> Result<EncryptedDocumentKey, Error> {
fn generate_document_key(
&self,
key_id: ServerKeyId,
author: Requester,
threshold: usize,
) -> Box<Future<Item=EncryptedDocumentKey, Error=Error> + Send> {
self.key_server.generate_document_key(key_id, author, threshold)
}
fn restore_document_key(&self, key_id: &ServerKeyId, requester: &Requester) -> Result<EncryptedDocumentKey, Error> {
fn restore_document_key(
&self,
key_id: ServerKeyId,
requester: Requester,
) -> Box<Future<Item=EncryptedDocumentKey, Error=Error> + Send> {
self.key_server.restore_document_key(key_id, requester)
}
fn restore_document_key_shadow(&self, key_id: &ServerKeyId, requester: &Requester) -> Result<EncryptedDocumentKeyShadow, Error> {
fn restore_document_key_shadow(
&self,
key_id: ServerKeyId,
requester: Requester,
) -> Box<Future<Item=EncryptedDocumentKeyShadow, Error=Error> + Send> {
self.key_server.restore_document_key_shadow(key_id, requester)
}
}
impl MessageSigner for Listener {
fn sign_message_schnorr(&self, key_id: &ServerKeyId, requester: &Requester, message: MessageHash) -> Result<EncryptedMessageSignature, Error> {
fn sign_message_schnorr(
&self,
key_id: ServerKeyId,
requester: Requester,
message: MessageHash,
) -> Box<Future<Item=EncryptedMessageSignature, Error=Error> + Send> {
self.key_server.sign_message_schnorr(key_id, requester, message)
}
fn sign_message_ecdsa(&self, key_id: &ServerKeyId, requester: &Requester, message: MessageHash) -> Result<EncryptedMessageSignature, Error> {
fn sign_message_ecdsa(
&self,
key_id: ServerKeyId,
requester: Requester,
message: MessageHash,
) -> Box<Future<Item=EncryptedMessageSignature, Error=Error> + Send> {
self.key_server.sign_message_ecdsa(key_id, requester, message)
}
}
impl AdminSessionsServer for Listener {
fn change_servers_set(&self, old_set_signature: RequestSignature, new_set_signature: RequestSignature, new_servers_set: BTreeSet<NodeId>) -> Result<(), Error> {
fn change_servers_set(
&self,
old_set_signature: RequestSignature,
new_set_signature: RequestSignature,
new_servers_set: BTreeSet<NodeId>,
) -> Box<Future<Item=(), Error=Error> + Send> {
self.key_server.change_servers_set(old_set_signature, new_set_signature, new_servers_set)
}
}

View File

@ -467,7 +467,7 @@ impl ClusterSessionsListener<GenerationSession> for ServiceContractListener {
// ignore result - the only thing that we can do is to log the error
let server_key_id = session.id();
if let Some(origin) = session.origin() {
if let Some(generation_result) = session.wait(Some(Default::default())) {
if let Some(generation_result) = session.result() {
let generation_result = generation_result.map(Some).map_err(Into::into);
let _ = Self::process_server_key_generation_result(&self.data, origin, &server_key_id, generation_result);
}
@ -484,7 +484,7 @@ impl ClusterSessionsListener<DecryptionSession> for ServiceContractListener {
let session_id = session.id();
let server_key_id = session_id.id;
if let (Some(requester), Some(origin)) = (session.requester().and_then(|r| r.address(&server_key_id).ok()), session.origin()) {
if let Some(retrieval_result) = session.wait(Some(Default::default())) {
if let Some(retrieval_result) = session.result() {
let retrieval_result = retrieval_result.map(|key_shadow|
session.broadcast_shadows()
.and_then(|broadcast_shadows|
@ -509,8 +509,8 @@ impl ClusterSessionsListener<KeyVersionNegotiationSession<KeyVersionNegotiationT
// we're interested in:
// 1) sessions failed with fatal error
// 2) with decryption continue action
let error = match session.wait() {
Err(ref error) if !error.is_non_fatal() => error.clone(),
let error = match session.result() {
Some(Err(ref error)) if !error.is_non_fatal() => error.clone(),
_ => return,
};

View File

@ -15,6 +15,7 @@
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
use std::collections::BTreeSet;
use futures::Future;
use ethkey::{KeyPair, Signature, Error as EthKeyError};
use ethereum_types::{H256, Address};
use types::{Error, Public, ServerKeyId, MessageHash, EncryptedMessageSignature, RequestSignature, Requester,
@ -39,11 +40,20 @@ pub trait ServerKeyGenerator {
/// `author` is the author of key entry.
/// `threshold + 1` is the minimal number of nodes, required to restore private key.
/// Result is a public portion of SK.
fn generate_key(&self, key_id: &ServerKeyId, author: &Requester, threshold: usize) -> Result<Public, Error>;
fn generate_key(
&self,
key_id: ServerKeyId,
author: Requester,
threshold: usize,
) -> Box<Future<Item=Public, Error=Error> + Send>;
/// Retrieve public portion of previously generated SK.
/// `key_id` is identifier of previously generated SK.
/// `author` is the same author, that has created the server key.
fn restore_key_public(&self, key_id: &ServerKeyId, author: &Requester) -> Result<Public, Error>;
fn restore_key_public(
&self,
key_id: ServerKeyId,
author: Requester,
) -> Box<Future<Item=Public, Error=Error> + Send>;
}
/// Document key (DK) server.
@ -54,20 +64,35 @@ pub trait DocumentKeyServer: ServerKeyGenerator {
/// `common_point` is a result of `k * T` expression, where `T` is generation point and `k` is random scalar in EC field.
/// `encrypted_document_key` is a result of `M + k * y` expression, where `M` is unencrypted document key (point on EC),
/// `k` is the same scalar used in `common_point` calculation and `y` is previously generated public part of SK.
fn store_document_key(&self, key_id: &ServerKeyId, author: &Requester, common_point: Public, encrypted_document_key: Public) -> Result<(), Error>;
fn store_document_key(
&self,
key_id: ServerKeyId,
author: Requester,
common_point: Public,
encrypted_document_key: Public,
) -> Box<Future<Item=(), Error=Error> + Send>;
/// Generate and store both SK and DK. This is a shortcut for consequent calls of `generate_key` and `store_document_key`.
/// The only difference is that DK is generated by DocumentKeyServer (which might be considered unsafe).
/// `key_id` is the caller-provided identifier of generated SK.
/// `author` is the author of server && document key entry.
/// `threshold + 1` is the minimal number of nodes, required to restore private key.
/// Result is a DK, encrypted with caller public key.
fn generate_document_key(&self, key_id: &ServerKeyId, author: &Requester, threshold: usize) -> Result<EncryptedDocumentKey, Error>;
fn generate_document_key(
&self,
key_id: ServerKeyId,
author: Requester,
threshold: usize,
) -> Box<Future<Item=EncryptedDocumentKey, Error=Error> + Send>;
/// Restore previously stored DK.
/// DK is decrypted on the key server (which might be considered unsafe), and then encrypted with caller public key.
/// `key_id` is identifier of previously generated SK.
/// `requester` is the one who requests access to document key. Caller must be on ACL for this function to succeed.
/// Result is a DK, encrypted with caller public key.
fn restore_document_key(&self, key_id: &ServerKeyId, requester: &Requester) -> Result<EncryptedDocumentKey, Error>;
fn restore_document_key(
&self,
key_id: ServerKeyId,
requester: Requester,
) -> Box<Future<Item=EncryptedDocumentKey, Error=Error> + Send>;
/// Restore previously stored DK.
/// To decrypt DK on client:
/// 1) use requestor secret key to decrypt secret coefficients from result.decrypt_shadows
@ -75,7 +100,11 @@ pub trait DocumentKeyServer: ServerKeyGenerator {
/// 3) calculate decrypt_shadow_point: decrypt_shadows_sum * result.common_point
/// 4) calculate decrypted_secret: result.decrypted_secret + decrypt_shadow_point
/// Result is a DK shadow.
fn restore_document_key_shadow(&self, key_id: &ServerKeyId, requester: &Requester) -> Result<EncryptedDocumentKeyShadow, Error>;
fn restore_document_key_shadow(
&self,
key_id: ServerKeyId,
requester: Requester,
) -> Box<Future<Item=EncryptedDocumentKeyShadow, Error=Error> + Send>;
}
/// Message signer.
@ -85,14 +114,24 @@ pub trait MessageSigner: ServerKeyGenerator {
/// `requester` is the one who requests access to server key private.
/// `message` is the message to be signed.
/// Result is a signed message, encrypted with caller public key.
fn sign_message_schnorr(&self, key_id: &ServerKeyId, requester: &Requester, message: MessageHash) -> Result<EncryptedMessageSignature, Error>;
fn sign_message_schnorr(
&self,
key_id: ServerKeyId,
requester: Requester,
message: MessageHash,
) -> Box<Future<Item=EncryptedMessageSignature, Error=Error> + Send>;
/// Generate ECDSA signature for message with previously generated SK.
/// WARNING: only possible when SK was generated using t <= 2 * N.
/// `key_id` is the caller-provided identifier of generated SK.
/// `signature` is `key_id`, signed with caller public key.
/// `message` is the message to be signed.
/// Result is a signed message, encrypted with caller public key.
fn sign_message_ecdsa(&self, key_id: &ServerKeyId, signature: &Requester, message: MessageHash) -> Result<EncryptedMessageSignature, Error>;
fn sign_message_ecdsa(
&self,
key_id: ServerKeyId,
signature: Requester,
message: MessageHash,
) -> Box<Future<Item=EncryptedMessageSignature, Error=Error> + Send>;
}
/// Administrative sessions server.
@ -101,7 +140,12 @@ pub trait AdminSessionsServer {
/// And old nodes (i.e. cluster nodes except new_servers_set) have clear databases.
/// WARNING: newly generated keys will be distributed among all cluster nodes. So this session
/// must be followed with cluster nodes change (either via contract, or config files).
fn change_servers_set(&self, old_set_signature: RequestSignature, new_set_signature: RequestSignature, new_servers_set: BTreeSet<NodeId>) -> Result<(), Error>;
fn change_servers_set(
&self,
old_set_signature: RequestSignature,
new_set_signature: RequestSignature,
new_servers_set: BTreeSet<NodeId>,
) -> Box<Future<Item=(), Error=Error> + Send>;
}
/// Key server.