From 541b14a4abf06d4a9a54869bc09b3a325128e0d8 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 7 Sep 2016 15:27:14 +0200 Subject: [PATCH] periodic snapshot tweaks (#2054) * periodic snapshot tweaks * set SNAPSHOT_HISTORY to 500 --- ethcore/src/snapshot/service.rs | 16 +++++++++++++--- ethcore/src/snapshot/watcher.rs | 29 ++++++++++++++++++++--------- parity/run.rs | 6 ++++-- 3 files changed, 37 insertions(+), 14 deletions(-) diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index ce34a0def..4dbbaa1d4 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -21,7 +21,7 @@ use std::io::ErrorKind; use std::fs; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use super::{ManifestData, StateRebuilder, BlockRebuilder, RestorationStatus, SnapshotService}; use super::io::{SnapshotReader, LooseReader, SnapshotWriter, LooseWriter}; @@ -192,6 +192,7 @@ pub struct Service { block_chunks: AtomicUsize, db_restore: Arc, progress: super::Progress, + taking_snapshot: AtomicBool, } impl Service { @@ -222,6 +223,7 @@ impl Service { block_chunks: AtomicUsize::new(0), db_restore: db_restore, progress: Default::default(), + taking_snapshot: AtomicBool::new(false), }; // create the root snapshot dir if it doesn't exist. @@ -302,7 +304,7 @@ impl Service { /// Tick the snapshot service. This will log any active snapshot /// being taken. pub fn tick(&self) { - if self.progress.done() { return } + if self.progress.done() || !self.taking_snapshot.load(Ordering::SeqCst) { return } let p = &self.progress; info!("Snapshot: {} accounts {} blocks {} bytes", p.accounts(), p.blocks(), p.size()); @@ -313,6 +315,11 @@ impl Service { /// will lead to a race condition where the first one to finish will /// have their produced snapshot overwritten. pub fn take_snapshot(&self, client: &Client, num: u64) -> Result<(), Error> { + if self.taking_snapshot.compare_and_swap(false, true, Ordering::SeqCst) { + info!("Skipping snapshot at #{} as another one is currently in-progress.", num); + return Ok(()); + } + info!("Taking snapshot at #{}", num); self.progress.reset(); @@ -324,7 +331,10 @@ impl Service { let writer = try!(LooseWriter::new(temp_dir.clone())); let guard = Guard::new(temp_dir.clone()); - try!(client.take_snapshot(writer, BlockID::Number(num), &self.progress)); + let res = client.take_snapshot(writer, BlockID::Number(num), &self.progress); + + self.taking_snapshot.store(false, Ordering::SeqCst); + try!(res); info!("Finished taking snapshot at #{}", num); diff --git a/ethcore/src/snapshot/watcher.rs b/ethcore/src/snapshot/watcher.rs index 8f9d3833b..65f47efc8 100644 --- a/ethcore/src/snapshot/watcher.rs +++ b/ethcore/src/snapshot/watcher.rs @@ -33,15 +33,22 @@ trait Oracle: Send + Sync { fn is_major_syncing(&self) -> bool; } -impl Oracle for Client { +struct StandardOracle where F: 'static + Send + Sync + Fn() -> bool { + client: Arc, + sync_status: F, +} + +impl Oracle for StandardOracle + where F: Send + Sync + Fn() -> bool +{ fn to_number(&self, hash: H256) -> Option { - self.block_header(BlockID::Hash(hash)).map(|h| HeaderView::new(&h).number()) + self.client.block_header(BlockID::Hash(hash)).map(|h| HeaderView::new(&h).number()) } fn is_major_syncing(&self) -> bool { - let queue_info = self.queue_info(); + let queue_info = self.client.queue_info(); - queue_info.unverified_queue_size + queue_info.verified_queue_size > 3 + (self.sync_status)() || queue_info.unverified_queue_size + queue_info.verified_queue_size > 3 } } @@ -68,7 +75,7 @@ impl Broadcast for IoChannel { /// A `ChainNotify` implementation which will trigger a snapshot event /// at certain block numbers. pub struct Watcher { - oracle: Arc, + oracle: Box, broadcast: Box, period: u64, history: u64, @@ -78,9 +85,14 @@ impl Watcher { /// Create a new `Watcher` which will trigger a snapshot event /// once every `period` blocks, but only after that block is /// `history` blocks old. - pub fn new(client: Arc, channel: IoChannel, period: u64, history: u64) -> Self { + pub fn new(client: Arc, sync_status: F, channel: IoChannel, period: u64, history: u64) -> Self + where F: 'static + Send + Sync + Fn() -> bool + { Watcher { - oracle: client, + oracle: Box::new(StandardOracle { + client: client, + sync_status: sync_status, + }), broadcast: Box::new(channel), period: period, history: history, @@ -125,7 +137,6 @@ mod tests { use util::{H256, U256}; use std::collections::HashMap; - use std::sync::Arc; struct TestOracle(HashMap); @@ -152,7 +163,7 @@ mod tests { let map = hashes.clone().into_iter().zip(numbers).collect(); let watcher = Watcher { - oracle: Arc::new(TestOracle(map)), + oracle: Box::new(TestOracle(map)), broadcast: Box::new(TestBroadcast(expected)), period: period, history: history, diff --git a/parity/run.rs b/parity/run.rs index aaae33b60..c6571b9b2 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -29,7 +29,7 @@ use ethcore::service::ClientService; use ethcore::account_provider::AccountProvider; use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions}; use ethcore::snapshot; -use ethsync::SyncConfig; +use ethsync::{SyncConfig, SyncProvider}; use informant::Informant; use rpc::{HttpServer, IpcServer, HttpConfiguration, IpcConfiguration}; @@ -51,7 +51,7 @@ use url; const SNAPSHOT_PERIOD: u64 = 10000; // how many blocks to wait before starting a periodic snapshot. -const SNAPSHOT_HISTORY: u64 = 1000; +const SNAPSHOT_HISTORY: u64 = 500; #[derive(Debug, PartialEq)] pub struct RunCmd { @@ -263,8 +263,10 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> { let _watcher = match cmd.no_periodic_snapshot { true => None, false => { + let sync = sync_provider.clone(); let watcher = Arc::new(snapshot::Watcher::new( service.client(), + move || sync.status().is_major_syncing(), service.io().channel(), SNAPSHOT_PERIOD, SNAPSHOT_HISTORY,