diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 53407d609..fa3db14d2 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -2545,16 +2545,6 @@ impl ProvingBlockChainClient for Client { impl SnapshotClient for Client {} -impl Drop for Client { - fn drop(&mut self) { - if let Some(c) = Arc::get_mut(&mut self.engine) { - c.stop() - } else { - warn!(target: "shutdown", "unable to get mut ref for engine for shutdown."); - } - } -} - /// Returns `LocalizedReceipt` given `LocalizedTransaction` /// and a vector of receipts from given block up to transaction index. fn transaction_receipt( diff --git a/ethcore/src/engines/clique/mod.rs b/ethcore/src/engines/clique/mod.rs index 38a27e58d..fff6cb3ab 100644 --- a/ethcore/src/engines/clique/mod.rs +++ b/ethcore/src/engines/clique/mod.rs @@ -168,7 +168,7 @@ pub struct Clique { block_state_by_hash: RwLock>, proposals: RwLock>, signer: RwLock>>, - step_service: Option>, + step_service: Option, } #[cfg(test)] @@ -181,15 +181,15 @@ pub struct Clique { pub block_state_by_hash: RwLock>, pub proposals: RwLock>, pub signer: RwLock>>, - pub step_service: Option>, + pub step_service: Option, } impl Clique { /// Initialize Clique engine from empty state. - pub fn new(our_params: CliqueParams, machine: EthereumMachine) -> Result, Error> { + pub fn new(params: CliqueParams, machine: EthereumMachine) -> Result, Error> { let mut engine = Clique { - epoch_length: our_params.epoch, - period: our_params.period, + epoch_length: params.epoch, + period: params.period, client: Default::default(), block_state_by_hash: RwLock::new(LruCache::new(STATE_CACHE_NUM)), proposals: Default::default(), @@ -197,14 +197,17 @@ impl Clique { machine, step_service: None, }; - - let res = Arc::new(engine); - - if our_params.period > 0 { - engine.step_service = Some(StepService::start(Arc::downgrade(&res) as Weak>)); + if params.period > 0 { + engine.step_service = Some(StepService::new()); + let engine = Arc::new(engine); + let weak_eng = Arc::downgrade(&engine); + if let Some(step_service) = &engine.step_service { + step_service.start(weak_eng); + } + Ok(engine) + } else { + Ok(Arc::new(engine)) } - - Ok(res) } #[cfg(test)] @@ -345,6 +348,15 @@ impl Clique { } } +impl Drop for Clique { + fn drop(&mut self) { + if let Some(step_service) = &self.step_service { + trace!(target: "shutdown", "Clique; stopping step service"); + step_service.stop(); + } + } +} + impl Engine for Clique { fn name(&self) -> &str { "Clique" } @@ -695,7 +707,7 @@ impl Engine for Clique { trace!(target: "engine", "populate_from_parent in sealing"); // It's unclear how to prevent creating new blocks unless we are authorized, the best way (and geth does this too) - // it's just to ignore setting an correct difficulty here, we will check authorization in next step in generate_seal anyway. + // it's just to ignore setting a correct difficulty here, we will check authorization in next step in generate_seal anyway. if let Some(signer) = self.signer.read().as_ref() { let state = match self.state(&parent) { Err(e) => { @@ -744,14 +756,6 @@ impl Engine for Clique { } } - fn stop(&mut self) { - if let Some(mut s) = self.step_service.as_mut() { - Arc::get_mut(&mut s).map(|x| x.stop()); - } else { - warn!(target: "engine", "Stopping `CliqueStepService` failed requires mutable access"); - } - } - /// Clique timestamp is set to parent + period , or current time which ever is higher. fn open_block_header_timestamp(&self, parent_timestamp: u64) -> u64 { let now = time::SystemTime::now().duration_since(time::UNIX_EPOCH).unwrap_or_default(); diff --git a/ethcore/src/engines/clique/step_service.rs b/ethcore/src/engines/clique/step_service.rs index 7a4b5269d..a7c977953 100644 --- a/ethcore/src/engines/clique/step_service.rs +++ b/ethcore/src/engines/clique/step_service.rs @@ -20,6 +20,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use std::thread; use std::sync::Arc; +use parking_lot::RwLock; use engines::Engine; use machine::Machine; @@ -27,16 +28,21 @@ use machine::Machine; /// Service that is managing the engine pub struct StepService { shutdown: Arc, - thread: Option>, + thread: RwLock>>, } impl StepService { - /// Start the `StepService` - pub fn start(engine: Weak>) -> Arc { + /// Create a new StepService without spawning a sealing thread. + pub fn new() -> Self { let shutdown = Arc::new(AtomicBool::new(false)); - let s = shutdown.clone(); + StepService { shutdown, thread: RwLock::new(None) } + } - let thread = thread::Builder::new() + /// Start the StepService: spawns a thread that loops and triggers a sealing operation every 2sec. + pub fn start(&self, engine: Weak>) { + let shutdown = self.shutdown.clone(); + + let thr = thread::Builder::new() .name("CliqueStepService".into()) .spawn(move || { // startup delay. @@ -45,8 +51,8 @@ impl StepService { loop { // see if we are in shutdown. if shutdown.load(Ordering::Acquire) { - trace!(target: "miner", "CliqueStepService: received shutdown signal!"); - break; + trace!(target: "shutdown", "CliqueStepService: received shutdown signal!"); + break; } trace!(target: "miner", "CliqueStepService: triggering sealing"); @@ -57,20 +63,17 @@ impl StepService { // Yield thread::sleep(Duration::from_millis(2000)); } - trace!(target: "miner", "CliqueStepService: shutdown."); + trace!(target: "shutdown", "CliqueStepService: exited loop, shutdown."); }).expect("CliqueStepService thread failed"); - Arc::new(StepService { - shutdown: s, - thread: Some(thread), - }) + *self.thread.write() = Some(thr); } /// Stop the `StepService` - pub fn stop(&mut self) { - trace!(target: "miner", "CliqueStepService: shutting down."); + pub fn stop(&self) { + trace!(target: "shutdown", "CliqueStepService: signalling shutting to stepping thread."); self.shutdown.store(true, Ordering::Release); - if let Some(t) = self.thread.take() { + if let Some(t) = self.thread.write().take() { t.join().expect("CliqueStepService thread panicked!"); } }