Refactoring client and fixing mark_as_bad & SyncMessage bugs

This commit is contained in:
Tomasz Drwięga 2016-02-23 18:44:13 +01:00
parent 6fe189cbd9
commit 990c5c8faa
2 changed files with 95 additions and 57 deletions

View File

@ -22,7 +22,7 @@ use rocksdb::{Options, DB, DBCompactionStyle};
use blockchain::{BlockChain, BlockProvider, CacheSize}; use blockchain::{BlockChain, BlockProvider, CacheSize};
use views::BlockView; use views::BlockView;
use error::*; use error::*;
use header::BlockNumber; use header::{BlockNumber, Header};
use state::State; use state::State;
use spec::Spec; use spec::Spec;
use engine::Engine; use engine::Engine;
@ -243,85 +243,117 @@ impl Client {
self.block_queue.write().unwrap().flush(); self.block_queue.write().unwrap().flush();
} }
fn build_last_hashes(&self, header: &Header) -> LastHashes {
let mut last_hashes = LastHashes::new();
last_hashes.resize(256, H256::new());
last_hashes[0] = header.parent_hash.clone();
let chain = self.chain.read().unwrap();
for i in 0..255 {
match chain.block_details(&last_hashes[i]) {
Some(details) => {
last_hashes[i + 1] = details.parent.clone();
},
None => break,
}
}
last_hashes
}
/// This is triggered by a message coming from a block queue when the block is ready for insertion /// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self, io: &IoChannel<NetSyncMessage>) -> usize { pub fn import_verified_blocks(&self, io: &IoChannel<NetSyncMessage>) -> usize {
let mut ret = 0; let max_blocks_to_import = 128;
let mut bad = HashSet::new();
let mut imported = 0;
let mut good_blocks = Vec::with_capacity(max_blocks_to_import);
let mut bad_blocks = HashSet::new();
let engine = self.engine.deref().deref();
let _import_lock = self.import_lock.lock(); let _import_lock = self.import_lock.lock();
let blocks = self.block_queue.write().unwrap().drain(128); let blocks = self.block_queue.write().unwrap().drain(max_blocks_to_import);
let mut good_blocks = Vec::with_capacity(128);
for block in blocks { for block in blocks {
if bad.contains(&block.header.parent_hash) { let header = &block.header;
self.block_queue.write().unwrap().mark_as_bad(&block.header.hash()); let header_hash = block.header.hash();
bad.insert(block.header.hash()); let bad_contains_parent = bad_blocks.contains(&header.parent_hash);
let mark_block_as_bad = || {
self.block_queue.write().unwrap().mark_as_bad(&header_hash);
bad_blocks.insert(header_hash);
};
if bad_contains_parent {
mark_block_as_bad();
continue; continue;
} }
let header = &block.header; // Verify Block Family
if let Err(e) = verify_block_family(&header, &block.bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) { let verify_family_result = verify_block_family(&header, &block.bytes, engine, self.chain.read().unwrap().deref());
if let Err(e) = verify_family_result {
warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
self.block_queue.write().unwrap().mark_as_bad(&header.hash()); mark_block_as_bad();
bad.insert(block.header.hash());
break; break;
}; };
let parent = match self.chain.read().unwrap().block_header(&header.parent_hash) {
Some(p) => p,
None => {
warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash);
self.block_queue.write().unwrap().mark_as_bad(&header.hash());
bad.insert(block.header.hash());
break;
},
};
// build last hashes
let mut last_hashes = LastHashes::new();
last_hashes.resize(256, H256::new());
last_hashes[0] = header.parent_hash.clone();
for i in 0..255 {
match self.chain.read().unwrap().block_details(&last_hashes[i]) {
Some(details) => {
last_hashes[i + 1] = details.parent.clone();
},
None => break,
}
}
// Check if Parent is in chain
let chain_has_parent = self.chain.read().unwrap().block_header(&header.parent_hash);
if let None = chain_has_parent {
warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash);
mark_block_as_bad();
break;
};
// Enact Verified Block
let parent = chain_has_parent.unwrap();
let last_hashes = self.build_last_hashes(header);
let db = self.state_db.lock().unwrap().clone(); let db = self.state_db.lock().unwrap().clone();
let result = match enact_verified(&block, self.engine.deref().deref(), db, &parent, &last_hashes) {
Ok(b) => b, let enact_result = enact_verified(&block, engine, db, &parent, &last_hashes);
Err(e) => { if let Err(e) = enact_result {
warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
bad.insert(block.header.hash()); mark_block_as_bad();
self.block_queue.write().unwrap().mark_as_bad(&header.hash()); break;
break;
}
}; };
if let Err(e) = verify_block_final(&header, result.block().header()) {
// Final Verification
let enact_result = enact_result.unwrap();
if let Err(e) = verify_block_final(&header, enact_result.block().header()) {
warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
self.block_queue.write().unwrap().mark_as_bad(&header.hash()); mark_block_as_bad();
break; break;
} }
good_blocks.push(header.hash().clone()); // Insert block
self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here? self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here?
let ancient = if header.number() >= HISTORY { Some(header.number() - HISTORY) } else { None }; good_blocks.push(header.hash());
match result.drain().commit(header.number(), &header.hash(), ancient.map(|n|(n, self.chain.read().unwrap().block_hash(n).unwrap()))) {
Ok(_) => (), let ancient = if header.number() >= HISTORY {
Err(e) => { let n = header.number() - HISTORY;
warn!(target: "client", "State DB commit failed: {:?}", e); let chain = self.chain.read().unwrap();
break; Some((n, chain.block_hash(n).unwrap()))
} } else {
None
};
// Commit results
let commit_result = enact_result.drain().commit(header.number(), &header.hash(), ancient);
if let Err(e) = commit_result {
warn!(target: "client", "State DB commit failed: {:?}", e);
break;
} }
self.report.write().unwrap().accrue_block(&block); self.report.write().unwrap().accrue_block(&block);
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
ret += 1; imported += 1;
} }
self.block_queue.write().unwrap().mark_as_good(&good_blocks); self.block_queue.write().unwrap().mark_as_good(&good_blocks);
if !good_blocks.is_empty() && self.block_queue.read().unwrap().queue_info().is_empty() { if !good_blocks.is_empty() && self.block_queue.read().unwrap().queue_info().is_empty() {
io.send(NetworkIoMessage::User(SyncMessage::BlockVerified)).unwrap(); io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
good: good_blocks,
bad: bad_blocks.into_iter().collect(),
})).unwrap();
} }
ret imported
} }
/// Get a copy of the best block's state. /// Get a copy of the best block's state.
@ -393,7 +425,7 @@ impl BlockChainClient for Client {
None => BlockStatus::Unknown None => BlockStatus::Unknown
} }
} }
fn block_total_difficulty(&self, id: BlockId) -> Option<U256> { fn block_total_difficulty(&self, id: BlockId) -> Option<U256> {
let chain = self.chain.read().unwrap(); let chain = self.chain.read().unwrap();
Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty) Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty)
@ -435,6 +467,7 @@ impl BlockChainClient for Client {
return Err(ImportError::UnknownParent); return Err(ImportError::UnknownParent);
} }
self.block_queue.write().unwrap().import_block(bytes) self.block_queue.write().unwrap().import_block(bytes)
// TODO [ToDr] remove transactions
} }
fn queue_info(&self) -> BlockQueueInfo { fn queue_info(&self) -> BlockQueueInfo {

View File

@ -26,7 +26,12 @@ use client::Client;
#[derive(Clone)] #[derive(Clone)]
pub enum SyncMessage { pub enum SyncMessage {
/// New block has been imported into the blockchain /// New block has been imported into the blockchain
NewChainBlock(Bytes), //TODO: use Cow NewChainBlocks {
/// Hashes of blocks imported to blockchain
good: Vec<H256>,
/// Hashes of blocks not imported to blockchain
bad: Vec<H256>,
},
/// A block is ready /// A block is ready
BlockVerified, BlockVerified,
} }