commit
						3885cc07e4
					
				| @ -19,6 +19,7 @@ time = "0.1" | |||||||
| #interpolate_idents = { git = "https://github.com/SkylerLipthay/interpolate_idents" } | #interpolate_idents = { git = "https://github.com/SkylerLipthay/interpolate_idents" } | ||||||
| evmjit = { path = "rust-evmjit", optional = true } | evmjit = { path = "rust-evmjit", optional = true } | ||||||
| ethash = { path = "ethash" } | ethash = { path = "ethash" } | ||||||
|  | num_cpus = "0.2" | ||||||
| 
 | 
 | ||||||
| [features] | [features] | ||||||
| jit = ["evmjit"] | jit = ["evmjit"] | ||||||
|  | |||||||
							
								
								
									
										34
									
								
								src/block.rs
									
									
									
									
									
								
							
							
						
						
									
										34
									
								
								src/block.rs
									
									
									
									
									
								
							| @ -1,6 +1,7 @@ | |||||||
| use common::*; | use common::*; | ||||||
| use engine::*; | use engine::*; | ||||||
| use state::*; | use state::*; | ||||||
|  | use verification::PreVerifiedBlock; | ||||||
| 
 | 
 | ||||||
| /// A transaction/receipt execution entry.
 | /// A transaction/receipt execution entry.
 | ||||||
| pub struct Entry { | pub struct Entry { | ||||||
| @ -263,30 +264,39 @@ impl IsBlock for SealedBlock { | |||||||
| 	fn block(&self) -> &Block { &self.block } | 	fn block(&self) -> &Block { &self.block } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header
 | /// Enact the block given by block header, transactions and uncles
 | ||||||
| pub fn enact<'x, 'y>(block_bytes: &[u8], engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result<ClosedBlock<'x, 'y>, Error> { | pub fn enact<'x, 'y>(header: &Header, transactions: &[Transaction], uncles: &[Header], engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result<ClosedBlock<'x, 'y>, Error> { | ||||||
| 	{ | 	{ | ||||||
| 		let header = BlockView::new(block_bytes).header_view(); |  | ||||||
| 		let s = State::from_existing(db.clone(), parent.state_root().clone(), engine.account_start_nonce()); | 		let s = State::from_existing(db.clone(), parent.state_root().clone(), engine.account_start_nonce()); | ||||||
| 		trace!("enact(): root={}, author={}, author_balance={}\n", s.root(), header.author(), s.balance(&header.author())); | 		trace!("enact(): root={}, author={}, author_balance={}\n", s.root(), header.author(), s.balance(&header.author())); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	let block = BlockView::new(block_bytes); | 	let mut b = OpenBlock::new(engine, db, parent, last_hashes, header.author().clone(), header.extra_data().clone()); | ||||||
| 	let header = block.header_view(); | 	b.set_difficulty(*header.difficulty()); | ||||||
| 	let mut b = OpenBlock::new(engine, db, parent, last_hashes, header.author(), header.extra_data()); | 	b.set_gas_limit(*header.gas_limit()); | ||||||
| 	b.set_difficulty(header.difficulty()); |  | ||||||
| 	b.set_gas_limit(header.gas_limit()); |  | ||||||
| 	b.set_timestamp(header.timestamp()); | 	b.set_timestamp(header.timestamp()); | ||||||
| //	info!("enact: Enacting #{}. env_info={:?}", header.number(), b.env_info());
 | 	for t in transactions { try!(b.push_transaction(t.clone(), None)); } | ||||||
| 	for t in block.transactions().into_iter() { try!(b.push_transaction(t, None)); } | 	for u in uncles { try!(b.push_uncle(u.clone())); } | ||||||
| 	for u in block.uncles().into_iter() { try!(b.push_uncle(u)); } |  | ||||||
| 	Ok(b.close()) | 	Ok(b.close()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | /// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header
 | ||||||
|  | pub fn enact_bytes<'x, 'y>(block_bytes: &[u8], engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result<ClosedBlock<'x, 'y>, Error> { | ||||||
|  | 	let block = BlockView::new(block_bytes); | ||||||
|  | 	let header = block.header(); | ||||||
|  | 	enact(&header, &block.transactions(), &block.uncles(), engine, db, parent, last_hashes) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | /// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header
 | ||||||
|  | pub fn enact_verified<'x, 'y>(block: &PreVerifiedBlock, engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result<ClosedBlock<'x, 'y>, Error> { | ||||||
|  | 	let view = BlockView::new(&block.bytes); | ||||||
|  | 	enact(&block.header, &block.transactions, &view.uncles(), engine, db, parent, last_hashes) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| /// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header. Seal the block aferwards
 | /// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header. Seal the block aferwards
 | ||||||
| pub fn enact_and_seal(block_bytes: &[u8], engine: &Engine, db: OverlayDB, parent: &Header, last_hashes: &LastHashes) -> Result<SealedBlock, Error> { | pub fn enact_and_seal(block_bytes: &[u8], engine: &Engine, db: OverlayDB, parent: &Header, last_hashes: &LastHashes) -> Result<SealedBlock, Error> { | ||||||
| 	let header = BlockView::new(block_bytes).header_view(); | 	let header = BlockView::new(block_bytes).header_view(); | ||||||
| 	Ok(try!(try!(enact(block_bytes, engine, db, parent, last_hashes)).seal(header.seal()))) | 	Ok(try!(try!(enact_bytes(block_bytes, engine, db, parent, last_hashes)).seal(header.seal()))) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[test] | #[test] | ||||||
|  | |||||||
							
								
								
									
										103
									
								
								src/client.rs
									
									
									
									
									
								
							
							
						
						
									
										103
									
								
								src/client.rs
									
									
									
									
									
								
							| @ -88,7 +88,7 @@ pub trait BlockChainClient : Sync + Send { | |||||||
| 	fn block_receipts(&self, hash: &H256) -> Option<Bytes>; | 	fn block_receipts(&self, hash: &H256) -> Option<Bytes>; | ||||||
| 
 | 
 | ||||||
| 	/// Import a block into the blockchain.
 | 	/// Import a block into the blockchain.
 | ||||||
| 	fn import_block(&mut self, byte: &[u8]) -> ImportResult; | 	fn import_block(&mut self, bytes: Bytes) -> ImportResult; | ||||||
| 
 | 
 | ||||||
| 	/// Get block queue information.
 | 	/// Get block queue information.
 | ||||||
| 	fn queue_status(&self) -> BlockQueueStatus; | 	fn queue_status(&self) -> BlockQueueStatus; | ||||||
| @ -152,58 +152,75 @@ impl Client { | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/// This is triggered by a message coming from a block queue when the block is ready for insertion
 | 	/// This is triggered by a message coming from a block queue when the block is ready for insertion
 | ||||||
| 	pub fn import_verified_block(&mut self, bytes: Bytes) { | 	pub fn import_verified_blocks(&mut self) { | ||||||
| 		let block = BlockView::new(&bytes); | 		
 | ||||||
| 		let header = block.header(); | 		let mut bad = HashSet::new(); | ||||||
| 		if let Err(e) = verify_block_family(&header, &bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) { | 		let blocks = self.queue.drain(128); 
 | ||||||
| 			warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); | 		if blocks.is_empty() { | ||||||
| 			self.queue.mark_as_bad(&header.hash()); |  | ||||||
| 			return; | 			return; | ||||||
| 		}; | 		} | ||||||
| 		let parent = match self.chain.read().unwrap().block_header(&header.parent_hash) { | 
 | ||||||
| 			Some(p) => p, | 		for block in blocks { | ||||||
| 			None => { | 			if bad.contains(&block.header.parent_hash) { | ||||||
| 				warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash); | 				self.queue.mark_as_bad(&block.header.hash()); | ||||||
|  | 				bad.insert(block.header.hash()); | ||||||
|  | 				continue; | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			let header = &block.header; | ||||||
|  | 			if let Err(e) = verify_block_family(&header, &block.bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) { | ||||||
|  | 				warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); | ||||||
| 				self.queue.mark_as_bad(&header.hash()); | 				self.queue.mark_as_bad(&header.hash()); | ||||||
|  | 				bad.insert(block.header.hash()); | ||||||
| 				return; | 				return; | ||||||
| 			}, | 			}; | ||||||
| 		}; | 			let parent = match self.chain.read().unwrap().block_header(&header.parent_hash) { | ||||||
| 		// build last hashes
 | 				Some(p) => p, | ||||||
| 		let mut last_hashes = LastHashes::new(); | 				None => { | ||||||
| 		last_hashes.resize(256, H256::new()); | 					warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash); | ||||||
| 		last_hashes[0] = header.parent_hash.clone(); | 					self.queue.mark_as_bad(&header.hash()); | ||||||
| 		for i in 0..255 { | 					bad.insert(block.header.hash()); | ||||||
| 			match self.chain.read().unwrap().block_details(&last_hashes[i]) { | 					return; | ||||||
| 				Some(details) => { |  | ||||||
| 					last_hashes[i + 1] = details.parent.clone(); |  | ||||||
| 				}, | 				}, | ||||||
| 				None => break, | 			}; | ||||||
|  | 			// build last hashes
 | ||||||
|  | 			let mut last_hashes = LastHashes::new(); | ||||||
|  | 			last_hashes.resize(256, H256::new()); | ||||||
|  | 			last_hashes[0] = header.parent_hash.clone(); | ||||||
|  | 			for i in 0..255 { | ||||||
|  | 				match self.chain.read().unwrap().block_details(&last_hashes[i]) { | ||||||
|  | 					Some(details) => { | ||||||
|  | 						last_hashes[i + 1] = details.parent.clone(); | ||||||
|  | 					}, | ||||||
|  | 					None => break, | ||||||
|  | 				} | ||||||
| 			} | 			} | ||||||
| 		} |  | ||||||
| 
 | 
 | ||||||
| 		let result = match enact(&bytes, self.engine.deref().deref(), self.state_db.clone(), &parent, &last_hashes) { | 			let result = match enact_verified(&block, self.engine.deref().deref(), self.state_db.clone(), &parent, &last_hashes) { | ||||||
| 			Ok(b) => b, | 				Ok(b) => b, | ||||||
| 			Err(e) => { | 				Err(e) => { | ||||||
| 				warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); | 					warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); | ||||||
|  | 				bad.insert(block.header.hash()); | ||||||
|  | 					self.queue.mark_as_bad(&header.hash()); | ||||||
|  | 					return; | ||||||
|  | 				} | ||||||
|  | 			}; | ||||||
|  | 			if let Err(e) = verify_block_final(&header, result.block().header()) { | ||||||
|  | 				warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); | ||||||
| 				self.queue.mark_as_bad(&header.hash()); | 				self.queue.mark_as_bad(&header.hash()); | ||||||
| 				return; | 				return; | ||||||
| 			} | 			} | ||||||
| 		}; |  | ||||||
| 		if let Err(e) = verify_block_final(&header, result.block().header()) { |  | ||||||
| 			warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); |  | ||||||
| 			self.queue.mark_as_bad(&header.hash()); |  | ||||||
| 			return; |  | ||||||
| 		} |  | ||||||
| 
 | 
 | ||||||
| 		self.chain.write().unwrap().insert_block(&bytes); //TODO: err here?
 | 			self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here?
 | ||||||
| 		match result.drain().commit() { | 			match result.drain().commit() { | ||||||
| 			Ok(_) => (), | 				Ok(_) => (), | ||||||
| 			Err(e) => { | 				Err(e) => { | ||||||
| 				warn!(target: "client", "State DB commit failed: {:?}", e); | 					warn!(target: "client", "State DB commit failed: {:?}", e); | ||||||
| 				return; | 					return; | ||||||
|  | 				} | ||||||
| 			} | 			} | ||||||
|  | 			info!(target: "client", "Imported #{} ({})", header.number(), header.hash()); | ||||||
| 		} | 		} | ||||||
| 		info!(target: "client", "Imported #{} ({})", header.number(), header.hash()); |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -261,8 +278,8 @@ impl BlockChainClient for Client { | |||||||
| 		unimplemented!(); | 		unimplemented!(); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	fn import_block(&mut self, bytes: &[u8]) -> ImportResult { | 	fn import_block(&mut self, bytes: Bytes) -> ImportResult { | ||||||
| 		let header = BlockView::new(bytes).header(); | 		let header = BlockView::new(&bytes).header(); | ||||||
| 		if self.chain.read().unwrap().is_known(&header.hash()) { | 		if self.chain.read().unwrap().is_known(&header.hash()) { | ||||||
| 			return Err(ImportError::AlreadyInChain); | 			return Err(ImportError::AlreadyInChain); | ||||||
| 		} | 		} | ||||||
|  | |||||||
| @ -146,6 +146,10 @@ impl Engine for Ethash { | |||||||
| 		} | 		} | ||||||
| 		Ok(()) | 		Ok(()) | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	fn verify_transaction(&self, t: &Transaction, _header: &Header) -> Result<(), Error> { | ||||||
|  | 		t.sender().map(|_|()) // Perform EC recovery and cache sender
 | ||||||
|  | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl Ethash { | impl Ethash { | ||||||
|  | |||||||
| @ -83,6 +83,7 @@ extern crate heapsize; | |||||||
| extern crate crypto; | extern crate crypto; | ||||||
| extern crate time; | extern crate time; | ||||||
| extern crate env_logger; | extern crate env_logger; | ||||||
|  | extern crate num_cpus; | ||||||
| #[cfg(feature = "jit" )] | #[cfg(feature = "jit" )] | ||||||
| extern crate evmjit; | extern crate evmjit; | ||||||
| #[macro_use] | #[macro_use] | ||||||
|  | |||||||
							
								
								
									
										228
									
								
								src/queue.rs
									
									
									
									
									
								
							
							
						
						
									
										228
									
								
								src/queue.rs
									
									
									
									
									
								
							| @ -1,59 +1,245 @@ | |||||||
|  | use std::thread::{JoinHandle, self}; | ||||||
|  | use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; | ||||||
| use util::*; | use util::*; | ||||||
| use verification::*; | use verification::*; | ||||||
| use error::*; | use error::*; | ||||||
| use engine::Engine; | use engine::Engine; | ||||||
| use sync::*; | use sync::*; | ||||||
| use views::*; | use views::*; | ||||||
|  | use header::*; | ||||||
| 
 | 
 | ||||||
| /// A queue of blocks. Sits between network or other I/O and the BlockChain.
 | /// A queue of blocks. Sits between network or other I/O and the BlockChain.
 | ||||||
| /// Sorts them ready for blockchain insertion.
 | /// Sorts them ready for blockchain insertion.
 | ||||||
| pub struct BlockQueue { | pub struct BlockQueue { | ||||||
| 	engine: Arc<Box<Engine>>, | 	engine: Arc<Box<Engine>>, | ||||||
|  | 	more_to_verify: Arc<Condvar>, | ||||||
|  | 	verification: Arc<Mutex<Verification>>, | ||||||
|  | 	verifiers: Vec<JoinHandle<()>>, | ||||||
|  | 	deleting: Arc<AtomicBool>, | ||||||
|  | 	ready_signal: Arc<QueueSignal>, | ||||||
|  | 	processing: HashSet<H256> | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | struct UnVerifiedBlock { | ||||||
|  | 	header: Header, | ||||||
|  | 	bytes: Bytes, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | struct VerifyingBlock { | ||||||
|  | 	hash: H256, | ||||||
|  | 	block: Option<PreVerifiedBlock>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | struct QueueSignal { | ||||||
|  | 	signalled: AtomicBool, | ||||||
| 	message_channel: IoChannel<NetSyncMessage>, | 	message_channel: IoChannel<NetSyncMessage>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl QueueSignal { | ||||||
|  | 	fn set(&self) { | ||||||
|  | 		if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false { | ||||||
|  | 			self.message_channel.send(UserMessage(SyncMessage::BlockVerified)).expect("Error sending BlockVerified message"); | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	fn reset(&self) { | ||||||
|  | 		self.signalled.store(false, AtomicOrdering::Relaxed); | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #[derive(Default)] | ||||||
|  | struct Verification { | ||||||
|  | 	unverified: VecDeque<UnVerifiedBlock>, | ||||||
|  | 	verified: VecDeque<PreVerifiedBlock>, | ||||||
|  | 	verifying: VecDeque<VerifyingBlock>, | ||||||
| 	bad: HashSet<H256>, | 	bad: HashSet<H256>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl BlockQueue { | impl BlockQueue { | ||||||
| 	/// Creates a new queue instance.
 | 	/// Creates a new queue instance.
 | ||||||
| 	pub fn new(engine: Arc<Box<Engine>>, message_channel: IoChannel<NetSyncMessage>) -> BlockQueue { | 	pub fn new(engine: Arc<Box<Engine>>, message_channel: IoChannel<NetSyncMessage>) -> BlockQueue { | ||||||
|  | 		let verification = Arc::new(Mutex::new(Verification::default())); | ||||||
|  | 		let more_to_verify = Arc::new(Condvar::new()); | ||||||
|  | 		let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); | ||||||
|  | 		let deleting = Arc::new(AtomicBool::new(false)); | ||||||
|  | 
 | ||||||
|  | 		let mut verifiers: Vec<JoinHandle<()>> = Vec::new(); | ||||||
|  | 		let thread_count = max(::num_cpus::get(), 2) - 1; | ||||||
|  | 		for _ in 0..thread_count { | ||||||
|  | 			let verification = verification.clone(); | ||||||
|  | 			let engine = engine.clone(); | ||||||
|  | 			let more_to_verify = more_to_verify.clone(); | ||||||
|  | 			let ready_signal = ready_signal.clone(); | ||||||
|  | 			let deleting = deleting.clone(); | ||||||
|  | 			verifiers.push(thread::spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal,  deleting))); | ||||||
|  | 		} | ||||||
| 		BlockQueue { | 		BlockQueue { | ||||||
| 			engine: engine, | 			engine: engine, | ||||||
| 			message_channel: message_channel, | 			ready_signal: ready_signal.clone(), | ||||||
| 			bad: HashSet::new(), | 			more_to_verify: more_to_verify.clone(), | ||||||
|  | 			verification: verification.clone(), | ||||||
|  | 			verifiers: verifiers, | ||||||
|  | 			deleting: deleting.clone(), | ||||||
|  | 			processing: HashSet::new(), | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	fn verify(verification: Arc<Mutex<Verification>>, engine: Arc<Box<Engine>>, wait: Arc<Condvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>) { | ||||||
|  | 		while !deleting.load(AtomicOrdering::Relaxed) { | ||||||
|  | 			{ | ||||||
|  | 				let mut lock = verification.lock().unwrap(); | ||||||
|  | 				while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Relaxed) { | ||||||
|  | 					lock = wait.wait(lock).unwrap(); | ||||||
|  | 				} | ||||||
|  | 				
 | ||||||
|  | 				if deleting.load(AtomicOrdering::Relaxed) { | ||||||
|  | 					return; | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			let block = { | ||||||
|  | 				let mut v = verification.lock().unwrap(); | ||||||
|  | 				if v.unverified.is_empty() { | ||||||
|  | 					continue; | ||||||
|  | 				} | ||||||
|  | 				let block = v.unverified.pop_front().unwrap(); | ||||||
|  | 				v.verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None }); | ||||||
|  | 				block | ||||||
|  | 			}; | ||||||
|  | 
 | ||||||
|  | 			let block_hash = block.header.hash(); | ||||||
|  | 			match verify_block_unordered(block.header, block.bytes, engine.deref().deref()) { | ||||||
|  | 				Ok(verified) => { | ||||||
|  | 					let mut v = verification.lock().unwrap(); | ||||||
|  | 					for e in &mut v.verifying { | ||||||
|  | 						if e.hash == block_hash { | ||||||
|  | 							e.block = Some(verified); | ||||||
|  | 							break; | ||||||
|  | 						} | ||||||
|  | 					} | ||||||
|  | 					if !v.verifying.is_empty() && v.verifying.front().unwrap().hash == block_hash { | ||||||
|  | 						// we're next!
 | ||||||
|  | 						let mut vref = v.deref_mut(); | ||||||
|  | 						BlockQueue::drain_verifying(&mut vref.verifying, &mut vref.verified, &mut vref.bad); | ||||||
|  | 						ready.set(); | ||||||
|  | 					} | ||||||
|  | 				}, | ||||||
|  | 				Err(err) => { | ||||||
|  | 					let mut v = verification.lock().unwrap(); | ||||||
|  | 					warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", block_hash, err); | ||||||
|  | 					v.bad.insert(block_hash.clone()); | ||||||
|  | 					v.verifying.retain(|e| e.hash != block_hash); | ||||||
|  | 					let mut vref = v.deref_mut(); | ||||||
|  | 					BlockQueue::drain_verifying(&mut vref.verifying, &mut vref.verified, &mut vref.bad); | ||||||
|  | 					ready.set(); | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	fn drain_verifying(verifying: &mut VecDeque<VerifyingBlock>, verified: &mut VecDeque<PreVerifiedBlock>, bad: &mut HashSet<H256>) { | ||||||
|  | 		while !verifying.is_empty() && verifying.front().unwrap().block.is_some() { | ||||||
|  | 			let block = verifying.pop_front().unwrap().block.unwrap(); | ||||||
|  | 			if bad.contains(&block.header.parent_hash) { | ||||||
|  | 				bad.insert(block.header.hash()); | ||||||
|  | 			} | ||||||
|  | 			else { | ||||||
|  | 				verified.push_back(block); | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/// Clear the queue and stop verification activity.
 | 	/// Clear the queue and stop verification activity.
 | ||||||
| 	pub fn clear(&mut self) { | 	pub fn clear(&mut self) { | ||||||
|  | 		let mut verification = self.verification.lock().unwrap(); | ||||||
|  | 		verification.unverified.clear(); | ||||||
|  | 		verification.verifying.clear(); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/// Add a block to the queue.
 | 	/// Add a block to the queue.
 | ||||||
| 	pub fn import_block(&mut self, bytes: &[u8]) -> ImportResult { | 	pub fn import_block(&mut self, bytes: Bytes) -> ImportResult { | ||||||
| 		let header = BlockView::new(bytes).header(); | 		let header = BlockView::new(&bytes).header(); | ||||||
| 		if self.bad.contains(&header.hash()) { | 		if self.processing.contains(&header.hash()) { | ||||||
| 			return Err(ImportError::Bad(None)); | 			return Err(ImportError::AlreadyQueued); | ||||||
|  | 		} | ||||||
|  | 		{ | ||||||
|  | 			let mut verification = self.verification.lock().unwrap(); | ||||||
|  | 			if verification.bad.contains(&header.hash()) { | ||||||
|  | 				return Err(ImportError::Bad(None)); | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			if verification.bad.contains(&header.parent_hash) { | ||||||
|  | 				verification.bad.insert(header.hash()); | ||||||
|  | 				return Err(ImportError::Bad(None)); | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if self.bad.contains(&header.parent_hash) { | 		match verify_block_basic(&header, &bytes, self.engine.deref().deref()) { | ||||||
| 			self.bad.insert(header.hash()); | 			Ok(()) => { | ||||||
| 			return Err(ImportError::Bad(None)); | 				self.processing.insert(header.hash()); | ||||||
|  | 				self.verification.lock().unwrap().unverified.push_back(UnVerifiedBlock { header: header, bytes: bytes }); | ||||||
|  | 				self.more_to_verify.notify_all(); | ||||||
|  | 			}, | ||||||
|  | 			Err(err) => { | ||||||
|  | 				warn!(target: "client", "Stage 1 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), err); | ||||||
|  | 				self.verification.lock().unwrap().bad.insert(header.hash()); | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 
 |  | ||||||
| 		try!(verify_block_basic(&header, bytes, self.engine.deref().deref()).map_err(|e| { |  | ||||||
| 			warn!(target: "client", "Stage 1 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), e); |  | ||||||
| 			e |  | ||||||
| 		})); |  | ||||||
| 		try!(verify_block_unordered(&header, bytes, self.engine.deref().deref()).map_err(|e| { |  | ||||||
| 			warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), e); |  | ||||||
| 			e |  | ||||||
| 		})); |  | ||||||
| 		try!(self.message_channel.send(UserMessage(SyncMessage::BlockVerified(bytes.to_vec()))).map_err(|e| Error::from(e))); |  | ||||||
| 		Ok(()) | 		Ok(()) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	/// Mark given block and all its children as bad. Stops verification.
 | ||||||
| 	pub fn mark_as_bad(&mut self, hash: &H256) { | 	pub fn mark_as_bad(&mut self, hash: &H256) { | ||||||
| 		self.bad.insert(hash.clone()); | 		let mut verification_lock = self.verification.lock().unwrap(); | ||||||
| 		//TODO: walk the queue
 | 		let mut verification = verification_lock.deref_mut(); | ||||||
|  | 		verification.bad.insert(hash.clone()); | ||||||
|  | 		let mut new_verified = VecDeque::new(); | ||||||
|  | 		for block in verification.verified.drain(..) { | ||||||
|  | 			if verification.bad.contains(&block.header.parent_hash) { | ||||||
|  | 				verification.bad.insert(block.header.hash()); | ||||||
|  | 			} | ||||||
|  | 			else { | ||||||
|  | 				new_verified.push_back(block); | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		verification.verified = new_verified; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	pub fn drain(&mut self, max: usize) -> Vec<PreVerifiedBlock> { | ||||||
|  | 		let mut verification = self.verification.lock().unwrap(); | ||||||
|  | 		let count = min(max, verification.verified.len()); | ||||||
|  | 		let mut result = Vec::with_capacity(count); | ||||||
|  | 		for _ in 0..count { | ||||||
|  | 			let block = verification.verified.pop_front().unwrap(); | ||||||
|  | 			self.processing.remove(&block.header.hash()); | ||||||
|  | 			result.push(block); | ||||||
|  | 		} | ||||||
|  | 		self.ready_signal.reset(); | ||||||
|  | 		result | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | impl Drop for BlockQueue { | ||||||
|  | 	fn drop(&mut self) { | ||||||
|  | 		self.clear(); | ||||||
|  | 		self.deleting.store(true, AtomicOrdering::Relaxed); | ||||||
|  | 		self.more_to_verify.notify_all(); | ||||||
|  | 		for t in self.verifiers.drain(..) { | ||||||
|  | 			t.join().unwrap(); | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #[cfg(test)] | ||||||
|  | mod tests { | ||||||
|  | 	use util::*; | ||||||
|  | 	use spec::*; | ||||||
|  | 	use queue::*; | ||||||
|  | 
 | ||||||
|  | 	#[test] | ||||||
|  | 	fn test_block_queue() { | ||||||
|  | 		// TODO better test
 | ||||||
|  | 		let spec = Spec::new_test(); | ||||||
|  | 		let engine = spec.to_engine().unwrap(); | ||||||
|  | 		let _ = BlockQueue::new(Arc::new(engine), IoChannel::disconnected()); | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | |||||||
| @ -55,8 +55,8 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler { | |||||||
| 		match net_message { | 		match net_message { | ||||||
| 			&mut UserMessage(ref mut message) =>  { | 			&mut UserMessage(ref mut message) =>  { | ||||||
| 				match message { | 				match message { | ||||||
| 					&mut SyncMessage::BlockVerified(ref mut bytes) => { | 					&mut SyncMessage::BlockVerified => { | ||||||
| 						self.client.write().unwrap().import_verified_block(mem::replace(bytes, Bytes::new())); | 						self.client.write().unwrap().import_verified_blocks(); | ||||||
| 					}, | 					}, | ||||||
| 					_ => {}, // ignore other messages
 | 					_ => {}, // ignore other messages
 | ||||||
| 				} | 				} | ||||||
|  | |||||||
| @ -401,7 +401,7 @@ impl ChainSync { | |||||||
| 		let header_view = HeaderView::new(header_rlp.as_raw()); | 		let header_view = HeaderView::new(header_rlp.as_raw()); | ||||||
| 		// TODO: Decompose block and add to self.headers and self.bodies instead
 | 		// TODO: Decompose block and add to self.headers and self.bodies instead
 | ||||||
| 		if header_view.number() == From::from(self.last_imported_block + 1) { | 		if header_view.number() == From::from(self.last_imported_block + 1) { | ||||||
| 			match io.chain().import_block(block_rlp.as_raw()) { | 			match io.chain().import_block(block_rlp.as_raw().to_vec()) { | ||||||
| 				Err(ImportError::AlreadyInChain) => { | 				Err(ImportError::AlreadyInChain) => { | ||||||
| 					trace!(target: "sync", "New block already in chain {:?}", h); | 					trace!(target: "sync", "New block already in chain {:?}", h); | ||||||
| 				}, | 				}, | ||||||
| @ -655,7 +655,7 @@ impl ChainSync { | |||||||
| 				block_rlp.append_raw(body.at(0).as_raw(), 1); | 				block_rlp.append_raw(body.at(0).as_raw(), 1); | ||||||
| 				block_rlp.append_raw(body.at(1).as_raw(), 1); | 				block_rlp.append_raw(body.at(1).as_raw(), 1); | ||||||
| 				let h = &headers.1[i].hash; | 				let h = &headers.1[i].hash; | ||||||
| 				match io.chain().import_block(&block_rlp.out()) { | 				match io.chain().import_block(block_rlp.out()) { | ||||||
| 					Err(ImportError::AlreadyInChain) => { | 					Err(ImportError::AlreadyInChain) => { | ||||||
| 						trace!(target: "sync", "Block already in chain {:?}", h); | 						trace!(target: "sync", "Block already in chain {:?}", h); | ||||||
| 						self.last_imported_block = headers.0 + i as BlockNumber; | 						self.last_imported_block = headers.0 + i as BlockNumber; | ||||||
|  | |||||||
| @ -43,7 +43,7 @@ pub enum SyncMessage { | |||||||
| 	/// New block has been imported into the blockchain
 | 	/// New block has been imported into the blockchain
 | ||||||
| 	NewChainBlock(Bytes), | 	NewChainBlock(Bytes), | ||||||
| 	/// A block is ready 
 | 	/// A block is ready 
 | ||||||
| 	BlockVerified(Bytes), | 	BlockVerified, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub type NetSyncMessage = NetworkIoMessage<SyncMessage>; | pub type NetSyncMessage = NetworkIoMessage<SyncMessage>; | ||||||
|  | |||||||
| @ -43,7 +43,7 @@ impl TestBlockChainClient { | |||||||
| 			rlp.append(&header); | 			rlp.append(&header); | ||||||
| 			rlp.append_raw(&rlp::NULL_RLP, 1); | 			rlp.append_raw(&rlp::NULL_RLP, 1); | ||||||
| 			rlp.append_raw(uncles.as_raw(), 1); | 			rlp.append_raw(uncles.as_raw(), 1); | ||||||
| 			self.import_block(rlp.as_raw()).unwrap(); | 			self.import_block(rlp.as_raw().to_vec()).unwrap(); | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| @ -110,7 +110,7 @@ impl BlockChainClient for TestBlockChainClient { | |||||||
| 		None | 		None | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	fn import_block(&mut self, b: &[u8]) -> ImportResult { | 	fn import_block(&mut self, b: Bytes) -> ImportResult { | ||||||
| 		let header = Rlp::new(&b).val_at::<BlockHeader>(0); | 		let header = Rlp::new(&b).val_at::<BlockHeader>(0); | ||||||
| 		let number: usize = header.number as usize; | 		let number: usize = header.number as usize; | ||||||
| 		if number > self.blocks.len() { | 		if number > self.blocks.len() { | ||||||
| @ -132,7 +132,7 @@ impl BlockChainClient for TestBlockChainClient { | |||||||
| 		if number == self.numbers.len() { | 		if number == self.numbers.len() { | ||||||
| 			self.difficulty = self.difficulty + header.difficulty; | 			self.difficulty = self.difficulty + header.difficulty; | ||||||
| 			self.last_hash = header.hash(); | 			self.last_hash = header.hash(); | ||||||
| 			self.blocks.insert(header.hash(), b.to_vec()); | 			self.blocks.insert(header.hash(), b); | ||||||
| 			self.numbers.insert(number, header.hash()); | 			self.numbers.insert(number, header.hash()); | ||||||
| 			let mut parent_hash = header.parent_hash; | 			let mut parent_hash = header.parent_hash; | ||||||
| 			if number > 0 { | 			if number > 0 { | ||||||
|  | |||||||
| @ -9,6 +9,16 @@ use common::*; | |||||||
| use engine::Engine; | use engine::Engine; | ||||||
| use blockchain::*; | use blockchain::*; | ||||||
| 
 | 
 | ||||||
|  | /// Preprocessed block data gathered in `verify_block_unordered` call
 | ||||||
|  | pub struct PreVerifiedBlock { | ||||||
|  | 	/// Populated block header
 | ||||||
|  | 	pub header: Header, | ||||||
|  | 	/// Populated block transactions
 | ||||||
|  | 	pub transactions: Vec<Transaction>, | ||||||
|  | 	/// Block bytes
 | ||||||
|  | 	pub bytes: Bytes, | ||||||
|  | } | ||||||
|  | 
 | ||||||
| /// Phase 1 quick block verification. Only does checks that are cheap. Operates on a single block
 | /// Phase 1 quick block verification. Only does checks that are cheap. Operates on a single block
 | ||||||
| pub fn verify_block_basic(header: &Header, bytes: &[u8], engine: &Engine) -> Result<(), Error> { | pub fn verify_block_basic(header: &Header, bytes: &[u8], engine: &Engine) -> Result<(), Error> { | ||||||
| 	try!(verify_header(&header, engine)); | 	try!(verify_header(&header, engine)); | ||||||
| @ -29,19 +39,26 @@ pub fn verify_block_basic(header: &Header, bytes: &[u8], engine: &Engine) -> Res | |||||||
| 
 | 
 | ||||||
| /// Phase 2 verification. Perform costly checks such as transaction signatures and block nonce for ethash.
 | /// Phase 2 verification. Perform costly checks such as transaction signatures and block nonce for ethash.
 | ||||||
| /// Still operates on a individual block
 | /// Still operates on a individual block
 | ||||||
| /// TODO: return cached transactions, header hash.
 | /// Returns a PreVerifiedBlock structure populated with transactions
 | ||||||
| pub fn verify_block_unordered(header: &Header, bytes: &[u8], engine: &Engine) -> Result<(), Error> { | pub fn verify_block_unordered(header: Header, bytes: Bytes, engine: &Engine) -> Result<PreVerifiedBlock, Error> { | ||||||
| 	try!(engine.verify_block_unordered(&header, Some(bytes))); | 	try!(engine.verify_block_unordered(&header, Some(&bytes))); | ||||||
| 	for u in Rlp::new(bytes).at(2).iter().map(|rlp| rlp.as_val::<Header>()) { | 	for u in Rlp::new(&bytes).at(2).iter().map(|rlp| rlp.as_val::<Header>()) { | ||||||
| 		try!(engine.verify_block_unordered(&u, None)); | 		try!(engine.verify_block_unordered(&u, None)); | ||||||
| 	} | 	} | ||||||
| 	// Verify transactions. 
 | 	// Verify transactions. 
 | ||||||
| 	// TODO: pass in pre-recovered transactions - maybe verify_transaction wants to call `sender()`.
 | 	let mut transactions = Vec::new(); | ||||||
| 	let v = BlockView::new(bytes); | 	{ | ||||||
| 	for t in v.transactions() { | 		let v = BlockView::new(&bytes); | ||||||
| 		try!(engine.verify_transaction(&t, &header)); | 		for t in v.transactions() { | ||||||
|  | 			try!(engine.verify_transaction(&t, &header)); | ||||||
|  | 			transactions.push(t); | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 	Ok(()) | 	Ok(PreVerifiedBlock { | ||||||
|  | 		header: header, | ||||||
|  | 		transactions: transactions, | ||||||
|  | 		bytes: bytes, | ||||||
|  | 	}) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /// Phase 3 verification. Check block information against parent and uncles.
 | /// Phase 3 verification. Check block information against parent and uncles.
 | ||||||
|  | |||||||
| @ -151,14 +151,22 @@ impl<Message> Handler for IoManager<Message> where Message: Send + 'static { | |||||||
| /// Allows sending messages into the event loop. All the IO handlers will get the message
 | /// Allows sending messages into the event loop. All the IO handlers will get the message
 | ||||||
| /// in the `message` callback.
 | /// in the `message` callback.
 | ||||||
| pub struct IoChannel<Message> where Message: Send { | pub struct IoChannel<Message> where Message: Send { | ||||||
| 	channel: Sender<IoMessage<Message>> 
 | 	channel: Option<Sender<IoMessage<Message>>> | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl<Message> IoChannel<Message> where Message: Send { | impl<Message> IoChannel<Message> where Message: Send { | ||||||
| 	pub fn send(&mut self, message: Message) -> Result<(), IoError> { | 	/// Send a msessage through the channel
 | ||||||
| 		try!(self.channel.send(IoMessage::UserMessage(message))); | 	pub fn send(&self, message: Message) -> Result<(), IoError> { | ||||||
|  | 		if let Some(ref channel) = self.channel { | ||||||
|  | 			try!(channel.send(IoMessage::UserMessage(message))); | ||||||
|  | 		} | ||||||
| 		Ok(()) | 		Ok(()) | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	/// Create a new channel to connected to event loop.
 | ||||||
|  | 	pub fn disconnected() -> IoChannel<Message> { | ||||||
|  | 		IoChannel { channel: None } | ||||||
|  | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /// General IO Service. Starts an event loop and dispatches IO requests.
 | /// General IO Service. Starts an event loop and dispatches IO requests.
 | ||||||
| @ -198,7 +206,7 @@ impl<Message> IoService<Message> where Message: Send + 'static { | |||||||
| 
 | 
 | ||||||
| 	/// Create a new message channel
 | 	/// Create a new message channel
 | ||||||
| 	pub fn channel(&mut self) -> IoChannel<Message> { | 	pub fn channel(&mut self) -> IoChannel<Message> { | ||||||
| 		IoChannel { channel: self.host_channel.clone() } | 		IoChannel { channel: Some(self.host_channel.clone()) } | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user