Fix compiler warning (that will become an error) (#10683)
* Remove annoying compiler warnings * Fix compiler warning (that will become an error) Fixes https://github.com/paritytech/parity-ethereum/issues/10648 I'm not sure this fix is Good™ but rustc seems happy enough. There's a deeper issue which may or may not be related to this: the Engine is not shutdown properly and the `StepService` thread keeps running indefinitely after Ctrl-C (so `update_sealing()` is called repeatedly for 300sec). I don't think this is related to Clique as I've seen this happen on mainnet as well, but I wonder if the effects of it are worse for a PoA network where the node can create new blocks all on its own? * Use saturating_sub * WIP * Fix warning, second attempt The idea here is to avoid using `Arc::get_mut()` (which does not work: fails every time here) and instead trust `drop()` to do the right thing. This is a conservative change. I think this can be reformed further, e.g. by `impl Drop for StepService` and halt the thread there, or even skip `join()`ing the thread entirely and trust the `AtomicBool` to signal shutdown. I also have doubts abut the `Option<StepService>`: seems a bit much to have an `Option` there and it makes things cumbersome.
This commit is contained in:
parent
d250f348a3
commit
752031a657
@ -2545,16 +2545,6 @@ impl ProvingBlockChainClient for Client {
|
|||||||
|
|
||||||
impl SnapshotClient for Client {}
|
impl SnapshotClient for Client {}
|
||||||
|
|
||||||
impl Drop for Client {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
if let Some(c) = Arc::get_mut(&mut self.engine) {
|
|
||||||
c.stop()
|
|
||||||
} else {
|
|
||||||
warn!(target: "shutdown", "unable to get mut ref for engine for shutdown.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns `LocalizedReceipt` given `LocalizedTransaction`
|
/// Returns `LocalizedReceipt` given `LocalizedTransaction`
|
||||||
/// and a vector of receipts from given block up to transaction index.
|
/// and a vector of receipts from given block up to transaction index.
|
||||||
fn transaction_receipt(
|
fn transaction_receipt(
|
||||||
|
@ -168,7 +168,7 @@ pub struct Clique {
|
|||||||
block_state_by_hash: RwLock<LruCache<H256, CliqueBlockState>>,
|
block_state_by_hash: RwLock<LruCache<H256, CliqueBlockState>>,
|
||||||
proposals: RwLock<HashMap<Address, VoteType>>,
|
proposals: RwLock<HashMap<Address, VoteType>>,
|
||||||
signer: RwLock<Option<Box<EngineSigner>>>,
|
signer: RwLock<Option<Box<EngineSigner>>>,
|
||||||
step_service: Option<Arc<StepService>>,
|
step_service: Option<StepService>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -181,15 +181,15 @@ pub struct Clique {
|
|||||||
pub block_state_by_hash: RwLock<LruCache<H256, CliqueBlockState>>,
|
pub block_state_by_hash: RwLock<LruCache<H256, CliqueBlockState>>,
|
||||||
pub proposals: RwLock<HashMap<Address, VoteType>>,
|
pub proposals: RwLock<HashMap<Address, VoteType>>,
|
||||||
pub signer: RwLock<Option<Box<EngineSigner>>>,
|
pub signer: RwLock<Option<Box<EngineSigner>>>,
|
||||||
pub step_service: Option<Arc<StepService>>,
|
pub step_service: Option<StepService>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Clique {
|
impl Clique {
|
||||||
/// Initialize Clique engine from empty state.
|
/// Initialize Clique engine from empty state.
|
||||||
pub fn new(our_params: CliqueParams, machine: EthereumMachine) -> Result<Arc<Self>, Error> {
|
pub fn new(params: CliqueParams, machine: EthereumMachine) -> Result<Arc<Self>, Error> {
|
||||||
let mut engine = Clique {
|
let mut engine = Clique {
|
||||||
epoch_length: our_params.epoch,
|
epoch_length: params.epoch,
|
||||||
period: our_params.period,
|
period: params.period,
|
||||||
client: Default::default(),
|
client: Default::default(),
|
||||||
block_state_by_hash: RwLock::new(LruCache::new(STATE_CACHE_NUM)),
|
block_state_by_hash: RwLock::new(LruCache::new(STATE_CACHE_NUM)),
|
||||||
proposals: Default::default(),
|
proposals: Default::default(),
|
||||||
@ -197,14 +197,17 @@ impl Clique {
|
|||||||
machine,
|
machine,
|
||||||
step_service: None,
|
step_service: None,
|
||||||
};
|
};
|
||||||
|
if params.period > 0 {
|
||||||
let res = Arc::new(engine);
|
engine.step_service = Some(StepService::new());
|
||||||
|
let engine = Arc::new(engine);
|
||||||
if our_params.period > 0 {
|
let weak_eng = Arc::downgrade(&engine);
|
||||||
engine.step_service = Some(StepService::start(Arc::downgrade(&res) as Weak<Engine<_>>));
|
if let Some(step_service) = &engine.step_service {
|
||||||
|
step_service.start(weak_eng);
|
||||||
|
}
|
||||||
|
Ok(engine)
|
||||||
|
} else {
|
||||||
|
Ok(Arc::new(engine))
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(res)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -345,6 +348,15 @@ impl Clique {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Drop for Clique {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(step_service) = &self.step_service {
|
||||||
|
trace!(target: "shutdown", "Clique; stopping step service");
|
||||||
|
step_service.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Engine<EthereumMachine> for Clique {
|
impl Engine<EthereumMachine> for Clique {
|
||||||
fn name(&self) -> &str { "Clique" }
|
fn name(&self) -> &str { "Clique" }
|
||||||
|
|
||||||
@ -695,7 +707,7 @@ impl Engine<EthereumMachine> for Clique {
|
|||||||
trace!(target: "engine", "populate_from_parent in sealing");
|
trace!(target: "engine", "populate_from_parent in sealing");
|
||||||
|
|
||||||
// It's unclear how to prevent creating new blocks unless we are authorized, the best way (and geth does this too)
|
// It's unclear how to prevent creating new blocks unless we are authorized, the best way (and geth does this too)
|
||||||
// it's just to ignore setting an correct difficulty here, we will check authorization in next step in generate_seal anyway.
|
// it's just to ignore setting a correct difficulty here, we will check authorization in next step in generate_seal anyway.
|
||||||
if let Some(signer) = self.signer.read().as_ref() {
|
if let Some(signer) = self.signer.read().as_ref() {
|
||||||
let state = match self.state(&parent) {
|
let state = match self.state(&parent) {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@ -744,14 +756,6 @@ impl Engine<EthereumMachine> for Clique {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop(&mut self) {
|
|
||||||
if let Some(mut s) = self.step_service.as_mut() {
|
|
||||||
Arc::get_mut(&mut s).map(|x| x.stop());
|
|
||||||
} else {
|
|
||||||
warn!(target: "engine", "Stopping `CliqueStepService` failed requires mutable access");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Clique timestamp is set to parent + period , or current time which ever is higher.
|
/// Clique timestamp is set to parent + period , or current time which ever is higher.
|
||||||
fn open_block_header_timestamp(&self, parent_timestamp: u64) -> u64 {
|
fn open_block_header_timestamp(&self, parent_timestamp: u64) -> u64 {
|
||||||
let now = time::SystemTime::now().duration_since(time::UNIX_EPOCH).unwrap_or_default();
|
let now = time::SystemTime::now().duration_since(time::UNIX_EPOCH).unwrap_or_default();
|
||||||
|
@ -20,6 +20,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use parking_lot::RwLock;
|
||||||
|
|
||||||
use engines::Engine;
|
use engines::Engine;
|
||||||
use machine::Machine;
|
use machine::Machine;
|
||||||
@ -27,16 +28,21 @@ use machine::Machine;
|
|||||||
/// Service that is managing the engine
|
/// Service that is managing the engine
|
||||||
pub struct StepService {
|
pub struct StepService {
|
||||||
shutdown: Arc<AtomicBool>,
|
shutdown: Arc<AtomicBool>,
|
||||||
thread: Option<thread::JoinHandle<()>>,
|
thread: RwLock<Option<thread::JoinHandle<()>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StepService {
|
impl StepService {
|
||||||
/// Start the `StepService`
|
/// Create a new StepService without spawning a sealing thread.
|
||||||
pub fn start<M: Machine + 'static>(engine: Weak<Engine<M>>) -> Arc<Self> {
|
pub fn new() -> Self {
|
||||||
let shutdown = Arc::new(AtomicBool::new(false));
|
let shutdown = Arc::new(AtomicBool::new(false));
|
||||||
let s = shutdown.clone();
|
StepService { shutdown, thread: RwLock::new(None) }
|
||||||
|
}
|
||||||
|
|
||||||
let thread = thread::Builder::new()
|
/// Start the StepService: spawns a thread that loops and triggers a sealing operation every 2sec.
|
||||||
|
pub fn start<M: Machine + 'static>(&self, engine: Weak<Engine<M>>) {
|
||||||
|
let shutdown = self.shutdown.clone();
|
||||||
|
|
||||||
|
let thr = thread::Builder::new()
|
||||||
.name("CliqueStepService".into())
|
.name("CliqueStepService".into())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
// startup delay.
|
// startup delay.
|
||||||
@ -45,7 +51,7 @@ impl StepService {
|
|||||||
loop {
|
loop {
|
||||||
// see if we are in shutdown.
|
// see if we are in shutdown.
|
||||||
if shutdown.load(Ordering::Acquire) {
|
if shutdown.load(Ordering::Acquire) {
|
||||||
trace!(target: "miner", "CliqueStepService: received shutdown signal!");
|
trace!(target: "shutdown", "CliqueStepService: received shutdown signal!");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -57,20 +63,17 @@ impl StepService {
|
|||||||
// Yield
|
// Yield
|
||||||
thread::sleep(Duration::from_millis(2000));
|
thread::sleep(Duration::from_millis(2000));
|
||||||
}
|
}
|
||||||
trace!(target: "miner", "CliqueStepService: shutdown.");
|
trace!(target: "shutdown", "CliqueStepService: exited loop, shutdown.");
|
||||||
}).expect("CliqueStepService thread failed");
|
}).expect("CliqueStepService thread failed");
|
||||||
|
|
||||||
Arc::new(StepService {
|
*self.thread.write() = Some(thr);
|
||||||
shutdown: s,
|
|
||||||
thread: Some(thread),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stop the `StepService`
|
/// Stop the `StepService`
|
||||||
pub fn stop(&mut self) {
|
pub fn stop(&self) {
|
||||||
trace!(target: "miner", "CliqueStepService: shutting down.");
|
trace!(target: "shutdown", "CliqueStepService: signalling shutting to stepping thread.");
|
||||||
self.shutdown.store(true, Ordering::Release);
|
self.shutdown.store(true, Ordering::Release);
|
||||||
if let Some(t) = self.thread.take() {
|
if let Some(t) = self.thread.write().take() {
|
||||||
t.join().expect("CliqueStepService thread panicked!");
|
t.join().expect("CliqueStepService thread panicked!");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user