SecretStore: fixing grumbles (part1)

This commit is contained in:
Svyatoslav Nikolsky 2017-12-20 14:50:46 +03:00
parent 5d792324e6
commit 794de9f743
9 changed files with 100 additions and 110 deletions

View File

@ -553,7 +553,7 @@ usage! {
ARG arg_secretstore_contract: (String) = "none", or |c: &Config| otry!(c.secretstore).service_contract.clone(),
"--secretstore-contract=[SOURCE]",
"Secret Store Service contract source: none, registry (contract address is read from registry) or address.",
"Secret Store Service contract address source: none, registry (contract address is read from registry) or address.",
ARG arg_secretstore_nodes: (String) = "", or |c: &Config| otry!(c.secretstore).nodes.as_ref().map(|vec| vec.join(",")),
"--secretstore-nodes=[NODES]",

View File

@ -1082,7 +1082,7 @@ impl Configuration {
Ok(match self.args.arg_secretstore_contract.as_ref() {
"none" => None,
"registry" => Some(SecretStoreContractAddress::Registry),
a @ _ => Some(SecretStoreContractAddress::Address(a.parse().map_err(|e| format!("{}", e))?)),
a => Some(SecretStoreContractAddress::Address(a.parse().map_err(|e| format!("{}", e))?)),
})
}

View File

@ -203,9 +203,8 @@ impl<T> SessionImpl<T> where T: SessionTransport {
self.core.completed.wait(&mut data);
}
data.result.as_ref()
data.result.clone()
.expect("checked above or waited for completed; completed is only signaled when result.is_some(); qed")
.clone()
}
/// Initialize session.

View File

@ -86,16 +86,8 @@ struct SessionData<T: SessionTransport> {
pub version: Option<H256>,
/// Consensus session.
pub consensus_session: Option<ShareAddChangeConsensusSession<T>>,
/// NewKeyShare: threshold.
pub key_share_threshold: Option<usize>,
/// NewKeyShare: author.
pub key_share_author: Option<Public>,
/// NewKeyShare: joint public.
pub key_share_joint_public: Option<Public>,
/// NewKeyShare: Common (shared) encryption point.
pub key_share_common_point: Option<Public>,
/// NewKeyShare: Encrypted point.
pub key_share_encrypted_point: Option<Public>,
/// NewKeyShare (for nodes being added).
pub new_key_share: Option<NewKeyShare>,
/// Nodes id numbers.
pub id_numbers: Option<BTreeMap<NodeId, Option<Secret>>>,
/// Secret subshares received from nodes.
@ -104,6 +96,20 @@ struct SessionData<T: SessionTransport> {
pub result: Option<Result<(), Error>>,
}
/// New key share.
struct NewKeyShare {
/// NewKeyShare: threshold.
pub threshold: usize,
/// NewKeyShare: author.
pub author: Public,
/// NewKeyShare: joint public.
pub joint_public: Public,
/// NewKeyShare: Common (shared) encryption point.
pub common_point: Option<Public>,
/// NewKeyShare: Encrypted point.
pub encrypted_point: Option<Public>,
}
/// Session state.
#[derive(Debug, PartialEq)]
enum SessionState {
@ -167,11 +173,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
state: SessionState::ConsensusEstablishing,
version: None,
consensus_session: None,
key_share_threshold: None,
key_share_author: None,
key_share_joint_public: None,
key_share_common_point: None,
key_share_encrypted_point: None,
new_key_share: None,
id_numbers: None,
secret_subshares: None,
result: None,
@ -427,9 +429,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
}
// we only expect this message once
if data.key_share_threshold.is_some() || data.key_share_author.is_some() ||
data.key_share_common_point.is_some() || data.key_share_encrypted_point.is_some() ||
data.key_share_joint_public.is_some() {
if data.new_key_share.is_some() {
return Err(Error::InvalidStateForRequest);
}
@ -444,11 +444,13 @@ impl<T> SessionImpl<T> where T: SessionTransport {
// update data
data.state = SessionState::WaitingForKeysDissemination;
data.key_share_threshold = Some(message.threshold);
data.key_share_author = Some(message.author.clone().into());
data.key_share_joint_public = Some(message.joint_public.clone().into());
data.key_share_common_point = message.common_point.clone().map(Into::into);
data.key_share_encrypted_point = message.encrypted_point.clone().map(Into::into);
data.new_key_share = Some(NewKeyShare {
threshold: message.threshold,
author: message.author.clone().into(),
joint_public: message.joint_public.clone().into(),
common_point: message.common_point.clone().map(Into::into),
encrypted_point: message.encrypted_point.clone().map(Into::into),
});
let id_numbers = data.id_numbers.as_mut()
.expect("common key share data is expected after initialization; id_numers are filled during initialization; qed");
@ -667,8 +669,9 @@ impl<T> SessionImpl<T> where T: SessionTransport {
let id_numbers = data.id_numbers.as_ref().expect(explanation);
let secret_subshares = data.secret_subshares.as_ref().expect(explanation);
let threshold = core.key_share.as_ref().map(|ks| ks.threshold)
.unwrap_or_else(|| *data.key_share_threshold.as_ref()
.expect("computation occurs after receiving key share threshold if not having one already; qed"));
.unwrap_or_else(|| data.new_key_share.as_ref()
.expect("computation occurs after receiving key share threshold if not having one already; qed")
.threshold);
let explanation = "id_numbers are checked to have Some value for every consensus group node when consensus is establishe; qed";
let sender_id_number = id_numbers[sender].as_ref().expect(explanation);
@ -694,16 +697,17 @@ impl<T> SessionImpl<T> where T: SessionTransport {
let refreshed_key_version = DocumentKeyShareVersion::new(id_numbers.clone().into_iter().map(|(k, v)| (k.clone(),
v.expect("id_numbers are checked to have Some value for every consensus group node when consensus is establishe; qed"))).collect(),
secret_share);
let mut refreshed_key_share = core.key_share.as_ref().cloned().unwrap_or_else(|| DocumentKeyShare {
author: data.key_share_author.clone()
.expect("this is new node; on new nodes this field is filled before KRD; session is completed after KRD; qed"),
threshold: data.key_share_threshold.clone()
.expect("this is new node; on new nodes this field is filled before KRD; session is completed after KRD; qed"),
public: data.key_share_joint_public.clone()
.expect("this is new node; on new nodes this field is filled before KRD; session is completed after KRD; qed"),
common_point: data.key_share_common_point.clone(),
encrypted_point: data.key_share_encrypted_point.clone(),
versions: Vec::new(),
let mut refreshed_key_share = core.key_share.as_ref().cloned().unwrap_or_else(|| {
let new_key_share = data.new_key_share.as_ref()
.expect("this is new node; on new nodes this field is filled before KRD; session is completed after KRD; qed");
DocumentKeyShare {
author: new_key_share.author.clone(),
threshold: new_key_share.threshold,
public: new_key_share.joint_public.clone(),
common_point: new_key_share.common_point.clone(),
encrypted_point: new_key_share.encrypted_point.clone(),
versions: Vec::new(),
}
});
refreshed_key_share.versions.push(refreshed_key_version);

View File

@ -294,25 +294,7 @@ impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: C
queue: VecDeque::new(),
};
sessions.insert(session_id, queued_session);
// notify listeners
let mut listeners = self.listeners.lock();
let mut listener_index = 0;
loop {
if listener_index >= listeners.len() {
break;
}
match listeners[listener_index].upgrade() {
Some(listener) => {
listener.on_session_inserted(session.clone());
listener_index += 1;
},
None => {
listeners.swap_remove(listener_index);
},
}
}
self.notify_listeners(|l| l.on_session_inserted(session.clone()));
Ok(session)
}
@ -320,25 +302,7 @@ impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: C
pub fn remove(&self, session_id: &S::Id) {
if let Some(session) = self.sessions.write().remove(session_id) {
self.container_state.lock().on_session_completed();
// notify listeners
let mut listeners = self.listeners.lock();
let mut listener_index = 0;
loop {
if listener_index >= listeners.len() {
break;
}
match listeners[listener_index].upgrade() {
Some(listener) => {
listener.on_session_removed(session.session.clone());
listener_index += 1;
},
None => {
listeners.swap_remove(listener_index);
},
}
}
self.notify_listeners(|l| l.on_session_removed(session.session.clone()));
}
}
@ -385,6 +349,22 @@ impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: C
}
}
}
fn notify_listeners<F: Fn(&ClusterSessionsListener<S>) -> ()>(&self, callback: F) {
let mut listeners = self.listeners.lock();
let mut listener_index = 0;
while listener_index < listeners.len() {
match listeners[listener_index].upgrade() {
Some(listener) => {
callback(&*listener);
listener_index += 1;
},
None => {
listeners.swap_remove(listener_index);
},
}
}
}
}
impl<S, SC, D> ClusterSessionsContainer<S, SC, D> where S: ClusterSession, SC: ClusterSessionCreator<S, D>, SessionId: From<S::Id> {

View File

@ -77,7 +77,7 @@ pub fn start(client: Arc<Client>, sync: Arc<SyncProvider>, self_key_pair: Arc<No
} else {
Arc::new(acl_storage::DummyAclStorage::default())
};
let key_server_set = key_server_set::OnChainKeyServerSet::new(&client, &sync, config.cluster_config.nodes.clone())?; // TODO: return empty set until fully synced
let key_server_set = key_server_set::OnChainKeyServerSet::new(&client, &sync, config.cluster_config.nodes.clone())?;
let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(&config)?);
let key_server = Arc::new(key_server::KeyServerImpl::new(&config.cluster_config, key_server_set.clone(), self_key_pair.clone(), acl_storage, key_storage.clone())?);
let cluster = key_server.cluster();

View File

@ -87,8 +87,8 @@ impl KeyServerHttpListener {
let listener_address = format!("{}:{}", listener_address.address, listener_address.port);
let http_server = HttpServer::http(&listener_address).expect("cannot start HttpServer");
let http_server = http_server.handle(KeyServerHttpHandler {
handler: shared_handler.clone(),
}).expect("cannot start HttpServer");
handler: shared_handler.clone(),
}).expect("cannot start HttpServer");
let listener = KeyServerHttpListener {
http_server: http_server,

View File

@ -83,15 +83,15 @@ struct PendingRequestsIterator {
impl OnChainServiceContract {
/// Create new on-chain service contract.
pub fn new(client: &Arc<Client>, sync: &Arc<SyncProvider>, address: ContractAddress, self_key_pair: Arc<NodeKeyPair>) -> Self {
let contract_addr = match &address {
&ContractAddress::Registry => client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned())
let contract_addr = match address {
ContractAddress::Registry => client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned())
.map(|address| {
trace!(target: "secretstore", "{}: installing service contract from address {}",
self_key_pair.public(), address);
address
})
.unwrap_or_default(),
&ContractAddress::Address(ref address) => {
ContractAddress::Address(ref address) => {
trace!(target: "secretstore", "{}: installing service contract from address {}",
self_key_pair.public(), address);
address.clone()
@ -231,12 +231,10 @@ impl ServiceContract for OnChainServiceContract {
)?;
// send transaction
if contract.address != Default::default() {
client.transact_contract(
contract.address.clone(),
transaction_data
).map_err(|e| format!("{}", e))?;
}
client.transact_contract(
contract.address.clone(),
transaction_data
).map_err(|e| format!("{}", e))?;
Ok(())
}

View File

@ -31,11 +31,11 @@ use key_storage::KeyStorage;
use listener::service_contract::ServiceContract;
use {ServerKeyId, NodeKeyPair, KeyServer};
/// Retry interval (in blocks). Every RETRY_INTEVAL_BLOCKS blocks each KeyServer reads pending requests from
/// Retry interval (in blocks). Every RETRY_INTERVAL_BLOCKS blocks each KeyServer reads pending requests from
/// service contract && tries to re-execute. The reason to have this mechanism is primarily because keys
/// servers set change takes a lot of time + there could be some races, when blocks are coming to different
/// KS at different times. This isn't intended to fix && respond to general session errors!
const RETRY_INTEVAL_BLOCKS: usize = 30;
const RETRY_INTERVAL_BLOCKS: usize = 30;
/// Max failed retry requests (in single retry interval). The reason behind this constant is that if several
/// pending requests have failed, then most probably other will fail too.
@ -255,7 +255,7 @@ impl ServiceContractListener {
// only process request, which haven't been processed recently
// there could be a lag when we've just generated server key && retrying on the same block
// (or before our tx is mined) - state is not updated yet
if retry_data.generated_keys.contains(&server_key_id){
if retry_data.generated_keys.contains(&server_key_id) {
continue;
}
@ -343,7 +343,7 @@ impl ChainNotify for ServiceContractListener {
// schedule retry if received enough blocks since last retry
// it maybe inaccurate when switching syncing/synced states, but that's ok
if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTEVAL_BLOCKS {
if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTERVAL_BLOCKS {
self.data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
self.data.last_retry.store(0, Ordering::Relaxed);
}
@ -356,22 +356,17 @@ impl ClusterSessionsListener<GenerationSession> for ServiceContractListener {
// when it is started by this node, it is published from process_service_task
if !is_processed_by_this_key_server(&*self.data.key_server_set, &*self.data.self_key_pair, &session.id()) {
// by this time sesion must already be completed - either successfully, or not
debug_assert!(session.is_finished());
assert!(session.is_finished());
// ignore result - the only thing that we can do is to log the error
let _ = session.wait(Some(Default::default()))
match session.wait(Some(Default::default()))
.map_err(|e| format!("{}", e))
.and_then(|server_key| Self::publish_server_key(&self.data, &session.id(), &server_key))
.map(|_| {
trace!(target: "secretstore", "{}: completed foreign GenerateServerKey({}) request",
self.data.self_key_pair.public(), session.id());
()
})
.map_err(|error| {
warn!(target: "secretstore", "{}: failed to process GenerateServerKey({}) request with: {}",
self.data.self_key_pair.public(), session.id(), error);
error
});
.and_then(|server_key| Self::publish_server_key(&self.data, &session.id(), &server_key)) {
Ok(_) => trace!(target: "secretstore", "{}: completed foreign GenerateServerKey({}) request",
self.data.self_key_pair.public(), session.id()),
Err(error) => warn!(target: "secretstore", "{}: failed to process GenerateServerKey({}) request with: {}",
self.data.self_key_pair.public(), session.id(), error),
}
}
}
}
@ -417,9 +412,12 @@ impl TasksQueue {
fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool {
let servers = key_server_set.get();
let total_servers_count = servers.len();
if total_servers_count == 0 {
return false;
match total_servers_count {
0 => return false,
1 => return true,
_ => (),
}
let this_server_index = match servers.keys().enumerate().find(|&(_, s)| s == self_key_pair.public()) {
Some((index, _)) => index,
None => return false,
@ -480,13 +478,24 @@ mod tests {
}
#[test]
fn is_not_processed_by_this_key_server_when_not_a_part_of_servers_set() {
fn is_processed_by_this_key_server_with_single_server() {
let self_key_pair = Random.generate().unwrap();
assert_eq!(is_processed_by_this_key_server(
&MapKeyServerSet::new(vec![
(self_key_pair.public().clone(), "127.0.0.1:8080".parse().unwrap())
].into_iter().collect()),
&PlainNodeKeyPair::new(self_key_pair),
&Default::default()), true);
}
#[test]
fn is_not_processed_by_this_key_server_when_not_a_part_of_servers_set() {
assert!(is_processed_by_this_key_server(
&MapKeyServerSet::new(vec![
(Random.generate().unwrap().public().clone(), "127.0.0.1:8080".parse().unwrap())
].into_iter().collect()),
&PlainNodeKeyPair::new(Random.generate().unwrap()),
&Default::default()), false);
&Default::default()));
}
#[test]