Improve logging and cleanup in miner around block sealing (#10745)

* Stop breaking out of loop if a non-canonical hash is found

* include expected hash in log msg

* More logging

* Scope

* Syntax

* Log in blank RollingFinality
Escalate bad proposer to warning

* Check validator set size: warn if 1 or even number

* More readable code

* Use SimpleList::new

* Extensive logging on unexpected non-canonical hash

* Wording

* wip

* Update ethcore/blockchain/src/blockchain.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Improved logging, address grumbles

* Update ethcore/src/engines/validator_set/simple_list.rs

Co-Authored-By: Luke Schoen <ltfschoen@users.noreply.github.com>

* Report benign misbehaviour iff currently a validator

* Report malicious behaviour iff we're a validator

* Escalate to warning and fix wording

* Test reporting behaviour
Don't require node to be part of the validator set to report malicious behaviour

* Include missing parent hash in MissingParent error

* Update ethcore/src/engines/validator_set/simple_list.rs

Co-Authored-By: Luke Schoen <ltfschoen@users.noreply.github.com>

* docs

* remove unneeded into()
Move check for parent_step == step for clarity&efficiency
Remove dead code for Seal::Proposal

* typo

* Wording

* naming

* WIP

* cleanup

* cosmetics

* cosmetics and one less lvar

* spelling

* Better loggin when a block is already in chain

* More logging

* On second thought non-validators are allowed to report

* cleanup

* remove dead code

* Keep track of the hash of the last imported block

* Let it lock

* Serialize access to block sealing

* Take a lock while sealing a block

* Cleanup

* whitespace
This commit is contained in:
David 2019-07-04 18:03:22 +02:00 committed by GitHub
parent fafb534cd3
commit de906d4afd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 153 additions and 120 deletions

View File

@ -755,8 +755,8 @@ impl BlockChain {
Some(TreeRoute {
blocks: from_branch,
ancestor: current_from,
index: index,
is_from_route_finalized: is_from_route_finalized,
index,
is_from_route_finalized,
})
}

View File

@ -178,8 +178,7 @@ impl<'x> OpenBlock<'x> {
) -> Result<Self, Error> {
let number = parent.number() + 1;
let state = State::from_existing(db, parent.state_root().clone(), engine.account_start_nonce(number), factories)?;
let mut r = OpenBlock { block: ExecutedBlock::new(state, last_hashes, tracing), engine, parent: parent.clone() };
let mut r = OpenBlock { block: ExecutedBlock::new(state, last_hashes, tracing), engine, parent: parent.clone()};
r.block.header.set_parent_hash(parent.hash());
r.block.header.set_number(number);

View File

@ -473,7 +473,16 @@ impl Importer {
//
// The header passed is from the original block data and is sealed.
// TODO: should return an error if ImportRoute is none, issue #9910
fn commit_block<B>(&self, block: B, header: &Header, block_data: encoded::Block, pending: Option<PendingTransition>, client: &Client) -> ImportRoute where B: Drain {
fn commit_block<B>(
&self,
block: B,
header: &Header,
block_data: encoded::Block,
pending: Option<PendingTransition>,
client: &Client
) -> ImportRoute
where B: Drain
{
let hash = &header.hash();
let number = header.number();
let parent = header.parent_hash();
@ -499,18 +508,17 @@ impl Importer {
};
let best = {
let hash = best_hash;
let header = chain.block_header_data(&hash)
let header = chain.block_header_data(&best_hash)
.expect("Best block is in the database; qed")
.decode()
.expect("Stored block header is valid RLP; qed");
let details = chain.block_details(&hash)
let details = chain.block_details(&best_hash)
.expect("Best block is in the database; qed");
ExtendedHeader {
parent_total_difficulty: details.total_difficulty - *header.difficulty(),
is_finalized: details.is_finalized,
header: header,
header,
}
};
@ -548,7 +556,7 @@ impl Importer {
}).collect();
let route = chain.insert_block(&mut batch, block_data, receipts.clone(), ExtrasInsert {
fork_choice: fork_choice,
fork_choice,
is_finalized,
});
@ -2410,11 +2418,11 @@ impl ImportSealedBlock for Client {
let _import_lock = self.importer.import_lock.lock();
trace_time!("import_sealed_block");
let block_data = block.rlp_bytes();
let block_bytes = block.rlp_bytes();
let pending = self.importer.check_epoch_end_signal(
&header,
&block_data,
&block_bytes,
&block.receipts,
block.state.db(),
self
@ -2422,7 +2430,7 @@ impl ImportSealedBlock for Client {
let route = self.importer.commit_block(
block,
&header,
encoded::Block::new(block_data),
encoded::Block::new(block_bytes),
pending,
self
);

View File

@ -247,7 +247,7 @@ impl EpochManager {
None => {
// this really should never happen unless the block passed
// hasn't got a parent in the database.
warn!(target: "engine", "No genesis transition found.");
warn!(target: "engine", "No genesis transition found. Block hash {} does not have a parent in the DB", hash);
return false;
}
};
@ -570,7 +570,7 @@ fn header_empty_steps_signers(header: &Header, empty_steps_transition: u64) -> R
fn step_proposer(validators: &dyn ValidatorSet, bh: &H256, step: u64) -> Address {
let proposer = validators.get(bh, step as usize);
trace!(target: "engine", "Fetched proposer for step {}: {}", step, proposer);
trace!(target: "engine", "step_proposer: Fetched proposer for step {}: {}", step, proposer);
proposer
}
@ -823,8 +823,8 @@ impl AuthorityRound {
if skipped_primary != me {
// Stop reporting once validators start repeating.
if !reported.insert(skipped_primary) { break; }
trace!(target: "engine", "Reporting benign misbehaviour (cause: skipped step) at block #{}, epoch set number {}. Own address: {}",
header.number(), set_number, me);
trace!(target: "engine", "Reporting benign misbehaviour (cause: skipped step) at block #{}, epoch set number {}, step proposer={:#x}. Own address: {}",
header.number(), set_number, skipped_primary, me);
self.validators.report_benign(&skipped_primary, set_number, header.number());
} else {
trace!(target: "engine", "Primary that skipped is self, not self-reporting. Own address: {}", me);
@ -1110,12 +1110,12 @@ impl Engine for AuthorityRound {
// filter messages from old and future steps and different parents
let empty_steps = if header.number() >= self.empty_steps_transition {
self.empty_steps(parent_step.into(), step.into(), *header.parent_hash())
self.empty_steps(parent_step, step, *header.parent_hash())
} else {
Vec::new()
};
let expected_diff = calculate_score(parent_step, step.into(), empty_steps.len().into());
let expected_diff = calculate_score(parent_step, step, empty_steps.len());
if header.difficulty() != &expected_diff {
debug!(target: "engine", "Aborting seal generation. The step or empty_steps have changed in the meantime. {:?} != {:?}",
@ -1123,12 +1123,17 @@ impl Engine for AuthorityRound {
return Seal::None;
}
if parent_step > step.into() {
if parent_step > step {
warn!(target: "engine", "Aborting seal generation for invalid step: {} > {}", parent_step, step);
return Seal::None;
} else if parent_step == step {
// this is guarded against by `can_propose` unless the block was signed
// on the same step (implies same key) and on a different node.
warn!("Attempted to seal block on the same step as parent. Is this authority sealing with more than one node?");
return Seal::None;
}
let (validators, set_number) = match self.epoch_set(header) {
let (validators, epoch_transition_number) = match self.epoch_set(header) {
Err(err) => {
warn!(target: "engine", "Unable to generate seal: {}", err);
return Seal::None;
@ -1137,13 +1142,7 @@ impl Engine for AuthorityRound {
};
if is_step_proposer(&*validators, header.parent_hash(), step, header.author()) {
// this is guarded against by `can_propose` unless the block was signed
// on the same step (implies same key) and on a different node.
if parent_step == step {
warn!("Attempted to seal block on the same step as parent. Is this authority sealing with more than one node?");
return Seal::None;
}
trace!(target: "engine", "generate_seal: we are step proposer for step={}, block=#{}", step, header.number());
// if there are no transactions to include in the block, we don't seal and instead broadcast a signed
// `EmptyStep(step, parent_hash)` message. If we exceed the maximum amount of `empty_step` rounds we proceed
// with the seal.
@ -1152,6 +1151,7 @@ impl Engine for AuthorityRound {
empty_steps.len() < self.maximum_empty_steps {
if self.step.can_propose.compare_and_swap(true, false, AtomicOrdering::SeqCst) {
trace!(target: "engine", "generate_seal: generating empty step at step={}, block=#{}", step, header.number());
self.generate_empty_step(header.parent_hash());
}
@ -1178,7 +1178,8 @@ impl Engine for AuthorityRound {
// report any skipped primaries between the parent block and
// the block we're sealing, unless we have empty steps enabled
if header.number() < self.empty_steps_transition {
self.report_skipped(header, step, parent_step, &*validators, set_number);
trace!(target: "engine", "generate_seal: reporting misbehaviour for step={}, block=#{}", step, header.number());
self.report_skipped(header, step, parent_step, &*validators, epoch_transition_number);
}
let mut fields = vec![
@ -1189,7 +1190,7 @@ impl Engine for AuthorityRound {
if let Some(empty_steps_rlp) = empty_steps_rlp {
fields.push(empty_steps_rlp);
}
trace!(target: "engine", "generate_seal: returning Seal::Regular for step={}, block=#{}", step, header.number());
return Seal::Regular(fields);
}
} else {
@ -1199,7 +1200,7 @@ impl Engine for AuthorityRound {
trace!(target: "engine", "generate_seal: {} not a proposer for step {}.",
header.author(), step);
}
trace!(target: "engine", "generate_seal: returning Seal::None for step={}, block=#{}", step, header.number());
Seal::None
}
@ -1739,13 +1740,13 @@ mod tests {
engine.set_signer(Box::new((tap.clone(), addr1, "1".into())));
match engine.generate_seal(&b1, &genesis_header) {
Seal::None | Seal::Proposal(_) => panic!("wrong seal"),
Seal::None => panic!("wrong seal"),
Seal::Regular(_) => {
engine.step();
engine.set_signer(Box::new((tap.clone(), addr2, "0".into())));
match engine.generate_seal(&b2, &genesis_header) {
Seal::Regular(_) | Seal::Proposal(_) => panic!("sealed despite wrong difficulty"),
Seal::Regular(_) => panic!("sealed despite wrong difficulty"),
Seal::None => {}
}
}

View File

@ -156,8 +156,6 @@ impl error::Error for EngineError {
/// Seal type.
#[derive(Debug, PartialEq, Eq)]
pub enum Seal {
/// Proposal seal; should be broadcasted, but not inserted into blockchain.
Proposal(Vec<Bytes>),
/// Regular block seal; should be part of the blockchain.
Regular(Vec<Bytes>),
/// Engine does not generate seal for this block right now.

View File

@ -118,6 +118,7 @@ impl ValidatorSet for ValidatorContract {
}
fn report_benign(&self, address: &Address, _set_block: BlockNumber, block: BlockNumber) {
trace!(target: "engine", "validator set recording benign misbehaviour at block #{} by {:#x}", block, address);
let data = validator_report::functions::report_benign::encode_input(*address, block);
match self.transact(data) {
Ok(_) => warn!(target: "engine", "Reported benign validator misbehaviour {}", address),
@ -162,6 +163,7 @@ mod tests {
#[test]
fn reports_validators() {
let _ = ::env_logger::try_init();
let tap = Arc::new(AccountProvider::transient_provider());
let v1 = tap.insert_account(keccak("1").into(), &"".into()).unwrap();
let client = generate_dummy_client_with_spec(Spec::new_validator_contract);

View File

@ -74,6 +74,8 @@ impl ::engines::StateDependentProof for StateProof {
/// The validator contract should have the following interface:
pub struct ValidatorSafeContract {
contract_address: Address,
/// The LRU cache is indexed by the parent_hash, so given a hash, the value
/// is the validator set valid for the blocks following that hash.
validators: RwLock<MemoryLruCache<H256, SimpleList>>,
client: RwLock<Option<Weak<dyn EngineClient>>>, // TODO [keorn]: remove
}
@ -471,6 +473,7 @@ mod tests {
#[test]
fn knows_validators() {
let _ = env_logger::try_init();
let tap = Arc::new(AccountProvider::transient_provider());
let s0: Secret = keccak("1").into();
let v0 = tap.insert_account(s0.clone(), &"".into()).unwrap();

View File

@ -93,6 +93,7 @@ impl ValidatorSet for TestSet {
}
fn report_benign(&self, _validator: &Address, _set_block: BlockNumber, block: BlockNumber) {
trace!(target: "engine", "test validator set recording benign misbehaviour");
self.last_benign.store(block as usize, AtomicOrdering::SeqCst)
}
}

View File

@ -146,7 +146,7 @@ pub struct MinerOptions {
pub tx_queue_strategy: PrioritizationStrategy,
/// Simple senders penalization.
pub tx_queue_penalization: Penalization,
/// Do we want to mark transactions recieved locally (e.g. RPC) as local if we don't have the sending account?
/// Do we want to mark transactions received locally (e.g. RPC) as local if we don't have the sending account?
pub tx_queue_no_unfamiliar_locals: bool,
/// Do we refuse to accept service transactions even if sender is certified.
pub refuse_service_transactions: bool,
@ -231,6 +231,10 @@ impl SealingWork {
fn reseal_allowed(&self) -> bool {
Instant::now() > self.next_allowed_reseal
}
fn work_available(&self) -> bool {
self.queue.peek_last_ref().is_some()
}
}
/// Keeps track of transactions using priority queue and holds currently mined block.
@ -620,6 +624,7 @@ impl Miner {
trace!(target: "miner", "requires_reseal: sealing enabled");
const SEALING_TIMEOUT_IN_BLOCKS : u64 = 5;
// Disable sealing if there were no requests for SEALING_TIMEOUT_IN_BLOCKS
let had_requests = sealing.last_request.map(|last_request|
best_block.saturating_sub(last_request) <= SEALING_TIMEOUT_IN_BLOCKS
@ -655,8 +660,8 @@ impl Miner {
// TODO: (https://github.com/paritytech/parity-ethereum/issues/10407)
// This is only used in authority_round path, and should be refactored to merge with the other seal() path.
// Attempts to perform internal sealing (one that does not require work) and handles the result depending on the
// type of Seal.
// Attempts to perform internal sealing (one that does not require work: e.g. Clique
// and Aura) and handles the result depending on the type of Seal.
fn seal_and_import_block_internally<C>(&self, chain: &C, block: ClosedBlock) -> bool
where C: BlockChain + SealedBlockImporter,
{
@ -669,63 +674,55 @@ impl Miner {
return false
}
}
trace!(target: "miner", "seal_block_internally: attempting internal seal.");
let block_number = block.header.number();
trace!(target: "miner", "seal_block_internally: attempting internal seal for block #{}", block_number);
let parent_header = match chain.block_header(BlockId::Hash(*block.header.parent_hash())) {
Some(h) => {
match h.decode() {
Ok(decoded_hdr) => decoded_hdr,
Err(_) => return false
Err(e) => {
error!(target: "miner", "seal_block_internally: Block #{}, Could not decode header from parent block (hash={}): {:?}", block_number, block.header.parent_hash(), e);
return false
}
}
}
None => return false,
},
None => {
trace!(target: "miner", "Block #{}: Parent with hash={} does not exist in our DB", block_number, block.header.parent_hash());
return false
},
};
match self.engine.generate_seal(&block, &parent_header) {
// Save proposal for later seal submission and broadcast it.
Seal::Proposal(seal) => {
trace!(target: "miner", "Received a Proposal seal.");
{
let mut sealing = self.sealing.lock();
sealing.next_mandatory_reseal = Instant::now() + self.options.reseal_max_period;
sealing.queue.set_pending(block.clone());
sealing.queue.use_last_ref();
}
let sealing_result =
match self.engine.generate_seal(&block, &parent_header) {
// Directly import a regular sealed block.
Seal::Regular(seal) => {
trace!(target: "miner", "Block #{}: Received a Regular seal.", block_number);
{
let mut sealing = self.sealing.lock();
sealing.next_mandatory_reseal = Instant::now() + self.options.reseal_max_period;
}
block
.lock()
.seal(&*self.engine, seal)
.map(|sealed| {
chain.broadcast_proposal_block(sealed);
true
})
.unwrap_or_else(|e| {
warn!("ERROR: seal failed when given internally generated seal: {}", e);
false
})
},
// Directly import a regular sealed block.
Seal::Regular(seal) => {
trace!(target: "miner", "Received a Regular seal.");
{
let mut sealing = self.sealing.lock();
sealing.next_mandatory_reseal = Instant::now() + self.options.reseal_max_period;
}
block
.lock()
.seal(&*self.engine, seal)
.map(|sealed| {
chain.import_sealed_block(sealed).is_ok()
})
.unwrap_or_else(|e| {
warn!("ERROR: seal failed when given internally generated seal: {}", e);
false
})
},
Seal::None => false,
}
block
.lock()
.seal(&*self.engine, seal)
.map(|sealed| {
match chain.import_sealed_block(sealed) {
Ok(_) => true,
Err(e) => {
error!(target: "miner", "Block #{}: seal_and_import_block_internally: import_sealed_block returned {:?}", block_number, e);
false
}
}
})
.unwrap_or_else(|e| {
warn!("ERROR: Block #{}, importing sealed block failed when given internally generated seal: {}", block_number, e);
false
})
},
Seal::None => false,
};
sealing_result
}
/// Prepares work which has to be done to seal.
@ -793,28 +790,30 @@ impl Miner {
}
/// Prepare a pending block. Returns the preparation status.
fn prepare_pending_block<C>(&self, client: &C) -> BlockPreparationStatus where
C: BlockChain + CallContract + BlockProducer + SealedBlockImporter + Nonce + Sync,
/// Only used by externally sealing engines.
fn prepare_pending_block<C>(&self, client: &C) -> BlockPreparationStatus
where
C: BlockChain + CallContract + BlockProducer + SealedBlockImporter + Nonce + Sync,
{
trace!(target: "miner", "prepare_pending_block: entering");
let prepare_new = {
let mut sealing = self.sealing.lock();
let have_work = sealing.queue.peek_last_ref().is_some();
trace!(target: "miner", "prepare_pending_block: have_work={}", have_work);
if !have_work {
sealing.enabled = true;
true
} else {
false
}
};
// Unless we are `--force-sealing` we create pending blocks if
// 1. we have local pending transactions
// 2. or someone is requesting `eth_getWork`
// When either condition is true, `sealing.enabled` is flipped to true (and if you
// have no more local transactions or stop calling `eth_getWork`, it is set to `false`).
// Here we check if there are pending blocks already (if so, we don't need to create
// a new one); if there are none, we set `sealing.enabled` to true because the
// calling code expects it to be on (or they wouldn't have called this method).
// Yes, it's a bit convoluted.
let prepare_new_block = self.maybe_enable_sealing();
if self.engine.sealing_state() != SealingState::External {
trace!(target: "miner", "prepare_pending_block: engine not sealing externally; not preparing");
return BlockPreparationStatus::NotPrepared;
}
let preparation_status = if prepare_new {
let preparation_status = if prepare_new_block {
// --------------------------------------------------------------------------
// | NOTE Code below requires sealing locks. |
// | Make sure to release the locks before calling that method. |
@ -844,25 +843,34 @@ impl Miner {
preparation_status
}
/// Set `sealing.enabled` to true if there is available work to do (pending or in the queue).
fn maybe_enable_sealing(&self) -> bool {
let mut sealing = self.sealing.lock();
if !sealing.work_available() {
trace!(target: "miner", "maybe_enable_sealing: we have work to do so enabling sealing");
sealing.enabled = true;
true
} else {
false
}
}
/// Prepare pending block, check whether sealing is needed, and then update sealing.
fn prepare_and_update_sealing<C: miner::BlockChainClient>(&self, chain: &C) {
use miner::MinerService;
// Make sure to do it after transaction is imported and lock is dropped.
// We need to create pending block and enable sealing.
let sealing_state = self.engine.sealing_state();
if sealing_state == SealingState::Ready
|| self.prepare_pending_block(chain) == BlockPreparationStatus::NotPrepared {
// If new block has not been prepared (means we already had one)
// or Engine might be able to seal internally,
// we need to update sealing.
self.update_sealing(chain);
match self.engine.sealing_state() {
SealingState::Ready => self.update_sealing(chain),
SealingState::External => {
// this calls `maybe_enable_sealing()`
if self.prepare_pending_block(chain) == BlockPreparationStatus::NotPrepared {
self.update_sealing(chain)
}
}
SealingState::NotReady => { self.maybe_enable_sealing(); },
}
}
}
const SEALING_TIMEOUT_IN_BLOCKS : u64 = 5;
impl miner::MinerService for Miner {
type State = State<::state_db::StateDB>;
@ -1720,12 +1728,16 @@ mod tests {
#[test]
fn internal_seals_without_work() {
let _ = env_logger::try_init();
let spec = Spec::new_instant();
let miner = Miner::new_for_tests(&spec, None);
let client = generate_dummy_client(2);
let import = miner.import_external_transactions(&*client, vec![transaction_with_chain_id(spec.chain_id()).into()]).pop().unwrap();
let import = miner.import_external_transactions(
&*client,
vec![transaction_with_chain_id(spec.chain_id()).into()]
).pop().unwrap();
assert_eq!(import.unwrap(), ());
miner.update_sealing(&*client);
@ -1733,7 +1745,10 @@ mod tests {
assert!(miner.pending_block(0).is_none());
assert_eq!(client.chain_info().best_block_number, 3 as BlockNumber);
assert!(miner.import_own_transaction(&*client, PendingTransaction::new(transaction_with_chain_id(spec.chain_id()).into(), None)).is_ok());
assert!(miner.import_own_transaction(
&*client,
PendingTransaction::new(transaction_with_chain_id(spec.chain_id()).into(), None)
).is_ok());
miner.update_sealing(&*client);
client.flush_queue();

View File

@ -302,7 +302,7 @@ pub fn prove_transaction_virtual<H: AsHashDB<KeccakHasher, DBValue> + Send + Syn
/// Reverting a checkpoint with `revert_to_checkpoint` involves copying
/// original values from the latest checkpoint back into `cache`. The code
/// takes care not to overwrite cached storage while doing that.
/// checkpoint can be discarded with `discard_checkpoint`. All of the orignal
/// A checkpoint can be discarded with `discard_checkpoint`. All of the original
/// backed-up values are moved into a parent checkpoint (if any).
///
pub struct State<B> {

View File

@ -548,12 +548,18 @@ impl BlockDownloader {
let result = if let Some(receipts) = receipts {
io.chain().queue_ancient_block(block, receipts)
} else {
trace_sync!(self, "Importing block #{}/{}", number, h);
io.chain().import_block(block)
};
match result {
Err(EthcoreError::Import(ImportError::AlreadyInChain)) => {
trace_sync!(self, "Block already in chain {:?}", h);
let is_canonical = if io.chain().block_hash(BlockId::Number(number)).is_some() {
"canoncial"
} else {
"not canonical"
};
trace_sync!(self, "Block #{} is already in chain {:?} {}", number, h, is_canonical);
self.block_imported(&h, number, &parent);
},
Err(EthcoreError::Import(ImportError::AlreadyQueued)) => {

View File

@ -14,9 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! Queue-like datastructure including notion of usage.
//! Queue-like data structure including notion of usage.
/// Special queue-like datastructure that includes the notion of
/// Special queue-like data structure that includes the notion of
/// usage to avoid items that were queued but never used from making it into
/// the queue.
pub struct UsingQueue<T> {
@ -42,7 +42,7 @@ impl<T> UsingQueue<T> {
UsingQueue {
pending: None,
in_use: vec![],
max_size: max_size,
max_size,
}
}

View File

@ -64,7 +64,7 @@ pub trait JournalDB: KeyedHashDB {
fn latest_era(&self) -> Option<u64>;
/// Journal recent database operations as being associated with a given era and id.
// TODO: give the overlay to this function so journaldbs don't manage the overlays themeselves.
// TODO: give the overlay to this function so journaldbs don't manage the overlays themselves.
fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> io::Result<u32>;
/// Mark a given block as canonical, indicating that competing blocks' states may be pruned out.