Prevent blockchain & miner racing when accessing pending block. (#9310)
* Prevent blockchain & miner racing when accessing pending block. * Fix unavailability of pending block during reseal.
This commit is contained in:
parent
65a1d88907
commit
30e40079ca
@ -319,14 +319,15 @@ impl Miner {
|
|||||||
/// Retrieves an existing pending block iff it's not older than given block number.
|
/// Retrieves an existing pending block iff it's not older than given block number.
|
||||||
///
|
///
|
||||||
/// NOTE: This will not prepare a new pending block if it's not existing.
|
/// NOTE: This will not prepare a new pending block if it's not existing.
|
||||||
/// See `map_pending_block` for alternative behaviour.
|
|
||||||
fn map_existing_pending_block<F, T>(&self, f: F, latest_block_number: BlockNumber) -> Option<T> where
|
fn map_existing_pending_block<F, T>(&self, f: F, latest_block_number: BlockNumber) -> Option<T> where
|
||||||
F: FnOnce(&ClosedBlock) -> T,
|
F: FnOnce(&ClosedBlock) -> T,
|
||||||
{
|
{
|
||||||
self.sealing.lock().queue
|
self.sealing.lock().queue
|
||||||
.peek_last_ref()
|
.peek_last_ref()
|
||||||
.and_then(|b| {
|
.and_then(|b| {
|
||||||
if b.block().header().number() > latest_block_number {
|
// to prevent a data race between block import and updating pending block
|
||||||
|
// we allow the number to be equal.
|
||||||
|
if b.block().header().number() >= latest_block_number {
|
||||||
Some(f(b))
|
Some(f(b))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
@ -365,7 +366,7 @@ impl Miner {
|
|||||||
// if at least one was pushed successfully, close and enqueue new ClosedBlock;
|
// if at least one was pushed successfully, close and enqueue new ClosedBlock;
|
||||||
// otherwise, leave everything alone.
|
// otherwise, leave everything alone.
|
||||||
// otherwise, author a fresh block.
|
// otherwise, author a fresh block.
|
||||||
let mut open_block = match sealing.queue.pop_if(|b| b.block().header().parent_hash() == &best_hash) {
|
let mut open_block = match sealing.queue.get_pending_if(|b| b.block().header().parent_hash() == &best_hash) {
|
||||||
Some(old_block) => {
|
Some(old_block) => {
|
||||||
trace!(target: "miner", "prepare_block: Already have previous work; updating and returning");
|
trace!(target: "miner", "prepare_block: Already have previous work; updating and returning");
|
||||||
// add transactions to old_block
|
// add transactions to old_block
|
||||||
@ -628,7 +629,7 @@ impl Miner {
|
|||||||
{
|
{
|
||||||
let mut sealing = self.sealing.lock();
|
let mut sealing = self.sealing.lock();
|
||||||
sealing.next_mandatory_reseal = Instant::now() + self.options.reseal_max_period;
|
sealing.next_mandatory_reseal = Instant::now() + self.options.reseal_max_period;
|
||||||
sealing.queue.push(block.clone());
|
sealing.queue.set_pending(block.clone());
|
||||||
sealing.queue.use_last_ref();
|
sealing.queue.use_last_ref();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -690,7 +691,7 @@ impl Miner {
|
|||||||
);
|
);
|
||||||
let is_new = original_work_hash.map_or(true, |h| h != block_hash);
|
let is_new = original_work_hash.map_or(true, |h| h != block_hash);
|
||||||
|
|
||||||
sealing.queue.push(block);
|
sealing.queue.set_pending(block);
|
||||||
|
|
||||||
#[cfg(feature = "work-notify")]
|
#[cfg(feature = "work-notify")]
|
||||||
{
|
{
|
||||||
@ -1108,7 +1109,7 @@ impl miner::MinerService for Miner {
|
|||||||
Some(false) => {
|
Some(false) => {
|
||||||
trace!(target: "miner", "update_sealing: engine is not keen to seal internally right now");
|
trace!(target: "miner", "update_sealing: engine is not keen to seal internally right now");
|
||||||
// anyway, save the block for later use
|
// anyway, save the block for later use
|
||||||
self.sealing.lock().queue.push(block);
|
self.sealing.lock().queue.set_pending(block);
|
||||||
},
|
},
|
||||||
None => {
|
None => {
|
||||||
trace!(target: "miner", "update_sealing: engine does not seal internally, preparing work");
|
trace!(target: "miner", "update_sealing: engine does not seal internally, preparing work");
|
||||||
|
@ -54,7 +54,7 @@ impl<T> UsingQueue<T> {
|
|||||||
|
|
||||||
/// Return a reference to the item at the top of the queue (or `None` if the queue is empty);
|
/// Return a reference to the item at the top of the queue (or `None` if the queue is empty);
|
||||||
/// this constitutes using the item and will remain in the queue for at least another
|
/// this constitutes using the item and will remain in the queue for at least another
|
||||||
/// `max_size` invocations of `push()`.
|
/// `max_size` invocations of `set_pending() + use_last_ref()`.
|
||||||
pub fn use_last_ref(&mut self) -> Option<&T> {
|
pub fn use_last_ref(&mut self) -> Option<&T> {
|
||||||
if let Some(x) = self.pending.take() {
|
if let Some(x) = self.pending.take() {
|
||||||
self.in_use.push(x);
|
self.in_use.push(x);
|
||||||
@ -65,9 +65,9 @@ impl<T> UsingQueue<T> {
|
|||||||
self.in_use.last()
|
self.in_use.last()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Place an item on the end of the queue. The previously `push()`ed item will be removed
|
/// Place an item on the end of the queue. The previously pending item will be removed
|
||||||
/// if `use_last_ref()` since it was `push()`ed.
|
/// if `use_last_ref()` since it was set.
|
||||||
pub fn push(&mut self, b: T) {
|
pub fn set_pending(&mut self, b: T) {
|
||||||
self.pending = Some(b);
|
self.pending = Some(b);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -100,17 +100,16 @@ impl<T> UsingQueue<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the most recently pushed block if `f` returns `true` with a reference to it as
|
/// Returns a clone of the pending block if `f` returns `true` with a reference to it as
|
||||||
/// a parameter, otherwise `None`.
|
/// a parameter, otherwise `None`.
|
||||||
/// Will not destroy a block if a reference to it has previously been returned by `use_last_ref`,
|
///
|
||||||
/// but rather clone it.
|
/// If pending block is not available will clone the first of the used blocks that match the predicate.
|
||||||
pub fn pop_if<P>(&mut self, predicate: P) -> Option<T> where P: Fn(&T) -> bool, T: Clone {
|
pub fn get_pending_if<P>(&mut self, predicate: P) -> Option<T> where P: Fn(&T) -> bool, T: Clone {
|
||||||
// a bit clumsy - TODO: think about a nicer way of expressing this.
|
// a bit clumsy - TODO: think about a nicer way of expressing this.
|
||||||
if let Some(x) = self.pending.take() {
|
if let Some(ref x) = self.pending {
|
||||||
if predicate(&x) {
|
if predicate(x) {
|
||||||
Some(x)
|
Some(x.clone())
|
||||||
} else {
|
} else {
|
||||||
self.pending = Some(x);
|
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -122,21 +121,21 @@ impl<T> UsingQueue<T> {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_not_find_when_pushed() {
|
fn should_not_find_when_pushed() {
|
||||||
let mut q = UsingQueue::new(2);
|
let mut q = UsingQueue::new(2);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
assert!(q.take_used_if(|i| i == &1).is_none());
|
assert!(q.take_used_if(|i| i == &1).is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_not_find_when_pushed_with_clone() {
|
fn should_not_find_when_pushed_with_clone() {
|
||||||
let mut q = UsingQueue::new(2);
|
let mut q = UsingQueue::new(2);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
assert!(q.clone_used_if(|i| i == &1).is_none());
|
assert!(q.clone_used_if(|i| i == &1).is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_find_when_pushed_and_used() {
|
fn should_find_when_pushed_and_used() {
|
||||||
let mut q = UsingQueue::new(2);
|
let mut q = UsingQueue::new(2);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
q.use_last_ref();
|
q.use_last_ref();
|
||||||
assert!(q.take_used_if(|i| i == &1).unwrap() == 1);
|
assert!(q.take_used_if(|i| i == &1).unwrap() == 1);
|
||||||
}
|
}
|
||||||
@ -144,7 +143,7 @@ fn should_find_when_pushed_and_used() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_have_same_semantics_for_get_take_clone() {
|
fn should_have_same_semantics_for_get_take_clone() {
|
||||||
let mut q = UsingQueue::new(2);
|
let mut q = UsingQueue::new(2);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
assert!(q.get_used_if(GetAction::Clone, |i| i == &1).is_none());
|
assert!(q.get_used_if(GetAction::Clone, |i| i == &1).is_none());
|
||||||
assert!(q.get_used_if(GetAction::Take, |i| i == &1).is_none());
|
assert!(q.get_used_if(GetAction::Take, |i| i == &1).is_none());
|
||||||
q.use_last_ref();
|
q.use_last_ref();
|
||||||
@ -158,7 +157,7 @@ fn should_have_same_semantics_for_get_take_clone() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_find_when_pushed_and_used_with_clone() {
|
fn should_find_when_pushed_and_used_with_clone() {
|
||||||
let mut q = UsingQueue::new(2);
|
let mut q = UsingQueue::new(2);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
q.use_last_ref();
|
q.use_last_ref();
|
||||||
assert!(q.clone_used_if(|i| i == &1).unwrap() == 1);
|
assert!(q.clone_used_if(|i| i == &1).unwrap() == 1);
|
||||||
}
|
}
|
||||||
@ -166,7 +165,7 @@ fn should_find_when_pushed_and_used_with_clone() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_not_find_again_when_pushed_and_taken() {
|
fn should_not_find_again_when_pushed_and_taken() {
|
||||||
let mut q = UsingQueue::new(2);
|
let mut q = UsingQueue::new(2);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
q.use_last_ref();
|
q.use_last_ref();
|
||||||
assert!(q.take_used_if(|i| i == &1).unwrap() == 1);
|
assert!(q.take_used_if(|i| i == &1).unwrap() == 1);
|
||||||
assert!(q.clone_used_if(|i| i == &1).is_none());
|
assert!(q.clone_used_if(|i| i == &1).is_none());
|
||||||
@ -175,7 +174,7 @@ fn should_not_find_again_when_pushed_and_taken() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_find_again_when_pushed_and_cloned() {
|
fn should_find_again_when_pushed_and_cloned() {
|
||||||
let mut q = UsingQueue::new(2);
|
let mut q = UsingQueue::new(2);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
q.use_last_ref();
|
q.use_last_ref();
|
||||||
assert!(q.clone_used_if(|i| i == &1).unwrap() == 1);
|
assert!(q.clone_used_if(|i| i == &1).unwrap() == 1);
|
||||||
assert!(q.clone_used_if(|i| i == &1).unwrap() == 1);
|
assert!(q.clone_used_if(|i| i == &1).unwrap() == 1);
|
||||||
@ -185,9 +184,9 @@ fn should_find_again_when_pushed_and_cloned() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_find_when_others_used() {
|
fn should_find_when_others_used() {
|
||||||
let mut q = UsingQueue::new(2);
|
let mut q = UsingQueue::new(2);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
q.use_last_ref();
|
q.use_last_ref();
|
||||||
q.push(2);
|
q.set_pending(2);
|
||||||
q.use_last_ref();
|
q.use_last_ref();
|
||||||
assert!(q.take_used_if(|i| i == &1).is_some());
|
assert!(q.take_used_if(|i| i == &1).is_some());
|
||||||
}
|
}
|
||||||
@ -195,9 +194,9 @@ fn should_find_when_others_used() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_not_find_when_too_many_used() {
|
fn should_not_find_when_too_many_used() {
|
||||||
let mut q = UsingQueue::new(1);
|
let mut q = UsingQueue::new(1);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
q.use_last_ref();
|
q.use_last_ref();
|
||||||
q.push(2);
|
q.set_pending(2);
|
||||||
q.use_last_ref();
|
q.use_last_ref();
|
||||||
assert!(q.take_used_if(|i| i == &1).is_none());
|
assert!(q.take_used_if(|i| i == &1).is_none());
|
||||||
}
|
}
|
||||||
@ -205,8 +204,8 @@ fn should_not_find_when_too_many_used() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_not_find_when_not_used_and_then_pushed() {
|
fn should_not_find_when_not_used_and_then_pushed() {
|
||||||
let mut q = UsingQueue::new(3);
|
let mut q = UsingQueue::new(3);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
q.push(2);
|
q.set_pending(2);
|
||||||
q.use_last_ref();
|
q.use_last_ref();
|
||||||
assert!(q.take_used_if(|i| i == &1).is_none());
|
assert!(q.take_used_if(|i| i == &1).is_none());
|
||||||
}
|
}
|
||||||
@ -214,19 +213,19 @@ fn should_not_find_when_not_used_and_then_pushed() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_peek_correctly_after_push() {
|
fn should_peek_correctly_after_push() {
|
||||||
let mut q = UsingQueue::new(3);
|
let mut q = UsingQueue::new(3);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
assert_eq!(q.peek_last_ref(), Some(&1));
|
assert_eq!(q.peek_last_ref(), Some(&1));
|
||||||
q.push(2);
|
q.set_pending(2);
|
||||||
assert_eq!(q.peek_last_ref(), Some(&2));
|
assert_eq!(q.peek_last_ref(), Some(&2));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_inspect_correctly() {
|
fn should_inspect_correctly() {
|
||||||
let mut q = UsingQueue::new(3);
|
let mut q = UsingQueue::new(3);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
assert_eq!(q.use_last_ref(), Some(&1));
|
assert_eq!(q.use_last_ref(), Some(&1));
|
||||||
assert_eq!(q.peek_last_ref(), Some(&1));
|
assert_eq!(q.peek_last_ref(), Some(&1));
|
||||||
q.push(2);
|
q.set_pending(2);
|
||||||
assert_eq!(q.use_last_ref(), Some(&2));
|
assert_eq!(q.use_last_ref(), Some(&2));
|
||||||
assert_eq!(q.peek_last_ref(), Some(&2));
|
assert_eq!(q.peek_last_ref(), Some(&2));
|
||||||
}
|
}
|
||||||
@ -234,9 +233,9 @@ fn should_inspect_correctly() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_not_find_when_not_used_peeked_and_then_pushed() {
|
fn should_not_find_when_not_used_peeked_and_then_pushed() {
|
||||||
let mut q = UsingQueue::new(3);
|
let mut q = UsingQueue::new(3);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
q.peek_last_ref();
|
q.peek_last_ref();
|
||||||
q.push(2);
|
q.set_pending(2);
|
||||||
q.use_last_ref();
|
q.use_last_ref();
|
||||||
assert!(q.take_used_if(|i| i == &1).is_none());
|
assert!(q.take_used_if(|i| i == &1).is_none());
|
||||||
}
|
}
|
||||||
@ -244,34 +243,34 @@ fn should_not_find_when_not_used_peeked_and_then_pushed() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_pop_used() {
|
fn should_pop_used() {
|
||||||
let mut q = UsingQueue::new(3);
|
let mut q = UsingQueue::new(3);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
q.use_last_ref();
|
q.use_last_ref();
|
||||||
let popped = q.pop_if(|i| i == &1);
|
let popped = q.get_pending_if(|i| i == &1);
|
||||||
assert_eq!(popped, Some(1));
|
assert_eq!(popped, Some(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_pop_unused() {
|
fn should_not_pop_last_pending() {
|
||||||
let mut q = UsingQueue::new(3);
|
let mut q = UsingQueue::new(3);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
assert_eq!(q.pop_if(|i| i == &1), Some(1));
|
assert_eq!(q.get_pending_if(|i| i == &1), Some(1));
|
||||||
assert_eq!(q.pop_if(|i| i == &1), None);
|
assert_eq!(q.get_pending_if(|i| i == &1), Some(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_not_pop_unused_before_used() {
|
fn should_not_pop_unused_before_used() {
|
||||||
let mut q = UsingQueue::new(3);
|
let mut q = UsingQueue::new(3);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
q.push(2);
|
q.set_pending(2);
|
||||||
let popped = q.pop_if(|i| i == &1);
|
let popped = q.get_pending_if(|i| i == &1);
|
||||||
assert_eq!(popped, None);
|
assert_eq!(popped, None);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_not_remove_used_popped() {
|
fn should_not_remove_used_popped() {
|
||||||
let mut q = UsingQueue::new(3);
|
let mut q = UsingQueue::new(3);
|
||||||
q.push(1);
|
q.set_pending(1);
|
||||||
q.use_last_ref();
|
q.use_last_ref();
|
||||||
assert_eq!(q.pop_if(|i| i == &1), Some(1));
|
assert_eq!(q.get_pending_if(|i| i == &1), Some(1));
|
||||||
assert_eq!(q.pop_if(|i| i == &1), Some(1));
|
assert_eq!(q.get_pending_if(|i| i == &1), Some(1));
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user