Freeze pruning while creating snapshot (#205)

* Freeze pruning while creating snapshot
* Use scopeguard for snapshot generation
* Snapshot 1k blocks
* Snapshot number correction
This commit is contained in:
rakita 2021-03-09 09:47:49 +01:00 committed by GitHub
parent 6f50061f0c
commit 7ea5707904
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 99 additions and 67 deletions

11
Cargo.lock generated
View File

@ -572,7 +572,7 @@ dependencies = [
"crossbeam-utils 0.6.6", "crossbeam-utils 0.6.6",
"lazy_static", "lazy_static",
"memoffset", "memoffset",
"scopeguard 1.0.0", "scopeguard 1.1.0",
] ]
[[package]] [[package]]
@ -1014,6 +1014,7 @@ dependencies = [
"rlp_compress", "rlp_compress",
"rlp_derive", "rlp_derive",
"rustc-hex 1.0.0", "rustc-hex 1.0.0",
"scopeguard 1.1.0",
"serde", "serde",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
@ -1871,7 +1872,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3753954f7bd71f0e671afb8b5a992d1724cf43b7f95a563cd4a0bde94659ca8" checksum = "a3753954f7bd71f0e671afb8b5a992d1724cf43b7f95a563cd4a0bde94659ca8"
dependencies = [ dependencies = [
"scopeguard 1.0.0", "scopeguard 1.1.0",
"winapi 0.3.8", "winapi 0.3.8",
] ]
@ -2436,7 +2437,7 @@ version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75" checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75"
dependencies = [ dependencies = [
"scopeguard 1.0.0", "scopeguard 1.1.0",
] ]
[[package]] [[package]]
@ -4126,9 +4127,9 @@ checksum = "94258f53601af11e6a49f722422f6e3425c52b06245a5cf9bc09908b174f5e27"
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.0.0" version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b42e15e59b18a828bbf5c58ea01debb36b9b096346de35d941dcb89009f24a0d" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]] [[package]]
name = "scrypt" name = "scrypt"

View File

@ -60,11 +60,11 @@ use signer;
use sync::{self, SyncConfig}; use sync::{self, SyncConfig};
use user_defaults::UserDefaults; use user_defaults::UserDefaults;
// how often to take periodic snapshots. // How often we attempt to take a snapshot: only snapshot on blocknumbers that are multiples of this.
const SNAPSHOT_PERIOD: u64 = 5000; const SNAPSHOT_PERIOD: u64 = 20000;
// how many blocks to wait before starting a periodic snapshot. // Start snapshoting from `tip`-`history, with this we want to bypass reorgs. Should be smaller than prunning history.
const SNAPSHOT_HISTORY: u64 = 100; const SNAPSHOT_HISTORY: u64 = 50;
// Full client number of DNS threads // Full client number of DNS threads
const FETCH_FULL_NUM_DNS_THREADS: usize = 4; const FETCH_FULL_NUM_DNS_THREADS: usize = 4;

View File

@ -74,6 +74,7 @@ using_queue = { path = "../concensus/miner/using-queue" }
vm = { path = "../vm/vm" } vm = { path = "../vm/vm" }
walkdir = "2.3" walkdir = "2.3"
wasm = { path = "../vm/wasm" } wasm = { path = "../vm/wasm" }
scopeguard = "1.1.0"
[dev-dependencies] [dev-dependencies]
blooms-db = { path = "../db/blooms-db" } blooms-db = { path = "../db/blooms-db" }

View File

@ -21,7 +21,7 @@ use std::{
io::{BufRead, BufReader}, io::{BufRead, BufReader},
str::{from_utf8, FromStr}, str::{from_utf8, FromStr},
sync::{ sync::{
atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering}, atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering as AtomicOrdering},
Arc, Weak, Arc, Weak,
}, },
time::{Duration, Instant}, time::{Duration, Instant},
@ -209,6 +209,9 @@ pub struct Client {
/// Database pruning strategy to use for StateDB /// Database pruning strategy to use for StateDB
pruning: journaldb::Algorithm, pruning: journaldb::Algorithm,
/// Don't prune the state we're currently snapshotting
snapshotting_at: AtomicU64,
/// Client uses this to store blocks, traces, etc. /// Client uses this to store blocks, traces, etc.
db: RwLock<Arc<dyn BlockChainDB>>, db: RwLock<Arc<dyn BlockChainDB>>,
@ -946,6 +949,7 @@ impl Client {
tracedb, tracedb,
engine, engine,
pruning: config.pruning.clone(), pruning: config.pruning.clone(),
snapshotting_at: AtomicU64::new(0),
db: RwLock::new(db.clone()), db: RwLock::new(db.clone()),
state_db: RwLock::new(state_db), state_db: RwLock::new(state_db),
report: RwLock::new(Default::default()), report: RwLock::new(Default::default()),
@ -1137,7 +1141,7 @@ impl Client {
mut state_db: StateDB, mut state_db: StateDB,
chain: &BlockChain, chain: &BlockChain,
) -> Result<(), ::error::Error> { ) -> Result<(), ::error::Error> {
let number = match state_db.journal_db().latest_era() { let latest_era = match state_db.journal_db().latest_era() {
Some(n) => n, Some(n) => n,
None => return Ok(()), None => return Ok(()),
}; };
@ -1152,16 +1156,27 @@ impl Client {
break; break;
} }
match state_db.journal_db().earliest_era() { match state_db.journal_db().earliest_era() {
Some(era) if era + self.history <= number => { Some(earliest_era) if earliest_era + self.history <= latest_era => {
trace!(target: "client", "Pruning state for ancient era {}", era); let freeze_at = self.snapshotting_at.load(AtomicOrdering::SeqCst);
match chain.block_hash(era) { if freeze_at > 0 && freeze_at == earliest_era {
// Note: journal_db().mem_used() can be used for a more accurate memory
// consumption measurement but it can be expensive so sticking with the
// faster `journal_size()` instead.
trace!(target: "pruning", "Pruning is paused at era {} (snapshot under way); earliest era={}, latest era={}, journal_size={} Not pruning.",
freeze_at, earliest_era, latest_era, state_db.journal_db().journal_size());
break;
}
trace!(target: "client", "Pruning state for ancient era {}", earliest_era);
match chain.block_hash(earliest_era) {
Some(ancient_hash) => { Some(ancient_hash) => {
let mut batch = DBTransaction::new(); let mut batch = DBTransaction::new();
state_db.mark_canonical(&mut batch, era, &ancient_hash)?; state_db.mark_canonical(&mut batch, earliest_era, &ancient_hash)?;
self.db.read().key_value().write_buffered(batch); self.db.read().key_value().write_buffered(batch);
state_db.journal_db().flush(); state_db.journal_db().flush();
} }
None => debug!(target: "client", "Missing expected hash for block {}", era), None => {
debug!(target: "client", "Missing expected hash for block {}", earliest_era)
}
} }
} }
_ => break, // means that every era is kept, no pruning necessary. _ => break, // means that every era is kept, no pruning necessary.
@ -1349,7 +1364,6 @@ impl Client {
p: &snapshot::Progress, p: &snapshot::Progress,
) -> Result<(), EthcoreError> { ) -> Result<(), EthcoreError> {
let db = self.state_db.read().journal_db().boxed_clone(); let db = self.state_db.read().journal_db().boxed_clone();
let best_block_number = self.chain_info().best_block_number;
let block_number = self let block_number = self
.block_number(at) .block_number(at)
.ok_or_else(|| snapshot::Error::InvalidStartingBlock(at))?; .ok_or_else(|| snapshot::Error::InvalidStartingBlock(at))?;
@ -1360,19 +1374,23 @@ impl Client {
let history = ::std::cmp::min(self.history, 1000); let history = ::std::cmp::min(self.history, 1000);
let start_hash = match at { let (snapshot_block_number, start_hash) = match at {
BlockId::Latest => { BlockId::Latest => {
let best_block_number = self.chain_info().best_block_number;
let start_num = match db.earliest_era() { let start_num = match db.earliest_era() {
Some(era) => ::std::cmp::max(era, best_block_number.saturating_sub(history)), Some(era) => ::std::cmp::max(era, best_block_number.saturating_sub(history)),
None => best_block_number.saturating_sub(history), None => best_block_number.saturating_sub(history),
}; };
self.block_hash(BlockId::Number(start_num)) match self.block_hash(BlockId::Number(start_num)) {
.ok_or_else(|| snapshot::Error::InvalidStartingBlock(at))? Some(h) => (start_num, h),
None => return Err(snapshot::Error::InvalidStartingBlock(at).into()),
}
} }
_ => self _ => match self.block_hash(at) {
.block_hash(at) Some(hash) => (block_number, hash),
.ok_or_else(|| snapshot::Error::InvalidStartingBlock(at))?, None => return Err(snapshot::Error::InvalidStartingBlock(at).into()),
},
}; };
let processing_threads = self.config.snapshot.processing_threads; let processing_threads = self.config.snapshot.processing_threads;
@ -1380,15 +1398,23 @@ impl Client {
.engine .engine
.snapshot_components() .snapshot_components()
.ok_or(snapshot::Error::SnapshotsUnsupported)?; .ok_or(snapshot::Error::SnapshotsUnsupported)?;
snapshot::take_snapshot( self.snapshotting_at
chunker, .store(snapshot_block_number, AtomicOrdering::SeqCst);
&self.chain.read(), {
start_hash, scopeguard::defer! {{
db.as_hash_db(), info!(target: "snapshot", "Re-enabling pruning.");
writer, self.snapshotting_at.store(0, AtomicOrdering::SeqCst)
p, }};
processing_threads, snapshot::take_snapshot(
)?; chunker,
&self.chain.read(),
start_hash,
db.as_hash_db(),
writer,
p,
processing_threads,
)?;
}
Ok(()) Ok(())
} }

View File

@ -38,9 +38,9 @@ use machine::EthereumMachine;
/// Number of blocks in an ethash snapshot. /// Number of blocks in an ethash snapshot.
// make dependent on difficulty incrment divisor? // make dependent on difficulty incrment divisor?
const SNAPSHOT_BLOCKS: u64 = 5000; const SNAPSHOT_BLOCKS: u64 = 1000;
/// Maximum number of blocks allowed in an ethash snapshot. /// Maximum number of blocks allowed in an ethash snapshot.
const MAX_SNAPSHOT_BLOCKS: u64 = 30000; const MAX_SNAPSHOT_BLOCKS: u64 = 10000;
/// Ethash specific seal /// Ethash specific seal
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]

View File

@ -530,45 +530,49 @@ impl Service {
.store(num as usize, Ordering::SeqCst); .store(num as usize, Ordering::SeqCst);
info!("Taking snapshot at #{}", num); info!("Taking snapshot at #{}", num);
self.progress.reset(); {
scopeguard::defer! {{
self.taking_snapshot.store(false, Ordering::SeqCst);
}}
self.progress.reset();
let temp_dir = self.temp_snapshot_dir(); let temp_dir = self.temp_snapshot_dir();
let snapshot_dir = self.snapshot_dir(); let snapshot_dir = self.snapshot_dir();
let _ = fs::remove_dir_all(&temp_dir); let _ = fs::remove_dir_all(&temp_dir);
let writer = LooseWriter::new(temp_dir.clone())?; let writer = LooseWriter::new(temp_dir.clone())?;
let guard = Guard::new(temp_dir.clone()); let guard = Guard::new(temp_dir.clone());
let res = client.take_snapshot(writer, BlockId::Number(num), &self.progress); let res = client.take_snapshot(writer, BlockId::Number(num), &self.progress);
self.taking_snapshot.store(false, Ordering::SeqCst); if let Err(e) = res {
if let Err(e) = res { if client.chain_info().best_block_number >= num + client.pruning_history() {
if client.chain_info().best_block_number >= num + client.pruning_history() { // The state we were snapshotting was pruned before we could finish.
// The state we were snapshotting was pruned before we could finish. info!("Periodic snapshot failed: block state pruned. Run with a longer `--pruning-history` or with `--no-periodic-snapshot`");
info!("Periodic snapshot failed: block state pruned. Run with a longer `--pruning-history` or with `--no-periodic-snapshot`"); return Err(e);
return Err(e); } else {
} else { return Err(e);
return Err(e); }
} }
info!("Finished taking snapshot at #{}", num);
let mut reader = self.reader.write();
// destroy the old snapshot reader.
*reader = None;
if snapshot_dir.exists() {
fs::remove_dir_all(&snapshot_dir)?;
}
fs::rename(temp_dir, &snapshot_dir)?;
*reader = Some(LooseReader::new(snapshot_dir)?);
guard.disarm();
Ok(())
} }
info!("Finished taking snapshot at #{}", num);
let mut reader = self.reader.write();
// destroy the old snapshot reader.
*reader = None;
if snapshot_dir.exists() {
fs::remove_dir_all(&snapshot_dir)?;
}
fs::rename(temp_dir, &snapshot_dir)?;
*reader = Some(LooseReader::new(snapshot_dir)?);
guard.disarm();
Ok(())
} }
/// Initialize the restoration synchronously. /// Initialize the restoration synchronously.