SecretStore: reorganize service contract read

This commit is contained in:
Svyatoslav Nikolsky 2017-12-21 16:19:15 +03:00
parent ff094e0a03
commit 9104d4673c
3 changed files with 117 additions and 67 deletions

View File

@ -43,12 +43,10 @@ lazy_static! {
/// Service contract trait.
pub trait ServiceContract: Send + Sync {
/// Update contract.
fn update(&self);
/// Is contract installed && up-to-date (i.e. chain is synced)?
fn is_actual(&self) -> bool;
/// Read contract logs from given blocks. Returns topics of every entry.
fn read_logs(&self, first_block: H256, last_block: H256) -> Box<Iterator<Item=Vec<H256>>>;
/// Update contract when new blocks are enacted. Returns true if contract is installed && up-to-date (i.e. chain is synced).
fn update(&self) -> bool;
/// Read recent contract logs. Returns topics of every entry.
fn read_logs(&self) -> Box<Iterator<Item=Vec<H256>>>;
/// Publish generated key.
fn read_pending_requests(&self) -> Box<Iterator<Item=(bool, ServiceTask)>>;
/// Publish server key.
@ -64,7 +62,15 @@ pub struct OnChainServiceContract {
/// Contract addresss.
address: ContractAddress,
/// Contract.
contract: RwLock<Arc<SecretStoreService>>,
data: RwLock<SecretStoreServiceData>,
}
/// On-chain service contract data.
struct SecretStoreServiceData {
/// Contract.
pub contract: Arc<SecretStoreService>,
/// Last block we have read logs from.
pub last_log_block: Option<H256>,
}
/// Pending requests iterator.
@ -105,47 +111,72 @@ impl OnChainServiceContract {
client: client,
self_key_pair: self_key_pair,
address: address,
contract: RwLock::new(Arc::new(SecretStoreService::new(contract_addr))),
data: RwLock::new(SecretStoreServiceData {
contract: Arc::new(SecretStoreService::new(contract_addr)),
last_log_block: None,
}),
}
}
}
impl ServiceContract for OnChainServiceContract {
fn update(&self) {
fn update(&self) -> bool {
// TODO [Sec]: registry_address currently reads from BlockId::Latest, instead of
// from block with REQUEST_CONFIRMATIONS_REQUIRED confirmations
if let &ContractAddress::Registry = &self.address {
if let Some(client) = self.client.get() {
// update contract address from registry
let service_contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()).unwrap_or_default();
if self.contract.read().address != service_contract_addr {
if self.data.read().contract.address != service_contract_addr {
trace!(target: "secretstore", "{}: installing service contract from address {}",
self.self_key_pair.public(), service_contract_addr);
*self.contract.write() = Arc::new(SecretStoreService::new(service_contract_addr));
self.data.write().contract = Arc::new(SecretStoreService::new(service_contract_addr));
}
}
}
}
fn is_actual(&self) -> bool {
self.contract.read().address != Default::default()
self.data.read().contract.address != Default::default()
&& self.client.get().is_some()
}
fn read_logs(&self, first_block: H256, last_block: H256) -> Box<Iterator<Item=Vec<H256>>> {
fn read_logs(&self) -> Box<Iterator<Item=Vec<H256>>> {
let client = match self.client.get() {
Some(client) => client,
None => {
warn!(target: "secretstore", "{}: client is offline during read_pending_requests call",
warn!(target: "secretstore", "{}: client is offline during read_logs call",
self.self_key_pair.public());
return Box::new(::std::iter::empty());
},
};
// prepare range of blocks to read logs from
let (address, first_block, last_block) = {
let mut data = self.data.write();
let address = data.contract.address.clone();
let confirmed_block = match get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED) {
Some(confirmed_block) => confirmed_block,
None => return Box::new(::std::iter::empty()), // no block with enough confirmations
};
let first_block = match data.last_log_block.take().and_then(|b| client.tree_route(&b, &confirmed_block)) {
// if we have a route from last_log_block to confirmed_block => search for logs on this route
//
// potentially this could lead us to reading same logs twice when reorganizing to the fork, which
// already has been canonical previosuly
// the worst thing that can happen in this case is spending some time reading unneeded data from SS db
Some(ref route) if route.index < route.blocks.len() => route.blocks[route.index],
// else we care only about confirmed block
_ => confirmed_block.clone(),
};
data.last_log_block = Some(confirmed_block.clone());
(address, first_block, confirmed_block)
};
// read server key generation requests
let contract_address = self.contract.read().address.clone();
let request_logs = client.logs(Filter {
from_block: BlockId::Hash(first_block),
to_block: BlockId::Hash(last_block),
address: Some(vec![contract_address]),
from_block: BlockId::Hash(first_block.clone()),
to_block: BlockId::Hash(last_block.clone()),
address: Some(vec![address]),
topics: vec![
Some(vec![*SERVER_KEY_REQUESTED_EVENT_NAME_HASH]),
None,
@ -155,6 +186,9 @@ impl ServiceContract for OnChainServiceContract {
limit: None,
});
trace!(target: "secretstore", "{}: read {} events from service contract in blocks {}..{}",
self.self_key_pair.public(), request_logs.len(), first_block, last_block);
Box::new(request_logs.into_iter().map(|log| log.entry.topics))
}
@ -168,15 +202,15 @@ impl ServiceContract for OnChainServiceContract {
},
};
let contract = self.contract.read();
match contract.address == Default::default() {
// we only need requests that are here for more than REQUEST_CONFIRMATIONS_REQUIRED blocks
// => we're reading from Latest - (REQUEST_CONFIRMATIONS_REQUIRED + 1) block
let data = self.data.read();
match data.contract.address == Default::default() {
true => Box::new(::std::iter::empty()),
false => client.block_number(BlockId::Latest)
.and_then(|b| b.checked_sub(REQUEST_CONFIRMATIONS_REQUIRED))
.and_then(|b| client.block_hash(BlockId::Number(b)))
false => get_confirmed_block_hash(&*client, REQUEST_CONFIRMATIONS_REQUIRED + 1)
.and_then(|b| {
let do_call = |a, d| future::done(client.call_contract(BlockId::Hash(b.clone()), a, d));
contract.server_key_generation_requests_count(&do_call).wait()
data.contract.server_key_generation_requests_count(&do_call).wait()
.map_err(|error| {
warn!(target: "secretstore", "{}: call to server_key_generation_requests_count failed: {}",
self.self_key_pair.public(), error);
@ -187,7 +221,7 @@ impl ServiceContract for OnChainServiceContract {
})
.map(|(b, l)| Box::new(PendingRequestsIterator {
client: client,
contract: contract.clone(),
contract: data.contract.clone(),
self_key_pair: self.self_key_pair.clone(),
block: b,
index: 0.into(),
@ -199,8 +233,8 @@ impl ServiceContract for OnChainServiceContract {
fn publish_server_key(&self, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String> {
// only publish if contract address is set && client is online
let contract = self.contract.read();
if contract.address == Default::default() {
let data = self.data.read();
if data.contract.address == Default::default() {
// it is not an error, because key could be generated even without contract
return Ok(());
}
@ -215,7 +249,7 @@ impl ServiceContract for OnChainServiceContract {
// or key has been requested using HTTP API
let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d));
let self_address = public_to_address(self.self_key_pair.public());
if contract.get_server_key_confirmation_status(&do_call, server_key_id.clone(), self_address).wait().unwrap_or(false) {
if data.contract.get_server_key_confirmation_status(&do_call, server_key_id.clone(), self_address).wait().unwrap_or(false) {
return Ok(());
}
@ -223,7 +257,7 @@ impl ServiceContract for OnChainServiceContract {
let server_key_hash = keccak(server_key);
let signed_server_key = self.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?;
let signed_server_key: Signature = signed_server_key.into_electrum().into();
let transaction_data = contract.encode_server_key_generated_input(server_key_id.clone(),
let transaction_data = data.contract.encode_server_key_generated_input(server_key_id.clone(),
server_key.to_vec(),
signed_server_key.v(),
signed_server_key.r().into(),
@ -232,7 +266,7 @@ impl ServiceContract for OnChainServiceContract {
// send transaction
client.transact_contract(
contract.address.clone(),
data.contract.address.clone(),
transaction_data
).map_err(|e| format!("{}", e))?;
@ -271,6 +305,13 @@ impl Iterator for PendingRequestsIterator {
}
}
/// Get hash of the last block with at least n confirmations.
fn get_confirmed_block_hash(client: &Client, confirmations: u64) -> Option<H256> {
client.block_number(BlockId::Latest)
.and_then(|b| b.checked_sub(confirmations))
.and_then(|b| client.block_hash(BlockId::Number(b)))
}
#[cfg(test)]
pub mod tests {
use parking_lot::Mutex;
@ -289,14 +330,11 @@ pub mod tests {
}
impl ServiceContract for DummyServiceContract {
fn update(&self) {
fn update(&self) -> bool {
true
}
fn is_actual(&self) -> bool {
self.is_actual
}
fn read_logs(&self, _first_block: H256, _last_block: H256) -> Box<Iterator<Item=Vec<H256>>> {
fn read_logs(&self) -> Box<Iterator<Item=Vec<H256>>> {
Box::new(self.logs.clone().into_iter())
}

View File

@ -141,8 +141,8 @@ impl ServiceContractListener {
}
/// Process incoming events of service contract.
fn process_service_contract_events(&self, first: H256, last: H256) {
self.data.tasks_queue.push(self.data.contract.read_logs(first, last)
fn process_service_contract_events(&self) {
self.data.tasks_queue.push_many(self.data.contract.read_logs()
.filter_map(|topics| match topics.len() {
// when key is already generated && we have this key
3 if self.data.key_storage.get(&topics[1]).map(|k| k.is_some()).unwrap_or_default() => {
@ -324,15 +324,11 @@ impl ChainNotify for ServiceContractListener {
return;
}
self.data.contract.update();
if !self.data.contract.is_actual() {
if !self.data.contract.update() {
return;
}
let reason = "enacted.len() != 0; qed";
self.process_service_contract_events(
enacted.first().expect(reason).clone(),
enacted.last().expect(reason).clone());
self.process_service_contract_events();
// schedule retry if received enough blocks since last retry
// it maybe inaccurate when switching syncing/synced states, but that's ok
@ -582,9 +578,9 @@ mod tests {
#[test]
fn no_tasks_scheduled_when_no_contract_events() {
let listener = make_service_contract_listener(None, None, None);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
}
#[test]
@ -592,10 +588,10 @@ mod tests {
let mut contract = DummyServiceContract::default();
contract.logs.push(vec![Default::default(), Default::default(), Default::default()]);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 2);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().pop_back(), Some(ServiceTask::GenerateServerKey(Default::default(), Default::default())));
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 2);
assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::GenerateServerKey(Default::default(), Default::default())));
}
#[test]
@ -604,9 +600,9 @@ mod tests {
let mut contract = DummyServiceContract::default();
contract.logs.push(vec![Default::default(), server_key_id, Default::default()]);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
}
#[test]
@ -615,10 +611,10 @@ mod tests {
contract.logs.push(vec![Default::default(), Default::default(), Default::default()]);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None);
listener.data.key_storage.insert(Default::default(), Default::default()).unwrap();
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 2);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().pop_back(), Some(ServiceTask::RestoreServerKey(Default::default())));
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 2);
assert_eq!(listener.data.tasks_queue.snapshot().pop_back(), Some(ServiceTask::RestoreServerKey(Default::default())));
}
#[test]
@ -626,9 +622,9 @@ mod tests {
let mut contract = DummyServiceContract::default();
contract.logs.push(vec![Default::default(), Default::default()]);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None, None);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
listener.process_service_contract_events();
assert_eq!(listener.data.tasks_queue.snapshot().len(), 1);
}
#[test]

View File

@ -19,14 +19,14 @@ use parking_lot::{Mutex, Condvar};
#[derive(Default)]
/// Service tasks queue.
pub struct TasksQueue<Task> {
pub struct TasksQueue<Task: Clone> {
/// Service event.
service_event: Condvar,
/// Service tasks queue.
service_tasks: Mutex<VecDeque<Task>>,
}
impl<Task> TasksQueue<Task> {
impl<Task> TasksQueue<Task> where Task: Clone {
/// Create new tasks queue.
pub fn new() -> Self {
TasksQueue {
@ -35,6 +35,12 @@ impl<Task> TasksQueue<Task> {
}
}
#[cfg(test)]
/// Get current tasks snapshot.
pub fn snapshot(&self) -> VecDeque<Task> {
self.service_tasks.lock().clone()
}
/// Push task to the front of queue.
pub fn push_front(&self, task: Task) {
let mut service_tasks = self.service_tasks.lock();
@ -49,7 +55,17 @@ impl<Task> TasksQueue<Task> {
self.service_event.notify_all();
}
/// Wait for new task.
/// Push task to the back of queue.
pub fn push_many<I: Iterator<Item=Task>>(&self, tasks: I) {
let mut service_tasks = self.service_tasks.lock();
let previous_len = service_tasks.len();
service_tasks.extend(tasks);
if service_tasks.len() != previous_len {
self.service_event.notify_all();
}
}
/// Wait for new task (task is removed from the front of queue).
pub fn wait(&self) -> Task {
let mut service_tasks = self.service_tasks.lock();
if service_tasks.is_empty() {
@ -57,6 +73,6 @@ impl<Task> TasksQueue<Task> {
}
service_tasks.pop_front()
.expect("service_event is only fired when there are new tasks or is_shutdown == true; is_shutdown == false; qed")
.expect("service_event is only fired when there are new tasks; qed")
}
}