Step duration map configuration parameter ported from the POA Network fork (#10902)

* step duration map configuration parameter ported from POA Network fork

* step duration map refactoring

* added a test of step duration change

* refactoring of vector search; return Err instead of panicking

* removed dead code and the Config engine error variant

* doc correction

* converted triples to struct StepDurationInfo
This commit is contained in:
Vladimir Komendantskiy 2019-10-28 13:39:18 +00:00 committed by David
parent 2c97bcc1a4
commit e0e79fdee0
4 changed files with 254 additions and 72 deletions

View File

@ -33,11 +33,12 @@
use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::{cmp, fmt}; use std::{cmp, fmt};
use std::iter::FromIterator; use std::iter::{self, FromIterator};
use std::ops::Deref; use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicU64, AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Weak, Arc}; use std::sync::{Weak, Arc};
use std::time::{UNIX_EPOCH, Duration}; use std::time::{UNIX_EPOCH, Duration};
use std::u64;
use client_traits::EngineClient; use client_traits::EngineClient;
use engine::{Engine, ConstructedVerifier}; use engine::{Engine, ConstructedVerifier};
@ -83,12 +84,13 @@ use self::finality::RollingFinality;
/// `AuthorityRound` params. /// `AuthorityRound` params.
pub struct AuthorityRoundParams { pub struct AuthorityRoundParams {
/// Time to wait before next block or authority switching, /// A map defining intervals of blocks with the given times (in seconds) to wait before next
/// in seconds. /// block or authority switching. The keys in the map are steps of starting blocks of those
/// periods. The entry at `0` should be defined.
/// ///
/// Deliberately typed as u16 as too high of a value leads /// Wait times (durations) are additionally required to be less than 65535 since larger values
/// to slow block issuance. /// lead to slow block issuance.
pub step_duration: u16, pub step_durations: BTreeMap<u64, u64>,
/// Starting step, /// Starting step,
pub start_step: Option<u64>, pub start_step: Option<u64>,
/// Valid validators. /// Valid validators.
@ -121,11 +123,27 @@ const U16_MAX: usize = ::std::u16::MAX as usize;
impl From<ethjson::spec::AuthorityRoundParams> for AuthorityRoundParams { impl From<ethjson::spec::AuthorityRoundParams> for AuthorityRoundParams {
fn from(p: ethjson::spec::AuthorityRoundParams) -> Self { fn from(p: ethjson::spec::AuthorityRoundParams) -> Self {
let mut step_duration_usize: usize = p.step_duration.into(); let map_step_duration = |u: ethjson::uint::Uint| {
if step_duration_usize > U16_MAX { let mut step_duration_usize: usize = u.into();
step_duration_usize = U16_MAX; if step_duration_usize == 0 {
warn!(target: "engine", "step_duration is too high ({}), setting it to {}", step_duration_usize, U16_MAX); panic!("AuthorityRoundParams: step duration cannot be 0");
} }
if step_duration_usize > U16_MAX {
warn!(target: "engine", "step duration is too high ({}), setting it to {}", step_duration_usize, U16_MAX);
step_duration_usize = U16_MAX;
}
step_duration_usize as u64
};
let step_durations: BTreeMap<_, _> = match p.step_duration {
ethjson::spec::StepDuration::Single(u) =>
iter::once((0, map_step_duration(u))).collect(),
ethjson::spec::StepDuration::Transitions(tr) => {
if tr.is_empty() {
panic!("AuthorityRoundParams: step duration transitions cannot be empty");
}
tr.into_iter().map(|(timestamp, u)| (timestamp.into(), map_step_duration(u))).collect()
}
};
let transition_block_num = p.block_reward_contract_transition.map_or(0, Into::into); let transition_block_num = p.block_reward_contract_transition.map_or(0, Into::into);
let mut br_transitions: BTreeMap<_, _> = p.block_reward_contract_transitions let mut br_transitions: BTreeMap<_, _> = p.block_reward_contract_transitions
.unwrap_or_default() .unwrap_or_default()
@ -151,7 +169,7 @@ impl From<ethjson::spec::AuthorityRoundParams> for AuthorityRoundParams {
); );
} }
AuthorityRoundParams { AuthorityRoundParams {
step_duration: step_duration_usize as u16, step_durations,
validators: new_validator_set(p.validators), validators: new_validator_set(p.validators),
start_step: p.start_step.map(Into::into), start_step: p.start_step.map(Into::into),
validate_score_transition: p.validate_score_transition.map_or(0, Into::into), validate_score_transition: p.validate_score_transition.map_or(0, Into::into),
@ -169,53 +187,88 @@ impl From<ethjson::spec::AuthorityRoundParams> for AuthorityRoundParams {
} }
} }
// Helper for managing the step. /// A triple containing the first step number and the starting timestamp of the given step duration.
#[derive(Clone, Debug)]
struct StepDurationInfo {
transition_step: u64,
transition_timestamp: u64,
step_duration: u64,
}
/// Helper for managing the step.
#[derive(Debug)] #[derive(Debug)]
struct Step { struct Step {
calibrate: bool, // whether calibration is enabled. calibrate: bool, // whether calibration is enabled.
inner: AtomicUsize, inner: AtomicU64,
duration: u16, /// Planned durations of steps.
durations: Vec<StepDurationInfo>,
} }
impl Step { impl Step {
fn load(&self) -> u64 { self.inner.load(AtomicOrdering::SeqCst) as u64 } fn load(&self) -> u64 { self.inner.load(AtomicOrdering::SeqCst) }
/// Finds the remaining duration of the current step. Panics if there was a counter under- or
/// overflow.
fn duration_remaining(&self) -> Duration { fn duration_remaining(&self) -> Duration {
let now = unix_now(); self.opt_duration_remaining().unwrap_or_else(|| {
let expected_seconds = self.load()
.checked_add(1)
.and_then(|ctr| ctr.checked_mul(self.duration as u64))
.map(Duration::from_secs);
match expected_seconds {
Some(step_end) if step_end > now => step_end - now,
Some(_) => Duration::from_secs(0),
None => {
let ctr = self.load(); let ctr = self.load();
error!(target: "engine", "Step counter is too high: {}, aborting", ctr); error!(target: "engine", "Step counter under- or overflow: {}, aborting", ctr);
panic!("step counter is too high: {}", ctr) panic!("step counter under- or overflow: {}", ctr)
}, })
} }
/// Finds the remaining duration of the current step. Returns `None` if there was a counter
/// under- or overflow.
fn opt_duration_remaining(&self) -> Option<Duration> {
let next_step = self.load().checked_add(1)?;
let StepDurationInfo { transition_step, transition_timestamp, step_duration } =
self.durations.iter()
.take_while(|info| info.transition_step < next_step)
.last()
.expect("durations cannot be empty")
.clone();
let next_time = transition_timestamp
.checked_add(next_step.checked_sub(transition_step)?.checked_mul(step_duration)?)?;
Some(Duration::from_secs(next_time.saturating_sub(unix_now().as_secs())))
} }
/// Increments the step number.
///
/// Panics if the new step number is `u64::MAX`.
fn increment(&self) { fn increment(&self) {
use std::usize;
// fetch_add won't panic on overflow but will rather wrap // fetch_add won't panic on overflow but will rather wrap
// around, leading to zero as the step counter, which might // around, leading to zero as the step counter, which might
// lead to unexpected situations, so it's better to shut down. // lead to unexpected situations, so it's better to shut down.
if self.inner.fetch_add(1, AtomicOrdering::SeqCst) == usize::MAX { if self.inner.fetch_add(1, AtomicOrdering::SeqCst) == u64::MAX {
error!(target: "engine", "Step counter is too high: {}, aborting", usize::MAX); error!(target: "engine", "Step counter is too high: {}, aborting", u64::MAX);
panic!("step counter is too high: {}", usize::MAX); panic!("step counter is too high: {}", u64::MAX);
} }
} }
fn calibrate(&self) { fn calibrate(&self) {
if self.calibrate { if self.calibrate {
let new_step = unix_now().as_secs() / (self.duration as u64); if self.opt_calibrate().is_none() {
self.inner.store(new_step as usize, AtomicOrdering::SeqCst); let ctr = self.load();
error!(target: "engine", "Step counter under- or overflow: {}, aborting", ctr);
panic!("step counter under- or overflow: {}", ctr)
} }
} }
}
/// Calibrates the AuRa step number according to the current time.
fn opt_calibrate(&self) -> Option<()> {
let now = unix_now().as_secs();
let StepDurationInfo { transition_step, transition_timestamp, step_duration } =
self.durations.iter()
.take_while(|info| info.transition_timestamp < now)
.last()
.expect("durations cannot be empty")
.clone();
let new_step = (now.checked_sub(transition_timestamp)? / step_duration)
.checked_add(transition_step)?;
self.inner.store(new_step, AtomicOrdering::SeqCst);
Some(())
}
fn check_future(&self, given: u64) -> Result<(), Option<OutOfBounds<u64>>> { fn check_future(&self, given: u64) -> Result<(), Option<OutOfBounds<u64>>> {
const REJECTED_STEP_DRIFT: u64 = 4; const REJECTED_STEP_DRIFT: u64 = 4;
@ -234,7 +287,9 @@ impl Step {
Err(None) Err(None)
// wait a bit for blocks in near future // wait a bit for blocks in near future
} else if given > current { } else if given > current {
let d = self.duration as u64; let d = self.durations.iter().take_while(|info| info.transition_step <= current).last()
.expect("Duration map has at least a 0 entry.")
.step_duration;
Err(Some(OutOfBounds { Err(Some(OutOfBounds {
min: None, min: None,
max: Some(d * current), max: Some(d * current),
@ -730,23 +785,54 @@ impl<'a, A: ?Sized, B> Deref for CowLike<'a, A, B> where B: AsRef<A> {
impl AuthorityRound { impl AuthorityRound {
/// Create a new instance of AuthorityRound engine. /// Create a new instance of AuthorityRound engine.
pub fn new(our_params: AuthorityRoundParams, machine: Machine) -> Result<Arc<Self>, Error> { pub fn new(our_params: AuthorityRoundParams, machine: Machine) -> Result<Arc<Self>, Error> {
if our_params.step_duration == 0 { if !our_params.step_durations.contains_key(&0) {
error!(target: "engine", "Authority Round step duration can't be zero, aborting"); error!(target: "engine", "Authority Round step 0 duration is undefined, aborting");
panic!("authority_round: step duration can't be zero") return Err(Error::Engine(EngineError::Custom(String::from("step 0 duration is undefined"))));
}
if our_params.step_durations.values().any(|v| *v == 0) {
error!(target: "engine", "Authority Round step duration cannot be 0");
return Err(Error::Engine(EngineError::Custom(String::from("step duration cannot be 0"))));
} }
let should_timeout = our_params.start_step.is_none(); let should_timeout = our_params.start_step.is_none();
let initial_step = our_params.start_step.unwrap_or_else(|| (unix_now().as_secs() / (our_params.step_duration as u64))); let initial_step = our_params.start_step.unwrap_or(0);
let mut durations = Vec::new();
let mut prev_step = 0u64;
let mut prev_time = 0u64;
let mut prev_dur = our_params.step_durations[&0];
durations.push(StepDurationInfo {
transition_step: prev_step,
transition_timestamp: prev_time,
step_duration: prev_dur
});
for (time, dur) in our_params.step_durations.iter().skip(1) {
let (step, time) = next_step_time_duration(
StepDurationInfo{
transition_step: prev_step,
transition_timestamp: prev_time,
step_duration: prev_dur,
}, *time)
.ok_or(BlockError::TimestampOverflow)?;
durations.push(StepDurationInfo {
transition_step: step,
transition_timestamp: time,
step_duration: *dur
});
prev_step = step;
prev_time = time;
prev_dur = *dur;
}
let step = Step {
inner: AtomicU64::new(initial_step),
calibrate: our_params.start_step.is_none(),
durations,
};
step.calibrate();
let engine = Arc::new( let engine = Arc::new(
AuthorityRound { AuthorityRound {
transition_service: IoService::<()>::start()?, transition_service: IoService::<()>::start()?,
step: Arc::new(PermissionedStep { step: Arc::new(PermissionedStep { inner: step, can_propose: AtomicBool::new(true) }),
inner: Step {
inner: AtomicUsize::new(initial_step as usize),
calibrate: our_params.start_step.is_none(),
duration: our_params.step_duration,
},
can_propose: AtomicBool::new(true),
}),
client: Arc::new(RwLock::new(None)), client: Arc::new(RwLock::new(None)),
signer: RwLock::new(None), signer: RwLock::new(None),
validators: our_params.validators, validators: our_params.validators,
@ -994,8 +1080,10 @@ impl IoHandler<()> for TransitionHandler {
} }
} }
let next_run_at = AsMillis::as_millis(&self.step.inner.duration_remaining()) >> 2; let next_run_at = Duration::from_millis(
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at)) AsMillis::as_millis(&self.step.inner.duration_remaining()) >> 2
);
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, next_run_at)
.unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e)) .unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e))
} }
} }
@ -1694,12 +1782,27 @@ impl Engine for AuthorityRound {
} }
} }
/// A helper accumulator function mapping a step duration and a step duration transition timestamp
/// to the corresponding step number and the correct starting second of the step.
fn next_step_time_duration(info: StepDurationInfo, time: u64) -> Option<(u64, u64)>
{
let step_diff = time.checked_add(info.step_duration)?
.checked_sub(1)?
.checked_sub(info.transition_timestamp)?
.checked_div(info.step_duration)?;
Some((
info.transition_step.checked_add(step_diff)?,
step_diff.checked_mul(info.step_duration)?.checked_add(time)?,
))
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering as AtomicOrdering};
use std::time::Duration;
use keccak_hash::keccak; use keccak_hash::keccak;
use accounts::AccountProvider; use accounts::AccountProvider;
use ethereum_types::{Address, H520, H256, U256}; use ethereum_types::{Address, H520, H256, U256};
@ -1726,13 +1829,16 @@ mod tests {
use ethjson; use ethjson;
use serde_json; use serde_json;
use super::{AuthorityRoundParams, AuthorityRound, EmptyStep, SealedEmptyStep, calculate_score}; use super::{
AuthorityRoundParams, AuthorityRound, EmptyStep, SealedEmptyStep, StepDurationInfo,
calculate_score,
};
fn build_aura<F>(f: F) -> Arc<AuthorityRound> where fn build_aura<F>(f: F) -> Arc<AuthorityRound> where
F: FnOnce(&mut AuthorityRoundParams), F: FnOnce(&mut AuthorityRoundParams),
{ {
let mut params = AuthorityRoundParams { let mut params = AuthorityRoundParams {
step_duration: 1, step_durations: [(0, 1)].to_vec().into_iter().collect(),
start_step: Some(1), start_step: Some(1),
validators: Box::new(TestSet::default()), validators: Box::new(TestSet::default()),
validate_score_transition: 0, validate_score_transition: 0,
@ -2061,29 +2167,67 @@ mod tests {
use super::Step; use super::Step;
let step = Step { let step = Step {
calibrate: false, calibrate: false,
inner: AtomicUsize::new(::std::usize::MAX), inner: AtomicU64::new(::std::u64::MAX),
duration: 1, durations: [StepDurationInfo {
transition_step: 0,
transition_timestamp: 0,
step_duration: 1,
}].to_vec().into_iter().collect(),
}; };
step.increment(); step.increment();
} }
#[test] #[test]
#[should_panic(expected="counter is too high")] #[should_panic(expected="step counter under- or overflow")]
fn test_counter_duration_remaining_too_high() { fn test_counter_duration_remaining_too_high() {
use super::Step; use super::Step;
let step = Step { let step = Step {
calibrate: false, calibrate: false,
inner: AtomicUsize::new(::std::usize::MAX), inner: AtomicU64::new(::std::u64::MAX),
duration: 1, durations: [StepDurationInfo {
transition_step: 0,
transition_timestamp: 0,
step_duration: 1,
}].to_vec().into_iter().collect(),
}; };
step.duration_remaining(); step.duration_remaining();
} }
#[test] #[test]
#[should_panic(expected="authority_round: step duration can't be zero")] fn test_change_step_duration() {
use super::Step;
use std::thread;
let now = super::unix_now().as_secs();
let step = Step {
calibrate: true,
inner: AtomicU64::new(::std::u64::MAX),
durations: [
StepDurationInfo { transition_step: 0, transition_timestamp: 0, step_duration: 1 },
StepDurationInfo { transition_step: now, transition_timestamp: now, step_duration: 2 },
StepDurationInfo { transition_step: now + 1, transition_timestamp: now + 2, step_duration: 4 },
].to_vec().into_iter().collect(),
};
// calibrated step `now`
step.calibrate();
let duration_remaining = step.duration_remaining();
assert_eq!(step.inner.load(AtomicOrdering::SeqCst), now);
assert!(duration_remaining <= Duration::from_secs(2));
thread::sleep(duration_remaining);
step.increment();
// calibrated step `now + 1`
step.calibrate();
let duration_remaining = step.duration_remaining();
assert_eq!(step.inner.load(AtomicOrdering::SeqCst), now + 1);
assert!(duration_remaining > Duration::from_secs(2));
assert!(duration_remaining <= Duration::from_secs(4));
}
#[test]
#[should_panic(expected="called `Result::unwrap()` on an `Err` value: Engine(Custom(\"step duration cannot be 0\"))")]
fn test_step_duration_zero() { fn test_step_duration_zero() {
build_aura(|params| { build_aura(|params| {
params.step_duration = 0; params.step_durations = [(0, 0)].to_vec().into_iter().collect();
}); });
} }
@ -2473,7 +2617,7 @@ mod tests {
#[test] #[test]
fn test_empty_steps() { fn test_empty_steps() {
let engine = build_aura(|p| { let engine = build_aura(|p| {
p.step_duration = 4; p.step_durations = [(0, 4)].to_vec().into_iter().collect();
p.empty_steps_transition = 0; p.empty_steps_transition = 0;
p.maximum_empty_steps = 0; p.maximum_empty_steps = 0;
}); });
@ -2507,7 +2651,7 @@ mod tests {
let (_spec, tap, accounts) = setup_empty_steps(); let (_spec, tap, accounts) = setup_empty_steps();
let engine = build_aura(|p| { let engine = build_aura(|p| {
p.validators = Box::new(SimpleList::new(accounts.clone())); p.validators = Box::new(SimpleList::new(accounts.clone()));
p.step_duration = 4; p.step_durations = [(0, 4)].to_vec().into_iter().collect();
p.empty_steps_transition = 0; p.empty_steps_transition = 0;
p.maximum_empty_steps = 0; p.maximum_empty_steps = 0;
}); });
@ -2544,7 +2688,7 @@ mod tests {
let (_spec, tap, accounts) = setup_empty_steps(); let (_spec, tap, accounts) = setup_empty_steps();
let engine = build_aura(|p| { let engine = build_aura(|p| {
p.validators = Box::new(SimpleList::new(accounts.clone())); p.validators = Box::new(SimpleList::new(accounts.clone()));
p.step_duration = 4; p.step_durations = [(0, 4)].to_vec().into_iter().collect();
p.empty_steps_transition = 0; p.empty_steps_transition = 0;
p.maximum_empty_steps = 0; p.maximum_empty_steps = 0;
}); });

View File

@ -41,7 +41,7 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use crate::{bytes::Bytes, hash::Address, uint::Uint}; use crate::{bytes::Bytes, hash::Address, uint::Uint};
use serde::Deserialize; use serde::Deserialize;
use super::ValidatorSet; use super::{StepDuration, ValidatorSet};
/// Authority params deserialization. /// Authority params deserialization.
#[derive(Debug, PartialEq, Deserialize)] #[derive(Debug, PartialEq, Deserialize)]
@ -49,7 +49,7 @@ use super::ValidatorSet;
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct AuthorityRoundParams { pub struct AuthorityRoundParams {
/// Block duration, in seconds. /// Block duration, in seconds.
pub step_duration: Uint, pub step_duration: StepDuration,
/// Valid authorities /// Valid authorities
pub validators: ValidatorSet, pub validators: ValidatorSet,
/// Starting step. Determined automatically if not specified. /// Starting step. Determined automatically if not specified.
@ -107,7 +107,7 @@ pub struct AuthorityRound {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{Address, Uint}; use super::{Address, Uint, StepDuration};
use ethereum_types::{U256, H160}; use ethereum_types::{U256, H160};
use crate::spec::{validator_set::ValidatorSet, authority_round::AuthorityRound}; use crate::spec::{validator_set::ValidatorSet, authority_round::AuthorityRound};
use std::str::FromStr; use std::str::FromStr;
@ -129,7 +129,7 @@ mod tests {
}"#; }"#;
let deserialized: AuthorityRound = serde_json::from_str(s).unwrap(); let deserialized: AuthorityRound = serde_json::from_str(s).unwrap();
assert_eq!(deserialized.params.step_duration, Uint(U256::from(0x02))); assert_eq!(deserialized.params.step_duration, StepDuration::Single(Uint(U256::from(2))));
assert_eq!( assert_eq!(
deserialized.params.validators, deserialized.params.validators,
ValidatorSet::List(vec![Address(H160::from_str("c6d9d2cd449a754c494264e1809c50e34d64562b").unwrap())]), ValidatorSet::List(vec![Address(H160::from_str("c6d9d2cd449a754c494264e1809c50e34d64562b").unwrap())]),

View File

@ -32,6 +32,7 @@ pub mod null_engine;
pub mod instant_seal; pub mod instant_seal;
pub mod hardcoded_sync; pub mod hardcoded_sync;
pub mod clique; pub mod clique;
pub mod step_duration;
pub use self::account::Account; pub use self::account::Account;
pub use self::builtin::{Builtin, Pricing, Linear}; pub use self::builtin::{Builtin, Pricing, Linear};
@ -49,3 +50,4 @@ pub use self::clique::{Clique, CliqueParams};
pub use self::null_engine::{NullEngine, NullEngineParams}; pub use self::null_engine::{NullEngine, NullEngineParams};
pub use self::instant_seal::{InstantSeal, InstantSealParams}; pub use self::instant_seal::{InstantSeal, InstantSealParams};
pub use self::hardcoded_sync::HardcodedSync; pub use self::hardcoded_sync::HardcodedSync;
pub use self::step_duration::StepDuration;

View File

@ -0,0 +1,36 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Step duration configuration parameter
use std::collections::BTreeMap;
use serde::Deserialize;
use crate::uint::Uint;
/// Step duration can be specified either as a `Uint` (in seconds), in which case it will be
/// constant, or as a list of pairs consisting of a timestamp of type `Uint` and a duration, in
/// which case the duration of a step will be determined by a mapping arising from that list.
#[derive(Debug, PartialEq, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(untagged)]
pub enum StepDuration {
/// Duration of all steps.
Single(Uint),
/// Step duration transitions: a mapping of timestamp to step durations.
Transitions(BTreeMap<Uint, Uint>),
}