From a131c33bb228cf3bf3d4786546c2193098ee3d9b Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 17 Jan 2016 23:07:58 +0100 Subject: [PATCH 1/4] Multithreaded block queue --- Cargo.toml | 1 + src/block.rs | 34 ++++--- src/client.rs | 103 +++++++++++--------- src/ethereum/ethash.rs | 4 + src/lib.rs | 1 + src/queue.rs | 214 +++++++++++++++++++++++++++++++++++++---- src/service.rs | 4 +- src/sync/chain.rs | 4 +- src/sync/mod.rs | 2 +- src/sync/tests.rs | 6 +- src/verification.rs | 37 +++++-- util/src/io/service.rs | 2 +- 12 files changed, 317 insertions(+), 95 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cea775ef3..0583aa78f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ time = "0.1" #interpolate_idents = { git = "https://github.com/SkylerLipthay/interpolate_idents" } evmjit = { path = "rust-evmjit", optional = true } ethash = { path = "ethash" } +num_cpus = "0.2" [features] jit = ["evmjit"] diff --git a/src/block.rs b/src/block.rs index d149d6132..e5ca18e8c 100644 --- a/src/block.rs +++ b/src/block.rs @@ -1,6 +1,7 @@ use common::*; use engine::*; use state::*; +use verification::PreVerifiedBlock; /// A transaction/receipt execution entry. pub struct Entry { @@ -263,30 +264,39 @@ impl IsBlock for SealedBlock { fn block(&self) -> &Block { &self.block } } -/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header -pub fn enact<'x, 'y>(block_bytes: &[u8], engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result, Error> { +/// Enact the block given by block header, transactions and uncles +pub fn enact<'x, 'y>(header: &Header, transactions: &[Transaction], uncles: &[Header], engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result, Error> { { - let header = BlockView::new(block_bytes).header_view(); let s = State::from_existing(db.clone(), parent.state_root().clone(), engine.account_start_nonce()); trace!("enact(): root={}, author={}, author_balance={}\n", s.root(), header.author(), s.balance(&header.author())); } - let block = BlockView::new(block_bytes); - let header = block.header_view(); - let mut b = OpenBlock::new(engine, db, parent, last_hashes, header.author(), header.extra_data()); - b.set_difficulty(header.difficulty()); - b.set_gas_limit(header.gas_limit()); + let mut b = OpenBlock::new(engine, db, parent, last_hashes, header.author().clone(), header.extra_data().clone()); + b.set_difficulty(*header.difficulty()); + b.set_gas_limit(*header.gas_limit()); b.set_timestamp(header.timestamp()); -// info!("enact: Enacting #{}. env_info={:?}", header.number(), b.env_info()); - for t in block.transactions().into_iter() { try!(b.push_transaction(t, None)); } - for u in block.uncles().into_iter() { try!(b.push_uncle(u)); } + for t in transactions { try!(b.push_transaction(t.clone(), None)); } + for u in uncles { try!(b.push_uncle(u.clone())); } Ok(b.close()) } +/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header +pub fn enact_bytes<'x, 'y>(block_bytes: &[u8], engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result, Error> { + let block = BlockView::new(block_bytes); + let header = block.header(); + enact(&header, &block.transactions(), &block.uncles(), engine, db, parent, last_hashes) +} + +/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header +pub fn enact_verified<'x, 'y>(block: &PreVerifiedBlock, engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result, Error> { + let view = BlockView::new(&block.bytes); + enact(&block.header, &block.transactions, &view.uncles(), engine, db, parent, last_hashes) +} + /// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header. Seal the block aferwards pub fn enact_and_seal(block_bytes: &[u8], engine: &Engine, db: OverlayDB, parent: &Header, last_hashes: &LastHashes) -> Result { let header = BlockView::new(block_bytes).header_view(); - Ok(try!(try!(enact(block_bytes, engine, db, parent, last_hashes)).seal(header.seal()))) + Ok(try!(try!(enact_bytes(block_bytes, engine, db, parent, last_hashes)).seal(header.seal()))) } #[test] diff --git a/src/client.rs b/src/client.rs index 3ee84ccd7..e02ab37d8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -88,7 +88,7 @@ pub trait BlockChainClient : Sync + Send { fn block_receipts(&self, hash: &H256) -> Option; /// Import a block into the blockchain. - fn import_block(&mut self, byte: &[u8]) -> ImportResult; + fn import_block(&mut self, bytes: Bytes) -> ImportResult; /// Get block queue information. fn queue_status(&self) -> BlockQueueStatus; @@ -152,58 +152,75 @@ impl Client { } /// This is triggered by a message coming from a block queue when the block is ready for insertion - pub fn import_verified_block(&mut self, bytes: Bytes) { - let block = BlockView::new(&bytes); - let header = block.header(); - if let Err(e) = verify_block_family(&header, &bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) { - warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - self.queue.mark_as_bad(&header.hash()); + pub fn import_verified_blocks(&mut self) { + + let mut bad = HashSet::new(); + let blocks = self.queue.drain(128); + if blocks.is_empty() { return; - }; - let parent = match self.chain.read().unwrap().block_header(&header.parent_hash) { - Some(p) => p, - None => { - warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash); + } + + for block in blocks { + if bad.contains(&block.header.parent_hash) { + self.queue.mark_as_bad(&block.header.hash()); + bad.insert(block.header.hash()); + continue; + } + + let header = &block.header; + if let Err(e) = verify_block_family(&header, &block.bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) { + warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); self.queue.mark_as_bad(&header.hash()); + bad.insert(block.header.hash()); return; - }, - }; - // build last hashes - let mut last_hashes = LastHashes::new(); - last_hashes.resize(256, H256::new()); - last_hashes[0] = header.parent_hash.clone(); - for i in 0..255 { - match self.chain.read().unwrap().block_details(&last_hashes[i]) { - Some(details) => { - last_hashes[i + 1] = details.parent.clone(); + }; + let parent = match self.chain.read().unwrap().block_header(&header.parent_hash) { + Some(p) => p, + None => { + warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash); + self.queue.mark_as_bad(&header.hash()); + bad.insert(block.header.hash()); + return; }, - None => break, + }; + // build last hashes + let mut last_hashes = LastHashes::new(); + last_hashes.resize(256, H256::new()); + last_hashes[0] = header.parent_hash.clone(); + for i in 0..255 { + match self.chain.read().unwrap().block_details(&last_hashes[i]) { + Some(details) => { + last_hashes[i + 1] = details.parent.clone(); + }, + None => break, + } } - } - let result = match enact(&bytes, self.engine.deref().deref(), self.state_db.clone(), &parent, &last_hashes) { - Ok(b) => b, - Err(e) => { - warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); + let result = match enact_verified(&block, self.engine.deref().deref(), self.state_db.clone(), &parent, &last_hashes) { + Ok(b) => b, + Err(e) => { + warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); + bad.insert(block.header.hash()); + self.queue.mark_as_bad(&header.hash()); + return; + } + }; + if let Err(e) = verify_block_final(&header, result.block().header()) { + warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); self.queue.mark_as_bad(&header.hash()); return; } - }; - if let Err(e) = verify_block_final(&header, result.block().header()) { - warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - self.queue.mark_as_bad(&header.hash()); - return; - } - self.chain.write().unwrap().insert_block(&bytes); //TODO: err here? - match result.drain().commit() { - Ok(_) => (), - Err(e) => { - warn!(target: "client", "State DB commit failed: {:?}", e); - return; + self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here? + match result.drain().commit() { + Ok(_) => (), + Err(e) => { + warn!(target: "client", "State DB commit failed: {:?}", e); + return; + } } + info!(target: "client", "Imported #{} ({})", header.number(), header.hash()); } - info!(target: "client", "Imported #{} ({})", header.number(), header.hash()); } } @@ -261,8 +278,8 @@ impl BlockChainClient for Client { unimplemented!(); } - fn import_block(&mut self, bytes: &[u8]) -> ImportResult { - let header = BlockView::new(bytes).header(); + fn import_block(&mut self, bytes: Bytes) -> ImportResult { + let header = BlockView::new(&bytes).header(); if self.chain.read().unwrap().is_known(&header.hash()) { return Err(ImportError::AlreadyInChain); } diff --git a/src/ethereum/ethash.rs b/src/ethereum/ethash.rs index 99ffc3186..8a86cf4e5 100644 --- a/src/ethereum/ethash.rs +++ b/src/ethereum/ethash.rs @@ -146,6 +146,10 @@ impl Engine for Ethash { } Ok(()) } + + fn verify_transaction(&self, t: &Transaction, _header: &Header) -> Result<(), Error> { + t.sender().map(|_|()) // Perform EC recovery and cache sender + } } impl Ethash { diff --git a/src/lib.rs b/src/lib.rs index 7e4fdf33e..8180736a5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,7 @@ extern crate heapsize; extern crate crypto; extern crate time; extern crate env_logger; +extern crate num_cpus; #[cfg(feature = "jit" )] extern crate evmjit; #[macro_use] diff --git a/src/queue.rs b/src/queue.rs index 5ca361834..a51d3014c 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,59 +1,231 @@ +use std::thread::{JoinHandle, self}; +use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use util::*; use verification::*; use error::*; use engine::Engine; use sync::*; use views::*; +use header::*; /// A queue of blocks. Sits between network or other I/O and the BlockChain. /// Sorts them ready for blockchain insertion. pub struct BlockQueue { engine: Arc>, + more_to_verify: Arc, + verification: Arc>, + verifiers: Vec>, + deleting: Arc, + ready_signal: Arc, + processing: HashSet +} + +struct UnVerifiedBlock { + header: Header, + bytes: Bytes, +} + +struct VerifyingBlock { + hash: H256, + block: Option, +} + +struct QueueSignal { + signalled: AtomicBool, message_channel: IoChannel, +} + +impl QueueSignal { + fn set(&self) { + if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false { + self.message_channel.send(UserMessage(SyncMessage::BlockVerified)).expect("Error seding BlockVerified message"); + } + } + fn reset(&self) { + self.signalled.store(false, AtomicOrdering::Relaxed); + } +} + +#[derive(Default)] +struct Verification { + unverified: VecDeque, + verified: VecDeque, + verifying: VecDeque, bad: HashSet, } impl BlockQueue { /// Creates a new queue instance. pub fn new(engine: Arc>, message_channel: IoChannel) -> BlockQueue { + let verification = Arc::new(Mutex::new(Verification::default())); + let more_to_verify = Arc::new(Condvar::new()); + let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); + let deleting = Arc::new(AtomicBool::new(false)); + + let mut verifiers: Vec> = Vec::new(); + let thread_count = max(::num_cpus::get(), 2) - 1; + for _ in 0..thread_count { + let verification = verification.clone(); + let engine = engine.clone(); + let more_to_verify = more_to_verify.clone(); + let ready_signal = ready_signal.clone(); + let deleting = deleting.clone(); + verifiers.push(thread::spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting))); + } BlockQueue { engine: engine, - message_channel: message_channel, - bad: HashSet::new(), + ready_signal: ready_signal.clone(), + more_to_verify: more_to_verify.clone(), + verification: verification.clone(), + verifiers: verifiers, + deleting: deleting.clone(), + processing: HashSet::new(), + } + } + + fn verify(verification: Arc>, engine: Arc>, wait: Arc, ready: Arc, deleting: Arc) { + while !deleting.load(AtomicOrdering::Relaxed) { + { + let mut lock = verification.lock().unwrap(); + while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Relaxed) { + lock = wait.wait(lock).unwrap(); + } + + if deleting.load(AtomicOrdering::Relaxed) { + return; + } + } + + let block = { + let mut v = verification.lock().unwrap(); + if v.unverified.is_empty() { + continue; + } + let block = v.unverified.pop_front().unwrap(); + v.verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None }); + block + }; + + let block_hash = block.header.hash(); + match verify_block_unordered(block.header, block.bytes, engine.deref().deref()) { + Ok(verified) => { + let mut v = verification.lock().unwrap(); + for e in &mut v.verifying { + if e.hash == block_hash { + e.block = Some(verified); + break; + } + } + if !v.verifying.is_empty() && v.verifying.front().unwrap().hash == block_hash { + // we're next! + let mut vref = v.deref_mut(); + BlockQueue::drain_verifying(&mut vref.verifying, &mut vref.verified, &mut vref.bad); + ready.set(); + } + }, + Err(err) => { + let mut v = verification.lock().unwrap(); + warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", block_hash, err); + v.bad.insert(block_hash.clone()); + v.verifying.retain(|e| e.hash != block_hash); + let mut vref = v.deref_mut(); + BlockQueue::drain_verifying(&mut vref.verifying, &mut vref.verified, &mut vref.bad); + ready.set(); + } + } + } + } + + fn drain_verifying(verifying: &mut VecDeque, verified: &mut VecDeque, bad: &mut HashSet) { + while !verifying.is_empty() && verifying.front().unwrap().block.is_some() { + let block = verifying.pop_front().unwrap().block.unwrap(); + if bad.contains(&block.header.parent_hash) { + bad.insert(block.header.hash()); + } + else { + verified.push_back(block); + } } } /// Clear the queue and stop verification activity. pub fn clear(&mut self) { + let mut verification = self.verification.lock().unwrap(); + verification.unverified.clear(); + verification.verifying.clear(); } /// Add a block to the queue. - pub fn import_block(&mut self, bytes: &[u8]) -> ImportResult { - let header = BlockView::new(bytes).header(); - if self.bad.contains(&header.hash()) { - return Err(ImportError::Bad(None)); + pub fn import_block(&mut self, bytes: Bytes) -> ImportResult { + let header = BlockView::new(&bytes).header(); + if self.processing.contains(&header.hash()) { + return Err(ImportError::AlreadyQueued); + } + { + let mut verification = self.verification.lock().unwrap(); + if verification.bad.contains(&header.hash()) { + return Err(ImportError::Bad(None)); + } + + if verification.bad.contains(&header.parent_hash) { + verification.bad.insert(header.hash()); + return Err(ImportError::Bad(None)); + } } - if self.bad.contains(&header.parent_hash) { - self.bad.insert(header.hash()); - return Err(ImportError::Bad(None)); + match verify_block_basic(&header, &bytes, self.engine.deref().deref()) { + Ok(()) => { + self.processing.insert(header.hash()); + self.verification.lock().unwrap().unverified.push_back(UnVerifiedBlock { header: header, bytes: bytes }); + self.more_to_verify.notify_all(); + }, + Err(err) => { + warn!(target: "client", "Stage 1 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), err); + self.verification.lock().unwrap().bad.insert(header.hash()); + } } - - try!(verify_block_basic(&header, bytes, self.engine.deref().deref()).map_err(|e| { - warn!(target: "client", "Stage 1 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), e); - e - })); - try!(verify_block_unordered(&header, bytes, self.engine.deref().deref()).map_err(|e| { - warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), e); - e - })); - try!(self.message_channel.send(UserMessage(SyncMessage::BlockVerified(bytes.to_vec()))).map_err(|e| Error::from(e))); Ok(()) } + /// Mark given block and all its children as bad. Stops verification. pub fn mark_as_bad(&mut self, hash: &H256) { - self.bad.insert(hash.clone()); - //TODO: walk the queue + let mut verification_lock = self.verification.lock().unwrap(); + let mut verification = verification_lock.deref_mut(); + verification.bad.insert(hash.clone()); + let mut new_verified = VecDeque::new(); + for block in verification.verified.drain(..) { + if verification.bad.contains(&block.header.parent_hash) { + verification.bad.insert(block.header.hash()); + } + else { + new_verified.push_back(block); + } + } + verification.verified = new_verified; + } + + pub fn drain(&mut self, max: usize) -> Vec { + let mut verification = self.verification.lock().unwrap(); + let count = min(max, verification.verified.len()); + let mut result = Vec::with_capacity(count); + for _ in 0..count { + let block = verification.verified.pop_front().unwrap(); + self.processing.remove(&block.header.hash()); + result.push(block); + } + self.ready_signal.reset(); + result + } +} + +impl Drop for BlockQueue { + fn drop(&mut self) { + self.clear(); + self.deleting.store(true, AtomicOrdering::Relaxed); + self.more_to_verify.notify_all(); + for t in self.verifiers.drain(..) { + t.join().unwrap(); + } } } diff --git a/src/service.rs b/src/service.rs index 3bc137c9c..036c99bc4 100644 --- a/src/service.rs +++ b/src/service.rs @@ -55,8 +55,8 @@ impl IoHandler for ClientIoHandler { match net_message { &mut UserMessage(ref mut message) => { match message { - &mut SyncMessage::BlockVerified(ref mut bytes) => { - self.client.write().unwrap().import_verified_block(mem::replace(bytes, Bytes::new())); + &mut SyncMessage::BlockVerified => { + self.client.write().unwrap().import_verified_blocks(); }, _ => {}, // ignore other messages } diff --git a/src/sync/chain.rs b/src/sync/chain.rs index 43f5968f4..15fe6d1f9 100644 --- a/src/sync/chain.rs +++ b/src/sync/chain.rs @@ -401,7 +401,7 @@ impl ChainSync { let header_view = HeaderView::new(header_rlp.as_raw()); // TODO: Decompose block and add to self.headers and self.bodies instead if header_view.number() == From::from(self.last_imported_block + 1) { - match io.chain().import_block(block_rlp.as_raw()) { + match io.chain().import_block(block_rlp.as_raw().to_vec()) { Err(ImportError::AlreadyInChain) => { trace!(target: "sync", "New block already in chain {:?}", h); }, @@ -655,7 +655,7 @@ impl ChainSync { block_rlp.append_raw(body.at(0).as_raw(), 1); block_rlp.append_raw(body.at(1).as_raw(), 1); let h = &headers.1[i].hash; - match io.chain().import_block(&block_rlp.out()) { + match io.chain().import_block(block_rlp.out()) { Err(ImportError::AlreadyInChain) => { trace!(target: "sync", "Block already in chain {:?}", h); self.last_imported_block = headers.0 + i as BlockNumber; diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 491fa8e40..da91a6889 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -43,7 +43,7 @@ pub enum SyncMessage { /// New block has been imported into the blockchain NewChainBlock(Bytes), /// A block is ready - BlockVerified(Bytes), + BlockVerified, } pub type NetSyncMessage = NetworkIoMessage; diff --git a/src/sync/tests.rs b/src/sync/tests.rs index 84a8bf21f..05d7ac317 100644 --- a/src/sync/tests.rs +++ b/src/sync/tests.rs @@ -43,7 +43,7 @@ impl TestBlockChainClient { rlp.append(&header); rlp.append_raw(&rlp::NULL_RLP, 1); rlp.append_raw(uncles.as_raw(), 1); - self.import_block(rlp.as_raw()).unwrap(); + self.import_block(rlp.as_raw().to_vec()).unwrap(); } } } @@ -110,7 +110,7 @@ impl BlockChainClient for TestBlockChainClient { None } - fn import_block(&mut self, b: &[u8]) -> ImportResult { + fn import_block(&mut self, b: Bytes) -> ImportResult { let header = Rlp::new(&b).val_at::(0); let number: usize = header.number as usize; if number > self.blocks.len() { @@ -132,7 +132,7 @@ impl BlockChainClient for TestBlockChainClient { if number == self.numbers.len() { self.difficulty = self.difficulty + header.difficulty; self.last_hash = header.hash(); - self.blocks.insert(header.hash(), b.to_vec()); + self.blocks.insert(header.hash(), b); self.numbers.insert(number, header.hash()); let mut parent_hash = header.parent_hash; if number > 0 { diff --git a/src/verification.rs b/src/verification.rs index 6df49ac31..3d852dc3e 100644 --- a/src/verification.rs +++ b/src/verification.rs @@ -9,6 +9,16 @@ use common::*; use engine::Engine; use blockchain::*; +/// Preprocessed block data gathered in `verify_block_unordered` call +pub struct PreVerifiedBlock { + /// Populated block header + pub header: Header, + /// Populated block transactions + pub transactions: Vec, + /// Block bytes + pub bytes: Bytes, +} + /// Phase 1 quick block verification. Only does checks that are cheap. Operates on a single block pub fn verify_block_basic(header: &Header, bytes: &[u8], engine: &Engine) -> Result<(), Error> { try!(verify_header(&header, engine)); @@ -29,19 +39,26 @@ pub fn verify_block_basic(header: &Header, bytes: &[u8], engine: &Engine) -> Res /// Phase 2 verification. Perform costly checks such as transaction signatures and block nonce for ethash. /// Still operates on a individual block -/// TODO: return cached transactions, header hash. -pub fn verify_block_unordered(header: &Header, bytes: &[u8], engine: &Engine) -> Result<(), Error> { - try!(engine.verify_block_unordered(&header, Some(bytes))); - for u in Rlp::new(bytes).at(2).iter().map(|rlp| rlp.as_val::
()) { +/// Returns a PreVerifiedBlock structure populated with transactions +pub fn verify_block_unordered(header: Header, bytes: Bytes, engine: &Engine) -> Result { + try!(engine.verify_block_unordered(&header, Some(&bytes))); + for u in Rlp::new(&bytes).at(2).iter().map(|rlp| rlp.as_val::
()) { try!(engine.verify_block_unordered(&u, None)); } - // Verify transactions. - // TODO: pass in pre-recovered transactions - maybe verify_transaction wants to call `sender()`. - let v = BlockView::new(bytes); - for t in v.transactions() { - try!(engine.verify_transaction(&t, &header)); + // Verify transactions. + let mut transactions = Vec::new(); + { + let v = BlockView::new(&bytes); + for t in v.transactions() { + try!(engine.verify_transaction(&t, &header)); + transactions.push(t); + } } - Ok(()) + Ok(PreVerifiedBlock { + header: header, + transactions: transactions, + bytes: bytes, + }) } /// Phase 3 verification. Check block information against parent and uncles. diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 4ccfb2407..dee7603d7 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -155,7 +155,7 @@ pub struct IoChannel where Message: Send { } impl IoChannel where Message: Send { - pub fn send(&mut self, message: Message) -> Result<(), IoError> { + pub fn send(&self, message: Message) -> Result<(), IoError> { try!(self.channel.send(IoMessage::UserMessage(message))); Ok(()) } From d05e7e031ba436a7f15c1d1684a3a9e3e8b61107 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 18 Jan 2016 00:24:20 +0100 Subject: [PATCH 2/4] Basic queue test --- src/queue.rs | 14 ++++++++++++++ util/src/io/service.rs | 14 +++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/queue.rs b/src/queue.rs index a51d3014c..9af6294a7 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -229,3 +229,17 @@ impl Drop for BlockQueue { } } +#[cfg(test)] +mod tests { + use util::*; + use spec::*; + use queue::*; + + #[test] + fn test_block_queue() { + // TODO better test + let spec = Spec::new_test(); + let engine = spec.to_engine().unwrap(); + let _ = BlockQueue::new(Arc::new(engine), IoChannel::disconnected()); + } +} diff --git a/util/src/io/service.rs b/util/src/io/service.rs index dee7603d7..4a96d19a7 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -151,14 +151,22 @@ impl Handler for IoManager where Message: Send + 'static { /// Allows sending messages into the event loop. All the IO handlers will get the message /// in the `message` callback. pub struct IoChannel where Message: Send { - channel: Sender> + channel: Option>> } impl IoChannel where Message: Send { + /// Send a msessage through the channel pub fn send(&self, message: Message) -> Result<(), IoError> { - try!(self.channel.send(IoMessage::UserMessage(message))); + if let Some(ref channel) = self.channel { + try!(channel.send(IoMessage::UserMessage(message))); + } Ok(()) } + + /// Create a new channel to connected to event loop. + pub fn disconnected() -> IoChannel { + IoChannel { channel: None } + } } /// General IO Service. Starts an event loop and dispatches IO requests. @@ -198,7 +206,7 @@ impl IoService where Message: Send + 'static { /// Create a new message channel pub fn channel(&mut self) -> IoChannel { - IoChannel { channel: self.host_channel.clone() } + IoChannel { channel: Some(self.host_channel.clone()) } } } From e862fc62577bec590aae47b708e8016ecc91fb85 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 18 Jan 2016 01:39:19 +0100 Subject: [PATCH 3/4] Typo --- src/queue.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/queue.rs b/src/queue.rs index 9af6294a7..5803b3e5f 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -38,7 +38,7 @@ struct QueueSignal { impl QueueSignal { fn set(&self) { if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false { - self.message_channel.send(UserMessage(SyncMessage::BlockVerified)).expect("Error seding BlockVerified message"); + self.message_channel.send(UserMessage(SyncMessage::BlockVerified)).expect("Error sending BlockVerified message"); } } fn reset(&self) { From 77d2303b55f601f603f6c68894fcbe82c627669a Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 18 Jan 2016 12:49:39 +0100 Subject: [PATCH 4/4] Use sha3 crate in ethash --- ethash/Cargo.toml | 2 +- ethash/src/compute.rs | 54 ++++++++++---------------------- ethash/src/lib.rs | 2 +- util/Cargo.toml | 5 +-- util/sha3/Cargo.toml | 11 +++++++ util/{ => sha3}/build.rs | 0 util/sha3/src/lib.rs | 4 +++ util/{ => sha3}/src/tinykeccak.c | 0 util/src/sha3.rs | 5 ++- 9 files changed, 37 insertions(+), 46 deletions(-) create mode 100644 util/sha3/Cargo.toml rename util/{ => sha3}/build.rs (100%) create mode 100644 util/sha3/src/lib.rs rename util/{ => sha3}/src/tinykeccak.c (100%) diff --git a/ethash/Cargo.toml b/ethash/Cargo.toml index 2633a85eb..16af6525c 100644 --- a/ethash/Cargo.toml +++ b/ethash/Cargo.toml @@ -6,4 +6,4 @@ authors = ["arkpar u32 { #[inline] fn sha3_512(input: &[u8], output: &mut [u8]) { - let mut sha3 = Keccak::new_keccak512(); - sha3.update(input); - sha3.finalize(output); + unsafe { sha3::sha3_512(output.as_mut_ptr(), output.len(), input.as_ptr(), input.len()) }; } #[inline] @@ -107,9 +105,7 @@ fn get_seedhash(block_number: u64) -> H256 { let epochs = block_number / ETHASH_EPOCH_LENGTH; let mut ret: H256 = [0u8; 32]; for _ in 0..epochs { - let mut sha3 = Keccak::new_keccak256(); - sha3.update(&ret); - sha3.finalize(&mut ret); + unsafe { sha3::sha3_256(ret[..].as_mut_ptr(), 32, ret[..].as_ptr(), 32) }; } ret } @@ -125,15 +121,12 @@ pub fn quick_get_difficulty(header_hash: &H256, nonce: u64, mix_hash: &H256) -> unsafe { ptr::copy_nonoverlapping(header_hash.as_ptr(), buf.as_mut_ptr(), 32) }; unsafe { ptr::copy_nonoverlapping(mem::transmute(&nonce), buf[32..].as_mut_ptr(), 8) }; - let mut sha3 = Keccak::new_keccak512(); - sha3.update(&buf[0..40]); - sha3.finalize(&mut buf); + unsafe { sha3::sha3_512(buf.as_mut_ptr(), 64, buf.as_ptr(), 40) }; unsafe { ptr::copy_nonoverlapping(mix_hash.as_ptr(), buf[64..].as_mut_ptr(), 32) }; let mut hash = [0u8; 32]; - let mut sha3 = Keccak::new_keccak256(); - sha3.update(&buf); - sha3.finalize(&mut hash); + unsafe { sha3::sha3_256(hash.as_mut_ptr(), hash.len(), buf.as_ptr(), buf.len()) }; + hash.as_mut_ptr(); hash } @@ -157,10 +150,7 @@ fn hash_compute(light: &Light, full_size: usize, header_hash: &H256, nonce: u64 // compute sha3-512 hash and replicate across mix unsafe { - let mut sha3 = Keccak::new_keccak512(); - sha3.update(&s_mix.get_unchecked(0).bytes[0..40]); - sha3.finalize(&mut s_mix.get_unchecked_mut(0).bytes); - + sha3::sha3_512(s_mix.get_unchecked_mut(0).bytes.as_mut_ptr(), NODE_BYTES, s_mix.get_unchecked(0).bytes.as_ptr(), 40); let (f_mix, mut mix) = s_mix.split_at_mut(1); for w in 0..MIX_WORDS { *mix.get_unchecked_mut(0).as_words_mut().get_unchecked_mut(w) = *f_mix.get_unchecked(0).as_words().get_unchecked(w % NODE_WORDS); @@ -189,15 +179,13 @@ fn hash_compute(light: &Light, full_size: usize, header_hash: &H256, nonce: u64 *mix.get_unchecked_mut(0).as_words_mut().get_unchecked_mut(i) = reduction; } - let mut mix_hash: H256 = [0u8; 32]; + let mut mix_hash = [0u8; 32]; + let mut buf = [0u8; 32 + 64]; + ptr::copy_nonoverlapping(f_mix.get_unchecked_mut(0).bytes.as_ptr(), buf.as_mut_ptr(), 64); + ptr::copy_nonoverlapping(mix.get_unchecked_mut(0).bytes.as_ptr(), buf[64..].as_mut_ptr(), 32); ptr::copy_nonoverlapping(mix.get_unchecked_mut(0).bytes.as_ptr(), mix_hash.as_mut_ptr(), 32); let mut value: H256 = [0u8; 32]; - - let mut sha3 = Keccak::new_keccak256(); - sha3.update(&f_mix.get_unchecked(0).bytes); - sha3.update(&mix_hash); - sha3.finalize(&mut value); - + sha3::sha3_256(value.as_mut_ptr(), value.len(), buf.as_ptr(), buf.len()); ProofOfWork { mix_hash: mix_hash, value: value, @@ -212,10 +200,7 @@ fn calculate_dag_item(node_index: u32, light: &Light) -> Node { let init = cache_nodes.get_unchecked(node_index as usize % num_parent_nodes); let mut ret = init.clone(); *ret.as_words_mut().get_unchecked_mut(0) ^= node_index; - - let mut sha3 = Keccak::new_keccak512(); - sha3.update(&ret.bytes); - sha3.finalize(&mut ret.bytes); + sha3::sha3_512(ret.bytes.as_mut_ptr(), ret.bytes.len(), ret.bytes.as_ptr(), ret.bytes.len()); for i in 0..ETHASH_DATASET_PARENTS { let parent_index = fnv_hash(node_index ^ i, *ret.as_words().get_unchecked(i as usize % NODE_WORDS)) % num_parent_nodes as u32; @@ -224,10 +209,7 @@ fn calculate_dag_item(node_index: u32, light: &Light) -> Node { *ret.as_words_mut().get_unchecked_mut(w) = fnv_hash(*ret.as_words().get_unchecked(w), *parent.as_words().get_unchecked(w)); } } - - let mut sha3 = Keccak::new_keccak512(); - sha3.update(&ret.bytes); - sha3.finalize(&mut ret.bytes); + sha3::sha3_512(ret.bytes.as_mut_ptr(), ret.bytes.len(), ret.bytes.as_ptr(), ret.bytes.len()); ret } } @@ -246,9 +228,7 @@ fn light_new(block_number: u64) -> Light { unsafe { sha3_512(&seedhash[0..32], &mut nodes.get_unchecked_mut(0).bytes); for i in 1..num_nodes { - let mut sha3 = Keccak::new_keccak512(); - sha3.update(&nodes.get_unchecked_mut(i - 1).bytes); - sha3.finalize(&mut nodes.get_unchecked_mut(i).bytes); + sha3::sha3_512(nodes.get_unchecked_mut(i).bytes.as_mut_ptr(), NODE_BYTES, nodes.get_unchecked(i - 1).bytes.as_ptr(), NODE_BYTES); } for _ in 0..ETHASH_CACHE_ROUNDS { @@ -275,9 +255,9 @@ fn test_difficulty_test() { let mix_hash = [0x1f, 0xff, 0x04, 0xce, 0xc9, 0x41, 0x73, 0xfd, 0x59, 0x1e, 0x3d, 0x89, 0x60, 0xce, 0x6b, 0xdf, 0x8b, 0x19, 0x71, 0x04, 0x8c, 0x71, 0xff, 0x93, 0x7b, 0xb2, 0xd3, 0x2a, 0x64, 0x31, 0xab, 0x6d ]; let nonce = 0xd7b3ac70a301a249; let boundary_good = [0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x3e, 0x9b, 0x6c, 0x69, 0xbc, 0x2c, 0xe2, 0xa2, 0x4a, 0x8e, 0x95, 0x69, 0xef, 0xc7, 0xd7, 0x1b, 0x33, 0x35, 0xdf, 0x36, 0x8c, 0x9a, 0xe9, 0x7e, 0x53, 0x84]; - assert!(quick_check_difficulty(&hash, nonce, &mix_hash, &boundary_good)); + assert_eq!(quick_get_difficulty(&hash, nonce, &mix_hash)[..], boundary_good[..]); let boundary_bad = [0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x3a, 0x9b, 0x6c, 0x69, 0xbc, 0x2c, 0xe2, 0xa2, 0x4a, 0x8e, 0x95, 0x69, 0xef, 0xc7, 0xd7, 0x1b, 0x33, 0x35, 0xdf, 0x36, 0x8c, 0x9a, 0xe9, 0x7e, 0x53, 0x84]; - assert!(!quick_check_difficulty(&hash, nonce, &mix_hash, &boundary_bad)); + assert!(quick_get_difficulty(&hash, nonce, &mix_hash)[..] != boundary_bad[..]); } #[test] diff --git a/ethash/src/lib.rs b/ethash/src/lib.rs index 166574bef..f7b6d2308 100644 --- a/ethash/src/lib.rs +++ b/ethash/src/lib.rs @@ -1,6 +1,6 @@ //! Ethash implementation //! See https://github.com/ethereum/wiki/wiki/Ethash -extern crate tiny_keccak; +extern crate sha3; mod sizes; mod compute; diff --git a/util/Cargo.toml b/util/Cargo.toml index 6b60fddf1..02fdad17f 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -5,10 +5,6 @@ license = "GPL-3.0" name = "ethcore-util" version = "0.1.0" authors = ["Ethcore "] -build = "build.rs" - -[build-dependencies] -gcc = "0.3" [dependencies] log = "0.3" @@ -27,6 +23,7 @@ elastic-array = "0.4" heapsize = "0.2" itertools = "0.4" slab = { git = "https://github.com/arkpar/slab.git" } +sha3 = { path = "sha3" } [dev-dependencies] json-tests = { path = "json-tests" } diff --git a/util/sha3/Cargo.toml b/util/sha3/Cargo.toml new file mode 100644 index 000000000..ac423b22f --- /dev/null +++ b/util/sha3/Cargo.toml @@ -0,0 +1,11 @@ +[package] +description = "Rust bindings for tinykeccak C library" +homepage = "http://ethcore.io" +license = "GPL-3.0" +name = "sha3" +version = "0.1.0" +authors = ["Ethcore "] +build = "build.rs" + +[build-dependencies] +gcc = "0.3" diff --git a/util/build.rs b/util/sha3/build.rs similarity index 100% rename from util/build.rs rename to util/sha3/build.rs diff --git a/util/sha3/src/lib.rs b/util/sha3/src/lib.rs new file mode 100644 index 000000000..de2bf6e3e --- /dev/null +++ b/util/sha3/src/lib.rs @@ -0,0 +1,4 @@ +extern { + pub fn sha3_256(out: *mut u8, outlen: usize, input: *const u8, inputlen: usize) -> i32; + pub fn sha3_512(out: *mut u8, outlen: usize, input: *const u8, inputlen: usize) -> i32; +} diff --git a/util/src/tinykeccak.c b/util/sha3/src/tinykeccak.c similarity index 100% rename from util/src/tinykeccak.c rename to util/sha3/src/tinykeccak.c diff --git a/util/src/sha3.rs b/util/src/sha3.rs index c251edcfb..a33ac61f7 100644 --- a/util/src/sha3.rs +++ b/util/src/sha3.rs @@ -1,14 +1,13 @@ //! Wrapper around tiny-keccak crate. +extern crate sha3 as sha3_ext; use std::mem::uninitialized; use bytes::{BytesConvertable, Populatable}; use hash::{H256, FixedHash}; +use self::sha3_ext::*; pub const SHA3_EMPTY: H256 = H256( [0xc5, 0xd2, 0x46, 0x01, 0x86, 0xf7, 0x23, 0x3c, 0x92, 0x7e, 0x7d, 0xb2, 0xdc, 0xc7, 0x03, 0xc0, 0xe5, 0x00, 0xb6, 0x53, 0xca, 0x82, 0x27, 0x3b, 0x7b, 0xfa, 0xd8, 0x04, 0x5d, 0x85, 0xa4, 0x70] ); -extern { - fn sha3_256(out: *mut u8, outlen: usize, input: *const u8, inputlen: usize) -> i32; -} /// Types implementing this trait are sha3able. ///