Check for parent on queue import

This commit is contained in:
arkpar 2016-02-02 12:12:32 +01:00
parent 5db17514fd
commit bd684e3732
4 changed files with 54 additions and 11 deletions

View File

@ -9,6 +9,7 @@ use engine::Engine;
use views::*; use views::*;
use header::*; use header::*;
use service::*; use service::*;
use client::BlockStatus;
/// Block queue status /// Block queue status
#[derive(Debug)] #[derive(Debug)]
@ -41,7 +42,7 @@ pub struct BlockQueue {
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>, ready_signal: Arc<QueueSignal>,
empty: Arc<Condvar>, empty: Arc<Condvar>,
processing: HashSet<H256> processing: RwLock<HashSet<H256>>
} }
struct UnVerifiedBlock { struct UnVerifiedBlock {
@ -106,7 +107,7 @@ impl BlockQueue {
verification: verification.clone(), verification: verification.clone(),
verifiers: verifiers, verifiers: verifiers,
deleting: deleting.clone(), deleting: deleting.clone(),
processing: HashSet::new(), processing: RwLock::new(HashSet::new()),
empty: empty.clone(), empty: empty.clone(),
} }
} }
@ -196,11 +197,22 @@ impl BlockQueue {
} }
} }
/// Check if the block is currently in the queue
pub fn block_status(&self, hash: &H256) -> BlockStatus {
if self.processing.read().unwrap().contains(&hash) {
return BlockStatus::Queued;
}
if self.verification.lock().unwrap().bad.contains(&hash) {
return BlockStatus::Bad;
}
BlockStatus::Unknown
}
/// Add a block to the queue. /// Add a block to the queue.
pub fn import_block(&mut self, bytes: Bytes) -> ImportResult { pub fn import_block(&mut self, bytes: Bytes) -> ImportResult {
let header = BlockView::new(&bytes).header(); let header = BlockView::new(&bytes).header();
let h = header.hash(); let h = header.hash();
if self.processing.contains(&h) { if self.processing.read().unwrap().contains(&h) {
return Err(ImportError::AlreadyQueued); return Err(ImportError::AlreadyQueued);
} }
{ {
@ -217,7 +229,7 @@ impl BlockQueue {
match verify_block_basic(&header, &bytes, self.engine.deref().deref()) { match verify_block_basic(&header, &bytes, self.engine.deref().deref()) {
Ok(()) => { Ok(()) => {
self.processing.insert(h.clone()); self.processing.write().unwrap().insert(h.clone());
self.verification.lock().unwrap().unverified.push_back(UnVerifiedBlock { header: header, bytes: bytes }); self.verification.lock().unwrap().unverified.push_back(UnVerifiedBlock { header: header, bytes: bytes });
self.more_to_verify.notify_all(); self.more_to_verify.notify_all();
Ok(h) Ok(h)
@ -235,10 +247,12 @@ impl BlockQueue {
let mut verification_lock = self.verification.lock().unwrap(); let mut verification_lock = self.verification.lock().unwrap();
let mut verification = verification_lock.deref_mut(); let mut verification = verification_lock.deref_mut();
verification.bad.insert(hash.clone()); verification.bad.insert(hash.clone());
self.processing.write().unwrap().remove(&hash);
let mut new_verified = VecDeque::new(); let mut new_verified = VecDeque::new();
for block in verification.verified.drain(..) { for block in verification.verified.drain(..) {
if verification.bad.contains(&block.header.parent_hash) { if verification.bad.contains(&block.header.parent_hash) {
verification.bad.insert(block.header.hash()); verification.bad.insert(block.header.hash());
self.processing.write().unwrap().remove(&block.header.hash());
} }
else { else {
new_verified.push_back(block); new_verified.push_back(block);
@ -247,6 +261,15 @@ impl BlockQueue {
verification.verified = new_verified; verification.verified = new_verified;
} }
/// Mark given block as processed
pub fn mark_as_good(&mut self, hashes: &[H256]) {
let mut processing = self.processing.write().unwrap();
for h in hashes {
processing.remove(&h);
}
//TODO: reward peers
}
/// Removes up to `max` verified blocks from the queue /// Removes up to `max` verified blocks from the queue
pub fn drain(&mut self, max: usize) -> Vec<PreVerifiedBlock> { pub fn drain(&mut self, max: usize) -> Vec<PreVerifiedBlock> {
let mut verification = self.verification.lock().unwrap(); let mut verification = self.verification.lock().unwrap();
@ -254,7 +277,6 @@ impl BlockQueue {
let mut result = Vec::with_capacity(count); let mut result = Vec::with_capacity(count);
for _ in 0..count { for _ in 0..count {
let block = verification.verified.pop_front().unwrap(); let block = verification.verified.pop_front().unwrap();
self.processing.remove(&block.header.hash());
result.push(block); result.push(block);
} }
self.ready_signal.reset(); self.ready_signal.reset();

View File

@ -15,7 +15,7 @@ use verification::*;
use block::*; use block::*;
/// General block status /// General block status
#[derive(Debug)] #[derive(Debug, Eq, PartialEq)]
pub enum BlockStatus { pub enum BlockStatus {
/// Part of the blockchain. /// Part of the blockchain.
InChain, InChain,
@ -204,6 +204,7 @@ impl Client {
let mut bad = HashSet::new(); let mut bad = HashSet::new();
let _import_lock = self.import_lock.lock(); let _import_lock = self.import_lock.lock();
let blocks = self.block_queue.write().unwrap().drain(128); let blocks = self.block_queue.write().unwrap().drain(128);
let mut good_blocks = Vec::with_capacity(128);
for block in blocks { for block in blocks {
if bad.contains(&block.header.parent_hash) { if bad.contains(&block.header.parent_hash) {
self.block_queue.write().unwrap().mark_as_bad(&block.header.hash()); self.block_queue.write().unwrap().mark_as_bad(&block.header.hash());
@ -256,6 +257,8 @@ impl Client {
break; break;
} }
good_blocks.push(header.hash().clone());
self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here? self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here?
let ancient = if header.number() >= HISTORY { Some(header.number() - HISTORY) } else { None }; let ancient = if header.number() >= HISTORY { Some(header.number() - HISTORY) } else { None };
match result.drain().commit(header.number(), &header.hash(), ancient.map(|n|(n, self.chain.read().unwrap().block_hash(n).unwrap()))) { match result.drain().commit(header.number(), &header.hash(), ancient.map(|n|(n, self.chain.read().unwrap().block_hash(n).unwrap()))) {
@ -269,6 +272,7 @@ impl Client {
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
ret += 1; ret += 1;
} }
self.block_queue.write().unwrap().mark_as_good(&good_blocks);
ret ret
} }
@ -318,7 +322,11 @@ impl BlockChainClient for Client {
} }
fn block_status(&self, hash: &H256) -> BlockStatus { fn block_status(&self, hash: &H256) -> BlockStatus {
if self.chain.read().unwrap().is_known(&hash) { BlockStatus::InChain } else { BlockStatus::Unknown } if self.chain.read().unwrap().is_known(&hash) {
BlockStatus::InChain
} else {
self.block_queue.read().unwrap().block_status(hash)
}
} }
fn block_total_difficulty(&self, hash: &H256) -> Option<U256> { fn block_total_difficulty(&self, hash: &H256) -> Option<U256> {
@ -365,6 +373,9 @@ impl BlockChainClient for Client {
if self.chain.read().unwrap().is_known(&header.hash()) { if self.chain.read().unwrap().is_known(&header.hash()) {
return Err(ImportError::AlreadyInChain); return Err(ImportError::AlreadyInChain);
} }
if self.block_status(&header.parent_hash) == BlockStatus::Unknown {
return Err(ImportError::UnknownParent);
}
self.block_queue.write().unwrap().import_block(bytes) self.block_queue.write().unwrap().import_block(bytes)
} }

View File

@ -130,14 +130,16 @@ pub enum BlockError {
} }
#[derive(Debug)] #[derive(Debug)]
/// TODO [arkpar] Please document me /// Import to the block queue result
pub enum ImportError { pub enum ImportError {
/// TODO [arkpar] Please document me /// Bad block detected
Bad(Option<Error>), Bad(Option<Error>),
/// TODO [arkpar] Please document me /// Already in the block chain
AlreadyInChain, AlreadyInChain,
/// TODO [arkpar] Please document me /// Already in the block queue
AlreadyQueued, AlreadyQueued,
/// Unknown parent
UnknownParent,
} }
impl From<Error> for ImportError { impl From<Error> for ImportError {

View File

@ -408,6 +408,7 @@ impl ChainSync {
trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h);
let header_view = HeaderView::new(header_rlp.as_raw()); let header_view = HeaderView::new(header_rlp.as_raw());
// TODO: Decompose block and add to self.headers and self.bodies instead // TODO: Decompose block and add to self.headers and self.bodies instead
let mut unknown = false;
if header_view.number() == From::from(self.last_imported_block + 1) { if header_view.number() == From::from(self.last_imported_block + 1) {
match io.chain().import_block(block_rlp.as_raw().to_vec()) { match io.chain().import_block(block_rlp.as_raw().to_vec()) {
Err(ImportError::AlreadyInChain) => { Err(ImportError::AlreadyInChain) => {
@ -416,6 +417,10 @@ impl ChainSync {
Err(ImportError::AlreadyQueued) => { Err(ImportError::AlreadyQueued) => {
trace!(target: "sync", "New block already queued {:?}", h); trace!(target: "sync", "New block already queued {:?}", h);
}, },
Err(ImportError::UnknownParent) => {
unknown = true;
trace!(target: "sync", "New block with unknown parent {:?}", h);
},
Ok(_) => { Ok(_) => {
trace!(target: "sync", "New block queued {:?}", h); trace!(target: "sync", "New block queued {:?}", h);
}, },
@ -426,6 +431,9 @@ impl ChainSync {
}; };
} }
else { else {
unknown = true;
}
if unknown {
trace!(target: "sync", "New block unknown {:?}", h); trace!(target: "sync", "New block unknown {:?}", h);
//TODO: handle too many unknown blocks //TODO: handle too many unknown blocks
let difficulty: U256 = try!(r.val_at(1)); let difficulty: U256 = try!(r.val_at(1));