Client refactoring [WIP]

This commit is contained in:
Tomasz Drwięga 2016-02-24 10:55:34 +01:00
parent 4084acd869
commit d3fe3f2691
2 changed files with 81 additions and 66 deletions

View File

@ -285,19 +285,22 @@ impl BlockQueue {
} }
/// Mark given block and all its children as bad. Stops verification. /// Mark given block and all its children as bad. Stops verification.
pub fn mark_as_bad(&mut self, hash: &H256) { pub fn mark_as_bad(&mut self, hashes: &[H256]) {
let mut verification_lock = self.verification.lock().unwrap(); let mut verification_lock = self.verification.lock().unwrap();
let mut verification = verification_lock.deref_mut(); let mut verification = verification_lock.deref_mut();
verification.bad.insert(hash.clone());
self.processing.write().unwrap().remove(&hash);
let mut new_verified = VecDeque::new(); let mut new_verified = VecDeque::new();
for block in verification.verified.drain(..) {
if verification.bad.contains(&block.header.parent_hash) { for hash in hashes {
verification.bad.insert(block.header.hash()); verification.bad.insert(hash.clone());
self.processing.write().unwrap().remove(&block.header.hash()); self.processing.write().unwrap().remove(&hash);
} for block in verification.verified.drain(..) {
else { if verification.bad.contains(&block.header.parent_hash) {
new_verified.push_back(block); verification.bad.insert(block.header.hash());
self.processing.write().unwrap().remove(&block.header.hash());
}
else {
new_verified.push_back(block);
}
} }
} }
verification.verified = new_verified; verification.verified = new_verified;

View File

@ -259,70 +259,73 @@ impl Client {
last_hashes last_hashes
} }
fn check_and_close_block(&self, block: &PreVerifiedBlock) -> Result<ClosedBlock, ()> {
let engine = self.engine.deref().deref();
let header = &block.header;
let header_hash = block.header.hash();
// Verify Block Family
let verify_family_result = verify_block_family(&header, &block.bytes, engine, self.chain.read().unwrap().deref());
if let Err(e) = verify_family_result {
warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
return Err(());
};
// Check if Parent is in chain
let chain_has_parent = self.chain.read().unwrap().block_header(&header.parent_hash);
if let None = chain_has_parent {
warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash);
return Err(());
};
// Enact Verified Block
let parent = chain_has_parent.unwrap();
let last_hashes = self.build_last_hashes(header);
let db = self.state_db.lock().unwrap().clone();
let enact_result = enact_verified(&block, engine, db, &parent, last_hashes);
if let Err(e) = enact_result {
warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
return Err(());
};
// Final Verification
let closed_block = enact_result.unwrap();
if let Err(e) = verify_block_final(&header, closed_block.block().header()) {
warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
return Err(());
}
Ok(closed_block)
}
/// This is triggered by a message coming from a block queue when the block is ready for insertion /// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self, io: &IoChannel<NetSyncMessage>) -> usize { pub fn import_verified_blocks(&self, io: &IoChannel<NetSyncMessage>) -> usize {
let max_blocks_to_import = 128; let max_blocks_to_import = 128;
let mut imported = 0;
let mut good_blocks = Vec::with_capacity(max_blocks_to_import); let mut good_blocks = Vec::with_capacity(max_blocks_to_import);
let mut bad_blocks = HashSet::new(); let mut bad_blocks = HashSet::new();
let engine = self.engine.deref().deref();
let _import_lock = self.import_lock.lock(); let _import_lock = self.import_lock.lock();
let blocks = self.block_queue.write().unwrap().drain(max_blocks_to_import); let blocks = self.block_queue.write().unwrap().drain(max_blocks_to_import);
for block in blocks { for block in blocks {
let header = &block.header; let header = block.header;
let header_hash = block.header.hash();
let bad_contains_parent = bad_blocks.contains(&header.parent_hash);
let mark_block_as_bad = || { if bad_blocks.contains(&header.parent_hash) {
self.block_queue.write().unwrap().mark_as_bad(&header_hash); bad_blocks.insert(header.hash());
bad_blocks.insert(header_hash);
};
if bad_contains_parent {
mark_block_as_bad();
continue; continue;
} }
// Verify Block Family
let verify_family_result = verify_block_family(&header, &block.bytes, engine, self.chain.read().unwrap().deref());
if let Err(e) = verify_family_result {
warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
mark_block_as_bad();
break;
};
// Check if Parent is in chain let closed_block = self.check_and_close_block(&block);
let chain_has_parent = self.chain.read().unwrap().block_header(&header.parent_hash); if let Err(_) = closed_block {
if let None = chain_has_parent { bad_blocks.insert(header.hash());
warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash);
mark_block_as_bad();
break;
};
// Enact Verified Block
let parent = chain_has_parent.unwrap();
let last_hashes = self.build_last_hashes(header);
let db = self.state_db.lock().unwrap().clone();
let enact_result = enact_verified(&block, engine, db, &parent, &last_hashes);
if let Err(e) = enact_result {
warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
mark_block_as_bad();
break;
};
// Final Verification
let enact_result = enact_result.unwrap();
if let Err(e) = verify_block_final(&header, enact_result.block().header()) {
warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
mark_block_as_bad();
break; break;
} }
// Insert block // Insert block
let closed_block = closed_block.unwrap();
self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here? self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here?
good_blocks.push(header.hash()); good_blocks.push(header.hash());
@ -335,24 +338,33 @@ impl Client {
}; };
// Commit results // Commit results
let commit_result = enact_result.drain().commit(header.number(), &header.hash(), ancient); closed_block.drain()
if let Err(e) = commit_result { .commit(header.number(), &header.hash(), ancient)
warn!(target: "client", "State DB commit failed: {:?}", e); .expect("State DB commit failed.");
break;
}
self.report.write().unwrap().accrue_block(&block); self.report.write().unwrap().accrue_block(&block);
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
imported += 1;
} }
self.block_queue.write().unwrap().mark_as_good(&good_blocks); let imported = good_blocks.len();
if !good_blocks.is_empty() && self.block_queue.read().unwrap().queue_info().is_empty() { let bad_blocks = bad_blocks.into_iter().collect::<Vec<H256>>();
io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
good: good_blocks, {
bad: bad_blocks.into_iter().collect(), let block_queue = self.block_queue.write().unwrap();
})).unwrap(); block_queue.mark_as_bad(&bad_blocks);
block_queue.mark_as_good(&good_blocks);
} }
{
let block_queue = self.block_queue.read().unwrap();
if !good_blocks.is_empty() && block_queue.queue_info().is_empty() {
io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
good: good_blocks,
bad: bad_blocks,
})).unwrap();
}
}
imported imported
} }