Refactor Clique stepping (#10691)

* Use Drop to shutdown stepper thread
Make period == 0 an error and remove the Option from step_service

* Remove StepService

Remove StepService and spawn the stepping thread in `Clique::new()`. Don't store the thread handle and instead trust the `AtomicBool` to signal shutdown time.
Don't check for `period > 0`: we assume a valid chainspec file.

* Don't shutdown the stepper thread at all, just let it run until exit
Also: fix a few warnings and tests

* Put kvdb_memorydb back

* Warn&exit when engine is dropped
Don't sleep too long!

* Don't delay stepping thread

* Better formatting
This commit is contained in:
David 2019-06-10 12:10:26 +02:00 committed by GitHub
parent 7827cc048e
commit 083dcc369b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 27 additions and 111 deletions

View File

@ -64,7 +64,7 @@ use std::collections::VecDeque;
use std::sync::{Arc, Weak};
use std::thread;
use std::time;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH};
use block::ExecutedBlock;
use client::{BlockId, EngineClient};
@ -89,11 +89,9 @@ use time_utils::CheckedSystemTime;
use self::block_state::CliqueBlockState;
use self::params::CliqueParams;
use self::step_service::StepService;
mod params;
mod block_state;
mod step_service;
mod util;
// TODO(niklasad1): extract tester types into a separate mod to be shared in the code base
@ -168,7 +166,6 @@ pub struct Clique {
block_state_by_hash: RwLock<LruCache<H256, CliqueBlockState>>,
proposals: RwLock<HashMap<Address, VoteType>>,
signer: RwLock<Option<Box<EngineSigner>>>,
step_service: Option<StepService>,
}
#[cfg(test)]
@ -181,13 +178,15 @@ pub struct Clique {
pub block_state_by_hash: RwLock<LruCache<H256, CliqueBlockState>>,
pub proposals: RwLock<HashMap<Address, VoteType>>,
pub signer: RwLock<Option<Box<EngineSigner>>>,
pub step_service: Option<StepService>,
}
impl Clique {
/// Initialize Clique engine from empty state.
pub fn new(params: CliqueParams, machine: EthereumMachine) -> Result<Arc<Self>, Error> {
let mut engine = Clique {
/// Step Clique at most every 2 seconds
const SEALING_FREQ: Duration = Duration::from_secs(2);
let engine = Clique {
epoch_length: params.epoch,
period: params.period,
client: Default::default(),
@ -195,19 +194,29 @@ impl Clique {
proposals: Default::default(),
signer: Default::default(),
machine,
step_service: None,
};
if params.period > 0 {
engine.step_service = Some(StepService::new());
let engine = Arc::new(engine);
let weak_eng = Arc::downgrade(&engine);
if let Some(step_service) = &engine.step_service {
step_service.start(weak_eng);
}
Ok(engine)
thread::Builder::new().name("StepService".into())
.spawn(move || {
loop {
let next_step_at = Instant::now() + SEALING_FREQ;
trace!(target: "miner", "StepService: triggering sealing");
if let Some(eng) = weak_eng.upgrade() {
eng.step()
} else {
Ok(Arc::new(engine))
warn!(target: "shutdown", "StepService: engine is dropped; exiting.");
break;
}
let now = Instant::now();
if now < next_step_at {
thread::sleep(next_step_at - now);
}
}
})?;
Ok(engine)
}
#[cfg(test)]
@ -225,7 +234,6 @@ impl Clique {
proposals: Default::default(),
signer: Default::default(),
machine: Spec::new_test_machine(),
step_service: None,
}
}
@ -348,15 +356,6 @@ impl Clique {
}
}
impl Drop for Clique {
fn drop(&mut self) {
if let Some(step_service) = &self.step_service {
trace!(target: "shutdown", "Clique; stopping step service");
step_service.stop();
}
}
}
impl Engine<EthereumMachine> for Clique {
fn name(&self) -> &str { "Clique" }

View File

@ -1,80 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Weak;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::thread;
use std::sync::Arc;
use parking_lot::RwLock;
use engines::Engine;
use machine::Machine;
/// Service that is managing the engine
pub struct StepService {
shutdown: Arc<AtomicBool>,
thread: RwLock<Option<thread::JoinHandle<()>>>,
}
impl StepService {
/// Create a new StepService without spawning a sealing thread.
pub fn new() -> Self {
let shutdown = Arc::new(AtomicBool::new(false));
StepService { shutdown, thread: RwLock::new(None) }
}
/// Start the StepService: spawns a thread that loops and triggers a sealing operation every 2sec.
pub fn start<M: Machine + 'static>(&self, engine: Weak<Engine<M>>) {
let shutdown = self.shutdown.clone();
let thr = thread::Builder::new()
.name("CliqueStepService".into())
.spawn(move || {
// startup delay.
thread::sleep(Duration::from_secs(5));
loop {
// see if we are in shutdown.
if shutdown.load(Ordering::Acquire) {
trace!(target: "shutdown", "CliqueStepService: received shutdown signal!");
break;
}
trace!(target: "miner", "CliqueStepService: triggering sealing");
// Try sealing
engine.upgrade().map(|x| x.step());
// Yield
thread::sleep(Duration::from_millis(2000));
}
trace!(target: "shutdown", "CliqueStepService: exited loop, shutdown.");
}).expect("CliqueStepService thread failed");
*self.thread.write() = Some(thr);
}
/// Stop the `StepService`
pub fn stop(&self) {
trace!(target: "shutdown", "CliqueStepService: signalling shutting to stepping thread.");
self.shutdown.store(true, Ordering::Release);
if let Some(t) = self.thread.write().take() {
t.join().expect("CliqueStepService thread panicked!");
}
}
}

View File

@ -437,9 +437,6 @@ pub trait Engine<M: Machine>: Sync + Send {
/// Trigger next step of the consensus engine.
fn step(&self) {}
/// Stops any services that the may hold the Engine and makes it safe to drop.
fn stop(&mut self) {}
/// Create a factory for building snapshot chunks and restoring from them.
/// Returning `None` indicates that this engine doesn't support snapshot creation.
fn snapshot_components(&self) -> Option<Box<SnapshotComponents>> {