From 990c5c8faa6cded59257846bd58039e0c48ad085 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 23 Feb 2016 18:44:13 +0100 Subject: [PATCH] Refactoring client and fixing mark_as_bad & SyncMessage bugs --- ethcore/src/client.rs | 145 +++++++++++++++++++++++++---------------- ethcore/src/service.rs | 7 +- 2 files changed, 95 insertions(+), 57 deletions(-) diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index c3ec4b4d0..aa71fbbfa 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -22,7 +22,7 @@ use rocksdb::{Options, DB, DBCompactionStyle}; use blockchain::{BlockChain, BlockProvider, CacheSize}; use views::BlockView; use error::*; -use header::BlockNumber; +use header::{BlockNumber, Header}; use state::State; use spec::Spec; use engine::Engine; @@ -243,85 +243,117 @@ impl Client { self.block_queue.write().unwrap().flush(); } + fn build_last_hashes(&self, header: &Header) -> LastHashes { + let mut last_hashes = LastHashes::new(); + last_hashes.resize(256, H256::new()); + last_hashes[0] = header.parent_hash.clone(); + let chain = self.chain.read().unwrap(); + for i in 0..255 { + match chain.block_details(&last_hashes[i]) { + Some(details) => { + last_hashes[i + 1] = details.parent.clone(); + }, + None => break, + } + } + last_hashes + } + /// This is triggered by a message coming from a block queue when the block is ready for insertion pub fn import_verified_blocks(&self, io: &IoChannel) -> usize { - let mut ret = 0; - let mut bad = HashSet::new(); + let max_blocks_to_import = 128; + + let mut imported = 0; + let mut good_blocks = Vec::with_capacity(max_blocks_to_import); + let mut bad_blocks = HashSet::new(); + let engine = self.engine.deref().deref(); + let _import_lock = self.import_lock.lock(); - let blocks = self.block_queue.write().unwrap().drain(128); - let mut good_blocks = Vec::with_capacity(128); + let blocks = self.block_queue.write().unwrap().drain(max_blocks_to_import); + for block in blocks { - if bad.contains(&block.header.parent_hash) { - self.block_queue.write().unwrap().mark_as_bad(&block.header.hash()); - bad.insert(block.header.hash()); + let header = &block.header; + let header_hash = block.header.hash(); + let bad_contains_parent = bad_blocks.contains(&header.parent_hash); + + let mark_block_as_bad = || { + self.block_queue.write().unwrap().mark_as_bad(&header_hash); + bad_blocks.insert(header_hash); + }; + + if bad_contains_parent { + mark_block_as_bad(); continue; } - let header = &block.header; - if let Err(e) = verify_block_family(&header, &block.bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) { + // Verify Block Family + let verify_family_result = verify_block_family(&header, &block.bytes, engine, self.chain.read().unwrap().deref()); + if let Err(e) = verify_family_result { warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - self.block_queue.write().unwrap().mark_as_bad(&header.hash()); - bad.insert(block.header.hash()); + mark_block_as_bad(); break; }; - let parent = match self.chain.read().unwrap().block_header(&header.parent_hash) { - Some(p) => p, - None => { - warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash); - self.block_queue.write().unwrap().mark_as_bad(&header.hash()); - bad.insert(block.header.hash()); - break; - }, - }; - // build last hashes - let mut last_hashes = LastHashes::new(); - last_hashes.resize(256, H256::new()); - last_hashes[0] = header.parent_hash.clone(); - for i in 0..255 { - match self.chain.read().unwrap().block_details(&last_hashes[i]) { - Some(details) => { - last_hashes[i + 1] = details.parent.clone(); - }, - None => break, - } - } + // Check if Parent is in chain + let chain_has_parent = self.chain.read().unwrap().block_header(&header.parent_hash); + if let None = chain_has_parent { + warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash); + mark_block_as_bad(); + break; + }; + + // Enact Verified Block + let parent = chain_has_parent.unwrap(); + let last_hashes = self.build_last_hashes(header); let db = self.state_db.lock().unwrap().clone(); - let result = match enact_verified(&block, self.engine.deref().deref(), db, &parent, &last_hashes) { - Ok(b) => b, - Err(e) => { - warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - bad.insert(block.header.hash()); - self.block_queue.write().unwrap().mark_as_bad(&header.hash()); - break; - } + + let enact_result = enact_verified(&block, engine, db, &parent, &last_hashes); + if let Err(e) = enact_result { + warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); + mark_block_as_bad(); + break; }; - if let Err(e) = verify_block_final(&header, result.block().header()) { + + // Final Verification + let enact_result = enact_result.unwrap(); + if let Err(e) = verify_block_final(&header, enact_result.block().header()) { warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - self.block_queue.write().unwrap().mark_as_bad(&header.hash()); + mark_block_as_bad(); break; } - good_blocks.push(header.hash().clone()); - + // Insert block self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here? - let ancient = if header.number() >= HISTORY { Some(header.number() - HISTORY) } else { None }; - match result.drain().commit(header.number(), &header.hash(), ancient.map(|n|(n, self.chain.read().unwrap().block_hash(n).unwrap()))) { - Ok(_) => (), - Err(e) => { - warn!(target: "client", "State DB commit failed: {:?}", e); - break; - } + good_blocks.push(header.hash()); + + let ancient = if header.number() >= HISTORY { + let n = header.number() - HISTORY; + let chain = self.chain.read().unwrap(); + Some((n, chain.block_hash(n).unwrap())) + } else { + None + }; + + // Commit results + let commit_result = enact_result.drain().commit(header.number(), &header.hash(), ancient); + if let Err(e) = commit_result { + warn!(target: "client", "State DB commit failed: {:?}", e); + break; } + self.report.write().unwrap().accrue_block(&block); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); - ret += 1; + imported += 1; } + self.block_queue.write().unwrap().mark_as_good(&good_blocks); if !good_blocks.is_empty() && self.block_queue.read().unwrap().queue_info().is_empty() { - io.send(NetworkIoMessage::User(SyncMessage::BlockVerified)).unwrap(); + io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { + good: good_blocks, + bad: bad_blocks.into_iter().collect(), + })).unwrap(); } - ret + imported } /// Get a copy of the best block's state. @@ -393,7 +425,7 @@ impl BlockChainClient for Client { None => BlockStatus::Unknown } } - + fn block_total_difficulty(&self, id: BlockId) -> Option { let chain = self.chain.read().unwrap(); Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty) @@ -435,6 +467,7 @@ impl BlockChainClient for Client { return Err(ImportError::UnknownParent); } self.block_queue.write().unwrap().import_block(bytes) + // TODO [ToDr] remove transactions } fn queue_info(&self) -> BlockQueueInfo { diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 534aab49d..cdf3425e8 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -26,7 +26,12 @@ use client::Client; #[derive(Clone)] pub enum SyncMessage { /// New block has been imported into the blockchain - NewChainBlock(Bytes), //TODO: use Cow + NewChainBlocks { + /// Hashes of blocks imported to blockchain + good: Vec, + /// Hashes of blocks not imported to blockchain + bad: Vec, + }, /// A block is ready BlockVerified, }