Fixes a race condition causing the currently_queued counter to underflow and consensus messages getting dropped incorrectly as a consequence.
This commit is contained in:
parent
d9201aa6f2
commit
7c5fd042f3
@ -16,10 +16,11 @@
|
|||||||
|
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::collections::{BTreeMap, HashSet, VecDeque};
|
use std::collections::{BTreeMap, HashSet, VecDeque};
|
||||||
|
use std::convert::TryFrom;
|
||||||
use std::io::{BufRead, BufReader};
|
use std::io::{BufRead, BufReader};
|
||||||
use std::str::from_utf8;
|
use std::str::from_utf8;
|
||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak};
|
||||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
|
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use ansi_term::Colour;
|
use ansi_term::Colour;
|
||||||
@ -2739,12 +2740,17 @@ fn transaction_receipt(
|
|||||||
|
|
||||||
/// Queue some items to be processed by IO client.
|
/// Queue some items to be processed by IO client.
|
||||||
struct IoChannelQueue {
|
struct IoChannelQueue {
|
||||||
currently_queued: Arc<AtomicUsize>,
|
/// Using a *signed* integer for counting currently queued messages since the
|
||||||
limit: usize,
|
/// order in which the counter is incremented and decremented is not defined.
|
||||||
|
/// Using an unsigned integer can (and will) result in integer underflow,
|
||||||
|
/// incorrectly rejecting messages and returning a FullQueue error.
|
||||||
|
currently_queued: Arc<AtomicI64>,
|
||||||
|
limit: i64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IoChannelQueue {
|
impl IoChannelQueue {
|
||||||
pub fn new(limit: usize) -> Self {
|
pub fn new(limit: usize) -> Self {
|
||||||
|
let limit = i64::try_from(limit).unwrap_or(i64::max_value());
|
||||||
IoChannelQueue {
|
IoChannelQueue {
|
||||||
currently_queued: Default::default(),
|
currently_queued: Default::default(),
|
||||||
limit,
|
limit,
|
||||||
@ -2756,9 +2762,12 @@ impl IoChannelQueue {
|
|||||||
{
|
{
|
||||||
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
|
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
|
||||||
if queue_size >= self.limit {
|
if queue_size >= self.limit {
|
||||||
return Err(EthcoreError::FullQueue(self.limit))
|
let err_limit = usize::try_from(self.limit).unwrap_or(usize::max_value());
|
||||||
|
return Err(EthcoreError::FullQueue(err_limit))
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let count = i64::try_from(count).unwrap_or(i64::max_value());
|
||||||
|
|
||||||
let currently_queued = self.currently_queued.clone();
|
let currently_queued = self.currently_queued.clone();
|
||||||
let _ok = channel.send(ClientIoMessage::execute(move |client| {
|
let _ok = channel.send(ClientIoMessage::execute(move |client| {
|
||||||
currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
|
currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
|
||||||
|
Loading…
Reference in New Issue
Block a user