Make InstantSeal Instant again (#11186)

* Make InstantSeal Instant again

* update_sealing if there are transactions in pool after impoerting a block, some line formatting

* Apply suggestions from code review

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* InstantSeal specific behaviour

* introduce engine.should_reseal_on_update, remove InstantSealService

* remove unused code

* add force param to update_sealing

* better docc

* even better docs

* revert code changes, doc corrections, sort dep

* code optimization

* fix test

* fix bench
This commit is contained in:
Seun LanLege
2019-11-10 11:41:31 +01:00
committed by Andronik Ordian
parent 8adde605e9
commit 887aa62fdb
16 changed files with 134 additions and 66 deletions

View File

@@ -78,7 +78,8 @@ use client_traits::{
StateClient,
StateOrBlock,
Tick,
TransactionInfo
TransactionInfo,
ForceUpdateSealing
};
use db::{keys::BlockDetails, Readable, Writable};
use engine::Engine;
@@ -2383,7 +2384,9 @@ impl ImportSealedBlock for Client {
let raw = block.rlp_bytes();
let header = block.header.clone();
let hash = header.hash();
self.notify(|n| n.block_pre_import(&raw, &hash, header.difficulty()));
self.notify(|n| {
n.block_pre_import(&raw, &hash, header.difficulty())
});
let route = {
// Do a super duper basic verification to detect potential bugs
@@ -2471,19 +2474,22 @@ impl ::miner::TransactionVerifierClient for Client {}
impl ::miner::BlockChainClient for Client {}
impl client_traits::EngineClient for Client {
fn update_sealing(&self) {
self.importer.miner.update_sealing(self)
fn update_sealing(&self, force: ForceUpdateSealing) {
self.importer.miner.update_sealing(self, force)
}
fn submit_seal(&self, block_hash: H256, seal: Vec<Bytes>) {
let import = self.importer.miner.submit_seal(block_hash, seal).and_then(|block| self.import_sealed_block(block));
let import = self.importer.miner.submit_seal(block_hash, seal)
.and_then(|block| self.import_sealed_block(block));
if let Err(err) = import {
warn!(target: "poa", "Wrong internal seal submission! {:?}", err);
}
}
fn broadcast_consensus_message(&self, message: Bytes) {
self.notify(|notify| notify.broadcast(ChainMessageType::Consensus(message.clone())));
self.notify(|notify| {
notify.broadcast(ChainMessageType::Consensus(message.clone()))
});
}
fn epoch_transition_for(&self, parent_hash: H256) -> Option<EpochTransition> {
@@ -2622,13 +2628,21 @@ impl ImportExportBlocks for Client {
if i % 10000 == 0 {
info!("#{}", i);
}
let b = self.block(BlockId::Number(i)).ok_or("Error exporting incomplete chain")?.into_inner();
let b = self.block(BlockId::Number(i))
.ok_or("Error exporting incomplete chain")?
.into_inner();
match format {
DataFormat::Binary => {
out.write(&b).map_err(|e| format!("Couldn't write to stream. Cause: {}", e))?;
out.write(&b)
.map_err(|e| {
format!("Couldn't write to stream. Cause: {}", e)
})?;
}
DataFormat::Hex => {
out.write_fmt(format_args!("{}\n", b.pretty())).map_err(|e| format!("Couldn't write to stream. Cause: {}", e))?;
out.write_fmt(format_args!("{}\n", b.pretty()))
.map_err(|e| {
format!("Couldn't write to stream. Cause: {}", e)
})?;
}
}
}
@@ -2648,7 +2662,10 @@ impl ImportExportBlocks for Client {
let format = match format {
Some(format) => format,
None => {
first_read = source.read(&mut first_bytes).map_err(|_| "Error reading from the file/stream.")?;
first_read = source.read(&mut first_bytes)
.map_err(|_| {
"Error reading from the file/stream."
})?;
match first_bytes[0] {
0xf9 => DataFormat::Binary,
_ => DataFormat::Hex,
@@ -2659,7 +2676,9 @@ impl ImportExportBlocks for Client {
let do_import = |bytes: Vec<u8>| {
let block = Unverified::from_rlp(bytes).map_err(|_| "Invalid block rlp")?;
let number = block.header.number();
while self.queue_info().is_full() { std::thread::sleep(Duration::from_secs(1)); }
while self.queue_info().is_full() {
std::thread::sleep(Duration::from_secs(1));
}
match self.import_block(block) {
Err(EthcoreError::Import(ImportError::AlreadyInChain)) => {
trace!("Skipping block #{}: already in chain.", number);
@@ -2680,33 +2699,45 @@ impl ImportExportBlocks for Client {
} else {
let mut bytes = vec![0; READAHEAD_BYTES];
let n = source.read(&mut bytes)
.map_err(|err| format!("Error reading from the file/stream: {:?}", err))?;
.map_err(|err| {
format!("Error reading from the file/stream: {:?}", err)
})?;
(bytes, n)
};
if n == 0 { break; }
first_read = 0;
let s = PayloadInfo::from(&bytes)
.map_err(|e| format!("Invalid RLP in the file/stream: {:?}", e))?.total();
.map_err(|e| {
format!("Invalid RLP in the file/stream: {:?}", e)
})?.total();
bytes.resize(s, 0);
source.read_exact(&mut bytes[n..])
.map_err(|err| format!("Error reading from the file/stream: {:?}", err))?;
.map_err(|err| {
format!("Error reading from the file/stream: {:?}", err)
})?;
do_import(bytes)?;
}
}
DataFormat::Hex => {
for line in BufReader::new(source).lines() {
let s = line
.map_err(|err| format!("Error reading from the file/stream: {:?}", err))?;
.map_err(|err| {
format!("Error reading from the file/stream: {:?}", err)
})?;
let s = if first_read > 0 {
from_utf8(&first_bytes)
.map_err(|err| format!("Invalid UTF-8: {:?}", err))?
.map_err(|err| {
format!("Invalid UTF-8: {:?}", err)
})?
.to_owned() + &(s[..])
} else {
s
};
first_read = 0;
let bytes = s.from_hex()
.map_err(|err| format!("Invalid hex in file/stream: {:?}", err))?;
.map_err(|err| {
format!("Invalid hex in file/stream: {:?}", err)
})?;
do_import(bytes)?;
}
}

View File

@@ -58,7 +58,7 @@ use using_queue::{UsingQueue, GetAction};
use block::{ClosedBlock, SealedBlock};
use client::{BlockProducer, SealedBlockImporter, Client};
use client_traits::{BlockChain, ChainInfo, EngineClient, Nonce, TransactionInfo};
use client_traits::{BlockChain, ChainInfo, Nonce, TransactionInfo, EngineClient, ForceUpdateSealing};
use engine::{Engine, signer::EngineSigner};
use machine::executive::contract_address;
use spec::Spec;
@@ -294,6 +294,7 @@ impl Miner {
let tx_queue_strategy = options.tx_queue_strategy;
let nonce_cache_size = cmp::max(4096, limits.max_count / 4);
let refuse_service_transactions = options.refuse_service_transactions;
let engine = spec.engine.clone();
Miner {
sealing: Mutex::new(SealingWork {
@@ -312,7 +313,7 @@ impl Miner {
options,
transaction_queue: Arc::new(TransactionQueue::new(limits, verifier_options, tx_queue_strategy)),
accounts: Arc::new(accounts),
engine: spec.engine.clone(),
engine,
io_channel: RwLock::new(None),
service_transaction_checker: if refuse_service_transactions {
None
@@ -865,12 +866,12 @@ impl Miner {
match self.engine.sealing_state() {
SealingState::Ready => {
self.maybe_enable_sealing();
self.update_sealing(chain)
self.update_sealing(chain, ForceUpdateSealing::No);
}
SealingState::External => {
// this calls `maybe_enable_sealing()`
if self.prepare_pending_block(chain) == BlockPreparationStatus::NotPrepared {
self.update_sealing(chain)
self.update_sealing(chain, ForceUpdateSealing::No);
}
}
SealingState::NotReady => { self.maybe_enable_sealing(); },
@@ -1263,14 +1264,16 @@ impl miner::MinerService for Miner {
/// Update sealing if required.
/// Prepare the block and work if the Engine does not seal internally.
fn update_sealing<C>(&self, chain: &C) where
fn update_sealing<C>(&self, chain: &C, force: ForceUpdateSealing) where
C: BlockChain + CallContract + BlockProducer + SealedBlockImporter + Nonce + Sync,
{
trace!(target: "miner", "update_sealing");
// Do nothing if reseal is not required,
// Do nothing if we don't want to force update_sealing and reseal is not required.
// but note that `requires_reseal` updates internal state.
if !self.requires_reseal(chain.chain_info().best_block_number) {
if force == ForceUpdateSealing::No &&
!self.requires_reseal(chain.chain_info().best_block_number)
{
return;
}
@@ -1305,13 +1308,14 @@ impl miner::MinerService for Miner {
if self.seal_and_import_block_internally(chain, block) {
trace!(target: "miner", "update_sealing: imported internally sealed block");
}
return
},
SealingState::NotReady => unreachable!("We returned right after sealing_state was computed. qed."),
SealingState::External => {
trace!(target: "miner", "update_sealing: engine does not seal internally, preparing work");
self.prepare_work(block, original_work_hash)
self.prepare_work(block, original_work_hash);
},
}
};
}
fn is_currently_sealing(&self) -> bool {
@@ -1423,7 +1427,7 @@ impl miner::MinerService for Miner {
// | NOTE Code below requires sealing locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.update_sealing(chain);
self.update_sealing(chain, ForceUpdateSealing::No);
}
}
@@ -1441,7 +1445,6 @@ impl miner::MinerService for Miner {
let engine = self.engine.clone();
let accounts = self.accounts.clone();
let service_transaction_checker = self.service_transaction_checker.clone();
let cull = move |chain: &Client| {
let client = PoolClient::new(
chain,
@@ -1451,8 +1454,9 @@ impl miner::MinerService for Miner {
service_transaction_checker.as_ref(),
);
queue.cull(client);
if is_internal_import {
chain.update_sealing();
if engine.should_reseal_on_update() {
// force update_sealing here to skip `reseal_required` checks
chain.update_sealing(ForceUpdateSealing::Yes);
}
};
@@ -1461,8 +1465,9 @@ impl miner::MinerService for Miner {
}
} else {
self.transaction_queue.cull(client);
if is_internal_import {
self.update_sealing(chain);
if self.engine.should_reseal_on_update() {
// force update_sealing here to skip `reseal_required` checks
self.update_sealing(chain, ForceUpdateSealing::Yes);
}
}
}
@@ -1793,7 +1798,7 @@ mod tests {
).pop().unwrap();
assert_eq!(import.unwrap(), ());
miner.update_sealing(&*client);
miner.update_sealing(&*client, ForceUpdateSealing::No);
client.flush_queue();
assert!(miner.pending_block(0).is_none());
assert_eq!(client.chain_info().best_block_number, 3 as BlockNumber);
@@ -1803,7 +1808,7 @@ mod tests {
PendingTransaction::new(transaction_with_chain_id(spec.chain_id()).into(), None)
).is_ok());
miner.update_sealing(&*client);
miner.update_sealing(&*client, ForceUpdateSealing::No);
client.flush_queue();
assert!(miner.pending_block(0).is_none());
assert_eq!(client.chain_info().best_block_number, 4 as BlockNumber);
@@ -1831,7 +1836,7 @@ mod tests {
let miner = Miner::new_for_tests(&spec, None);
let client = generate_dummy_client(2);
miner.update_sealing(&*client);
miner.update_sealing(&*client, ForceUpdateSealing::No);
assert!(miner.is_currently_sealing());
}
@@ -1842,7 +1847,7 @@ mod tests {
let miner = Miner::new_for_tests(&spec, None);
let client = generate_dummy_client(2);
miner.update_sealing(&*client);
miner.update_sealing(&*client, ForceUpdateSealing::No);
assert!(!miner.is_currently_sealing());
}
@@ -1853,7 +1858,7 @@ mod tests {
let miner = Miner::new_for_tests(&spec, None);
let client = generate_dummy_client(2);
miner.update_sealing(&*client);
miner.update_sealing(&*client, ForceUpdateSealing::No);
assert!(!miner.is_currently_sealing());
}
@@ -1872,7 +1877,7 @@ mod tests {
miner.add_work_listener(Box::new(DummyNotifyWork));
let client = generate_dummy_client(2);
miner.update_sealing(&*client);
miner.update_sealing(&*client, ForceUpdateSealing::No);
assert!(miner.is_currently_sealing());
}

View File

@@ -47,7 +47,7 @@ use types::{
use call_contract::CallContract;
use registrar::RegistrarClient;
use client_traits::{BlockChain, ChainInfo, AccountData, Nonce, ScheduleInfo};
use client_traits::{BlockChain, ChainInfo, AccountData, Nonce, ScheduleInfo, ForceUpdateSealing};
use account_state::state::StateInfo;
use crate::{
@@ -87,7 +87,7 @@ pub trait MinerService : Send + Sync {
where C: BlockChain + CallContract + BlockProducer + SealedBlockImporter + Nonce + Sync;
/// Update current pending block
fn update_sealing<C>(&self, chain: &C)
fn update_sealing<C>(&self, chain: &C, force: ForceUpdateSealing)
where C: BlockChain + CallContract + BlockProducer + SealedBlockImporter + Nonce + Sync;
// Notifications

View File

@@ -72,7 +72,7 @@ use client::{
use client_traits::{
BlockInfo, Nonce, Balance, ChainInfo, TransactionInfo, BlockChainClient, ImportBlock,
AccountData, BlockChain, IoClient, BadBlocks, ScheduleInfo, StateClient, ProvingBlockChainClient,
StateOrBlock
StateOrBlock, ForceUpdateSealing
};
use engine::Engine;
use machine::executed::Executed;
@@ -963,8 +963,8 @@ impl ProvingBlockChainClient for TestBlockChainClient {
}
impl client_traits::EngineClient for TestBlockChainClient {
fn update_sealing(&self) {
self.miner.update_sealing(self)
fn update_sealing(&self, force: ForceUpdateSealing) {
self.miner.update_sealing(self, force)
}
fn submit_seal(&self, block_hash: H256, seal: Vec<Bytes>) {