SecretStore: service pack 1 (#8435)

* SecretStore: error unify initial commit

SecretStore: pass real error in error messages

SecretStore: is_internal_error -> Error::is_non_fatal

warnings

SecretStore: ConsensusTemporaryUnreachable

fix after merge

removed comments

removed comments

SecretStore: updated HTTP error responses

SecretStore: more ConsensusTemporaryUnreachable tests

fix after rebase

* SecretStore: unified SS contract config options && read

* SecretStore: service pack

SecretStore: service pack (continue)

* fixed grumbles
This commit is contained in:
Svyatoslav Nikolsky 2018-06-14 10:01:52 +03:00 committed by GitHub
parent b37b3cd1fc
commit 6f758bc7b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 853 additions and 434 deletions

View File

@ -563,38 +563,42 @@ usage! {
"--no-secretstore-http",
"Disable Secret Store HTTP API.",
FLAG flag_no_secretstore_acl_check: (bool) = false, or |c: &Config| c.secretstore.as_ref()?.disable_acl_check.clone(),
"--no-acl-check",
"Disable ACL check (useful for test environments).",
FLAG flag_no_secretstore_auto_migrate: (bool) = false, or |c: &Config| c.secretstore.as_ref()?.disable_auto_migrate.clone(),
"--no-secretstore-auto-migrate",
"Do not run servers set change session automatically when servers set changes. This option has no effect when servers set is read from configuration file.",
ARG arg_secretstore_contract: (String) = "none", or |c: &Config| c.secretstore.as_ref()?.service_contract.clone(),
ARG arg_secretstore_acl_contract: (Option<String>) = Some("registry".into()), or |c: &Config| c.secretstore.as_ref()?.acl_contract.clone(),
"--secretstore-acl-contract=[SOURCE]",
"Secret Store permissioning contract address source: none, registry (contract address is read from 'secretstore_acl_checker' entry in registry) or address.",
ARG arg_secretstore_contract: (Option<String>) = None, or |c: &Config| c.secretstore.as_ref()?.service_contract.clone(),
"--secretstore-contract=[SOURCE]",
"Secret Store Service contract address source: none, registry (contract address is read from secretstore_service entry in registry) or address.",
"Secret Store Service contract address source: none, registry (contract address is read from 'secretstore_service' entry in registry) or address.",
ARG arg_secretstore_srv_gen_contract: (String) = "none", or |c: &Config| c.secretstore.as_ref()?.service_contract_srv_gen.clone(),
ARG arg_secretstore_srv_gen_contract: (Option<String>) = None, or |c: &Config| c.secretstore.as_ref()?.service_contract_srv_gen.clone(),
"--secretstore-srv-gen-contract=[SOURCE]",
"Secret Store Service server key generation contract address source: none, registry (contract address is read from secretstore_service_srv_gen entry in registry) or address.",
"Secret Store Service server key generation contract address source: none, registry (contract address is read from 'secretstore_service_srv_gen' entry in registry) or address.",
ARG arg_secretstore_srv_retr_contract: (String) = "none", or |c: &Config| c.secretstore.as_ref()?.service_contract_srv_retr.clone(),
ARG arg_secretstore_srv_retr_contract: (Option<String>) = None, or |c: &Config| c.secretstore.as_ref()?.service_contract_srv_retr.clone(),
"--secretstore-srv-retr-contract=[SOURCE]",
"Secret Store Service server key retrieval contract address source: none, registry (contract address is read from secretstore_service_srv_retr entry in registry) or address.",
"Secret Store Service server key retrieval contract address source: none, registry (contract address is read from 'secretstore_service_srv_retr' entry in registry) or address.",
ARG arg_secretstore_doc_store_contract: (String) = "none", or |c: &Config| c.secretstore.as_ref()?.service_contract_doc_store.clone(),
ARG arg_secretstore_doc_store_contract: (Option<String>) = None, or |c: &Config| c.secretstore.as_ref()?.service_contract_doc_store.clone(),
"--secretstore-doc-store-contract=[SOURCE]",
"Secret Store Service document key store contract address source: none, registry (contract address is read from secretstore_service_doc_store entry in registry) or address.",
"Secret Store Service document key store contract address source: none, registry (contract address is read from 'secretstore_service_doc_store' entry in registry) or address.",
ARG arg_secretstore_doc_sretr_contract: (String) = "none", or |c: &Config| c.secretstore.as_ref()?.service_contract_doc_sretr.clone(),
ARG arg_secretstore_doc_sretr_contract: (Option<String>) = None, or |c: &Config| c.secretstore.as_ref()?.service_contract_doc_sretr.clone(),
"--secretstore-doc-sretr-contract=[SOURCE]",
"Secret Store Service document key shadow retrieval contract address source: none, registry (contract address is read from secretstore_service_doc_sretr entry in registry) or address.",
"Secret Store Service document key shadow retrieval contract address source: none, registry (contract address is read from 'secretstore_service_doc_sretr' entry in registry) or address.",
ARG arg_secretstore_nodes: (String) = "", or |c: &Config| c.secretstore.as_ref()?.nodes.as_ref().map(|vec| vec.join(",")),
"--secretstore-nodes=[NODES]",
"Comma-separated list of other secret store cluster nodes in form NODE_PUBLIC_KEY_IN_HEX@NODE_IP_ADDR:NODE_PORT.",
ARG arg_secretstore_server_set_contract: (Option<String>) = Some("registry".into()), or |c: &Config| c.secretstore.as_ref()?.server_set_contract.clone(),
"--secretstore-server-set-contract=[SOURCE]",
"Secret Store server set contract address source: none, registry (contract address is read from 'secretstore_server_set' entry in registry) or address.",
ARG arg_secretstore_interface: (String) = "local", or |c: &Config| c.secretstore.as_ref()?.interface.clone(),
"--secretstore-interface=[IP]",
"Specify the hostname portion for listening to Secret Store Key Server internal requests, IP should be an interface's IP address, or local.",
@ -1193,8 +1197,8 @@ struct Dapps {
struct SecretStore {
disable: Option<bool>,
disable_http: Option<bool>,
disable_acl_check: Option<bool>,
disable_auto_migrate: Option<bool>,
acl_contract: Option<String>,
service_contract: Option<String>,
service_contract_srv_gen: Option<String>,
service_contract_srv_retr: Option<String>,
@ -1203,6 +1207,7 @@ struct SecretStore {
self_secret: Option<String>,
admin_public: Option<String>,
nodes: Option<Vec<String>>,
server_set_contract: Option<String>,
interface: Option<String>,
port: Option<u16>,
http_interface: Option<String>,
@ -1620,16 +1625,17 @@ mod tests {
// SECRETSTORE
flag_no_secretstore: false,
flag_no_secretstore_http: false,
flag_no_secretstore_acl_check: false,
flag_no_secretstore_auto_migrate: false,
arg_secretstore_contract: "none".into(),
arg_secretstore_srv_gen_contract: "none".into(),
arg_secretstore_srv_retr_contract: "none".into(),
arg_secretstore_doc_store_contract: "none".into(),
arg_secretstore_doc_sretr_contract: "none".into(),
arg_secretstore_acl_contract: Some("registry".into()),
arg_secretstore_contract: Some("none".into()),
arg_secretstore_srv_gen_contract: Some("none".into()),
arg_secretstore_srv_retr_contract: Some("none".into()),
arg_secretstore_doc_store_contract: Some("none".into()),
arg_secretstore_doc_sretr_contract: Some("none".into()),
arg_secretstore_secret: None,
arg_secretstore_admin_public: None,
arg_secretstore_nodes: "".into(),
arg_secretstore_server_set_contract: Some("registry".into()),
arg_secretstore_interface: "local".into(),
arg_secretstore_port: 8083u16,
arg_secretstore_http_interface: "local".into(),
@ -1881,8 +1887,8 @@ mod tests {
secretstore: Some(SecretStore {
disable: None,
disable_http: None,
disable_acl_check: None,
disable_auto_migrate: None,
acl_contract: None,
service_contract: None,
service_contract_srv_gen: None,
service_contract_srv_retr: None,
@ -1891,6 +1897,7 @@ mod tests {
self_secret: None,
admin_public: None,
nodes: None,
server_set_contract: None,
interface: None,
port: Some(8083),
http_interface: None,

View File

@ -91,12 +91,13 @@ pass = "test_pass"
[secretstore]
disable = false
disable_http = false
disable_acl_check = false
acl_contract = "registry"
service_contract = "none"
service_contract_srv_gen = "none"
service_contract_srv_retr = "none"
service_contract_doc_store = "none"
service_contract_doc_sretr = "none"
server_set_contract = "registry"
nodes = []
http_interface = "local"
http_port = 8082

View File

@ -604,8 +604,8 @@ impl Configuration {
Ok(SecretStoreConfiguration {
enabled: self.secretstore_enabled(),
http_enabled: self.secretstore_http_enabled(),
acl_check_enabled: self.secretstore_acl_check_enabled(),
auto_migrate_enabled: self.secretstore_auto_migrate_enabled(),
acl_check_contract_address: self.secretstore_acl_check_contract_address()?,
service_contract_address: self.secretstore_service_contract_address()?,
service_contract_srv_gen_address: self.secretstore_service_contract_srv_gen_address()?,
service_contract_srv_retr_address: self.secretstore_service_contract_srv_retr_address()?,
@ -613,6 +613,7 @@ impl Configuration {
service_contract_doc_sretr_address: self.secretstore_service_contract_doc_sretr_address()?,
self_secret: self.secretstore_self_secret()?,
nodes: self.secretstore_nodes()?,
key_server_set_contract_address: self.secretstore_key_server_set_contract_address()?,
interface: self.secretstore_interface(),
port: self.args.arg_ports_shift + self.args.arg_secretstore_port,
http_interface: self.secretstore_http_interface(),
@ -1099,14 +1100,14 @@ impl Configuration {
!self.args.flag_no_secretstore_http && cfg!(feature = "secretstore")
}
fn secretstore_acl_check_enabled(&self) -> bool {
!self.args.flag_no_secretstore_acl_check
}
fn secretstore_auto_migrate_enabled(&self) -> bool {
!self.args.flag_no_secretstore_auto_migrate
}
fn secretstore_acl_check_contract_address(&self) -> Result<Option<SecretStoreContractAddress>, String> {
into_secretstore_service_contract_address(self.args.arg_secretstore_acl_contract.as_ref())
}
fn secretstore_service_contract_address(&self) -> Result<Option<SecretStoreContractAddress>, String> {
into_secretstore_service_contract_address(self.args.arg_secretstore_contract.as_ref())
}
@ -1127,6 +1128,10 @@ impl Configuration {
into_secretstore_service_contract_address(self.args.arg_secretstore_doc_sretr_contract.as_ref())
}
fn secretstore_key_server_set_contract_address(&self) -> Result<Option<SecretStoreContractAddress>, String> {
into_secretstore_service_contract_address(self.args.arg_secretstore_server_set_contract.as_ref())
}
fn verifier_settings(&self) -> VerifierSettings {
let mut settings = VerifierSettings::default();
settings.scale_verifiers = self.args.flag_scale_verifiers;
@ -1145,11 +1150,11 @@ impl Configuration {
}
}
fn into_secretstore_service_contract_address(s: &str) -> Result<Option<SecretStoreContractAddress>, String> {
match s {
"none" => Ok(None),
"registry" => Ok(Some(SecretStoreContractAddress::Registry)),
a => Ok(Some(SecretStoreContractAddress::Address(a.parse().map_err(|e| format!("{}", e))?))),
fn into_secretstore_service_contract_address(s: Option<&String>) -> Result<Option<SecretStoreContractAddress>, String> {
match s.map(String::as_str) {
None | Some("none") => Ok(None),
Some("registry") => Ok(Some(SecretStoreContractAddress::Registry)),
Some(a) => Ok(Some(SecretStoreContractAddress::Address(a.parse().map_err(|e| format!("{}", e))?))),
}
}

View File

@ -50,10 +50,10 @@ pub struct Configuration {
pub enabled: bool,
/// Is HTTP API enabled?
pub http_enabled: bool,
/// Is ACL check enabled.
pub acl_check_enabled: bool,
/// Is auto migrate enabled.
pub auto_migrate_enabled: bool,
/// ACL check contract address.
pub acl_check_contract_address: Option<ContractAddress>,
/// Service contract address.
pub service_contract_address: Option<ContractAddress>,
/// Server key generation service contract address.
@ -68,6 +68,8 @@ pub struct Configuration {
pub self_secret: Option<NodeSecretKey>,
/// Other nodes IDs + addresses.
pub nodes: BTreeMap<Public, (String, u16)>,
/// Key Server Set contract address. If None, 'nodes' map is used.
pub key_server_set_contract_address: Option<ContractAddress>,
/// Interface to listen to
pub interface: String,
/// Port to listen to
@ -135,7 +137,7 @@ mod server {
impl KeyServer {
/// Create new key server
pub fn new(mut conf: Configuration, deps: Dependencies) -> Result<Self, String> {
if !conf.acl_check_enabled {
if conf.acl_check_contract_address.is_none() {
warn!("Running SecretStore with disabled ACL check: {}", Red.bold().paint("everyone has access to stored keys"));
}
@ -174,7 +176,7 @@ mod server {
service_contract_srv_retr_address: conf.service_contract_srv_retr_address.map(into_service_contract_address),
service_contract_doc_store_address: conf.service_contract_doc_store_address.map(into_service_contract_address),
service_contract_doc_sretr_address: conf.service_contract_doc_sretr_address.map(into_service_contract_address),
acl_check_enabled: conf.acl_check_enabled,
acl_check_contract_address: conf.acl_check_contract_address.map(into_service_contract_address),
cluster_config: ethcore_secretstore::ClusterConfiguration {
threads: 4,
listener_address: ethcore_secretstore::NodeAddress {
@ -185,6 +187,7 @@ mod server {
address: ip,
port: port,
})).collect(),
key_server_set_contract_address: conf.key_server_set_contract_address.map(into_service_contract_address),
allow_connecting_to_higher_nodes: true,
admin_public: conf.admin_public,
auto_migrate_enabled: conf.auto_migrate_enabled,
@ -212,8 +215,8 @@ impl Default for Configuration {
Configuration {
enabled: true,
http_enabled: true,
acl_check_enabled: true,
auto_migrate_enabled: true,
acl_check_contract_address: Some(ContractAddress::Registry),
service_contract_address: None,
service_contract_srv_gen_address: None,
service_contract_srv_retr_address: None,
@ -222,6 +225,7 @@ impl Default for Configuration {
self_secret: None,
admin_public: None,
nodes: BTreeMap::new(),
key_server_set_contract_address: Some(ContractAddress::Registry),
interface: "127.0.0.1".to_owned(),
port: 8083,
http_interface: "127.0.0.1".to_owned(),

View File

@ -18,11 +18,11 @@ use std::sync::Arc;
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use parking_lot::{Mutex, RwLock};
use ethcore::client::{BlockId, ChainNotify, ChainRoute, CallContract, RegistryInfo};
use ethcore::client::{BlockId, ChainNotify, ChainRoute, CallContract};
use ethereum_types::{H256, Address};
use bytes::Bytes;
use trusted_client::TrustedClient;
use types::{Error, ServerKeyId};
use types::{Error, ServerKeyId, ContractAddress};
use_contract!(acl_storage, "AclStorage", "res/acl_storage.json");
@ -44,8 +44,10 @@ pub struct OnChainAclStorage {
struct CachedContract {
/// Blockchain client.
client: TrustedClient,
/// Contract address.
contract_addr: Option<Address>,
/// Contract address source.
address_source: ContractAddress,
/// Current contract address.
contract_address: Option<Address>,
/// Contract at given address.
contract: acl_storage::AclStorage,
}
@ -57,10 +59,10 @@ pub struct DummyAclStorage {
}
impl OnChainAclStorage {
pub fn new(trusted_client: TrustedClient) -> Result<Arc<Self>, Error> {
pub fn new(trusted_client: TrustedClient, address_source: ContractAddress) -> Result<Arc<Self>, Error> {
let client = trusted_client.get_untrusted();
let acl_storage = Arc::new(OnChainAclStorage {
contract: Mutex::new(CachedContract::new(trusted_client)),
contract: Mutex::new(CachedContract::new(trusted_client, address_source)),
});
client
.ok_or_else(|| Error::Internal("Constructing OnChainAclStorage without active Client".into()))?
@ -78,36 +80,37 @@ impl AclStorage for OnChainAclStorage {
impl ChainNotify for OnChainAclStorage {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, route: ChainRoute, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) {
if !route.enacted().is_empty() || !route.retracted().is_empty() {
self.contract.lock().update()
self.contract.lock().update_contract_address()
}
}
}
impl CachedContract {
pub fn new(client: TrustedClient) -> Self {
CachedContract {
pub fn new(client: TrustedClient, address_source: ContractAddress) -> Self {
let mut contract = CachedContract {
client,
contract_addr: None,
address_source,
contract_address: None,
contract: acl_storage::AclStorage::default(),
}
};
contract.update_contract_address();
contract
}
pub fn update(&mut self) {
if let Some(client) = self.client.get() {
match client.registry_address(ACL_CHECKER_CONTRACT_REGISTRY_NAME.to_owned(), BlockId::Latest) {
Some(new_contract_addr) if Some(new_contract_addr).as_ref() != self.contract_addr.as_ref() => {
trace!(target: "secretstore", "Configuring for ACL checker contract from {}", new_contract_addr);
self.contract_addr = Some(new_contract_addr);
},
Some(_) | None => ()
}
pub fn update_contract_address(&mut self) {
let contract_address = self.client.read_contract_address(ACL_CHECKER_CONTRACT_REGISTRY_NAME.into(), &self.address_source);
if contract_address != self.contract_address {
trace!(target: "secretstore", "Configuring for ACL checker contract from address {:?}",
contract_address);
self.contract_address = contract_address;
}
}
pub fn check(&mut self, requester: Address, document: &ServerKeyId) -> Result<bool, Error> {
if let Some(client) = self.client.get() {
// call contract to check access
match self.contract_addr {
// call contract to check accesss
match self.contract_address {
Some(contract_address) => {
let do_call = |data| client.call_contract(BlockId::Latest, contract_address, data);
self.contract.functions()

View File

@ -303,6 +303,7 @@ pub mod tests {
address: "127.0.0.1".into(),
port: start_port + (j as u16),
})).collect(),
key_server_set_contract_address: None,
allow_connecting_to_higher_nodes: false,
admin_public: None,
auto_migrate_enabled: false,
@ -312,7 +313,7 @@ pub mod tests {
.collect();
let key_storages = (0..num_nodes).map(|_| Arc::new(DummyKeyStorage::default())).collect::<Vec<_>>();
let key_servers: Vec<_> = configs.into_iter().enumerate().map(|(i, cfg)|
KeyServerImpl::new(&cfg, Arc::new(MapKeyServerSet::new(key_servers_set.clone())),
KeyServerImpl::new(&cfg, Arc::new(MapKeyServerSet::new(false, key_servers_set.clone())),
Arc::new(PlainNodeKeyPair::new(key_pairs[i].clone())),
Arc::new(DummyAclStorage::default()),
key_storages[i].clone()).unwrap()

View File

@ -25,7 +25,8 @@ use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSessi
use key_server_cluster::decryption_session::SessionImpl as DecryptionSession;
use key_server_cluster::signing_session_ecdsa::SessionImpl as EcdsaSigningSession;
use key_server_cluster::signing_session_schnorr::SessionImpl as SchnorrSigningSession;
use key_server_cluster::message::{Message, KeyVersionNegotiationMessage, RequestKeyVersions, KeyVersions};
use key_server_cluster::message::{Message, KeyVersionNegotiationMessage, RequestKeyVersions,
KeyVersions, KeyVersionsError, FailedKeyVersionContinueAction};
use key_server_cluster::admin_sessions::ShareChangeSessionMeta;
// TODO [Opt]: change sessions so that versions are sent by chunks.
@ -34,6 +35,8 @@ const VERSIONS_PER_MESSAGE: usize = 32;
/// Key version negotiation transport.
pub trait SessionTransport {
/// Broadcast message to all nodes.
fn broadcast(&self, message: KeyVersionNegotiationMessage) -> Result<(), Error>;
/// Send message to given node.
fn send(&self, node: &NodeId, message: KeyVersionNegotiationMessage) -> Result<(), Error>;
}
@ -63,6 +66,13 @@ pub enum ContinueAction {
EcdsaSign(Arc<EcdsaSigningSession>, H256),
}
/// Failed action after key version is negotiated.
#[derive(Clone, Debug, PartialEq)]
pub enum FailedContinueAction {
/// Decryption origin + requester.
Decrypt(Option<Address>, Address),
}
/// Immutable session data.
struct SessionCore<T: SessionTransport> {
/// Session meta.
@ -92,9 +102,11 @@ struct SessionData {
/// { Version => Nodes }
pub versions: Option<BTreeMap<H256, BTreeSet<NodeId>>>,
/// Session result.
pub result: Option<Result<(H256, NodeId), Error>>,
pub result: Option<Result<Option<(H256, NodeId)>, Error>>,
/// Continue action.
pub continue_with: Option<ContinueAction>,
/// Failed continue action (reported in error message by master node).
pub failed_continue_with: Option<FailedContinueAction>,
}
/// SessionImpl creation parameters
@ -155,6 +167,7 @@ pub struct LargestSupportResultComputer;
impl<T> SessionImpl<T> where T: SessionTransport {
/// Create new session.
pub fn new(params: SessionParams<T>) -> Self {
let threshold = params.key_share.as_ref().map(|key_share| key_share.threshold);
SessionImpl {
core: SessionCore {
meta: params.meta,
@ -168,10 +181,11 @@ impl<T> SessionImpl<T> where T: SessionTransport {
data: Mutex::new(SessionData {
state: SessionState::WaitingForInitialization,
confirmations: None,
threshold: None,
threshold: threshold,
versions: None,
result: None,
continue_with: None,
failed_continue_with: None,
})
}
}
@ -183,7 +197,8 @@ impl<T> SessionImpl<T> where T: SessionTransport {
/// Return key threshold.
pub fn key_threshold(&self) -> Result<usize, Error> {
Ok(self.data.lock().threshold.clone().ok_or(Error::InvalidStateForRequest)?)
self.data.lock().threshold.clone()
.ok_or(Error::InvalidStateForRequest)
}
/// Return result computer reference.
@ -203,8 +218,13 @@ impl<T> SessionImpl<T> where T: SessionTransport {
self.data.lock().continue_with.take()
}
/// Take failed continue action.
pub fn take_failed_continue_action(&self) -> Option<FailedContinueAction> {
self.data.lock().failed_continue_with.take()
}
/// Wait for session completion.
pub fn wait(&self) -> Result<(H256, NodeId), Error> {
pub fn wait(&self) -> Result<Option<(H256, NodeId)>, Error> {
Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone())
.expect("wait_session returns Some if called without timeout; qed")
}
@ -270,6 +290,12 @@ impl<T> SessionImpl<T> where T: SessionTransport {
&KeyVersionNegotiationMessage::KeyVersions(ref message) =>
self.on_key_versions(sender, message),
&KeyVersionNegotiationMessage::KeyVersionsError(ref message) => {
// remember failed continue action
if let Some(FailedKeyVersionContinueAction::Decrypt(Some(ref origin), ref requester)) = message.continue_with {
self.data.lock().failed_continue_with =
Some(FailedContinueAction::Decrypt(Some(origin.clone().into()), requester.clone().into()));
}
self.on_session_error(sender, message.error.clone());
Ok(())
},
@ -309,6 +335,8 @@ impl<T> SessionImpl<T> where T: SessionTransport {
// update state
data.state = SessionState::Finished;
data.result = Some(Ok(None));
self.core.completed.notify_all();
Ok(())
}
@ -361,8 +389,47 @@ impl<T> SessionImpl<T> where T: SessionTransport {
let confirmations = data.confirmations.as_ref().expect(reason);
let versions = data.versions.as_ref().expect(reason);
if let Some(result) = core.result_computer.compute_result(data.threshold.clone(), confirmations, versions) {
// when the master node processing decryption service request, it starts with a key version negotiation session
// if the negotiation fails, only master node knows about it
// => if the error is fatal, only the master will know about it and report it to the contract && the request will never be rejected
// => let's broadcast fatal error so that every other node know about it, and, if it trusts to master node
// will report error to the contract
if let (Some(continue_with), Err(error)) = (data.continue_with.as_ref(), result.as_ref()) {
let origin = match *continue_with {
ContinueAction::Decrypt(_, origin, _, _) => origin.clone(),
_ => None,
};
let requester = match *continue_with {
ContinueAction::Decrypt(ref session, _, _, _) => session.requester().and_then(|r| r.address(&core.meta.id).ok()),
_ => None,
};
if origin.is_some() && requester.is_some() && !error.is_non_fatal() {
let requester = requester.expect("checked in above condition; qed");
data.failed_continue_with =
Some(FailedContinueAction::Decrypt(origin.clone(), requester.clone()));
let send_result = core.transport.broadcast(KeyVersionNegotiationMessage::KeyVersionsError(KeyVersionsError {
session: core.meta.id.clone().into(),
sub_session: core.sub_session.clone().into(),
session_nonce: core.nonce,
error: error.clone(),
continue_with: Some(FailedKeyVersionContinueAction::Decrypt(
origin.map(Into::into),
requester.into(),
)),
}));
if let Err(send_error) = send_result {
warn!(target: "secretstore_net", "{}: failed to broadcast key version negotiation error {}: {}",
core.meta.self_node_id, error, send_error);
}
}
}
data.state = SessionState::Finished;
data.result = Some(result);
data.result = Some(result.map(Some));
core.completed.notify_all();
}
}
@ -390,7 +457,7 @@ impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
data.confirmations.as_mut().expect("checked a line above; qed").clear();
Self::try_complete(&self.core, &mut *data);
if data.state != SessionState::Finished {
warn!("{}: key version negotiation session failed with timeout", self.core.meta.self_node_id);
warn!(target: "secretstore_net", "{}: key version negotiation session failed with timeout", self.core.meta.self_node_id);
data.result = Some(Err(Error::ConsensusTemporaryUnreachable));
self.core.completed.notify_all();
@ -407,17 +474,22 @@ impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
if data.confirmations.is_some() {
let is_waiting_for_confirmation = data.confirmations.as_mut().expect("checked a line above; qed").remove(node);
if is_waiting_for_confirmation {
Self::try_complete(&self.core, &mut *data);
if data.state != SessionState::Finished {
warn!("{}: key version negotiation session failed because {} connection has timeouted", self.core.meta.self_node_id, node);
if !is_waiting_for_confirmation {
return;
}
data.state = SessionState::Finished;
data.result = Some(Err(error));
self.core.completed.notify_all();
}
Self::try_complete(&self.core, &mut *data);
if data.state == SessionState::Finished {
return;
}
}
warn!(target: "secretstore_net", "{}: key version negotiation session failed because of {} from {}",
self.core.meta.self_node_id, error, node);
data.state = SessionState::Finished;
data.result = Some(Err(error));
self.core.completed.notify_all();
}
fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> {
@ -429,6 +501,10 @@ impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
}
impl SessionTransport for IsolatedSessionTransport {
fn broadcast(&self, message: KeyVersionNegotiationMessage) -> Result<(), Error> {
self.cluster.broadcast(Message::KeyVersionNegotiation(message))
}
fn send(&self, node: &NodeId, message: KeyVersionNegotiationMessage) -> Result<(), Error> {
self.cluster.send(node, Message::KeyVersionNegotiation(message))
}
@ -514,20 +590,28 @@ impl SessionResultComputer for LargestSupportResultComputer {
mod tests {
use std::sync::Arc;
use std::collections::{VecDeque, BTreeMap, BTreeSet};
use key_server_cluster::{NodeId, SessionId, Error, KeyStorage, DummyKeyStorage, DocumentKeyShare, DocumentKeyShareVersion};
use ethkey::public_to_address;
use key_server_cluster::{NodeId, SessionId, Error, KeyStorage, DummyKeyStorage,
DocumentKeyShare, DocumentKeyShareVersion};
use key_server_cluster::math;
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster::tests::DummyCluster;
use key_server_cluster::cluster_sessions::ClusterSession;
use key_server_cluster::admin_sessions::ShareChangeSessionMeta;
use key_server_cluster::decryption_session::create_default_decryption_session;
use key_server_cluster::message::{Message, KeyVersionNegotiationMessage, RequestKeyVersions, KeyVersions};
use super::{SessionImpl, SessionTransport, SessionParams, FastestResultComputer, LargestSupportResultComputer,
SessionResultComputer, SessionState};
SessionResultComputer, SessionState, ContinueAction, FailedContinueAction};
struct DummyTransport {
cluster: Arc<DummyCluster>,
}
impl SessionTransport for DummyTransport {
fn broadcast(&self, message: KeyVersionNegotiationMessage) -> Result<(), Error> {
self.cluster.broadcast(Message::KeyVersionNegotiation(message))
}
fn send(&self, node: &NodeId, message: KeyVersionNegotiationMessage) -> Result<(), Error> {
self.cluster.send(node, Message::KeyVersionNegotiation(message))
}
@ -600,6 +684,27 @@ mod tests {
pub fn session(&self, idx: usize) -> &SessionImpl<DummyTransport> {
&self.nodes.values().nth(idx).unwrap().session
}
pub fn take_message(&mut self) -> Option<(NodeId, NodeId, Message)> {
self.nodes.values()
.filter_map(|n| n.cluster.take_message().map(|m| (n.session.meta().self_node_id.clone(), m.0, m.1)))
.nth(0)
.or_else(|| self.queue.pop_front())
}
pub fn process_message(&mut self, msg: (NodeId, NodeId, Message)) -> Result<(), Error> {
match msg.2 {
Message::KeyVersionNegotiation(message) =>
self.nodes[&msg.1].session.process_message(&msg.0, &message),
_ => panic!("unexpected"),
}
}
pub fn run(&mut self) {
while let Some((from, to, message)) = self.take_message() {
self.process_message((from, to, message)).unwrap();
}
}
}
#[test]
@ -739,6 +844,9 @@ mod tests {
ml.session(0).initialize(ml.nodes.keys().cloned().collect()).unwrap();
// we can't be sure that node has given key version because previous ShareAdd session could fail
assert!(ml.session(0).data.lock().state != SessionState::Finished);
// check that upon completion, threshold is known
assert_eq!(ml.session(0).key_threshold(), Ok(1));
}
#[test]
@ -757,4 +865,30 @@ mod tests {
let computer = LargestSupportResultComputer;
assert_eq!(computer.compute_result(Some(10), &Default::default(), &Default::default()), Some(Err(Error::ServerKeyIsNotFound)));
}
#[test]
fn fatal_error_is_not_broadcasted_if_started_without_origin() {
let mut ml = MessageLoop::empty(3);
ml.session(0).set_continue_action(ContinueAction::Decrypt(create_default_decryption_session(), None, false, false));
ml.session(0).initialize(ml.nodes.keys().cloned().collect()).unwrap();
ml.run();
assert!(ml.nodes.values().all(|n| n.session.is_finished() &&
n.session.take_failed_continue_action().is_none()));
}
#[test]
fn fatal_error_is_broadcasted_if_started_with_origin() {
let mut ml = MessageLoop::empty(3);
ml.session(0).set_continue_action(ContinueAction::Decrypt(create_default_decryption_session(), Some(1.into()), true, true));
ml.session(0).initialize(ml.nodes.keys().cloned().collect()).unwrap();
ml.run();
// on all nodes session is completed
assert!(ml.nodes.values().all(|n| n.session.is_finished()));
// slave nodes have non-empty failed continue action
assert!(ml.nodes.values().skip(1).all(|n| n.session.take_failed_continue_action()
== Some(FailedContinueAction::Decrypt(Some(1.into()), public_to_address(&2.into())))));
}
}

View File

@ -249,10 +249,16 @@ impl SessionImpl {
})?;
consensus_session.initialize(self.core.all_nodes_set.clone())?;
debug_assert!(consensus_session.state() != ConsensusSessionState::ConsensusEstablished);
let is_finished = consensus_session.state() == ConsensusSessionState::ConsensusEstablished;
data.consensus_session = Some(consensus_session);
data.new_nodes_set = Some(new_nodes_set);
// this is the case when all other nodes are isolated
if is_finished {
Self::complete_session(&self.core, &mut *data)?;
}
Ok(())
}
@ -483,6 +489,7 @@ impl SessionImpl {
false => {
let master_plan = ShareChangeSessionPlan {
key_version: message.version.clone().into(),
version_holders: message.version_holders.iter().cloned().map(Into::into).collect(),
consensus_group: message.consensus_group.iter().cloned().map(Into::into).collect(),
new_nodes_map: message.new_nodes_map.iter().map(|(k, v)| (k.clone().into(), v.clone().map(Into::into))).collect(),
};
@ -496,22 +503,20 @@ impl SessionImpl {
let master_node_id = message.master_node_id.clone().into();
if let Some(key_share) = self.core.key_storage.get(&key_id)? {
let version = message.version.clone().into();
if let Ok(key_version) = key_share.version(&version) {
let key_share_owners = key_version.id_numbers.keys().cloned().collect();
let new_nodes_set = data.new_nodes_set.as_ref()
.expect("new_nodes_set is filled during consensus establishing; change sessions are running after this; qed");
let local_plan = prepare_share_change_session_plan(
&self.core.all_nodes_set,
key_share.threshold,
&key_id,
version,
&master_node_id,
&key_share_owners,
new_nodes_set)?;
let key_share_owners = message.version_holders.iter().cloned().map(Into::into).collect();
let new_nodes_set = data.new_nodes_set.as_ref()
.expect("new_nodes_set is filled during consensus establishing; change sessions are running after this; qed");
let local_plan = prepare_share_change_session_plan(
&self.core.all_nodes_set,
key_share.threshold,
&key_id,
version,
&master_node_id,
&key_share_owners,
new_nodes_set)?;
if local_plan.new_nodes_map.keys().collect::<BTreeSet<_>>() != master_plan.new_nodes_map.keys().collect::<BTreeSet<_>>() {
return Err(Error::InvalidMessage);
}
if local_plan.new_nodes_map.keys().collect::<BTreeSet<_>>() != master_plan.new_nodes_map.keys().collect::<BTreeSet<_>>() {
return Err(Error::InvalidMessage);
}
}
@ -791,7 +796,9 @@ impl SessionImpl {
// get selected version && old nodes set from key negotiation session
let negotiation_session = data.negotiation_sessions.remove(&key_id)
.expect("share change session is only initialized when negotiation is completed; qed");
let (selected_version, selected_master) = negotiation_session.wait()?;
let (selected_version, selected_master) = negotiation_session
.wait()?
.expect("initialize_share_change_session is only called on share change master; negotiation session completes with some on master; qed");
let selected_version_holders = negotiation_session.version_holders(&selected_version)?;
let selected_version_threshold = negotiation_session.key_threshold()?;
@ -818,6 +825,7 @@ impl SessionImpl {
session_nonce: core.nonce,
key_id: key_id.clone().into(),
version: selected_version.into(),
version_holders: old_nodes_set.iter().cloned().map(Into::into).collect(),
master_node_id: selected_master.clone().into(),
consensus_group: session_plan.consensus_group.iter().cloned().map(Into::into).collect(),
new_nodes_map: session_plan.new_nodes_map.iter()
@ -942,7 +950,8 @@ impl ClusterSession for SessionImpl {
let mut data = self.data.lock();
warn!("{}: servers set change session failed: {} on {}", self.core.meta.self_node_id, error, node);
warn!(target: "secretstore_net", "{}: servers set change session failed: {} on {}",
self.core.meta.self_node_id, error, node);
data.state = SessionState::Finished;
data.result = Some(Err(error));
@ -1007,6 +1016,14 @@ impl JobTransport for UnknownSessionsJobTransport {
}
impl KeyVersionNegotiationTransport for ServersSetChangeKeyVersionNegotiationTransport {
fn broadcast(&self, message: KeyVersionNegotiationMessage) -> Result<(), Error> {
self.cluster.broadcast(Message::ServersSetChange(ServersSetChangeMessage::ShareChangeKeyVersionNegotiation(ShareChangeKeyVersionNegotiation {
session: self.id.clone().into(),
session_nonce: self.nonce,
message: message,
})))
}
fn send(&self, node: &NodeId, message: KeyVersionNegotiationMessage) -> Result<(), Error> {
self.cluster.send(node, Message::ServersSetChange(ServersSetChangeMessage::ShareChangeKeyVersionNegotiation(ShareChangeKeyVersionNegotiation {
session: self.id.clone().into(),
@ -1343,4 +1360,48 @@ pub mod tests {
// check that all sessions have finished
assert!(ml.nodes.iter().filter(|&(k, _)| !nodes_to_remove.contains(k)).all(|(_, n)| n.session.is_finished()));
}
#[test]
fn removing_node_from_cluster_of_2_works() {
// initial 2-of-2 session
let gml = generate_key(1, generate_nodes_ids(2));
let master_node_id = gml.nodes.keys().cloned().nth(0).unwrap();
// make 2nd node isolated so that key becomes irrecoverable (make sure the session is completed, even though key is irrecoverable)
let nodes_to_isolate: BTreeSet<_> = gml.nodes.keys().cloned().skip(1).take(1).collect();
let new_nodes_set: BTreeSet<_> = gml.nodes.keys().cloned().filter(|n| !nodes_to_isolate.contains(&n)).collect();
let mut ml = MessageLoop::new(&gml, master_node_id, None, BTreeSet::new(), BTreeSet::new(), nodes_to_isolate.clone());
ml.nodes[&master_node_id].session.initialize(new_nodes_set, ml.all_set_signature.clone(), ml.new_set_signature.clone()).unwrap();
ml.run();
// check that session on master node has completed (session on 2nd node is not even started in network mode)
assert!(ml.nodes.values().take(1).all(|n| n.session.is_finished()));
}
#[test]
fn adding_node_that_has_lost_its_database_works() {
// initial 2-of-2 session
let gml = generate_key(1, generate_nodes_ids(2));
let master_node_id = gml.nodes.keys().cloned().nth(0).unwrap();
// insert 1 node so that it becames 2-of-3 session
let nodes_to_add: BTreeSet<_> = (0..1).map(|_| Random.generate().unwrap().public().clone()).collect();
let mut ml = MessageLoop::new(&gml, master_node_id, None, nodes_to_add.clone(), BTreeSet::new(), BTreeSet::new());
ml.nodes[&master_node_id].session.initialize(ml.nodes.keys().cloned().collect(), ml.all_set_signature.clone(), ml.new_set_signature.clone()).unwrap();
ml.run();
// now let's say new node has lost its db and we're trying to join it again
ml.nodes[nodes_to_add.iter().nth(0).unwrap()].key_storage.clear().unwrap();
// this time old nodes have version, where new node is mentioned, but it doesn't report it when negotiating
let mut ml = MessageLoop::new(&gml, master_node_id, None, nodes_to_add, BTreeSet::new(), BTreeSet::new());
ml.nodes[&master_node_id].session.initialize(ml.nodes.keys().cloned().collect(), ml.all_set_signature.clone(), ml.new_set_signature.clone()).unwrap();
ml.run();
// try to recover secret for every possible combination of nodes && check that secret is the same
check_secret_is_preserved(ml.original_key_pair.clone(), ml.nodes.iter().map(|(k, v)| (k.clone(), v.key_storage.clone())).collect());
// check that all sessions have finished
assert!(ml.nodes.values().all(|n| n.session.is_finished()));
}
}

View File

@ -39,7 +39,7 @@ pub trait SessionTransport: Clone + JobTransport<PartialJobRequest=ServersSetCha
/// Send message to given node.
fn send(&self, node: &NodeId, message: ShareAddMessage) -> Result<(), Error>;
/// Set data for master node (sent to slave nodes in consensus session initialization message).
fn set_master_data(&mut self, consensus_group: BTreeSet<NodeId>, id_numbers: BTreeMap<NodeId, Option<Secret>>);
fn set_master_data(&mut self, consensus_group: BTreeSet<NodeId>, version_holders: BTreeSet<NodeId>, id_numbers: BTreeMap<NodeId, Option<Secret>>);
}
/// Share addition session.
@ -86,6 +86,8 @@ struct SessionData<T: SessionTransport> {
pub version: Option<H256>,
/// Consensus session.
pub consensus_session: Option<ShareAddChangeConsensusSession<T>>,
/// Holders of key version.
pub version_holders: Option<BTreeSet<NodeId>>,
/// NewKeyShare (for nodes being added).
pub new_key_share: Option<NewKeyShare>,
/// Nodes id numbers.
@ -144,6 +146,8 @@ pub struct IsolatedSessionTransport {
version: Option<H256>,
/// Session-level nonce.
nonce: u64,
/// Holders of key version.
version_holders: Option<BTreeSet<NodeId>>,
/// Consensus group.
consensus_group: Option<BTreeSet<NodeId>>,
/// Id numbers of all new nodes.
@ -171,6 +175,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
state: SessionState::ConsensusEstablishing,
version: None,
consensus_session: None,
version_holders: None,
new_key_share: None,
id_numbers: None,
secret_subshares: None,
@ -180,7 +185,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
}
/// Set pre-established consensus data.
pub fn set_consensus_output(&self, version: &H256, consensus_group: BTreeSet<NodeId>, mut new_nodes_map: BTreeMap<NodeId, Option<Secret>>) -> Result<(), Error> {
pub fn set_consensus_output(&self, version: &H256, consensus_group: BTreeSet<NodeId>, version_holders: BTreeSet<NodeId>, mut new_nodes_map: BTreeMap<NodeId, Option<Secret>>) -> Result<(), Error> {
let mut data = self.data.lock();
// check state
@ -191,18 +196,30 @@ impl<T> SessionImpl<T> where T: SessionTransport {
// key share version is required on ShareAdd master node
if let Some(key_share) = self.core.key_share.as_ref() {
if let Ok(key_version) = key_share.version(version) {
let non_isolated_nodes = self.core.transport.nodes();
for (node, id_number) in &key_version.id_numbers {
{
let external_id_number = new_nodes_map.get(node);
match external_id_number {
Some(&Some(ref external_id_number)) => {
if !version_holders.contains(node) {
// possible when joining version holder, that has lost its database
// and haven't reported version ownership
continue;
}
if external_id_number == id_number {
continue;
}
return Err(Error::ConsensusUnreachable);
},
Some(&None) => (),
None => return Err(Error::ConsensusUnreachable),
None => {
if non_isolated_nodes.contains(node) {
return Err(Error::ConsensusUnreachable)
}
continue;
},
}
}
@ -217,7 +234,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
}
// check passed consensus data
Self::check_nodes_map(&self.core, version, &consensus_group, &new_nodes_map)?;
Self::check_nodes_map(&self.core, version, &consensus_group, &version_holders, &new_nodes_map)?;
// update data
data.version = Some(version.clone());
@ -225,6 +242,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
data.secret_subshares = Some(consensus_group.into_iter()
.map(|n| (n, None))
.collect());
data.version_holders = Some(version_holders);
Ok(())
}
@ -281,13 +299,14 @@ impl<T> SessionImpl<T> where T: SessionTransport {
.take(key_share.threshold)
.cloned())
.collect();
let version_holders = &old_nodes_set;
// now check nodes map
Self::check_nodes_map(&self.core, &version, &consensus_group, &new_nodes_map)?;
Self::check_nodes_map(&self.core, &version, &consensus_group, version_holders, &new_nodes_map)?;
// prepare consensus session transport
let mut consensus_transport = self.core.transport.clone();
consensus_transport.set_master_data(consensus_group.clone(), new_nodes_map.clone());
consensus_transport.set_master_data(consensus_group.clone(), version_holders.clone(), new_nodes_map.clone());
// create && initialize consensus session
let mut consensus_session = ConsensusSession::new(ConsensusSessionParams {
@ -306,6 +325,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
data.consensus_session = Some(consensus_session);
data.id_numbers = Some(new_nodes_map);
data.secret_subshares = Some(consensus_group.into_iter().map(|n| (n, None)).collect());
data.version_holders = Some(version_holders.clone());
Ok(())
}
@ -351,16 +371,17 @@ impl<T> SessionImpl<T> where T: SessionTransport {
};
// process consensus message
let (is_establishing_consensus, is_consensus_established, version, new_nodes_map, consensus_group) = {
let (is_establishing_consensus, is_consensus_established, version, new_nodes_map, consensus_group, version_holders) = {
let consensus_session = data.consensus_session.as_mut().ok_or(Error::InvalidMessage)?;
let is_establishing_consensus = consensus_session.state() == ConsensusSessionState::EstablishingConsensus;
let (version, new_nodes_map, consensus_group) = match &message.message {
let (version, new_nodes_map, consensus_group, version_holders) = match &message.message {
&ConsensusMessageOfShareAdd::InitializeConsensusSession(ref message) => {
consensus_session.on_consensus_partial_request(sender, ServersSetChangeAccessRequest::from(message))?;
let version = message.version.clone().into();
let consensus_group = message.consensus_group.iter().cloned().map(Into::into).collect();
let version_holders = message.version_holders.iter().cloned().map(Into::into).collect();
let new_nodes_map: BTreeMap<_, _> = message.new_nodes_map.iter()
.map(|(n, nn)| (n.clone().into(), Some(nn.clone().into())))
.collect();
@ -371,13 +392,13 @@ impl<T> SessionImpl<T> where T: SessionTransport {
}
// check old set of nodes
Self::check_nodes_map(&self.core, &version, &consensus_group, &new_nodes_map)?;
Self::check_nodes_map(&self.core, &version, &consensus_group, &version_holders, &new_nodes_map)?;
(Some(version), Some(new_nodes_map), Some(consensus_group))
(Some(version), Some(new_nodes_map), Some(consensus_group), Some(version_holders))
},
&ConsensusMessageOfShareAdd::ConfirmConsensusInitialization(ref message) => {
consensus_session.on_consensus_partial_response(sender, message.is_confirmed)?;
(None, None, None)
(None, None, None, None)
},
};
@ -387,6 +408,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
version,
new_nodes_map,
consensus_group,
version_holders,
)
};
@ -400,6 +422,9 @@ impl<T> SessionImpl<T> where T: SessionTransport {
if let Some(consensus_group) = consensus_group {
data.secret_subshares = Some(consensus_group.into_iter().map(|n| (n, None)).collect());
}
if let Some(version_holders) = version_holders {
data.version_holders = Some(version_holders);
}
// if consensus is stablished, proceed
if !is_establishing_consensus || !is_consensus_established || self.core.meta.self_node_id != self.core.meta.master_node_id {
@ -451,7 +476,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
});
let id_numbers = data.id_numbers.as_mut()
.expect("common key share data is expected after initialization; id_numers are filled during initialization; qed");
.expect("common key share data is expected after initialization; id_numbers are filled during initialization; qed");
for (node, id_number) in &message.id_numbers {
let id_number: Secret = id_number.clone().into();
{
@ -519,7 +544,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
}
/// Check nodes map.
fn check_nodes_map(core: &SessionCore<T>, version: &H256, consensus_group: &BTreeSet<NodeId>, new_nodes_map: &BTreeMap<NodeId, Option<Secret>>) -> Result<(), Error> {
fn check_nodes_map(core: &SessionCore<T>, version: &H256, consensus_group: &BTreeSet<NodeId>, version_holders: &BTreeSet<NodeId>, new_nodes_map: &BTreeMap<NodeId, Option<Secret>>) -> Result<(), Error> {
// check if this node has given version
let has_this_version = match core.key_share.as_ref() {
Some(key_share) => key_share.version(version).is_ok(),
@ -546,7 +571,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
}
// there must be at least one new node in new_nodes_map
if key_version.id_numbers.len() >= new_nodes_map.len() {
if key_version.id_numbers.keys().filter(|n| non_isolated_nodes.contains(n) && version_holders.contains(n)).count() >= new_nodes_map.len() {
return Err(Error::ConsensusUnreachable);
}
},
@ -607,6 +632,8 @@ impl<T> SessionImpl<T> where T: SessionTransport {
let explanation = "disseminate_common_share_data is only called on master node; master node has specified version of the key; qed";
let old_key_share = core.key_share.as_ref().expect(explanation);
let old_key_version = old_key_share.version(data.version.as_ref().expect(explanation)).expect(explanation);
let version_holders = data.version_holders.as_ref()
.expect("disseminate_common_share_data is only called on master node; version holders is created during initialization on master node; qed");
let consensus_group = data.secret_subshares.as_ref()
.expect("disseminate_common_share_data is only called on master node; consensus group is created during initialization on master node; qed");
let nodes = data.id_numbers.as_ref()
@ -622,7 +649,9 @@ impl<T> SessionImpl<T> where T: SessionTransport {
joint_public: old_key_share.public.clone().into(),
common_point: old_key_share.common_point.clone().map(Into::into),
encrypted_point: old_key_share.encrypted_point.clone().map(Into::into),
id_numbers: old_key_version.id_numbers.iter().map(|(k, v)| (k.clone().into(), v.clone().into())).collect(),
id_numbers: old_key_version.id_numbers.iter()
.filter(|&(k, _)| version_holders.contains(k))
.map(|(k, v)| (k.clone().into(), v.clone().into())).collect(),
}))?;
}
@ -765,7 +794,8 @@ impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
let mut data = self.data.lock();
warn!("{}: share add session failed: {} on {}", self.core.meta.self_node_id, error, node);
warn!(target: "secretstore_net", "{}: share add session failed: {} on {}",
self.core.meta.self_node_id, error, node);
data.state = SessionState::Finished;
data.result = Some(Err(error));
@ -788,6 +818,7 @@ impl IsolatedSessionTransport {
nonce: nonce,
cluster: cluster,
id_numbers: None,
version_holders: None,
consensus_group: None,
}
}
@ -806,6 +837,7 @@ impl JobTransport for IsolatedSessionTransport {
session_nonce: self.nonce,
message: ConsensusMessageOfShareAdd::InitializeConsensusSession(InitializeConsensusSessionOfShareAdd {
version: self.version.clone().expect(explanation).into(),
version_holders: self.version_holders.as_ref().expect(explanation).iter().cloned().map(Into::into).collect(),
consensus_group: self.consensus_group.as_ref().expect(explanation).iter().cloned().map(Into::into).collect(),
old_nodes_set: request.old_servers_set.into_iter().map(Into::into).collect(),
new_nodes_map: request.new_servers_set.into_iter()
@ -836,7 +868,8 @@ impl SessionTransport for IsolatedSessionTransport {
self.cluster.nodes()
}
fn set_master_data(&mut self, consensus_group: BTreeSet<NodeId>, id_numbers: BTreeMap<NodeId, Option<Secret>>) {
fn set_master_data(&mut self, consensus_group: BTreeSet<NodeId>, version_holders: BTreeSet<NodeId>, id_numbers: BTreeMap<NodeId, Option<Secret>>) {
self.version_holders = Some(version_holders);
self.consensus_group = Some(consensus_group);
self.id_numbers = Some(id_numbers);
}

View File

@ -48,6 +48,8 @@ pub struct ShareChangeSession {
key_storage: Arc<KeyStorage>,
/// Key version.
key_version: H256,
/// Nodes that have reported version ownership.
version_holders: Option<BTreeSet<NodeId>>,
/// Consensus group to use in ShareAdd session.
consensus_group: Option<BTreeSet<NodeId>>,
/// Nodes to add shares for.
@ -63,6 +65,8 @@ pub struct ShareChangeSession {
pub struct ShareChangeSessionPlan {
/// Key version that plan is valid for.
pub key_version: H256,
/// Nodes that have reported version ownership.
pub version_holders: BTreeSet<NodeId>,
/// Consensus group to use in ShareAdd session.
pub consensus_group: BTreeSet<NodeId>,
/// Nodes to add shares for.
@ -102,6 +106,7 @@ impl ShareChangeSession {
// we can't create sessions right now, because key share is read when session is created, but it can change in previous session
let key_version = params.plan.key_version;
let consensus_group = if !params.plan.consensus_group.is_empty() { Some(params.plan.consensus_group) } else { None };
let version_holders = if !params.plan.version_holders.is_empty() { Some(params.plan.version_holders) } else { None };
let new_nodes_map = if !params.plan.new_nodes_map.is_empty() { Some(params.plan.new_nodes_map) } else { None };
debug_assert!(new_nodes_map.is_some());
@ -113,6 +118,7 @@ impl ShareChangeSession {
cluster: params.cluster,
key_storage: params.key_storage,
key_version: key_version,
version_holders: version_holders,
consensus_group: consensus_group,
new_nodes_map: new_nodes_map,
share_add_session: None,
@ -158,6 +164,7 @@ impl ShareChangeSession {
/// Create new share add session.
fn create_share_add_session(&mut self) -> Result<(), Error> {
let consensus_group = self.consensus_group.take().ok_or(Error::InvalidStateForRequest)?;
let version_holders = self.version_holders.take().ok_or(Error::InvalidStateForRequest)?;
let new_nodes_map = self.new_nodes_map.take().ok_or(Error::InvalidStateForRequest)?;
let share_add_session = ShareAddSessionImpl::new(ShareAddSessionParams {
meta: self.meta.clone(),
@ -166,7 +173,7 @@ impl ShareChangeSession {
key_storage: self.key_storage.clone(),
admin_public: None,
})?;
share_add_session.set_consensus_output(&self.key_version, consensus_group, new_nodes_map)?;
share_add_session.set_consensus_output(&self.key_version, consensus_group, version_holders, new_nodes_map)?;
self.share_add_session = Some(share_add_session);
Ok(())
}
@ -221,7 +228,7 @@ impl ShareAddSessionTransport for ShareChangeTransport {
self.cluster.nodes()
}
fn set_master_data(&mut self, _consensus_group: BTreeSet<NodeId>, _id_numbers: BTreeMap<NodeId, Option<Secret>>) {
fn set_master_data(&mut self, _consensus_group: BTreeSet<NodeId>, _version_holders: BTreeSet<NodeId>, _id_numbers: BTreeMap<NodeId, Option<Secret>>) {
unreachable!("only called when establishing consensus; this transport is never used for establishing consensus; qed")
}
@ -242,6 +249,7 @@ pub fn prepare_share_change_session_plan(cluster_nodes: &BTreeSet<NodeId>, thres
key_id, threshold, old_key_version_owners.len());
return Ok(ShareChangeSessionPlan {
key_version: key_version,
version_holders: Default::default(),
consensus_group: Default::default(),
new_nodes_map: Default::default(),
});
@ -279,6 +287,7 @@ pub fn prepare_share_change_session_plan(cluster_nodes: &BTreeSet<NodeId>, thres
Ok(ShareChangeSessionPlan {
key_version: key_version,
version_holders: old_key_version_owners.clone(),
consensus_group: consensus_group,
new_nodes_map: new_nodes_map,
})

View File

@ -812,6 +812,28 @@ impl JobTransport for DecryptionJobTransport {
}
}
#[cfg(test)]
pub fn create_default_decryption_session() -> Arc<SessionImpl> {
use acl_storage::DummyAclStorage;
use key_server_cluster::cluster::tests::DummyCluster;
Arc::new(SessionImpl::new(SessionParams {
meta: SessionMeta {
id: Default::default(),
self_node_id: Default::default(),
master_node_id: Default::default(),
threshold: 0,
configured_nodes_count: 0,
connected_nodes_count: 0,
},
access_key: Secret::zero(),
key_share: Default::default(),
acl_storage: Arc::new(DummyAclStorage::default()),
cluster: Arc::new(DummyCluster::new(Default::default())),
nonce: 0,
}, Some(Requester::Public(2.into()))).unwrap())
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@ -84,6 +84,8 @@ pub trait ClusterClient: Send + Sync {
fn add_generation_listener(&self, listener: Arc<ClusterSessionsListener<GenerationSession>>);
/// Listen for new decryption sessions.
fn add_decryption_listener(&self, listener: Arc<ClusterSessionsListener<DecryptionSession>>);
/// Listen for new key version negotiation sessions.
fn add_key_version_negotiation_listener(&self, listener: Arc<ClusterSessionsListener<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>>);
/// Ask node to make 'faulty' generation sessions.
#[cfg(test)]
@ -198,9 +200,10 @@ pub struct ClusterConnections {
pub data: RwLock<ClusterConnectionsData>,
}
#[derive(Default)]
/// Cluster connections data.
pub struct ClusterConnectionsData {
/// Is this node isolated from cluster?
pub is_isolated: bool,
/// Active key servers set.
pub nodes: BTreeMap<Public, SocketAddr>,
/// Active connections to key servers.
@ -384,6 +387,9 @@ impl ClusterCore {
for connection in data.connections.active_connections() {
let last_message_diff = Instant::now() - connection.last_message_time();
if last_message_diff > KEEP_ALIVE_DISCONNECT_INTERVAL {
warn!(target: "secretstore_net", "{}: keep alive timeout for node {}",
data.self_key_pair.public(), connection.node_id());
data.connections.remove(data.clone(), connection.node_id(), connection.is_inbound());
data.sessions.on_connection_timeout(connection.node_id());
}
@ -484,7 +490,7 @@ impl ClusterCore {
if is_master_node && session.is_finished() {
data.sessions.negotiation_sessions.remove(&session.id());
match session.wait() {
Ok((version, master)) => match session.take_continue_action() {
Ok(Some((version, master))) => match session.take_continue_action() {
Some(ContinueAction::Decrypt(session, origin, is_shadow_decryption, is_broadcast_decryption)) => {
let initialization_error = if data.self_key_pair.public() == &master {
session.initialize(origin, version, is_shadow_decryption, is_broadcast_decryption)
@ -523,18 +529,19 @@ impl ClusterCore {
},
None => (),
},
Ok(None) => unreachable!("is_master_node; session is finished; negotiation version always finished with result on master; qed"),
Err(error) => match session.take_continue_action() {
Some(ContinueAction::Decrypt(session, _, _, _)) => {
data.sessions.decryption_sessions.remove(&session.id());
session.on_session_error(&meta.self_node_id, error);
data.sessions.decryption_sessions.remove(&session.id());
},
Some(ContinueAction::SchnorrSign(session, _)) => {
data.sessions.schnorr_signing_sessions.remove(&session.id());
session.on_session_error(&meta.self_node_id, error);
data.sessions.schnorr_signing_sessions.remove(&session.id());
},
Some(ContinueAction::EcdsaSign(session, _)) => {
data.sessions.ecdsa_signing_sessions.remove(&session.id());
session.on_session_error(&meta.self_node_id, error);
data.sessions.ecdsa_signing_sessions.remove(&session.id());
},
None => (),
},
@ -653,7 +660,7 @@ impl ClusterCore {
impl ClusterConnections {
pub fn new(config: &ClusterConfiguration) -> Result<Self, Error> {
let mut nodes = config.key_server_set.snapshot().current_set;
nodes.remove(config.self_key_pair.public());
let is_isolated = nodes.remove(config.self_key_pair.public()).is_none();
let trigger: Box<ConnectionTrigger> = match config.auto_migrate_enabled {
false => Box::new(SimpleConnectionTrigger::new(config.key_server_set.clone(), config.self_key_pair.clone(), config.admin_public.clone())),
@ -668,6 +675,7 @@ impl ClusterConnections {
trigger: Mutex::new(trigger),
connector: connector,
data: RwLock::new(ClusterConnectionsData {
is_isolated: is_isolated,
nodes: nodes,
connections: BTreeMap::new(),
}),
@ -734,8 +742,13 @@ impl ClusterConnections {
self.maintain_connection_trigger(maintain_action, data);
}
pub fn connected_nodes(&self) -> BTreeSet<NodeId> {
self.data.read().connections.keys().cloned().collect()
pub fn connected_nodes(&self) -> Result<BTreeSet<NodeId>, Error> {
let data = self.data.read();
if data.is_isolated {
return Err(Error::NodeDisconnected);
}
Ok(data.connections.keys().cloned().collect())
}
pub fn active_connections(&self)-> Vec<Arc<Connection>> {
@ -897,7 +910,7 @@ impl ClusterClientImpl {
}
fn create_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes();
let mut connected_nodes = self.data.connections.connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let access_key = Random.generate()?.secret().clone();
@ -934,7 +947,7 @@ impl ClusterClient for ClusterClientImpl {
}
fn new_generation_session(&self, session_id: SessionId, origin: Option<Address>, author: Address, threshold: usize) -> Result<Arc<GenerationSession>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes();
let mut connected_nodes = self.data.connections.connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let cluster = create_cluster_view(&self.data, true)?;
@ -945,7 +958,7 @@ impl ClusterClient for ClusterClientImpl {
}
fn new_encryption_session(&self, session_id: SessionId, requester: Requester, common_point: Public, encrypted_point: Public) -> Result<Arc<EncryptionSession>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes();
let mut connected_nodes = self.data.connections.connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let cluster = create_cluster_view(&self.data, true)?;
@ -956,7 +969,7 @@ impl ClusterClient for ClusterClientImpl {
}
fn new_decryption_session(&self, session_id: SessionId, origin: Option<Address>, requester: Requester, version: Option<H256>, is_shadow_decryption: bool, is_broadcast_decryption: bool) -> Result<Arc<DecryptionSession>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes();
let mut connected_nodes = self.data.connections.connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let access_key = Random.generate()?.secret().clone();
@ -982,7 +995,7 @@ impl ClusterClient for ClusterClientImpl {
}
fn new_schnorr_signing_session(&self, session_id: SessionId, requester: Requester, version: Option<H256>, message_hash: H256) -> Result<Arc<SchnorrSigningSession>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes();
let mut connected_nodes = self.data.connections.connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let access_key = Random.generate()?.secret().clone();
@ -1007,7 +1020,7 @@ impl ClusterClient for ClusterClientImpl {
}
fn new_ecdsa_signing_session(&self, session_id: SessionId, requester: Requester, version: Option<H256>, message_hash: H256) -> Result<Arc<EcdsaSigningSession>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes();
let mut connected_nodes = self.data.connections.connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let access_key = Random.generate()?.secret().clone();
@ -1037,7 +1050,7 @@ impl ClusterClient for ClusterClientImpl {
}
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, migration_id: Option<H256>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSession>, Error> {
let mut connected_nodes = self.data.connections.connected_nodes();
let mut connected_nodes = self.data.connections.connected_nodes()?;
connected_nodes.insert(self.data.self_key_pair.public().clone());
let session_id = match session_id {
@ -1069,6 +1082,10 @@ impl ClusterClient for ClusterClientImpl {
self.data.sessions.decryption_sessions.add_listener(listener);
}
fn add_key_version_negotiation_listener(&self, listener: Arc<ClusterSessionsListener<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>>) {
self.data.sessions.negotiation_sessions.add_listener(listener);
}
#[cfg(test)]
fn connect(&self) {
ClusterCore::connect_disconnected_nodes(self.data.clone());
@ -1153,6 +1170,7 @@ pub mod tests {
fn add_generation_listener(&self, _listener: Arc<ClusterSessionsListener<GenerationSession>>) {}
fn add_decryption_listener(&self, _listener: Arc<ClusterSessionsListener<DecryptionSession>>) {}
fn add_key_version_negotiation_listener(&self, _listener: Arc<ClusterSessionsListener<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>>) {}
fn make_faulty_generation_sessions(&self) { unimplemented!("test-only") }
fn generation_session(&self, _session_id: &SessionId) -> Option<Arc<GenerationSession>> { unimplemented!("test-only") }
@ -1249,7 +1267,7 @@ pub mod tests {
threads: 1,
self_key_pair: Arc::new(PlainNodeKeyPair::new(key_pairs[i].clone())),
listen_address: ("127.0.0.1".to_owned(), ports_begin + i as u16),
key_server_set: Arc::new(MapKeyServerSet::new(key_pairs.iter().enumerate()
key_server_set: Arc::new(MapKeyServerSet::new(false, key_pairs.iter().enumerate()
.map(|(j, kp)| (kp.public().clone(), format!("127.0.0.1:{}", ports_begin + j as u16).parse().unwrap()))
.collect())),
allow_connecting_to_higher_nodes: false,

View File

@ -330,10 +330,7 @@ impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: C
}
pub fn remove(&self, session_id: &S::Id) {
if let Some(session) = self.sessions.write().remove(session_id) {
self.container_state.lock().on_session_completed();
self.notify_listeners(|l| l.on_session_removed(session.session.clone()));
}
self.do_remove(session_id, &mut *self.sessions.write());
}
pub fn enqueue_message(&self, session_id: &S::Id, sender: NodeId, message: Message, is_queued_message: bool) {
@ -361,7 +358,7 @@ impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: C
};
if remove_session {
sessions.remove(&sid);
self.do_remove(&sid, &mut *sessions);
}
}
}
@ -374,12 +371,20 @@ impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: C
session.session.on_node_timeout(node_id);
session.session.is_finished()
};
if remove_session {
sessions.remove(&sid);
self.do_remove(&sid, &mut *sessions);
}
}
}
fn do_remove(&self, session_id: &S::Id, sessions: &mut BTreeMap<S::Id, QueuedSession<S>>) {
if let Some(session) = sessions.remove(session_id) {
self.container_state.lock().on_session_completed();
self.notify_listeners(|l| l.on_session_removed(session.session.clone()));
}
}
fn notify_listeners<F: Fn(&ClusterSessionsListener<S>) -> ()>(&self, callback: F) {
let mut listeners = self.listeners.lock();
let mut listener_index = 0;
@ -554,7 +559,7 @@ pub fn create_cluster_view(data: &Arc<ClusterData>, requires_all_connections: bo
}
}
let mut connected_nodes = data.connections.connected_nodes();
let mut connected_nodes = data.connections.connected_nodes()?;
connected_nodes.insert(data.self_key_pair.public().clone());
let connected_nodes_count = connected_nodes.len();
@ -571,7 +576,8 @@ mod tests {
use key_server_cluster::connection_trigger::SimpleServersSetChangeSessionCreatorConnector;
use key_server_cluster::cluster::tests::DummyCluster;
use key_server_cluster::generation_session::{SessionImpl as GenerationSession};
use super::{ClusterSessions, AdminSessionCreationData, ClusterSessionsListener};
use super::{ClusterSessions, AdminSessionCreationData, ClusterSessionsListener,
ClusterSessionsContainerState, SESSION_TIMEOUT_INTERVAL};
pub fn make_cluster_sessions() -> ClusterSessions {
let key_pair = Random.generate().unwrap();
@ -579,7 +585,7 @@ mod tests {
threads: 1,
self_key_pair: Arc::new(PlainNodeKeyPair::new(key_pair.clone())),
listen_address: ("127.0.0.1".to_owned(), 100_u16),
key_server_set: Arc::new(MapKeyServerSet::new(vec![(key_pair.public().clone(), format!("127.0.0.1:{}", 100).parse().unwrap())].into_iter().collect())),
key_server_set: Arc::new(MapKeyServerSet::new(false, vec![(key_pair.public().clone(), format!("127.0.0.1:{}", 100).parse().unwrap())].into_iter().collect())),
allow_connecting_to_higher_nodes: false,
key_storage: Arc::new(DummyKeyStorage::default()),
acl_storage: Arc::new(DummyAclStorage::default()),
@ -644,4 +650,41 @@ mod tests {
assert_eq!(listener.inserted.load(Ordering::Relaxed), 1);
assert_eq!(listener.removed.load(Ordering::Relaxed), 1);
}
#[test]
fn last_session_removal_sets_container_state_to_idle() {
let sessions = make_cluster_sessions();
sessions.generation_sessions.insert(Arc::new(DummyCluster::new(Default::default())), Default::default(), Default::default(), None, false, None).unwrap();
assert_eq!(*sessions.generation_sessions.container_state.lock(), ClusterSessionsContainerState::Active(1));
sessions.generation_sessions.remove(&Default::default());
assert_eq!(*sessions.generation_sessions.container_state.lock(), ClusterSessionsContainerState::Idle);
}
#[test]
fn last_session_removal_by_timeout_sets_container_state_to_idle() {
let sessions = make_cluster_sessions();
sessions.generation_sessions.insert(Arc::new(DummyCluster::new(Default::default())), Default::default(), Default::default(), None, false, None).unwrap();
assert_eq!(*sessions.generation_sessions.container_state.lock(), ClusterSessionsContainerState::Active(1));
sessions.generation_sessions.sessions.write().get_mut(&Default::default()).unwrap().last_message_time -= SESSION_TIMEOUT_INTERVAL * 2;
sessions.generation_sessions.stop_stalled_sessions();
assert_eq!(sessions.generation_sessions.sessions.read().len(), 0);
assert_eq!(*sessions.generation_sessions.container_state.lock(), ClusterSessionsContainerState::Idle);
}
#[test]
fn last_session_removal_by_node_timeout_sets_container_state_to_idle() {
let sessions = make_cluster_sessions();
sessions.generation_sessions.insert(Arc::new(DummyCluster::new(Default::default())), Default::default(), Default::default(), None, false, None).unwrap();
assert_eq!(*sessions.generation_sessions.container_state.lock(), ClusterSessionsContainerState::Active(1));
sessions.generation_sessions.on_connection_timeout(&Default::default());
assert_eq!(sessions.generation_sessions.sessions.read().len(), 0);
assert_eq!(*sessions.generation_sessions.container_state.lock(), ClusterSessionsContainerState::Idle);
}
}

View File

@ -353,6 +353,9 @@ impl ClusterSessionCreator<KeyVersionNegotiationSessionImpl<VersionNegotiationTr
sub_session: sid.access_key.into(),
session_nonce: nonce,
error: err.into(),
// we don't care about continue action here. it only matters when we're completing the session with confirmed
// fatal error from result computer
continue_with: None,
}))
}

View File

@ -161,15 +161,20 @@ impl TriggerConnections {
fn adjust_connections(self_node_id: &NodeId, data: &mut ClusterConnectionsData, required_set: &BTreeMap<NodeId, SocketAddr>) {
if !required_set.contains_key(self_node_id) {
trace!(target: "secretstore_net", "{}: isolated from cluser", self_node_id);
if !data.is_isolated {
trace!(target: "secretstore_net", "{}: isolated from cluser", self_node_id);
}
data.is_isolated = true;
data.connections.clear();
data.nodes.clear();
return;
}
data.is_isolated = false;
for node_to_disconnect in select_nodes_to_disconnect(&data.nodes, required_set) {
if let Entry::Occupied(entry) = data.connections.entry(node_to_disconnect.clone()) {
trace!(target: "secretstore_net", "{}: removing connection to {} at {}",
trace!(target: "secretstore_net", "{}: adjusting connections - removing connection to {} at {}",
self_node_id, entry.get().node_id(), entry.get().node_address());
entry.remove();
}
@ -204,6 +209,14 @@ mod tests {
use super::{Maintain, TriggerConnections, ConnectionsAction, ConnectionTrigger, SimpleConnectionTrigger,
select_nodes_to_disconnect, adjust_connections};
fn default_connection_data() -> ClusterConnectionsData {
ClusterConnectionsData {
is_isolated: false,
nodes: Default::default(),
connections: Default::default(),
}
}
fn create_connections() -> TriggerConnections {
TriggerConnections {
self_key_pair: Arc::new(PlainNodeKeyPair::new(Random.generate().unwrap())),
@ -252,59 +265,64 @@ mod tests {
fn adjust_connections_disconnects_from_all_nodes_if_not_a_part_of_key_server() {
let self_node_id = Random.generate().unwrap().public().clone();
let other_node_id = Random.generate().unwrap().public().clone();
let mut connection_data: ClusterConnectionsData = Default::default();
let mut connection_data = default_connection_data();
connection_data.nodes.insert(other_node_id.clone(), "127.0.0.1:8081".parse().unwrap());
let required_set = connection_data.nodes.clone();
adjust_connections(&self_node_id, &mut connection_data, &required_set);
assert!(connection_data.nodes.is_empty());
assert!(connection_data.is_isolated);
}
#[test]
fn adjust_connections_connects_to_new_nodes() {
let self_node_id = Random.generate().unwrap().public().clone();
let other_node_id = Random.generate().unwrap().public().clone();
let mut connection_data: ClusterConnectionsData = Default::default();
let mut connection_data = default_connection_data();
let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap()),
(other_node_id.clone(), "127.0.0.1:8082".parse().unwrap())].into_iter().collect();
adjust_connections(&self_node_id, &mut connection_data, &required_set);
assert!(connection_data.nodes.contains_key(&other_node_id));
assert!(!connection_data.is_isolated);
}
#[test]
fn adjust_connections_reconnects_from_changed_nodes() {
let self_node_id = Random.generate().unwrap().public().clone();
let other_node_id = Random.generate().unwrap().public().clone();
let mut connection_data: ClusterConnectionsData = Default::default();
let mut connection_data = default_connection_data();
connection_data.nodes.insert(other_node_id.clone(), "127.0.0.1:8082".parse().unwrap());
let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap()),
(other_node_id.clone(), "127.0.0.1:8083".parse().unwrap())].into_iter().collect();
adjust_connections(&self_node_id, &mut connection_data, &required_set);
assert_eq!(connection_data.nodes.get(&other_node_id), Some(&"127.0.0.1:8083".parse().unwrap()));
assert!(!connection_data.is_isolated);
}
#[test]
fn adjust_connections_disconnects_from_removed_nodes() {
let self_node_id = Random.generate().unwrap().public().clone();
let other_node_id = Random.generate().unwrap().public().clone();
let mut connection_data: ClusterConnectionsData = Default::default();
let mut connection_data = default_connection_data();
connection_data.nodes.insert(other_node_id.clone(), "127.0.0.1:8082".parse().unwrap());
let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect();
adjust_connections(&self_node_id, &mut connection_data, &required_set);
assert!(connection_data.nodes.is_empty());
assert!(!connection_data.is_isolated);
}
#[test]
fn adjust_connections_does_not_connects_to_self() {
let self_node_id = Random.generate().unwrap().public().clone();
let mut connection_data: ClusterConnectionsData = Default::default();
let mut connection_data = default_connection_data();
let required_set = vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap())].into_iter().collect();
adjust_connections(&self_node_id, &mut connection_data, &required_set);
assert!(connection_data.nodes.is_empty());
assert!(!connection_data.is_isolated);
}
#[test]
@ -315,7 +333,7 @@ mod tests {
let migration_node_id = Random.generate().unwrap().public().clone();
let new_node_id = Random.generate().unwrap().public().clone();
let mut connections_data: ClusterConnectionsData = Default::default();
let mut connections_data = default_connection_data();
connections.maintain(ConnectionsAction::ConnectToCurrentSet, &mut connections_data, &KeyServerSetSnapshot {
current_set: vec![(self_node_id.clone(), "127.0.0.1:8081".parse().unwrap()),
(current_node_id.clone(), "127.0.0.1:8082".parse().unwrap())].into_iter().collect(),
@ -337,7 +355,7 @@ mod tests {
let migration_node_id = Random.generate().unwrap().public().clone();
let new_node_id = Random.generate().unwrap().public().clone();
let mut connections_data: ClusterConnectionsData = Default::default();
let mut connections_data = default_connection_data();
connections.maintain(ConnectionsAction::ConnectToMigrationSet, &mut connections_data, &KeyServerSetSnapshot {
current_set: vec![(current_node_id.clone(), "127.0.0.1:8082".parse().unwrap())].into_iter().collect(),
new_set: vec![(new_node_id.clone(), "127.0.0.1:8083".parse().unwrap())].into_iter().collect(),
@ -354,7 +372,7 @@ mod tests {
#[test]
fn simple_connections_trigger_only_maintains_connections() {
let key_server_set = Arc::new(MapKeyServerSet::new(Default::default()));
let key_server_set = Arc::new(MapKeyServerSet::new(false, Default::default()));
let self_key_pair = Arc::new(PlainNodeKeyPair::new(Random.generate().unwrap()));
let mut trigger = SimpleConnectionTrigger::new(key_server_set, self_key_pair, None);
assert_eq!(trigger.on_maintain(), Some(Maintain::Connections));

View File

@ -79,7 +79,6 @@ impl JobExecutor for KeyAccessJob {
self.requester = Some(partial_request.clone());
self.acl_storage.check(partial_request.address(&self.id).map_err(Error::InsufficientRequesterData)?, &self.id)
.map_err(|_| Error::AccessDenied)
.map(|is_confirmed| if is_confirmed { JobPartialRequestAction::Respond(true) } else { JobPartialRequestAction::Reject(false) })
}

View File

@ -429,6 +429,8 @@ pub struct InitializeConsensusSessionWithServersSet {
pub struct InitializeConsensusSessionOfShareAdd {
/// Key version.
pub version: SerializableH256,
/// Nodes that have reported version ownership.
pub version_holders: BTreeSet<MessageNodeId>,
/// threshold+1 nodes from old_nodes_set selected for shares redistribution.
pub consensus_group: BTreeSet<MessageNodeId>,
/// Old nodes set: all non-isolated owners of selected key share version.
@ -876,6 +878,8 @@ pub struct InitializeShareChangeSession {
pub key_id: MessageSessionId,
/// Key vesion to use in ShareAdd session.
pub version: SerializableH256,
/// Nodes that have confirmed version ownership.
pub version_holders: BTreeSet<MessageNodeId>,
/// Master node.
pub master_node_id: MessageNodeId,
/// Consensus group to use in ShareAdd session.
@ -1039,6 +1043,16 @@ pub struct KeyVersionsError {
pub session_nonce: u64,
/// Error message.
pub error: Error,
/// Continue action from failed node (if any). This field is oly filled
/// when error has occured when trying to compute result on master node.
pub continue_with: Option<FailedKeyVersionContinueAction>,
}
/// Key version continue action from failed node.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum FailedKeyVersionContinueAction {
/// Decryption session: origin + requester.
Decrypt(Option<SerializableAddress>, SerializableAddress),
}
impl Message {
@ -1059,6 +1073,7 @@ impl Message {
_ => false
},
Message::KeyVersionNegotiation(KeyVersionNegotiationMessage::RequestKeyVersions(_)) => true,
Message::KeyVersionNegotiation(KeyVersionNegotiationMessage::KeyVersionsError(ref msg)) if msg.continue_with.is_some() => true,
Message::ShareAdd(ShareAddMessage::ShareAddConsensusMessage(ref msg)) => match msg.message {
ConsensusMessageOfShareAdd::InitializeConsensusSession(_) => true,
_ => false

View File

@ -19,16 +19,13 @@ use std::net::SocketAddr;
use std::collections::{BTreeMap, HashSet};
use std::time::Duration;
use parking_lot::Mutex;
use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify, ChainRoute, CallContract, RegistryInfo};
use ethcore::filter::Filter;
use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify, ChainRoute, CallContract};
use ethkey::public_to_address;
use hash::keccak;
use ethereum_types::{H256, Address};
use bytes::Bytes;
use types::{Error, Public, NodeAddress, NodeId};
use trusted_client::TrustedClient;
use helpers::{get_confirmed_block_hash, REQUEST_CONFIRMATIONS_REQUIRED};
use {NodeKeyPair};
use {NodeKeyPair, ContractAddress};
use_contract!(key_server, "KeyServerSet", "res/key_server_set.json");
@ -39,22 +36,6 @@ const MIGRATION_CONFIRMATIONS_REQUIRED: u64 = 5;
/// Number of blocks before the same-migration transaction (be it start or confirmation) will be retried.
const TRANSACTION_RETRY_INTERVAL_BLOCKS: u64 = 30;
/// Key server has been added to the set.
const ADDED_EVENT_NAME: &'static [u8] = &*b"KeyServerAdded(address)";
/// Key server has been removed from the set.
const REMOVED_EVENT_NAME: &'static [u8] = &*b"KeyServerRemoved(address)";
/// Migration has started.
const MIGRATION_STARTED_EVENT_NAME: &'static [u8] = &*b"MigrationStarted()";
/// Migration has completed.
const MIGRATION_COMPLETED_EVENT_NAME: &'static [u8] = &*b"MigrationCompleted()";
lazy_static! {
static ref ADDED_EVENT_NAME_HASH: H256 = keccak(ADDED_EVENT_NAME);
static ref REMOVED_EVENT_NAME_HASH: H256 = keccak(REMOVED_EVENT_NAME);
static ref MIGRATION_STARTED_EVENT_NAME_HASH: H256 = keccak(MIGRATION_STARTED_EVENT_NAME);
static ref MIGRATION_COMPLETED_EVENT_NAME_HASH: H256 = keccak(MIGRATION_COMPLETED_EVENT_NAME);
}
#[derive(Default, Debug, Clone, PartialEq)]
/// Key Server Set state.
pub struct KeyServerSetSnapshot {
@ -81,6 +62,8 @@ pub struct KeyServerSetMigration {
/// Key Server Set
pub trait KeyServerSet: Send + Sync {
/// Is this node currently isolated from the set?
fn is_isolated(&self) -> bool;
/// Get server set state.
fn snapshot(&self) -> KeyServerSetSnapshot;
/// Start migration.
@ -117,7 +100,9 @@ struct PreviousMigrationTransaction {
struct CachedContract {
/// Blockchain client.
client: TrustedClient,
/// Contract address.
/// Contract address source.
contract_address_source: Option<ContractAddress>,
/// Current contract address.
contract_address: Option<Address>,
/// Contract interface.
contract: key_server::KeyServerSet,
@ -136,10 +121,10 @@ struct CachedContract {
}
impl OnChainKeyServerSet {
pub fn new(trusted_client: TrustedClient, self_key_pair: Arc<NodeKeyPair>, auto_migrate_enabled: bool, key_servers: BTreeMap<Public, NodeAddress>) -> Result<Arc<Self>, Error> {
pub fn new(trusted_client: TrustedClient, contract_address_source: Option<ContractAddress>, self_key_pair: Arc<NodeKeyPair>, auto_migrate_enabled: bool, key_servers: BTreeMap<Public, NodeAddress>) -> Result<Arc<Self>, Error> {
let client = trusted_client.get_untrusted();
let key_server_set = Arc::new(OnChainKeyServerSet {
contract: Mutex::new(CachedContract::new(trusted_client, self_key_pair, auto_migrate_enabled, key_servers)?),
contract: Mutex::new(CachedContract::new(trusted_client, contract_address_source, self_key_pair, auto_migrate_enabled, key_servers)?),
});
client
.ok_or_else(|| Error::Internal("Constructing OnChainKeyServerSet without active Client".into()))?
@ -149,6 +134,10 @@ impl OnChainKeyServerSet {
}
impl KeyServerSet for OnChainKeyServerSet {
fn is_isolated(&self) -> bool {
self.contract.lock().is_isolated()
}
fn snapshot(&self) -> KeyServerSetSnapshot {
self.contract.lock().snapshot()
}
@ -244,16 +233,21 @@ impl <F: Fn(Vec<u8>) -> Result<Vec<u8>, String>> KeyServerSubset<F> for NewKeySe
}
impl CachedContract {
pub fn new(client: TrustedClient, self_key_pair: Arc<NodeKeyPair>, auto_migrate_enabled: bool, key_servers: BTreeMap<Public, NodeAddress>) -> Result<Self, Error> {
let server_set = key_servers.into_iter()
.map(|(p, addr)| {
let addr = format!("{}:{}", addr.address, addr.port).parse()
.map_err(|err| Error::Internal(format!("error parsing node address: {}", err)))?;
Ok((p, addr))
})
.collect::<Result<BTreeMap<_, _>, Error>>()?;
Ok(CachedContract {
pub fn new(client: TrustedClient, contract_address_source: Option<ContractAddress>, self_key_pair: Arc<NodeKeyPair>, auto_migrate_enabled: bool, key_servers: BTreeMap<Public, NodeAddress>) -> Result<Self, Error> {
let server_set = match contract_address_source.is_none() {
true => key_servers.into_iter()
.map(|(p, addr)| {
let addr = format!("{}:{}", addr.address, addr.port).parse()
.map_err(|err| Error::Internal(format!("error parsing node address: {}", err)))?;
Ok((p, addr))
})
.collect::<Result<BTreeMap<_, _>, Error>>()?,
false => Default::default(),
};
let mut contract = CachedContract {
client: client,
contract_address_source: contract_address_source,
contract_address: None,
contract: key_server::KeyServerSet::default(),
auto_migrate_enabled: auto_migrate_enabled,
@ -266,19 +260,46 @@ impl CachedContract {
..Default::default()
},
self_key_pair: self_key_pair,
})
};
contract.update_contract_address();
Ok(contract)
}
pub fn update_contract_address(&mut self) {
if let Some(ref contract_address_source) = self.contract_address_source {
let contract_address = self.client.read_contract_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.into(), contract_address_source);
if contract_address != self.contract_address {
trace!(target: "secretstore", "{}: Configuring for key server set contract from address {:?}",
self.self_key_pair.public(), contract_address);
self.contract_address = contract_address;
}
}
}
pub fn update(&mut self, enacted: Vec<H256>, retracted: Vec<H256>) {
// no need to update when servers set is hardcoded
if self.contract_address_source.is_none() {
return;
}
if let Some(client) = self.client.get() {
// read new snapshot from registry (if something has changed)
self.read_from_registry_if_required(&*client, enacted, retracted);
// read new snapshot from reqistry (if something has chnaged)
if !enacted.is_empty() || !retracted.is_empty() {
self.update_contract_address();
self.read_from_registry(&*client);
}
// update number of confirmations (if there's future new set)
self.update_number_of_confirmations_if_required(&*client);
}
}
fn is_isolated(&self) -> bool {
!self.snapshot.current_set.contains_key(self.self_key_pair.public())
}
fn snapshot(&self) -> KeyServerSetSnapshot {
self.snapshot.clone()
}
@ -295,12 +316,11 @@ impl CachedContract {
let transaction_data = self.contract.functions().start_migration().input(migration_id);
// send transaction
if let Err(error) = self.client.transact_contract(*contract_address, transaction_data) {
warn!(target: "secretstore_net", "{}: failed to submit auto-migration start transaction: {}",
self.self_key_pair.public(), error);
} else {
trace!(target: "secretstore_net", "{}: sent auto-migration start transaction",
self.self_key_pair.public());
match self.client.transact_contract(*contract_address, transaction_data) {
Ok(_) => trace!(target: "secretstore_net", "{}: sent auto-migration start transaction",
self.self_key_pair.public()),
Err(error) => warn!(target: "secretstore_net", "{}: failed to submit auto-migration start transaction: {}",
self.self_key_pair.public(), error),
}
}
}
@ -317,55 +337,16 @@ impl CachedContract {
let transaction_data = self.contract.functions().confirm_migration().input(migration_id);
// send transaction
if let Err(error) = self.client.transact_contract(contract_address, transaction_data) {
warn!(target: "secretstore_net", "{}: failed to submit auto-migration confirmation transaction: {}",
self.self_key_pair.public(), error);
} else {
trace!(target: "secretstore_net", "{}: sent auto-migration confirm transaction",
self.self_key_pair.public());
match self.client.transact_contract(contract_address, transaction_data) {
Ok(_) => trace!(target: "secretstore_net", "{}: sent auto-migration confirm transaction",
self.self_key_pair.public()),
Err(error) => warn!(target: "secretstore_net", "{}: failed to submit auto-migration confirmation transaction: {}",
self.self_key_pair.public(), error),
}
}
}
fn read_from_registry_if_required(&mut self, client: &Client, enacted: Vec<H256>, retracted: Vec<H256>) {
// read new contract from registry
let new_contract_addr = get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED).and_then(|block_hash| client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned(), BlockId::Hash(block_hash)));
// new contract installed => read nodes set from the contract
if self.contract_address.as_ref() != new_contract_addr.as_ref() {
self.read_from_registry(&*client, new_contract_addr);
return;
}
// check for contract events
let is_set_changed = self.contract_address.is_some() && enacted.iter()
.chain(retracted.iter())
.any(|block_hash| !client.logs(Filter {
from_block: BlockId::Hash(block_hash.clone()),
to_block: BlockId::Hash(block_hash.clone()),
address: self.contract_address.map(|address| vec![address]),
topics: vec![
Some(vec![*ADDED_EVENT_NAME_HASH, *REMOVED_EVENT_NAME_HASH,
*MIGRATION_STARTED_EVENT_NAME_HASH, *MIGRATION_COMPLETED_EVENT_NAME_HASH]),
None,
None,
None,
],
limit: Some(1),
}).is_empty());
// to simplify processing - just re-read the whole nodes set from the contract
if is_set_changed {
self.read_from_registry(&*client, new_contract_addr);
}
}
fn read_from_registry(&mut self, client: &Client, new_contract_address: Option<Address>) {
if let Some(ref contract_addr) = new_contract_address {
trace!(target: "secretstore", "Configuring for key server set contract from {}", contract_addr);
}
self.contract_address = new_contract_address;
fn read_from_registry(&mut self, client: &Client) {
let contract_address = match self.contract_address {
Some(contract_address) => contract_address,
None => {
@ -505,7 +486,7 @@ fn update_future_set(future_new_set: &mut Option<FutureNewSet>, new_snapshot: &m
return;
}
// new no migration is required => no need to delay visibility
// no migration is required => no need to delay visibility
if !is_migration_required(&new_snapshot.current_set, &new_snapshot.new_set) {
*future_new_set = None;
return;
@ -602,18 +583,24 @@ pub mod tests {
#[derive(Default)]
pub struct MapKeyServerSet {
is_isolated: bool,
nodes: BTreeMap<Public, SocketAddr>,
}
impl MapKeyServerSet {
pub fn new(nodes: BTreeMap<Public, SocketAddr>) -> Self {
pub fn new(is_isolated: bool, nodes: BTreeMap<Public, SocketAddr>) -> Self {
MapKeyServerSet {
is_isolated: is_isolated,
nodes: nodes,
}
}
}
impl KeyServerSet for MapKeyServerSet {
fn is_isolated(&self) -> bool {
self.is_isolated
}
fn snapshot(&self) -> KeyServerSetSnapshot {
KeyServerSetSnapshot {
current_set: self.nodes.clone(),

View File

@ -466,7 +466,6 @@ pub mod tests {
#[test]
fn persistent_key_storage() {
let tempdir = TempDir::new("").unwrap();
let key1 = ServerKeyId::from(1);
let value1 = DocumentKeyShare {
author: Default::default(),

View File

@ -82,16 +82,15 @@ pub use traits::{NodeKeyPair, KeyServer};
pub use self::node_key_pair::{PlainNodeKeyPair, KeyStoreNodeKeyPair};
/// Start new key server instance
pub fn start(client: Arc<Client>, sync: Arc<SyncProvider>, miner: Arc<Miner>, self_key_pair: Arc<NodeKeyPair>, config: ServiceConfiguration, db: Arc<KeyValueDB>) -> Result<Box<KeyServer>, Error> {
pub fn start(client: Arc<Client>, sync: Arc<SyncProvider>, miner: Arc<Miner>, self_key_pair: Arc<NodeKeyPair>, mut config: ServiceConfiguration, db: Arc<KeyValueDB>) -> Result<Box<KeyServer>, Error> {
let trusted_client = trusted_client::TrustedClient::new(self_key_pair.clone(), client.clone(), sync, miner);
let acl_storage: Arc<acl_storage::AclStorage> = if config.acl_check_enabled {
acl_storage::OnChainAclStorage::new(trusted_client.clone())?
} else {
Arc::new(acl_storage::DummyAclStorage::default())
};
let acl_storage: Arc<acl_storage::AclStorage> = match config.acl_check_contract_address.take() {
Some(acl_check_contract_address) => acl_storage::OnChainAclStorage::new(trusted_client.clone(), acl_check_contract_address)?,
None => Arc::new(acl_storage::DummyAclStorage::default()),
};
let key_server_set = key_server_set::OnChainKeyServerSet::new(trusted_client.clone(), self_key_pair.clone(),
config.cluster_config.auto_migrate_enabled, config.cluster_config.nodes.clone())?;
let key_server_set = key_server_set::OnChainKeyServerSet::new(trusted_client.clone(), config.cluster_config.key_server_set_contract_address.take(),
self_key_pair.clone(), config.cluster_config.auto_migrate_enabled, config.cluster_config.nodes.clone())?;
let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(db)?);
let key_server = Arc::new(key_server::KeyServerImpl::new(&config.cluster_config, key_server_set.clone(), self_key_pair.clone(), acl_storage.clone(), key_storage.clone())?);
let cluster = key_server.cluster();

View File

@ -18,7 +18,7 @@ use std::sync::Arc;
use parking_lot::RwLock;
use ethabi::RawLog;
use ethcore::filter::Filter;
use ethcore::client::{Client, BlockChainClient, BlockId, RegistryInfo, CallContract};
use ethcore::client::{Client, BlockChainClient, BlockId, CallContract};
use ethkey::{Public, public_to_address};
use hash::keccak;
use bytes::Bytes;
@ -99,8 +99,8 @@ pub struct OnChainServiceContract {
self_key_pair: Arc<NodeKeyPair>,
/// Contract registry name (if any).
name: String,
/// Contract address.
address: ContractAddress,
/// Contract address source.
address_source: ContractAddress,
/// Contract.
contract: service::Service,
/// Contract.
@ -109,8 +109,8 @@ pub struct OnChainServiceContract {
/// On-chain service contract data.
struct ServiceData {
/// Actual contract address.
pub contract_address: Address,
/// Current contract address.
pub contract_address: Option<Address>,
/// Last block we have read logs from.
pub last_log_block: Option<H256>,
}
@ -136,38 +136,26 @@ struct DocumentKeyShadowRetrievalService;
impl OnChainServiceContract {
/// Create new on-chain service contract.
pub fn new(mask: ApiMask, client: TrustedClient, name: String, address: ContractAddress, self_key_pair: Arc<NodeKeyPair>) -> Self {
let contract_addr = match address {
ContractAddress::Registry => client.get().and_then(|c| c.registry_address(name.clone(), BlockId::Latest)
.map(|address| {
trace!(target: "secretstore", "{}: installing {} service contract from address {}",
self_key_pair.public(), name, address);
address
}))
.unwrap_or_default(),
ContractAddress::Address(ref address) => {
trace!(target: "secretstore", "{}: installing service contract from address {}",
self_key_pair.public(), address);
address.clone()
},
};
OnChainServiceContract {
pub fn new(mask: ApiMask, client: TrustedClient, name: String, address_source: ContractAddress, self_key_pair: Arc<NodeKeyPair>) -> Self {
let contract = OnChainServiceContract {
mask: mask,
client: client,
self_key_pair: self_key_pair,
name: name,
address: address,
address_source: address_source,
contract: service::Service::default(),
data: RwLock::new(ServiceData {
contract_address: contract_addr,
contract_address: None,
last_log_block: None,
}),
}
};
contract.update_contract_address();
contract
}
/// Send transaction to the service contract.
fn send_contract_transaction<C, P>(&self, origin: &Address, server_key_id: &ServerKeyId, is_response_required: C, prepare_tx: P) -> Result<(), String>
fn send_contract_transaction<C, P>(&self, tx_name: &str, origin: &Address, server_key_id: &ServerKeyId, is_response_required: C, prepare_tx: P) -> Result<(), String>
where C: FnOnce(&Client, &Address, &service::Service, &ServerKeyId, &Address) -> bool,
P: FnOnce(&Client, &Address, &service::Service) -> Result<Bytes, String> {
// only publish if contract address is set && client is online
@ -193,6 +181,9 @@ impl OnChainServiceContract {
transaction_data
).map_err(|e| format!("{}", e))?;
trace!(target: "secretstore", "{}: transaction {} sent to service contract",
self.self_key_pair.public(), tx_name);
Ok(())
}
@ -228,26 +219,25 @@ impl OnChainServiceContract {
.ok()
.unwrap_or_else(|| Box::new(::std::iter::empty()))
}
/// Update service contract address.
fn update_contract_address(&self) -> bool {
let contract_address = self.client.read_contract_address(self.name.clone(), &self.address_source);
let mut data = self.data.write();
if contract_address != data.contract_address {
trace!(target: "secretstore", "{}: installing {} service contract from address {:?}",
self.self_key_pair.public(), self.name, contract_address);
data.contract_address = contract_address;
}
data.contract_address.is_some()
}
}
impl ServiceContract for OnChainServiceContract {
fn update(&self) -> bool {
if let &ContractAddress::Registry = &self.address {
if let Some(client) = self.client.get() {
if let Some(block_hash) = get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED) {
// update contract address from registry
let service_contract_addr = client.registry_address(self.name.clone(), BlockId::Hash(block_hash)).unwrap_or_default();
if self.data.read().contract_address != service_contract_addr {
trace!(target: "secretstore", "{}: installing {} service contract from address {}",
self.self_key_pair.public(), self.name, service_contract_addr);
self.data.write().contract_address = service_contract_addr;
}
}
}
}
self.data.read().contract_address != Default::default()
&& self.client.get().is_some()
self.update_contract_address() && self.client.get().is_some()
}
fn read_logs(&self) -> Box<Iterator<Item=ServiceTask>> {
@ -263,7 +253,10 @@ impl ServiceContract for OnChainServiceContract {
// prepare range of blocks to read logs from
let (address, first_block, last_block) = {
let mut data = self.data.write();
let address = data.contract_address;
let address = match data.contract_address {
Some(address) => address,
None => return Box::new(::std::iter::empty()), // no contract installed
};
let confirmed_block = match get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED) {
Some(confirmed_block) => confirmed_block,
None => return Box::new(::std::iter::empty()), // no block with enough confirmations
@ -326,31 +319,31 @@ impl ServiceContract for OnChainServiceContract {
// we only need requests that are here for more than REQUEST_CONFIRMATIONS_REQUIRED blocks
// => we're reading from Latest - (REQUEST_CONFIRMATIONS_REQUIRED + 1) block
let data = self.data.read();
match data.contract_address == Default::default() {
true => Box::new(::std::iter::empty()),
false => get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED + 1)
match data.contract_address {
None => Box::new(::std::iter::empty()),
Some(contract_address) => get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED + 1)
.map(|b| {
let block = BlockId::Hash(b);
let iter = match self.mask.server_key_generation_requests {
true => Box::new(self.create_pending_requests_iterator(client.clone(), &data.contract_address, &block,
true => Box::new(self.create_pending_requests_iterator(client.clone(), &contract_address, &block,
&ServerKeyGenerationService::read_pending_requests_count,
&ServerKeyGenerationService::read_pending_request)) as Box<Iterator<Item=(bool, ServiceTask)>>,
false => Box::new(::std::iter::empty()),
};
let iter = match self.mask.server_key_retrieval_requests {
true => Box::new(iter.chain(self.create_pending_requests_iterator(client.clone(), &data.contract_address, &block,
true => Box::new(iter.chain(self.create_pending_requests_iterator(client.clone(), &contract_address, &block,
&ServerKeyRetrievalService::read_pending_requests_count,
&ServerKeyRetrievalService::read_pending_request))),
false => iter,
};
let iter = match self.mask.document_key_store_requests {
true => Box::new(iter.chain(self.create_pending_requests_iterator(client.clone(), &data.contract_address, &block,
true => Box::new(iter.chain(self.create_pending_requests_iterator(client.clone(), &contract_address, &block,
&DocumentKeyStoreService::read_pending_requests_count,
&DocumentKeyStoreService::read_pending_request))),
false => iter,
};
let iter = match self.mask.document_key_shadow_retrieval_requests {
true => Box::new(iter.chain(self.create_pending_requests_iterator(client, &data.contract_address, &block,
true => Box::new(iter.chain(self.create_pending_requests_iterator(client, &contract_address, &block,
&DocumentKeyShadowRetrievalService::read_pending_requests_count,
&DocumentKeyShadowRetrievalService::read_pending_request))),
false => iter
@ -363,63 +356,59 @@ impl ServiceContract for OnChainServiceContract {
}
fn publish_generated_server_key(&self, origin: &Address, server_key_id: &ServerKeyId, server_key: Public) -> Result<(), String> {
self.send_contract_transaction(origin, server_key_id, ServerKeyGenerationService::is_response_required, |_, _, service|
Ok(ServerKeyGenerationService::prepare_pubish_tx_data(service, server_key_id, &server_key))
)
self.send_contract_transaction("publish_generated_server_key", origin, server_key_id, ServerKeyGenerationService::is_response_required,
|_, _, service| Ok(ServerKeyGenerationService::prepare_pubish_tx_data(service, server_key_id, &server_key)))
}
fn publish_server_key_generation_error(&self, origin: &Address, server_key_id: &ServerKeyId) -> Result<(), String> {
self.send_contract_transaction(origin, server_key_id, ServerKeyGenerationService::is_response_required, |_, _, service|
Ok(ServerKeyGenerationService::prepare_error_tx_data(service, server_key_id))
)
self.send_contract_transaction("publish_server_key_generation_error", origin, server_key_id, ServerKeyGenerationService::is_response_required,
|_, _, service| Ok(ServerKeyGenerationService::prepare_error_tx_data(service, server_key_id)))
}
fn publish_retrieved_server_key(&self, origin: &Address, server_key_id: &ServerKeyId, server_key: Public, threshold: usize) -> Result<(), String> {
let threshold = serialize_threshold(threshold)?;
self.send_contract_transaction(origin, server_key_id, ServerKeyRetrievalService::is_response_required, |_, _, service|
Ok(ServerKeyRetrievalService::prepare_pubish_tx_data(service, server_key_id, server_key, threshold))
)
self.send_contract_transaction("publish_retrieved_server_key", origin, server_key_id, ServerKeyRetrievalService::is_response_required,
|_, _, service| Ok(ServerKeyRetrievalService::prepare_pubish_tx_data(service, server_key_id, server_key, threshold)))
}
fn publish_server_key_retrieval_error(&self, origin: &Address, server_key_id: &ServerKeyId) -> Result<(), String> {
self.send_contract_transaction(origin, server_key_id, ServerKeyRetrievalService::is_response_required, |_, _, service|
Ok(ServerKeyRetrievalService::prepare_error_tx_data(service, server_key_id))
)
self.send_contract_transaction("publish_server_key_retrieval_error", origin, server_key_id, ServerKeyRetrievalService::is_response_required,
|_, _, service| Ok(ServerKeyRetrievalService::prepare_error_tx_data(service, server_key_id)))
}
fn publish_stored_document_key(&self, origin: &Address, server_key_id: &ServerKeyId) -> Result<(), String> {
self.send_contract_transaction(origin, server_key_id, DocumentKeyStoreService::is_response_required, |_, _, service|
Ok(DocumentKeyStoreService::prepare_pubish_tx_data(service, server_key_id))
)
self.send_contract_transaction("publish_stored_document_key", origin, server_key_id, DocumentKeyStoreService::is_response_required,
|_, _, service| Ok(DocumentKeyStoreService::prepare_pubish_tx_data(service, server_key_id)))
}
fn publish_document_key_store_error(&self, origin: &Address, server_key_id: &ServerKeyId) -> Result<(), String> {
self.send_contract_transaction(origin, server_key_id, DocumentKeyStoreService::is_response_required, |_, _, service|
Ok(DocumentKeyStoreService::prepare_error_tx_data(service, server_key_id))
)
self.send_contract_transaction("publish_document_key_store_error", origin, server_key_id, DocumentKeyStoreService::is_response_required,
|_, _, service| Ok(DocumentKeyStoreService::prepare_error_tx_data(service, server_key_id)))
}
fn publish_retrieved_document_key_common(&self, origin: &Address, server_key_id: &ServerKeyId, requester: &Address, common_point: Public, threshold: usize) -> Result<(), String> {
let threshold = serialize_threshold(threshold)?;
self.send_contract_transaction(origin, server_key_id, |client, contract_address, contract, server_key_id, key_server|
DocumentKeyShadowRetrievalService::is_response_required(client, contract_address, contract, server_key_id, requester, key_server),
|_, _, service|
Ok(DocumentKeyShadowRetrievalService::prepare_pubish_common_tx_data(service, server_key_id, requester, common_point, threshold))
self.send_contract_transaction("publish_retrieved_document_key_common", origin, server_key_id,
|client, contract_address, contract, server_key_id, key_server|
DocumentKeyShadowRetrievalService::is_response_required(client, contract_address, contract, server_key_id, requester, key_server),
|_, _, service|
Ok(DocumentKeyShadowRetrievalService::prepare_pubish_common_tx_data(service, server_key_id, requester, common_point, threshold))
)
}
fn publish_retrieved_document_key_personal(&self, origin: &Address, server_key_id: &ServerKeyId, requester: &Address, participants: &[Address], decrypted_secret: Public, shadow: Bytes) -> Result<(), String> {
self.send_contract_transaction(origin, server_key_id, |_, _, _, _, _| true,
self.send_contract_transaction("publish_retrieved_document_key_personal", origin, server_key_id, |_, _, _, _, _| true,
move |client, address, service|
DocumentKeyShadowRetrievalService::prepare_pubish_personal_tx_data(client, address, service, server_key_id, requester, participants, decrypted_secret, shadow)
)
}
fn publish_document_key_retrieval_error(&self, origin: &Address, server_key_id: &ServerKeyId, requester: &Address) -> Result<(), String> {
self.send_contract_transaction(origin, server_key_id, |client, contract_address, contract, server_key_id, key_server|
DocumentKeyShadowRetrievalService::is_response_required(client, contract_address, contract, server_key_id, requester, key_server),
|_, _, service|
Ok(DocumentKeyShadowRetrievalService::prepare_error_tx_data(service, server_key_id, requester))
self.send_contract_transaction("publish_document_key_retrieval_error", origin, server_key_id,
|client, contract_address, contract, server_key_id, key_server|
DocumentKeyShadowRetrievalService::is_response_required(client, contract_address, contract, server_key_id, requester, key_server),
|_, _, service|
Ok(DocumentKeyShadowRetrievalService::prepare_error_tx_data(service, server_key_id, requester))
)
}
}
@ -742,16 +731,16 @@ impl DocumentKeyShadowRetrievalService {
.map(|not_confirmed| (
not_confirmed,
match is_common_retrieval_completed {
true => ServiceTask::RetrieveShadowDocumentKeyCommon(
true => ServiceTask::RetrieveShadowDocumentKeyPersonal(
contract_address.clone(),
server_key_id,
requester,
),
false => ServiceTask::RetrieveShadowDocumentKeyCommon(
contract_address.clone(),
server_key_id,
public_to_address(&requester),
),
false => ServiceTask::RetrieveShadowDocumentKeyPersonal(
contract_address.clone(),
server_key_id,
requester,
)
},
))
.map_err(|error| format!("{}", error))

View File

@ -25,11 +25,13 @@ use ethkey::{Public, public_to_address};
use bytes::Bytes;
use ethereum_types::{H256, U256, Address};
use key_server_set::KeyServerSet;
use key_server_cluster::{ClusterClient, ClusterSessionsListener, ClusterSession};
use key_server_cluster::{NodeId, ClusterClient, ClusterSessionsListener, ClusterSession};
use key_server_cluster::math;
use key_server_cluster::generation_session::SessionImpl as GenerationSession;
use key_server_cluster::encryption_session::{check_encrypted_data, update_encrypted_data};
use key_server_cluster::decryption_session::SessionImpl as DecryptionSession;
use key_server_cluster::key_version_negotiation_session::{SessionImpl as KeyVersionNegotiationSession,
IsolatedSessionTransport as KeyVersionNegotiationTransport, FailedContinueAction};
use key_storage::KeyStorage;
use acl_storage::AclStorage;
use listener::service_contract::ServiceContract;
@ -138,7 +140,6 @@ impl ServiceContractListener {
key_server_set: params.key_server_set,
key_storage: params.key_storage,
});
data.tasks_queue.push(ServiceTask::Retry);
// we are not starting thread when in test mode
let service_handle = if cfg!(test) {
@ -154,11 +155,17 @@ impl ServiceContractListener {
});
contract.data.cluster.add_generation_listener(contract.clone());
contract.data.cluster.add_decryption_listener(contract.clone());
contract.data.cluster.add_key_version_negotiation_listener(contract.clone());
Ok(contract)
}
/// Process incoming events of service contract.
fn process_service_contract_events(&self) {
// shortcut: do not process events if we're isolated from the cluster
if self.data.key_server_set.is_isolated() {
return;
}
self.data.tasks_queue.push_many(self.data.contract.read_logs()
.filter_map(|task| Self::filter_task(&self.data, task)));
}
@ -168,7 +175,7 @@ impl ServiceContractListener {
match task {
// when this node should be master of this server key generation session
ServiceTask::GenerateServerKey(origin, server_key_id, author, threshold) if is_processed_by_this_key_server(
&*data.key_server_set, &*data.self_key_pair, &server_key_id) =>
&*data.key_server_set, data.self_key_pair.public(), &server_key_id) =>
Some(ServiceTask::GenerateServerKey(origin, server_key_id, author, threshold)),
// when server key is not yet generated and generation must be initiated by other node
ServiceTask::GenerateServerKey(_, _, _, _) => None,
@ -187,7 +194,7 @@ impl ServiceContractListener {
// when this node should be master of this document key decryption session
ServiceTask::RetrieveShadowDocumentKeyPersonal(origin, server_key_id, requester) if is_processed_by_this_key_server(
&*data.key_server_set, &*data.self_key_pair, &server_key_id) =>
&*data.key_server_set, data.self_key_pair.public(), &server_key_id) =>
Some(ServiceTask::RetrieveShadowDocumentKeyPersonal(origin, server_key_id, requester)),
// when server key is not yet generated and generation must be initiated by other node
ServiceTask::RetrieveShadowDocumentKeyPersonal(_, _, _) => None,
@ -211,7 +218,7 @@ impl ServiceContractListener {
};
}
trace!(target: "secretstore_net", "{}: ServiceContractListener thread stopped", data.self_key_pair.public());
trace!(target: "secretstore", "{}: ServiceContractListener thread stopped", data.self_key_pair.public());
}
/// Process single service task.
@ -430,7 +437,7 @@ impl Drop for ServiceContractListener {
impl ChainNotify for ServiceContractListener {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, route: ChainRoute, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) {
let enacted_len = route.enacted().len();
if enacted_len == 0 {
if enacted_len == 0 && route.retracted().is_empty() {
return;
}
@ -443,8 +450,11 @@ impl ChainNotify for ServiceContractListener {
// schedule retry if received enough blocks since last retry
// it maybe inaccurate when switching syncing/synced states, but that's ok
if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTERVAL_BLOCKS {
self.data.tasks_queue.push(ServiceTask::Retry);
self.data.last_retry.store(0, Ordering::Relaxed);
// shortcut: do not retry if we're isolated from the cluster
if !self.data.key_server_set.is_isolated() {
self.data.tasks_queue.push(ServiceTask::Retry);
self.data.last_retry.store(0, Ordering::Relaxed);
}
}
}
}
@ -491,6 +501,35 @@ impl ClusterSessionsListener<DecryptionSession> for ServiceContractListener {
}
}
impl ClusterSessionsListener<KeyVersionNegotiationSession<KeyVersionNegotiationTransport>> for ServiceContractListener {
fn on_session_removed(&self, session: Arc<KeyVersionNegotiationSession<KeyVersionNegotiationTransport>>) {
// by this time sesion must already be completed - either successfully, or not
assert!(session.is_finished());
// we're interested in:
// 1) sessions failed with fatal error
// 2) with decryption continue action
let error = match session.wait() {
Err(ref error) if !error.is_non_fatal() => error.clone(),
_ => return,
};
let (origin, requester) = match session.take_failed_continue_action() {
Some(FailedContinueAction::Decrypt(Some(origin), requester)) => (origin, requester),
_ => return,
};
// check if master node is responsible for processing key requests
let meta = session.meta();
if !is_processed_by_this_key_server(&*self.data.key_server_set, &meta.master_node_id, &meta.id) {
return;
}
// ignore result as we're already processing an error
let _ = Self::process_document_key_retrieval_result(&self.data, origin, &meta.id, &requester, Err(error));
}
}
impl ::std::fmt::Display for ServiceTask {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
match *self {
@ -520,8 +559,8 @@ fn log_service_task_result(task: &ServiceTask, self_id: &Public, result: Result<
result
}
/// Returns true when session, related to `server_key_id` must be started on this KeyServer.
fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool {
/// Returns true when session, related to `server_key_id` must be started on `node`.
fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, node: &NodeId, server_key_id: &H256) -> bool {
let servers = key_server_set.snapshot().current_set;
let total_servers_count = servers.len();
match total_servers_count {
@ -530,7 +569,7 @@ fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, self_key_pair:
_ => (),
}
let this_server_index = match servers.keys().enumerate().find(|&(_, s)| s == self_key_pair.public()) {
let this_server_index = match servers.keys().enumerate().find(|&(_, s)| s == node) {
Some((index, _)) => index,
None => return false,
};
@ -554,8 +593,9 @@ mod tests {
use acl_storage::{AclStorage, DummyAclStorage};
use key_storage::{KeyStorage, DocumentKeyShare};
use key_storage::tests::DummyKeyStorage;
use key_server_set::KeyServerSet;
use key_server_set::tests::MapKeyServerSet;
use {PlainNodeKeyPair, ServerKeyId};
use {NodeKeyPair, PlainNodeKeyPair, ServerKeyId};
use super::{ServiceTask, ServiceContractListener, ServiceContractListenerParams, is_processed_by_this_key_server};
fn create_non_empty_key_storage(has_doc_key: bool) -> Arc<DummyKeyStorage> {
@ -571,19 +611,23 @@ mod tests {
key_storage
}
fn make_service_contract_listener(contract: Option<Arc<ServiceContract>>, cluster: Option<Arc<DummyClusterClient>>, key_storage: Option<Arc<KeyStorage>>, acl_storage: Option<Arc<AclStorage>>) -> Arc<ServiceContractListener> {
let contract = contract.unwrap_or_else(|| Arc::new(DummyServiceContract::default()));
let cluster = cluster.unwrap_or_else(|| Arc::new(DummyClusterClient::default()));
let key_storage = key_storage.unwrap_or_else(|| Arc::new(DummyKeyStorage::default()));
let acl_storage = acl_storage.unwrap_or_else(|| Arc::new(DummyAclStorage::default()));
let servers_set = Arc::new(MapKeyServerSet::new(vec![
fn make_servers_set(is_isolated: bool) -> Arc<KeyServerSet> {
Arc::new(MapKeyServerSet::new(is_isolated, vec![
("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
("c6047f9441ed7d6d3045406e95c07cd85c778e4b8cef3ca7abac09b95c709ee51ae168fea63dc339a3c58419466ceaeef7f632653266d0e1236431a950cfe52a".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
("f9308a019258c31049344f85f89d5229b531c845836f99b08601f113bce036f9388f7b0f632de8140fe337e62a37f3566500a99934c2231b6cb9fd7584b8e672".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
].into_iter().collect()));
].into_iter().collect()))
}
fn make_service_contract_listener(contract: Option<Arc<ServiceContract>>, cluster: Option<Arc<DummyClusterClient>>, key_storage: Option<Arc<KeyStorage>>, acl_storage: Option<Arc<AclStorage>>, servers_set: Option<Arc<KeyServerSet>>) -> Arc<ServiceContractListener> {
let contract = contract.unwrap_or_else(|| Arc::new(DummyServiceContract::default()));
let cluster = cluster.unwrap_or_else(|| Arc::new(DummyClusterClient::default()));
let key_storage = key_storage.unwrap_or_else(|| Arc::new(DummyKeyStorage::default()));
let acl_storage = acl_storage.unwrap_or_else(|| Arc::new(DummyAclStorage::default()));
let servers_set = servers_set.unwrap_or_else(|| make_servers_set(false));
let self_key_pair = Arc::new(PlainNodeKeyPair::new(KeyPair::from_secret("0000000000000000000000000000000000000000000000000000000000000001".parse().unwrap()).unwrap()));
ServiceContractListener::new(ServiceContractListenerParams {
contract: contract,
@ -599,7 +643,7 @@ mod tests {
fn is_not_processed_by_this_key_server_with_zero_servers() {
assert_eq!(is_processed_by_this_key_server(
&MapKeyServerSet::default(),
&PlainNodeKeyPair::new(Random.generate().unwrap()),
Random.generate().unwrap().public(),
&Default::default()), false);
}
@ -607,27 +651,27 @@ mod tests {
fn is_processed_by_this_key_server_with_single_server() {
let self_key_pair = Random.generate().unwrap();
assert_eq!(is_processed_by_this_key_server(
&MapKeyServerSet::new(vec![
&MapKeyServerSet::new(false, vec![
(self_key_pair.public().clone(), "127.0.0.1:8080".parse().unwrap())
].into_iter().collect()),
&PlainNodeKeyPair::new(self_key_pair),
self_key_pair.public(),
&Default::default()), true);
}
#[test]
fn is_not_processed_by_this_key_server_when_not_a_part_of_servers_set() {
assert!(is_processed_by_this_key_server(
&MapKeyServerSet::new(vec![
&MapKeyServerSet::new(false, vec![
(Random.generate().unwrap().public().clone(), "127.0.0.1:8080".parse().unwrap())
].into_iter().collect()),
&PlainNodeKeyPair::new(Random.generate().unwrap()),
Random.generate().unwrap().public(),
&Default::default()));
}
#[test]
fn is_processed_by_this_key_server_in_set_of_3() {
// servers set is ordered && server range depends on index of this server
let servers_set = MapKeyServerSet::new(vec![
let servers_set = MapKeyServerSet::new(false, vec![
// secret: 0000000000000000000000000000000000000000000000000000000000000001
("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
@ -642,46 +686,46 @@ mod tests {
// 1st server: process hashes [0x0; 0x555...555]
let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
"0000000000000000000000000000000000000000000000000000000000000001".parse().unwrap()).unwrap());
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"0000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"3000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"5555555555555555555555555555555555555555555555555555555555555555".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"5555555555555555555555555555555555555555555555555555555555555556".parse().unwrap()), false);
// 2nd server: process hashes from 0x555...556 to 0xaaa...aab
let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
"0000000000000000000000000000000000000000000000000000000000000002".parse().unwrap()).unwrap());
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"5555555555555555555555555555555555555555555555555555555555555555".parse().unwrap()), false);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"5555555555555555555555555555555555555555555555555555555555555556".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"7555555555555555555555555555555555555555555555555555555555555555".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaac".parse().unwrap()), false);
// 3rd server: process hashes from 0x800...000 to 0xbff...ff
let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
"0000000000000000000000000000000000000000000000000000000000000003".parse().unwrap()).unwrap());
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab".parse().unwrap()), false);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaac".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"daaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaac".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true);
}
#[test]
fn is_processed_by_this_key_server_in_set_of_4() {
// servers set is ordered && server range depends on index of this server
let servers_set = MapKeyServerSet::new(vec![
let servers_set = MapKeyServerSet::new(false, vec![
// secret: 0000000000000000000000000000000000000000000000000000000000000001
("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
@ -699,62 +743,72 @@ mod tests {
// 1st server: process hashes [0x0; 0x3ff...ff]
let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
"0000000000000000000000000000000000000000000000000000000000000001".parse().unwrap()).unwrap());
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"0000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"2000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"4000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), false);
// 2nd server: process hashes from 0x400...000 to 0x7ff...ff
let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
"0000000000000000000000000000000000000000000000000000000000000002".parse().unwrap()).unwrap());
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), false);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"4000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"6000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"8000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), false);
// 3rd server: process hashes from 0x800...000 to 0xbff...ff
let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
"0000000000000000000000000000000000000000000000000000000000000004".parse().unwrap()).unwrap());
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), false);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"8000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"a000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"bfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"c000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), false);
// 4th server: process hashes from 0xc00...000 to 0xfff...ff
let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
"0000000000000000000000000000000000000000000000000000000000000003".parse().unwrap()).unwrap());
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"bfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), false);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"c000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"e000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
assert_eq!(is_processed_by_this_key_server(&servers_set, key_pair.public(),
&"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true);
}
#[test]
fn no_tasks_scheduled_when_no_contract_events() {
let listener = make_service_contract_listener(None, None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
let listener = make_service_contract_listener(None, None, None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 0);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 0);
}
#[test]
fn tasks_are_not_scheduled_on_isolated_node() {
let mut contract = DummyServiceContract::default();
contract.logs.push(ServiceTask::GenerateServerKey(Default::default(), Default::default(), Default::default(), 0));
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, Some(make_servers_set(true)));
assert_eq!(listener.data.tasks_queue.snapshot().len(), 0);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 0);
}
// server key generation tests
@ -763,10 +817,10 @@ mod tests {
fn server_key_generation_is_scheduled_when_requested() {
let mut contract = DummyServiceContract::default();
contract.logs.push(ServiceTask::GenerateServerKey(Default::default(), Default::default(), Default::default(), 0));
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 0);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 2);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::GenerateServerKey(
Default::default(), Default::default(), Default::default(), 0)));
}
@ -776,16 +830,16 @@ mod tests {
let server_key_id = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap();
let mut contract = DummyServiceContract::default();
contract.logs.push(ServiceTask::GenerateServerKey(Default::default(), server_key_id, Default::default(), 0));
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 0);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 0);
}
#[test]
fn generation_session_is_created_when_processing_generate_server_key_task() {
let cluster = Arc::new(DummyClusterClient::default());
let listener = make_service_contract_listener(None, Some(cluster.clone()), None, None);
let listener = make_service_contract_listener(None, Some(cluster.clone()), None, None, None);
ServiceContractListener::process_service_task(&listener.data, ServiceTask::GenerateServerKey(
Default::default(), Default::default(), Default::default(), Default::default())).unwrap_err();
assert_eq!(cluster.generation_requests_count.load(Ordering::Relaxed), 1);
@ -797,7 +851,7 @@ mod tests {
contract.pending_requests.push((false, ServiceTask::GenerateServerKey(Default::default(),
Default::default(), Default::default(), Default::default())));
let cluster = Arc::new(DummyClusterClient::default());
let listener = make_service_contract_listener(Some(Arc::new(contract)), Some(cluster.clone()), None, None);
let listener = make_service_contract_listener(Some(Arc::new(contract)), Some(cluster.clone()), None, None, None);
listener.data.retry_data.lock().affected_server_keys.insert(Default::default());
ServiceContractListener::retry_pending_requests(&listener.data).unwrap();
assert_eq!(cluster.generation_requests_count.load(Ordering::Relaxed), 0);
@ -809,10 +863,10 @@ mod tests {
fn server_key_retrieval_is_scheduled_when_requested() {
let mut contract = DummyServiceContract::default();
contract.logs.push(ServiceTask::RetrieveServerKey(Default::default(), Default::default()));
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 0);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 2);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::RetrieveServerKey(
Default::default(), Default::default())));
}
@ -822,10 +876,10 @@ mod tests {
let server_key_id: ServerKeyId = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap();
let mut contract = DummyServiceContract::default();
contract.logs.push(ServiceTask::RetrieveServerKey(Default::default(), server_key_id.clone()));
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 0);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 2);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::RetrieveServerKey(
Default::default(), server_key_id)));
}
@ -834,7 +888,7 @@ mod tests {
fn server_key_is_retrieved_when_processing_retrieve_server_key_task() {
let contract = Arc::new(DummyServiceContract::default());
let key_storage = create_non_empty_key_storage(false);
let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage), None);
let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage), None, None);
ServiceContractListener::process_service_task(&listener.data, ServiceTask::RetrieveServerKey(
Default::default(), Default::default())).unwrap();
assert_eq!(*contract.retrieved_server_keys.lock(), vec![(Default::default(),
@ -844,7 +898,7 @@ mod tests {
#[test]
fn server_key_retrieval_failure_is_reported_when_processing_retrieve_server_key_task_and_key_is_unknown() {
let contract = Arc::new(DummyServiceContract::default());
let listener = make_service_contract_listener(Some(contract.clone()), None, None, None);
let listener = make_service_contract_listener(Some(contract.clone()), None, None, None, None);
ServiceContractListener::process_service_task(&listener.data, ServiceTask::RetrieveServerKey(
Default::default(), Default::default())).unwrap();
assert_eq!(*contract.server_keys_retrieval_failures.lock(), vec![Default::default()]);
@ -855,7 +909,7 @@ mod tests {
let mut contract = DummyServiceContract::default();
contract.pending_requests.push((false, ServiceTask::RetrieveServerKey(Default::default(), Default::default())));
let cluster = Arc::new(DummyClusterClient::default());
let listener = make_service_contract_listener(Some(Arc::new(contract)), Some(cluster.clone()), None, None);
let listener = make_service_contract_listener(Some(Arc::new(contract)), Some(cluster.clone()), None, None, None);
listener.data.retry_data.lock().affected_server_keys.insert(Default::default());
ServiceContractListener::retry_pending_requests(&listener.data).unwrap();
assert_eq!(cluster.generation_requests_count.load(Ordering::Relaxed), 0);
@ -868,10 +922,10 @@ mod tests {
let mut contract = DummyServiceContract::default();
contract.logs.push(ServiceTask::StoreDocumentKey(Default::default(), Default::default(),
Default::default(), Default::default(), Default::default()));
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 0);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 2);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::StoreDocumentKey(
Default::default(), Default::default(), Default::default(), Default::default(), Default::default())));
}
@ -882,10 +936,10 @@ mod tests {
let mut contract = DummyServiceContract::default();
contract.logs.push(ServiceTask::StoreDocumentKey(Default::default(), server_key_id.clone(),
Default::default(), Default::default(), Default::default()));
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 0);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 2);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::StoreDocumentKey(
Default::default(), server_key_id, Default::default(), Default::default(), Default::default())));
}
@ -894,7 +948,7 @@ mod tests {
fn document_key_is_stored_when_processing_store_document_key_task() {
let contract = Arc::new(DummyServiceContract::default());
let key_storage = create_non_empty_key_storage(false);
let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), None);
let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), None, None);
ServiceContractListener::process_service_task(&listener.data, ServiceTask::StoreDocumentKey(
Default::default(), Default::default(), Default::default(), Default::default(), Default::default())).unwrap();
assert_eq!(*contract.stored_document_keys.lock(), vec![Default::default()]);
@ -907,7 +961,7 @@ mod tests {
#[test]
fn document_key_store_failure_reported_when_no_server_key() {
let contract = Arc::new(DummyServiceContract::default());
let listener = make_service_contract_listener(Some(contract.clone()), None, None, None);
let listener = make_service_contract_listener(Some(contract.clone()), None, None, None, None);
ServiceContractListener::process_service_task(&listener.data, ServiceTask::StoreDocumentKey(
Default::default(), Default::default(), Default::default(), Default::default(), Default::default())).unwrap_err();
assert_eq!(*contract.document_keys_store_failures.lock(), vec![Default::default()]);
@ -917,7 +971,7 @@ mod tests {
fn document_key_store_failure_reported_when_document_key_already_set() {
let contract = Arc::new(DummyServiceContract::default());
let key_storage = create_non_empty_key_storage(true);
let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage), None);
let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage), None, None);
ServiceContractListener::process_service_task(&listener.data, ServiceTask::StoreDocumentKey(
Default::default(), Default::default(), Default::default(), Default::default(), Default::default())).unwrap_err();
assert_eq!(*contract.document_keys_store_failures.lock(), vec![Default::default()]);
@ -927,7 +981,7 @@ mod tests {
fn document_key_store_failure_reported_when_author_differs() {
let contract = Arc::new(DummyServiceContract::default());
let key_storage = create_non_empty_key_storage(false);
let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage), None);
let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage), None, None);
ServiceContractListener::process_service_task(&listener.data, ServiceTask::StoreDocumentKey(
Default::default(), Default::default(), 1.into(), Default::default(), Default::default())).unwrap_err();
assert_eq!(*contract.document_keys_store_failures.lock(), vec![Default::default()]);
@ -939,10 +993,10 @@ mod tests {
fn document_key_shadow_common_retrieval_is_scheduled_when_requested() {
let mut contract = DummyServiceContract::default();
contract.logs.push(ServiceTask::RetrieveShadowDocumentKeyCommon(Default::default(), Default::default(), Default::default()));
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 0);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 2);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::RetrieveShadowDocumentKeyCommon(
Default::default(), Default::default(), Default::default())));
}
@ -952,10 +1006,10 @@ mod tests {
let server_key_id: ServerKeyId = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap();
let mut contract = DummyServiceContract::default();
contract.logs.push(ServiceTask::RetrieveShadowDocumentKeyCommon(Default::default(), server_key_id.clone(), Default::default()));
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None, None, None);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 0);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 2);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::RetrieveShadowDocumentKeyCommon(
Default::default(), server_key_id, Default::default())));
}
@ -964,7 +1018,7 @@ mod tests {
fn document_key_shadow_common_is_retrieved_when_processing_document_key_shadow_common_retrieval_task() {
let contract = Arc::new(DummyServiceContract::default());
let key_storage = create_non_empty_key_storage(true);
let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), None);
let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), None, None);
ServiceContractListener::process_service_task(&listener.data, ServiceTask::RetrieveShadowDocumentKeyCommon(
Default::default(), Default::default(), Default::default())).unwrap();
assert_eq!(*contract.common_shadow_retrieved_document_keys.lock(), vec![(Default::default(), Default::default(),
@ -977,7 +1031,7 @@ mod tests {
acl_storage.prohibit(Default::default(), Default::default());
let contract = Arc::new(DummyServiceContract::default());
let key_storage = create_non_empty_key_storage(true);
let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), Some(Arc::new(acl_storage)));
let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), Some(Arc::new(acl_storage)), None);
ServiceContractListener::process_service_task(&listener.data, ServiceTask::RetrieveShadowDocumentKeyCommon(
Default::default(), Default::default(), Default::default())).unwrap_err();
assert_eq!(*contract.document_keys_shadow_retrieval_failures.lock(), vec![(Default::default(), Default::default())]);
@ -986,7 +1040,7 @@ mod tests {
#[test]
fn document_key_shadow_common_retrieval_failure_reported_when_no_server_key() {
let contract = Arc::new(DummyServiceContract::default());
let listener = make_service_contract_listener(Some(contract.clone()), None, None, None);
let listener = make_service_contract_listener(Some(contract.clone()), None, None, None, None);
ServiceContractListener::process_service_task(&listener.data, ServiceTask::RetrieveShadowDocumentKeyCommon(
Default::default(), Default::default(), Default::default())).unwrap_err();
assert_eq!(*contract.document_keys_shadow_retrieval_failures.lock(), vec![(Default::default(), Default::default())]);
@ -996,7 +1050,7 @@ mod tests {
fn document_key_shadow_common_retrieval_failure_reported_when_no_document_key() {
let contract = Arc::new(DummyServiceContract::default());
let key_storage = create_non_empty_key_storage(false);
let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), None);
let listener = make_service_contract_listener(Some(contract.clone()), None, Some(key_storage.clone()), None, None);
ServiceContractListener::process_service_task(&listener.data, ServiceTask::RetrieveShadowDocumentKeyCommon(
Default::default(), Default::default(), Default::default())).unwrap_err();
assert_eq!(*contract.document_keys_shadow_retrieval_failures.lock(), vec![(Default::default(), Default::default())]);

View File

@ -17,11 +17,12 @@
use std::sync::{Arc, Weak};
use bytes::Bytes;
use ethereum_types::Address;
use ethcore::client::{Client, BlockChainClient, ChainInfo, Nonce};
use ethcore::client::{Client, BlockChainClient, ChainInfo, Nonce, BlockId, RegistryInfo};
use ethcore::miner::{Miner, MinerService};
use sync::SyncProvider;
use transaction::{Transaction, SignedTransaction, Action};
use {Error, NodeKeyPair};
use helpers::{get_confirmed_block_hash, REQUEST_CONFIRMATIONS_REQUIRED};
use {Error, NodeKeyPair, ContractAddress};
#[derive(Clone)]
/// 'Trusted' client weak reference.
@ -84,6 +85,18 @@ impl TrustedClient {
let signed = SignedTransaction::new(transaction.with_signature(signature, chain_id))?;
miner.import_own_transaction(&*client, signed.into())
.map_err(|e| Error::Internal(format!("failed to import tx: {}", e)))
.map(|_| ())
}
/// Read contract address. If address source is registry, address only returned if current client state is
/// trusted. Address from registry is read from registry from block latest block with
/// REQUEST_CONFIRMATIONS_REQUIRED confirmations.
pub fn read_contract_address(&self, registry_name: String, address: &ContractAddress) -> Option<Address> {
match *address {
ContractAddress::Address(ref address) => Some(address.clone()),
ContractAddress::Registry => self.get().and_then(|client|
get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED)
.and_then(|block| client.registry_address(registry_name, BlockId::Hash(block)))
),
}
}
}

View File

@ -66,8 +66,8 @@ pub struct ServiceConfiguration {
pub service_contract_doc_store_address: Option<ContractAddress>,
/// Document key shadow retrieval service contract address.
pub service_contract_doc_sretr_address: Option<ContractAddress>,
/// Is ACL check enabled. If false, everyone has access to all keys. Useful for tests only.
pub acl_check_enabled: bool,
/// ACL check contract address. If None, everyone has access to all keys. Useful for tests only.
pub acl_check_contract_address: Option<ContractAddress>,
/// Cluster configuration.
pub cluster_config: ClusterConfiguration,
}
@ -81,6 +81,8 @@ pub struct ClusterConfiguration {
pub listener_address: NodeAddress,
/// All cluster nodes addresses.
pub nodes: BTreeMap<ethkey::Public, NodeAddress>,
/// Key Server Set contract address. If None, servers from 'nodes' map are used.
pub key_server_set_contract_address: Option<ContractAddress>,
/// Allow outbound connections to 'higher' nodes.
/// This is useful for tests, but slower a bit for production.
pub allow_connecting_to_higher_nodes: bool,