diff --git a/secret_store/src/listener/service_contract.rs b/secret_store/src/listener/service_contract.rs index 05aa7681a..bca739484 100644 --- a/secret_store/src/listener/service_contract.rs +++ b/secret_store/src/listener/service_contract.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use futures::{future, Future}; use parking_lot::RwLock; +use ethcore::filter::Filter; use ethcore::client::{Client, BlockChainClient, BlockId}; use ethkey::{Public, Signature, public_to_address}; use native_contracts::SecretStoreService; @@ -30,15 +31,24 @@ use {ServerKeyId, NodeKeyPair, ContractAddress}; /// Name of the SecretStore contract in the registry. const SERVICE_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_service"; +/// Key server has been added to the set. +const SERVER_KEY_REQUESTED_EVENT_NAME: &'static [u8] = &*b"ServerKeyRequested(bytes32,uint256)"; + /// Number of confirmations required before request can be processed. const REQUEST_CONFIRMATIONS_REQUIRED: u64 = 3; +lazy_static! { + static ref SERVER_KEY_REQUESTED_EVENT_NAME_HASH: H256 = keccak(SERVER_KEY_REQUESTED_EVENT_NAME); +} + /// Service contract trait. pub trait ServiceContract: Send + Sync { /// Update contract. fn update(&self); /// Is contract installed && up-to-date (i.e. chain is synced)? fn is_actual(&self) -> bool; + /// Read contract logs from given blocks. Returns topics of every entry. + fn read_logs(&self, first_block: H256, last_block: H256) -> Box>>; /// Publish generated key. fn read_pending_requests(&self) -> Box>; /// Publish server key. @@ -120,6 +130,34 @@ impl ServiceContract for OnChainServiceContract { && self.client.get().is_some() } + fn read_logs(&self, first_block: H256, last_block: H256) -> Box>> { + let client = match self.client.get() { + Some(client) => client, + None => { + warn!(target: "secretstore", "{}: client is offline during read_pending_requests call", + self.self_key_pair.public()); + return Box::new(::std::iter::empty()); + }, + }; + + // read server key generation requests + let contract_address = self.contract.read().address.clone(); + let request_logs = client.logs(Filter { + from_block: BlockId::Hash(first_block), + to_block: BlockId::Hash(last_block), + address: Some(vec![contract_address]), + topics: vec![ + Some(vec![*SERVER_KEY_REQUESTED_EVENT_NAME_HASH]), + None, + None, + None, + ], + limit: None, + }); + + Box::new(request_logs.into_iter().map(|log| log.entry.topics)) + } + fn read_pending_requests(&self) -> Box> { let client = match self.client.get() { Some(client) => client, @@ -258,6 +296,10 @@ pub mod tests { self.is_actual } + fn read_logs(&self, _first_block: H256, _last_block: H256) -> Box>> { + Box::new(self.logs.clone().into_iter()) + } + fn read_pending_requests(&self) -> Box> { Box::new(self.pending_requests.clone().into_iter()) } diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs index a6327ef3c..ebf9aff58 100644 --- a/secret_store/src/listener/service_contract_listener.rs +++ b/secret_store/src/listener/service_contract_listener.rs @@ -140,6 +140,31 @@ impl ServiceContractListener { contract } + /// Process incoming events of service contract. + fn process_service_contract_events(&self, first: H256, last: H256) { + self.data.tasks_queue.push(self.data.contract.read_logs(first, last) + .filter_map(|topics| match topics.len() { + // when key is already generated && we have this key + 3 if self.data.key_storage.get(&topics[1]).map(|k| k.is_some()).unwrap_or_default() => { + Some(ServiceTask::RestoreServerKey( + topics[1], + )) + } + // when key is not yet generated && this node should be master of this key generation session + 3 if is_processed_by_this_key_server(&*self.data.key_server_set, &*self.data.self_key_pair, &topics[1]) => { + Some(ServiceTask::GenerateServerKey( + topics[1], + topics[2], + )) + }, + 3 => None, + l @ _ => { + warn!(target: "secretstore", "Ignoring ServerKeyRequested event with wrong number of params {}", l); + None + }, + })); + } + /// Service thread procedure. fn run_service_thread(data: Arc) { loop { @@ -294,14 +319,23 @@ impl Drop for ServiceContractListener { impl ChainNotify for ServiceContractListener { fn new_blocks(&self, _imported: Vec, _invalid: Vec, enacted: Vec, _retracted: Vec, _sealed: Vec, _proposed: Vec, _duration: u64) { + let enacted_len = enacted.len(); + if enacted_len == 0 { + return; + } + self.data.contract.update(); if !self.data.contract.is_actual() { return; } + let reason = "enacted.len() != 0; qed"; + self.process_service_contract_events( + enacted.first().expect(reason).clone(), + enacted.last().expect(reason).clone()); + // schedule retry if received enough blocks since last retry // it maybe inaccurate when switching syncing/synced states, but that's ok - let enacted_len = enacted.len(); if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTERVAL_BLOCKS { self.data.tasks_queue.push(ServiceTask::Retry); self.data.last_retry.store(0, Ordering::Relaxed); @@ -545,6 +579,58 @@ mod tests { &"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true); } + #[test] + fn no_tasks_scheduled_when_no_contract_events() { + let listener = make_service_contract_listener(None, None, None); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + listener.process_service_contract_events(Default::default(), Default::default()); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + } + + #[test] + fn server_key_generation_is_scheduled_when_requested_key_is_unknnown() { + let mut contract = DummyServiceContract::default(); + contract.logs.push(vec![Default::default(), Default::default(), Default::default()]); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + listener.process_service_contract_events(Default::default(), Default::default()); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 2); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().pop_back(), Some(ServiceTask::GenerateServerKey(Default::default(), Default::default()))); + } + + #[test] + fn no_new_tasks_scheduled_when_requested_key_is_unknown_and_request_belongs_to_other_key_server() { + let server_key_id = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap(); + let mut contract = DummyServiceContract::default(); + contract.logs.push(vec![Default::default(), server_key_id, Default::default()]); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + listener.process_service_contract_events(Default::default(), Default::default()); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + } + + #[test] + fn server_key_restore_is_scheduled_when_requested_key_is_knnown() { + let mut contract = DummyServiceContract::default(); + contract.logs.push(vec![Default::default(), Default::default(), Default::default()]); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None); + listener.data.key_storage.insert(Default::default(), Default::default()).unwrap(); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + listener.process_service_contract_events(Default::default(), Default::default()); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 2); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().pop_back(), Some(ServiceTask::RestoreServerKey(Default::default()))); + } + + #[test] + fn no_new_tasks_scheduled_when_wrong_number_of_topics_in_log() { + let mut contract = DummyServiceContract::default(); + contract.logs.push(vec![Default::default(), Default::default()]); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + listener.process_service_contract_events(Default::default(), Default::default()); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + } + #[test] fn generation_session_is_created_when_processing_generate_server_key_task() { let key_server = Arc::new(DummyKeyServer::default());