Executable queue for ancient blocks inclusion (#208)

* Executable queue for ancient blocks inclusion
* Add drop trait for client
* Added shutdown to tests
* Remove doubled call
* Use reth-util from reth repo
This commit is contained in:
rakita
2021-03-25 16:04:32 +01:00
committed by GitHub
parent 85391f99ac
commit c9190a39ed
11 changed files with 110 additions and 55 deletions

View File

@@ -34,7 +34,6 @@ use blockchain::{
use bytes::{Bytes, ToPretty};
use call_contract::CallContract;
use db::{DBTransaction, DBValue, KeyValueDB};
use error::Error;
use ethcore_miner::pool::VerifiedTransaction;
use ethereum_types::{Address, H256, H264, U256};
use hash::keccak;
@@ -80,8 +79,8 @@ use engines::{
MAX_UNCLE_AGE,
};
use error::{
BlockError, CallError, Error as EthcoreError, ErrorKind as EthcoreErrorKind, EthcoreResult,
ExecutionError, ImportErrorKind,
BlockError, CallError, Error, Error as EthcoreError, ErrorKind as EthcoreErrorKind,
EthcoreResult, ExecutionError, ImportErrorKind, QueueErrorKind,
};
use executive::{contract_address, Executed, Executive, TransactOptions};
use factory::{Factories, VmFactory};
@@ -105,14 +104,14 @@ use vm::Schedule;
// re-export
pub use blockchain::CacheSize as BlockChainCacheSize;
use db::{keys::BlockDetails, Readable, Writable};
pub use reth_util::queue::ExecutionQueue;
pub use types::{block_status::BlockStatus, blockchain_info::BlockChainInfo};
pub use verification::QueueInfo as BlockQueueInfo;
use_contract!(registry, "res/contracts/registrar.json");
const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096;
const ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096;
// Max number of blocks imported at once.
const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4;
const ANCIENT_BLOCKS_BATCH_SIZE: usize = 4;
const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2;
const MIN_HISTORY_SIZE: u64 = 8;
@@ -235,10 +234,9 @@ pub struct Client {
/// Queued transactions from IO
queue_transactions: IoChannelQueue,
/// Ancient blocks import queue
queue_ancient_blocks: IoChannelQueue,
/// Queued ancient blocks, make sure they are imported in order.
queued_ancient_blocks: Arc<RwLock<(HashSet<H256>, VecDeque<(Unverified, Bytes)>)>>,
ancient_blocks_import_lock: Arc<Mutex<()>>,
queued_ancient_blocks: Arc<RwLock<HashSet<H256>>>,
queued_ancient_blocks_executer: Mutex<Option<ExecutionQueue<(Unverified, Bytes)>>>,
/// Consensus messages import queue
queue_consensus_message: IoChannelQueue,
@@ -973,9 +971,8 @@ impl Client {
io_channel: RwLock::new(message_channel),
notify: RwLock::new(Vec::new()),
queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size),
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
queued_ancient_blocks: Default::default(),
ancient_blocks_import_lock: Default::default(),
queued_ancient_blocks_executer: Default::default(),
queue_consensus_message: IoChannelQueue::new(usize::max_value()),
last_hashes: RwLock::new(VecDeque::new()),
factories,
@@ -987,6 +984,44 @@ impl Client {
config,
});
let exec_client = client.clone();
let queued = client.queued_ancient_blocks.clone();
let queued_ancient_blocks_executer = ExecutionQueue::new(
ANCIENT_BLOCKS_QUEUE_SIZE,
ANCIENT_BLOCKS_BATCH_SIZE,
move |ancient_block: Vec<(Unverified, Bytes)>| {
trace_time!("import_ancient_block");
for (unverified, receipts_bytes) in ancient_block {
let hash = unverified.hash();
if !exec_client.chain.read().is_known(&unverified.parent_hash()) {
queued.write().remove(&hash);
continue;
}
let result = exec_client.importer.import_old_block(
unverified,
&receipts_bytes,
&**exec_client.db.read().key_value(),
&*exec_client.chain.read(),
);
if let Err(e) = result {
error!(target: "client", "Error importing ancient block: {}", e);
let mut queued = queued.write();
queued.clear();
}
// remove from pending
queued.write().remove(&hash);
}
},
"ancient_block_exec",
);
client
.queued_ancient_blocks_executer
.lock()
.replace(queued_ancient_blocks_executer);
// prune old states.
{
let state_db = client.state_db.read().boxed_clone();
@@ -1034,6 +1069,15 @@ impl Client {
Ok(client)
}
/// signals shutdown of application. We do cleanup here.
pub fn shutdown(&self) {
let mut abe = self.queued_ancient_blocks_executer.lock();
if abe.is_some() {
abe.as_mut().unwrap().end()
}
*abe = None;
}
/// Wakes up client if it's a sleep.
pub fn keep_alive(&self) {
let should_wake = match *self.mode.lock() {
@@ -1866,6 +1910,12 @@ impl StateClient for Client {
}
}
impl Drop for Client {
fn drop(&mut self) {
self.shutdown()
}
}
impl Call for Client {
type State = State<::state_db::StateDB>;
@@ -2714,7 +2764,7 @@ impl IoClient for Client {
let parent_hash = unverified.parent_hash();
// NOTE To prevent race condition with import, make sure to check queued blocks first
// (and attempt to acquire lock)
let is_parent_pending = self.queued_ancient_blocks.read().0.contains(&parent_hash);
let is_parent_pending = self.queued_ancient_blocks.read().contains(&parent_hash);
if !is_parent_pending && !self.chain.read().is_known(&parent_hash) {
bail!(EthcoreErrorKind::Block(BlockError::UnknownParent(
parent_hash
@@ -2722,49 +2772,33 @@ impl IoClient for Client {
}
}
// we queue blocks here and trigger an IO message.
// we queue blocks here and trigger an Executer.
{
let mut queued = self.queued_ancient_blocks.write();
queued.0.insert(hash);
queued.1.push_back((unverified, receipts_bytes));
queued.insert(hash);
}
let queued = self.queued_ancient_blocks.clone();
let lock = self.ancient_blocks_import_lock.clone();
self.queue_ancient_blocks
.queue(&self.io_channel.read(), 1, move |client| {
trace_time!("import_ancient_block");
// Make sure to hold the lock here to prevent importing out of order.
// We use separate lock, cause we don't want to block queueing.
let _lock = lock.lock();
for _i in 0..MAX_ANCIENT_BLOCKS_TO_IMPORT {
let first = queued.write().1.pop_front();
if let Some((unverified, receipts_bytes)) = first {
let hash = unverified.hash();
let result = client.importer.import_old_block(
unverified,
&receipts_bytes,
&**client.db.read().key_value(),
&*client.chain.read(),
);
if let Err(e) = result {
error!(target: "client", "Error importing ancient block: {}", e);
let mut queued = queued.write();
queued.0.clear();
queued.1.clear();
}
// remove from pending
queued.write().0.remove(&hash);
} else {
break;
}
// see content of executer in Client::new()
match self.queued_ancient_blocks_executer.lock().as_ref() {
Some(queue) => {
if !queue.enqueue((unverified, receipts_bytes)) {
bail!(EthcoreErrorKind::Queue(QueueErrorKind::Full(
ANCIENT_BLOCKS_QUEUE_SIZE
)));
}
})?;
}
None => (),
}
Ok(hash)
}
fn ancient_block_queue_fullness(&self) -> f32 {
match self.queued_ancient_blocks_executer.lock().as_ref() {
Some(queue) => queue.len() as f32 / ANCIENT_BLOCKS_QUEUE_SIZE as f32,
None => 1.0, //return 1.0 if queue is not set
}
}
fn queue_consensus_message(&self, message: Bytes) {
match self
.queue_consensus_message

View File

@@ -1087,6 +1087,10 @@ impl IoClient for TestBlockChainClient {
self.miner.import_external_transactions(self, txs);
}
fn ancient_block_queue_fullness(&self) -> f32 {
0.0
}
fn queue_ancient_block(&self, unverified: Unverified, _r: Bytes) -> EthcoreResult<H256> {
self.import_block(unverified)
}

View File

@@ -220,6 +220,9 @@ pub trait IoClient: Sync + Send {
receipts_bytes: Bytes,
) -> EthcoreResult<H256>;
/// Return percentage of how full is queue that handles ancient blocks. 0 if empty, 1 if full.
fn ancient_block_queue_fullness(&self) -> f32;
/// Queue conensus engine message.
fn queue_consensus_message(&self, message: Bytes);
}

View File

@@ -232,6 +232,7 @@ pub fn json_chain_test<H: FnMut(&str, HookType)>(
client.chain_info().best_block_hash == blockchain.best_block.into()
&& post_state_success,
);
client.shutdown()
}
}

View File

@@ -52,6 +52,7 @@ extern crate parking_lot;
extern crate patricia_trie_ethereum as ethtrie;
extern crate rand;
extern crate rayon;
extern crate reth_util;
extern crate rlp;
extern crate rustc_hex;
extern crate serde;

View File

@@ -369,7 +369,7 @@ fn recover_aborted_recovery() {
}
e => panic!("Snapshot restoration must be ongoing ; {:?}", e),
}
// abort restoration so that we can delete snapshot root folder
service.abort_restore();
// Remove the snapshot directory, and restart the restoration

View File

@@ -243,5 +243,6 @@ fn can_trace_block_and_uncle_reward() {
// Test1. Check block filter
let traces = client.block_traces(BlockId::Number(3));
client.shutdown();
assert_eq!(traces.unwrap().len(), 3);
}