openethereum/miner/src/pool/listener.rs

160 lines
4.2 KiB
Rust
Raw Normal View History

// Copyright 2015-2018 Parity Technologies (UK) Ltd.
New Transaction Queue implementation (#8074) * Implementation of Verifier, Scoring and Ready. * Queue in progress. * TransactionPool. * Prepare for txpool release. * Miner refactor [WiP] * WiP reworking miner. * Make it compile. * Add some docs. * Split blockchain access to a separate file. * Work on miner API. * Fix ethcore tests. * Refactor miner interface for sealing/work packages. * Implement next nonce. * RPC compiles. * Implement couple of missing methdods for RPC. * Add transaction queue listeners. * Compiles! * Clean-up and parallelize. * Get rid of RefCell in header. * Revert "Get rid of RefCell in header." This reverts commit 0f2424c9b7319a786e1565ea2a8a6d801a21b4fb. * Override Sync requirement. * Fix status display. * Unify logging. * Extract some cheap checks. * Measurements and optimizations. * Fix scoring bug, heap size of bug and add cache * Disable tx queueing and parallel verification. * Make ethcore and ethcore-miner compile again. * Make RPC compile again. * Bunch of txpool tests. * Migrate transaction queue tests. * Nonce Cap * Nonce cap cache and tests. * Remove stale future transactions from the queue. * Optimize scoring and write some tests. * Simple penalization. * Clean up and support for different scoring algorithms. * Add CLI parameters for the new queue. * Remove banning queue. * Disable debug build. * Change per_sender limit to be 1% instead of 5% * Avoid cloning when propagating transactions. * Remove old todo. * Post-review fixes. * Fix miner options default. * Implement back ready transactions for light client. * Get rid of from_pending_block * Pass rejection reason. * Add more details to drop. * Rollback heap size of. * Avoid cloning hashes when propagating and include more details on rejection. * Fix tests. * Introduce nonces cache. * Remove uneccessary hashes allocation. * Lower the mem limit. * Re-enable parallel verification. * Add miner log. Don't check the type if not below min_gas_price. 
* Add more traces, fix disabling miner. * Fix creating pending blocks twice on AuRa authorities. * Fix tests. * re-use pending blocks in AuRa * Use reseal_min_period to prevent too frequent update_sealing. * Fix log to contain hash not sender. * Optimize local transactions. * Fix aura tests. * Update locks comments. * Get rid of unsafe Sync impl. * Review fixes. * Remove excessive matches. * Fix compilation errors. * Use new pool in private transactions. * Fix private-tx test. * Fix secret store tests. * Actually use gas_floor_target * Fix config tests. * Fix pool tests. * Address grumbles.
2018-04-13 17:34:27 +02:00
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Transaction pool listeners: a notifier for new transaction hashes and a pool event logger.
use std::fmt;
use std::sync::Arc;
use ethereum_types::H256;
use txpool::{self, VerifiedTransaction};
use pool::VerifiedTransaction as Transaction;
/// Callback handed a slice of transaction hashes each time `Notifier::notify` runs.
/// The `Send + Sync` bounds are part of the alias, so every registered callback must satisfy them.
type Listener = Box<Fn(&[H256]) + Send + Sync>;
/// Collects hashes of transactions added to the pool and hands them over
/// to the registered listeners on request.
#[derive(Default)]
pub struct Notifier {
    /// Callbacks invoked by `notify`, in registration order.
    listeners: Vec<Listener>,
    /// Hashes gathered by `added` since the buffer was last cleared.
    pending: Vec<H256>,
}
impl fmt::Debug for Notifier {
    /// Formats the notifier as a struct named `Notifier`.
    ///
    /// Listeners are boxed closures and cannot be printed, so only their
    /// count is shown; the pending hashes are printed in full.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let listener_count = self.listeners.len();

        let mut out = f.debug_struct("Notifier");
        out.field("listeners", &listener_count);
        out.field("pending", &self.pending);
        out.finish()
    }
}
impl Notifier {
    /// Add new listener to receive notifications.
    ///
    /// Listeners are stored in registration order and are called in that
    /// order by `notify`.
    pub fn add(&mut self, f: Listener) {
        self.listeners.push(f)
    }

    /// Notify listeners about all currently pending transactions.
    ///
    /// Every registered listener receives the same slice of hashes gathered
    /// since the previous notification; afterwards the buffer is cleared.
    /// When no hashes are pending this is a no-op, so listeners are never
    /// invoked with an empty slice.
    pub fn notify(&mut self) {
        // Nothing new to report: skip waking up every listener for no reason.
        if self.pending.is_empty() {
            return;
        }

        for l in &self.listeners {
            (l)(&self.pending);
        }

        self.pending.clear();
    }
}
impl txpool::Listener<Transaction> for Notifier {
    /// Records the hash of a transaction that has just been added to the pool.
    ///
    /// The replaced transaction, if any, is ignored; the hash is only
    /// buffered here and delivered to listeners by `Notifier::notify`.
    fn added(&mut self, tx: &Arc<Transaction>, _replaced: Option<&Arc<Transaction>>) {
        let hash = *tx.hash();
        self.pending.push(hash);
    }
}
/// Stateless transaction pool listener that writes pool events to the log.
#[derive(Debug, Default)]
pub struct Logger;
impl txpool::Listener<Transaction> for Logger {
    /// Logs a transaction that was added to the pool.
    ///
    /// Emits one line announcing the addition and one with the sender, nonce,
    /// gas price, gas, value and data length. When the transaction replaced
    /// `old`, an extra line records that the old one was dropped.
    fn added(&mut self, tx: &Arc<Transaction>, old: Option<&Arc<Transaction>>) {
        debug!(target: "txqueue", "[{:?}] Added to the pool.", tx.hash());
        debug!(
            target: "txqueue",
            // The trailing `))` that used to end this message was a typo.
            "[{hash:?}] Sender: {sender}, nonce: {nonce}, gasPrice: {gas_price}, gas: {gas}, value: {value}, dataLen: {data}",
            hash = tx.hash(),
            sender = tx.sender(),
            nonce = tx.signed().nonce,
            gas_price = tx.signed().gas_price,
            gas = tx.signed().gas,
            value = tx.signed().value,
            data = tx.signed().data.len(),
        );

        if let Some(old) = old {
            debug!(target: "txqueue", "[{:?}] Dropped. Replaced by [{:?}]", old.hash(), tx.hash());
        }
    }

    /// Logs the reason a transaction was rejected, at trace level.
    ///
    /// Only `reason` is printed; the transaction itself is not inspected.
    fn rejected(&mut self, _tx: &Arc<Transaction>, reason: &txpool::ErrorKind) {
        trace!(target: "txqueue", "Rejected {}.", reason);
    }

    /// Logs a transaction that was dropped from the pool.
    ///
    /// When `new` is given, the message names it as the transaction that
    /// pushed `tx` out; otherwise a plain "Dropped." line is written.
    fn dropped(&mut self, tx: &Arc<Transaction>, new: Option<&Transaction>) {
        match new {
            Some(new) => debug!(target: "txqueue", "[{:?}] Pushed out by [{:?}]", tx.hash(), new.hash()),
            None => debug!(target: "txqueue", "[{:?}] Dropped.", tx.hash()),
        }
    }

    /// Logs a transaction that was marked as invalid.
    fn invalid(&mut self, tx: &Arc<Transaction>) {
        debug!(target: "txqueue", "[{:?}] Marked as invalid by executor.", tx.hash());
    }

    /// Logs a transaction that was canceled.
    fn canceled(&mut self, tx: &Arc<Transaction>) {
        debug!(target: "txqueue", "[{:?}] Canceled by the user.", tx.hash());
    }

    /// Logs a transaction that was mined.
    fn mined(&mut self, tx: &Arc<Transaction>) {
        debug!(target: "txqueue", "[{:?}] Mined.", tx.hash());
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use parking_lot::Mutex;
    use transaction;
    use txpool::Listener;

    /// Hashes must be delivered to listeners only once `notify` is called,
    /// not at the moment the transaction is added.
    #[test]
    fn should_notify_listeners() {
        // given: a notifier with a single listener that stores what it receives
        let received = Arc::new(Mutex::new(vec![]));
        let sink = received.clone();
        let listener = Box::new(move |hashes: &[H256]| {
            *sink.lock() = hashes.to_vec();
        });
        let mut notifier = Notifier::default();
        notifier.add(listener);

        // when: a transaction is added, nothing is delivered yet
        let tx = new_tx();
        notifier.added(&tx, None);
        assert_eq!(*received.lock(), vec![]);

        // then: notifying hands the hash of that transaction to the listener
        notifier.notify();
        let expected: H256 = "13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d"
            .parse()
            .unwrap();
        assert_eq!(*received.lock(), vec![expected]);
    }

    /// Builds a fixed, fake-signed contract-creation transaction wrapped as a
    /// pool transaction, so its hash is stable across runs.
    fn new_tx() -> Arc<Transaction> {
        let unsigned = transaction::Transaction {
            action: transaction::Action::Create,
            data: vec![1, 2, 3],
            nonce: 5.into(),
            gas: 21_000.into(),
            gas_price: 5.into(),
            value: 0.into(),
        };
        let signed = unsigned.fake_sign(5.into());

        Arc::new(Transaction::from_pending_block_transaction(signed))
    }
}