Add a way to signal shutdown to snapshotting threads (#10744)
* Add a way to signal shutdown to snapshotting threads
* Pass Progress to fat_rlps() so we can abort from there too.
* Checking for abort in a single spot
* Remove nightly-only weak/strong counts
* fix warning
* Fix tests
* Add dummy impl to abort snapshots
* Add another dummy impl for TestSnapshotService
* Remove debugging code
* Return error instead of the odd Ok(())

Switch to AtomicU64
parent be5db14160
commit d2120ded56
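The mechanism behind these bullet points is a cooperative abort flag: `Progress` gains an `abort: AtomicBool` that the service sets on shutdown and the chunking loop polls, bailing out with the new `Error::SnapshotAborted` variant. Below is a minimal, self-contained Rust sketch of that pattern; the `Progress` and `Error` names mirror the diff, while `snapshot_worker` and the surrounding scaffolding are simplified stand-ins, not the actual parity-ethereum code.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

#[derive(Debug)]
enum Error {
	SnapshotAborted,
}

#[derive(Default)]
struct Progress {
	// Set by the shutdown path; polled cooperatively by workers.
	abort: AtomicBool,
}

// Stand-in for the chunking loop: checks the flag once per iteration and
// returns an error instead of a silent Ok(()).
fn snapshot_worker(p: &Progress) -> Result<(), Error> {
	for _chunk in 0..1_000_000 {
		if p.abort.load(Ordering::SeqCst) {
			return Err(Error::SnapshotAborted);
		}
		// ... chunk state / blocks here ...
	}
	Ok(())
}

fn main() {
	let progress = Arc::new(Progress::default());
	let worker = {
		let p = progress.clone();
		thread::spawn(move || snapshot_worker(&p))
	};
	// Shutdown path: signal the worker instead of waiting for it to finish.
	progress.abort.store(true, Ordering::SeqCst);
	match worker.join().unwrap() {
		Err(Error::SnapshotAborted) => println!("snapshot aborted"),
		Ok(()) => println!("snapshot finished before the abort was seen"),
	}
}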
@@ -30,8 +30,10 @@ use blockchain::{BlockChainDB, BlockChainDBHandler};
 use ethcore::client::{Client, ClientConfig, ChainNotify, ClientIoMessage};
 use ethcore::miner::Miner;
 use ethcore::snapshot::service::{Service as SnapshotService, ServiceParams as SnapServiceParams};
-use ethcore::snapshot::{SnapshotService as _SnapshotService, RestorationStatus};
+use ethcore::snapshot::{SnapshotService as _SnapshotService, RestorationStatus, Error as SnapshotError};
 use ethcore::spec::Spec;
+
+use ethcore::error::Error as EthcoreError;
 
 use ethcore_private_tx::{self, Importer, Signer};
 use Error;
@@ -197,6 +199,7 @@ impl ClientService {
 
	/// Shutdown the Client Service
	pub fn shutdown(&self) {
+		trace!(target: "shutdown", "Shutting down Client Service");
		self.snapshot.shutdown();
	}
 }
@@ -257,7 +260,11 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
 
 			let res = thread::Builder::new().name("Periodic Snapshot".into()).spawn(move || {
 				if let Err(e) = snapshot.take_snapshot(&*client, num) {
-					warn!("Failed to take snapshot at block #{}: {}", num, e);
+					match e {
+						EthcoreError::Snapshot(SnapshotError::SnapshotAborted) => info!("Snapshot aborted"),
+						_ => warn!("Failed to take snapshot at block #{}: {}", num, e),
+					}
 				}
 			});
 
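The periodic-snapshot thread now distinguishes a deliberate abort from a real failure, so shutdown does not end with a spurious warning. A runnable sketch of that triage, with simplified local enums standing in for `EthcoreError` and `SnapshotError` (both variant names come from the diff):

#[derive(Debug)]
enum SnapshotError { SnapshotAborted, ChunkTooLarge }

#[derive(Debug)]
enum EthcoreError { Snapshot(SnapshotError) }

fn report(num: u64, e: EthcoreError) {
	match e {
		// A shutdown-triggered abort is expected: info, not warn.
		EthcoreError::Snapshot(SnapshotError::SnapshotAborted) =>
			println!("INFO: Snapshot aborted"),
		// Anything else is a genuine failure.
		_ => println!("WARN: Failed to take snapshot at block #{}: {:?}", num, e),
	}
}

fn main() {
	report(100, EthcoreError::Snapshot(SnapshotError::SnapshotAborted));
	report(100, EthcoreError::Snapshot(SnapshotError::ChunkTooLarge));
}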
|
@@ -764,8 +764,8 @@ impl Client {
 			liveness: AtomicBool::new(awake),
 			mode: Mutex::new(config.mode.clone()),
 			chain: RwLock::new(chain),
-			tracedb: tracedb,
-			engine: engine,
+			tracedb,
+			engine,
 			pruning: config.pruning.clone(),
 			db: RwLock::new(db.clone()),
 			state_db: RwLock::new(state_db),
@@ -778,8 +778,8 @@ impl Client {
 			ancient_blocks_import_lock: Default::default(),
 			queue_consensus_message: IoChannelQueue::new(usize::max_value()),
 			last_hashes: RwLock::new(VecDeque::new()),
-			factories: factories,
-			history: history,
+			factories,
+			history,
 			on_user_defaults_change: Mutex::new(None),
 			registrar_address,
 			exit_handler: Mutex::new(None),
@@ -1138,7 +1138,12 @@ impl Client {
 
	/// Take a snapshot at the given block.
	/// If the ID given is "latest", this will default to 1000 blocks behind.
-	pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(&self, writer: W, at: BlockId, p: &snapshot::Progress) -> Result<(), EthcoreError> {
+	pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(
+		&self,
+		writer: W,
+		at: BlockId,
+		p: &snapshot::Progress,
+	) -> Result<(), EthcoreError> {
		let db = self.state_db.read().journal_db().boxed_clone();
		let best_block_number = self.chain_info().best_block_number;
		let block_number = self.block_number(at).ok_or_else(|| snapshot::Error::InvalidStartingBlock(at))?;
@@ -1168,8 +1173,16 @@ impl Client {
 		};
 
 		let processing_threads = self.config.snapshot.processing_threads;
-		snapshot::take_snapshot(&*self.engine, &self.chain.read(), start_hash, db.as_hash_db(), writer, p, processing_threads)?;
+		let chunker = self.engine.snapshot_components().ok_or(snapshot::Error::SnapshotsUnsupported)?;
+		snapshot::take_snapshot(
+			chunker,
+			&self.chain.read(),
+			start_hash,
+			db.as_hash_db(),
+			writer,
+			p,
+			processing_threads,
+		)?;
		Ok(())
	}
 
@@ -24,9 +24,10 @@ use ethtrie::{TrieDB, TrieDBMut};
 use hash::{KECCAK_EMPTY, KECCAK_NULL_RLP};
 use hash_db::HashDB;
 use rlp::{RlpStream, Rlp};
-use snapshot::Error;
+use snapshot::{Error, Progress};
 use std::collections::HashSet;
 use trie::{Trie, TrieMut};
+use std::sync::atomic::Ordering;
 
 // An empty account -- these were replaced with RLP null data for a space optimization in v1.
 const ACC_EMPTY: BasicAccount = BasicAccount {
@@ -65,7 +66,15 @@ impl CodeState {
 // walk the account's storage trie, returning a vector of RLP items containing the
 // account address hash, account properties and the storage. Each item contains at most `max_storage_items`
 // storage records split according to snapshot format definition.
-pub fn to_fat_rlps(account_hash: &H256, acc: &BasicAccount, acct_db: &AccountDB, used_code: &mut HashSet<H256>, first_chunk_size: usize, max_chunk_size: usize) -> Result<Vec<Bytes>, Error> {
+pub fn to_fat_rlps(
+	account_hash: &H256,
+	acc: &BasicAccount,
+	acct_db: &AccountDB,
+	used_code: &mut HashSet<H256>,
+	first_chunk_size: usize,
+	max_chunk_size: usize,
+	p: &Progress,
+) -> Result<Vec<Bytes>, Error> {
	let db = &(acct_db as &dyn HashDB<_,_>);
	let db = TrieDB::new(db, &acc.storage_root)?;
	let mut chunks = Vec::new();
@@ -112,6 +121,10 @@ pub fn to_fat_rlps(account_hash: &H256, acc: &BasicAccount, acct_db: &AccountDB,
	}
 
	loop {
+		if p.abort.load(Ordering::SeqCst) {
+			trace!(target: "snapshot", "to_fat_rlps: aborting snapshot");
+			return Err(Error::SnapshotAborted);
+		}
		match db_iter.next() {
			Some(Ok((k, v))) => {
				let pair = {
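Per the commit message, the abort is checked in a single spot: the `loop` in `to_fat_rlps` that walks the storage trie, which every state-chunking path goes through. Callers above it simply propagate the error with `?`. A compact, self-contained sketch of that propagation (the function bodies are simplified stand-ins, not the real signatures):

use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug)]
enum Error { SnapshotAborted }

#[derive(Default)]
struct Progress { abort: AtomicBool }

// Stand-in for to_fat_rlps: one abort check per storage item, so even a
// single account with a huge storage trie reacts to a shutdown promptly.
fn to_fat_rlps_sketch(items: usize, p: &Progress) -> Result<Vec<u8>, Error> {
	(0..items).map(|i| {
		if p.abort.load(Ordering::SeqCst) {
			return Err(Error::SnapshotAborted);
		}
		Ok(i as u8) // stand-in for one fat-RLP entry
	}).collect()
}

// Stand-in for chunk_state: callers simply inherit the abort through `?`.
fn chunk_state_sketch(accounts: &[usize], p: &Progress) -> Result<(), Error> {
	for &n in accounts {
		let _rlps = to_fat_rlps_sketch(n, p)?;
	}
	Ok(())
}

fn main() {
	let p = Progress::default();
	assert!(chunk_state_sketch(&[3, 5], &p).is_ok());
	p.abort.store(true, Ordering::SeqCst);
	assert!(chunk_state_sketch(&[3, 5], &p).is_err());
}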
@@ -211,6 +224,7 @@ mod tests {
	use types::basic_account::BasicAccount;
	use test_helpers::get_temp_state_db;
	use snapshot::tests::helpers::fill_storage;
+	use snapshot::Progress;
 
	use hash::{KECCAK_EMPTY, KECCAK_NULL_RLP, keccak};
	use ethereum_types::{H256, Address};
@@ -236,8 +250,8 @@ mod tests {
 
 		let thin_rlp = ::rlp::encode(&account);
 		assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
-		let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value()).unwrap();
+		let p = Progress::default();
+		let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
 		let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap();
 		assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account);
	}
@@ -262,7 +276,9 @@ mod tests {
 		let thin_rlp = ::rlp::encode(&account);
 		assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
 
-		let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value()).unwrap();
+		let p = Progress::default();
+
+		let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
 		let fat_rlp = Rlp::new(&fat_rlp[0]).at(1).unwrap();
 		assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account);
	}
@@ -287,7 +303,8 @@ mod tests {
 		let thin_rlp = ::rlp::encode(&account);
 		assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
 
-		let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), 500, 1000).unwrap();
+		let p = Progress::default();
+		let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), 500, 1000, &p).unwrap();
 		let mut root = KECCAK_NULL_RLP;
 		let mut restored_account = None;
 		for rlp in fat_rlps {
|
|||||||
nonce: 50.into(),
|
nonce: 50.into(),
|
||||||
balance: 123456789.into(),
|
balance: 123456789.into(),
|
||||||
storage_root: KECCAK_NULL_RLP,
|
storage_root: KECCAK_NULL_RLP,
|
||||||
code_hash: code_hash,
|
code_hash,
|
||||||
};
|
};
|
||||||
|
|
||||||
let account2 = BasicAccount {
|
let account2 = BasicAccount {
|
||||||
nonce: 400.into(),
|
nonce: 400.into(),
|
||||||
balance: 98765432123456789usize.into(),
|
balance: 98765432123456789usize.into(),
|
||||||
storage_root: KECCAK_NULL_RLP,
|
storage_root: KECCAK_NULL_RLP,
|
||||||
code_hash: code_hash,
|
code_hash,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut used_code = HashSet::new();
|
let mut used_code = HashSet::new();
|
||||||
|
let p1 = Progress::default();
|
||||||
let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::new(db.as_hash_db(), &addr1), &mut used_code, usize::max_value(), usize::max_value()).unwrap();
|
let p2 = Progress::default();
|
||||||
let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::new(db.as_hash_db(), &addr2), &mut used_code, usize::max_value(), usize::max_value()).unwrap();
|
let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::new(db.as_hash_db(), &addr1), &mut used_code, usize::max_value(), usize::max_value(), &p1).unwrap();
|
||||||
|
let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::new(db.as_hash_db(), &addr2), &mut used_code, usize::max_value(), usize::max_value(), &p2).unwrap();
|
||||||
assert_eq!(used_code.len(), 1);
|
assert_eq!(used_code.len(), 1);
|
||||||
|
|
||||||
let fat_rlp1 = Rlp::new(&fat_rlp1[0]).at(1).unwrap();
|
let fat_rlp1 = Rlp::new(&fat_rlp1[0]).at(1).unwrap();
|
||||||
|
@@ -62,6 +62,8 @@ pub enum Error {
	ChunkTooLarge,
	/// Snapshots not supported by the consensus engine.
	SnapshotsUnsupported,
+	/// Aborted snapshot
+	SnapshotAborted,
	/// Bad epoch transition.
	BadEpochProof(u64),
	/// Wrong chunk format.
@@ -103,6 +105,7 @@ impl fmt::Display for Error {
 			Error::ChunkTooSmall => write!(f, "Chunk size is too small."),
 			Error::ChunkTooLarge => write!(f, "Chunk size is too large."),
 			Error::SnapshotsUnsupported => write!(f, "Snapshots unsupported by consensus engine."),
+			Error::SnapshotAborted => write!(f, "Snapshot was aborted."),
 			Error::BadEpochProof(i) => write!(f, "Bad epoch proof for transition to epoch {}", i),
 			Error::WrongChunkFormat(ref msg) => write!(f, "Wrong chunk format: {}", msg),
 			Error::UnlinkedAncientBlockChain => write!(f, "Unlinked ancient blocks chain"),
|
@ -310,10 +310,7 @@ impl LooseReader {
|
|||||||
|
|
||||||
dir.pop();
|
dir.pop();
|
||||||
|
|
||||||
Ok(LooseReader {
|
Ok(LooseReader { dir, manifest })
|
||||||
dir: dir,
|
|
||||||
manifest: manifest,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -22,7 +22,7 @@
 use std::collections::{HashMap, HashSet};
 use std::cmp;
 use std::sync::Arc;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
 use hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY};
 
 use account_db::{AccountDB, AccountDBMut};
@@ -117,8 +117,9 @@ impl Default for SnapshotConfiguration {
 pub struct Progress {
	accounts: AtomicUsize,
	blocks: AtomicUsize,
-	size: AtomicUsize, // Todo [rob] use Atomicu64 when it stabilizes.
+	size: AtomicU64,
	done: AtomicBool,
+	abort: AtomicBool,
 }
 
 impl Progress {
@@ -127,6 +128,7 @@ impl Progress {
 		self.accounts.store(0, Ordering::Release);
 		self.blocks.store(0, Ordering::Release);
 		self.size.store(0, Ordering::Release);
+		self.abort.store(false, Ordering::Release);
 
 		// atomic fence here to ensure the others are written first?
 		// logs might very rarely get polluted if not.
@@ -140,7 +142,7 @@ impl Progress {
	pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) }
 
	/// Get the written size of the snapshot in bytes.
-	pub fn size(&self) -> usize { self.size.load(Ordering::Acquire) }
+	pub fn size(&self) -> u64 { self.size.load(Ordering::Acquire) }
 
	/// Whether the snapshot is complete.
	pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) }
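Taken together, the three `Progress` hunks above leave the struct looking roughly like this. This is a sketch reconstructed from the diff with explanatory comments added; the real file has more accessors, and the exact position of the `done` store relative to the fence comment is an assumption:

use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};

/// Snapshot progress, shared between the chunking workers and the service.
#[derive(Default)]
pub struct Progress {
	accounts: AtomicUsize, // state entries written so far
	blocks: AtomicUsize,   // blocks chunked so far
	size: AtomicU64,       // bytes written; AtomicU64 now that it is stable
	done: AtomicBool,      // snapshot finished
	abort: AtomicBool,     // shutdown requested; polled by to_fat_rlps
}

impl Progress {
	/// Reset before a new snapshot run; clears any leftover abort request.
	pub fn reset(&self) {
		self.accounts.store(0, Ordering::Release);
		self.blocks.store(0, Ordering::Release);
		self.size.store(0, Ordering::Release);
		self.abort.store(false, Ordering::Release);
		// done is cleared last (see the fence comment in the diff)
		self.done.store(false, Ordering::Release);
	}

	/// Written size in bytes; the return type follows the field to u64.
	pub fn size(&self) -> u64 { self.size.load(Ordering::Acquire) }
}

fn main() {
	let p = Progress::default();
	p.reset();
	assert_eq!(p.size(), 0);
}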
@@ -148,27 +150,28 @@ impl Progress {
 }
 
 /// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer.
 pub fn take_snapshot<W: SnapshotWriter + Send>(
-	engine: &dyn EthEngine,
+	chunker: Box<dyn SnapshotComponents>,
	chain: &BlockChain,
-	block_at: H256,
+	block_hash: H256,
	state_db: &dyn HashDB<KeccakHasher, DBValue>,
	writer: W,
	p: &Progress,
	processing_threads: usize,
 ) -> Result<(), Error> {
-	let start_header = chain.block_header_data(&block_at)
-		.ok_or_else(|| Error::InvalidStartingBlock(BlockId::Hash(block_at)))?;
+	let start_header = chain.block_header_data(&block_hash)
+		.ok_or_else(|| Error::InvalidStartingBlock(BlockId::Hash(block_hash)))?;
	let state_root = start_header.state_root();
-	let number = start_header.number();
+	let block_number = start_header.number();
 
-	info!("Taking snapshot starting at block {}", number);
+	info!("Taking snapshot starting at block {}", block_number);
 
+	let version = chunker.current_version();
	let writer = Mutex::new(writer);
-	let chunker = engine.snapshot_components().ok_or(Error::SnapshotsUnsupported)?;
-	let snapshot_version = chunker.current_version();
	let (state_hashes, block_hashes) = scope(|scope| -> Result<(Vec<H256>, Vec<H256>), Error> {
 		let writer = &writer;
-		let block_guard = scope.spawn(move || chunk_secondary(chunker, chain, block_at, writer, p));
+		let block_guard = scope.spawn(move || {
+			chunk_secondary(chunker, chain, block_hash, writer, p)
+		});
 
 		// The number of threads must be between 1 and SNAPSHOT_SUBPARTS
 		assert!(processing_threads >= 1, "Cannot use less than 1 threads for creating snapshots");
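`take_snapshot` runs its workers under a scoped-thread `scope`, which is what lets the borrowed `&Progress` (and the abort flag inside it) be shared with the block-chunking thread without an `Arc`. A stand-alone illustration of that borrow-sharing using `std::thread::scope`; the diff's `scope` comes from a scoped-threads crate rather than std (an assumption on my part), but the principle is the same:

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

#[derive(Default)]
struct Progress { abort: AtomicBool }

fn main() {
	let p = Progress::default();
	// Scoped threads may borrow `p` directly: every worker sees the same
	// abort flag, with no Arc or 'static bound required.
	thread::scope(|s| {
		s.spawn(|| while !p.abort.load(Ordering::SeqCst) { thread::yield_now() });
		s.spawn(|| p.abort.store(true, Ordering::SeqCst));
	});
	assert!(p.abort.load(Ordering::SeqCst));
}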
@@ -183,7 +186,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
 
 			for part in (thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads) {
 				debug!(target: "snapshot", "Chunking part {} in thread {}", part, thread_idx);
-				let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part))?;
+				let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part), thread_idx)?;
 				chunk_hashes.append(&mut hashes);
 			}
 
@@ -207,12 +210,12 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
	info!(target: "snapshot", "produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len());
 
	let manifest_data = ManifestData {
-		version: snapshot_version,
-		state_hashes: state_hashes,
-		block_hashes: block_hashes,
-		state_root: state_root,
-		block_number: number,
-		block_hash: block_at,
+		version,
+		state_hashes,
+		block_hashes,
+		state_root,
+		block_number,
+		block_hash,
	};
 
	writer.into_inner().finish(manifest_data)?;
@@ -228,7 +231,13 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
 /// Secondary chunks are engine-specific, but they intend to corroborate the state data
 /// in the state chunks.
 /// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis.
-pub fn chunk_secondary<'a>(mut chunker: Box<dyn SnapshotComponents>, chain: &'a BlockChain, start_hash: H256, writer: &Mutex<dyn SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> {
+pub fn chunk_secondary<'a>(
+	mut chunker: Box<dyn SnapshotComponents>,
+	chain: &'a BlockChain,
+	start_hash: H256,
+	writer: &Mutex<dyn SnapshotWriter + 'a>,
+	progress: &'a Progress
+) -> Result<Vec<H256>, Error> {
	let mut chunk_hashes = Vec::new();
	let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)];
 
||||||
|
|
||||||
@ -243,7 +252,7 @@ pub fn chunk_secondary<'a>(mut chunker: Box<dyn SnapshotComponents>, chain: &'a
|
|||||||
trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}",
|
trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}",
|
||||||
hash, size, raw_data.len());
|
hash, size, raw_data.len());
|
||||||
|
|
||||||
progress.size.fetch_add(size, Ordering::SeqCst);
|
progress.size.fetch_add(size as u64, Ordering::SeqCst);
|
||||||
chunk_hashes.push(hash);
|
chunk_hashes.push(hash);
|
||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
@@ -268,6 +277,7 @@ struct StateChunker<'a> {
	snappy_buffer: Vec<u8>,
	writer: &'a Mutex<dyn SnapshotWriter + 'a>,
	progress: &'a Progress,
+	thread_idx: usize,
 }
 
 impl<'a> StateChunker<'a> {
@@ -297,10 +307,10 @@ impl<'a> StateChunker<'a> {
 		let hash = keccak(&compressed);
 
 		self.writer.lock().write_state_chunk(hash, compressed)?;
-		trace!(target: "snapshot", "wrote state chunk. size: {}, uncompressed size: {}", compressed_size, raw_data.len());
+		trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len());
 
 		self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst);
-		self.progress.size.fetch_add(compressed_size, Ordering::SeqCst);
+		self.progress.size.fetch_add(compressed_size as u64, Ordering::SeqCst);
 
 		self.hashes.push(hash);
 		self.cur_size = 0;
@@ -321,7 +331,14 @@ impl<'a> StateChunker<'a> {
 ///
 /// Returns a list of hashes of chunks created, or any error it may
 /// have encountered.
-pub fn chunk_state<'a>(db: &dyn HashDB<KeccakHasher, DBValue>, root: &H256, writer: &Mutex<dyn SnapshotWriter + 'a>, progress: &'a Progress, part: Option<usize>) -> Result<Vec<H256>, Error> {
+pub fn chunk_state<'a>(
+	db: &dyn HashDB<KeccakHasher, DBValue>,
+	root: &H256,
+	writer: &Mutex<dyn SnapshotWriter + 'a>,
+	progress: &'a Progress,
+	part: Option<usize>,
+	thread_idx: usize,
+) -> Result<Vec<H256>, Error> {
	let account_trie = TrieDB::new(&db, &root)?;
 
	let mut chunker = StateChunker {
@@ -329,8 +346,9 @@ pub fn chunk_state<'a>(db: &dyn HashDB<KeccakHasher, DBValue>, root: &H256, writ
 		rlps: Vec::new(),
 		cur_size: 0,
 		snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)],
-		writer: writer,
-		progress: progress,
+		writer,
+		progress,
+		thread_idx,
	};
 
	let mut used_code = HashSet::new();
@@ -365,7 +383,7 @@ pub fn chunk_state<'a>(db: &dyn HashDB<KeccakHasher, DBValue>, root: &H256, writ
 		let account = ::rlp::decode(&*account_data)?;
 		let account_db = AccountDB::from_hash(db, account_key_hash);
 
-		let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE)?;
+		let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE, progress)?;
 		for (i, fat_rlp) in fat_rlps.into_iter().enumerate() {
 			if i > 0 {
 				chunker.write_chunk()?;
@@ -415,7 +415,7 @@ impl Service {
 				_ => break,
 			}
 
-			// Writting changes to DB and logging every now and then
+			// Writing changes to DB and logging every now and then
 			if block_number % 1_000 == 0 {
 				next_db.key_value().write_buffered(batch);
 				next_chain.commit();
@@ -479,16 +479,12 @@ impl Service {
 
 		let guard = Guard::new(temp_dir.clone());
 		let res = client.take_snapshot(writer, BlockId::Number(num), &self.progress);
 
 		self.taking_snapshot.store(false, Ordering::SeqCst);
 		if let Err(e) = res {
 			if client.chain_info().best_block_number >= num + client.pruning_history() {
-				// "Cancelled" is mincing words a bit -- what really happened
-				// is that the state we were snapshotting got pruned out
-				// before we could finish.
-				info!("Periodic snapshot failed: block state pruned.\
-					Run with a longer `--pruning-history` or with `--no-periodic-snapshot`");
-				return Ok(())
+				// The state we were snapshotting was pruned before we could finish.
+				info!("Periodic snapshot failed: block state pruned. Run with a longer `--pruning-history` or with `--no-periodic-snapshot`");
+				return Err(e);
 			} else {
 				return Err(e);
 			}
@@ -846,14 +842,29 @@ impl SnapshotService for Service {
 		}
	}
 
+	fn abort_snapshot(&self) {
+		if self.taking_snapshot.load(Ordering::SeqCst) {
+			trace!(target: "snapshot", "Aborting snapshot – Snapshot under way");
+			self.progress.abort.store(true, Ordering::SeqCst);
+		}
+	}
+
	fn shutdown(&self) {
+		trace!(target: "snapshot", "Shut down SnapshotService");
		self.abort_restore();
+		trace!(target: "snapshot", "Shut down SnapshotService - restore aborted");
+		self.abort_snapshot();
+		trace!(target: "snapshot", "Shut down SnapshotService - snapshot aborted");
	}
 }
 
 impl Drop for Service {
	fn drop(&mut self) {
+		trace!(target: "shutdown", "Dropping Service");
		self.abort_restore();
+		trace!(target: "shutdown", "Dropping Service - restore aborted");
+		self.abort_snapshot();
+		trace!(target: "shutdown", "Dropping Service - snapshot aborted");
	}
 }
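Both the explicit `shutdown` path and the `Drop` guard now abort an in-flight snapshot as well as any restore, so an exit during the periodic snapshot no longer has to wait for it to finish. A condensed, runnable sketch of that call sequence (field and method names as in the diff; the service scaffolding is simplified):

use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Default)]
struct Progress { abort: AtomicBool }

#[derive(Default)]
struct Service {
	taking_snapshot: AtomicBool,
	progress: Progress,
}

impl Service {
	fn abort_restore(&self) { /* cancel any in-flight restoration */ }

	// Mirrors the guard in the diff: only raise the flag while a snapshot
	// is actually under way.
	fn abort_snapshot(&self) {
		if self.taking_snapshot.load(Ordering::SeqCst) {
			self.progress.abort.store(true, Ordering::SeqCst);
		}
	}

	fn shutdown(&self) {
		self.abort_restore();
		self.abort_snapshot();
	}
}

// Drop mirrors shutdown, so the abort also fires if the service is merely dropped.
impl Drop for Service {
	fn drop(&mut self) {
		self.abort_restore();
		self.abort_snapshot();
	}
}

fn main() {
	let s = Service::default();
	s.taking_snapshot.store(true, Ordering::SeqCst);
	s.shutdown();
	assert!(s.progress.abort.load(Ordering::SeqCst));
}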
|
@@ -188,14 +188,15 @@ fn keep_ancient_blocks() {
 		&state_root,
 		&writer,
 		&Progress::default(),
-		None
+		None,
+		0
	).unwrap();
 
	let manifest = ::snapshot::ManifestData {
 		version: 2,
-		state_hashes: state_hashes,
-		state_root: state_root,
-		block_hashes: block_hashes,
+		state_hashes,
+		state_root,
+		block_hashes,
 		block_number: NUM_BLOCKS,
 		block_hash: best_hash,
	};
@@ -58,7 +58,7 @@ fn snap_and_restore() {
 
	let mut state_hashes = Vec::new();
	for part in 0..SNAPSHOT_SUBPARTS {
-		let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part)).unwrap();
+		let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part), 0).unwrap();
 		state_hashes.append(&mut hashes);
	}
 
@@ -129,8 +129,8 @@ fn get_code_from_prev_chunk() {
	let mut make_chunk = |acc, hash| {
 		let mut db = journaldb::new_memory_db();
 		AccountDBMut::from_hash(&mut db, hash).insert(&code[..]);
-		let fat_rlp = account::to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value()).unwrap();
+		let p = Progress::default();
+		let fat_rlp = account::to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value(), &p).unwrap();
 		let mut stream = RlpStream::new_list(1);
 		stream.append_raw(&fat_rlp[0], 1);
 		stream.out()
@@ -174,13 +174,13 @@ fn checks_flag() {
	let state_root = producer.state_root();
	let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
 
-	let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None).unwrap();
+	let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None, 0).unwrap();
 
	writer.into_inner().finish(::snapshot::ManifestData {
 		version: 2,
-		state_hashes: state_hashes,
+		state_hashes,
 		block_hashes: Vec::new(),
-		state_root: state_root,
+		state_root,
 		block_number: 0,
 		block_hash: H256::zero(),
	}).unwrap();
@@ -55,6 +55,9 @@ pub trait SnapshotService : Sync + Send {
	/// no-op if currently restoring.
	fn restore_block_chunk(&self, hash: H256, chunk: Bytes);
 
+	/// Abort in-progress snapshotting if there is one.
+	fn abort_snapshot(&self);
+
	/// Shutdown the Snapshot Service by aborting any ongoing restore
	fn shutdown(&self);
 }
@@ -122,6 +122,8 @@ impl SnapshotService for TestSnapshotService {
 		self.block_restoration_chunks.lock().clear();
	}
 
+	fn abort_snapshot(&self) {}
+
	fn restore_state_chunk(&self, hash: H256, chunk: Bytes) {
 		if self.restoration_manifest.lock().as_ref().map_or(false, |m| m.state_hashes.iter().any(|h| h == &hash)) {
 			self.state_restoration_chunks.lock().insert(hash, chunk);
@@ -15,7 +15,6 @@
 // along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
-
 //! Ethcore client application.
 
 #![warn(missing_docs)]
 
 extern crate ansi_term;
@@ -900,17 +900,27 @@ impl RunningClient {
 			// Create a weak reference to the client so that we can wait on shutdown
 			// until it is dropped
 			let weak_client = Arc::downgrade(&client);
-			// Shutdown and drop the ServiceClient
+			// Shutdown and drop the ClientService
 			client_service.shutdown();
+			trace!(target: "shutdown", "ClientService shut down");
 			drop(client_service);
+			trace!(target: "shutdown", "ClientService dropped");
 			// drop this stuff as soon as exit detected.
 			drop(rpc);
+			trace!(target: "shutdown", "RPC dropped");
 			drop(keep_alive);
+			trace!(target: "shutdown", "KeepAlive dropped");
 			// to make sure timer does not spawn requests while shutdown is in progress
 			informant.shutdown();
+			trace!(target: "shutdown", "Informant shut down");
 			// just Arc is dropping here, to allow other reference release in its default time
 			drop(informant);
+			trace!(target: "shutdown", "Informant dropped");
 			drop(client);
+			trace!(target: "shutdown", "Client dropped");
+			// This may help when debugging ref cycles. Requires nightly-only `#![feature(weak_counts)]`
+			// trace!(target: "shutdown", "Waiting for refs to Client to shutdown, strong_count={:?}, weak_count={:?}", weak_client.strong_count(), weak_client.weak_count());
+			trace!(target: "shutdown", "Waiting for refs to Client to shutdown");
 			wait_for_drop(weak_client);
 		}
	}
@@ -944,24 +954,30 @@ fn print_running_environment(data_dir: &str, dirs: &Directories, db_dirs: &Datab
 }
 
 fn wait_for_drop<T>(w: Weak<T>) {
-	let sleep_duration = Duration::from_secs(1);
-	let warn_timeout = Duration::from_secs(60);
-	let max_timeout = Duration::from_secs(300);
+	const SLEEP_DURATION: Duration = Duration::from_secs(1);
+	const WARN_TIMEOUT: Duration = Duration::from_secs(60);
+	const MAX_TIMEOUT: Duration = Duration::from_secs(300);
 
	let instant = Instant::now();
	let mut warned = false;
 
-	while instant.elapsed() < max_timeout {
+	while instant.elapsed() < MAX_TIMEOUT {
 		if w.upgrade().is_none() {
 			return;
 		}
 
-		if !warned && instant.elapsed() > warn_timeout {
+		if !warned && instant.elapsed() > WARN_TIMEOUT {
 			warned = true;
 			warn!("Shutdown is taking longer than expected.");
 		}
 
-		thread::sleep(sleep_duration);
+		thread::sleep(SLEEP_DURATION);
+
+		// When debugging shutdown issues on a nightly build it can help to enable this with the
+		// `#![feature(weak_counts)]` added to lib.rs (TODO: enable when
+		// https://github.com/rust-lang/rust/issues/57977 is stable)
+		// trace!(target: "shutdown", "Waiting for client to drop, strong_count={:?}, weak_count={:?}", w.strong_count(), w.weak_count());
+		trace!(target: "shutdown", "Waiting for client to drop");
	}
 
	warn!("Shutdown timeout reached, exiting uncleanly.");
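The polling idea behind `wait_for_drop` can be exercised stand-alone: hold a `Weak`, let another owner release the last `Arc`, and return as soon as `upgrade()` fails. A compressed, runnable variant with short timeouts and `println!` in place of the log macros (illustration only, not the parity code):

use std::sync::{Arc, Weak};
use std::thread;
use std::time::{Duration, Instant};

// Shortened variant of wait_for_drop: poll the Weak until the last Arc is gone.
fn wait_for_drop<T>(w: Weak<T>) {
	const SLEEP_DURATION: Duration = Duration::from_millis(10);
	const MAX_TIMEOUT: Duration = Duration::from_secs(1);

	let instant = Instant::now();
	while instant.elapsed() < MAX_TIMEOUT {
		if w.upgrade().is_none() {
			return; // last strong reference dropped; shutdown can proceed
		}
		thread::sleep(SLEEP_DURATION);
	}
	println!("timeout reached, exiting uncleanly");
}

fn main() {
	let client = Arc::new("client");
	let weak = Arc::downgrade(&client);
	// Simulate another owner releasing its reference a little later.
	thread::spawn(move || {
		thread::sleep(Duration::from_millis(50));
		drop(client);
	});
	wait_for_drop(weak); // returns once the spawned thread drops the Arc
}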
|
@@ -261,7 +261,7 @@ impl SnapshotCommand {
 			let cur_size = p.size();
 			if cur_size != last_size {
 				last_size = cur_size;
-				let bytes = ::informant::format_bytes(p.size());
+				let bytes = ::informant::format_bytes(cur_size as usize);
 				info!("Snapshot: {} accounts {} blocks {}", p.accounts(), p.blocks(), bytes);
 			}
 
@@ -48,6 +48,7 @@ impl SnapshotService for TestSnapshotService {
	fn status(&self) -> RestorationStatus { self.status.lock().clone() }
	fn begin_restore(&self, _manifest: ManifestData) { }
	fn abort_restore(&self) { }
+	fn abort_snapshot(&self) {}
	fn restore_state_chunk(&self, _hash: H256, _chunk: Bytes) { }
	fn restore_block_chunk(&self, _hash: H256, _chunk: Bytes) { }
	fn shutdown(&self) { }