From d3fe3f26918c676353f96461a94ac6a8aa6fceb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 24 Feb 2016 10:55:34 +0100 Subject: [PATCH] Client refactoring [WIP] --- ethcore/src/block_queue.rs | 23 ++++--- ethcore/src/client.rs | 124 ++++++++++++++++++++----------------- 2 files changed, 81 insertions(+), 66 deletions(-) diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index c39f158f0..b802e3c26 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -285,19 +285,22 @@ impl BlockQueue { } /// Mark given block and all its children as bad. Stops verification. - pub fn mark_as_bad(&mut self, hash: &H256) { + pub fn mark_as_bad(&mut self, hashes: &[H256]) { let mut verification_lock = self.verification.lock().unwrap(); let mut verification = verification_lock.deref_mut(); - verification.bad.insert(hash.clone()); - self.processing.write().unwrap().remove(&hash); let mut new_verified = VecDeque::new(); - for block in verification.verified.drain(..) { - if verification.bad.contains(&block.header.parent_hash) { - verification.bad.insert(block.header.hash()); - self.processing.write().unwrap().remove(&block.header.hash()); - } - else { - new_verified.push_back(block); + + for hash in hashes { + verification.bad.insert(hash.clone()); + self.processing.write().unwrap().remove(&hash); + for block in verification.verified.drain(..) { + if verification.bad.contains(&block.header.parent_hash) { + verification.bad.insert(block.header.hash()); + self.processing.write().unwrap().remove(&block.header.hash()); + } + else { + new_verified.push_back(block); + } } } verification.verified = new_verified; diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 87dba3dd5..3c44b97bf 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -259,70 +259,73 @@ impl Client { last_hashes } + fn check_and_close_block(&self, block: &PreVerifiedBlock) -> Result { + let engine = self.engine.deref().deref(); + let header = &block.header; + let header_hash = block.header.hash(); + + // Verify Block Family + let verify_family_result = verify_block_family(&header, &block.bytes, engine, self.chain.read().unwrap().deref()); + if let Err(e) = verify_family_result { + warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); + return Err(()); + }; + + // Check if Parent is in chain + let chain_has_parent = self.chain.read().unwrap().block_header(&header.parent_hash); + if let None = chain_has_parent { + warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash); + return Err(()); + }; + + // Enact Verified Block + let parent = chain_has_parent.unwrap(); + let last_hashes = self.build_last_hashes(header); + let db = self.state_db.lock().unwrap().clone(); + + let enact_result = enact_verified(&block, engine, db, &parent, last_hashes); + if let Err(e) = enact_result { + warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); + return Err(()); + }; + + // Final Verification + let closed_block = enact_result.unwrap(); + if let Err(e) = verify_block_final(&header, closed_block.block().header()) { + warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); + return Err(()); + } + + Ok(closed_block) + } + /// This is triggered by a message coming from a block queue when the block is ready for insertion pub fn import_verified_blocks(&self, io: &IoChannel) -> usize { let max_blocks_to_import = 128; - let mut imported = 0; let mut good_blocks = Vec::with_capacity(max_blocks_to_import); let mut bad_blocks = HashSet::new(); - let engine = self.engine.deref().deref(); let _import_lock = self.import_lock.lock(); let blocks = self.block_queue.write().unwrap().drain(max_blocks_to_import); for block in blocks { - let header = &block.header; - let header_hash = block.header.hash(); - let bad_contains_parent = bad_blocks.contains(&header.parent_hash); + let header = block.header; - let mark_block_as_bad = || { - self.block_queue.write().unwrap().mark_as_bad(&header_hash); - bad_blocks.insert(header_hash); - }; - - if bad_contains_parent { - mark_block_as_bad(); + if bad_blocks.contains(&header.parent_hash) { + bad_blocks.insert(header.hash()); continue; } - // Verify Block Family - let verify_family_result = verify_block_family(&header, &block.bytes, engine, self.chain.read().unwrap().deref()); - if let Err(e) = verify_family_result { - warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - mark_block_as_bad(); - break; - }; - // Check if Parent is in chain - let chain_has_parent = self.chain.read().unwrap().block_header(&header.parent_hash); - if let None = chain_has_parent { - warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash); - mark_block_as_bad(); - break; - }; - - // Enact Verified Block - let parent = chain_has_parent.unwrap(); - let last_hashes = self.build_last_hashes(header); - let db = self.state_db.lock().unwrap().clone(); - - let enact_result = enact_verified(&block, engine, db, &parent, &last_hashes); - if let Err(e) = enact_result { - warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - mark_block_as_bad(); - break; - }; - - // Final Verification - let enact_result = enact_result.unwrap(); - if let Err(e) = verify_block_final(&header, enact_result.block().header()) { - warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - mark_block_as_bad(); + let closed_block = self.check_and_close_block(&block); + if let Err(_) = closed_block { + bad_blocks.insert(header.hash()); break; } // Insert block + let closed_block = closed_block.unwrap(); self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here? good_blocks.push(header.hash()); @@ -335,24 +338,33 @@ impl Client { }; // Commit results - let commit_result = enact_result.drain().commit(header.number(), &header.hash(), ancient); - if let Err(e) = commit_result { - warn!(target: "client", "State DB commit failed: {:?}", e); - break; - } + closed_block.drain() + .commit(header.number(), &header.hash(), ancient) + .expect("State DB commit failed."); self.report.write().unwrap().accrue_block(&block); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); - imported += 1; } - self.block_queue.write().unwrap().mark_as_good(&good_blocks); - if !good_blocks.is_empty() && self.block_queue.read().unwrap().queue_info().is_empty() { - io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { - good: good_blocks, - bad: bad_blocks.into_iter().collect(), - })).unwrap(); + let imported = good_blocks.len(); + let bad_blocks = bad_blocks.into_iter().collect::>(); + + { + let block_queue = self.block_queue.write().unwrap(); + block_queue.mark_as_bad(&bad_blocks); + block_queue.mark_as_good(&good_blocks); } + + { + let block_queue = self.block_queue.read().unwrap(); + if !good_blocks.is_empty() && block_queue.queue_info().is_empty() { + io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { + good: good_blocks, + bad: bad_blocks, + })).unwrap(); + } + } + imported }