openethereum/miner/using-queue/src/lib.rs

277 lines
8.1 KiB
Rust
Raw Normal View History

// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.

// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.

//! Queue-like datastructure including notion of usage.
/// Special queue-like datastructure that includes the notion of
/// usage to avoid items that were queued but never used from making it into
/// the queue.
///
/// An item is first stored as *pending* via `set_pending()`. It only becomes
/// part of the retained history once `use_last_ref()` is called; a pending item
/// that is replaced before being used is simply dropped.
#[derive(Debug)]
pub struct UsingQueue<T> {
    /// Not yet being sealed by a miner, but if one asks for work, we'd prefer they do this.
    pending: Option<T>,
    /// Currently being sealed by miners. Ordered oldest first, newest last.
    in_use: Vec<T>,
    /// The maximum allowable number of items in_use.
    max_size: usize,
}

/// Take an item or just clone it?
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GetAction {
    /// Remove the item, faster but you can't get it back.
    Take,
    /// Clone the item, slower but you can get it again.
    Clone,
}

impl<T> UsingQueue<T> {
    /// Create a new, empty queue that retains at most `max_size` used items.
    pub fn new(max_size: usize) -> Self {
        UsingQueue {
            pending: None,
            in_use: Vec::new(),
            max_size,
        }
    }

    /// Return a reference to the item at the top of the queue (or `None` if the queue is empty);
    /// it doesn't constitute noting that the item is used.
    ///
    /// The pending item takes precedence; if there is none, the most recently
    /// used item is returned.
    pub fn peek_last_ref(&self) -> Option<&T> {
        self.pending.as_ref().or_else(|| self.in_use.last())
    }

    /// Return a reference to the item at the top of the queue (or `None` if the queue is empty);
    /// this constitutes using the item.
    ///
    /// A pending item is moved into the used list. If that makes the used list
    /// longer than `max_size`, the oldest used item is evicted. With a
    /// `max_size` of zero nothing is ever retained and `None` is returned.
    pub fn use_last_ref(&mut self) -> Option<&T> {
        if let Some(item) = self.pending.take() {
            self.in_use.push(item);
            if self.in_use.len() > self.max_size {
                // At most one item is added per call, so evicting the single
                // oldest entry is enough to restore the size bound.
                self.in_use.remove(0);
            }
        }
        self.in_use.last()
    }

    /// Set the pending item. Any previously pending item that was not promoted
    /// by `use_last_ref()` since it was set is dropped.
    pub fn set_pending(&mut self, b: T) {
        self.pending = Some(b);
    }

    /// Is there at least one used item? A pending item that has not been used
    /// yet does not count.
    pub fn is_in_use(&self) -> bool {
        !self.in_use.is_empty()
    }

    /// Clears everything; the queue is entirely reset.
    pub fn reset(&mut self) {
        self.pending = None;
        self.in_use.clear();
    }

    /// Removes and returns the first (oldest) used item for which `predicate`
    /// returns `true`, or `None` if no used item matches.
    fn take_used_if<P>(&mut self, predicate: P) -> Option<T> where P: Fn(&T) -> bool {
        let index = self.in_use.iter().position(|item| predicate(item))?;
        Some(self.in_use.remove(index))
    }

    /// Returns a clone of the first (oldest) used item for which `predicate`
    /// returns `true`, or `None` if no used item matches. The queue is left untouched.
    fn clone_used_if<P>(&self, predicate: P) -> Option<T> where P: Fn(&T) -> bool, T: Clone {
        self.in_use.iter().find(|item| predicate(item)).cloned()
    }

    /// Fork-function for `take_used_if` and `clone_used_if`.
    ///
    /// Only used items are searched; the pending item is never returned.
    pub fn get_used_if<P>(&mut self, action: GetAction, predicate: P) -> Option<T> where P: Fn(&T) -> bool, T: Clone {
        match action {
            GetAction::Take => self.take_used_if(predicate),
            GetAction::Clone => self.clone_used_if(predicate),
        }
    }

    /// Returns a clone of the pending item if `predicate` returns `true` with a
    /// reference to it as a parameter, otherwise `None`.
    ///
    /// If there is no pending item, the most recently used item is tested
    /// instead (older used items are not considered). Nothing is removed.
    pub fn get_pending_if<P>(&mut self, predicate: P) -> Option<T> where P: Fn(&T) -> bool, T: Clone {
        // `peek_last_ref` already encodes "pending, else last used"; a pending
        // item that fails the predicate yields `None` without falling back.
        self.peek_last_ref().filter(|item| predicate(item)).cloned()
    }
}
/// A pending item that was never used must not be found by `take_used_if`.
#[test]
fn should_not_find_when_pushed() {
    let mut queue = UsingQueue::new(2);
    queue.set_pending(1);
    assert_eq!(queue.take_used_if(|item| *item == 1), None);
}
/// A pending item that was never used must not be found by `clone_used_if`.
#[test]
fn should_not_find_when_pushed_with_clone() {
    let mut queue = UsingQueue::new(2);
    queue.set_pending(1);
    assert_eq!(queue.clone_used_if(|item| *item == 1), None);
}
2016-03-24 08:49:54 +01:00
/// Once a pending item has been used, `take_used_if` returns it.
#[test]
fn should_find_when_pushed_and_used() {
    let mut queue = UsingQueue::new(2);
    queue.set_pending(1);
    queue.use_last_ref();
    assert_eq!(queue.take_used_if(|item| *item == 1), Some(1));
}
/// `get_used_if` must behave like `clone_used_if` / `take_used_if` depending on
/// the requested `GetAction`.
#[test]
fn should_have_same_semantics_for_get_take_clone() {
    let mut queue = UsingQueue::new(2);
    queue.set_pending(1);

    // Not used yet: neither action finds the item.
    assert_eq!(queue.get_used_if(GetAction::Clone, |item| *item == 1), None);
    assert_eq!(queue.get_used_if(GetAction::Take, |item| *item == 1), None);

    queue.use_last_ref();

    // Cloning is repeatable; taking removes the item for good.
    assert_eq!(queue.get_used_if(GetAction::Clone, |item| *item == 1), Some(1));
    assert_eq!(queue.get_used_if(GetAction::Clone, |item| *item == 1), Some(1));
    assert_eq!(queue.get_used_if(GetAction::Take, |item| *item == 1), Some(1));
    assert_eq!(queue.get_used_if(GetAction::Clone, |item| *item == 1), None);
    assert_eq!(queue.get_used_if(GetAction::Take, |item| *item == 1), None);
}
/// Once a pending item has been used, `clone_used_if` returns a copy of it.
#[test]
fn should_find_when_pushed_and_used_with_clone() {
    let mut queue = UsingQueue::new(2);
    queue.set_pending(1);
    queue.use_last_ref();
    assert_eq!(queue.clone_used_if(|item| *item == 1), Some(1));
}
/// After an item has been taken it can no longer be found.
#[test]
fn should_not_find_again_when_pushed_and_taken() {
    let mut queue = UsingQueue::new(2);
    queue.set_pending(1);
    queue.use_last_ref();
    assert_eq!(queue.take_used_if(|item| *item == 1), Some(1));
    assert_eq!(queue.clone_used_if(|item| *item == 1), None);
}
/// Cloning a used item leaves it in the queue for later lookups.
#[test]
fn should_find_again_when_pushed_and_cloned() {
    let mut queue = UsingQueue::new(2);
    queue.set_pending(1);
    queue.use_last_ref();
    assert_eq!(queue.clone_used_if(|item| *item == 1), Some(1));
    assert_eq!(queue.clone_used_if(|item| *item == 1), Some(1));
    assert_eq!(queue.take_used_if(|item| *item == 1), Some(1));
}
/// An older used item stays findable while the queue is within its size limit.
#[test]
fn should_find_when_others_used() {
    let mut queue = UsingQueue::new(2);
    queue.set_pending(1);
    queue.use_last_ref();
    queue.set_pending(2);
    queue.use_last_ref();
    let found = queue.take_used_if(|item| *item == 1);
    assert!(found.is_some());
}
/// With a size limit of one, using a second item evicts the first.
#[test]
fn should_not_find_when_too_many_used() {
    let mut queue = UsingQueue::new(1);
    queue.set_pending(1);
    queue.use_last_ref();
    queue.set_pending(2);
    queue.use_last_ref();
    assert_eq!(queue.take_used_if(|item| *item == 1), None);
}
/// A pending item that is replaced before being used never enters the used list.
#[test]
fn should_not_find_when_not_used_and_then_pushed() {
    let mut queue = UsingQueue::new(3);
    queue.set_pending(1);
    queue.set_pending(2);
    queue.use_last_ref();
    assert_eq!(queue.take_used_if(|item| *item == 1), None);
}
/// `peek_last_ref` always reflects the most recently set pending item.
#[test]
fn should_peek_correctly_after_push() {
    let mut queue = UsingQueue::new(3);

    queue.set_pending(1);
    let first_peek = queue.peek_last_ref();
    assert_eq!(first_peek, Some(&1));

    queue.set_pending(2);
    let second_peek = queue.peek_last_ref();
    assert_eq!(second_peek, Some(&2));
}
/// After using an item, both `use_last_ref` and `peek_last_ref` report it.
#[test]
fn should_inspect_correctly() {
    let mut queue = UsingQueue::new(3);

    queue.set_pending(1);
    assert_eq!(queue.use_last_ref(), Some(&1));
    assert_eq!(queue.peek_last_ref(), Some(&1));

    queue.set_pending(2);
    assert_eq!(queue.use_last_ref(), Some(&2));
    assert_eq!(queue.peek_last_ref(), Some(&2));
}
/// Peeking does not count as using: a peeked-then-replaced item is not retained.
#[test]
fn should_not_find_when_not_used_peeked_and_then_pushed() {
    let mut queue = UsingQueue::new(3);
    queue.set_pending(1);
    let _ = queue.peek_last_ref();
    queue.set_pending(2);
    queue.use_last_ref();
    assert_eq!(queue.take_used_if(|item| *item == 1), None);
}
/// With no pending item, `get_pending_if` falls back to the last used item.
#[test]
fn should_pop_used() {
    let mut queue = UsingQueue::new(3);
    queue.set_pending(1);
    queue.use_last_ref();
    assert_eq!(queue.get_pending_if(|item| *item == 1), Some(1));
}
/// `get_pending_if` clones the pending item, so repeated calls keep returning it.
#[test]
fn should_not_pop_last_pending() {
    let mut queue = UsingQueue::new(3);
    queue.set_pending(1);
    let first = queue.get_pending_if(|item| *item == 1);
    assert_eq!(first, Some(1));
    let second = queue.get_pending_if(|item| *item == 1);
    assert_eq!(second, Some(1));
}
/// A replaced, never-used pending item cannot be obtained via `get_pending_if`.
#[test]
fn should_not_pop_unused_before_used() {
    let mut queue = UsingQueue::new(3);
    queue.set_pending(1);
    queue.set_pending(2);
    assert_eq!(queue.get_pending_if(|item| *item == 1), None);
}
/// `get_pending_if` does not remove the used item it falls back to.
#[test]
fn should_not_remove_used_popped() {
    let mut queue = UsingQueue::new(3);
    queue.set_pending(1);
    queue.use_last_ref();
    let first = queue.get_pending_if(|item| *item == 1);
    assert_eq!(first, Some(1));
    let second = queue.get_pending_if(|item| *item == 1);
    assert_eq!(second, Some(1));
}