Beta 2.5.3 (#10776)
* ethcore/res: activate atlantis classic hf on block 8772000 (#10766) * fix docker tags for publishing (#10741) * fix: aura don't add `SystemTime::now()` (#10720) This commit does the following: - Prevent overflow in `verify_timestamp()` by not adding `now` to found faulty timestamp - Use explicit `CheckedSystemTime::checked_add` to prevent potential consensus issues because SystemTime is platform depedent - remove `#[cfg(not(time_checked_add))]` conditional compilation * Update version * Treat empty account the same as non-exist accounts in EIP-1052 (#10775) * DevP2p: Get node IP address and udp port from Socket, if not included in PING packet (#10705) * get node IP address and udp port from Socket, if not included in PING packet * prevent bootnodes from being added to host nodes * code corrections * code corrections * code corrections * code corrections * docs * code corrections * code corrections * Apply suggestions from code review Co-Authored-By: David <dvdplm@gmail.com> * Add a way to signal shutdown to snapshotting threads (#10744) * Add a way to signal shutdown to snapshotting threads * Pass Progress to fat_rlps() so we can abort from there too. * Checking for abort in a single spot * Remove nightly-only weak/strong counts * fix warning * Fix tests * Add dummy impl to abort snapshots * Add another dummy impl for TestSnapshotService * Remove debugging code * Return error instead of the odd Ok(()) Switch to AtomicU64 * revert .as_bytes() change * fix build * fix build maybe
This commit is contained in:
@@ -764,8 +764,8 @@ impl Client {
|
||||
liveness: AtomicBool::new(awake),
|
||||
mode: Mutex::new(config.mode.clone()),
|
||||
chain: RwLock::new(chain),
|
||||
tracedb: tracedb,
|
||||
engine: engine,
|
||||
tracedb,
|
||||
engine,
|
||||
pruning: config.pruning.clone(),
|
||||
db: RwLock::new(db.clone()),
|
||||
state_db: RwLock::new(state_db),
|
||||
@@ -778,8 +778,8 @@ impl Client {
|
||||
ancient_blocks_import_lock: Default::default(),
|
||||
queue_consensus_message: IoChannelQueue::new(usize::max_value()),
|
||||
last_hashes: RwLock::new(VecDeque::new()),
|
||||
factories: factories,
|
||||
history: history,
|
||||
factories,
|
||||
history,
|
||||
on_user_defaults_change: Mutex::new(None),
|
||||
registrar_address,
|
||||
exit_handler: Mutex::new(None),
|
||||
@@ -1138,7 +1138,12 @@ impl Client {
|
||||
|
||||
/// Take a snapshot at the given block.
|
||||
/// If the ID given is "latest", this will default to 1000 blocks behind.
|
||||
pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(&self, writer: W, at: BlockId, p: &snapshot::Progress) -> Result<(), EthcoreError> {
|
||||
pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(
|
||||
&self,
|
||||
writer: W,
|
||||
at: BlockId,
|
||||
p: &snapshot::Progress,
|
||||
) -> Result<(), EthcoreError> {
|
||||
let db = self.state_db.read().journal_db().boxed_clone();
|
||||
let best_block_number = self.chain_info().best_block_number;
|
||||
let block_number = self.block_number(at).ok_or_else(|| snapshot::Error::InvalidStartingBlock(at))?;
|
||||
@@ -1168,8 +1173,16 @@ impl Client {
|
||||
};
|
||||
|
||||
let processing_threads = self.config.snapshot.processing_threads;
|
||||
snapshot::take_snapshot(&*self.engine, &self.chain.read(), start_hash, db.as_hash_db(), writer, p, processing_threads)?;
|
||||
|
||||
let chunker = self.engine.snapshot_components().ok_or(snapshot::Error::SnapshotsUnsupported)?;
|
||||
snapshot::take_snapshot(
|
||||
chunker,
|
||||
&self.chain.read(),
|
||||
start_hash,
|
||||
db.as_hash_db(),
|
||||
writer,
|
||||
p,
|
||||
processing_threads,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ use std::iter::FromIterator;
|
||||
use std::ops::Deref;
|
||||
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
|
||||
use std::sync::{Weak, Arc};
|
||||
use std::time::{UNIX_EPOCH, SystemTime, Duration};
|
||||
use std::time::{UNIX_EPOCH, Duration};
|
||||
|
||||
use block::*;
|
||||
use client::EngineClient;
|
||||
@@ -42,14 +42,12 @@ use itertools::{self, Itertools};
|
||||
use rlp::{encode, Decodable, DecoderError, Encodable, RlpStream, Rlp};
|
||||
use ethereum_types::{H256, H520, Address, U128, U256};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use time_utils::CheckedSystemTime;
|
||||
use types::BlockNumber;
|
||||
use types::header::{Header, ExtendedHeader};
|
||||
use types::ancestry_action::AncestryAction;
|
||||
use unexpected::{Mismatch, OutOfBounds};
|
||||
|
||||
#[cfg(not(time_checked_add))]
|
||||
use time_utils::CheckedSystemTime;
|
||||
|
||||
mod finality;
|
||||
|
||||
/// `AuthorityRound` params.
|
||||
@@ -578,10 +576,10 @@ fn verify_timestamp(step: &Step, header_step: u64) -> Result<(), BlockError> {
|
||||
// Returning it further won't recover the sync process.
|
||||
trace!(target: "engine", "verify_timestamp: block too early");
|
||||
|
||||
let now = SystemTime::now();
|
||||
let found = now.checked_add(Duration::from_secs(oob.found)).ok_or(BlockError::TimestampOverflow)?;
|
||||
let max = oob.max.and_then(|m| now.checked_add(Duration::from_secs(m)));
|
||||
let min = oob.min.and_then(|m| now.checked_add(Duration::from_secs(m)));
|
||||
let found = CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(oob.found))
|
||||
.ok_or(BlockError::TimestampOverflow)?;
|
||||
let max = oob.max.and_then(|m| CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(m)));
|
||||
let min = oob.min.and_then(|m| CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(m)));
|
||||
|
||||
let new_oob = OutOfBounds { min, max, found };
|
||||
|
||||
|
||||
@@ -24,13 +24,11 @@ use engines::clique::{VoteType, DIFF_INTURN, DIFF_NOTURN, NULL_AUTHOR, SIGNING_D
|
||||
use error::{Error, BlockError};
|
||||
use ethereum_types::{Address, H64};
|
||||
use rand::Rng;
|
||||
use time_utils::CheckedSystemTime;
|
||||
use types::BlockNumber;
|
||||
use types::header::Header;
|
||||
use unexpected::Mismatch;
|
||||
|
||||
#[cfg(not(feature = "time_checked_add"))]
|
||||
use time_utils::CheckedSystemTime;
|
||||
|
||||
/// Type that keeps track of the state for a given vote
|
||||
// Votes that go against the proposal aren't counted since it's equivalent to not voting
|
||||
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
|
||||
@@ -268,7 +266,7 @@ impl CliqueBlockState {
|
||||
// This is a quite bad API because we must mutate both variables even when already `inturn` fails
|
||||
// That's why we can't return early and must have the `if-else` in the end
|
||||
pub fn calc_next_timestamp(&mut self, timestamp: u64, period: u64) -> Result<(), Error> {
|
||||
let inturn = UNIX_EPOCH.checked_add(Duration::from_secs(timestamp.saturating_add(period)));
|
||||
let inturn = CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(timestamp.saturating_add(period)));
|
||||
|
||||
self.next_timestamp_inturn = inturn;
|
||||
|
||||
|
||||
@@ -82,12 +82,10 @@ use parking_lot::RwLock;
|
||||
use rand::Rng;
|
||||
use super::signer::EngineSigner;
|
||||
use unexpected::{Mismatch, OutOfBounds};
|
||||
use time_utils::CheckedSystemTime;
|
||||
use types::BlockNumber;
|
||||
use types::header::{ExtendedHeader, Header};
|
||||
|
||||
#[cfg(not(feature = "time_checked_add"))]
|
||||
use time_utils::CheckedSystemTime;
|
||||
|
||||
use self::block_state::CliqueBlockState;
|
||||
use self::params::CliqueParams;
|
||||
use self::step_service::StepService;
|
||||
@@ -536,7 +534,7 @@ impl Engine<EthereumMachine> for Clique {
|
||||
|
||||
// Don't waste time checking blocks from the future
|
||||
{
|
||||
let limit = SystemTime::now().checked_add(Duration::from_secs(self.period))
|
||||
let limit = CheckedSystemTime::checked_add(SystemTime::now(), Duration::from_secs(self.period))
|
||||
.ok_or(BlockError::TimestampOverflow)?;
|
||||
|
||||
// This should succeed under the contraints that the system clock works
|
||||
@@ -546,7 +544,7 @@ impl Engine<EthereumMachine> for Clique {
|
||||
|
||||
let hdr = Duration::from_secs(header.timestamp());
|
||||
if hdr > limit_as_dur {
|
||||
let found = UNIX_EPOCH.checked_add(hdr).ok_or(BlockError::TimestampOverflow)?;
|
||||
let found = CheckedSystemTime::checked_add(UNIX_EPOCH, hdr).ok_or(BlockError::TimestampOverflow)?;
|
||||
|
||||
Err(BlockError::TemporarilyInvalid(OutOfBounds {
|
||||
min: None,
|
||||
@@ -657,8 +655,8 @@ impl Engine<EthereumMachine> for Clique {
|
||||
// Ensure that the block's timestamp isn't too close to it's parent
|
||||
let limit = parent.timestamp().saturating_add(self.period);
|
||||
if limit > header.timestamp() {
|
||||
let max = UNIX_EPOCH.checked_add(Duration::from_secs(header.timestamp()));
|
||||
let found = UNIX_EPOCH.checked_add(Duration::from_secs(limit))
|
||||
let max = CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(header.timestamp()));
|
||||
let found = CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(limit))
|
||||
.ok_or(BlockError::TimestampOverflow)?;
|
||||
|
||||
Err(BlockError::InvalidTimestamp(OutOfBounds {
|
||||
|
||||
@@ -314,7 +314,11 @@ impl<'a, T: 'a, V: 'a, B: 'a> Ext for Externalities<'a, T, V, B>
|
||||
}
|
||||
|
||||
fn extcodehash(&self, address: &Address) -> vm::Result<Option<H256>> {
|
||||
Ok(self.state.code_hash(address)?)
|
||||
if self.state.exists_and_not_null(address)? {
|
||||
Ok(self.state.code_hash(address)?)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn extcodesize(&self, address: &Address) -> vm::Result<Option<usize>> {
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
#![warn(missing_docs, unused_extern_crates)]
|
||||
#![cfg_attr(feature = "time_checked_add", feature(time_checked_add))]
|
||||
|
||||
//! Ethcore library
|
||||
//!
|
||||
@@ -100,6 +99,7 @@ extern crate rlp;
|
||||
extern crate rustc_hex;
|
||||
extern crate serde;
|
||||
extern crate stats;
|
||||
extern crate time_utils;
|
||||
extern crate triehash_ethereum as triehash;
|
||||
extern crate unexpected;
|
||||
extern crate using_queue;
|
||||
@@ -149,9 +149,6 @@ extern crate fetch;
|
||||
#[cfg(all(test, feature = "price-info"))]
|
||||
extern crate parity_runtime;
|
||||
|
||||
#[cfg(not(time_checked_add))]
|
||||
extern crate time_utils;
|
||||
|
||||
pub mod block;
|
||||
pub mod builtin;
|
||||
pub mod client;
|
||||
|
||||
@@ -24,9 +24,10 @@ use ethtrie::{TrieDB, TrieDBMut};
|
||||
use hash::{KECCAK_EMPTY, KECCAK_NULL_RLP};
|
||||
use hash_db::HashDB;
|
||||
use rlp::{RlpStream, Rlp};
|
||||
use snapshot::Error;
|
||||
use snapshot::{Error, Progress};
|
||||
use std::collections::HashSet;
|
||||
use trie::{Trie, TrieMut};
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
// An empty account -- these were replaced with RLP null data for a space optimization in v1.
|
||||
const ACC_EMPTY: BasicAccount = BasicAccount {
|
||||
@@ -65,8 +66,16 @@ impl CodeState {
|
||||
// walk the account's storage trie, returning a vector of RLP items containing the
|
||||
// account address hash, account properties and the storage. Each item contains at most `max_storage_items`
|
||||
// storage records split according to snapshot format definition.
|
||||
pub fn to_fat_rlps(account_hash: &H256, acc: &BasicAccount, acct_db: &AccountDB, used_code: &mut HashSet<H256>, first_chunk_size: usize, max_chunk_size: usize) -> Result<Vec<Bytes>, Error> {
|
||||
let db = &(acct_db as &HashDB<_,_>);
|
||||
pub fn to_fat_rlps(
|
||||
account_hash: &H256,
|
||||
acc: &BasicAccount,
|
||||
acct_db: &AccountDB,
|
||||
used_code: &mut HashSet<H256>,
|
||||
first_chunk_size: usize,
|
||||
max_chunk_size: usize,
|
||||
p: &Progress,
|
||||
) -> Result<Vec<Bytes>, Error> {
|
||||
let db = &(acct_db as &dyn HashDB<_,_>);
|
||||
let db = TrieDB::new(db, &acc.storage_root)?;
|
||||
let mut chunks = Vec::new();
|
||||
let mut db_iter = db.iter()?;
|
||||
@@ -112,6 +121,10 @@ pub fn to_fat_rlps(account_hash: &H256, acc: &BasicAccount, acct_db: &AccountDB,
|
||||
}
|
||||
|
||||
loop {
|
||||
if p.abort.load(Ordering::SeqCst) {
|
||||
trace!(target: "snapshot", "to_fat_rlps: aborting snapshot");
|
||||
return Err(Error::SnapshotAborted);
|
||||
}
|
||||
match db_iter.next() {
|
||||
Some(Ok((k, v))) => {
|
||||
let pair = {
|
||||
@@ -211,6 +224,7 @@ mod tests {
|
||||
use types::basic_account::BasicAccount;
|
||||
use test_helpers::get_temp_state_db;
|
||||
use snapshot::tests::helpers::fill_storage;
|
||||
use snapshot::Progress;
|
||||
|
||||
use hash::{KECCAK_EMPTY, KECCAK_NULL_RLP, keccak};
|
||||
use ethereum_types::{H256, Address};
|
||||
@@ -236,8 +250,8 @@ mod tests {
|
||||
|
||||
let thin_rlp = ::rlp::encode(&account);
|
||||
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
|
||||
|
||||
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value()).unwrap();
|
||||
let p = Progress::default();
|
||||
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
|
||||
let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap();
|
||||
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account);
|
||||
}
|
||||
@@ -262,7 +276,9 @@ mod tests {
|
||||
let thin_rlp = ::rlp::encode(&account);
|
||||
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
|
||||
|
||||
let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value()).unwrap();
|
||||
let p = Progress::default();
|
||||
|
||||
let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
|
||||
let fat_rlp = Rlp::new(&fat_rlp[0]).at(1).unwrap();
|
||||
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account);
|
||||
}
|
||||
@@ -287,7 +303,8 @@ mod tests {
|
||||
let thin_rlp = ::rlp::encode(&account);
|
||||
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
|
||||
|
||||
let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), 500, 1000).unwrap();
|
||||
let p = Progress::default();
|
||||
let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), 500, 1000, &p).unwrap();
|
||||
let mut root = KECCAK_NULL_RLP;
|
||||
let mut restored_account = None;
|
||||
for rlp in fat_rlps {
|
||||
@@ -319,20 +336,21 @@ mod tests {
|
||||
nonce: 50.into(),
|
||||
balance: 123456789.into(),
|
||||
storage_root: KECCAK_NULL_RLP,
|
||||
code_hash: code_hash,
|
||||
code_hash,
|
||||
};
|
||||
|
||||
let account2 = BasicAccount {
|
||||
nonce: 400.into(),
|
||||
balance: 98765432123456789usize.into(),
|
||||
storage_root: KECCAK_NULL_RLP,
|
||||
code_hash: code_hash,
|
||||
code_hash,
|
||||
};
|
||||
|
||||
let mut used_code = HashSet::new();
|
||||
|
||||
let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::new(db.as_hash_db(), &addr1), &mut used_code, usize::max_value(), usize::max_value()).unwrap();
|
||||
let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::new(db.as_hash_db(), &addr2), &mut used_code, usize::max_value(), usize::max_value()).unwrap();
|
||||
let p1 = Progress::default();
|
||||
let p2 = Progress::default();
|
||||
let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::new(db.as_hash_db(), &addr1), &mut used_code, usize::max_value(), usize::max_value(), &p1).unwrap();
|
||||
let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::new(db.as_hash_db(), &addr2), &mut used_code, usize::max_value(), usize::max_value(), &p2).unwrap();
|
||||
assert_eq!(used_code.len(), 1);
|
||||
|
||||
let fat_rlp1 = Rlp::new(&fat_rlp1[0]).at(1).unwrap();
|
||||
@@ -350,6 +368,6 @@ mod tests {
|
||||
#[test]
|
||||
fn encoding_empty_acc() {
|
||||
let mut db = get_temp_state_db();
|
||||
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &Address::default()), Rlp::new(&::rlp::NULL_RLP), H256::zero()).unwrap(), (ACC_EMPTY, None));
|
||||
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &Address::zero()), Rlp::new(&::rlp::NULL_RLP), H256::zero()).unwrap(), (ACC_EMPTY, None));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,6 +61,8 @@ pub enum Error {
|
||||
ChunkTooLarge,
|
||||
/// Snapshots not supported by the consensus engine.
|
||||
SnapshotsUnsupported,
|
||||
/// Aborted snapshot
|
||||
SnapshotAborted,
|
||||
/// Bad epoch transition.
|
||||
BadEpochProof(u64),
|
||||
/// Wrong chunk format.
|
||||
@@ -91,6 +93,7 @@ impl fmt::Display for Error {
|
||||
Error::ChunkTooSmall => write!(f, "Chunk size is too small."),
|
||||
Error::ChunkTooLarge => write!(f, "Chunk size is too large."),
|
||||
Error::SnapshotsUnsupported => write!(f, "Snapshots unsupported by consensus engine."),
|
||||
Error::SnapshotAborted => write!(f, "Snapshot was aborted."),
|
||||
Error::BadEpochProof(i) => write!(f, "Bad epoch proof for transition to epoch {}", i),
|
||||
Error::WrongChunkFormat(ref msg) => write!(f, "Wrong chunk format: {}", msg),
|
||||
Error::UnlinkedAncientBlockChain => write!(f, "Unlinked ancient blocks chain"),
|
||||
|
||||
@@ -310,10 +310,7 @@ impl LooseReader {
|
||||
|
||||
dir.pop();
|
||||
|
||||
Ok(LooseReader {
|
||||
dir: dir,
|
||||
manifest: manifest,
|
||||
})
|
||||
Ok(LooseReader { dir, manifest })
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::cmp;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
|
||||
use hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY};
|
||||
|
||||
use account_db::{AccountDB, AccountDBMut};
|
||||
@@ -107,7 +107,7 @@ impl Default for SnapshotConfiguration {
|
||||
fn default() -> Self {
|
||||
SnapshotConfiguration {
|
||||
no_periodic: false,
|
||||
processing_threads: ::std::cmp::max(1, num_cpus::get() / 2),
|
||||
processing_threads: ::std::cmp::max(1, num_cpus::get_physical() / 2),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -117,8 +117,9 @@ impl Default for SnapshotConfiguration {
|
||||
pub struct Progress {
|
||||
accounts: AtomicUsize,
|
||||
blocks: AtomicUsize,
|
||||
size: AtomicUsize, // Todo [rob] use Atomicu64 when it stabilizes.
|
||||
size: AtomicU64,
|
||||
done: AtomicBool,
|
||||
abort: AtomicBool,
|
||||
}
|
||||
|
||||
impl Progress {
|
||||
@@ -127,6 +128,7 @@ impl Progress {
|
||||
self.accounts.store(0, Ordering::Release);
|
||||
self.blocks.store(0, Ordering::Release);
|
||||
self.size.store(0, Ordering::Release);
|
||||
self.abort.store(false, Ordering::Release);
|
||||
|
||||
// atomic fence here to ensure the others are written first?
|
||||
// logs might very rarely get polluted if not.
|
||||
@@ -140,7 +142,7 @@ impl Progress {
|
||||
pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) }
|
||||
|
||||
/// Get the written size of the snapshot in bytes.
|
||||
pub fn size(&self) -> usize { self.size.load(Ordering::Acquire) }
|
||||
pub fn size(&self) -> u64 { self.size.load(Ordering::Acquire) }
|
||||
|
||||
/// Whether the snapshot is complete.
|
||||
pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) }
|
||||
@@ -148,27 +150,28 @@ impl Progress {
|
||||
}
|
||||
/// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer.
|
||||
pub fn take_snapshot<W: SnapshotWriter + Send>(
|
||||
engine: &EthEngine,
|
||||
chunker: Box<dyn SnapshotComponents>,
|
||||
chain: &BlockChain,
|
||||
block_at: H256,
|
||||
state_db: &HashDB<KeccakHasher, DBValue>,
|
||||
block_hash: H256,
|
||||
state_db: &dyn HashDB<KeccakHasher, DBValue>,
|
||||
writer: W,
|
||||
p: &Progress,
|
||||
processing_threads: usize,
|
||||
) -> Result<(), Error> {
|
||||
let start_header = chain.block_header_data(&block_at)
|
||||
.ok_or_else(|| Error::InvalidStartingBlock(BlockId::Hash(block_at)))?;
|
||||
let start_header = chain.block_header_data(&block_hash)
|
||||
.ok_or_else(|| Error::InvalidStartingBlock(BlockId::Hash(block_hash)))?;
|
||||
let state_root = start_header.state_root();
|
||||
let number = start_header.number();
|
||||
let block_number = start_header.number();
|
||||
|
||||
info!("Taking snapshot starting at block {}", number);
|
||||
info!("Taking snapshot starting at block {}", block_number);
|
||||
|
||||
let version = chunker.current_version();
|
||||
let writer = Mutex::new(writer);
|
||||
let chunker = engine.snapshot_components().ok_or(Error::SnapshotsUnsupported)?;
|
||||
let snapshot_version = chunker.current_version();
|
||||
let (state_hashes, block_hashes) = scope(|scope| -> Result<(Vec<H256>, Vec<H256>), Error> {
|
||||
let writer = &writer;
|
||||
let block_guard = scope.spawn(move || chunk_secondary(chunker, chain, block_at, writer, p));
|
||||
let block_guard = scope.spawn(move || {
|
||||
chunk_secondary(chunker, chain, block_hash, writer, p)
|
||||
});
|
||||
|
||||
// The number of threads must be between 1 and SNAPSHOT_SUBPARTS
|
||||
assert!(processing_threads >= 1, "Cannot use less than 1 threads for creating snapshots");
|
||||
@@ -183,7 +186,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
|
||||
|
||||
for part in (thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads) {
|
||||
debug!(target: "snapshot", "Chunking part {} in thread {}", part, thread_idx);
|
||||
let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part))?;
|
||||
let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part), thread_idx)?;
|
||||
chunk_hashes.append(&mut hashes);
|
||||
}
|
||||
|
||||
@@ -207,12 +210,12 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
|
||||
info!(target: "snapshot", "produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len());
|
||||
|
||||
let manifest_data = ManifestData {
|
||||
version: snapshot_version,
|
||||
state_hashes: state_hashes,
|
||||
block_hashes: block_hashes,
|
||||
state_root: state_root,
|
||||
block_number: number,
|
||||
block_hash: block_at,
|
||||
version,
|
||||
state_hashes,
|
||||
block_hashes,
|
||||
state_root,
|
||||
block_number,
|
||||
block_hash,
|
||||
};
|
||||
|
||||
writer.into_inner().finish(manifest_data)?;
|
||||
@@ -228,7 +231,13 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
|
||||
/// Secondary chunks are engine-specific, but they intend to corroborate the state data
|
||||
/// in the state chunks.
|
||||
/// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis.
|
||||
pub fn chunk_secondary<'a>(mut chunker: Box<SnapshotComponents>, chain: &'a BlockChain, start_hash: H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> {
|
||||
pub fn chunk_secondary<'a>(
|
||||
mut chunker: Box<dyn SnapshotComponents>,
|
||||
chain: &'a BlockChain,
|
||||
start_hash: H256,
|
||||
writer: &Mutex<dyn SnapshotWriter + 'a>,
|
||||
progress: &'a Progress
|
||||
) -> Result<Vec<H256>, Error> {
|
||||
let mut chunk_hashes = Vec::new();
|
||||
let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)];
|
||||
|
||||
@@ -243,7 +252,7 @@ pub fn chunk_secondary<'a>(mut chunker: Box<SnapshotComponents>, chain: &'a Bloc
|
||||
trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}",
|
||||
hash, size, raw_data.len());
|
||||
|
||||
progress.size.fetch_add(size, Ordering::SeqCst);
|
||||
progress.size.fetch_add(size as u64, Ordering::SeqCst);
|
||||
chunk_hashes.push(hash);
|
||||
Ok(())
|
||||
};
|
||||
@@ -266,8 +275,9 @@ struct StateChunker<'a> {
|
||||
rlps: Vec<Bytes>,
|
||||
cur_size: usize,
|
||||
snappy_buffer: Vec<u8>,
|
||||
writer: &'a Mutex<SnapshotWriter + 'a>,
|
||||
writer: &'a Mutex<dyn SnapshotWriter + 'a>,
|
||||
progress: &'a Progress,
|
||||
thread_idx: usize,
|
||||
}
|
||||
|
||||
impl<'a> StateChunker<'a> {
|
||||
@@ -297,10 +307,10 @@ impl<'a> StateChunker<'a> {
|
||||
let hash = keccak(&compressed);
|
||||
|
||||
self.writer.lock().write_state_chunk(hash, compressed)?;
|
||||
trace!(target: "snapshot", "wrote state chunk. size: {}, uncompressed size: {}", compressed_size, raw_data.len());
|
||||
trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len());
|
||||
|
||||
self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst);
|
||||
self.progress.size.fetch_add(compressed_size, Ordering::SeqCst);
|
||||
self.progress.size.fetch_add(compressed_size as u64, Ordering::SeqCst);
|
||||
|
||||
self.hashes.push(hash);
|
||||
self.cur_size = 0;
|
||||
@@ -321,7 +331,14 @@ impl<'a> StateChunker<'a> {
|
||||
///
|
||||
/// Returns a list of hashes of chunks created, or any error it may
|
||||
/// have encountered.
|
||||
pub fn chunk_state<'a>(db: &HashDB<KeccakHasher, DBValue>, root: &H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress, part: Option<usize>) -> Result<Vec<H256>, Error> {
|
||||
pub fn chunk_state<'a>(
|
||||
db: &dyn HashDB<KeccakHasher, DBValue>,
|
||||
root: &H256,
|
||||
writer: &Mutex<dyn SnapshotWriter + 'a>,
|
||||
progress: &'a Progress,
|
||||
part: Option<usize>,
|
||||
thread_idx: usize,
|
||||
) -> Result<Vec<H256>, Error> {
|
||||
let account_trie = TrieDB::new(&db, &root)?;
|
||||
|
||||
let mut chunker = StateChunker {
|
||||
@@ -329,8 +346,9 @@ pub fn chunk_state<'a>(db: &HashDB<KeccakHasher, DBValue>, root: &H256, writer:
|
||||
rlps: Vec::new(),
|
||||
cur_size: 0,
|
||||
snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)],
|
||||
writer: writer,
|
||||
progress: progress,
|
||||
writer,
|
||||
progress,
|
||||
thread_idx,
|
||||
};
|
||||
|
||||
let mut used_code = HashSet::new();
|
||||
@@ -365,7 +383,7 @@ pub fn chunk_state<'a>(db: &HashDB<KeccakHasher, DBValue>, root: &H256, writer:
|
||||
let account = ::rlp::decode(&*account_data)?;
|
||||
let account_db = AccountDB::from_hash(db, account_key_hash);
|
||||
|
||||
let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE)?;
|
||||
let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE, progress)?;
|
||||
for (i, fat_rlp) in fat_rlps.into_iter().enumerate() {
|
||||
if i > 0 {
|
||||
chunker.write_chunk()?;
|
||||
@@ -383,7 +401,7 @@ pub fn chunk_state<'a>(db: &HashDB<KeccakHasher, DBValue>, root: &H256, writer:
|
||||
|
||||
/// Used to rebuild the state trie piece by piece.
|
||||
pub struct StateRebuilder {
|
||||
db: Box<JournalDB>,
|
||||
db: Box<dyn JournalDB>,
|
||||
state_root: H256,
|
||||
known_code: HashMap<H256, H256>, // code hashes mapped to first account with this code.
|
||||
missing_code: HashMap<H256, Vec<H256>>, // maps code hashes to lists of accounts missing that code.
|
||||
@@ -393,7 +411,7 @@ pub struct StateRebuilder {
|
||||
|
||||
impl StateRebuilder {
|
||||
/// Create a new state rebuilder to write into the given backing DB.
|
||||
pub fn new(db: Arc<KeyValueDB>, pruning: Algorithm) -> Self {
|
||||
pub fn new(db: Arc<dyn KeyValueDB>, pruning: Algorithm) -> Self {
|
||||
StateRebuilder {
|
||||
db: journaldb::new(db.clone(), pruning, ::db::COL_STATE),
|
||||
state_root: KECCAK_NULL_RLP,
|
||||
@@ -411,7 +429,7 @@ impl StateRebuilder {
|
||||
let mut pairs = Vec::with_capacity(rlp.item_count()?);
|
||||
|
||||
// initialize the pairs vector with empty values so we have slots to write into.
|
||||
pairs.resize(rlp.item_count()?, (H256::new(), Vec::new()));
|
||||
pairs.resize(rlp.item_count()?, (H256::zero(), Vec::new()));
|
||||
|
||||
let status = rebuild_accounts(
|
||||
self.db.as_hash_db_mut(),
|
||||
@@ -468,7 +486,7 @@ impl StateRebuilder {
|
||||
/// Finalize the restoration. Check for accounts missing code and make a dummy
|
||||
/// journal entry.
|
||||
/// Once all chunks have been fed, there should be nothing missing.
|
||||
pub fn finalize(mut self, era: u64, id: H256) -> Result<Box<JournalDB>, ::error::Error> {
|
||||
pub fn finalize(mut self, era: u64, id: H256) -> Result<Box<dyn JournalDB>, ::error::Error> {
|
||||
let missing = self.missing_code.keys().cloned().collect::<Vec<_>>();
|
||||
if !missing.is_empty() { return Err(Error::MissingCode(missing).into()) }
|
||||
|
||||
@@ -493,7 +511,7 @@ struct RebuiltStatus {
|
||||
// rebuild a set of accounts and their storage.
|
||||
// returns a status detailing newly-loaded code and accounts missing code.
|
||||
fn rebuild_accounts(
|
||||
db: &mut HashDB<KeccakHasher, DBValue>,
|
||||
db: &mut dyn HashDB<KeccakHasher, DBValue>,
|
||||
account_fat_rlps: Rlp,
|
||||
out_chunk: &mut [(H256, Bytes)],
|
||||
known_code: &HashMap<H256, H256>,
|
||||
@@ -560,7 +578,7 @@ const POW_VERIFY_RATE: f32 = 0.02;
|
||||
/// Verify an old block with the given header, engine, blockchain, body. If `always` is set, it will perform
|
||||
/// the fullest verification possible. If not, it will take a random sample to determine whether it will
|
||||
/// do heavy or light verification.
|
||||
pub fn verify_old_block(rng: &mut OsRng, header: &Header, engine: &EthEngine, chain: &BlockChain, always: bool) -> Result<(), ::error::Error> {
|
||||
pub fn verify_old_block(rng: &mut OsRng, header: &Header, engine: &dyn EthEngine, chain: &BlockChain, always: bool) -> Result<(), ::error::Error> {
|
||||
engine.verify_block_basic(header)?;
|
||||
|
||||
if always || rng.gen::<f32>() <= POW_VERIFY_RATE {
|
||||
|
||||
@@ -415,7 +415,7 @@ impl Service {
|
||||
_ => break,
|
||||
}
|
||||
|
||||
// Writting changes to DB and logging every now and then
|
||||
// Writing changes to DB and logging every now and then
|
||||
if block_number % 1_000 == 0 {
|
||||
next_db.key_value().write_buffered(batch);
|
||||
next_chain.commit();
|
||||
@@ -479,16 +479,12 @@ impl Service {
|
||||
|
||||
let guard = Guard::new(temp_dir.clone());
|
||||
let res = client.take_snapshot(writer, BlockId::Number(num), &self.progress);
|
||||
|
||||
self.taking_snapshot.store(false, Ordering::SeqCst);
|
||||
if let Err(e) = res {
|
||||
if client.chain_info().best_block_number >= num + client.pruning_history() {
|
||||
// "Cancelled" is mincing words a bit -- what really happened
|
||||
// is that the state we were snapshotting got pruned out
|
||||
// before we could finish.
|
||||
info!("Periodic snapshot failed: block state pruned.\
|
||||
Run with a longer `--pruning-history` or with `--no-periodic-snapshot`");
|
||||
return Ok(())
|
||||
// The state we were snapshotting was pruned before we could finish.
|
||||
info!("Periodic snapshot failed: block state pruned. Run with a longer `--pruning-history` or with `--no-periodic-snapshot`");
|
||||
return Err(e);
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
@@ -846,14 +842,29 @@ impl SnapshotService for Service {
|
||||
}
|
||||
}
|
||||
|
||||
fn abort_snapshot(&self) {
|
||||
if self.taking_snapshot.load(Ordering::SeqCst) {
|
||||
trace!(target: "snapshot", "Aborting snapshot – Snapshot under way");
|
||||
self.progress.abort.store(true, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
fn shutdown(&self) {
|
||||
trace!(target: "snapshot", "Shut down SnapshotService");
|
||||
self.abort_restore();
|
||||
trace!(target: "snapshot", "Shut down SnapshotService - restore aborted");
|
||||
self.abort_snapshot();
|
||||
trace!(target: "snapshot", "Shut down SnapshotService - snapshot aborted");
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Service {
|
||||
fn drop(&mut self) {
|
||||
trace!(target: "shutdown", "Dropping Service");
|
||||
self.abort_restore();
|
||||
trace!(target: "shutdown", "Dropping Service - restore aborted");
|
||||
self.abort_snapshot();
|
||||
trace!(target: "shutdown", "Dropping Service - snapshot aborted");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -188,14 +188,15 @@ fn keep_ancient_blocks() {
|
||||
&state_root,
|
||||
&writer,
|
||||
&Progress::default(),
|
||||
None
|
||||
None,
|
||||
0
|
||||
).unwrap();
|
||||
|
||||
let manifest = ::snapshot::ManifestData {
|
||||
version: 2,
|
||||
state_hashes: state_hashes,
|
||||
state_root: state_root,
|
||||
block_hashes: block_hashes,
|
||||
state_hashes,
|
||||
state_root,
|
||||
block_hashes,
|
||||
block_number: NUM_BLOCKS,
|
||||
block_hash: best_hash,
|
||||
};
|
||||
|
||||
@@ -55,7 +55,7 @@ fn snap_and_restore() {
|
||||
|
||||
let mut state_hashes = Vec::new();
|
||||
for part in 0..SNAPSHOT_SUBPARTS {
|
||||
let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part)).unwrap();
|
||||
let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part), 0).unwrap();
|
||||
state_hashes.append(&mut hashes);
|
||||
}
|
||||
|
||||
@@ -126,8 +126,8 @@ fn get_code_from_prev_chunk() {
|
||||
let mut make_chunk = |acc, hash| {
|
||||
let mut db = journaldb::new_memory_db();
|
||||
AccountDBMut::from_hash(&mut db, hash).insert(&code[..]);
|
||||
|
||||
let fat_rlp = account::to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value()).unwrap();
|
||||
let p = Progress::default();
|
||||
let fat_rlp = account::to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value(), &p).unwrap();
|
||||
let mut stream = RlpStream::new_list(1);
|
||||
stream.append_raw(&fat_rlp[0], 1);
|
||||
stream.out()
|
||||
@@ -171,13 +171,13 @@ fn checks_flag() {
|
||||
let state_root = producer.state_root();
|
||||
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
|
||||
|
||||
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None).unwrap();
|
||||
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None, 0).unwrap();
|
||||
|
||||
writer.into_inner().finish(::snapshot::ManifestData {
|
||||
version: 2,
|
||||
state_hashes: state_hashes,
|
||||
state_hashes,
|
||||
block_hashes: Vec::new(),
|
||||
state_root: state_root,
|
||||
state_root,
|
||||
block_number: 0,
|
||||
block_hash: H256::default(),
|
||||
}).unwrap();
|
||||
|
||||
@@ -55,6 +55,9 @@ pub trait SnapshotService : Sync + Send {
|
||||
/// no-op if currently restoring.
|
||||
fn restore_block_chunk(&self, hash: H256, chunk: Bytes);
|
||||
|
||||
/// Abort in-progress snapshotting if there is one.
|
||||
fn abort_snapshot(&self);
|
||||
|
||||
/// Shutdown the Snapshot Service by aborting any ongoing restore
|
||||
fn shutdown(&self);
|
||||
}
|
||||
|
||||
@@ -40,7 +40,6 @@ use types::{BlockNumber, header::Header};
|
||||
use types::transaction::SignedTransaction;
|
||||
use verification::queue::kind::blocks::Unverified;
|
||||
|
||||
#[cfg(not(time_checked_add))]
|
||||
use time_utils::CheckedSystemTime;
|
||||
|
||||
/// Preprocessed block data gathered in `verify_block_unordered` call
|
||||
@@ -310,7 +309,7 @@ pub fn verify_header_params(header: &Header, engine: &EthEngine, is_full: bool,
|
||||
// this will resist overflow until `year 2037`
|
||||
let max_time = SystemTime::now() + ACCEPTABLE_DRIFT;
|
||||
let invalid_threshold = max_time + ACCEPTABLE_DRIFT * 9;
|
||||
let timestamp = UNIX_EPOCH.checked_add(Duration::from_secs(header.timestamp()))
|
||||
let timestamp = CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(header.timestamp()))
|
||||
.ok_or(BlockError::TimestampOverflow)?;
|
||||
|
||||
if timestamp > invalid_threshold {
|
||||
@@ -334,9 +333,9 @@ fn verify_parent(header: &Header, parent: &Header, engine: &EthEngine) -> Result
|
||||
|
||||
if !engine.is_timestamp_valid(header.timestamp(), parent.timestamp()) {
|
||||
let now = SystemTime::now();
|
||||
let min = now.checked_add(Duration::from_secs(parent.timestamp().saturating_add(1)))
|
||||
let min = CheckedSystemTime::checked_add(now, Duration::from_secs(parent.timestamp().saturating_add(1)))
|
||||
.ok_or(BlockError::TimestampOverflow)?;
|
||||
let found = now.checked_add(Duration::from_secs(header.timestamp()))
|
||||
let found = CheckedSystemTime::checked_add(now, Duration::from_secs(header.timestamp()))
|
||||
.ok_or(BlockError::TimestampOverflow)?;
|
||||
return Err(From::from(BlockError::InvalidTimestamp(OutOfBounds { max: None, min: Some(min), found })))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user