From 7ea57079041745dab9be08abad53a197931e8c07 Mon Sep 17 00:00:00 2001 From: rakita Date: Tue, 9 Mar 2021 09:47:49 +0100 Subject: [PATCH] Freeze pruning while creating snapshot (#205) * Freeze pruning while creating snapshot * Use scopeguard for snapshot generation * Snapshot 1k blocks * Snapshot number correction --- Cargo.lock | 11 ++-- bin/oe/run.rs | 8 +-- crates/ethcore/Cargo.toml | 1 + crates/ethcore/src/client/client.rs | 72 ++++++++++++++++++-------- crates/ethcore/src/ethereum/ethash.rs | 4 +- crates/ethcore/src/snapshot/service.rs | 70 +++++++++++++------------ 6 files changed, 99 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0abc8c1c8..a25e0bbf3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -572,7 +572,7 @@ dependencies = [ "crossbeam-utils 0.6.6", "lazy_static", "memoffset", - "scopeguard 1.0.0", + "scopeguard 1.1.0", ] [[package]] @@ -1014,6 +1014,7 @@ dependencies = [ "rlp_compress", "rlp_derive", "rustc-hex 1.0.0", + "scopeguard 1.1.0", "serde", "serde_derive", "serde_json", @@ -1871,7 +1872,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3753954f7bd71f0e671afb8b5a992d1724cf43b7f95a563cd4a0bde94659ca8" dependencies = [ - "scopeguard 1.0.0", + "scopeguard 1.1.0", "winapi 0.3.8", ] @@ -2436,7 +2437,7 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75" dependencies = [ - "scopeguard 1.0.0", + "scopeguard 1.1.0", ] [[package]] @@ -4126,9 +4127,9 @@ checksum = "94258f53601af11e6a49f722422f6e3425c52b06245a5cf9bc09908b174f5e27" [[package]] name = "scopeguard" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b42e15e59b18a828bbf5c58ea01debb36b9b096346de35d941dcb89009f24a0d" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "scrypt" diff --git a/bin/oe/run.rs b/bin/oe/run.rs index b1892aebb..d4aac267c 100644 --- a/bin/oe/run.rs +++ b/bin/oe/run.rs @@ -60,11 +60,11 @@ use signer; use sync::{self, SyncConfig}; use user_defaults::UserDefaults; -// how often to take periodic snapshots. -const SNAPSHOT_PERIOD: u64 = 5000; +// How often we attempt to take a snapshot: only snapshot on blocknumbers that are multiples of this. +const SNAPSHOT_PERIOD: u64 = 20000; -// how many blocks to wait before starting a periodic snapshot. -const SNAPSHOT_HISTORY: u64 = 100; +// Start snapshoting from `tip`-`history, with this we want to bypass reorgs. Should be smaller than prunning history. +const SNAPSHOT_HISTORY: u64 = 50; // Full client number of DNS threads const FETCH_FULL_NUM_DNS_THREADS: usize = 4; diff --git a/crates/ethcore/Cargo.toml b/crates/ethcore/Cargo.toml index 57c75a967..ab6eba7a0 100644 --- a/crates/ethcore/Cargo.toml +++ b/crates/ethcore/Cargo.toml @@ -74,6 +74,7 @@ using_queue = { path = "../concensus/miner/using-queue" } vm = { path = "../vm/vm" } walkdir = "2.3" wasm = { path = "../vm/wasm" } +scopeguard = "1.1.0" [dev-dependencies] blooms-db = { path = "../db/blooms-db" } diff --git a/crates/ethcore/src/client/client.rs b/crates/ethcore/src/client/client.rs index a9cba475f..a6c3c7a97 100644 --- a/crates/ethcore/src/client/client.rs +++ b/crates/ethcore/src/client/client.rs @@ -21,7 +21,7 @@ use std::{ io::{BufRead, BufReader}, str::{from_utf8, FromStr}, sync::{ - atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering}, + atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering as AtomicOrdering}, Arc, Weak, }, time::{Duration, Instant}, @@ -209,6 +209,9 @@ pub struct Client { /// Database pruning strategy to use for StateDB pruning: journaldb::Algorithm, + /// Don't prune the state we're currently snapshotting + snapshotting_at: AtomicU64, + /// Client uses this to store blocks, traces, etc. db: RwLock>, @@ -946,6 +949,7 @@ impl Client { tracedb, engine, pruning: config.pruning.clone(), + snapshotting_at: AtomicU64::new(0), db: RwLock::new(db.clone()), state_db: RwLock::new(state_db), report: RwLock::new(Default::default()), @@ -1137,7 +1141,7 @@ impl Client { mut state_db: StateDB, chain: &BlockChain, ) -> Result<(), ::error::Error> { - let number = match state_db.journal_db().latest_era() { + let latest_era = match state_db.journal_db().latest_era() { Some(n) => n, None => return Ok(()), }; @@ -1152,16 +1156,27 @@ impl Client { break; } match state_db.journal_db().earliest_era() { - Some(era) if era + self.history <= number => { - trace!(target: "client", "Pruning state for ancient era {}", era); - match chain.block_hash(era) { + Some(earliest_era) if earliest_era + self.history <= latest_era => { + let freeze_at = self.snapshotting_at.load(AtomicOrdering::SeqCst); + if freeze_at > 0 && freeze_at == earliest_era { + // Note: journal_db().mem_used() can be used for a more accurate memory + // consumption measurement but it can be expensive so sticking with the + // faster `journal_size()` instead. + trace!(target: "pruning", "Pruning is paused at era {} (snapshot under way); earliest era={}, latest era={}, journal_size={} – Not pruning.", + freeze_at, earliest_era, latest_era, state_db.journal_db().journal_size()); + break; + } + trace!(target: "client", "Pruning state for ancient era {}", earliest_era); + match chain.block_hash(earliest_era) { Some(ancient_hash) => { let mut batch = DBTransaction::new(); - state_db.mark_canonical(&mut batch, era, &ancient_hash)?; + state_db.mark_canonical(&mut batch, earliest_era, &ancient_hash)?; self.db.read().key_value().write_buffered(batch); state_db.journal_db().flush(); } - None => debug!(target: "client", "Missing expected hash for block {}", era), + None => { + debug!(target: "client", "Missing expected hash for block {}", earliest_era) + } } } _ => break, // means that every era is kept, no pruning necessary. @@ -1349,7 +1364,6 @@ impl Client { p: &snapshot::Progress, ) -> Result<(), EthcoreError> { let db = self.state_db.read().journal_db().boxed_clone(); - let best_block_number = self.chain_info().best_block_number; let block_number = self .block_number(at) .ok_or_else(|| snapshot::Error::InvalidStartingBlock(at))?; @@ -1360,19 +1374,23 @@ impl Client { let history = ::std::cmp::min(self.history, 1000); - let start_hash = match at { + let (snapshot_block_number, start_hash) = match at { BlockId::Latest => { + let best_block_number = self.chain_info().best_block_number; let start_num = match db.earliest_era() { Some(era) => ::std::cmp::max(era, best_block_number.saturating_sub(history)), None => best_block_number.saturating_sub(history), }; - self.block_hash(BlockId::Number(start_num)) - .ok_or_else(|| snapshot::Error::InvalidStartingBlock(at))? + match self.block_hash(BlockId::Number(start_num)) { + Some(h) => (start_num, h), + None => return Err(snapshot::Error::InvalidStartingBlock(at).into()), + } } - _ => self - .block_hash(at) - .ok_or_else(|| snapshot::Error::InvalidStartingBlock(at))?, + _ => match self.block_hash(at) { + Some(hash) => (block_number, hash), + None => return Err(snapshot::Error::InvalidStartingBlock(at).into()), + }, }; let processing_threads = self.config.snapshot.processing_threads; @@ -1380,15 +1398,23 @@ impl Client { .engine .snapshot_components() .ok_or(snapshot::Error::SnapshotsUnsupported)?; - snapshot::take_snapshot( - chunker, - &self.chain.read(), - start_hash, - db.as_hash_db(), - writer, - p, - processing_threads, - )?; + self.snapshotting_at + .store(snapshot_block_number, AtomicOrdering::SeqCst); + { + scopeguard::defer! {{ + info!(target: "snapshot", "Re-enabling pruning."); + self.snapshotting_at.store(0, AtomicOrdering::SeqCst) + }}; + snapshot::take_snapshot( + chunker, + &self.chain.read(), + start_hash, + db.as_hash_db(), + writer, + p, + processing_threads, + )?; + } Ok(()) } diff --git a/crates/ethcore/src/ethereum/ethash.rs b/crates/ethcore/src/ethereum/ethash.rs index be7520f2d..9210d1a4d 100644 --- a/crates/ethcore/src/ethereum/ethash.rs +++ b/crates/ethcore/src/ethereum/ethash.rs @@ -38,9 +38,9 @@ use machine::EthereumMachine; /// Number of blocks in an ethash snapshot. // make dependent on difficulty incrment divisor? -const SNAPSHOT_BLOCKS: u64 = 5000; +const SNAPSHOT_BLOCKS: u64 = 1000; /// Maximum number of blocks allowed in an ethash snapshot. -const MAX_SNAPSHOT_BLOCKS: u64 = 30000; +const MAX_SNAPSHOT_BLOCKS: u64 = 10000; /// Ethash specific seal #[derive(Debug, PartialEq)] diff --git a/crates/ethcore/src/snapshot/service.rs b/crates/ethcore/src/snapshot/service.rs index 17799b2b1..c212c3d7f 100644 --- a/crates/ethcore/src/snapshot/service.rs +++ b/crates/ethcore/src/snapshot/service.rs @@ -530,45 +530,49 @@ impl Service { .store(num as usize, Ordering::SeqCst); info!("Taking snapshot at #{}", num); - self.progress.reset(); + { + scopeguard::defer! {{ + self.taking_snapshot.store(false, Ordering::SeqCst); + }} + self.progress.reset(); - let temp_dir = self.temp_snapshot_dir(); - let snapshot_dir = self.snapshot_dir(); + let temp_dir = self.temp_snapshot_dir(); + let snapshot_dir = self.snapshot_dir(); - let _ = fs::remove_dir_all(&temp_dir); + let _ = fs::remove_dir_all(&temp_dir); - let writer = LooseWriter::new(temp_dir.clone())?; + let writer = LooseWriter::new(temp_dir.clone())?; - let guard = Guard::new(temp_dir.clone()); - let res = client.take_snapshot(writer, BlockId::Number(num), &self.progress); - self.taking_snapshot.store(false, Ordering::SeqCst); - if let Err(e) = res { - if client.chain_info().best_block_number >= num + client.pruning_history() { - // The state we were snapshotting was pruned before we could finish. - info!("Periodic snapshot failed: block state pruned. Run with a longer `--pruning-history` or with `--no-periodic-snapshot`"); - return Err(e); - } else { - return Err(e); + let guard = Guard::new(temp_dir.clone()); + let res = client.take_snapshot(writer, BlockId::Number(num), &self.progress); + if let Err(e) = res { + if client.chain_info().best_block_number >= num + client.pruning_history() { + // The state we were snapshotting was pruned before we could finish. + info!("Periodic snapshot failed: block state pruned. Run with a longer `--pruning-history` or with `--no-periodic-snapshot`"); + return Err(e); + } else { + return Err(e); + } } + + info!("Finished taking snapshot at #{}", num); + + let mut reader = self.reader.write(); + + // destroy the old snapshot reader. + *reader = None; + + if snapshot_dir.exists() { + fs::remove_dir_all(&snapshot_dir)?; + } + + fs::rename(temp_dir, &snapshot_dir)?; + + *reader = Some(LooseReader::new(snapshot_dir)?); + + guard.disarm(); + Ok(()) } - - info!("Finished taking snapshot at #{}", num); - - let mut reader = self.reader.write(); - - // destroy the old snapshot reader. - *reader = None; - - if snapshot_dir.exists() { - fs::remove_dir_all(&snapshot_dir)?; - } - - fs::rename(temp_dir, &snapshot_dir)?; - - *reader = Some(LooseReader::new(snapshot_dir)?); - - guard.disarm(); - Ok(()) } /// Initialize the restoration synchronously.