diff --git a/secret_store/src/listener/service_contract.rs b/secret_store/src/listener/service_contract.rs index bca739484..c49bf49b7 100644 --- a/secret_store/src/listener/service_contract.rs +++ b/secret_store/src/listener/service_contract.rs @@ -43,12 +43,10 @@ lazy_static! { /// Service contract trait. pub trait ServiceContract: Send + Sync { - /// Update contract. - fn update(&self); - /// Is contract installed && up-to-date (i.e. chain is synced)? - fn is_actual(&self) -> bool; - /// Read contract logs from given blocks. Returns topics of every entry. - fn read_logs(&self, first_block: H256, last_block: H256) -> Box>>; + /// Update contract when new blocks are enacted. Returns true if contract is installed && up-to-date (i.e. chain is synced). + fn update(&self) -> bool; + /// Read recent contract logs. Returns topics of every entry. + fn read_logs(&self) -> Box>>; /// Publish generated key. fn read_pending_requests(&self) -> Box>; /// Publish server key. @@ -64,7 +62,15 @@ pub struct OnChainServiceContract { /// Contract addresss. address: ContractAddress, /// Contract. - contract: RwLock>, + data: RwLock, +} + +/// On-chain service contract data. +struct SecretStoreServiceData { + /// Contract. + pub contract: Arc, + /// Last block we have read logs from. + pub last_log_block: Option, } /// Pending requests iterator. @@ -105,47 +111,72 @@ impl OnChainServiceContract { client: client, self_key_pair: self_key_pair, address: address, - contract: RwLock::new(Arc::new(SecretStoreService::new(contract_addr))), + data: RwLock::new(SecretStoreServiceData { + contract: Arc::new(SecretStoreService::new(contract_addr)), + last_log_block: None, + }), } } } impl ServiceContract for OnChainServiceContract { - fn update(&self) { + fn update(&self) -> bool { + // TODO [Sec]: registry_address currently reads from BlockId::Latest, instead of + // from block with REQUEST_CONFIRMATIONS_REQUIRED confirmations if let &ContractAddress::Registry = &self.address { if let Some(client) = self.client.get() { // update contract address from registry let service_contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()).unwrap_or_default(); - if self.contract.read().address != service_contract_addr { + if self.data.read().contract.address != service_contract_addr { trace!(target: "secretstore", "{}: installing service contract from address {}", self.self_key_pair.public(), service_contract_addr); - *self.contract.write() = Arc::new(SecretStoreService::new(service_contract_addr)); + self.data.write().contract = Arc::new(SecretStoreService::new(service_contract_addr)); } } } - } - fn is_actual(&self) -> bool { - self.contract.read().address != Default::default() + self.data.read().contract.address != Default::default() && self.client.get().is_some() } - fn read_logs(&self, first_block: H256, last_block: H256) -> Box>> { + fn read_logs(&self) -> Box>> { let client = match self.client.get() { Some(client) => client, None => { - warn!(target: "secretstore", "{}: client is offline during read_pending_requests call", + warn!(target: "secretstore", "{}: client is offline during read_logs call", self.self_key_pair.public()); return Box::new(::std::iter::empty()); }, }; + // prepare range of blocks to read logs from + let (address, first_block, last_block) = { + let mut data = self.data.write(); + let address = data.contract.address.clone(); + let confirmed_block = match get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED) { + Some(confirmed_block) => confirmed_block, + None => return Box::new(::std::iter::empty()), // no block with enough confirmations + }; + let first_block = match data.last_log_block.take().and_then(|b| client.tree_route(&b, &confirmed_block)) { + // if we have a route from last_log_block to confirmed_block => search for logs on this route + // + // potentially this could lead us to reading same logs twice when reorganizing to the fork, which + // already has been canonical previosuly + // the worst thing that can happen in this case is spending some time reading unneeded data from SS db + Some(ref route) if route.index < route.blocks.len() => route.blocks[route.index], + // else we care only about confirmed block + _ => confirmed_block.clone(), + }; + + data.last_log_block = Some(confirmed_block.clone()); + (address, first_block, confirmed_block) + }; + // read server key generation requests - let contract_address = self.contract.read().address.clone(); let request_logs = client.logs(Filter { - from_block: BlockId::Hash(first_block), - to_block: BlockId::Hash(last_block), - address: Some(vec![contract_address]), + from_block: BlockId::Hash(first_block.clone()), + to_block: BlockId::Hash(last_block.clone()), + address: Some(vec![address]), topics: vec![ Some(vec![*SERVER_KEY_REQUESTED_EVENT_NAME_HASH]), None, @@ -155,6 +186,9 @@ impl ServiceContract for OnChainServiceContract { limit: None, }); + trace!(target: "secretstore", "{}: read {} events from service contract in blocks {}..{}", + self.self_key_pair.public(), request_logs.len(), first_block, last_block); + Box::new(request_logs.into_iter().map(|log| log.entry.topics)) } @@ -168,15 +202,15 @@ impl ServiceContract for OnChainServiceContract { }, }; - let contract = self.contract.read(); - match contract.address == Default::default() { + // we only need requests that are here for more than REQUEST_CONFIRMATIONS_REQUIRED blocks + // => we're reading from Latest - (REQUEST_CONFIRMATIONS_REQUIRED + 1) block + let data = self.data.read(); + match data.contract.address == Default::default() { true => Box::new(::std::iter::empty()), - false => client.block_number(BlockId::Latest) - .and_then(|b| b.checked_sub(REQUEST_CONFIRMATIONS_REQUIRED)) - .and_then(|b| client.block_hash(BlockId::Number(b))) + false => get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED + 1) .and_then(|b| { let do_call = |a, d| future::done(client.call_contract(BlockId::Hash(b.clone()), a, d)); - contract.server_key_generation_requests_count(&do_call).wait() + data.contract.server_key_generation_requests_count(&do_call).wait() .map_err(|error| { warn!(target: "secretstore", "{}: call to server_key_generation_requests_count failed: {}", self.self_key_pair.public(), error); @@ -187,7 +221,7 @@ impl ServiceContract for OnChainServiceContract { }) .map(|(b, l)| Box::new(PendingRequestsIterator { client: client, - contract: contract.clone(), + contract: data.contract.clone(), self_key_pair: self.self_key_pair.clone(), block: b, index: 0.into(), @@ -199,8 +233,8 @@ impl ServiceContract for OnChainServiceContract { fn publish_server_key(&self, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String> { // only publish if contract address is set && client is online - let contract = self.contract.read(); - if contract.address == Default::default() { + let data = self.data.read(); + if data.contract.address == Default::default() { // it is not an error, because key could be generated even without contract return Ok(()); } @@ -215,7 +249,7 @@ impl ServiceContract for OnChainServiceContract { // or key has been requested using HTTP API let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d)); let self_address = public_to_address(self.self_key_pair.public()); - if contract.get_server_key_confirmation_status(&do_call, server_key_id.clone(), self_address).wait().unwrap_or(false) { + if data.contract.get_server_key_confirmation_status(&do_call, server_key_id.clone(), self_address).wait().unwrap_or(false) { return Ok(()); } @@ -223,7 +257,7 @@ impl ServiceContract for OnChainServiceContract { let server_key_hash = keccak(server_key); let signed_server_key = self.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?; let signed_server_key: Signature = signed_server_key.into_electrum().into(); - let transaction_data = contract.encode_server_key_generated_input(server_key_id.clone(), + let transaction_data = data.contract.encode_server_key_generated_input(server_key_id.clone(), server_key.to_vec(), signed_server_key.v(), signed_server_key.r().into(), @@ -232,7 +266,7 @@ impl ServiceContract for OnChainServiceContract { // send transaction client.transact_contract( - contract.address.clone(), + data.contract.address.clone(), transaction_data ).map_err(|e| format!("{}", e))?; @@ -271,6 +305,13 @@ impl Iterator for PendingRequestsIterator { } } +/// Get hash of the last block with at least n confirmations. +fn get_confirmed_block_hash(client: &Client, confirmations: u64) -> Option { + client.block_number(BlockId::Latest) + .and_then(|b| b.checked_sub(confirmations)) + .and_then(|b| client.block_hash(BlockId::Number(b))) +} + #[cfg(test)] pub mod tests { use parking_lot::Mutex; @@ -289,14 +330,11 @@ pub mod tests { } impl ServiceContract for DummyServiceContract { - fn update(&self) { + fn update(&self) -> bool { + true } - fn is_actual(&self) -> bool { - self.is_actual - } - - fn read_logs(&self, _first_block: H256, _last_block: H256) -> Box>> { + fn read_logs(&self) -> Box>> { Box::new(self.logs.clone().into_iter()) } diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs index ebf9aff58..9031461e9 100644 --- a/secret_store/src/listener/service_contract_listener.rs +++ b/secret_store/src/listener/service_contract_listener.rs @@ -141,8 +141,8 @@ impl ServiceContractListener { } /// Process incoming events of service contract. - fn process_service_contract_events(&self, first: H256, last: H256) { - self.data.tasks_queue.push(self.data.contract.read_logs(first, last) + fn process_service_contract_events(&self) { + self.data.tasks_queue.push_many(self.data.contract.read_logs() .filter_map(|topics| match topics.len() { // when key is already generated && we have this key 3 if self.data.key_storage.get(&topics[1]).map(|k| k.is_some()).unwrap_or_default() => { @@ -324,15 +324,11 @@ impl ChainNotify for ServiceContractListener { return; } - self.data.contract.update(); - if !self.data.contract.is_actual() { + if !self.data.contract.update() { return; } - let reason = "enacted.len() != 0; qed"; - self.process_service_contract_events( - enacted.first().expect(reason).clone(), - enacted.last().expect(reason).clone()); + self.process_service_contract_events(); // schedule retry if received enough blocks since last retry // it maybe inaccurate when switching syncing/synced states, but that's ok @@ -582,9 +578,9 @@ mod tests { #[test] fn no_tasks_scheduled_when_no_contract_events() { let listener = make_service_contract_listener(None, None, None); - assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); - listener.process_service_contract_events(Default::default(), Default::default()); - assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + listener.process_service_contract_events(); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); } #[test] @@ -592,10 +588,10 @@ mod tests { let mut contract = DummyServiceContract::default(); contract.logs.push(vec![Default::default(), Default::default(), Default::default()]); let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None); - assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); - listener.process_service_contract_events(Default::default(), Default::default()); - assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 2); - assert_eq!(listener.data.tasks_queue.service_tasks.lock().pop_back(), Some(ServiceTask::GenerateServerKey(Default::default(), Default::default()))); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + listener.process_service_contract_events(); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 2); + assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::GenerateServerKey(Default::default(), Default::default()))); } #[test] @@ -604,9 +600,9 @@ mod tests { let mut contract = DummyServiceContract::default(); contract.logs.push(vec![Default::default(), server_key_id, Default::default()]); let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None); - assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); - listener.process_service_contract_events(Default::default(), Default::default()); - assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + listener.process_service_contract_events(); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); } #[test] @@ -615,10 +611,10 @@ mod tests { contract.logs.push(vec![Default::default(), Default::default(), Default::default()]); let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None); listener.data.key_storage.insert(Default::default(), Default::default()).unwrap(); - assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); - listener.process_service_contract_events(Default::default(), Default::default()); - assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 2); - assert_eq!(listener.data.tasks_queue.service_tasks.lock().pop_back(), Some(ServiceTask::RestoreServerKey(Default::default()))); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + listener.process_service_contract_events(); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 2); + assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::RestoreServerKey(Default::default()))); } #[test] @@ -626,9 +622,9 @@ mod tests { let mut contract = DummyServiceContract::default(); contract.logs.push(vec![Default::default(), Default::default()]); let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None); - assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); - listener.process_service_contract_events(Default::default(), Default::default()); - assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); + listener.process_service_contract_events(); + assert_eq!(listener.data.tasks_queue.snapshot().len(), 1); } #[test] diff --git a/secret_store/src/listener/tasks_queue.rs b/secret_store/src/listener/tasks_queue.rs index f313c8431..8e4700dbb 100644 --- a/secret_store/src/listener/tasks_queue.rs +++ b/secret_store/src/listener/tasks_queue.rs @@ -19,14 +19,14 @@ use parking_lot::{Mutex, Condvar}; #[derive(Default)] /// Service tasks queue. -pub struct TasksQueue { +pub struct TasksQueue { /// Service event. service_event: Condvar, /// Service tasks queue. service_tasks: Mutex>, } -impl TasksQueue { +impl TasksQueue where Task: Clone { /// Create new tasks queue. pub fn new() -> Self { TasksQueue { @@ -35,6 +35,12 @@ impl TasksQueue { } } + #[cfg(test)] + /// Get current tasks snapshot. + pub fn snapshot(&self) -> VecDeque { + self.service_tasks.lock().clone() + } + /// Push task to the front of queue. pub fn push_front(&self, task: Task) { let mut service_tasks = self.service_tasks.lock(); @@ -49,7 +55,17 @@ impl TasksQueue { self.service_event.notify_all(); } - /// Wait for new task. + /// Push task to the back of queue. + pub fn push_many>(&self, tasks: I) { + let mut service_tasks = self.service_tasks.lock(); + let previous_len = service_tasks.len(); + service_tasks.extend(tasks); + if service_tasks.len() != previous_len { + self.service_event.notify_all(); + } + } + + /// Wait for new task (task is removed from the front of queue). pub fn wait(&self) -> Task { let mut service_tasks = self.service_tasks.lock(); if service_tasks.is_empty() { @@ -57,6 +73,6 @@ impl TasksQueue { } service_tasks.pop_front() - .expect("service_event is only fired when there are new tasks or is_shutdown == true; is_shutdown == false; qed") + .expect("service_event is only fired when there are new tasks; qed") } }