Use a lock instead of atomics for snapshot Progress (#11197)

* WIP. Typos and logging.

* Format todos

* Pause pruning while a snapshot is under way
Logs, docs and todos

* Allocate memory for the full chunk

* Name snapshotting threads

* Ensure `taking_snapshot` is set to false whenever and however `take_snapshot`returns
Rename `take_at` to `request_snapshot_at`
Cleanup

* Let "in_progress" deletion fail
Fix tests

* Just use an atomic

* Review grumbles

* Finish the sentence

* Resolve a few todos and clarify comments.

* Calculate progress rate since last update

* Lockfile

* Fix tests

* typo

* Reinstate default snapshotting frequency
Cut down on the logging noise

* Use a lock instead of atomics for snapshot Progress

* Update ethcore/types/src/snapshot.rs

Co-Authored-By: Andronik Ordian <write@reusable.software>

* Avoid truncating cast
Cleanup
This commit is contained in:
David 2019-10-28 18:24:45 +01:00 committed by GitHub
parent 293e06e0f4
commit 0d3423cbe0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 105 additions and 111 deletions

1
Cargo.lock generated
View File

@ -606,7 +606,6 @@ dependencies = [
"parity-crypto 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "parity-crypto 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-snappy 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-snappy 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-util-mem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-util-mem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"patricia-trie-ethereum 0.1.0", "patricia-trie-ethereum 0.1.0",
"rlp 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "rlp 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rlp_derive 0.1.0", "rlp_derive 0.1.0",

View File

@ -28,6 +28,7 @@ use ethereum_types::{H256, Address};
use hash_db::{HashDB, EMPTY_PREFIX}; use hash_db::{HashDB, EMPTY_PREFIX};
use keccak_hash::{KECCAK_EMPTY, KECCAK_NULL_RLP, keccak}; use keccak_hash::{KECCAK_EMPTY, KECCAK_NULL_RLP, keccak};
use kvdb::DBValue; use kvdb::DBValue;
use parking_lot::RwLock;
use rlp::Rlp; use rlp::Rlp;
use snapshot::test_helpers::{ACC_EMPTY, to_fat_rlps, from_fat_rlp}; use snapshot::test_helpers::{ACC_EMPTY, to_fat_rlps, from_fat_rlp};
@ -48,7 +49,7 @@ fn encoding_basic() {
let thin_rlp = ::rlp::encode(&account); let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account); assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let p = Progress::new(); let p = RwLock::new(Progress::new());
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap(); let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap(); let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account); assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account);
@ -69,7 +70,7 @@ fn encoding_version() {
let thin_rlp = ::rlp::encode(&account); let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account); assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let p = Progress::new(); let p = RwLock::new(Progress::new());
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap(); let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap(); let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account); assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account);
@ -96,7 +97,7 @@ fn encoding_storage() {
let thin_rlp = ::rlp::encode(&account); let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account); assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let p = Progress::new(); let p = RwLock::new(Progress::new());
let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap(); let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlp[0]).at(1).unwrap(); let fat_rlp = Rlp::new(&fat_rlp[0]).at(1).unwrap();
@ -124,7 +125,7 @@ fn encoding_storage_split() {
let thin_rlp = ::rlp::encode(&account); let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account); assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let p = Progress::new(); let p = RwLock::new(Progress::new());
let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), 500, 1000, &p).unwrap(); let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), 500, 1000, &p).unwrap();
let mut root = KECCAK_NULL_RLP; let mut root = KECCAK_NULL_RLP;
let mut restored_account = None; let mut restored_account = None;
@ -170,8 +171,8 @@ fn encoding_code() {
}; };
let mut used_code = HashSet::new(); let mut used_code = HashSet::new();
let p1 = Progress::new(); let p1 = RwLock::new(Progress::new());
let p2 = Progress::new(); let p2 = RwLock::new(Progress::new());
let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::from_hash(db.as_hash_db(), keccak(addr1)), &mut used_code, usize::max_value(), usize::max_value(), &p1).unwrap(); let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::from_hash(db.as_hash_db(), keccak(addr1)), &mut used_code, usize::max_value(), usize::max_value(), &p1).unwrap();
let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::from_hash(db.as_hash_db(), keccak(addr2)), &mut used_code, usize::max_value(), usize::max_value(), &p2).unwrap(); let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::from_hash(db.as_hash_db(), keccak(addr2)), &mut used_code, usize::max_value(), usize::max_value(), &p2).unwrap();
assert_eq!(used_code.len(), 1); assert_eq!(used_code.len(), 1);

View File

@ -40,6 +40,7 @@ use keccak_hash::{KECCAK_NULL_RLP};
use keccak_hasher::KeccakHasher; use keccak_hasher::KeccakHasher;
use kvdb::DBValue; use kvdb::DBValue;
use log::trace; use log::trace;
use parking_lot::RwLock;
use rand::Rng; use rand::Rng;
use rlp; use rlp;
use snapshot::{ use snapshot::{
@ -146,7 +147,7 @@ pub fn snap(client: &Client) -> (Box<dyn SnapshotReader>, TempDir) {
let tempdir = TempDir::new("").unwrap(); let tempdir = TempDir::new("").unwrap();
let path = tempdir.path().join("file"); let path = tempdir.path().join("file");
let writer = PackedWriter::new(&path).unwrap(); let writer = PackedWriter::new(&path).unwrap();
let progress = Progress::new(); let progress = RwLock::new(Progress::new());
let hash = client.chain_info().best_block_hash; let hash = client.chain_info().best_block_hash;
client.take_snapshot(writer, BlockId::Hash(hash), &progress).unwrap(); client.take_snapshot(writer, BlockId::Hash(hash), &progress).unwrap();

View File

@ -31,7 +31,7 @@ use snapshot::{
io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter}, io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter},
PowSnapshot, PowSnapshot,
}; };
use parking_lot::Mutex; use parking_lot::{Mutex, RwLock};
use snappy; use snappy;
use keccak_hash::KECCAK_NULL_RLP; use keccak_hash::KECCAK_NULL_RLP;
use kvdb::DBTransaction; use kvdb::DBTransaction;
@ -74,7 +74,7 @@ fn chunk_and_restore(amount: u64) {
&bc, &bc,
best_hash, best_hash,
&writer, &writer,
&Progress::new() &RwLock::new(Progress::new())
).unwrap(); ).unwrap();
let manifest = ManifestData { let manifest = ManifestData {

View File

@ -42,7 +42,7 @@ use ethcore::{
test_helpers::{new_db, new_temp_db, generate_dummy_client_with_spec_and_data, restoration_db_handler} test_helpers::{new_db, new_temp_db, generate_dummy_client_with_spec_and_data, restoration_db_handler}
}; };
use parking_lot::Mutex; use parking_lot::{Mutex, RwLock};
use ethcore_io::{IoChannel, IoService}; use ethcore_io::{IoChannel, IoService};
use kvdb_rocksdb::DatabaseConfig; use kvdb_rocksdb::DatabaseConfig;
use journaldb::Algorithm; use journaldb::Algorithm;
@ -278,7 +278,7 @@ fn keep_ancient_blocks() {
&bc, &bc,
best_hash, best_hash,
&writer, &writer,
&Progress::new() &RwLock::new(Progress::new())
).unwrap(); ).unwrap();
let state_db = client.state_db().journal_db().boxed_clone(); let state_db = client.state_db().journal_db().boxed_clone();
let start_header = bc.block_header_data(&best_hash).unwrap(); let start_header = bc.block_header_data(&best_hash).unwrap();
@ -287,7 +287,7 @@ fn keep_ancient_blocks() {
state_db.as_hash_db(), state_db.as_hash_db(),
&state_root, &state_root,
&writer, &writer,
&Progress::new(), &RwLock::new(Progress::new()),
None, None,
0 0
).unwrap(); ).unwrap();

View File

@ -35,7 +35,7 @@ use rand_xorshift::XorShiftRng;
use ethereum_types::H256; use ethereum_types::H256;
use journaldb::{self, Algorithm}; use journaldb::{self, Algorithm};
use kvdb_rocksdb::{Database, DatabaseConfig}; use kvdb_rocksdb::{Database, DatabaseConfig};
use parking_lot::Mutex; use parking_lot::{Mutex, RwLock};
use tempdir::TempDir; use tempdir::TempDir;
use crate::helpers::StateProducer; use crate::helpers::StateProducer;
@ -61,8 +61,9 @@ fn snap_and_restore() {
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap()); let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
let mut state_hashes = Vec::new(); let mut state_hashes = Vec::new();
let progress = RwLock::new(Progress::new());
for part in 0..SNAPSHOT_SUBPARTS { for part in 0..SNAPSHOT_SUBPARTS {
let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::new(), Some(part), 0).unwrap(); let mut hashes = chunk_state(&old_db, &state_root, &writer, &progress, Some(part), 0).unwrap();
state_hashes.append(&mut hashes); state_hashes.append(&mut hashes);
} }
@ -133,8 +134,16 @@ fn get_code_from_prev_chunk() {
let mut make_chunk = |acc, hash| { let mut make_chunk = |acc, hash| {
let mut db = journaldb::new_memory_db(); let mut db = journaldb::new_memory_db();
AccountDBMut::from_hash(&mut db, hash).insert(EMPTY_PREFIX, &code[..]); AccountDBMut::from_hash(&mut db, hash).insert(EMPTY_PREFIX, &code[..]);
let p = Progress::new(); let p = RwLock::new(Progress::new());
let fat_rlp = to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value(), &p).unwrap(); let fat_rlp = to_fat_rlps(
&hash,
&acc,
&AccountDB::from_hash(&db, hash),
&mut used_code,
usize::max_value(),
usize::max_value(),
&p
).unwrap();
let mut stream = RlpStream::new_list(1); let mut stream = RlpStream::new_list(1);
stream.append_raw(&fat_rlp[0], 1); stream.append_raw(&fat_rlp[0], 1);
stream.out() stream.out()
@ -177,8 +186,9 @@ fn checks_flag() {
let state_root = producer.state_root(); let state_root = producer.state_root();
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap()); let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
let progress = RwLock::new(Progress::new());
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::new(), None, 0).unwrap(); let state_hashes = chunk_state(&old_db, &state_root, &writer, &progress, None, 0).unwrap();
writer.into_inner().finish(ManifestData { writer.into_inner().finish(ManifestData {
version: 2, version: 2,

View File

@ -17,7 +17,6 @@
//! Account state encoding and decoding //! Account state encoding and decoding
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::atomic::Ordering;
use account_db::{AccountDB, AccountDBMut}; use account_db::{AccountDB, AccountDBMut};
use bytes::Bytes; use bytes::Bytes;
@ -31,6 +30,7 @@ use ethtrie::{TrieDB, TrieDBMut};
use hash_db::HashDB; use hash_db::HashDB;
use keccak_hash::{KECCAK_EMPTY, KECCAK_NULL_RLP}; use keccak_hash::{KECCAK_EMPTY, KECCAK_NULL_RLP};
use log::{trace, warn}; use log::{trace, warn};
use parking_lot::RwLock;
use rlp::{RlpStream, Rlp}; use rlp::{RlpStream, Rlp};
use trie_db::{Trie, TrieMut}; use trie_db::{Trie, TrieMut};
@ -79,7 +79,7 @@ pub fn to_fat_rlps(
used_code: &mut HashSet<H256>, used_code: &mut HashSet<H256>,
first_chunk_size: usize, first_chunk_size: usize,
max_chunk_size: usize, max_chunk_size: usize,
p: &Progress, p: &RwLock<Progress>,
) -> Result<Vec<Bytes>, Error> { ) -> Result<Vec<Bytes>, Error> {
let db = &(acct_db as &dyn HashDB<_,_>); let db = &(acct_db as &dyn HashDB<_,_>);
let db = TrieDB::new(db, &acc.storage_root)?; let db = TrieDB::new(db, &acc.storage_root)?;
@ -135,7 +135,7 @@ pub fn to_fat_rlps(
} }
loop { loop {
if p.abort.load(Ordering::SeqCst) { if p.read().abort {
trace!(target: "snapshot", "to_fat_rlps: aborting snapshot"); trace!(target: "snapshot", "to_fat_rlps: aborting snapshot");
return Err(Error::SnapshotAborted); return Err(Error::SnapshotAborted);
} }

View File

@ -38,6 +38,7 @@ use ethereum_types::{H256, U256};
use itertools::{Position, Itertools}; use itertools::{Position, Itertools};
use kvdb::KeyValueDB; use kvdb::KeyValueDB;
use log::trace; use log::trace;
use parking_lot::RwLock;
use rlp::{RlpStream, Rlp}; use rlp::{RlpStream, Rlp};
use crate::{SnapshotComponents, Rebuilder}; use crate::{SnapshotComponents, Rebuilder};
@ -62,7 +63,7 @@ impl SnapshotComponents for PoaSnapshot {
chain: &BlockChain, chain: &BlockChain,
block_at: H256, block_at: H256,
sink: &mut ChunkSink, sink: &mut ChunkSink,
_progress: &Progress, _progress: &RwLock<Progress>,
preferred_size: usize, preferred_size: usize,
) -> Result<(), SnapshotError> { ) -> Result<(), SnapshotError> {
let number = chain.block_number(&block_at) let number = chain.block_number(&block_at)

View File

@ -37,6 +37,7 @@ use engine::Engine;
use ethereum_types::{H256, U256}; use ethereum_types::{H256, U256};
use kvdb::KeyValueDB; use kvdb::KeyValueDB;
use log::trace; use log::trace;
use parking_lot::RwLock;
use rand::rngs::OsRng; use rand::rngs::OsRng;
use rlp::{RlpStream, Rlp}; use rlp::{RlpStream, Rlp};
use triehash::ordered_trie_root; use triehash::ordered_trie_root;
@ -72,7 +73,7 @@ impl SnapshotComponents for PowSnapshot {
chain: &BlockChain, chain: &BlockChain,
block_at: H256, block_at: H256,
chunk_sink: &mut ChunkSink, chunk_sink: &mut ChunkSink,
progress: &Progress, progress: &RwLock<Progress>,
preferred_size: usize, preferred_size: usize,
) -> Result<(), SnapshotError> { ) -> Result<(), SnapshotError> {
PowWorker { PowWorker {
@ -110,7 +111,7 @@ struct PowWorker<'a> {
rlps: VecDeque<Bytes>, rlps: VecDeque<Bytes>,
current_hash: H256, current_hash: H256,
writer: &'a mut ChunkSink<'a>, writer: &'a mut ChunkSink<'a>,
progress: &'a Progress, progress: &'a RwLock<Progress>,
preferred_size: usize, preferred_size: usize,
} }
@ -153,7 +154,7 @@ impl<'a> PowWorker<'a> {
last = self.current_hash; last = self.current_hash;
self.current_hash = block.header_view().parent_hash(); self.current_hash = block.header_view().parent_hash();
self.progress.blocks.fetch_add(1, Ordering::SeqCst); self.progress.write().blocks += 1;
} }
if loaded_size != 0 { if loaded_size != 0 {

View File

@ -44,7 +44,7 @@ use ethtrie::{TrieDB, TrieDBMut};
use hash_db::HashDB; use hash_db::HashDB;
use journaldb::{self, Algorithm, JournalDB}; use journaldb::{self, Algorithm, JournalDB};
use keccak_hasher::KeccakHasher; use keccak_hasher::KeccakHasher;
use parking_lot::Mutex; use parking_lot::{Mutex, RwLock};
use kvdb::{KeyValueDB, DBValue}; use kvdb::{KeyValueDB, DBValue};
use log::{debug, info, trace}; use log::{debug, info, trace};
use num_cpus; use num_cpus;
@ -121,7 +121,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
block_hash: H256, block_hash: H256,
state_db: &dyn HashDB<KeccakHasher, DBValue>, state_db: &dyn HashDB<KeccakHasher, DBValue>,
writer: W, writer: W,
p: &Progress, p: &RwLock<Progress>,
processing_threads: usize, processing_threads: usize,
) -> Result<(), Error> { ) -> Result<(), Error> {
let start_header = chain.block_header_data(&block_hash) let start_header = chain.block_header_data(&block_hash)
@ -168,7 +168,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
state_hashes.extend(part_state_hashes); state_hashes.extend(part_state_hashes);
} }
info!("Took a snapshot at #{} of {} accounts", block_number, p.accounts()); info!("Took a snapshot at #{} of {} accounts", block_number, p.read().accounts());
Ok((state_hashes, block_hashes)) Ok((state_hashes, block_hashes))
}).expect("Sub-thread never panics; qed")?; }).expect("Sub-thread never panics; qed")?;
@ -186,7 +186,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
writer.into_inner().finish(manifest_data)?; writer.into_inner().finish(manifest_data)?;
p.done.store(true, Ordering::SeqCst); p.write().done = true;
Ok(()) Ok(())
} }
@ -202,7 +202,7 @@ pub fn chunk_secondary<'a>(
chain: &'a BlockChain, chain: &'a BlockChain,
start_hash: H256, start_hash: H256,
writer: &Mutex<dyn SnapshotWriter + 'a>, writer: &Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress progress: &'a RwLock<Progress>
) -> Result<Vec<H256>, Error> { ) -> Result<Vec<H256>, Error> {
let mut chunk_hashes = Vec::new(); let mut chunk_hashes = Vec::new();
let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)]; let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)];
@ -218,7 +218,7 @@ pub fn chunk_secondary<'a>(
trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}", trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}",
hash, size, raw_data.len()); hash, size, raw_data.len());
progress.update(0, size); progress.write().update(0, size as u64);
chunk_hashes.push(hash); chunk_hashes.push(hash);
Ok(()) Ok(())
}; };
@ -242,7 +242,7 @@ struct StateChunker<'a> {
cur_size: usize, cur_size: usize,
snappy_buffer: Vec<u8>, snappy_buffer: Vec<u8>,
writer: &'a Mutex<dyn SnapshotWriter + 'a>, writer: &'a Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress, progress: &'a RwLock<Progress>,
thread_idx: usize, thread_idx: usize,
} }
@ -275,7 +275,7 @@ impl<'a> StateChunker<'a> {
self.writer.lock().write_state_chunk(hash, compressed)?; self.writer.lock().write_state_chunk(hash, compressed)?;
trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len()); trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len());
self.progress.update(num_entries, compressed_size); self.progress.write().update(num_entries as u64, compressed_size as u64);
self.hashes.push(hash); self.hashes.push(hash);
self.cur_size = 0; self.cur_size = 0;
@ -300,7 +300,7 @@ pub fn chunk_state<'a>(
db: &dyn HashDB<KeccakHasher, DBValue>, db: &dyn HashDB<KeccakHasher, DBValue>,
root: &H256, root: &H256,
writer: &Mutex<dyn SnapshotWriter + 'a>, writer: &Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress, progress: &'a RwLock<Progress>,
part: Option<usize>, part: Option<usize>,
thread_idx: usize, thread_idx: usize,
) -> Result<Vec<H256>, Error> { ) -> Result<Vec<H256>, Error> {

View File

@ -259,7 +259,7 @@ pub struct Service<C: Send + Sync + 'static> {
state_chunks: AtomicUsize, state_chunks: AtomicUsize,
block_chunks: AtomicUsize, block_chunks: AtomicUsize,
client: Arc<C>, client: Arc<C>,
progress: Progress, progress: RwLock<Progress>,
taking_snapshot: AtomicBool, taking_snapshot: AtomicBool,
restoring_snapshot: AtomicBool, restoring_snapshot: AtomicBool,
} }
@ -280,7 +280,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
state_chunks: AtomicUsize::new(0), state_chunks: AtomicUsize::new(0),
block_chunks: AtomicUsize::new(0), block_chunks: AtomicUsize::new(0),
client: params.client, client: params.client,
progress: Progress::new(), progress: RwLock::new(Progress::new()),
taking_snapshot: AtomicBool::new(false), taking_snapshot: AtomicBool::new(false),
restoring_snapshot: AtomicBool::new(false), restoring_snapshot: AtomicBool::new(false),
}; };
@ -483,9 +483,9 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
/// Tick the snapshot service. This will log any active snapshot /// Tick the snapshot service. This will log any active snapshot
/// being taken. /// being taken.
pub fn tick(&self) { pub fn tick(&self) {
if self.progress.done() || !self.taking_snapshot.load(Ordering::SeqCst) { return } if self.progress.read().done() || !self.taking_snapshot.load(Ordering::SeqCst) { return }
let p = &self.progress; let p = &self.progress.read();
info!("Snapshot: {} accounts, {} blocks, {} bytes", p.accounts(), p.blocks(), p.bytes()); info!("Snapshot: {} accounts, {} blocks, {} bytes", p.accounts(), p.blocks(), p.bytes());
let rate = p.rate(); let rate = p.rate();
debug!(target: "snapshot", "Current progress rate: {:.0} acc/s, {:.0} bytes/s (compressed)", rate.0, rate.1); debug!(target: "snapshot", "Current progress rate: {:.0} acc/s, {:.0} bytes/s (compressed)", rate.0, rate.1);
@ -507,7 +507,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
self.taking_snapshot.store(false, Ordering::SeqCst); self.taking_snapshot.store(false, Ordering::SeqCst);
}} }}
let start_time = std::time::Instant::now(); let start_time = std::time::Instant::now();
self.progress.reset(); *self.progress.write() = Progress::new();
let temp_dir = self.temp_snapshot_dir(); let temp_dir = self.temp_snapshot_dir();
let snapshot_dir = self.snapshot_dir(); let snapshot_dir = self.snapshot_dir();
@ -893,7 +893,7 @@ impl<C: Send + Sync> SnapshotService for Service<C> {
fn abort_snapshot(&self) { fn abort_snapshot(&self) {
if self.taking_snapshot.load(Ordering::SeqCst) { if self.taking_snapshot.load(Ordering::SeqCst) {
trace!(target: "snapshot", "Aborting snapshot Snapshot under way"); trace!(target: "snapshot", "Aborting snapshot Snapshot under way");
self.progress.abort.store(true, Ordering::SeqCst); self.progress.write().abort = true;
} }
} }

View File

@ -26,6 +26,7 @@ use common_types::{
}; };
use engine::Engine; use engine::Engine;
use ethereum_types::H256; use ethereum_types::H256;
use parking_lot::RwLock;
use crate::io::SnapshotWriter; use crate::io::SnapshotWriter;
@ -108,7 +109,7 @@ pub trait SnapshotComponents: Send {
chain: &BlockChain, chain: &BlockChain,
block_at: H256, block_at: H256,
chunk_sink: &mut ChunkSink, chunk_sink: &mut ChunkSink,
progress: &Progress, progress: &RwLock<Progress>,
preferred_size: usize, preferred_size: usize,
) -> Result<(), SnapshotError>; ) -> Result<(), SnapshotError>;
@ -141,7 +142,7 @@ pub trait SnapshotClient: BlockChainClient + BlockInfo + DatabaseRestore + Block
&self, &self,
writer: W, writer: W,
at: BlockId, at: BlockId,
p: &Progress, p: &RwLock<Progress>,
) -> Result<(), Error>; ) -> Result<(), Error>;
} }

View File

@ -2543,7 +2543,7 @@ impl SnapshotClient for Client {
&self, &self,
writer: W, writer: W,
at: BlockId, at: BlockId,
p: &Progress, p: &RwLock<Progress>,
) -> Result<(), EthcoreError> { ) -> Result<(), EthcoreError> {
if let Snapshotting::Unsupported = self.engine.snapshot_mode() { if let Snapshotting::Unsupported = self.engine.snapshot_mode() {
return Err(EthcoreError::Snapshot(SnapshotError::SnapshotsUnsupported)); return Err(EthcoreError::Snapshot(SnapshotError::SnapshotsUnsupported));

View File

@ -15,7 +15,6 @@ parity-bytes = "0.1"
parity-crypto = { version = "0.4.2", features = ["publickey"] } parity-crypto = { version = "0.4.2", features = ["publickey"] }
parity-util-mem = "0.2.0" parity-util-mem = "0.2.0"
parity-snappy = "0.1" parity-snappy = "0.1"
parking_lot = "0.9.0"
patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" } patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" }
rlp = "0.4.0" rlp = "0.4.0"
rlp_derive = { path = "../../util/rlp-derive" } rlp_derive = { path = "../../util/rlp-derive" }

View File

@ -40,7 +40,6 @@ extern crate parity_crypto;
#[macro_use] #[macro_use]
extern crate derive_more; extern crate derive_more;
extern crate keccak_hash as hash; extern crate keccak_hash as hash;
extern crate parking_lot;
extern crate parity_bytes as bytes; extern crate parity_bytes as bytes;
extern crate patricia_trie_ethereum as ethtrie; extern crate patricia_trie_ethereum as ethtrie;
extern crate rlp; extern crate rlp;

View File

@ -16,12 +16,10 @@
//! Snapshot type definitions //! Snapshot type definitions
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Instant; use std::time::Instant;
use bytes::Bytes; use bytes::Bytes;
use ethereum_types::H256; use ethereum_types::H256;
use parking_lot::RwLock;
use rlp::{Rlp, RlpStream, DecoderError}; use rlp::{Rlp, RlpStream, DecoderError};
/// Modes of snapshotting /// Modes of snapshotting
@ -44,89 +42,68 @@ pub enum Snapshotting {
#[derive(Debug)] #[derive(Debug)]
pub struct Progress { pub struct Progress {
/// Number of accounts processed so far /// Number of accounts processed so far
accounts: AtomicUsize, accounts: u64,
// Number of accounts processed at last tick. // Number of accounts processed at last tick.
prev_accounts: AtomicUsize, prev_accounts: u64,
/// Number of blocks processed so far /// Number of blocks processed so far
pub blocks: AtomicUsize, pub blocks: u64,
/// Size in bytes of a all compressed chunks processed so far /// Size in bytes of a all compressed chunks processed so far
bytes: AtomicUsize, bytes: u64,
// Number of bytes processed at last tick. // Number of bytes processed at last tick.
prev_bytes: AtomicUsize, prev_bytes: u64,
/// Signals that the snapshotting process is completed /// Signals that the snapshotting process is completed
pub done: AtomicBool, pub done: bool,
/// Signal snapshotting process to abort /// Signal snapshotting process to abort
pub abort: AtomicBool, pub abort: bool,
last_tick: RwLock<Instant>, last_tick: Instant,
} }
impl Progress { impl Progress {
/// Create a new progress tracker. /// Create a new progress tracker.
pub fn new() -> Progress { pub fn new() -> Progress {
Progress { Progress {
accounts: AtomicUsize::new(0), accounts: 0,
prev_accounts: AtomicUsize::new(0), prev_accounts: 0,
blocks: AtomicUsize::new(0), blocks: 0,
bytes: AtomicUsize::new(0), bytes: 0,
prev_bytes: AtomicUsize::new(0), prev_bytes: 0,
abort: AtomicBool::new(false), abort: false,
done: AtomicBool::new(false), done: false,
last_tick: RwLock::new(Instant::now()), last_tick: Instant::now(),
} }
} }
/// Reset the progress.
pub fn reset(&self) {
self.accounts.store(0, Ordering::Release);
self.blocks.store(0, Ordering::Release);
self.bytes.store(0, Ordering::Release);
self.abort.store(false, Ordering::Release);
// atomic fence here to ensure the others are written first?
// logs might very rarely get polluted if not.
self.done.store(false, Ordering::Release);
*self.last_tick.write() = Instant::now();
}
/// Get the number of accounts snapshotted thus far. /// Get the number of accounts snapshotted thus far.
pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Acquire) } pub fn accounts(&self) -> u64 { self.accounts }
/// Get the number of blocks snapshotted thus far. /// Get the number of blocks snapshotted thus far.
pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) } pub fn blocks(&self) -> u64 { self.blocks }
/// Get the written size of the snapshot in bytes. /// Get the written size of the snapshot in bytes.
pub fn bytes(&self) -> usize { self.bytes.load(Ordering::Acquire) } pub fn bytes(&self) -> u64 { self.bytes }
/// Whether the snapshot is complete. /// Whether the snapshot is complete.
pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) } pub fn done(&self) -> bool { self.done }
/// Return the progress rate over the last tick (i.e. since last update). /// Return the progress rate over the last tick (i.e. since last update).
pub fn rate(&self) -> (f64, f64) { pub fn rate(&self) -> (f64, f64) {
let last_tick = *self.last_tick.read(); let dt = self.last_tick.elapsed().as_secs_f64();
let dt = last_tick.elapsed().as_secs_f64();
if dt < 1.0 { if dt < 1.0 {
return (0f64, 0f64); return (0f64, 0f64);
} }
let delta_acc = self.accounts.load(Ordering::Relaxed) let delta_acc = self.accounts.saturating_sub(self.prev_accounts);
.saturating_sub(self.prev_accounts.load(Ordering::Relaxed)); let delta_bytes = self.bytes.saturating_sub(self.prev_bytes);
let delta_bytes = self.bytes.load(Ordering::Relaxed)
.saturating_sub(self.prev_bytes.load(Ordering::Relaxed));
(delta_acc as f64 / dt, delta_bytes as f64 / dt) (delta_acc as f64 / dt, delta_bytes as f64 / dt)
} }
/// Update state progress counters and set the last tick. /// Update state progress counters and set the last tick.
pub fn update(&self, accounts_delta: usize, bytes_delta: usize) { pub fn update(&mut self, accounts_delta: u64, bytes_delta: u64) {
*self.last_tick.write() = Instant::now(); self.last_tick = Instant::now();
self.prev_accounts.store( self.prev_accounts = self.accounts;
self.accounts.fetch_add(accounts_delta, Ordering::SeqCst), self.prev_bytes = self.bytes;
Ordering::SeqCst self.accounts += accounts_delta;
); self.bytes += bytes_delta;
self.prev_bytes.store(
self.bytes.fetch_add(bytes_delta, Ordering::SeqCst),
Ordering::SeqCst
);
} }
} }

View File

@ -46,7 +46,7 @@ use ethereum_types::H256;
use parking_lot::{RwLock, Mutex}; use parking_lot::{RwLock, Mutex};
/// Format byte counts to standard denominations. /// Format byte counts to standard denominations.
pub fn format_bytes(b: usize) -> String { pub fn format_bytes(b: u64) -> String {
match binary_prefix(b as f64) { match binary_prefix(b as f64) {
Standalone(bytes) => format!("{} bytes", bytes), Standalone(bytes) => format!("{} bytes", bytes),
Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix), Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix),
@ -69,9 +69,8 @@ impl CacheSizes {
use std::fmt::Write; use std::fmt::Write;
let mut buf = String::new(); let mut buf = String::new();
for (name, &size) in &self.sizes { for (name, size) in &self.sizes {
write!(buf, " {:>8} {}", paint(style, format_bytes(*size as u64)), name)
write!(buf, " {:>8} {}", paint(style, format_bytes(size)), name)
.expect("writing to string won't fail unless OOM; qed") .expect("writing to string won't fail unless OOM; qed")
} }

View File

@ -27,6 +27,7 @@ use snapshot::service::Service as SnapshotService;
use ethcore::client::{Client, DatabaseCompactionProfile, VMType}; use ethcore::client::{Client, DatabaseCompactionProfile, VMType};
use ethcore::miner::Miner; use ethcore::miner::Miner;
use ethcore_service::ClientService; use ethcore_service::ClientService;
use parking_lot::RwLock;
use types::{ use types::{
ids::BlockId, ids::BlockId,
snapshot::Progress, snapshot::Progress,
@ -257,20 +258,25 @@ impl SnapshotCommand {
let writer = PackedWriter::new(&file_path) let writer = PackedWriter::new(&file_path)
.map_err(|e| format!("Failed to open snapshot writer: {}", e))?; .map_err(|e| format!("Failed to open snapshot writer: {}", e))?;
let progress = Arc::new(Progress::new()); let progress = Arc::new(RwLock::new(Progress::new()));
let p = progress.clone(); let p = progress.clone();
let informant_handle = ::std::thread::spawn(move || { let informant_handle = ::std::thread::spawn(move || {
::std::thread::sleep(Duration::from_secs(5)); ::std::thread::sleep(Duration::from_secs(5));
let mut last_size = 0; let mut last_size = 0;
while !p.done() { loop {
let cur_size = p.bytes(); {
if cur_size != last_size { let progress = p.read();
last_size = cur_size; if !progress.done() {
let bytes = ::informant::format_bytes(cur_size as usize); let cur_size = progress.bytes();
info!("Snapshot: {} accounts (state), {} blocks, {} bytes", p.accounts(), p.blocks(), bytes); if cur_size != last_size {
last_size = cur_size;
let bytes = ::informant::format_bytes(cur_size);
info!("Snapshot: {} accounts (state), {} blocks, {} bytes", progress.accounts(), progress.blocks(), bytes);
}
} else {
break;
}
} }
::std::thread::sleep(Duration::from_secs(5)); ::std::thread::sleep(Duration::from_secs(5));
} }
}); });
@ -282,7 +288,7 @@ impl SnapshotCommand {
info!("snapshot creation complete"); info!("snapshot creation complete");
assert!(progress.done()); assert!(progress.read().done());
informant_handle.join().map_err(|_| "failed to join logger thread")?; informant_handle.join().map_err(|_| "failed to join logger thread")?;
Ok(()) Ok(())