From 083dcc369bac4ec105a2d65364c4731187e66095 Mon Sep 17 00:00:00 2001 From: David Date: Mon, 10 Jun 2019 12:10:26 +0200 Subject: [PATCH] Refactor Clique stepping (#10691) * Use Drop to shutdown stepper thread Make period == 0 an error and remove the Option from step_service * Remove StepService Remove StepService and spawn the stepping thread in `Clique::new()`. Don't store the thread handle and instead trust the `AtomicBool` to signal shutdown time. Don't check for `period > 0`: we assume a valid chainspec file. * Don't shutdown the stepper thread at all, just let it run until exit Also: fix a few warnings and tests * Put kvdb_memorydb back * Warn&exit when engine is dropped Don't sleep too long! * Don't delay stepping thread * Better formatting --- ethcore/src/engines/clique/mod.rs | 55 ++++++++------- ethcore/src/engines/clique/step_service.rs | 80 ---------------------- ethcore/src/engines/mod.rs | 3 - 3 files changed, 27 insertions(+), 111 deletions(-) delete mode 100644 ethcore/src/engines/clique/step_service.rs diff --git a/ethcore/src/engines/clique/mod.rs b/ethcore/src/engines/clique/mod.rs index 180237b01..5884ab05d 100644 --- a/ethcore/src/engines/clique/mod.rs +++ b/ethcore/src/engines/clique/mod.rs @@ -64,7 +64,7 @@ use std::collections::VecDeque; use std::sync::{Arc, Weak}; use std::thread; use std::time; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH}; use block::ExecutedBlock; use client::{BlockId, EngineClient}; @@ -89,11 +89,9 @@ use time_utils::CheckedSystemTime; use self::block_state::CliqueBlockState; use self::params::CliqueParams; -use self::step_service::StepService; mod params; mod block_state; -mod step_service; mod util; // TODO(niklasad1): extract tester types into a separate mod to be shared in the code base @@ -168,7 +166,6 @@ pub struct Clique { block_state_by_hash: RwLock>, proposals: RwLock>, signer: RwLock>>, - step_service: Option, } #[cfg(test)] @@ -181,13 +178,15 @@ pub struct Clique { pub block_state_by_hash: RwLock>, pub proposals: RwLock>, pub signer: RwLock>>, - pub step_service: Option, } impl Clique { /// Initialize Clique engine from empty state. pub fn new(params: CliqueParams, machine: EthereumMachine) -> Result, Error> { - let mut engine = Clique { + /// Step Clique at most every 2 seconds + const SEALING_FREQ: Duration = Duration::from_secs(2); + + let engine = Clique { epoch_length: params.epoch, period: params.period, client: Default::default(), @@ -195,19 +194,29 @@ impl Clique { proposals: Default::default(), signer: Default::default(), machine, - step_service: None, }; - if params.period > 0 { - engine.step_service = Some(StepService::new()); - let engine = Arc::new(engine); - let weak_eng = Arc::downgrade(&engine); - if let Some(step_service) = &engine.step_service { - step_service.start(weak_eng); - } - Ok(engine) - } else { - Ok(Arc::new(engine)) - } + let engine = Arc::new(engine); + let weak_eng = Arc::downgrade(&engine); + + thread::Builder::new().name("StepService".into()) + .spawn(move || { + loop { + let next_step_at = Instant::now() + SEALING_FREQ; + trace!(target: "miner", "StepService: triggering sealing"); + if let Some(eng) = weak_eng.upgrade() { + eng.step() + } else { + warn!(target: "shutdown", "StepService: engine is dropped; exiting."); + break; + } + + let now = Instant::now(); + if now < next_step_at { + thread::sleep(next_step_at - now); + } + } + })?; + Ok(engine) } #[cfg(test)] @@ -225,7 +234,6 @@ impl Clique { proposals: Default::default(), signer: Default::default(), machine: Spec::new_test_machine(), - step_service: None, } } @@ -348,15 +356,6 @@ impl Clique { } } -impl Drop for Clique { - fn drop(&mut self) { - if let Some(step_service) = &self.step_service { - trace!(target: "shutdown", "Clique; stopping step service"); - step_service.stop(); - } - } -} - impl Engine for Clique { fn name(&self) -> &str { "Clique" } diff --git a/ethcore/src/engines/clique/step_service.rs b/ethcore/src/engines/clique/step_service.rs deleted file mode 100644 index a7c977953..000000000 --- a/ethcore/src/engines/clique/step_service.rs +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright 2015-2019 Parity Technologies (UK) Ltd. -// This file is part of Parity Ethereum. - -// Parity Ethereum is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Ethereum is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Ethereum. If not, see . - - -use std::sync::Weak; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::Duration; -use std::thread; -use std::sync::Arc; -use parking_lot::RwLock; - -use engines::Engine; -use machine::Machine; - -/// Service that is managing the engine -pub struct StepService { - shutdown: Arc, - thread: RwLock>>, -} - -impl StepService { - /// Create a new StepService without spawning a sealing thread. - pub fn new() -> Self { - let shutdown = Arc::new(AtomicBool::new(false)); - StepService { shutdown, thread: RwLock::new(None) } - } - - /// Start the StepService: spawns a thread that loops and triggers a sealing operation every 2sec. - pub fn start(&self, engine: Weak>) { - let shutdown = self.shutdown.clone(); - - let thr = thread::Builder::new() - .name("CliqueStepService".into()) - .spawn(move || { - // startup delay. - thread::sleep(Duration::from_secs(5)); - - loop { - // see if we are in shutdown. - if shutdown.load(Ordering::Acquire) { - trace!(target: "shutdown", "CliqueStepService: received shutdown signal!"); - break; - } - - trace!(target: "miner", "CliqueStepService: triggering sealing"); - - // Try sealing - engine.upgrade().map(|x| x.step()); - - // Yield - thread::sleep(Duration::from_millis(2000)); - } - trace!(target: "shutdown", "CliqueStepService: exited loop, shutdown."); - }).expect("CliqueStepService thread failed"); - - *self.thread.write() = Some(thr); - } - - /// Stop the `StepService` - pub fn stop(&self) { - trace!(target: "shutdown", "CliqueStepService: signalling shutting to stepping thread."); - self.shutdown.store(true, Ordering::Release); - if let Some(t) = self.thread.write().take() { - t.join().expect("CliqueStepService thread panicked!"); - } - } -} diff --git a/ethcore/src/engines/mod.rs b/ethcore/src/engines/mod.rs index ec0e5b020..d51e31ea1 100644 --- a/ethcore/src/engines/mod.rs +++ b/ethcore/src/engines/mod.rs @@ -437,9 +437,6 @@ pub trait Engine: Sync + Send { /// Trigger next step of the consensus engine. fn step(&self) {} - /// Stops any services that the may hold the Engine and makes it safe to drop. - fn stop(&mut self) {} - /// Create a factory for building snapshot chunks and restoring from them. /// Returning `None` indicates that this engine doesn't support snapshot creation. fn snapshot_components(&self) -> Option> {