Multithreaded snapshot creation (#9239)
* Add Progress to Snapshot Secondary chunks creation * Use half of CPUs to multithread snapshot creation * Use env var to define number of threads * info to debug logs * Add Snapshot threads as CLI option * Randomize chunks per thread * Remove randomness, add debugging * Add warning * Add tracing * Use parity-common fix seek branch * Fix log * Fix tests * Fix tests * PR Grumbles * PR Grumble II * Update Cargo.lock * PR Grumbles * Default snapshot threads to half number of CPUs * Fix default snapshot threads // min 1
This commit is contained in:
committed by
Afri Schoedon
parent
ef4a61c769
commit
4ddd69cc55
@@ -1150,7 +1150,8 @@ impl Client {
|
||||
},
|
||||
};
|
||||
|
||||
snapshot::take_snapshot(&*self.engine, &self.chain.read(), start_hash, db.as_hashdb(), writer, p)?;
|
||||
let processing_threads = self.config.snapshot.processing_threads;
|
||||
snapshot::take_snapshot(&*self.engine, &self.chain.read(), start_hash, db.as_hashdb(), writer, p, processing_threads)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::fmt::{Display, Formatter, Error as FmtError};
|
||||
|
||||
use verification::{VerifierType, QueueConfig};
|
||||
use journaldb;
|
||||
use snapshot::SnapshotConfiguration;
|
||||
|
||||
pub use std::time::Duration;
|
||||
pub use blockchain::Config as BlockChainConfig;
|
||||
@@ -120,6 +121,8 @@ pub struct ClientConfig {
|
||||
pub check_seal: bool,
|
||||
/// Maximal number of transactions queued for verification in a separate thread.
|
||||
pub transaction_verification_queue_size: usize,
|
||||
/// Snapshot configuration
|
||||
pub snapshot: SnapshotConfiguration,
|
||||
}
|
||||
|
||||
impl Default for ClientConfig {
|
||||
@@ -144,6 +147,7 @@ impl Default for ClientConfig {
|
||||
history_mem: 32 * mb,
|
||||
check_seal: true,
|
||||
transaction_verification_queue_size: 8192,
|
||||
snapshot: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ use machine::EthereumMachine;
|
||||
use ids::BlockId;
|
||||
use header::Header;
|
||||
use receipt::Receipt;
|
||||
use snapshot::{Error, ManifestData};
|
||||
use snapshot::{Error, ManifestData, Progress};
|
||||
|
||||
use itertools::{Position, Itertools};
|
||||
use rlp::{RlpStream, Rlp};
|
||||
@@ -59,6 +59,7 @@ impl SnapshotComponents for PoaSnapshot {
|
||||
chain: &BlockChain,
|
||||
block_at: H256,
|
||||
sink: &mut ChunkSink,
|
||||
_progress: &Progress,
|
||||
preferred_size: usize,
|
||||
) -> Result<(), Error> {
|
||||
let number = chain.block_number(&block_at)
|
||||
|
||||
@@ -22,7 +22,7 @@ use std::sync::Arc;
|
||||
|
||||
use blockchain::{BlockChain, BlockChainDB};
|
||||
use engines::EthEngine;
|
||||
use snapshot::{Error, ManifestData};
|
||||
use snapshot::{Error, ManifestData, Progress};
|
||||
|
||||
use ethereum_types::H256;
|
||||
|
||||
@@ -49,6 +49,7 @@ pub trait SnapshotComponents: Send {
|
||||
chain: &BlockChain,
|
||||
block_at: H256,
|
||||
chunk_sink: &mut ChunkSink,
|
||||
progress: &Progress,
|
||||
preferred_size: usize,
|
||||
) -> Result<(), Error>;
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ use std::sync::Arc;
|
||||
|
||||
use blockchain::{BlockChain, BlockChainDB, BlockProvider};
|
||||
use engines::EthEngine;
|
||||
use snapshot::{Error, ManifestData};
|
||||
use snapshot::{Error, ManifestData, Progress};
|
||||
use snapshot::block::AbridgedBlock;
|
||||
use ethereum_types::H256;
|
||||
use kvdb::KeyValueDB;
|
||||
@@ -65,6 +65,7 @@ impl SnapshotComponents for PowSnapshot {
|
||||
chain: &BlockChain,
|
||||
block_at: H256,
|
||||
chunk_sink: &mut ChunkSink,
|
||||
progress: &Progress,
|
||||
preferred_size: usize,
|
||||
) -> Result<(), Error> {
|
||||
PowWorker {
|
||||
@@ -72,6 +73,7 @@ impl SnapshotComponents for PowSnapshot {
|
||||
rlps: VecDeque::new(),
|
||||
current_hash: block_at,
|
||||
writer: chunk_sink,
|
||||
progress: progress,
|
||||
preferred_size: preferred_size,
|
||||
}.chunk_all(self.blocks)
|
||||
}
|
||||
@@ -96,6 +98,7 @@ struct PowWorker<'a> {
|
||||
rlps: VecDeque<Bytes>,
|
||||
current_hash: H256,
|
||||
writer: &'a mut ChunkSink<'a>,
|
||||
progress: &'a Progress,
|
||||
preferred_size: usize,
|
||||
}
|
||||
|
||||
@@ -138,6 +141,7 @@ impl<'a> PowWorker<'a> {
|
||||
|
||||
last = self.current_hash;
|
||||
self.current_hash = block.header_view().parent_hash();
|
||||
self.progress.blocks.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
if loaded_size != 0 {
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
//! https://wiki.parity.io/Warp-Sync-Snapshot-Format
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::cmp;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY};
|
||||
@@ -43,6 +44,7 @@ use trie::{Trie, TrieMut};
|
||||
use ethtrie::{TrieDB, TrieDBMut};
|
||||
use rlp::{RlpStream, Rlp};
|
||||
use bloom_journal::Bloom;
|
||||
use num_cpus;
|
||||
|
||||
use self::io::SnapshotWriter;
|
||||
|
||||
@@ -88,6 +90,28 @@ const MAX_CHUNK_SIZE: usize = PREFERRED_CHUNK_SIZE / 4 * 5;
|
||||
const MIN_SUPPORTED_STATE_CHUNK_VERSION: u64 = 1;
|
||||
// current state chunk version.
|
||||
const STATE_CHUNK_VERSION: u64 = 2;
|
||||
/// number of snapshot subparts, must be a power of 2 in [1; 256]
|
||||
const SNAPSHOT_SUBPARTS: usize = 16;
|
||||
/// Maximum number of snapshot subparts (must be a multiple of `SNAPSHOT_SUBPARTS`)
|
||||
const MAX_SNAPSHOT_SUBPARTS: usize = 256;
|
||||
|
||||
/// Configuration for the Snapshot service
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct SnapshotConfiguration {
|
||||
/// If `true`, no periodic snapshots will be created
|
||||
pub no_periodic: bool,
|
||||
/// Number of threads for creating snapshots
|
||||
pub processing_threads: usize,
|
||||
}
|
||||
|
||||
impl Default for SnapshotConfiguration {
|
||||
fn default() -> Self {
|
||||
SnapshotConfiguration {
|
||||
no_periodic: false,
|
||||
processing_threads: ::std::cmp::max(1, num_cpus::get() / 2),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A progress indicator for snapshots.
|
||||
#[derive(Debug, Default)]
|
||||
@@ -130,7 +154,8 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
|
||||
block_at: H256,
|
||||
state_db: &HashDB<KeccakHasher>,
|
||||
writer: W,
|
||||
p: &Progress
|
||||
p: &Progress,
|
||||
processing_threads: usize,
|
||||
) -> Result<(), Error> {
|
||||
let start_header = chain.block_header_data(&block_at)
|
||||
.ok_or(Error::InvalidStartingBlock(BlockId::Hash(block_at)))?;
|
||||
@@ -142,17 +167,45 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
|
||||
let writer = Mutex::new(writer);
|
||||
let chunker = engine.snapshot_components().ok_or(Error::SnapshotsUnsupported)?;
|
||||
let snapshot_version = chunker.current_version();
|
||||
let (state_hashes, block_hashes) = scope(|scope| {
|
||||
let (state_hashes, block_hashes) = scope(|scope| -> Result<(Vec<H256>, Vec<H256>), Error> {
|
||||
let writer = &writer;
|
||||
let block_guard = scope.spawn(move || chunk_secondary(chunker, chain, block_at, writer, p));
|
||||
let state_res = chunk_state(state_db, &state_root, writer, p);
|
||||
|
||||
state_res.and_then(|state_hashes| {
|
||||
block_guard.join().map(|block_hashes| (state_hashes, block_hashes))
|
||||
})
|
||||
// The number of threads must be between 1 and SNAPSHOT_SUBPARTS
|
||||
assert!(processing_threads >= 1, "Cannot use less than 1 threads for creating snapshots");
|
||||
let num_threads: usize = cmp::min(processing_threads, SNAPSHOT_SUBPARTS);
|
||||
info!(target: "snapshot", "Using {} threads for Snapshot creation.", num_threads);
|
||||
|
||||
let mut state_guards = Vec::with_capacity(num_threads as usize);
|
||||
|
||||
for thread_idx in 0..num_threads {
|
||||
let state_guard = scope.spawn(move || -> Result<Vec<H256>, Error> {
|
||||
let mut chunk_hashes = Vec::new();
|
||||
|
||||
for part in (thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads) {
|
||||
debug!(target: "snapshot", "Chunking part {} in thread {}", part, thread_idx);
|
||||
let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part))?;
|
||||
chunk_hashes.append(&mut hashes);
|
||||
}
|
||||
|
||||
Ok(chunk_hashes)
|
||||
});
|
||||
state_guards.push(state_guard);
|
||||
}
|
||||
|
||||
let block_hashes = block_guard.join()?;
|
||||
let mut state_hashes = Vec::new();
|
||||
|
||||
for guard in state_guards {
|
||||
let part_state_hashes = guard.join()?;
|
||||
state_hashes.extend(part_state_hashes);
|
||||
}
|
||||
|
||||
debug!(target: "snapshot", "Took a snapshot of {} accounts", p.accounts.load(Ordering::SeqCst));
|
||||
Ok((state_hashes, block_hashes))
|
||||
})?;
|
||||
|
||||
info!("produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len());
|
||||
info!(target: "snapshot", "produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len());
|
||||
|
||||
let manifest_data = ManifestData {
|
||||
version: snapshot_version,
|
||||
@@ -200,6 +253,7 @@ pub fn chunk_secondary<'a>(mut chunker: Box<SnapshotComponents>, chain: &'a Bloc
|
||||
chain,
|
||||
start_hash,
|
||||
&mut chunk_sink,
|
||||
progress,
|
||||
PREFERRED_CHUNK_SIZE,
|
||||
)?;
|
||||
}
|
||||
@@ -263,10 +317,12 @@ impl<'a> StateChunker<'a> {
|
||||
|
||||
/// Walk the given state database starting from the given root,
|
||||
/// creating chunks and writing them out.
|
||||
/// `part` is a number between 0 and 15, which describe which part of
|
||||
/// the tree should be chunked.
|
||||
///
|
||||
/// Returns a list of hashes of chunks created, or any error it may
|
||||
/// have encountered.
|
||||
pub fn chunk_state<'a>(db: &HashDB<KeccakHasher>, root: &H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> {
|
||||
pub fn chunk_state<'a>(db: &HashDB<KeccakHasher>, root: &H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress, part: Option<usize>) -> Result<Vec<H256>, Error> {
|
||||
let account_trie = TrieDB::new(db, &root)?;
|
||||
|
||||
let mut chunker = StateChunker {
|
||||
@@ -281,11 +337,33 @@ pub fn chunk_state<'a>(db: &HashDB<KeccakHasher>, root: &H256, writer: &Mutex<Sn
|
||||
let mut used_code = HashSet::new();
|
||||
|
||||
// account_key here is the address' hash.
|
||||
for item in account_trie.iter()? {
|
||||
let mut account_iter = account_trie.iter()?;
|
||||
|
||||
let mut seek_to = None;
|
||||
|
||||
if let Some(part) = part {
|
||||
assert!(part < 16, "Wrong chunk state part number (must be <16) in snapshot creation.");
|
||||
|
||||
let part_offset = MAX_SNAPSHOT_SUBPARTS / SNAPSHOT_SUBPARTS;
|
||||
let mut seek_from = vec![0; 32];
|
||||
seek_from[0] = (part * part_offset) as u8;
|
||||
account_iter.seek(&seek_from)?;
|
||||
|
||||
// Set the upper-bound, except for the last part
|
||||
if part < SNAPSHOT_SUBPARTS - 1 {
|
||||
seek_to = Some(((part + 1) * part_offset) as u8)
|
||||
}
|
||||
}
|
||||
|
||||
for item in account_iter {
|
||||
let (account_key, account_data) = item?;
|
||||
let account = ::rlp::decode(&*account_data)?;
|
||||
let account_key_hash = H256::from_slice(&account_key);
|
||||
|
||||
if seek_to.map_or(false, |seek_to| account_key[0] >= seek_to) {
|
||||
break;
|
||||
}
|
||||
|
||||
let account = ::rlp::decode(&*account_data)?;
|
||||
let account_db = AccountDB::from_hash(db, account_key_hash);
|
||||
|
||||
let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE)?;
|
||||
|
||||
@@ -22,7 +22,7 @@ use hash::{KECCAK_NULL_RLP, keccak};
|
||||
|
||||
use basic_account::BasicAccount;
|
||||
use snapshot::account;
|
||||
use snapshot::{chunk_state, Error as SnapshotError, Progress, StateRebuilder};
|
||||
use snapshot::{chunk_state, Error as SnapshotError, Progress, StateRebuilder, SNAPSHOT_SUBPARTS};
|
||||
use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter};
|
||||
use super::helpers::{compare_dbs, StateProducer};
|
||||
|
||||
@@ -53,7 +53,11 @@ fn snap_and_restore() {
|
||||
let state_root = producer.state_root();
|
||||
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
|
||||
|
||||
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default()).unwrap();
|
||||
let mut state_hashes = Vec::new();
|
||||
for part in 0..SNAPSHOT_SUBPARTS {
|
||||
let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part)).unwrap();
|
||||
state_hashes.append(&mut hashes);
|
||||
}
|
||||
|
||||
writer.into_inner().finish(::snapshot::ManifestData {
|
||||
version: 2,
|
||||
@@ -164,7 +168,7 @@ fn checks_flag() {
|
||||
let state_root = producer.state_root();
|
||||
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
|
||||
|
||||
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default()).unwrap();
|
||||
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None).unwrap();
|
||||
|
||||
writer.into_inner().finish(::snapshot::ManifestData {
|
||||
version: 2,
|
||||
|
||||
Reference in New Issue
Block a user