Merge pull request #503 from ethcore/client_bugs

Fixing marking blocks as bad & SyncMessage bugs + small client refactoring.
This commit is contained in:
Gav Wood 2016-02-26 21:10:04 +01:00
commit e64293dbe7
6 changed files with 153 additions and 100 deletions

@ -1 +1 @@
Subproject commit f32954b3ddb5af2dc3dc9ec6d9a28bee848fdf70 Subproject commit 3116f85a499ceaf4dfdc46726060fc056e2d7829

View File

@ -144,20 +144,20 @@ impl IsBlock for ExecutedBlock {
/// Block that is ready for transactions to be added. /// Block that is ready for transactions to be added.
/// ///
/// It's a bit like a Vec<Transaction>, eccept that whenever a transaction is pushed, we execute it and /// It's a bit like a Vec<Transaction>, except that whenever a transaction is pushed, we execute it and
/// maintain the system `state()`. We also archive execution receipts in preparation for later block creation. /// maintain the system `state()`. We also archive execution receipts in preparation for later block creation.
pub struct OpenBlock<'x, 'y> { pub struct OpenBlock<'x> {
block: ExecutedBlock, block: ExecutedBlock,
engine: &'x Engine, engine: &'x Engine,
last_hashes: &'y LastHashes, last_hashes: LastHashes,
} }
/// Just like OpenBlock, except that we've applied `Engine::on_close_block`, finished up the non-seal header fields, /// Just like OpenBlock, except that we've applied `Engine::on_close_block`, finished up the non-seal header fields,
/// and collected the uncles. /// and collected the uncles.
/// ///
/// There is no function available to push a transaction. If you want that you'll need to `reopen()` it. /// There is no function available to push a transaction. If you want that you'll need to `reopen()` it.
pub struct ClosedBlock<'x, 'y> { pub struct ClosedBlock<'x> {
open_block: OpenBlock<'x, 'y>, open_block: OpenBlock<'x>,
uncle_bytes: Bytes, uncle_bytes: Bytes,
} }
@ -169,9 +169,9 @@ pub struct SealedBlock {
uncle_bytes: Bytes, uncle_bytes: Bytes,
} }
impl<'x, 'y> OpenBlock<'x, 'y> { impl<'x> OpenBlock<'x> {
/// Create a new OpenBlock ready for transaction pushing. /// Create a new OpenBlock ready for transaction pushing.
pub fn new(engine: &'x Engine, db: JournalDB, parent: &Header, last_hashes: &'y LastHashes, author: Address, extra_data: Bytes) -> Self { pub fn new(engine: &'x Engine, db: JournalDB, parent: &Header, last_hashes: LastHashes, author: Address, extra_data: Bytes) -> Self {
let mut r = OpenBlock { let mut r = OpenBlock {
block: ExecutedBlock::new(State::from_existing(db, parent.state_root().clone(), engine.account_start_nonce())), block: ExecutedBlock::new(State::from_existing(db, parent.state_root().clone(), engine.account_start_nonce())),
engine: engine, engine: engine,
@ -259,7 +259,7 @@ impl<'x, 'y> OpenBlock<'x, 'y> {
} }
/// Turn this into a `ClosedBlock`. A BlockChain must be provided in order to figure out the uncles. /// Turn this into a `ClosedBlock`. A BlockChain must be provided in order to figure out the uncles.
pub fn close(self) -> ClosedBlock<'x, 'y> { pub fn close(self) -> ClosedBlock<'x> {
let mut s = self; let mut s = self;
s.engine.on_close_block(&mut s.block); s.engine.on_close_block(&mut s.block);
s.block.base.header.transactions_root = ordered_trie_root(s.block.base.transactions.iter().map(|ref e| e.rlp_bytes().to_vec()).collect()); s.block.base.header.transactions_root = ordered_trie_root(s.block.base.transactions.iter().map(|ref e| e.rlp_bytes().to_vec()).collect());
@ -275,16 +275,16 @@ impl<'x, 'y> OpenBlock<'x, 'y> {
} }
} }
impl<'x, 'y> IsBlock for OpenBlock<'x, 'y> { impl<'x> IsBlock for OpenBlock<'x> {
fn block(&self) -> &ExecutedBlock { &self.block } fn block(&self) -> &ExecutedBlock { &self.block }
} }
impl<'x, 'y> IsBlock for ClosedBlock<'x, 'y> { impl<'x> IsBlock for ClosedBlock<'x> {
fn block(&self) -> &ExecutedBlock { &self.open_block.block } fn block(&self) -> &ExecutedBlock { &self.open_block.block }
} }
impl<'x, 'y> ClosedBlock<'x, 'y> { impl<'x> ClosedBlock<'x> {
fn new(open_block: OpenBlock<'x, 'y>, uncle_bytes: Bytes) -> Self { fn new(open_block: OpenBlock<'x>, uncle_bytes: Bytes) -> Self {
ClosedBlock { ClosedBlock {
open_block: open_block, open_block: open_block,
uncle_bytes: uncle_bytes, uncle_bytes: uncle_bytes,
@ -307,7 +307,7 @@ impl<'x, 'y> ClosedBlock<'x, 'y> {
} }
/// Turn this back into an `OpenBlock`. /// Turn this back into an `OpenBlock`.
pub fn reopen(self) -> OpenBlock<'x, 'y> { self.open_block } pub fn reopen(self) -> OpenBlock<'x> { self.open_block }
/// Drop this object and return the underlieing database. /// Drop this object and return the underlieing database.
pub fn drain(self) -> JournalDB { self.open_block.block.state.drop().1 } pub fn drain(self) -> JournalDB { self.open_block.block.state.drop().1 }
@ -332,7 +332,7 @@ impl IsBlock for SealedBlock {
} }
/// Enact the block given by block header, transactions and uncles /// Enact the block given by block header, transactions and uncles
pub fn enact<'x, 'y>(header: &Header, transactions: &[SignedTransaction], uncles: &[Header], engine: &'x Engine, db: JournalDB, parent: &Header, last_hashes: &'y LastHashes) -> Result<ClosedBlock<'x, 'y>, Error> { pub fn enact<'x>(header: &Header, transactions: &[SignedTransaction], uncles: &[Header], engine: &'x Engine, db: JournalDB, parent: &Header, last_hashes: LastHashes) -> Result<ClosedBlock<'x>, Error> {
{ {
if ::log::max_log_level() >= ::log::LogLevel::Trace { if ::log::max_log_level() >= ::log::LogLevel::Trace {
let s = State::from_existing(db.clone(), parent.state_root().clone(), engine.account_start_nonce()); let s = State::from_existing(db.clone(), parent.state_root().clone(), engine.account_start_nonce());
@ -350,20 +350,20 @@ pub fn enact<'x, 'y>(header: &Header, transactions: &[SignedTransaction], uncles
} }
/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header /// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header
pub fn enact_bytes<'x, 'y>(block_bytes: &[u8], engine: &'x Engine, db: JournalDB, parent: &Header, last_hashes: &'y LastHashes) -> Result<ClosedBlock<'x, 'y>, Error> { pub fn enact_bytes<'x>(block_bytes: &[u8], engine: &'x Engine, db: JournalDB, parent: &Header, last_hashes: LastHashes) -> Result<ClosedBlock<'x>, Error> {
let block = BlockView::new(block_bytes); let block = BlockView::new(block_bytes);
let header = block.header(); let header = block.header();
enact(&header, &block.transactions(), &block.uncles(), engine, db, parent, last_hashes) enact(&header, &block.transactions(), &block.uncles(), engine, db, parent, last_hashes)
} }
/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header /// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header
pub fn enact_verified<'x, 'y>(block: &PreVerifiedBlock, engine: &'x Engine, db: JournalDB, parent: &Header, last_hashes: &'y LastHashes) -> Result<ClosedBlock<'x, 'y>, Error> { pub fn enact_verified<'x>(block: &PreVerifiedBlock, engine: &'x Engine, db: JournalDB, parent: &Header, last_hashes: LastHashes) -> Result<ClosedBlock<'x>, Error> {
let view = BlockView::new(&block.bytes); let view = BlockView::new(&block.bytes);
enact(&block.header, &block.transactions, &view.uncles(), engine, db, parent, last_hashes) enact(&block.header, &block.transactions, &view.uncles(), engine, db, parent, last_hashes)
} }
/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header. Seal the block aferwards /// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header. Seal the block aferwards
pub fn enact_and_seal(block_bytes: &[u8], engine: &Engine, db: JournalDB, parent: &Header, last_hashes: &LastHashes) -> Result<SealedBlock, Error> { pub fn enact_and_seal(block_bytes: &[u8], engine: &Engine, db: JournalDB, parent: &Header, last_hashes: LastHashes) -> Result<SealedBlock, Error> {
let header = BlockView::new(block_bytes).header_view(); let header = BlockView::new(block_bytes).header_view();
Ok(try!(try!(enact_bytes(block_bytes, engine, db, parent, last_hashes)).seal(header.seal()))) Ok(try!(try!(enact_bytes(block_bytes, engine, db, parent, last_hashes)).seal(header.seal())))
} }
@ -384,7 +384,7 @@ mod tests {
let mut db = db_result.take(); let mut db = db_result.take();
engine.spec().ensure_db_good(&mut db); engine.spec().ensure_db_good(&mut db);
let last_hashes = vec![genesis_header.hash()]; let last_hashes = vec![genesis_header.hash()];
let b = OpenBlock::new(engine.deref(), db, &genesis_header, &last_hashes, Address::zero(), vec![]); let b = OpenBlock::new(engine.deref(), db, &genesis_header, last_hashes, Address::zero(), vec![]);
let b = b.close(); let b = b.close();
let _ = b.seal(vec![]); let _ = b.seal(vec![]);
} }
@ -398,14 +398,14 @@ mod tests {
let mut db_result = get_temp_journal_db(); let mut db_result = get_temp_journal_db();
let mut db = db_result.take(); let mut db = db_result.take();
engine.spec().ensure_db_good(&mut db); engine.spec().ensure_db_good(&mut db);
let b = OpenBlock::new(engine.deref(), db, &genesis_header, &vec![genesis_header.hash()], Address::zero(), vec![]).close().seal(vec![]).unwrap(); let b = OpenBlock::new(engine.deref(), db, &genesis_header, vec![genesis_header.hash()], Address::zero(), vec![]).close().seal(vec![]).unwrap();
let orig_bytes = b.rlp_bytes(); let orig_bytes = b.rlp_bytes();
let orig_db = b.drain(); let orig_db = b.drain();
let mut db_result = get_temp_journal_db(); let mut db_result = get_temp_journal_db();
let mut db = db_result.take(); let mut db = db_result.take();
engine.spec().ensure_db_good(&mut db); engine.spec().ensure_db_good(&mut db);
let e = enact_and_seal(&orig_bytes, engine.deref(), db, &genesis_header, &vec![genesis_header.hash()]).unwrap(); let e = enact_and_seal(&orig_bytes, engine.deref(), db, &genesis_header, vec![genesis_header.hash()]).unwrap();
assert_eq!(e.rlp_bytes(), orig_bytes); assert_eq!(e.rlp_bytes(), orig_bytes);

View File

@ -285,18 +285,24 @@ impl BlockQueue {
} }
/// Mark given block and all its children as bad. Stops verification. /// Mark given block and all its children as bad. Stops verification.
pub fn mark_as_bad(&mut self, hash: &H256) { pub fn mark_as_bad(&mut self, block_hashes: &[H256]) {
let mut verification_lock = self.verification.lock().unwrap(); let mut verification_lock = self.verification.lock().unwrap();
let mut processing = self.processing.write().unwrap();
let mut verification = verification_lock.deref_mut(); let mut verification = verification_lock.deref_mut();
verification.bad.reserve(block_hashes.len());
for hash in block_hashes {
verification.bad.insert(hash.clone()); verification.bad.insert(hash.clone());
self.processing.write().unwrap().remove(&hash); processing.remove(&hash);
}
let mut new_verified = VecDeque::new(); let mut new_verified = VecDeque::new();
for block in verification.verified.drain(..) { for block in verification.verified.drain(..) {
if verification.bad.contains(&block.header.parent_hash) { if verification.bad.contains(&block.header.parent_hash) {
verification.bad.insert(block.header.hash()); verification.bad.insert(block.header.hash());
self.processing.write().unwrap().remove(&block.header.hash()); processing.remove(&block.header.hash());
} } else {
else {
new_verified.push_back(block); new_verified.push_back(block);
} }
} }
@ -304,10 +310,10 @@ impl BlockQueue {
} }
/// Mark given block as processed /// Mark given block as processed
pub fn mark_as_good(&mut self, hashes: &[H256]) { pub fn mark_as_good(&mut self, block_hashes: &[H256]) {
let mut processing = self.processing.write().unwrap(); let mut processing = self.processing.write().unwrap();
for h in hashes { for hash in block_hashes {
processing.remove(&h); processing.remove(&hash);
} }
} }

View File

@ -21,7 +21,7 @@ use util::panics::*;
use blockchain::{BlockChain, BlockProvider, CacheSize}; use blockchain::{BlockChain, BlockProvider, CacheSize};
use views::BlockView; use views::BlockView;
use error::*; use error::*;
use header::BlockNumber; use header::{BlockNumber, Header};
use state::State; use state::State;
use spec::Spec; use spec::Spec;
use engine::Engine; use engine::Engine;
@ -227,85 +227,127 @@ impl Client {
self.block_queue.write().unwrap().flush(); self.block_queue.write().unwrap().flush();
} }
/// This is triggered by a message coming from a block queue when the block is ready for insertion fn build_last_hashes(&self, header: &Header) -> LastHashes {
pub fn import_verified_blocks(&self, io: &IoChannel<NetSyncMessage>) -> usize {
let mut ret = 0;
let mut bad = HashSet::new();
let _import_lock = self.import_lock.lock();
let blocks = self.block_queue.write().unwrap().drain(128);
let mut good_blocks = Vec::with_capacity(128);
for block in blocks {
if bad.contains(&block.header.parent_hash) {
self.block_queue.write().unwrap().mark_as_bad(&block.header.hash());
bad.insert(block.header.hash());
continue;
}
let header = &block.header;
if let Err(e) = verify_block_family(&header, &block.bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) {
warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
self.block_queue.write().unwrap().mark_as_bad(&header.hash());
bad.insert(block.header.hash());
break;
};
let parent = match self.chain.read().unwrap().block_header(&header.parent_hash) {
Some(p) => p,
None => {
warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash);
self.block_queue.write().unwrap().mark_as_bad(&header.hash());
bad.insert(block.header.hash());
break;
},
};
// build last hashes
let mut last_hashes = LastHashes::new(); let mut last_hashes = LastHashes::new();
last_hashes.resize(256, H256::new()); last_hashes.resize(256, H256::new());
last_hashes[0] = header.parent_hash.clone(); last_hashes[0] = header.parent_hash.clone();
let chain = self.chain.read().unwrap();
for i in 0..255 { for i in 0..255 {
match self.chain.read().unwrap().block_details(&last_hashes[i]) { match chain.block_details(&last_hashes[i]) {
Some(details) => { Some(details) => {
last_hashes[i + 1] = details.parent.clone(); last_hashes[i + 1] = details.parent.clone();
}, },
None => break, None => break,
} }
} }
last_hashes
let db = self.state_db.lock().unwrap().clone();
let result = match enact_verified(&block, self.engine.deref().deref(), db, &parent, &last_hashes) {
Ok(b) => b,
Err(e) => {
warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
bad.insert(block.header.hash());
self.block_queue.write().unwrap().mark_as_bad(&header.hash());
break;
} }
fn check_and_close_block(&self, block: &PreVerifiedBlock) -> Result<ClosedBlock, ()> {
let engine = self.engine.deref().deref();
let header = &block.header;
// Verify Block Family
let verify_family_result = verify_block_family(&header, &block.bytes, engine, self.chain.read().unwrap().deref());
if let Err(e) = verify_family_result {
warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
return Err(());
}; };
if let Err(e) = verify_block_final(&header, result.block().header()) {
// Check if Parent is in chain
let chain_has_parent = self.chain.read().unwrap().block_header(&header.parent_hash);
if let None = chain_has_parent {
warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash);
return Err(());
};
// Enact Verified Block
let parent = chain_has_parent.unwrap();
let last_hashes = self.build_last_hashes(header);
let db = self.state_db.lock().unwrap().clone();
let enact_result = enact_verified(&block, engine, db, &parent, last_hashes);
if let Err(e) = enact_result {
warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
return Err(());
};
// Final Verification
let closed_block = enact_result.unwrap();
if let Err(e) = verify_block_final(&header, closed_block.block().header()) {
warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
self.block_queue.write().unwrap().mark_as_bad(&header.hash()); return Err(());
}
Ok(closed_block)
}
/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self, io: &IoChannel<NetSyncMessage>) -> usize {
let max_blocks_to_import = 128;
let mut good_blocks = Vec::with_capacity(max_blocks_to_import);
let mut bad_blocks = HashSet::new();
let _import_lock = self.import_lock.lock();
let blocks = self.block_queue.write().unwrap().drain(max_blocks_to_import);
for block in blocks {
let header = &block.header;
if bad_blocks.contains(&header.parent_hash) {
bad_blocks.insert(header.hash());
continue;
}
let closed_block = self.check_and_close_block(&block);
if let Err(_) = closed_block {
bad_blocks.insert(header.hash());
break; break;
} }
good_blocks.push(header.hash().clone()); // Insert block
let closed_block = closed_block.unwrap();
self.chain.write().unwrap().insert_block(&block.bytes, closed_block.block().receipts().clone());
good_blocks.push(header.hash());
let ancient = if header.number() >= HISTORY {
let n = header.number() - HISTORY;
let chain = self.chain.read().unwrap();
Some((n, chain.block_hash(n).unwrap()))
} else {
None
};
// Commit results
closed_block.drain()
.commit(header.number(), &header.hash(), ancient)
.expect("State DB commit failed.");
self.chain.write().unwrap().insert_block(&block.bytes, result.block().receipts().clone()); //TODO: err here?
let ancient = if header.number() >= HISTORY { Some(header.number() - HISTORY) } else { None };
match result.drain().commit(header.number(), &header.hash(), ancient.map(|n|(n, self.chain.read().unwrap().block_hash(n).unwrap()))) {
Ok(_) => (),
Err(e) => {
warn!(target: "client", "State DB commit failed: {:?}", e);
break;
}
}
self.report.write().unwrap().accrue_block(&block); self.report.write().unwrap().accrue_block(&block);
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
ret += 1;
} }
self.block_queue.write().unwrap().mark_as_good(&good_blocks);
if !good_blocks.is_empty() && self.block_queue.read().unwrap().queue_info().is_empty() { let imported = good_blocks.len();
io.send(NetworkIoMessage::User(SyncMessage::BlockVerified)).unwrap(); let bad_blocks = bad_blocks.into_iter().collect::<Vec<H256>>();
{
let mut block_queue = self.block_queue.write().unwrap();
block_queue.mark_as_bad(&bad_blocks);
block_queue.mark_as_good(&good_blocks);
} }
ret
{
let block_queue = self.block_queue.read().unwrap();
if !good_blocks.is_empty() && block_queue.queue_info().is_empty() {
io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
good: good_blocks,
bad: bad_blocks,
})).unwrap();
}
}
imported
} }
/// Get a copy of the best block's state. /// Get a copy of the best block's state.

View File

@ -282,7 +282,7 @@ mod tests {
let mut db = db_result.take(); let mut db = db_result.take();
engine.spec().ensure_db_good(&mut db); engine.spec().ensure_db_good(&mut db);
let last_hashes = vec![genesis_header.hash()]; let last_hashes = vec![genesis_header.hash()];
let b = OpenBlock::new(engine.deref(), db, &genesis_header, &last_hashes, Address::zero(), vec![]); let b = OpenBlock::new(engine.deref(), db, &genesis_header, last_hashes, Address::zero(), vec![]);
let b = b.close(); let b = b.close();
assert_eq!(b.state().balance(&Address::zero()), U256::from_str("4563918244f40000").unwrap()); assert_eq!(b.state().balance(&Address::zero()), U256::from_str("4563918244f40000").unwrap());
} }
@ -295,7 +295,7 @@ mod tests {
let mut db = db_result.take(); let mut db = db_result.take();
engine.spec().ensure_db_good(&mut db); engine.spec().ensure_db_good(&mut db);
let last_hashes = vec![genesis_header.hash()]; let last_hashes = vec![genesis_header.hash()];
let mut b = OpenBlock::new(engine.deref(), db, &genesis_header, &last_hashes, Address::zero(), vec![]); let mut b = OpenBlock::new(engine.deref(), db, &genesis_header, last_hashes, Address::zero(), vec![]);
let mut uncle = Header::new(); let mut uncle = Header::new();
let uncle_author = address_from_hex("ef2d6d194084c2de36e0dabfce45d046b37d1106"); let uncle_author = address_from_hex("ef2d6d194084c2de36e0dabfce45d046b37d1106");
uncle.author = uncle_author.clone(); uncle.author = uncle_author.clone();

View File

@ -26,7 +26,12 @@ use client::Client;
#[derive(Clone)] #[derive(Clone)]
pub enum SyncMessage { pub enum SyncMessage {
/// New block has been imported into the blockchain /// New block has been imported into the blockchain
NewChainBlock(Bytes), //TODO: use Cow NewChainBlocks {
/// Hashes of blocks imported to blockchain
good: Vec<H256>,
/// Hashes of blocks not imported to blockchain
bad: Vec<H256>,
},
/// A block is ready /// A block is ready
BlockVerified, BlockVerified,
} }