From 0d3423cbe0b70964aa33d2c71685e663adae0315 Mon Sep 17 00:00:00 2001 From: David Date: Mon, 28 Oct 2019 18:24:45 +0100 Subject: [PATCH] Use a lock instead of atomics for snapshot Progress (#11197) * WIP. Typos and logging. * Format todos * Pause pruning while a snapshot is under way Logs, docs and todos * Allocate memory for the full chunk * Name snapshotting threads * Ensure `taking_snapshot` is set to false whenever and however `take_snapshot`returns Rename `take_at` to `request_snapshot_at` Cleanup * Let "in_progress" deletion fail Fix tests * Just use an atomic * Review grumbles * Finish the sentence * Resolve a few todos and clarify comments. * Calculate progress rate since last update * Lockfile * Fix tests * typo * Reinstate default snapshotting frequency Cut down on the logging noise * Use a lock instead of atomics for snapshot Progress * Update ethcore/types/src/snapshot.rs Co-Authored-By: Andronik Ordian * Avoid truncating cast Cleanup --- Cargo.lock | 1 - .../snapshot/snapshot-tests/src/account.rs | 13 +-- .../snapshot/snapshot-tests/src/helpers.rs | 3 +- .../snapshot-tests/src/proof_of_work.rs | 4 +- .../snapshot/snapshot-tests/src/service.rs | 6 +- ethcore/snapshot/snapshot-tests/src/state.rs | 20 +++-- ethcore/snapshot/src/account.rs | 6 +- ethcore/snapshot/src/consensus/authority.rs | 3 +- ethcore/snapshot/src/consensus/work.rs | 7 +- ethcore/snapshot/src/lib.rs | 18 ++--- ethcore/snapshot/src/service.rs | 12 +-- ethcore/snapshot/src/traits.rs | 5 +- ethcore/src/client/client.rs | 2 +- ethcore/types/Cargo.toml | 1 - ethcore/types/src/lib.rs | 1 - ethcore/types/src/snapshot.rs | 81 +++++++------------ parity/informant.rs | 7 +- parity/snapshot_cmd.rs | 26 +++--- 18 files changed, 105 insertions(+), 111 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 823091e8b..4305a617c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -606,7 +606,6 @@ dependencies = [ "parity-crypto 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "parity-snappy 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-util-mem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "patricia-trie-ethereum 0.1.0", "rlp 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "rlp_derive 0.1.0", diff --git a/ethcore/snapshot/snapshot-tests/src/account.rs b/ethcore/snapshot/snapshot-tests/src/account.rs index fd1e5941b..7e3b10a8e 100644 --- a/ethcore/snapshot/snapshot-tests/src/account.rs +++ b/ethcore/snapshot/snapshot-tests/src/account.rs @@ -28,6 +28,7 @@ use ethereum_types::{H256, Address}; use hash_db::{HashDB, EMPTY_PREFIX}; use keccak_hash::{KECCAK_EMPTY, KECCAK_NULL_RLP, keccak}; use kvdb::DBValue; +use parking_lot::RwLock; use rlp::Rlp; use snapshot::test_helpers::{ACC_EMPTY, to_fat_rlps, from_fat_rlp}; @@ -48,7 +49,7 @@ fn encoding_basic() { let thin_rlp = ::rlp::encode(&account); assert_eq!(::rlp::decode::(&thin_rlp).unwrap(), account); - let p = Progress::new(); + let p = RwLock::new(Progress::new()); let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap(); let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap(); assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account); @@ -69,7 +70,7 @@ fn encoding_version() { let thin_rlp = ::rlp::encode(&account); assert_eq!(::rlp::decode::(&thin_rlp).unwrap(), account); - let p = Progress::new(); + let p = RwLock::new(Progress::new()); let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap(); let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap(); assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account); @@ -96,7 +97,7 @@ fn encoding_storage() { let thin_rlp = ::rlp::encode(&account); assert_eq!(::rlp::decode::(&thin_rlp).unwrap(), account); - let p = Progress::new(); + let p = RwLock::new(Progress::new()); let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap(); let fat_rlp = Rlp::new(&fat_rlp[0]).at(1).unwrap(); @@ -124,7 +125,7 @@ fn encoding_storage_split() { let thin_rlp = ::rlp::encode(&account); assert_eq!(::rlp::decode::(&thin_rlp).unwrap(), account); - let p = Progress::new(); + let p = RwLock::new(Progress::new()); let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), 500, 1000, &p).unwrap(); let mut root = KECCAK_NULL_RLP; let mut restored_account = None; @@ -170,8 +171,8 @@ fn encoding_code() { }; let mut used_code = HashSet::new(); - let p1 = Progress::new(); - let p2 = Progress::new(); + let p1 = RwLock::new(Progress::new()); + let p2 = RwLock::new(Progress::new()); let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::from_hash(db.as_hash_db(), keccak(addr1)), &mut used_code, usize::max_value(), usize::max_value(), &p1).unwrap(); let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::from_hash(db.as_hash_db(), keccak(addr2)), &mut used_code, usize::max_value(), usize::max_value(), &p2).unwrap(); assert_eq!(used_code.len(), 1); diff --git a/ethcore/snapshot/snapshot-tests/src/helpers.rs b/ethcore/snapshot/snapshot-tests/src/helpers.rs index 0c419dd71..c83e14da6 100644 --- a/ethcore/snapshot/snapshot-tests/src/helpers.rs +++ b/ethcore/snapshot/snapshot-tests/src/helpers.rs @@ -40,6 +40,7 @@ use keccak_hash::{KECCAK_NULL_RLP}; use keccak_hasher::KeccakHasher; use kvdb::DBValue; use log::trace; +use parking_lot::RwLock; use rand::Rng; use rlp; use snapshot::{ @@ -146,7 +147,7 @@ pub fn snap(client: &Client) -> (Box, TempDir) { let tempdir = TempDir::new("").unwrap(); let path = tempdir.path().join("file"); let writer = PackedWriter::new(&path).unwrap(); - let progress = Progress::new(); + let progress = RwLock::new(Progress::new()); let hash = client.chain_info().best_block_hash; client.take_snapshot(writer, BlockId::Hash(hash), &progress).unwrap(); diff --git a/ethcore/snapshot/snapshot-tests/src/proof_of_work.rs b/ethcore/snapshot/snapshot-tests/src/proof_of_work.rs index 9d01432ff..90e6bee57 100644 --- a/ethcore/snapshot/snapshot-tests/src/proof_of_work.rs +++ b/ethcore/snapshot/snapshot-tests/src/proof_of_work.rs @@ -31,7 +31,7 @@ use snapshot::{ io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter}, PowSnapshot, }; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use snappy; use keccak_hash::KECCAK_NULL_RLP; use kvdb::DBTransaction; @@ -74,7 +74,7 @@ fn chunk_and_restore(amount: u64) { &bc, best_hash, &writer, - &Progress::new() + &RwLock::new(Progress::new()) ).unwrap(); let manifest = ManifestData { diff --git a/ethcore/snapshot/snapshot-tests/src/service.rs b/ethcore/snapshot/snapshot-tests/src/service.rs index ebedd5472..9b85f324f 100644 --- a/ethcore/snapshot/snapshot-tests/src/service.rs +++ b/ethcore/snapshot/snapshot-tests/src/service.rs @@ -42,7 +42,7 @@ use ethcore::{ test_helpers::{new_db, new_temp_db, generate_dummy_client_with_spec_and_data, restoration_db_handler} }; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use ethcore_io::{IoChannel, IoService}; use kvdb_rocksdb::DatabaseConfig; use journaldb::Algorithm; @@ -278,7 +278,7 @@ fn keep_ancient_blocks() { &bc, best_hash, &writer, - &Progress::new() + &RwLock::new(Progress::new()) ).unwrap(); let state_db = client.state_db().journal_db().boxed_clone(); let start_header = bc.block_header_data(&best_hash).unwrap(); @@ -287,7 +287,7 @@ fn keep_ancient_blocks() { state_db.as_hash_db(), &state_root, &writer, - &Progress::new(), + &RwLock::new(Progress::new()), None, 0 ).unwrap(); diff --git a/ethcore/snapshot/snapshot-tests/src/state.rs b/ethcore/snapshot/snapshot-tests/src/state.rs index ef9057964..f53354e69 100644 --- a/ethcore/snapshot/snapshot-tests/src/state.rs +++ b/ethcore/snapshot/snapshot-tests/src/state.rs @@ -35,7 +35,7 @@ use rand_xorshift::XorShiftRng; use ethereum_types::H256; use journaldb::{self, Algorithm}; use kvdb_rocksdb::{Database, DatabaseConfig}; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use tempdir::TempDir; use crate::helpers::StateProducer; @@ -61,8 +61,9 @@ fn snap_and_restore() { let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap()); let mut state_hashes = Vec::new(); + let progress = RwLock::new(Progress::new()); for part in 0..SNAPSHOT_SUBPARTS { - let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::new(), Some(part), 0).unwrap(); + let mut hashes = chunk_state(&old_db, &state_root, &writer, &progress, Some(part), 0).unwrap(); state_hashes.append(&mut hashes); } @@ -133,8 +134,16 @@ fn get_code_from_prev_chunk() { let mut make_chunk = |acc, hash| { let mut db = journaldb::new_memory_db(); AccountDBMut::from_hash(&mut db, hash).insert(EMPTY_PREFIX, &code[..]); - let p = Progress::new(); - let fat_rlp = to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value(), &p).unwrap(); + let p = RwLock::new(Progress::new()); + let fat_rlp = to_fat_rlps( + &hash, + &acc, + &AccountDB::from_hash(&db, hash), + &mut used_code, + usize::max_value(), + usize::max_value(), + &p + ).unwrap(); let mut stream = RlpStream::new_list(1); stream.append_raw(&fat_rlp[0], 1); stream.out() @@ -177,8 +186,9 @@ fn checks_flag() { let state_root = producer.state_root(); let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap()); + let progress = RwLock::new(Progress::new()); - let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::new(), None, 0).unwrap(); + let state_hashes = chunk_state(&old_db, &state_root, &writer, &progress, None, 0).unwrap(); writer.into_inner().finish(ManifestData { version: 2, diff --git a/ethcore/snapshot/src/account.rs b/ethcore/snapshot/src/account.rs index da227f176..f7d0657d0 100644 --- a/ethcore/snapshot/src/account.rs +++ b/ethcore/snapshot/src/account.rs @@ -17,7 +17,6 @@ //! Account state encoding and decoding use std::collections::HashSet; -use std::sync::atomic::Ordering; use account_db::{AccountDB, AccountDBMut}; use bytes::Bytes; @@ -31,6 +30,7 @@ use ethtrie::{TrieDB, TrieDBMut}; use hash_db::HashDB; use keccak_hash::{KECCAK_EMPTY, KECCAK_NULL_RLP}; use log::{trace, warn}; +use parking_lot::RwLock; use rlp::{RlpStream, Rlp}; use trie_db::{Trie, TrieMut}; @@ -79,7 +79,7 @@ pub fn to_fat_rlps( used_code: &mut HashSet, first_chunk_size: usize, max_chunk_size: usize, - p: &Progress, + p: &RwLock, ) -> Result, Error> { let db = &(acct_db as &dyn HashDB<_,_>); let db = TrieDB::new(db, &acc.storage_root)?; @@ -135,7 +135,7 @@ pub fn to_fat_rlps( } loop { - if p.abort.load(Ordering::SeqCst) { + if p.read().abort { trace!(target: "snapshot", "to_fat_rlps: aborting snapshot"); return Err(Error::SnapshotAborted); } diff --git a/ethcore/snapshot/src/consensus/authority.rs b/ethcore/snapshot/src/consensus/authority.rs index 6135ba882..b80d4075c 100644 --- a/ethcore/snapshot/src/consensus/authority.rs +++ b/ethcore/snapshot/src/consensus/authority.rs @@ -38,6 +38,7 @@ use ethereum_types::{H256, U256}; use itertools::{Position, Itertools}; use kvdb::KeyValueDB; use log::trace; +use parking_lot::RwLock; use rlp::{RlpStream, Rlp}; use crate::{SnapshotComponents, Rebuilder}; @@ -62,7 +63,7 @@ impl SnapshotComponents for PoaSnapshot { chain: &BlockChain, block_at: H256, sink: &mut ChunkSink, - _progress: &Progress, + _progress: &RwLock, preferred_size: usize, ) -> Result<(), SnapshotError> { let number = chain.block_number(&block_at) diff --git a/ethcore/snapshot/src/consensus/work.rs b/ethcore/snapshot/src/consensus/work.rs index 819e95ca0..9295399a4 100644 --- a/ethcore/snapshot/src/consensus/work.rs +++ b/ethcore/snapshot/src/consensus/work.rs @@ -37,6 +37,7 @@ use engine::Engine; use ethereum_types::{H256, U256}; use kvdb::KeyValueDB; use log::trace; +use parking_lot::RwLock; use rand::rngs::OsRng; use rlp::{RlpStream, Rlp}; use triehash::ordered_trie_root; @@ -72,7 +73,7 @@ impl SnapshotComponents for PowSnapshot { chain: &BlockChain, block_at: H256, chunk_sink: &mut ChunkSink, - progress: &Progress, + progress: &RwLock, preferred_size: usize, ) -> Result<(), SnapshotError> { PowWorker { @@ -110,7 +111,7 @@ struct PowWorker<'a> { rlps: VecDeque, current_hash: H256, writer: &'a mut ChunkSink<'a>, - progress: &'a Progress, + progress: &'a RwLock, preferred_size: usize, } @@ -153,7 +154,7 @@ impl<'a> PowWorker<'a> { last = self.current_hash; self.current_hash = block.header_view().parent_hash(); - self.progress.blocks.fetch_add(1, Ordering::SeqCst); + self.progress.write().blocks += 1; } if loaded_size != 0 { diff --git a/ethcore/snapshot/src/lib.rs b/ethcore/snapshot/src/lib.rs index 406aeeddb..38b2a4403 100644 --- a/ethcore/snapshot/src/lib.rs +++ b/ethcore/snapshot/src/lib.rs @@ -44,7 +44,7 @@ use ethtrie::{TrieDB, TrieDBMut}; use hash_db::HashDB; use journaldb::{self, Algorithm, JournalDB}; use keccak_hasher::KeccakHasher; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use kvdb::{KeyValueDB, DBValue}; use log::{debug, info, trace}; use num_cpus; @@ -121,7 +121,7 @@ pub fn take_snapshot( block_hash: H256, state_db: &dyn HashDB, writer: W, - p: &Progress, + p: &RwLock, processing_threads: usize, ) -> Result<(), Error> { let start_header = chain.block_header_data(&block_hash) @@ -168,7 +168,7 @@ pub fn take_snapshot( state_hashes.extend(part_state_hashes); } - info!("Took a snapshot at #{} of {} accounts", block_number, p.accounts()); + info!("Took a snapshot at #{} of {} accounts", block_number, p.read().accounts()); Ok((state_hashes, block_hashes)) }).expect("Sub-thread never panics; qed")?; @@ -186,7 +186,7 @@ pub fn take_snapshot( writer.into_inner().finish(manifest_data)?; - p.done.store(true, Ordering::SeqCst); + p.write().done = true; Ok(()) } @@ -202,7 +202,7 @@ pub fn chunk_secondary<'a>( chain: &'a BlockChain, start_hash: H256, writer: &Mutex, - progress: &'a Progress + progress: &'a RwLock ) -> Result, Error> { let mut chunk_hashes = Vec::new(); let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)]; @@ -218,7 +218,7 @@ pub fn chunk_secondary<'a>( trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}", hash, size, raw_data.len()); - progress.update(0, size); + progress.write().update(0, size as u64); chunk_hashes.push(hash); Ok(()) }; @@ -242,7 +242,7 @@ struct StateChunker<'a> { cur_size: usize, snappy_buffer: Vec, writer: &'a Mutex, - progress: &'a Progress, + progress: &'a RwLock, thread_idx: usize, } @@ -275,7 +275,7 @@ impl<'a> StateChunker<'a> { self.writer.lock().write_state_chunk(hash, compressed)?; trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len()); - self.progress.update(num_entries, compressed_size); + self.progress.write().update(num_entries as u64, compressed_size as u64); self.hashes.push(hash); self.cur_size = 0; @@ -300,7 +300,7 @@ pub fn chunk_state<'a>( db: &dyn HashDB, root: &H256, writer: &Mutex, - progress: &'a Progress, + progress: &'a RwLock, part: Option, thread_idx: usize, ) -> Result, Error> { diff --git a/ethcore/snapshot/src/service.rs b/ethcore/snapshot/src/service.rs index da84482db..ad51d3095 100644 --- a/ethcore/snapshot/src/service.rs +++ b/ethcore/snapshot/src/service.rs @@ -259,7 +259,7 @@ pub struct Service { state_chunks: AtomicUsize, block_chunks: AtomicUsize, client: Arc, - progress: Progress, + progress: RwLock, taking_snapshot: AtomicBool, restoring_snapshot: AtomicBool, } @@ -280,7 +280,7 @@ impl Service where C: SnapshotClient + ChainInfo { state_chunks: AtomicUsize::new(0), block_chunks: AtomicUsize::new(0), client: params.client, - progress: Progress::new(), + progress: RwLock::new(Progress::new()), taking_snapshot: AtomicBool::new(false), restoring_snapshot: AtomicBool::new(false), }; @@ -483,9 +483,9 @@ impl Service where C: SnapshotClient + ChainInfo { /// Tick the snapshot service. This will log any active snapshot /// being taken. pub fn tick(&self) { - if self.progress.done() || !self.taking_snapshot.load(Ordering::SeqCst) { return } + if self.progress.read().done() || !self.taking_snapshot.load(Ordering::SeqCst) { return } - let p = &self.progress; + let p = &self.progress.read(); info!("Snapshot: {} accounts, {} blocks, {} bytes", p.accounts(), p.blocks(), p.bytes()); let rate = p.rate(); debug!(target: "snapshot", "Current progress rate: {:.0} acc/s, {:.0} bytes/s (compressed)", rate.0, rate.1); @@ -507,7 +507,7 @@ impl Service where C: SnapshotClient + ChainInfo { self.taking_snapshot.store(false, Ordering::SeqCst); }} let start_time = std::time::Instant::now(); - self.progress.reset(); + *self.progress.write() = Progress::new(); let temp_dir = self.temp_snapshot_dir(); let snapshot_dir = self.snapshot_dir(); @@ -893,7 +893,7 @@ impl SnapshotService for Service { fn abort_snapshot(&self) { if self.taking_snapshot.load(Ordering::SeqCst) { trace!(target: "snapshot", "Aborting snapshot – Snapshot under way"); - self.progress.abort.store(true, Ordering::SeqCst); + self.progress.write().abort = true; } } diff --git a/ethcore/snapshot/src/traits.rs b/ethcore/snapshot/src/traits.rs index 8f5e12a11..7cf812d30 100644 --- a/ethcore/snapshot/src/traits.rs +++ b/ethcore/snapshot/src/traits.rs @@ -26,6 +26,7 @@ use common_types::{ }; use engine::Engine; use ethereum_types::H256; +use parking_lot::RwLock; use crate::io::SnapshotWriter; @@ -108,7 +109,7 @@ pub trait SnapshotComponents: Send { chain: &BlockChain, block_at: H256, chunk_sink: &mut ChunkSink, - progress: &Progress, + progress: &RwLock, preferred_size: usize, ) -> Result<(), SnapshotError>; @@ -141,7 +142,7 @@ pub trait SnapshotClient: BlockChainClient + BlockInfo + DatabaseRestore + Block &self, writer: W, at: BlockId, - p: &Progress, + p: &RwLock, ) -> Result<(), Error>; } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index c1c450486..93e5ca2ec 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -2543,7 +2543,7 @@ impl SnapshotClient for Client { &self, writer: W, at: BlockId, - p: &Progress, + p: &RwLock, ) -> Result<(), EthcoreError> { if let Snapshotting::Unsupported = self.engine.snapshot_mode() { return Err(EthcoreError::Snapshot(SnapshotError::SnapshotsUnsupported)); diff --git a/ethcore/types/Cargo.toml b/ethcore/types/Cargo.toml index dd1dff2fb..509b9d13d 100644 --- a/ethcore/types/Cargo.toml +++ b/ethcore/types/Cargo.toml @@ -15,7 +15,6 @@ parity-bytes = "0.1" parity-crypto = { version = "0.4.2", features = ["publickey"] } parity-util-mem = "0.2.0" parity-snappy = "0.1" -parking_lot = "0.9.0" patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" } rlp = "0.4.0" rlp_derive = { path = "../../util/rlp-derive" } diff --git a/ethcore/types/src/lib.rs b/ethcore/types/src/lib.rs index ab17b8837..58ca86cae 100644 --- a/ethcore/types/src/lib.rs +++ b/ethcore/types/src/lib.rs @@ -40,7 +40,6 @@ extern crate parity_crypto; #[macro_use] extern crate derive_more; extern crate keccak_hash as hash; -extern crate parking_lot; extern crate parity_bytes as bytes; extern crate patricia_trie_ethereum as ethtrie; extern crate rlp; diff --git a/ethcore/types/src/snapshot.rs b/ethcore/types/src/snapshot.rs index a3be631be..48b27f137 100644 --- a/ethcore/types/src/snapshot.rs +++ b/ethcore/types/src/snapshot.rs @@ -16,12 +16,10 @@ //! Snapshot type definitions -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::Instant; use bytes::Bytes; use ethereum_types::H256; -use parking_lot::RwLock; use rlp::{Rlp, RlpStream, DecoderError}; /// Modes of snapshotting @@ -44,89 +42,68 @@ pub enum Snapshotting { #[derive(Debug)] pub struct Progress { /// Number of accounts processed so far - accounts: AtomicUsize, + accounts: u64, // Number of accounts processed at last tick. - prev_accounts: AtomicUsize, + prev_accounts: u64, /// Number of blocks processed so far - pub blocks: AtomicUsize, + pub blocks: u64, /// Size in bytes of a all compressed chunks processed so far - bytes: AtomicUsize, + bytes: u64, // Number of bytes processed at last tick. - prev_bytes: AtomicUsize, + prev_bytes: u64, /// Signals that the snapshotting process is completed - pub done: AtomicBool, + pub done: bool, /// Signal snapshotting process to abort - pub abort: AtomicBool, + pub abort: bool, - last_tick: RwLock, + last_tick: Instant, } impl Progress { /// Create a new progress tracker. pub fn new() -> Progress { Progress { - accounts: AtomicUsize::new(0), - prev_accounts: AtomicUsize::new(0), - blocks: AtomicUsize::new(0), - bytes: AtomicUsize::new(0), - prev_bytes: AtomicUsize::new(0), - abort: AtomicBool::new(false), - done: AtomicBool::new(false), - last_tick: RwLock::new(Instant::now()), + accounts: 0, + prev_accounts: 0, + blocks: 0, + bytes: 0, + prev_bytes: 0, + abort: false, + done: false, + last_tick: Instant::now(), } } - /// Reset the progress. - pub fn reset(&self) { - self.accounts.store(0, Ordering::Release); - self.blocks.store(0, Ordering::Release); - self.bytes.store(0, Ordering::Release); - self.abort.store(false, Ordering::Release); - - // atomic fence here to ensure the others are written first? - // logs might very rarely get polluted if not. - self.done.store(false, Ordering::Release); - - *self.last_tick.write() = Instant::now(); - } - /// Get the number of accounts snapshotted thus far. - pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Acquire) } + pub fn accounts(&self) -> u64 { self.accounts } /// Get the number of blocks snapshotted thus far. - pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) } + pub fn blocks(&self) -> u64 { self.blocks } /// Get the written size of the snapshot in bytes. - pub fn bytes(&self) -> usize { self.bytes.load(Ordering::Acquire) } + pub fn bytes(&self) -> u64 { self.bytes } /// Whether the snapshot is complete. - pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) } + pub fn done(&self) -> bool { self.done } /// Return the progress rate over the last tick (i.e. since last update). pub fn rate(&self) -> (f64, f64) { - let last_tick = *self.last_tick.read(); - let dt = last_tick.elapsed().as_secs_f64(); + let dt = self.last_tick.elapsed().as_secs_f64(); if dt < 1.0 { return (0f64, 0f64); } - let delta_acc = self.accounts.load(Ordering::Relaxed) - .saturating_sub(self.prev_accounts.load(Ordering::Relaxed)); - let delta_bytes = self.bytes.load(Ordering::Relaxed) - .saturating_sub(self.prev_bytes.load(Ordering::Relaxed)); + let delta_acc = self.accounts.saturating_sub(self.prev_accounts); + let delta_bytes = self.bytes.saturating_sub(self.prev_bytes); (delta_acc as f64 / dt, delta_bytes as f64 / dt) } /// Update state progress counters and set the last tick. - pub fn update(&self, accounts_delta: usize, bytes_delta: usize) { - *self.last_tick.write() = Instant::now(); - self.prev_accounts.store( - self.accounts.fetch_add(accounts_delta, Ordering::SeqCst), - Ordering::SeqCst - ); - self.prev_bytes.store( - self.bytes.fetch_add(bytes_delta, Ordering::SeqCst), - Ordering::SeqCst - ); + pub fn update(&mut self, accounts_delta: u64, bytes_delta: u64) { + self.last_tick = Instant::now(); + self.prev_accounts = self.accounts; + self.prev_bytes = self.bytes; + self.accounts += accounts_delta; + self.bytes += bytes_delta; } } diff --git a/parity/informant.rs b/parity/informant.rs index b53ffe14e..08ef6423c 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -46,7 +46,7 @@ use ethereum_types::H256; use parking_lot::{RwLock, Mutex}; /// Format byte counts to standard denominations. -pub fn format_bytes(b: usize) -> String { +pub fn format_bytes(b: u64) -> String { match binary_prefix(b as f64) { Standalone(bytes) => format!("{} bytes", bytes), Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix), @@ -69,9 +69,8 @@ impl CacheSizes { use std::fmt::Write; let mut buf = String::new(); - for (name, &size) in &self.sizes { - - write!(buf, " {:>8} {}", paint(style, format_bytes(size)), name) + for (name, size) in &self.sizes { + write!(buf, " {:>8} {}", paint(style, format_bytes(*size as u64)), name) .expect("writing to string won't fail unless OOM; qed") } diff --git a/parity/snapshot_cmd.rs b/parity/snapshot_cmd.rs index 8a9157ad6..94144ddf3 100644 --- a/parity/snapshot_cmd.rs +++ b/parity/snapshot_cmd.rs @@ -27,6 +27,7 @@ use snapshot::service::Service as SnapshotService; use ethcore::client::{Client, DatabaseCompactionProfile, VMType}; use ethcore::miner::Miner; use ethcore_service::ClientService; +use parking_lot::RwLock; use types::{ ids::BlockId, snapshot::Progress, @@ -257,20 +258,25 @@ impl SnapshotCommand { let writer = PackedWriter::new(&file_path) .map_err(|e| format!("Failed to open snapshot writer: {}", e))?; - let progress = Arc::new(Progress::new()); + let progress = Arc::new(RwLock::new(Progress::new())); let p = progress.clone(); let informant_handle = ::std::thread::spawn(move || { ::std::thread::sleep(Duration::from_secs(5)); - let mut last_size = 0; - while !p.done() { - let cur_size = p.bytes(); - if cur_size != last_size { - last_size = cur_size; - let bytes = ::informant::format_bytes(cur_size as usize); - info!("Snapshot: {} accounts (state), {} blocks, {} bytes", p.accounts(), p.blocks(), bytes); + loop { + { + let progress = p.read(); + if !progress.done() { + let cur_size = progress.bytes(); + if cur_size != last_size { + last_size = cur_size; + let bytes = ::informant::format_bytes(cur_size); + info!("Snapshot: {} accounts (state), {} blocks, {} bytes", progress.accounts(), progress.blocks(), bytes); + } + } else { + break; + } } - ::std::thread::sleep(Duration::from_secs(5)); } }); @@ -282,7 +288,7 @@ impl SnapshotCommand { info!("snapshot creation complete"); - assert!(progress.done()); + assert!(progress.read().done()); informant_handle.join().map_err(|_| "failed to join logger thread")?; Ok(())