Threading performance optimizations

This commit is contained in:
arkpar
2016-02-21 19:46:29 +01:00
parent 4e023a1bb8
commit c8076b2f9d
8 changed files with 148 additions and 95 deletions

View File

@@ -63,7 +63,7 @@ pub struct BlockQueue {
panic_handler: Arc<PanicHandler>,
engine: Arc<Box<Engine>>,
more_to_verify: Arc<Condvar>,
verification: Arc<Mutex<Verification>>,
verification: Arc<Verification>,
verifiers: Vec<JoinHandle<()>>,
deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>,
@@ -98,12 +98,11 @@ impl QueueSignal {
}
}
#[derive(Default)]
struct Verification {
unverified: VecDeque<UnVerifiedBlock>,
verified: VecDeque<PreVerifiedBlock>,
verifying: VecDeque<VerifyingBlock>,
bad: HashSet<H256>,
unverified: Mutex<VecDeque<UnVerifiedBlock>>,
verified: Mutex<VecDeque<PreVerifiedBlock>>,
verifying: Mutex<VecDeque<VerifyingBlock>>,
bad: Mutex<HashSet<H256>>,
}
const MAX_UNVERIFIED_QUEUE_SIZE: usize = 50000;
@@ -111,7 +110,12 @@ const MAX_UNVERIFIED_QUEUE_SIZE: usize = 50000;
impl BlockQueue {
/// Creates a new queue instance.
pub fn new(engine: Arc<Box<Engine>>, message_channel: IoChannel<NetSyncMessage>) -> BlockQueue {
let verification = Arc::new(Mutex::new(Verification::default()));
let verification = Arc::new(Verification {
unverified: Mutex::new(VecDeque::new()),
verified: Mutex::new(VecDeque::new()),
verifying: Mutex::new(VecDeque::new()),
bad: Mutex::new(HashSet::new()),
});
let more_to_verify = Arc::new(Condvar::new());
let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel });
let deleting = Arc::new(AtomicBool::new(false));
@@ -119,7 +123,7 @@ impl BlockQueue {
let panic_handler = PanicHandler::new_in_arc();
let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
let thread_count = max(::num_cpus::get(), 3) - 2;
let thread_count = max(::num_cpus::get(), 5) - 0;
for i in 0..thread_count {
let verification = verification.clone();
let engine = engine.clone();
@@ -133,7 +137,8 @@ impl BlockQueue {
.name(format!("Verifier #{}", i))
.spawn(move || {
panic_handler.catch_panic(move || {
BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty)
lower_thread_priority();
BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty)
}).unwrap()
})
.expect("Error starting block verification thread")
@@ -152,17 +157,17 @@ impl BlockQueue {
}
}
fn verify(verification: Arc<Mutex<Verification>>, engine: Arc<Box<Engine>>, wait: Arc<Condvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<Condvar>) {
fn verify(verification: Arc<Verification>, engine: Arc<Box<Engine>>, wait: Arc<Condvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<Condvar>) {
while !deleting.load(AtomicOrdering::Acquire) {
{
let mut lock = verification.lock().unwrap();
let mut unverified = verification.unverified.lock().unwrap();
if lock.unverified.is_empty() && lock.verifying.is_empty() {
if unverified.is_empty() && verification.verifying.lock().unwrap().is_empty() {
empty.notify_all();
}
while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) {
lock = wait.wait(lock).unwrap();
while unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) {
unverified = wait.wait(unverified).unwrap();
}
if deleting.load(AtomicOrdering::Acquire) {
@@ -171,39 +176,42 @@ impl BlockQueue {
}
let block = {
let mut v = verification.lock().unwrap();
if v.unverified.is_empty() {
let mut unverified = verification.unverified.lock().unwrap();
if unverified.is_empty() {
continue;
}
let block = v.unverified.pop_front().unwrap();
v.verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None });
let mut verifying = verification.verifying.lock().unwrap();
let block = unverified.pop_front().unwrap();
verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None });
block
};
let block_hash = block.header.hash();
match verify_block_unordered(block.header, block.bytes, engine.deref().deref()) {
Ok(verified) => {
let mut v = verification.lock().unwrap();
for e in &mut v.verifying {
let mut verifying = verification.verifying.lock().unwrap();
for e in verifying.iter_mut() {
if e.hash == block_hash {
e.block = Some(verified);
break;
}
}
if !v.verifying.is_empty() && v.verifying.front().unwrap().hash == block_hash {
if !verifying.is_empty() && verifying.front().unwrap().hash == block_hash {
// we're next!
let mut vref = v.deref_mut();
BlockQueue::drain_verifying(&mut vref.verifying, &mut vref.verified, &mut vref.bad);
let mut verified = verification.verified.lock().unwrap();
let mut bad = verification.bad.lock().unwrap();
BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
ready.set();
}
},
Err(err) => {
let mut v = verification.lock().unwrap();
let mut verifying = verification.verifying.lock().unwrap();
let mut verified = verification.verified.lock().unwrap();
let mut bad = verification.bad.lock().unwrap();
warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", block_hash, err);
v.bad.insert(block_hash.clone());
v.verifying.retain(|e| e.hash != block_hash);
let mut vref = v.deref_mut();
BlockQueue::drain_verifying(&mut vref.verifying, &mut vref.verified, &mut vref.bad);
bad.insert(block_hash.clone());
verifying.retain(|e| e.hash != block_hash);
BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
ready.set();
}
}
@@ -223,19 +231,21 @@ impl BlockQueue {
}
/// Clear the queue and stop verification activity.
pub fn clear(&mut self) {
let mut verification = self.verification.lock().unwrap();
verification.unverified.clear();
verification.verifying.clear();
verification.verified.clear();
pub fn clear(&self) {
let mut unverified = self.verification.unverified.lock().unwrap();
let mut verifying = self.verification.verifying.lock().unwrap();
let mut verified = self.verification.verified.lock().unwrap();
unverified.clear();
verifying.clear();
verified.clear();
self.processing.write().unwrap().clear();
}
/// Wait for queue to be empty
pub fn flush(&mut self) {
let mut verification = self.verification.lock().unwrap();
while !verification.unverified.is_empty() || !verification.verifying.is_empty() {
verification = self.empty.wait(verification).unwrap();
/// Wait for unverified queue to be empty
pub fn flush(&self) {
let mut unverified = self.verification.unverified.lock().unwrap();
while !unverified.is_empty() || !self.verification.verifying.lock().unwrap().is_empty() {
unverified = self.empty.wait(unverified).unwrap();
}
}
@@ -244,27 +254,29 @@ impl BlockQueue {
if self.processing.read().unwrap().contains(&hash) {
return BlockStatus::Queued;
}
if self.verification.lock().unwrap().bad.contains(&hash) {
if self.verification.bad.lock().unwrap().contains(&hash) {
return BlockStatus::Bad;
}
BlockStatus::Unknown
}
/// Add a block to the queue.
pub fn import_block(&mut self, bytes: Bytes) -> ImportResult {
pub fn import_block(&self, bytes: Bytes) -> ImportResult {
let header = BlockView::new(&bytes).header();
let h = header.hash();
if self.processing.read().unwrap().contains(&h) {
return Err(ImportError::AlreadyQueued);
}
{
let mut verification = self.verification.lock().unwrap();
if verification.bad.contains(&h) {
if self.processing.read().unwrap().contains(&h) {
return Err(ImportError::AlreadyQueued);
}
}
{
let mut bad = self.verification.bad.lock().unwrap();
if bad.contains(&h) {
return Err(ImportError::Bad(None));
}
if verification.bad.contains(&header.parent_hash) {
verification.bad.insert(h.clone());
if bad.contains(&header.parent_hash) {
bad.insert(h.clone());
return Err(ImportError::Bad(None));
}
}
@@ -272,39 +284,40 @@ impl BlockQueue {
match verify_block_basic(&header, &bytes, self.engine.deref().deref()) {
Ok(()) => {
self.processing.write().unwrap().insert(h.clone());
self.verification.lock().unwrap().unverified.push_back(UnVerifiedBlock { header: header, bytes: bytes });
self.verification.unverified.lock().unwrap().push_back(UnVerifiedBlock { header: header, bytes: bytes });
self.more_to_verify.notify_all();
Ok(h)
},
Err(err) => {
warn!(target: "client", "Stage 1 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), err);
self.verification.lock().unwrap().bad.insert(h.clone());
self.verification.bad.lock().unwrap().insert(h.clone());
Err(From::from(err))
}
}
}
/// Mark given block and all its children as bad. Stops verification.
pub fn mark_as_bad(&mut self, hash: &H256) {
let mut verification_lock = self.verification.lock().unwrap();
let mut verification = verification_lock.deref_mut();
verification.bad.insert(hash.clone());
pub fn mark_as_bad(&self, hash: &H256) {
let mut verified_lock = self.verification.verified.lock().unwrap();
let mut verified = verified_lock.deref_mut();
let mut bad = self.verification.bad.lock().unwrap();
bad.insert(hash.clone());
self.processing.write().unwrap().remove(&hash);
let mut new_verified = VecDeque::new();
for block in verification.verified.drain(..) {
if verification.bad.contains(&block.header.parent_hash) {
verification.bad.insert(block.header.hash());
for block in verified.drain(..) {
if bad.contains(&block.header.parent_hash) {
bad.insert(block.header.hash());
self.processing.write().unwrap().remove(&block.header.hash());
}
else {
new_verified.push_back(block);
}
}
verification.verified = new_verified;
*verified = new_verified;
}
/// Mark given block as processed
pub fn mark_as_good(&mut self, hashes: &[H256]) {
pub fn mark_as_good(&self, hashes: &[H256]) {
let mut processing = self.processing.write().unwrap();
for h in hashes {
processing.remove(&h);
@@ -312,16 +325,16 @@ impl BlockQueue {
}
/// Removes up to `max` verified blocks from the queue
pub fn drain(&mut self, max: usize) -> Vec<PreVerifiedBlock> {
let mut verification = self.verification.lock().unwrap();
let count = min(max, verification.verified.len());
pub fn drain(&self, max: usize) -> Vec<PreVerifiedBlock> {
let mut verified = self.verification.verified.lock().unwrap();
let count = min(max, verified.len());
let mut result = Vec::with_capacity(count);
for _ in 0..count {
let block = verification.verified.pop_front().unwrap();
let block = verified.pop_front().unwrap();
result.push(block);
}
self.ready_signal.reset();
if !verification.verified.is_empty() {
if !verified.is_empty() {
self.ready_signal.set();
}
result
@@ -329,11 +342,10 @@ impl BlockQueue {
/// Get queue status.
pub fn queue_info(&self) -> BlockQueueInfo {
let verification = self.verification.lock().unwrap();
BlockQueueInfo {
verified_queue_size: verification.verified.len(),
unverified_queue_size: verification.unverified.len(),
verifying_queue_size: verification.verifying.len(),
unverified_queue_size: self.verification.unverified.lock().unwrap().len(),
verifying_queue_size: self.verification.verifying.lock().unwrap().len(),
verified_queue_size: self.verification.verified.lock().unwrap().len(),
}
}
}