From 7c5fd042f3b9c2448c4fb3eb1bffb7c09b13a729 Mon Sep 17 00:00:00 2001 From: David Forstenlechner Date: Wed, 25 Sep 2019 19:45:49 +0200 Subject: [PATCH] [client]: Fix for incorrectly dropped consensus messages (#11082) (#11086) Fixes a race condition causing the currently_queued counter to underflow and consensus messages getting dropped incorrectly as a consequence. --- ethcore/src/client/client.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 2e004c3c2..c0e057d84 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -16,10 +16,11 @@ use std::cmp; use std::collections::{BTreeMap, HashSet, VecDeque}; +use std::convert::TryFrom; use std::io::{BufRead, BufReader}; use std::str::from_utf8; use std::sync::{Arc, Weak}; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering}; +use std::sync::atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering}; use std::time::{Duration, Instant}; use ansi_term::Colour; @@ -2739,12 +2740,17 @@ fn transaction_receipt( /// Queue some items to be processed by IO client. struct IoChannelQueue { - currently_queued: Arc, - limit: usize, + /// Using a *signed* integer for counting currently queued messages since the + /// order in which the counter is incremented and decremented is not defined. + /// Using an unsigned integer can (and will) result in integer underflow, + /// incorrectly rejecting messages and returning a FullQueue error. + currently_queued: Arc, + limit: i64, } impl IoChannelQueue { pub fn new(limit: usize) -> Self { + let limit = i64::try_from(limit).unwrap_or(i64::max_value()); IoChannelQueue { currently_queued: Default::default(), limit, @@ -2756,9 +2762,12 @@ impl IoChannelQueue { { let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed); if queue_size >= self.limit { - return Err(EthcoreError::FullQueue(self.limit)) + let err_limit = usize::try_from(self.limit).unwrap_or(usize::max_value()); + return Err(EthcoreError::FullQueue(err_limit)) }; + let count = i64::try_from(count).unwrap_or(i64::max_value()); + let currently_queued = self.currently_queued.clone(); let _ok = channel.send(ClientIoMessage::execute(move |client| { currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);