Fixed AuthorityRound deadlock on shutdown, closes #8088 (#8803)

This commit is contained in:
Marek Kotewicz 2018-06-08 16:30:44 +02:00 committed by Afri Schoedon
parent 13efb6586d
commit 1f39a1bd76

View File

@ -382,12 +382,16 @@ impl Decodable for SealedEmptyStep {
} }
} }
struct PermissionedStep {
inner: Step,
can_propose: AtomicBool,
}
/// Engine using `AuthorityRound` proof-of-authority BFT consensus. /// Engine using `AuthorityRound` proof-of-authority BFT consensus.
pub struct AuthorityRound { pub struct AuthorityRound {
transition_service: IoService<()>, transition_service: IoService<()>,
step: Arc<Step>, step: Arc<PermissionedStep>,
can_propose: AtomicBool, client: Arc<RwLock<Option<Weak<EngineClient>>>>,
client: RwLock<Option<Weak<EngineClient>>>,
signer: RwLock<EngineSigner>, signer: RwLock<EngineSigner>,
validators: Box<ValidatorSet>, validators: Box<ValidatorSet>,
validate_score_transition: u64, validate_score_transition: u64,
@ -407,7 +411,7 @@ pub struct AuthorityRound {
// header-chain validator. // header-chain validator.
struct EpochVerifier { struct EpochVerifier {
step: Arc<Step>, step: Arc<PermissionedStep>,
subchain_validators: SimpleList, subchain_validators: SimpleList,
empty_steps_transition: u64, empty_steps_transition: u64,
} }
@ -415,7 +419,7 @@ struct EpochVerifier {
impl super::EpochVerifier<EthereumMachine> for EpochVerifier { impl super::EpochVerifier<EthereumMachine> for EpochVerifier {
fn verify_light(&self, header: &Header) -> Result<(), Error> { fn verify_light(&self, header: &Header) -> Result<(), Error> {
// Validate the timestamp // Validate the timestamp
verify_timestamp(&*self.step, header_step(header, self.empty_steps_transition)?)?; verify_timestamp(&self.step.inner, header_step(header, self.empty_steps_transition)?)?;
// always check the seal since it's fast. // always check the seal since it's fast.
// nothing heavier to do. // nothing heavier to do.
verify_external(header, &self.subchain_validators, self.empty_steps_transition) verify_external(header, &self.subchain_validators, self.empty_steps_transition)
@ -615,13 +619,15 @@ impl AuthorityRound {
let engine = Arc::new( let engine = Arc::new(
AuthorityRound { AuthorityRound {
transition_service: IoService::<()>::start()?, transition_service: IoService::<()>::start()?,
step: Arc::new(Step { step: Arc::new(PermissionedStep {
inner: AtomicUsize::new(initial_step), inner: Step {
calibrate: our_params.start_step.is_none(), inner: AtomicUsize::new(initial_step),
duration: our_params.step_duration, calibrate: our_params.start_step.is_none(),
duration: our_params.step_duration,
},
can_propose: AtomicBool::new(true),
}), }),
can_propose: AtomicBool::new(true), client: Arc::new(RwLock::new(None)),
client: RwLock::new(None),
signer: Default::default(), signer: Default::default(),
validators: our_params.validators, validators: our_params.validators,
validate_score_transition: our_params.validate_score_transition, validate_score_transition: our_params.validate_score_transition,
@ -641,7 +647,10 @@ impl AuthorityRound {
// Do not initialize timeouts for tests. // Do not initialize timeouts for tests.
if should_timeout { if should_timeout {
let handler = TransitionHandler { engine: Arc::downgrade(&engine) }; let handler = TransitionHandler {
step: engine.step.clone(),
client: engine.client.clone(),
};
engine.transition_service.register_handler(Arc::new(handler))?; engine.transition_service.register_handler(Arc::new(handler))?;
} }
Ok(engine) Ok(engine)
@ -666,7 +675,7 @@ impl AuthorityRound {
} }
fn generate_empty_step(&self, parent_hash: &H256) { fn generate_empty_step(&self, parent_hash: &H256) {
let step = self.step.load(); let step = self.step.inner.load();
let empty_step_rlp = empty_step_rlp(step, parent_hash); let empty_step_rlp = empty_step_rlp(step, parent_hash);
if let Ok(signature) = self.sign(keccak(&empty_step_rlp)).map(Into::into) { if let Ok(signature) = self.sign(keccak(&empty_step_rlp)).map(Into::into) {
@ -698,34 +707,37 @@ fn unix_now() -> Duration {
} }
struct TransitionHandler { struct TransitionHandler {
engine: Weak<AuthorityRound>, step: Arc<PermissionedStep>,
client: Arc<RwLock<Option<Weak<EngineClient>>>>,
} }
const ENGINE_TIMEOUT_TOKEN: TimerToken = 23; const ENGINE_TIMEOUT_TOKEN: TimerToken = 23;
impl IoHandler<()> for TransitionHandler { impl IoHandler<()> for TransitionHandler {
fn initialize(&self, io: &IoContext<()>) { fn initialize(&self, io: &IoContext<()>) {
if let Some(engine) = self.engine.upgrade() { let remaining = self.step.inner.duration_remaining().as_millis();
let remaining = engine.step.duration_remaining().as_millis(); io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(remaining))
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(remaining)) .unwrap_or_else(|e| warn!(target: "engine", "Failed to start consensus step timer: {}.", e))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to start consensus step timer: {}.", e))
}
} }
fn timeout(&self, io: &IoContext<()>, timer: TimerToken) { fn timeout(&self, io: &IoContext<()>, timer: TimerToken) {
if timer == ENGINE_TIMEOUT_TOKEN { if timer == ENGINE_TIMEOUT_TOKEN {
if let Some(engine) = self.engine.upgrade() { // NOTE we might be lagging by couple of steps in case the timeout
// NOTE we might be lagging by couple of steps in case the timeout // has not been called fast enough.
// has not been called fast enough. // Make sure to advance up to the actual step.
// Make sure to advance up to the actual step. while self.step.inner.duration_remaining().as_millis() == 0 {
while engine.step.duration_remaining().as_millis() == 0 { self.step.inner.increment();
engine.step(); self.step.can_propose.store(true, AtomicOrdering::SeqCst);
if let Some(ref weak) = *self.client.read() {
if let Some(c) = weak.upgrade() {
c.update_sealing();
}
} }
let next_run_at = engine.step.duration_remaining().as_millis() >> 2;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e))
} }
let next_run_at = self.step.inner.duration_remaining().as_millis() >> 2;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e))
} }
} }
} }
@ -742,8 +754,8 @@ impl Engine<EthereumMachine> for AuthorityRound {
} }
fn step(&self) { fn step(&self) {
self.step.increment(); self.step.inner.increment();
self.can_propose.store(true, AtomicOrdering::SeqCst); self.step.can_propose.store(true, AtomicOrdering::SeqCst);
if let Some(ref weak) = *self.client.read() { if let Some(ref weak) = *self.client.read() {
if let Some(c) = weak.upgrade() { if let Some(c) = weak.upgrade() {
c.update_sealing(); c.update_sealing();
@ -790,7 +802,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
fn populate_from_parent(&self, header: &mut Header, parent: &Header) { fn populate_from_parent(&self, header: &mut Header, parent: &Header) {
let parent_step = header_step(parent, self.empty_steps_transition).expect("Header has been verified; qed"); let parent_step = header_step(parent, self.empty_steps_transition).expect("Header has been verified; qed");
let current_step = self.step.load(); let current_step = self.step.inner.load();
let current_empty_steps_len = if header.number() >= self.empty_steps_transition { let current_empty_steps_len = if header.number() >= self.empty_steps_transition {
self.empty_steps(parent_step.into(), current_step.into(), parent.hash()).len() self.empty_steps(parent_step.into(), current_step.into(), parent.hash()).len()
@ -816,7 +828,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
let empty_step: EmptyStep = rlp.as_val().map_err(fmt_err)?;; let empty_step: EmptyStep = rlp.as_val().map_err(fmt_err)?;;
if empty_step.verify(&*self.validators).unwrap_or(false) { if empty_step.verify(&*self.validators).unwrap_or(false) {
if self.step.check_future(empty_step.step).is_ok() { if self.step.inner.check_future(empty_step.step).is_ok() {
trace!(target: "engine", "handle_message: received empty step message {:?}", empty_step); trace!(target: "engine", "handle_message: received empty step message {:?}", empty_step);
self.handle_empty_step_message(empty_step); self.handle_empty_step_message(empty_step);
} else { } else {
@ -836,7 +848,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
fn generate_seal(&self, block: &ExecutedBlock, parent: &Header) -> Seal { fn generate_seal(&self, block: &ExecutedBlock, parent: &Header) -> Seal {
// first check to avoid generating signature most of the time // first check to avoid generating signature most of the time
// (but there's still a race to the `compare_and_swap`) // (but there's still a race to the `compare_and_swap`)
if !self.can_propose.load(AtomicOrdering::SeqCst) { if !self.step.can_propose.load(AtomicOrdering::SeqCst) {
trace!(target: "engine", "Aborting seal generation. Can't propose."); trace!(target: "engine", "Aborting seal generation. Can't propose.");
return Seal::None; return Seal::None;
} }
@ -845,7 +857,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
let parent_step: U256 = header_step(parent, self.empty_steps_transition) let parent_step: U256 = header_step(parent, self.empty_steps_transition)
.expect("Header has been verified; qed").into(); .expect("Header has been verified; qed").into();
let step = self.step.load(); let step = self.step.inner.load();
// filter messages from old and future steps and different parents // filter messages from old and future steps and different parents
let empty_steps = if header.number() >= self.empty_steps_transition { let empty_steps = if header.number() >= self.empty_steps_transition {
@ -922,7 +934,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
trace!(target: "engine", "generate_seal: Issuing a block for step {}.", step); trace!(target: "engine", "generate_seal: Issuing a block for step {}.", step);
// only issue the seal if we were the first to reach the compare_and_swap. // only issue the seal if we were the first to reach the compare_and_swap.
if self.can_propose.compare_and_swap(true, false, AtomicOrdering::SeqCst) { if self.step.can_propose.compare_and_swap(true, false, AtomicOrdering::SeqCst) {
self.clear_empty_steps(parent_step); self.clear_empty_steps(parent_step);
@ -999,7 +1011,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
.decode()?; .decode()?;
let parent_step = header_step(&parent, self.empty_steps_transition)?; let parent_step = header_step(&parent, self.empty_steps_transition)?;
let current_step = self.step.load(); let current_step = self.step.inner.load();
self.empty_steps(parent_step.into(), current_step.into(), parent.hash()) self.empty_steps(parent_step.into(), current_step.into(), parent.hash())
} else { } else {
// we're verifying a block, extract empty steps from the seal // we're verifying a block, extract empty steps from the seal
@ -1052,7 +1064,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
// If yes then probably benign reporting needs to be moved further in the verification. // If yes then probably benign reporting needs to be moved further in the verification.
let set_number = header.number(); let set_number = header.number();
match verify_timestamp(&*self.step, header_step(header, self.empty_steps_transition)?) { match verify_timestamp(&self.step.inner, header_step(header, self.empty_steps_transition)?) {
Err(BlockError::InvalidSeal) => { Err(BlockError::InvalidSeal) => {
self.validators.report_benign(header.author(), set_number, header.number()); self.validators.report_benign(header.author(), set_number, header.number());
Err(BlockError::InvalidSeal.into()) Err(BlockError::InvalidSeal.into())
@ -1294,7 +1306,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
// This way, upon encountering an epoch change, the proposer from the // This way, upon encountering an epoch change, the proposer from the
// new set will be forced to wait until the next step to avoid sealing a // new set will be forced to wait until the next step to avoid sealing a
// block that breaks the invariant that the parent's step < the block's step. // block that breaks the invariant that the parent's step < the block's step.
self.can_propose.store(false, AtomicOrdering::SeqCst); self.step.can_propose.store(false, AtomicOrdering::SeqCst);
return Some(combine_proofs(signal_number, &pending.proof, &*finality_proof)); return Some(combine_proofs(signal_number, &pending.proof, &*finality_proof));
} }
} }