From c8076b2f9d9ac45e1a431366eaa5710cedfdcccc Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 21 Feb 2016 19:46:29 +0100 Subject: [PATCH 1/6] Threading performance optimizations --- Cargo.lock | 3 +- Cargo.toml | 3 + ethcore/src/block_queue.rs | 146 +++++++++++++++++++----------------- ethcore/src/client.rs | 30 ++++---- ethcore/src/verification.rs | 14 +--- util/sha3/build.rs | 2 +- util/src/lib.rs | 2 + util/src/thread.rs | 43 +++++++++++ 8 files changed, 148 insertions(+), 95 deletions(-) create mode 100644 util/src/thread.rs diff --git a/Cargo.lock b/Cargo.lock index cf747f3cc..50274857f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -151,7 +151,6 @@ dependencies = [ [[package]] name = "eth-secp256k1" version = "0.5.4" -source = "git+https://github.com/arkpar/rust-secp256k1.git#45503e1de68d909b1862e3f2bdb9e1cdfdff3f1e" dependencies = [ "arrayvec 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", "gcc 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)", @@ -223,7 +222,7 @@ dependencies = [ "crossbeam 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "elastic-array 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "eth-secp256k1 0.5.4 (git+https://github.com/arkpar/rust-secp256k1.git)", + "eth-secp256k1 0.5.4", "ethcore-devtools 0.9.99", "heapsize 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "igd 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 7fdfc2bee..f28829180 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,3 +30,6 @@ travis-nightly = ["ethcore/json-tests", "dev"] [[bin]] path = "parity/main.rs" name = "parity" + +[profile.release] +debug = true diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index c39f158f0..a51a1e900 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -63,7 +63,7 @@ pub struct BlockQueue { panic_handler: Arc<PanicHandler>, engine: Arc<Box<Engine>>, more_to_verify: Arc<Condvar>, - verification: Arc<Mutex<Verification>>, + verification: Arc<Verification>, verifiers: Vec<JoinHandle<()>>, deleting: Arc<AtomicBool>, ready_signal: Arc<QueueSignal>, @@ -98,12 +98,11 @@ impl QueueSignal { } } -#[derive(Default)] struct Verification { - unverified: VecDeque<UnVerifiedBlock>, - verified: VecDeque<PreVerifiedBlock>, - verifying: VecDeque<VerifyingBlock>, - bad: HashSet<H256>, + unverified: Mutex<VecDeque<UnVerifiedBlock>>, + verified: Mutex<VecDeque<PreVerifiedBlock>>, + verifying: Mutex<VecDeque<VerifyingBlock>>, + bad: Mutex<HashSet<H256>>, } const MAX_UNVERIFIED_QUEUE_SIZE: usize = 50000; @@ -111,7 +110,12 @@ impl BlockQueue { /// Creates a new queue instance.
pub fn new(engine: Arc>, message_channel: IoChannel) -> BlockQueue { - let verification = Arc::new(Mutex::new(Verification::default())); + let verification = Arc::new(Verification { + unverified: Mutex::new(VecDeque::new()), + verified: Mutex::new(VecDeque::new()), + verifying: Mutex::new(VecDeque::new()), + bad: Mutex::new(HashSet::new()), + }); let more_to_verify = Arc::new(Condvar::new()); let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let deleting = Arc::new(AtomicBool::new(false)); @@ -119,7 +123,7 @@ impl BlockQueue { let panic_handler = PanicHandler::new_in_arc(); let mut verifiers: Vec> = Vec::new(); - let thread_count = max(::num_cpus::get(), 3) - 2; + let thread_count = max(::num_cpus::get(), 5) - 0; for i in 0..thread_count { let verification = verification.clone(); let engine = engine.clone(); @@ -133,7 +137,8 @@ impl BlockQueue { .name(format!("Verifier #{}", i)) .spawn(move || { panic_handler.catch_panic(move || { - BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) + lower_thread_priority(); + BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) }).unwrap() }) .expect("Error starting block verification thread") @@ -152,17 +157,17 @@ impl BlockQueue { } } - fn verify(verification: Arc>, engine: Arc>, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { + fn verify(verification: Arc, engine: Arc>, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { while !deleting.load(AtomicOrdering::Acquire) { { - let mut lock = verification.lock().unwrap(); + let mut unverified = verification.unverified.lock().unwrap(); - if lock.unverified.is_empty() && lock.verifying.is_empty() { + if unverified.is_empty() && verification.verifying.lock().unwrap().is_empty() { empty.notify_all(); } - while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) { - lock = wait.wait(lock).unwrap(); + while unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) { + unverified = wait.wait(unverified).unwrap(); } if deleting.load(AtomicOrdering::Acquire) { @@ -171,39 +176,42 @@ impl BlockQueue { } let block = { - let mut v = verification.lock().unwrap(); - if v.unverified.is_empty() { + let mut unverified = verification.unverified.lock().unwrap(); + if unverified.is_empty() { continue; } - let block = v.unverified.pop_front().unwrap(); - v.verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None }); + let mut verifying = verification.verifying.lock().unwrap(); + let block = unverified.pop_front().unwrap(); + verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None }); block }; let block_hash = block.header.hash(); match verify_block_unordered(block.header, block.bytes, engine.deref().deref()) { Ok(verified) => { - let mut v = verification.lock().unwrap(); - for e in &mut v.verifying { + let mut verifying = verification.verifying.lock().unwrap(); + for e in verifying.iter_mut() { if e.hash == block_hash { e.block = Some(verified); break; } } - if !v.verifying.is_empty() && v.verifying.front().unwrap().hash == block_hash { + if !verifying.is_empty() && verifying.front().unwrap().hash == block_hash { // we're next! 
- let mut vref = v.deref_mut(); - BlockQueue::drain_verifying(&mut vref.verifying, &mut vref.verified, &mut vref.bad); + let mut verified = verification.verified.lock().unwrap(); + let mut bad = verification.bad.lock().unwrap(); + BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); ready.set(); } }, Err(err) => { - let mut v = verification.lock().unwrap(); + let mut verifying = verification.verifying.lock().unwrap(); + let mut verified = verification.verified.lock().unwrap(); + let mut bad = verification.bad.lock().unwrap(); warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", block_hash, err); - v.bad.insert(block_hash.clone()); - v.verifying.retain(|e| e.hash != block_hash); - let mut vref = v.deref_mut(); - BlockQueue::drain_verifying(&mut vref.verifying, &mut vref.verified, &mut vref.bad); + bad.insert(block_hash.clone()); + verifying.retain(|e| e.hash != block_hash); + BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); ready.set(); } } @@ -223,19 +231,21 @@ impl BlockQueue { } /// Clear the queue and stop verification activity. - pub fn clear(&mut self) { - let mut verification = self.verification.lock().unwrap(); - verification.unverified.clear(); - verification.verifying.clear(); - verification.verified.clear(); + pub fn clear(&self) { + let mut unverified = self.verification.unverified.lock().unwrap(); + let mut verifying = self.verification.verifying.lock().unwrap(); + let mut verified = self.verification.verified.lock().unwrap(); + unverified.clear(); + verifying.clear(); + verified.clear(); self.processing.write().unwrap().clear(); } - /// Wait for queue to be empty - pub fn flush(&mut self) { - let mut verification = self.verification.lock().unwrap(); - while !verification.unverified.is_empty() || !verification.verifying.is_empty() { - verification = self.empty.wait(verification).unwrap(); + /// Wait for unverified queue to be empty + pub fn flush(&self) { + let mut unverified = self.verification.unverified.lock().unwrap(); + while !unverified.is_empty() || !self.verification.verifying.lock().unwrap().is_empty() { + unverified = self.empty.wait(unverified).unwrap(); } } @@ -244,27 +254,29 @@ impl BlockQueue { if self.processing.read().unwrap().contains(&hash) { return BlockStatus::Queued; } - if self.verification.lock().unwrap().bad.contains(&hash) { + if self.verification.bad.lock().unwrap().contains(&hash) { return BlockStatus::Bad; } BlockStatus::Unknown } /// Add a block to the queue. 
- pub fn import_block(&mut self, bytes: Bytes) -> ImportResult { + pub fn import_block(&self, bytes: Bytes) -> ImportResult { let header = BlockView::new(&bytes).header(); let h = header.hash(); - if self.processing.read().unwrap().contains(&h) { - return Err(ImportError::AlreadyQueued); - } { - let mut verification = self.verification.lock().unwrap(); - if verification.bad.contains(&h) { + if self.processing.read().unwrap().contains(&h) { + return Err(ImportError::AlreadyQueued); + } + } + { + let mut bad = self.verification.bad.lock().unwrap(); + if bad.contains(&h) { return Err(ImportError::Bad(None)); } - if verification.bad.contains(&header.parent_hash) { - verification.bad.insert(h.clone()); + if bad.contains(&header.parent_hash) { + bad.insert(h.clone()); return Err(ImportError::Bad(None)); } } @@ -272,39 +284,40 @@ impl BlockQueue { match verify_block_basic(&header, &bytes, self.engine.deref().deref()) { Ok(()) => { self.processing.write().unwrap().insert(h.clone()); - self.verification.lock().unwrap().unverified.push_back(UnVerifiedBlock { header: header, bytes: bytes }); + self.verification.unverified.lock().unwrap().push_back(UnVerifiedBlock { header: header, bytes: bytes }); self.more_to_verify.notify_all(); Ok(h) }, Err(err) => { warn!(target: "client", "Stage 1 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), err); - self.verification.lock().unwrap().bad.insert(h.clone()); + self.verification.bad.lock().unwrap().insert(h.clone()); Err(From::from(err)) } } } /// Mark given block and all its children as bad. Stops verification. - pub fn mark_as_bad(&mut self, hash: &H256) { - let mut verification_lock = self.verification.lock().unwrap(); - let mut verification = verification_lock.deref_mut(); - verification.bad.insert(hash.clone()); + pub fn mark_as_bad(&self, hash: &H256) { + let mut verified_lock = self.verification.verified.lock().unwrap(); + let mut verified = verified_lock.deref_mut(); + let mut bad = self.verification.bad.lock().unwrap(); + bad.insert(hash.clone()); self.processing.write().unwrap().remove(&hash); let mut new_verified = VecDeque::new(); - for block in verification.verified.drain(..) { - if verification.bad.contains(&block.header.parent_hash) { - verification.bad.insert(block.header.hash()); + for block in verified.drain(..) 
{ + if bad.contains(&block.header.parent_hash) { + bad.insert(block.header.hash()); self.processing.write().unwrap().remove(&block.header.hash()); } else { new_verified.push_back(block); } } - verification.verified = new_verified; + *verified = new_verified; } /// Mark given block as processed - pub fn mark_as_good(&mut self, hashes: &[H256]) { + pub fn mark_as_good(&self, hashes: &[H256]) { let mut processing = self.processing.write().unwrap(); for h in hashes { processing.remove(&h); @@ -312,16 +325,16 @@ impl BlockQueue { } /// Removes up to `max` verified blocks from the queue - pub fn drain(&mut self, max: usize) -> Vec { - let mut verification = self.verification.lock().unwrap(); - let count = min(max, verification.verified.len()); + pub fn drain(&self, max: usize) -> Vec { + let mut verified = self.verification.verified.lock().unwrap(); + let count = min(max, verified.len()); let mut result = Vec::with_capacity(count); for _ in 0..count { - let block = verification.verified.pop_front().unwrap(); + let block = verified.pop_front().unwrap(); result.push(block); } self.ready_signal.reset(); - if !verification.verified.is_empty() { + if !verified.is_empty() { self.ready_signal.set(); } result @@ -329,11 +342,10 @@ impl BlockQueue { /// Get queue status. pub fn queue_info(&self) -> BlockQueueInfo { - let verification = self.verification.lock().unwrap(); BlockQueueInfo { - verified_queue_size: verification.verified.len(), - unverified_queue_size: verification.unverified.len(), - verifying_queue_size: verification.verifying.len(), + unverified_queue_size: self.verification.unverified.lock().unwrap().len(), + verifying_queue_size: self.verification.verifying.lock().unwrap().len(), + verified_queue_size: self.verification.verified.lock().unwrap().len(), } } } diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index c3ec4b4d0..0c8580117 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -172,7 +172,7 @@ pub struct Client { chain: Arc>, engine: Arc>, state_db: Mutex, - block_queue: RwLock, + block_queue: BlockQueue, report: RwLock, import_lock: Mutex<()>, panic_handler: Arc, @@ -231,7 +231,7 @@ impl Client { chain: chain, engine: engine, state_db: Mutex::new(state_db), - block_queue: RwLock::new(block_queue), + block_queue: block_queue, report: RwLock::new(Default::default()), import_lock: Mutex::new(()), panic_handler: panic_handler @@ -240,7 +240,7 @@ impl Client { /// Flush the block import queue. 
pub fn flush_queue(&self) { - self.block_queue.write().unwrap().flush(); + self.block_queue.flush(); } /// This is triggered by a message coming from a block queue when the block is ready for insertion @@ -248,11 +248,11 @@ impl Client { let mut ret = 0; let mut bad = HashSet::new(); let _import_lock = self.import_lock.lock(); - let blocks = self.block_queue.write().unwrap().drain(128); + let blocks = self.block_queue.drain(128); let mut good_blocks = Vec::with_capacity(128); for block in blocks { if bad.contains(&block.header.parent_hash) { - self.block_queue.write().unwrap().mark_as_bad(&block.header.hash()); + self.block_queue.mark_as_bad(&block.header.hash()); bad.insert(block.header.hash()); continue; } @@ -260,7 +260,7 @@ impl Client { let header = &block.header; if let Err(e) = verify_block_family(&header, &block.bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) { warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - self.block_queue.write().unwrap().mark_as_bad(&header.hash()); + self.block_queue.mark_as_bad(&header.hash()); bad.insert(block.header.hash()); break; }; @@ -268,7 +268,7 @@ impl Client { Some(p) => p, None => { warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash); - self.block_queue.write().unwrap().mark_as_bad(&header.hash()); + self.block_queue.mark_as_bad(&header.hash()); bad.insert(block.header.hash()); break; }, @@ -292,13 +292,13 @@ impl Client { Err(e) => { warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); bad.insert(block.header.hash()); - self.block_queue.write().unwrap().mark_as_bad(&header.hash()); + self.block_queue.mark_as_bad(&header.hash()); break; } }; if let Err(e) = verify_block_final(&header, result.block().header()) { warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - self.block_queue.write().unwrap().mark_as_bad(&header.hash()); + self.block_queue.mark_as_bad(&header.hash()); break; } @@ -317,8 +317,8 @@ impl Client { trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); ret += 1; } - self.block_queue.write().unwrap().mark_as_good(&good_blocks); - if !good_blocks.is_empty() && self.block_queue.read().unwrap().queue_info().is_empty() { + self.block_queue.mark_as_good(&good_blocks); + if !good_blocks.is_empty() && self.block_queue.queue_info().is_empty() { io.send(NetworkIoMessage::User(SyncMessage::BlockVerified)).unwrap(); } ret @@ -389,7 +389,7 @@ impl BlockChainClient for Client { let chain = self.chain.read().unwrap(); match Self::block_hash(&chain, id) { Some(ref hash) if chain.is_known(hash) => BlockStatus::InChain, - Some(hash) => self.block_queue.read().unwrap().block_status(&hash), + Some(hash) => self.block_queue.block_status(&hash), None => BlockStatus::Unknown } } @@ -434,15 +434,15 @@ impl BlockChainClient for Client { if self.block_status(BlockId::Hash(header.parent_hash)) == BlockStatus::Unknown { return Err(ImportError::UnknownParent); } - self.block_queue.write().unwrap().import_block(bytes) + self.block_queue.import_block(bytes) } fn queue_info(&self) -> BlockQueueInfo { - self.block_queue.read().unwrap().queue_info() + self.block_queue.queue_info() } fn clear_queue(&self) { - self.block_queue.write().unwrap().clear(); + self.block_queue.clear(); } fn chain_info(&self) -> BlockChainInfo { diff --git 
a/ethcore/src/verification.rs b/ethcore/src/verification.rs index c7d5e265f..fa9467e95 100644 --- a/ethcore/src/verification.rs +++ b/ethcore/src/verification.rs @@ -57,18 +57,12 @@ pub fn verify_block_basic(header: &Header, bytes: &[u8], engine: &Engine) -> Res /// Still operates on a individual block /// Returns a PreVerifiedBlock structure populated with transactions pub fn verify_block_unordered(header: Header, bytes: Bytes, engine: &Engine) -> Result { - try!(engine.verify_block_unordered(&header, Some(&bytes))); - for u in Rlp::new(&bytes).at(2).iter().map(|rlp| rlp.as_val::
()) { - try!(engine.verify_block_unordered(&u, None)); - } // Verify transactions. let mut transactions = Vec::new(); - { - let v = BlockView::new(&bytes); - for t in v.transactions() { - try!(engine.verify_transaction(&t, &header)); - transactions.push(t); - } + let v = BlockView::new(&bytes); + for t in v.transactions() { + try!(engine.verify_transaction(&t, &header)); + transactions.push(t); } Ok(PreVerifiedBlock { header: header, diff --git a/util/sha3/build.rs b/util/sha3/build.rs index bbe16d720..9eb36fdb9 100644 --- a/util/sha3/build.rs +++ b/util/sha3/build.rs @@ -21,6 +21,6 @@ extern crate gcc; fn main() { - gcc::compile_library("libtinykeccak.a", &["src/tinykeccak.c"]); + gcc::Config::new().file("src/tinykeccak.c").flag("-O3").compile("libtinykeccak.a"); } diff --git a/util/src/lib.rs b/util/src/lib.rs index 2b7438cf3..5c8bd4fb0 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -143,6 +143,7 @@ pub mod network; pub mod log; pub mod panics; pub mod keys; +mod thread; pub use common::*; pub use misc::*; @@ -163,4 +164,5 @@ pub use semantic_version::*; pub use network::*; pub use io::*; pub use log::*; +pub use thread::*; diff --git a/util/src/thread.rs b/util/src/thread.rs new file mode 100644 index 000000000..b86ca3e86 --- /dev/null +++ b/util/src/thread.rs @@ -0,0 +1,43 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! 
Thread management helpers + +use libc::{c_int, pthread_self, pthread_t}; + +#[repr(C)] +struct sched_param { + priority: c_int, + padding: c_int, +} + +extern { + fn setpriority(which: c_int, who: c_int, prio: c_int) -> c_int; + fn pthread_setschedparam(thread: pthread_t, policy: c_int, param: *const sched_param) -> c_int; +} +const PRIO_DARWIN_THREAD: c_int = 3; +const PRIO_DARWIN_BG: c_int = 0x1000; +const SCHED_RR: c_int = 2; + +/// Lower thread priority and put it into background mode +#[cfg(target_os="macos")] +pub fn lower_thread_priority() { + let sp = sched_param { priority: 0, padding: 0 }; + if unsafe { pthread_setschedparam(pthread_self(), SCHED_RR, &sp) } == -1 { + trace!("Could not decrease thread piority"); + } + //unsafe { setpriority(PRIO_DARWIN_THREAD, 0, PRIO_DARWIN_BG); } +} From 778fa92ebe82bcf7c739b4750a5443f2ed575802 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 22 Feb 2016 00:36:59 +0100 Subject: [PATCH 2/6] Remove locks from the block chain --- Cargo.lock | 3 +- Cargo.toml | 2 +- ethcore/src/block_queue.rs | 12 +++---- ethcore/src/blockchain.rs | 48 ++++++++++++++++------------ ethcore/src/client.rs | 63 +++++++++++++++++-------------------- ethcore/src/verification.rs | 14 ++++++--- util/src/lib.rs | 2 -- util/src/thread.rs | 43 ------------------------- 8 files changed, 75 insertions(+), 112 deletions(-) delete mode 100644 util/src/thread.rs diff --git a/Cargo.lock b/Cargo.lock index 50274857f..cf747f3cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -151,6 +151,7 @@ dependencies = [ [[package]] name = "eth-secp256k1" version = "0.5.4" +source = "git+https://github.com/arkpar/rust-secp256k1.git#45503e1de68d909b1862e3f2bdb9e1cdfdff3f1e" dependencies = [ "arrayvec 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", "gcc 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)", @@ -222,7 +223,7 @@ dependencies = [ "crossbeam 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "elastic-array 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "eth-secp256k1 0.5.4", + "eth-secp256k1 0.5.4 (git+https://github.com/arkpar/rust-secp256k1.git)", "ethcore-devtools 0.9.99", "heapsize 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "igd 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index f28829180..8bc94a3a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,4 +32,4 @@ path = "parity/main.rs" name = "parity" [profile.release] -debug = true +debug = false diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index a51a1e900..ba9867966 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -99,6 +99,7 @@ impl QueueSignal { } struct Verification { + // All locks must be captured in the order declared here. 
unverified: Mutex>, verified: Mutex>, verifying: Mutex>, @@ -123,7 +124,7 @@ impl BlockQueue { let panic_handler = PanicHandler::new_in_arc(); let mut verifiers: Vec> = Vec::new(); - let thread_count = max(::num_cpus::get(), 5) - 0; + let thread_count = max(::num_cpus::get(), 3) - 2; for i in 0..thread_count { let verification = verification.clone(); let engine = engine.clone(); @@ -137,7 +138,6 @@ impl BlockQueue { .name(format!("Verifier #{}", i)) .spawn(move || { panic_handler.catch_panic(move || { - lower_thread_priority(); BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) }).unwrap() }) @@ -392,7 +392,7 @@ mod tests { #[test] fn can_import_blocks() { - let mut queue = get_test_queue(); + let queue = get_test_queue(); if let Err(e) = queue.import_block(get_good_dummy_block()) { panic!("error importing block that is valid by definition({:?})", e); } @@ -400,7 +400,7 @@ mod tests { #[test] fn returns_error_for_duplicates() { - let mut queue = get_test_queue(); + let queue = get_test_queue(); if let Err(e) = queue.import_block(get_good_dummy_block()) { panic!("error importing block that is valid by definition({:?})", e); } @@ -419,7 +419,7 @@ mod tests { #[test] fn returns_ok_for_drained_duplicates() { - let mut queue = get_test_queue(); + let queue = get_test_queue(); let block = get_good_dummy_block(); let hash = BlockView::new(&block).header().hash().clone(); if let Err(e) = queue.import_block(block) { @@ -436,7 +436,7 @@ mod tests { #[test] fn returns_empty_once_finished() { - let mut queue = get_test_queue(); + let queue = get_test_queue(); queue.import_block(get_good_dummy_block()).expect("error importing block that is valid by definition"); queue.flush(); queue.drain(1); diff --git a/ethcore/src/blockchain.rs b/ethcore/src/blockchain.rs index cc9ff56fd..22d409e8e 100644 --- a/ethcore/src/blockchain.rs +++ b/ethcore/src/blockchain.rs @@ -16,6 +16,7 @@ //! Blockchain database. +use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrder}; use util::*; use rocksdb::{DB, WriteBatch, Writable}; use header::*; @@ -147,8 +148,9 @@ struct CacheManager { /// /// **Does not do input data verification.** pub struct BlockChain { - pref_cache_size: usize, - max_cache_size: usize, + // All locks must be captured in the order declared here. + pref_cache_size: AtomicUsize, + max_cache_size: AtomicUsize, best_block: RwLock, @@ -166,6 +168,7 @@ pub struct BlockChain { blocks_db: DB, cache_man: RwLock, + insert_lock: Mutex<()> } impl BlockProvider for BlockChain { @@ -261,8 +264,8 @@ impl BlockChain { (0..COLLECTION_QUEUE_SIZE).foreach(|_| cache_man.cache_usage.push_back(HashSet::new())); let bc = BlockChain { - pref_cache_size: 1 << 14, - max_cache_size: 1 << 20, + pref_cache_size: AtomicUsize::new(1 << 14), + max_cache_size: AtomicUsize::new(1 << 20), best_block: RwLock::new(BestBlock::new()), blocks: RwLock::new(HashMap::new()), block_details: RwLock::new(HashMap::new()), @@ -273,6 +276,7 @@ impl BlockChain { extras_db: extras_db, blocks_db: blocks_db, cache_man: RwLock::new(cache_man), + insert_lock: Mutex::new(()), }; // load best block @@ -315,9 +319,9 @@ impl BlockChain { } /// Set the cache configuration. 
- pub fn configure_cache(&mut self, pref_cache_size: usize, max_cache_size: usize) { - self.pref_cache_size = pref_cache_size; - self.max_cache_size = max_cache_size; + pub fn configure_cache(&self, pref_cache_size: usize, max_cache_size: usize) { + self.pref_cache_size.store(pref_cache_size, AtomicOrder::Relaxed); + self.max_cache_size.store(max_cache_size, AtomicOrder::Relaxed); } /// Returns a tree route between `from` and `to`, which is a tuple of: @@ -435,22 +439,26 @@ impl BlockChain { return; } + let _lock = self.insert_lock.lock(); // store block in db self.blocks_db.put(&hash, &bytes).unwrap(); let (batch, new_best, details) = self.block_to_extras_insert_batch(bytes); - // update best block - let mut best_block = self.best_block.write().unwrap(); - if let Some(b) = new_best { - *best_block = b; + { + // update best block + let mut best_block = self.best_block.write().unwrap(); + if let Some(b) = new_best { + *best_block = b; + } } - // update caches - let mut write = self.block_details.write().unwrap(); - write.remove(&header.parent_hash()); - write.insert(hash.clone(), details); - self.note_used(CacheID::Block(hash)); - + { + // update caches + let mut write = self.block_details.write().unwrap(); + write.remove(&header.parent_hash()); + write.insert(hash.clone(), details); + self.note_used(CacheID::Block(hash)); + } // update extras database self.extras_db.write(batch).unwrap(); } @@ -622,17 +630,17 @@ impl BlockChain { /// Ticks our cache system and throws out any old data. pub fn collect_garbage(&self) { - if self.cache_size().total() < self.pref_cache_size { return; } + if self.cache_size().total() < self.pref_cache_size.load(AtomicOrder::Relaxed) { return; } for _ in 0..COLLECTION_QUEUE_SIZE { { - let mut cache_man = self.cache_man.write().unwrap(); let mut blocks = self.blocks.write().unwrap(); let mut block_details = self.block_details.write().unwrap(); let mut block_hashes = self.block_hashes.write().unwrap(); let mut transaction_addresses = self.transaction_addresses.write().unwrap(); let mut block_logs = self.block_logs.write().unwrap(); let mut blocks_blooms = self.blocks_blooms.write().unwrap(); + let mut cache_man = self.cache_man.write().unwrap(); for id in cache_man.cache_usage.pop_back().unwrap().into_iter() { cache_man.in_use.remove(&id); @@ -650,7 +658,7 @@ impl BlockChain { // TODO: handle block_hashes properly. block_hashes.clear(); } - if self.cache_size().total() < self.max_cache_size { break; } + if self.cache_size().total() < self.max_cache_size.load(AtomicOrder::Relaxed) { break; } } // TODO: m_lastCollection = chrono::system_clock::now(); diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 0c8580117..68801520c 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -169,7 +169,7 @@ impl ClientReport { /// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue. /// Call `import_block()` to import a block asynchronously; `flush_queue()` flushes the queue. 
pub struct Client { - chain: Arc>, + chain: Arc, engine: Arc>, state_db: Mutex, block_queue: BlockQueue, @@ -190,7 +190,7 @@ impl Client { dir.push(format!("v{}-sec-pruned", CLIENT_DB_VER_STR)); let path = dir.as_path(); let gb = spec.genesis_block(); - let chain = Arc::new(RwLock::new(BlockChain::new(&gb, path))); + let chain = Arc::new(BlockChain::new(&gb, path)); let mut opts = Options::new(); opts.set_max_open_files(256); opts.create_if_missing(true); @@ -258,13 +258,13 @@ impl Client { } let header = &block.header; - if let Err(e) = verify_block_family(&header, &block.bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) { + if let Err(e) = verify_block_family(&header, &block.bytes, self.engine.deref().deref(), self.chain.deref()) { warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); self.block_queue.mark_as_bad(&header.hash()); bad.insert(block.header.hash()); break; }; - let parent = match self.chain.read().unwrap().block_header(&header.parent_hash) { + let parent = match self.chain.block_header(&header.parent_hash) { Some(p) => p, None => { warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash); @@ -278,7 +278,7 @@ impl Client { last_hashes.resize(256, H256::new()); last_hashes[0] = header.parent_hash.clone(); for i in 0..255 { - match self.chain.read().unwrap().block_details(&last_hashes[i]) { + match self.chain.block_details(&last_hashes[i]) { Some(details) => { last_hashes[i + 1] = details.parent.clone(); }, @@ -304,9 +304,9 @@ impl Client { good_blocks.push(header.hash().clone()); - self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here? + self.chain.insert_block(&block.bytes); //TODO: err here? let ancient = if header.number() >= HISTORY { Some(header.number() - HISTORY) } else { None }; - match result.drain().commit(header.number(), &header.hash(), ancient.map(|n|(n, self.chain.read().unwrap().block_hash(n).unwrap()))) { + match result.drain().commit(header.number(), &header.hash(), ancient.map(|n|(n, self.chain.block_hash(n).unwrap()))) { Ok(_) => (), Err(e) => { warn!(target: "client", "State DB commit failed: {:?}", e); @@ -331,7 +331,7 @@ impl Client { /// Get info on the cache. pub fn cache_info(&self) -> CacheSize { - self.chain.read().unwrap().cache_size() + self.chain.cache_size() } /// Get the report. @@ -341,12 +341,12 @@ impl Client { /// Tick the client. pub fn tick(&self) { - self.chain.read().unwrap().collect_garbage(); + self.chain.collect_garbage(); } /// Set up the cache behaviour. 
pub fn configure_cache(&self, pref_cache_size: usize, max_cache_size: usize) { - self.chain.write().unwrap().configure_cache(pref_cache_size, max_cache_size); + self.chain.configure_cache(pref_cache_size, max_cache_size); } fn block_hash(chain: &BlockChain, id: BlockId) -> Option { @@ -361,14 +361,12 @@ impl Client { impl BlockChainClient for Client { fn block_header(&self, id: BlockId) -> Option { - let chain = self.chain.read().unwrap(); - Self::block_hash(&chain, id).and_then(|hash| chain.block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec())) + Self::block_hash(&self.chain, id).and_then(|hash| self.chain.block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec())) } fn block_body(&self, id: BlockId) -> Option { - let chain = self.chain.read().unwrap(); - Self::block_hash(&chain, id).and_then(|hash| { - chain.block(&hash).map(|bytes| { + Self::block_hash(&self.chain, id).and_then(|hash| { + self.chain.block(&hash).map(|bytes| { let rlp = Rlp::new(&bytes); let mut body = RlpStream::new_list(2); body.append_raw(rlp.at(1).as_raw(), 1); @@ -379,24 +377,21 @@ impl BlockChainClient for Client { } fn block(&self, id: BlockId) -> Option { - let chain = self.chain.read().unwrap(); - Self::block_hash(&chain, id).and_then(|hash| { - chain.block(&hash) + Self::block_hash(&self.chain, id).and_then(|hash| { + self.chain.block(&hash) }) } fn block_status(&self, id: BlockId) -> BlockStatus { - let chain = self.chain.read().unwrap(); - match Self::block_hash(&chain, id) { - Some(ref hash) if chain.is_known(hash) => BlockStatus::InChain, + match Self::block_hash(&self.chain, id) { + Some(ref hash) if self.chain.is_known(hash) => BlockStatus::InChain, Some(hash) => self.block_queue.block_status(&hash), None => BlockStatus::Unknown } } fn block_total_difficulty(&self, id: BlockId) -> Option { - let chain = self.chain.read().unwrap(); - Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty) + Self::block_hash(&self.chain, id).and_then(|hash| self.chain.block_details(&hash)).map(|d| d.total_difficulty) } fn code(&self, address: &Address) -> Option { @@ -404,18 +399,17 @@ impl BlockChainClient for Client { } fn transaction(&self, id: TransactionId) -> Option { - let chain = self.chain.read().unwrap(); match id { - TransactionId::Hash(ref hash) => chain.transaction_address(hash), - TransactionId::Location(id, index) => Self::block_hash(&chain, id).map(|hash| TransactionAddress { + TransactionId::Hash(ref hash) => self.chain.transaction_address(hash), + TransactionId::Location(id, index) => Self::block_hash(&self.chain, id).map(|hash| TransactionAddress { block_hash: hash, index: index }) - }.and_then(|address| chain.transaction(&address)) + }.and_then(|address| self.chain.transaction(&address)) } fn tree_route(&self, from: &H256, to: &H256) -> Option { - self.chain.read().unwrap().tree_route(from.clone(), to.clone()) + self.chain.tree_route(from.clone(), to.clone()) } fn state_data(&self, _hash: &H256) -> Option { @@ -428,7 +422,7 @@ impl BlockChainClient for Client { fn import_block(&self, bytes: Bytes) -> ImportResult { let header = BlockView::new(&bytes).header(); - if self.chain.read().unwrap().is_known(&header.hash()) { + if self.chain.is_known(&header.hash()) { return Err(ImportError::AlreadyInChain); } if self.block_status(BlockId::Hash(header.parent_hash)) == BlockStatus::Unknown { @@ -446,13 +440,12 @@ impl BlockChainClient for Client { } fn chain_info(&self) -> BlockChainInfo { - let chain = 
self.chain.read().unwrap(); BlockChainInfo { - total_difficulty: chain.best_block_total_difficulty(), - pending_total_difficulty: chain.best_block_total_difficulty(), - genesis_hash: chain.genesis_hash(), - best_block_hash: chain.best_block_hash(), - best_block_number: From::from(chain.best_block_number()) + total_difficulty: self.chain.best_block_total_difficulty(), + pending_total_difficulty: self.chain.best_block_total_difficulty(), + genesis_hash: self.chain.genesis_hash(), + best_block_hash: self.chain.best_block_hash(), + best_block_number: From::from(self.chain.best_block_number()) } } } diff --git a/ethcore/src/verification.rs b/ethcore/src/verification.rs index fa9467e95..c7d5e265f 100644 --- a/ethcore/src/verification.rs +++ b/ethcore/src/verification.rs @@ -57,12 +57,18 @@ pub fn verify_block_basic(header: &Header, bytes: &[u8], engine: &Engine) -> Res /// Still operates on a individual block /// Returns a PreVerifiedBlock structure populated with transactions pub fn verify_block_unordered(header: Header, bytes: Bytes, engine: &Engine) -> Result { + try!(engine.verify_block_unordered(&header, Some(&bytes))); + for u in Rlp::new(&bytes).at(2).iter().map(|rlp| rlp.as_val::
()) { + try!(engine.verify_block_unordered(&u, None)); + } // Verify transactions. let mut transactions = Vec::new(); - let v = BlockView::new(&bytes); - for t in v.transactions() { - try!(engine.verify_transaction(&t, &header)); - transactions.push(t); + { + let v = BlockView::new(&bytes); + for t in v.transactions() { + try!(engine.verify_transaction(&t, &header)); + transactions.push(t); + } } Ok(PreVerifiedBlock { header: header, diff --git a/util/src/lib.rs b/util/src/lib.rs index 5c8bd4fb0..2b7438cf3 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -143,7 +143,6 @@ pub mod network; pub mod log; pub mod panics; pub mod keys; -mod thread; pub use common::*; pub use misc::*; @@ -164,5 +163,4 @@ pub use semantic_version::*; pub use network::*; pub use io::*; pub use log::*; -pub use thread::*; diff --git a/util/src/thread.rs b/util/src/thread.rs deleted file mode 100644 index b86ca3e86..000000000 --- a/util/src/thread.rs +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2015, 2016 Ethcore (UK) Ltd. -// This file is part of Parity. - -// Parity is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity. If not, see . - -//! Thread management helpers - -use libc::{c_int, pthread_self, pthread_t}; - -#[repr(C)] -struct sched_param { - priority: c_int, - padding: c_int, -} - -extern { - fn setpriority(which: c_int, who: c_int, prio: c_int) -> c_int; - fn pthread_setschedparam(thread: pthread_t, policy: c_int, param: *const sched_param) -> c_int; -} -const PRIO_DARWIN_THREAD: c_int = 3; -const PRIO_DARWIN_BG: c_int = 0x1000; -const SCHED_RR: c_int = 2; - -/// Lower thread priority and put it into background mode -#[cfg(target_os="macos")] -pub fn lower_thread_priority() { - let sp = sched_param { priority: 0, padding: 0 }; - if unsafe { pthread_setschedparam(pthread_self(), SCHED_RR, &sp) } == -1 { - trace!("Could not decrease thread piority"); - } - //unsafe { setpriority(PRIO_DARWIN_THREAD, 0, PRIO_DARWIN_BG); } -} From cb4d17825bc6353461549a4a9c03424cda525cae Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 29 Feb 2016 19:49:29 +0100 Subject: [PATCH 3/6] Fixed lock order --- Cargo.toml | 1 + ethcore/src/blockchain/blockchain.rs | 78 +++++++++++++++------------- 2 files changed, 44 insertions(+), 35 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 25b7caa85..df3beef49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,3 +35,4 @@ name = "parity" [profile.release] debug = false +lto = false diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index 7339f3a1a..f30a674e6 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -433,48 +433,56 @@ impl BlockChain { batch.put(b"best", &update.info.hash).unwrap(); // These cached values must be updated atomically - let mut best_block = self.best_block.write().unwrap(); - let mut write_hashes = self.block_hashes.write().unwrap(); - let mut write_txs = self.transaction_addresses.write().unwrap(); + { + let mut best_block = self.best_block.write().unwrap(); + let 
mut write_hashes = self.block_hashes.write().unwrap(); + let mut write_txs = self.transaction_addresses.write().unwrap(); - // update best block - match update.info.location { - BlockLocation::Branch => (), - _ => { - *best_block = BestBlock { - hash: update.info.hash, - number: update.info.number, - total_difficulty: update.info.total_difficulty - }; + // update best block + match update.info.location { + BlockLocation::Branch => (), + _ => { + *best_block = BestBlock { + hash: update.info.hash, + number: update.info.number, + total_difficulty: update.info.total_difficulty + }; + } + } + + for (number, hash) in &update.block_hashes { + batch.put_extras(number, hash); + write_hashes.remove(number); + } + + for (hash, tx_address) in &update.transactions_addresses { + batch.put_extras(hash, tx_address); + write_txs.remove(hash); } } - for (number, hash) in &update.block_hashes { - batch.put_extras(number, hash); - write_hashes.remove(number); + { + let mut write_details = self.block_details.write().unwrap(); + for (hash, details) in update.block_details.into_iter() { + batch.put_extras(&hash, &details); + write_details.insert(hash, details); + } } - let mut write_details = self.block_details.write().unwrap(); - for (hash, details) in update.block_details.into_iter() { - batch.put_extras(&hash, &details); - write_details.insert(hash, details); + { + let mut write_receipts = self.block_receipts.write().unwrap(); + for (hash, receipt) in &update.block_receipts { + batch.put_extras(hash, receipt); + write_receipts.remove(hash); + } } - let mut write_receipts = self.block_receipts.write().unwrap(); - for (hash, receipt) in &update.block_receipts { - batch.put_extras(hash, receipt); - write_receipts.remove(hash); - } - - for (hash, tx_address) in &update.transactions_addresses { - batch.put_extras(hash, tx_address); - write_txs.remove(hash); - } - - let mut write_blocks_blooms = self.blocks_blooms.write().unwrap(); - for (bloom_hash, blocks_bloom) in &update.blocks_blooms { - batch.put_extras(bloom_hash, blocks_bloom); - write_blocks_blooms.remove(bloom_hash); + { + let mut write_blocks_blooms = self.blocks_blooms.write().unwrap(); + for (bloom_hash, blocks_bloom) in &update.blocks_blooms { + batch.put_extras(bloom_hash, blocks_bloom); + write_blocks_blooms.remove(bloom_hash); + } } // update extras database @@ -580,7 +588,7 @@ impl BlockChain { /// This function returns modified transaction addresses. 
fn prepare_transaction_addresses_update(&self, block_bytes: &[u8], info: &BlockInfo) -> HashMap { let block = BlockView::new(block_bytes); - let transaction_hashes = block.transaction_hashes(); + let transaction_hashes = block.transaction_hashes(); transaction_hashes.into_iter() .enumerate() From d0129ff67b5e69436df88a6758e81d880e924a23 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 29 Feb 2016 21:15:39 +0100 Subject: [PATCH 4/6] Fixed cache memory leak --- ethcore/src/blockchain/blockchain.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index f30a674e6..23e9aaac9 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -465,6 +465,7 @@ impl BlockChain { let mut write_details = self.block_details.write().unwrap(); for (hash, details) in update.block_details.into_iter() { batch.put_extras(&hash, &details); + self.note_used(CacheID::Extras(ExtrasIndex::BlockDetails, hash.clone())); write_details.insert(hash, details); } } @@ -769,6 +770,14 @@ impl BlockChain { // TODO: handle block_hashes properly. block_hashes.clear(); + + blocks.shrink_to_fit(); + block_details.shrink_to_fit(); + block_hashes.shrink_to_fit(); + transaction_addresses.shrink_to_fit(); + block_logs.shrink_to_fit(); + blocks_blooms.shrink_to_fit(); + block_receipts.shrink_to_fit(); } if self.cache_size().total() < self.max_cache_size.load(AtomicOrder::Relaxed) { break; } } From 324e070581305ded5b9c40942f86837756f2116d Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 2 Mar 2016 01:24:06 +0100 Subject: [PATCH 5/6] Reverted some changes --- ethcore/src/block_queue.rs | 3 +-- util/sha3/build.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index f61eb565d..de411c6e2 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -302,8 +302,7 @@ impl BlockQueue { if self.processing.read().unwrap().contains(&h) { return Err(ImportError::AlreadyQueued); } - } - { + let mut bad = self.verification.bad.lock().unwrap(); if bad.contains(&h) { return Err(ImportError::Bad(None)); diff --git a/util/sha3/build.rs b/util/sha3/build.rs index 9eb36fdb9..bbe16d720 100644 --- a/util/sha3/build.rs +++ b/util/sha3/build.rs @@ -21,6 +21,6 @@ extern crate gcc; fn main() { - gcc::Config::new().file("src/tinykeccak.c").flag("-O3").compile("libtinykeccak.a"); + gcc::compile_library("libtinykeccak.a", &["src/tinykeccak.c"]); } From 5f37f6edb442aa41097f7c014d2a0a08b20cb1f0 Mon Sep 17 00:00:00 2001 From: arkpar Date: Thu, 10 Mar 2016 21:01:17 +0100 Subject: [PATCH 6/6] Correct cache update order --- ethcore/src/blockchain/blockchain.rs | 52 ++++++++++++++-------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index 8e56cdc5f..4ebd111d9 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -452,7 +452,32 @@ impl BlockChain { let batch = DBTransaction::new(); batch.put(b"best", &update.info.hash).unwrap(); - // These cached values must be updated atomically + { + let mut write_details = self.block_details.write().unwrap(); + for (hash, details) in update.block_details.into_iter() { + batch.put_extras(&hash, &details); + self.note_used(CacheID::Extras(ExtrasIndex::BlockDetails, hash.clone())); + write_details.insert(hash, details); + } + } + + { + let mut write_receipts = 
self.block_receipts.write().unwrap(); + for (hash, receipt) in &update.block_receipts { + batch.put_extras(hash, receipt); + write_receipts.remove(hash); + } + } + + { + let mut write_blocks_blooms = self.blocks_blooms.write().unwrap(); + for (bloom_hash, blocks_bloom) in &update.blocks_blooms { + batch.put_extras(bloom_hash, blocks_bloom); + write_blocks_blooms.remove(bloom_hash); + } + } + + // These cached values must be updated last and togeterh { let mut best_block = self.best_block.write().unwrap(); let mut write_hashes = self.block_hashes.write().unwrap(); @@ -481,31 +506,6 @@ impl BlockChain { } } - { - let mut write_details = self.block_details.write().unwrap(); - for (hash, details) in update.block_details.into_iter() { - batch.put_extras(&hash, &details); - self.note_used(CacheID::Extras(ExtrasIndex::BlockDetails, hash.clone())); - write_details.insert(hash, details); - } - } - - { - let mut write_receipts = self.block_receipts.write().unwrap(); - for (hash, receipt) in &update.block_receipts { - batch.put_extras(hash, receipt); - write_receipts.remove(hash); - } - } - - { - let mut write_blocks_blooms = self.blocks_blooms.write().unwrap(); - for (bloom_hash, blocks_bloom) in &update.blocks_blooms { - batch.put_extras(bloom_hash, blocks_bloom); - write_blocks_blooms.remove(bloom_hash); - } - } - // update extras database self.extras_db.write(batch).unwrap(); }
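
The pattern this series converges on is one mutex per collection, acquired in a fixed, documented order: patch 1 splits the queue's single Mutex<Verification> into per-field locks, and patches 2, 3 and 6 add and enforce the "all locks must be captured in the order declared here" rule. The stand-alone sketch below illustrates only that pattern; Queue, Item, import and promote are simplified names invented for illustration, not the actual types in block_queue.rs or blockchain.rs.

use std::collections::{HashSet, VecDeque};
use std::sync::Mutex;

struct Item {
    hash: u64,
}

// All locks must be captured in the order declared here.
struct Queue {
    unverified: Mutex<VecDeque<Item>>,
    verified: Mutex<VecDeque<Item>>,
    bad: Mutex<HashSet<u64>>,
}

impl Queue {
    fn new() -> Queue {
        Queue {
            unverified: Mutex::new(VecDeque::new()),
            verified: Mutex::new(VecDeque::new()),
            bad: Mutex::new(HashSet::new()),
        }
    }

    // Takes &self: each field has its own interior mutability, so callers no longer
    // need &mut self (or an outer RwLock) just to push work in.
    fn import(&self, item: Item) -> Result<(), &'static str> {
        // The `bad` guard is dropped before `unverified` is taken, so the ordering
        // rule (which only constrains locks held simultaneously) is respected.
        {
            let bad = self.bad.lock().unwrap();
            if bad.contains(&item.hash) {
                return Err("known bad");
            }
        }
        self.unverified.lock().unwrap().push_back(item);
        Ok(())
    }

    // Holds `unverified` and `verified` together, locking them in declaration order;
    // every method that needs both must do the same to rule out lock-order inversions.
    fn promote(&self) {
        let mut unverified = self.unverified.lock().unwrap();
        let mut verified = self.verified.lock().unwrap();
        while let Some(item) = unverified.pop_front() {
            verified.push_back(item);
        }
    }
}

fn main() {
    let queue = Queue::new();
    queue.import(Item { hash: 1 }).unwrap();
    queue.promote();
    assert_eq!(queue.verified.lock().unwrap().len(), 1);
}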
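
Patch 2 also turns BlockChain::configure_cache into a &self method by moving the two size limits into AtomicUsize fields read and written with Relaxed ordering. A minimal sketch of that idea, assuming only the standard library and using made-up names (CacheLimits, should_collect, below_max) rather than the real BlockChain API:

use std::sync::atomic::{AtomicUsize, Ordering};

// Cache thresholds that any thread may tune without &mut access or a lock.
// Relaxed ordering is sufficient here: the values are advisory limits, not used
// to synchronize access to other data.
struct CacheLimits {
    pref_cache_size: AtomicUsize,
    max_cache_size: AtomicUsize,
}

impl CacheLimits {
    fn new() -> CacheLimits {
        CacheLimits {
            pref_cache_size: AtomicUsize::new(1 << 14),
            max_cache_size: AtomicUsize::new(1 << 20),
        }
    }

    // The analogue of configure_cache: plain stores, callable through a shared reference.
    fn configure(&self, pref: usize, max: usize) {
        self.pref_cache_size.store(pref, Ordering::Relaxed);
        self.max_cache_size.store(max, Ordering::Relaxed);
    }

    // Start collecting once the preferred size is exceeded.
    fn should_collect(&self, total: usize) -> bool {
        total >= self.pref_cache_size.load(Ordering::Relaxed)
    }

    // Stop collecting once usage is back under the hard limit.
    fn below_max(&self, total: usize) -> bool {
        total < self.max_cache_size.load(Ordering::Relaxed)
    }
}

fn main() {
    let limits = CacheLimits::new();
    limits.configure(1 << 15, 1 << 21);
    assert!(!limits.should_collect(1 << 10));
    assert!(limits.below_max(1 << 10));
}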