Revert "SecretStore: get rid of read_logs in ServiceContract"

This reverts commit 6efca8860a.
This commit is contained in:
Svyatoslav Nikolsky 2017-12-21 11:44:55 +03:00
parent ee1ce42546
commit ff094e0a03
2 changed files with 129 additions and 1 deletions

View File

@ -17,6 +17,7 @@
use std::sync::Arc;
use futures::{future, Future};
use parking_lot::RwLock;
use ethcore::filter::Filter;
use ethcore::client::{Client, BlockChainClient, BlockId};
use ethkey::{Public, Signature, public_to_address};
use native_contracts::SecretStoreService;
@ -30,15 +31,24 @@ use {ServerKeyId, NodeKeyPair, ContractAddress};
/// Name of the SecretStore contract in the registry.
const SERVICE_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_service";
/// Key server has been added to the set.
const SERVER_KEY_REQUESTED_EVENT_NAME: &'static [u8] = &*b"ServerKeyRequested(bytes32,uint256)";
/// Number of confirmations required before request can be processed.
const REQUEST_CONFIRMATIONS_REQUIRED: u64 = 3;
lazy_static! {
static ref SERVER_KEY_REQUESTED_EVENT_NAME_HASH: H256 = keccak(SERVER_KEY_REQUESTED_EVENT_NAME);
}
/// Service contract trait.
pub trait ServiceContract: Send + Sync {
/// Update contract.
fn update(&self);
/// Is contract installed && up-to-date (i.e. chain is synced)?
fn is_actual(&self) -> bool;
/// Read contract logs from given blocks. Returns topics of every entry.
fn read_logs(&self, first_block: H256, last_block: H256) -> Box<Iterator<Item=Vec<H256>>>;
/// Publish generated key.
fn read_pending_requests(&self) -> Box<Iterator<Item=(bool, ServiceTask)>>;
/// Publish server key.
@ -120,6 +130,34 @@ impl ServiceContract for OnChainServiceContract {
&& self.client.get().is_some()
}
fn read_logs(&self, first_block: H256, last_block: H256) -> Box<Iterator<Item=Vec<H256>>> {
let client = match self.client.get() {
Some(client) => client,
None => {
warn!(target: "secretstore", "{}: client is offline during read_pending_requests call",
self.self_key_pair.public());
return Box::new(::std::iter::empty());
},
};
// read server key generation requests
let contract_address = self.contract.read().address.clone();
let request_logs = client.logs(Filter {
from_block: BlockId::Hash(first_block),
to_block: BlockId::Hash(last_block),
address: Some(vec![contract_address]),
topics: vec![
Some(vec![*SERVER_KEY_REQUESTED_EVENT_NAME_HASH]),
None,
None,
None,
],
limit: None,
});
Box::new(request_logs.into_iter().map(|log| log.entry.topics))
}
fn read_pending_requests(&self) -> Box<Iterator<Item=(bool, ServiceTask)>> {
let client = match self.client.get() {
Some(client) => client,
@ -258,6 +296,10 @@ pub mod tests {
self.is_actual
}
fn read_logs(&self, _first_block: H256, _last_block: H256) -> Box<Iterator<Item=Vec<H256>>> {
Box::new(self.logs.clone().into_iter())
}
fn read_pending_requests(&self) -> Box<Iterator<Item=(bool, ServiceTask)>> {
Box::new(self.pending_requests.clone().into_iter())
}

View File

@ -140,6 +140,31 @@ impl ServiceContractListener {
contract
}
/// Process incoming events of service contract.
fn process_service_contract_events(&self, first: H256, last: H256) {
self.data.tasks_queue.push(self.data.contract.read_logs(first, last)
.filter_map(|topics| match topics.len() {
// when key is already generated && we have this key
3 if self.data.key_storage.get(&topics[1]).map(|k| k.is_some()).unwrap_or_default() => {
Some(ServiceTask::RestoreServerKey(
topics[1],
))
}
// when key is not yet generated && this node should be master of this key generation session
3 if is_processed_by_this_key_server(&*self.data.key_server_set, &*self.data.self_key_pair, &topics[1]) => {
Some(ServiceTask::GenerateServerKey(
topics[1],
topics[2],
))
},
3 => None,
l @ _ => {
warn!(target: "secretstore", "Ignoring ServerKeyRequested event with wrong number of params {}", l);
None
},
}));
}
/// Service thread procedure.
fn run_service_thread(data: Arc<ServiceContractListenerData>) {
loop {
@ -294,14 +319,23 @@ impl Drop for ServiceContractListener {
impl ChainNotify for ServiceContractListener {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: u64) {
let enacted_len = enacted.len();
if enacted_len == 0 {
return;
}
self.data.contract.update();
if !self.data.contract.is_actual() {
return;
}
let reason = "enacted.len() != 0; qed";
self.process_service_contract_events(
enacted.first().expect(reason).clone(),
enacted.last().expect(reason).clone());
// schedule retry if received enough blocks since last retry
// it maybe inaccurate when switching syncing/synced states, but that's ok
let enacted_len = enacted.len();
if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTERVAL_BLOCKS {
self.data.tasks_queue.push(ServiceTask::Retry);
self.data.last_retry.store(0, Ordering::Relaxed);
@ -545,6 +579,58 @@ mod tests {
&"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true);
}
#[test]
fn no_tasks_scheduled_when_no_contract_events() {
let listener = make_service_contract_listener(None, None, None);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
}
#[test]
fn server_key_generation_is_scheduled_when_requested_key_is_unknnown() {
let mut contract = DummyServiceContract::default();
contract.logs.push(vec![Default::default(), Default::default(), Default::default()]);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 2);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().pop_back(), Some(ServiceTask::GenerateServerKey(Default::default(), Default::default())));
}
#[test]
fn no_new_tasks_scheduled_when_requested_key_is_unknown_and_request_belongs_to_other_key_server() {
let server_key_id = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap();
let mut contract = DummyServiceContract::default();
contract.logs.push(vec![Default::default(), server_key_id, Default::default()]);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
}
#[test]
fn server_key_restore_is_scheduled_when_requested_key_is_knnown() {
let mut contract = DummyServiceContract::default();
contract.logs.push(vec![Default::default(), Default::default(), Default::default()]);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None);
listener.data.key_storage.insert(Default::default(), Default::default()).unwrap();
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 2);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().pop_back(), Some(ServiceTask::RestoreServerKey(Default::default())));
}
#[test]
fn no_new_tasks_scheduled_when_wrong_number_of_topics_in_log() {
let mut contract = DummyServiceContract::default();
contract.logs.push(vec![Default::default(), Default::default()]);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
}
#[test]
fn generation_session_is_created_when_processing_generate_server_key_task() {
let key_server = Arc::new(DummyKeyServer::default());