Extract snapshot to own crate (#11010)

* Move snapshot to own crate
Sort out imports

* WIP cargo toml

* Make snapshotting generic over the client
Sort out tests

* Sort out types from blockchain and client

* Sort out sync

* Sort out imports and generics

* Sort out main binary

* Fix sync test-helpers

* Sort out import for secret-store

* Sort out more imports

* Fix easy todos

* cleanup

* Cleanup

* remove unneded workspace member

* cleanup

* Sort out test-helpers dependency on account-db

* Update ethcore/client-traits/src/lib.rs

Co-Authored-By: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* Update ethcore/snapshot/Cargo.toml
This commit is contained in:
David
2019-09-03 11:29:25 +02:00
committed by GitHub
parent 396ccdbcc1
commit d193ddde19
68 changed files with 627 additions and 486 deletions

View File

@@ -1,195 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
use bytes::Bytes;
use ethereum_types::{H256, U256};
use types::transaction::UnverifiedTransaction;
use blockchain::ImportRoute;
use std::time::Duration;
use std::collections::HashMap;
/// Messages to broadcast via chain
pub enum ChainMessageType {
/// Consensus message
Consensus(Vec<u8>),
/// Message with private transaction
PrivateTransaction(H256, Vec<u8>),
/// Message with signed private transaction
SignedPrivateTransaction(H256, Vec<u8>),
/// Private state request for the particular private contract
PrivateStateRequest(H256),
}
/// Route type to indicate whether it is enacted or retracted.
#[derive(Clone)]
pub enum ChainRouteType {
/// Enacted block
Enacted,
/// Retracted block
Retracted
}
/// A complete chain enacted retracted route.
#[derive(Default, Clone)]
pub struct ChainRoute {
route: Vec<(H256, ChainRouteType)>,
enacted: Vec<H256>,
retracted: Vec<H256>,
}
impl<'a> From<&'a [ImportRoute]> for ChainRoute {
fn from(import_results: &'a [ImportRoute]) -> ChainRoute {
ChainRoute::new(import_results.iter().flat_map(|route| {
route.retracted.iter().map(|h| (*h, ChainRouteType::Retracted))
.chain(route.enacted.iter().map(|h| (*h, ChainRouteType::Enacted)))
}).collect())
}
}
impl ChainRoute {
/// Create a new ChainRoute based on block hash and route type pairs.
pub fn new(route: Vec<(H256, ChainRouteType)>) -> Self {
let (enacted, retracted) = Self::to_enacted_retracted(&route);
Self { route, enacted, retracted }
}
/// Gather all non-duplicate enacted and retracted blocks.
fn to_enacted_retracted(route: &[(H256, ChainRouteType)]) -> (Vec<H256>, Vec<H256>) {
fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
map.into_iter().map(|(k, _v)| k).collect()
}
// Because we are doing multiple inserts some of the blocks that were enacted in import `k`
// could be retracted in import `k+1`. This is why to understand if after all inserts
// the block is enacted or retracted we iterate over all routes and at the end final state
// will be in the hashmap
let map = route.iter().fold(HashMap::new(), |mut map, route| {
match &route.1 {
&ChainRouteType::Enacted => {
map.insert(route.0, true);
},
&ChainRouteType::Retracted => {
map.insert(route.0, false);
},
}
map
});
// Split to enacted retracted (using hashmap value)
let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v);
// And convert tuples to keys
(map_to_vec(enacted), map_to_vec(retracted))
}
/// Consume route and return the enacted retracted form.
pub fn into_enacted_retracted(self) -> (Vec<H256>, Vec<H256>) {
(self.enacted, self.retracted)
}
/// All non-duplicate enacted blocks.
pub fn enacted(&self) -> &[H256] {
&self.enacted
}
/// All non-duplicate retracted blocks.
pub fn retracted(&self) -> &[H256] {
&self.retracted
}
/// All blocks in the route.
pub fn route(&self) -> &[(H256, ChainRouteType)] {
&self.route
}
}
/// Used by `ChainNotify` `new_blocks()`
pub struct NewBlocks {
/// Imported blocks
pub imported: Vec<H256>,
/// Invalid blocks
pub invalid: Vec<H256>,
/// Route
pub route: ChainRoute,
/// Sealed
pub sealed: Vec<H256>,
/// Block bytes.
pub proposed: Vec<Bytes>,
/// Duration
pub duration: Duration,
/// Has more blocks to import
pub has_more_blocks_to_import: bool,
}
impl NewBlocks {
/// Constructor
pub fn new(
imported: Vec<H256>,
invalid: Vec<H256>,
route: ChainRoute,
sealed: Vec<H256>,
proposed: Vec<Bytes>,
duration: Duration,
has_more_blocks_to_import: bool,
) -> NewBlocks {
NewBlocks {
imported,
invalid,
route,
sealed,
proposed,
duration,
has_more_blocks_to_import,
}
}
}
/// Represents what has to be handled by actor listening to chain events
pub trait ChainNotify : Send + Sync {
/// fires when chain has new blocks.
fn new_blocks(&self, _new_blocks: NewBlocks) {
// does nothing by default
}
/// fires when chain achieves active mode
fn start(&self) {
// does nothing by default
}
/// fires when chain achieves passive mode
fn stop(&self) {
// does nothing by default
}
/// fires when chain broadcasts a message
fn broadcast(&self, _message_type: ChainMessageType) {
// does nothing by default
}
/// fires when new block is about to be imported
/// implementations should be light
fn block_pre_import(&self, _bytes: &Bytes, _hash: &H256, _difficulty: &U256) {
// does nothing by default
}
/// fires when new transactions are received from a peer
fn transactions_received(&self,
_txs: &[UnverifiedTransaction],
_peer_id: usize,
) {
// does nothing by default
}
}

View File

@@ -21,7 +21,7 @@ use std::sync::{Arc, Weak};
use std::time::{Instant, Duration};
use account_state::state::StateInfo;
use blockchain::{BlockReceipts, BlockChain, BlockChainDB, BlockProvider, TreeRoute, ImportRoute, TransactionAddress, ExtrasInsert, BlockNumberKey};
use blockchain::{BlockReceipts, BlockChain, BlockChainDB, BlockProvider, TreeRoute, TransactionAddress, ExtrasInsert, BlockNumberKey};
use bytes::Bytes;
use call_contract::{CallContract, RegistryInfo};
use ethcore_miner::pool::VerifiedTransaction;
@@ -41,15 +41,15 @@ use block::{LockedBlock, Drain, ClosedBlock, OpenBlock, enact_verified, SealedBl
use client::ancient_import::AncientVerifier;
use client::{
ReopenBlock, PrepareOpenBlock, ImportSealedBlock, BroadcastProposalBlock,
Call, BlockProducer, SealedBlockImporter, ChainNotify, EngineInfo,
ClientConfig, NewBlocks, ChainRoute, ChainMessageType, bad_blocks,
Call, BlockProducer, SealedBlockImporter, EngineInfo,
ClientConfig, bad_blocks,
};
use client_traits::{
BlockInfo, ScheduleInfo, StateClient, BlockChainReset,
Nonce, Balance, ChainInfo, TransactionInfo, ImportBlock,
AccountData, BlockChain as BlockChainTrait, BlockChainClient,
IoClient, BadBlocks, ProvingBlockChainClient, SnapshotClient,
DatabaseRestore, SnapshotWriter, Tick,
DatabaseRestore, SnapshotWriter, Tick, ChainNotify,
StateOrBlock,
};
use engine::Engine;
@@ -72,7 +72,9 @@ use types::{
block::PreverifiedBlock,
block_status::BlockStatus,
blockchain_info::BlockChainInfo,
chain_notify::{NewBlocks, ChainRoute, ChainMessageType},
client_types::ClientReport,
import_route::ImportRoute,
io_message::ClientIoMessage,
encoded,
engines::{
@@ -986,12 +988,14 @@ impl Client {
self.importer.miner.clone()
}
#[cfg(test)]
/// Access state from tests
#[cfg(any(test, feature = "test-helpers"))]
pub fn state_db(&self) -> ::parking_lot::RwLockReadGuard<StateDB> {
self.state_db.read()
}
#[cfg(test)]
/// Access the BlockChain from tests
#[cfg(any(test, feature = "test-helpers"))]
pub fn chain(&self) -> Arc<BlockChain> {
self.chain.read().clone()
}

View File

@@ -31,7 +31,6 @@ pub use self::config::{ClientConfig, DatabaseCompactionProfile, BlockChainConfig
pub use self::evm_test_client::{EvmTestClient, EvmTestError, TransactErr, TransactSuccess};
#[cfg(any(test, feature = "test-helpers"))]
pub use self::test_client::{TestBlockChainClient, EachBlockWith, TestState};
pub use self::chain_notify::{ChainNotify, NewBlocks, ChainRoute, ChainRouteType, ChainMessageType};
pub use self::traits::{
ReopenBlock, PrepareOpenBlock, ImportSealedBlock, BroadcastProposalBlock,
Call, EngineInfo, BlockProducer, SealedBlockImporter,
@@ -40,4 +39,3 @@ pub use self::traits::{
pub use verification::VerifierType;
mod traits;
mod chain_notify;

View File

@@ -53,16 +53,13 @@
//! cargo build --release
//! ```
extern crate account_db;
extern crate account_state;
extern crate ansi_term;
extern crate client_traits;
extern crate common_types as types;
extern crate crossbeam_utils;
extern crate engine;
extern crate ethabi;
extern crate ethcore_blockchain as blockchain;
extern crate ethcore_bloom_journal as bloom_journal;
extern crate ethcore_call_contract as call_contract;
extern crate ethcore_db as db;
extern crate ethcore_io as io;
@@ -74,13 +71,10 @@ extern crate hash_db;
extern crate itertools;
extern crate journaldb;
extern crate keccak_hash as hash;
extern crate keccak_hasher;
extern crate kvdb;
extern crate machine;
extern crate memory_cache;
extern crate num_cpus;
extern crate parity_bytes as bytes;
extern crate parity_snappy as snappy;
extern crate parking_lot;
extern crate trie_db as trie;
extern crate patricia_trie_ethereum as ethtrie;
@@ -88,6 +82,7 @@ extern crate rand;
extern crate rayon;
extern crate rlp;
extern crate serde;
extern crate snapshot;
extern crate spec;
extern crate state_db;
extern crate trace;
@@ -107,6 +102,8 @@ extern crate ethcore_stratum;
#[cfg(any(test, feature = "stratum"))]
extern crate ethash;
#[cfg(any(test, feature = "test-helpers"))]
extern crate account_db;
#[cfg(any(test, feature = "test-helpers"))]
extern crate ethkey;
#[cfg(any(test, feature = "test-helpers"))]
@@ -157,7 +154,6 @@ extern crate parity_runtime;
pub mod block;
pub mod client;
pub mod miner;
pub mod snapshot;
#[cfg(test)]
mod tests;

View File

@@ -1,424 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! Account state encoding and decoding
use account_db::{AccountDB, AccountDBMut};
use types::{
basic_account::BasicAccount,
snapshot::Progress,
errors::SnapshotError as Error,
};
use bytes::Bytes;
use ethereum_types::{H256, U256};
use ethtrie::{TrieDB, TrieDBMut};
use hash::{KECCAK_EMPTY, KECCAK_NULL_RLP};
use hash_db::HashDB;
use rlp::{RlpStream, Rlp};
use std::collections::HashSet;
use trie::{Trie, TrieMut};
use std::sync::atomic::Ordering;
// An empty account -- these were replaced with RLP null data for a space optimization in v1.
const ACC_EMPTY: BasicAccount = BasicAccount {
nonce: U256([0, 0, 0, 0]),
balance: U256([0, 0, 0, 0]),
storage_root: KECCAK_NULL_RLP,
code_hash: KECCAK_EMPTY,
code_version: U256([0, 0, 0, 0]),
};
// whether an encoded account has code and how it is referred to.
#[repr(u8)]
enum CodeState {
// the account has no code.
Empty = 0,
// raw code is encoded.
Inline = 1,
// the code is referred to by hash.
Hash = 2,
}
impl CodeState {
fn from(x: u8) -> Result<Self, Error> {
match x {
0 => Ok(CodeState::Empty),
1 => Ok(CodeState::Inline),
2 => Ok(CodeState::Hash),
_ => Err(Error::UnrecognizedCodeState(x))
}
}
fn raw(self) -> u8 {
self as u8
}
}
// walk the account's storage trie, returning a vector of RLP items containing the
// account address hash, account properties and the storage. Each item contains at most `max_storage_items`
// storage records split according to snapshot format definition.
pub fn to_fat_rlps(
account_hash: &H256,
acc: &BasicAccount,
acct_db: &AccountDB,
used_code: &mut HashSet<H256>,
first_chunk_size: usize,
max_chunk_size: usize,
p: &Progress,
) -> Result<Vec<Bytes>, Error> {
let db = &(acct_db as &dyn HashDB<_,_>);
let db = TrieDB::new(db, &acc.storage_root)?;
let mut chunks = Vec::new();
let mut db_iter = db.iter()?;
let mut target_chunk_size = first_chunk_size;
let mut account_stream = RlpStream::new_list(2);
let mut leftover: Option<Vec<u8>> = None;
loop {
account_stream.append(account_hash);
let use_short_version = acc.code_version.is_zero();
match use_short_version {
true => { account_stream.begin_list(5); },
false => { account_stream.begin_list(6); },
}
account_stream.append(&acc.nonce)
.append(&acc.balance);
// [has_code, code_hash].
if acc.code_hash == KECCAK_EMPTY {
account_stream.append(&CodeState::Empty.raw()).append_empty_data();
} else if used_code.contains(&acc.code_hash) {
account_stream.append(&CodeState::Hash.raw()).append(&acc.code_hash);
} else {
match acct_db.get(&acc.code_hash, hash_db::EMPTY_PREFIX) {
Some(c) => {
used_code.insert(acc.code_hash.clone());
account_stream.append(&CodeState::Inline.raw()).append(&&*c);
}
None => {
warn!("code lookup failed during snapshot");
account_stream.append(&false).append_empty_data();
}
}
}
if !use_short_version {
account_stream.append(&acc.code_version);
}
account_stream.begin_unbounded_list();
if account_stream.len() > target_chunk_size {
// account does not fit, push an empty record to mark a new chunk
target_chunk_size = max_chunk_size;
chunks.push(Vec::new());
}
if let Some(pair) = leftover.take() {
if !account_stream.append_raw_checked(&pair, 1, target_chunk_size) {
return Err(Error::ChunkTooSmall);
}
}
loop {
if p.abort.load(Ordering::SeqCst) {
trace!(target: "snapshot", "to_fat_rlps: aborting snapshot");
return Err(Error::SnapshotAborted);
}
match db_iter.next() {
Some(Ok((k, v))) => {
let pair = {
let mut stream = RlpStream::new_list(2);
stream.append(&k).append(&&*v);
stream.drain()
};
if !account_stream.append_raw_checked(&pair, 1, target_chunk_size) {
account_stream.complete_unbounded_list();
let stream = ::std::mem::replace(&mut account_stream, RlpStream::new_list(2));
chunks.push(stream.out());
target_chunk_size = max_chunk_size;
leftover = Some(pair);
break;
}
},
Some(Err(e)) => {
return Err(e.into());
},
None => {
account_stream.complete_unbounded_list();
let stream = ::std::mem::replace(&mut account_stream, RlpStream::new_list(2));
chunks.push(stream.out());
return Ok(chunks);
}
}
}
}
}
// decode a fat rlp, and rebuild the storage trie as we go.
// returns the account structure along with its newly recovered code,
// if it exists.
pub fn from_fat_rlp(
acct_db: &mut AccountDBMut,
rlp: Rlp,
mut storage_root: H256,
) -> Result<(BasicAccount, Option<Bytes>), Error> {
// check for special case of empty account.
if rlp.is_empty() {
return Ok((ACC_EMPTY, None));
}
let use_short_version = match rlp.item_count()? {
5 => true,
6 => false,
_ => return Err(rlp::DecoderError::RlpIncorrectListLen.into()),
};
let nonce = rlp.val_at(0)?;
let balance = rlp.val_at(1)?;
let code_state: CodeState = {
let raw: u8 = rlp.val_at(2)?;
CodeState::from(raw)?
};
// load the code if it exists.
let (code_hash, new_code) = match code_state {
CodeState::Empty => (KECCAK_EMPTY, None),
CodeState::Inline => {
let code: Bytes = rlp.val_at(3)?;
let code_hash = acct_db.insert(hash_db::EMPTY_PREFIX, &code);
(code_hash, Some(code))
}
CodeState::Hash => {
let code_hash = rlp.val_at(3)?;
(code_hash, None)
}
};
let code_version = if use_short_version {
U256::zero()
} else {
rlp.val_at(4)?
};
{
let mut storage_trie = if storage_root.is_zero() {
TrieDBMut::new(acct_db, &mut storage_root)
} else {
TrieDBMut::from_existing(acct_db, &mut storage_root)?
};
let pairs = rlp.at(if use_short_version { 4 } else { 5 })?;
for pair_rlp in pairs.iter() {
let k: Bytes = pair_rlp.val_at(0)?;
let v: Bytes = pair_rlp.val_at(1)?;
storage_trie.insert(&k, &v)?;
}
}
let acc = BasicAccount {
nonce,
balance,
storage_root,
code_hash,
code_version,
};
Ok((acc, new_code))
}
#[cfg(test)]
mod tests {
use account_db::{AccountDB, AccountDBMut};
use types::basic_account::BasicAccount;
use test_helpers::get_temp_state_db;
use snapshot::tests::helpers::fill_storage;
use snapshot::Progress;
use hash::{KECCAK_EMPTY, KECCAK_NULL_RLP, keccak};
use ethereum_types::{H256, Address};
use hash_db::{HashDB, EMPTY_PREFIX};
use kvdb::DBValue;
use rlp::Rlp;
use std::collections::HashSet;
use super::{ACC_EMPTY, to_fat_rlps, from_fat_rlp};
#[test]
fn encoding_basic() {
let mut db = get_temp_state_db();
let addr = Address::random();
let account = BasicAccount {
nonce: 50.into(),
balance: 123456789.into(),
storage_root: KECCAK_NULL_RLP,
code_hash: KECCAK_EMPTY,
code_version: 0.into(),
};
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let p = Progress::default();
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account);
}
#[test]
fn encoding_version() {
let mut db = get_temp_state_db();
let addr = Address::random();
let account = BasicAccount {
nonce: 50.into(),
balance: 123456789.into(),
storage_root: KECCAK_NULL_RLP,
code_hash: KECCAK_EMPTY,
code_version: 1.into(),
};
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let p = Progress::default();
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account);
}
#[test]
fn encoding_storage() {
let mut db = get_temp_state_db();
let addr = Address::random();
let account = {
let acct_db = AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr));
let mut root = KECCAK_NULL_RLP;
fill_storage(acct_db, &mut root, &mut H256::zero());
BasicAccount {
nonce: 25.into(),
balance: 987654321.into(),
storage_root: root,
code_hash: KECCAK_EMPTY,
code_version: 0.into(),
}
};
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let p = Progress::default();
let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlp[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account);
}
#[test]
fn encoding_storage_split() {
let mut db = get_temp_state_db();
let addr = Address::random();
let account = {
let acct_db = AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr));
let mut root = KECCAK_NULL_RLP;
fill_storage(acct_db, &mut root, &mut H256::zero());
BasicAccount {
nonce: 25.into(),
balance: 987654321.into(),
storage_root: root,
code_hash: KECCAK_EMPTY,
code_version: 0.into(),
}
};
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let p = Progress::default();
let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), 500, 1000, &p).unwrap();
let mut root = KECCAK_NULL_RLP;
let mut restored_account = None;
for rlp in fat_rlps {
let fat_rlp = Rlp::new(&rlp).at(1).unwrap();
restored_account = Some(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, root).unwrap().0);
root = restored_account.as_ref().unwrap().storage_root.clone();
}
assert_eq!(restored_account, Some(account));
}
#[test]
fn encoding_code() {
let mut db = get_temp_state_db();
let addr1 = Address::random();
let addr2 = Address::random();
let code_hash = {
let mut acct_db = AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr1));
acct_db.insert(EMPTY_PREFIX, b"this is definitely code")
};
{
let mut acct_db = AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr2));
acct_db.emplace(code_hash.clone(), EMPTY_PREFIX, DBValue::from_slice(b"this is definitely code"));
}
let account1 = BasicAccount {
nonce: 50.into(),
balance: 123456789.into(),
storage_root: KECCAK_NULL_RLP,
code_hash,
code_version: 0.into(),
};
let account2 = BasicAccount {
nonce: 400.into(),
balance: 98765432123456789usize.into(),
storage_root: KECCAK_NULL_RLP,
code_hash,
code_version: 0.into(),
};
let mut used_code = HashSet::new();
let p1 = Progress::default();
let p2 = Progress::default();
let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::from_hash(db.as_hash_db(), keccak(addr1)), &mut used_code, usize::max_value(), usize::max_value(), &p1).unwrap();
let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::from_hash(db.as_hash_db(), keccak(addr2)), &mut used_code, usize::max_value(), usize::max_value(), &p2).unwrap();
assert_eq!(used_code.len(), 1);
let fat_rlp1 = Rlp::new(&fat_rlp1[0]).at(1).unwrap();
let fat_rlp2 = Rlp::new(&fat_rlp2[0]).at(1).unwrap();
let (acc, maybe_code) = from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr2)), fat_rlp2, H256::zero()).unwrap();
assert!(maybe_code.is_none());
assert_eq!(acc, account2);
let (acc, maybe_code) = from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr1)), fat_rlp1, H256::zero()).unwrap();
assert_eq!(maybe_code, Some(b"this is definitely code".to_vec()));
assert_eq!(acc, account1);
}
#[test]
fn encoding_empty_acc() {
let mut db = get_temp_state_db();
assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(Address::zero())), Rlp::new(&::rlp::NULL_RLP), H256::zero()).unwrap(), (ACC_EMPTY, None));
}
}

View File

@@ -1,204 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! Block RLP compression.
use bytes::Bytes;
use ethereum_types::H256;
use hash::keccak;
use rlp::{DecoderError, RlpStream, Rlp};
use triehash::ordered_trie_root;
use types::block::Block;
use types::header::Header;
use types::views::BlockView;
const HEADER_FIELDS: usize = 8;
const BLOCK_FIELDS: usize = 2;
pub struct AbridgedBlock {
rlp: Bytes,
}
impl AbridgedBlock {
/// Create from rlp-compressed bytes. Does no verification.
pub fn from_raw(compressed: Bytes) -> Self {
AbridgedBlock {
rlp: compressed,
}
}
/// Return the inner bytes.
pub fn into_inner(self) -> Bytes {
self.rlp
}
/// Given a full block view, trim out the parent hash and block number,
/// producing new rlp.
pub fn from_block_view(block_view: &BlockView) -> Self {
let header = block_view.header_view();
let seal_fields = header.seal();
// 10 header fields, unknown number of seal fields, and 2 block fields.
let mut stream = RlpStream::new_list(
HEADER_FIELDS +
seal_fields.len() +
BLOCK_FIELDS
);
// write header values.
stream
.append(&header.author())
.append(&header.state_root())
.append(&header.log_bloom())
.append(&header.difficulty())
.append(&header.gas_limit())
.append(&header.gas_used())
.append(&header.timestamp())
.append(&header.extra_data());
// write block values.
stream
.append_list(&block_view.transactions())
.append_list(&block_view.uncles());
// write seal fields.
for field in seal_fields {
stream.append_raw(&field, 1);
}
AbridgedBlock {
rlp: stream.out(),
}
}
/// Flesh out an abridged block view with the provided parent hash and block number.
///
/// Will fail if contains invalid rlp.
pub fn to_block(&self, parent_hash: H256, number: u64, receipts_root: H256) -> Result<Block, DecoderError> {
let rlp = Rlp::new(&self.rlp);
let mut header: Header = Default::default();
header.set_parent_hash(parent_hash);
header.set_author(rlp.val_at(0)?);
header.set_state_root(rlp.val_at(1)?);
header.set_log_bloom(rlp.val_at(2)?);
header.set_difficulty(rlp.val_at(3)?);
header.set_number(number);
header.set_gas_limit(rlp.val_at(4)?);
header.set_gas_used(rlp.val_at(5)?);
header.set_timestamp(rlp.val_at(6)?);
header.set_extra_data(rlp.val_at(7)?);
let transactions = rlp.list_at(8)?;
let uncles: Vec<Header> = rlp.list_at(9)?;
header.set_transactions_root(ordered_trie_root(
rlp.at(8)?.iter().map(|r| r.as_raw())
));
header.set_receipts_root(receipts_root);
let mut uncles_rlp = RlpStream::new();
uncles_rlp.append_list(&uncles);
header.set_uncles_hash(keccak(uncles_rlp.as_raw()));
let mut seal_fields = Vec::new();
for i in (HEADER_FIELDS + BLOCK_FIELDS)..rlp.item_count()? {
let seal_rlp = rlp.at(i)?;
seal_fields.push(seal_rlp.as_raw().to_owned());
}
header.set_seal(seal_fields);
Ok(Block {
header: header,
transactions: transactions,
uncles: uncles,
})
}
}
#[cfg(test)]
mod tests {
use super::AbridgedBlock;
use bytes::Bytes;
use ethereum_types::{H256, U256, Address};
use types::transaction::{Action, Transaction};
use types::block::Block;
use types::view;
use types::views::BlockView;
fn encode_block(b: &Block) -> Bytes {
b.rlp_bytes()
}
#[test]
fn empty_block_abridging() {
let b = Block::default();
let receipts_root = b.header.receipts_root().clone();
let encoded = encode_block(&b);
let abridged = AbridgedBlock::from_block_view(&view!(BlockView, &encoded));
assert_eq!(abridged.to_block(H256::zero(), 0, receipts_root).unwrap(), b);
}
#[test]
#[should_panic]
fn wrong_number() {
let b = Block::default();
let receipts_root = b.header.receipts_root().clone();
let encoded = encode_block(&b);
let abridged = AbridgedBlock::from_block_view(&view!(BlockView, &encoded));
assert_eq!(abridged.to_block(H256::zero(), 2, receipts_root).unwrap(), b);
}
#[test]
fn with_transactions() {
let mut b = Block::default();
let t1 = Transaction {
action: Action::Create,
nonce: U256::from(42),
gas_price: U256::from(3000),
gas: U256::from(50_000),
value: U256::from(1),
data: b"Hello!".to_vec()
}.fake_sign(Address::from_low_u64_be(0x69));
let t2 = Transaction {
action: Action::Create,
nonce: U256::from(88),
gas_price: U256::from(12345),
gas: U256::from(300000),
value: U256::from(1000000000),
data: "Eep!".into(),
}.fake_sign(Address::from_low_u64_be(0x55));
b.transactions.push(t1.into());
b.transactions.push(t2.into());
let receipts_root = b.header.receipts_root().clone();
b.header.set_transactions_root(::triehash::ordered_trie_root(
b.transactions.iter().map(::rlp::encode)
));
let encoded = encode_block(&b);
let abridged = AbridgedBlock::from_block_view(&view!(BlockView, &encoded[..]));
assert_eq!(abridged.to_block(H256::zero(), 0, receipts_root).unwrap(), b);
}
}

View File

@@ -1,398 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! Secondary chunk creation and restoration, implementation for proof-of-authority
//! based engines.
//!
//! The chunks here contain state proofs of transitions, along with validator proofs.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use engine::{Engine, EpochVerifier};
use blockchain::{BlockChain, BlockChainDB, BlockProvider};
use bytes::Bytes;
use ethereum_types::{H256, U256};
use itertools::{Position, Itertools};
use kvdb::KeyValueDB;
use rlp::{RlpStream, Rlp};
use types::{
encoded,
engines::epoch::Transition as EpochTransition,
header::Header,
ids::BlockId,
errors::{SnapshotError, EthcoreError},
receipt::Receipt,
snapshot::{ChunkSink, Progress, ManifestData}
};
use snapshot::{SnapshotComponents, Rebuilder};
/// Snapshot creation and restoration for PoA chains.
/// Chunk format:
///
/// [FLAG, [header, epoch data], ...]
/// - Header data at which transition occurred,
/// - epoch data (usually list of validators and proof of change)
///
/// FLAG is a bool: true for last chunk, false otherwise.
///
/// The last item of the last chunk will be a list containing data for the warp target block:
/// [header, transactions, uncles, receipts, parent_td].
pub struct PoaSnapshot;
impl SnapshotComponents for PoaSnapshot {
fn chunk_all(
&mut self,
chain: &BlockChain,
block_at: H256,
sink: &mut ChunkSink,
_progress: &Progress,
preferred_size: usize,
) -> Result<(), SnapshotError> {
let number = chain.block_number(&block_at)
.ok_or_else(||SnapshotError::InvalidStartingBlock(BlockId::Hash(block_at)))?;
let mut pending_size = 0;
let mut rlps = Vec::new();
for (_, transition) in chain.epoch_transitions()
.take_while(|&(_, ref t)| t.block_number <= number)
{
// this can happen when our starting block is non-canonical.
if transition.block_number == number && transition.block_hash != block_at {
break
}
let header = chain.block_header_data(&transition.block_hash)
.ok_or_else(||SnapshotError::BlockNotFound(transition.block_hash))?;
let entry = {
let mut entry_stream = RlpStream::new_list(2);
entry_stream
.append_raw(&header.into_inner(), 1)
.append(&transition.proof);
entry_stream.out()
};
// cut of the chunk if too large.
let new_loaded_size = pending_size + entry.len();
pending_size = if new_loaded_size > preferred_size && !rlps.is_empty() {
write_chunk(false, &mut rlps, sink)?;
entry.len()
} else {
new_loaded_size
};
rlps.push(entry);
}
let (block, receipts) = chain.block(&block_at)
.and_then(|b| chain.block_receipts(&block_at).map(|r| (b, r)))
.ok_or_else(||SnapshotError::BlockNotFound(block_at))?;
let block = block.decode()?;
let parent_td = chain.block_details(block.header.parent_hash())
.map(|d| d.total_difficulty)
.ok_or_else(||SnapshotError::BlockNotFound(block_at))?;
rlps.push({
let mut stream = RlpStream::new_list(5);
stream
.append(&block.header)
.append_list(&block.transactions)
.append_list(&block.uncles)
.append(&receipts)
.append(&parent_td);
stream.out()
});
write_chunk(true, &mut rlps, sink)?;
Ok(())
}
fn rebuilder(
&self,
chain: BlockChain,
db: Arc<dyn BlockChainDB>,
manifest: &ManifestData,
) -> Result<Box<dyn Rebuilder>, EthcoreError> {
Ok(Box::new(ChunkRebuilder {
manifest: manifest.clone(),
warp_target: None,
chain,
db: db.key_value().clone(),
had_genesis: false,
unverified_firsts: Vec::new(),
last_epochs: Vec::new(),
}))
}
fn min_supported_version(&self) -> u64 { 3 }
fn current_version(&self) -> u64 { 3 }
}
// writes a chunk composed of the inner RLPs here.
// flag indicates whether the chunk is the last chunk.
fn write_chunk(last: bool, chunk_data: &mut Vec<Bytes>, sink: &mut ChunkSink) -> Result<(), SnapshotError> {
let mut stream = RlpStream::new_list(1 + chunk_data.len());
stream.append(&last);
for item in chunk_data.drain(..) {
stream.append_raw(&item, 1);
}
(sink)(stream.out().as_slice()).map_err(Into::into)
}
// rebuilder checks state proofs for all transitions, and checks that each
// transition header is verifiable from the epoch data of the one prior.
struct ChunkRebuilder {
manifest: ManifestData,
warp_target: Option<Header>,
chain: BlockChain,
db: Arc<dyn KeyValueDB>,
had_genesis: bool,
// sorted vectors of unverified first blocks in a chunk
// and epoch data from last blocks in chunks.
// verification for these will be done at the end.
unverified_firsts: Vec<(Header, Bytes, H256)>,
last_epochs: Vec<(Header, Box<dyn EpochVerifier>)>,
}
// verified data.
struct Verified {
epoch_transition: EpochTransition,
header: Header,
}
impl ChunkRebuilder {
fn verify_transition(
&mut self,
last_verifier: &mut Option<Box<dyn EpochVerifier>>,
transition_rlp: Rlp,
engine: &dyn Engine,
) -> Result<Verified, EthcoreError> {
use engine::ConstructedVerifier;
// decode.
let header: Header = transition_rlp.val_at(0)?;
let epoch_data: Bytes = transition_rlp.val_at(1)?;
trace!(target: "snapshot", "verifying transition to epoch at block {}", header.number());
// check current transition against validators of last epoch.
let new_verifier = match engine.epoch_verifier(&header, &epoch_data) {
ConstructedVerifier::Trusted(v) => v,
ConstructedVerifier::Unconfirmed(v, finality_proof, hash) => {
match *last_verifier {
Some(ref last) =>
if last.check_finality_proof(finality_proof).map_or(true, |hashes| !hashes.contains(&hash))
{
return Err(SnapshotError::BadEpochProof(header.number()).into());
},
None if header.number() != 0 => {
// genesis never requires additional validation.
let idx = self.unverified_firsts
.binary_search_by_key(&header.number(), |&(ref h, _, _)| h.number())
.unwrap_or_else(|x| x);
let entry = (header.clone(), finality_proof.to_owned(), hash);
self.unverified_firsts.insert(idx, entry);
}
None => {}
}
v
}
ConstructedVerifier::Err(e) => return Err(e),
};
// create new epoch verifier.
*last_verifier = Some(new_verifier);
Ok(Verified {
epoch_transition: EpochTransition {
block_hash: header.hash(),
block_number: header.number(),
proof: epoch_data,
},
header,
})
}
}
impl Rebuilder for ChunkRebuilder {
fn feed(
&mut self,
chunk: &[u8],
engine: &dyn Engine,
abort_flag: &AtomicBool,
) -> Result<(), EthcoreError> {
let rlp = Rlp::new(chunk);
let is_last_chunk: bool = rlp.val_at(0)?;
let num_items = rlp.item_count()?;
// number of transitions in the chunk.
let num_transitions = if is_last_chunk {
num_items - 2
} else {
num_items - 1
};
if num_transitions == 0 && !is_last_chunk {
return Err(SnapshotError::WrongChunkFormat("Found non-last chunk without any data.".into()).into());
}
let mut last_verifier = None;
let mut last_number = None;
for transition_rlp in rlp.iter().skip(1).take(num_transitions).with_position() {
if !abort_flag.load(Ordering::SeqCst) { return Err(SnapshotError::RestorationAborted.into()) }
let (is_first, is_last) = match transition_rlp {
Position::First(_) => (true, false),
Position::Middle(_) => (false, false),
Position::Last(_) => (false, true),
Position::Only(_) => (true, true),
};
let transition_rlp = transition_rlp.into_inner();
let verified = self.verify_transition(
&mut last_verifier,
transition_rlp,
engine,
)?;
if last_number.map_or(false, |num| verified.header.number() <= num) {
return Err(SnapshotError::WrongChunkFormat("Later epoch transition in earlier or same block.".into()).into());
}
last_number = Some(verified.header.number());
// book-keep borders for verification later.
if is_first {
// make sure the genesis transition was included,
// but it doesn't need verification later.
if verified.header.number() == 0 {
if verified.header.hash() != self.chain.genesis_hash() {
return Err(SnapshotError::WrongBlockHash(0, verified.header.hash(), self.chain.genesis_hash()).into());
}
self.had_genesis = true;
}
}
if is_last {
let idx = self.last_epochs
.binary_search_by_key(&verified.header.number(), |&(ref h, _)| h.number())
.unwrap_or_else(|x| x);
let entry = (
verified.header.clone(),
last_verifier.take().expect("last_verifier always set after verify_transition; qed"),
);
self.last_epochs.insert(idx, entry);
}
// write epoch transition into database.
let mut batch = self.db.transaction();
self.chain.insert_epoch_transition(&mut batch, verified.header.number(),
verified.epoch_transition);
self.db.write_buffered(batch);
trace!(target: "snapshot", "Verified epoch transition for epoch at block {}", verified.header.number());
}
if is_last_chunk {
use types::block::Block;
let last_rlp = rlp.at(num_items - 1)?;
let block = Block {
header: last_rlp.val_at(0)?,
transactions: last_rlp.list_at(1)?,
uncles: last_rlp.list_at(2)?,
};
let block_data = block.rlp_bytes();
let receipts: Vec<Receipt> = last_rlp.list_at(3)?;
{
let hash = block.header.hash();
let best_hash = self.manifest.block_hash;
if hash != best_hash {
return Err(SnapshotError::WrongBlockHash(block.header.number(), best_hash, hash).into())
}
}
let parent_td: U256 = last_rlp.val_at(4)?;
let mut batch = self.db.transaction();
self.chain.insert_unordered_block(&mut batch, encoded::Block::new(block_data), receipts, Some(parent_td), true, false);
self.db.write_buffered(batch);
self.warp_target = Some(block.header);
}
Ok(())
}
fn finalize(&mut self) -> Result<(), EthcoreError> {
if !self.had_genesis {
return Err(SnapshotError::WrongChunkFormat("No genesis transition included.".into()).into());
}
let target_header = match self.warp_target.take() {
Some(x) => x,
None => return Err(SnapshotError::WrongChunkFormat("Warp target block not included.".into()).into()),
};
trace!(target: "snapshot", "rebuilder, finalize: verifying {} unverified first blocks", self.unverified_firsts.len());
// verify the first entries of chunks we couldn't before.
// we store all last verifiers, but not all firsts.
// match each unverified first epoch with a last epoch verifier.
let mut lasts_reversed = self.last_epochs.iter().rev();
for &(ref header, ref finality_proof, hash) in self.unverified_firsts.iter().rev() {
let mut found = false;
while let Some(&(ref last_header, ref last_verifier)) = lasts_reversed.next() {
if last_header.number() < header.number() {
if last_verifier.check_finality_proof(&finality_proof).map_or(true, |hashes| !hashes.contains(&hash)) {
return Err(SnapshotError::BadEpochProof(header.number()).into());
}
found = true;
break;
}
}
if !found {
return Err(SnapshotError::WrongChunkFormat("Inconsistent chunk ordering.".into()).into());
}
}
// verify that the warp target verifies correctly the
// most recent epoch. if the warp target was a transition itself,
// it's already verified and doesn't need any more verification.
let &(ref header, ref last_epoch) = self.last_epochs.last()
.expect("last_epochs known to have at least one element by the check above; qed");
if header != &target_header {
last_epoch.verify_heavy(&target_header)?;
}
Ok(())
}
}

View File

@@ -1,37 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! Secondary chunk creation and restoration, implementations for different consensus
//! engines.
mod authority;
mod work;
pub use self::authority::*;
pub use self::work::*;
use snapshot::SnapshotComponents;
use types::snapshot::Snapshotting::{self, *};
/// Create a factory for building snapshot chunks and restoring from them.
/// `None` indicates that the engine doesn't support snapshot creation.
pub fn chunker(snapshot_type: Snapshotting) -> Option<Box<dyn SnapshotComponents>> {
match snapshot_type {
PoA => Some(Box::new(PoaSnapshot)),
PoW { blocks, max_restore_blocks } => Some(Box::new(PowSnapshot::new(blocks, max_restore_blocks))),
Unsupported => None,
}
}

View File

@@ -1,332 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! Secondary chunk creation and restoration, implementation for proof-of-work
//! chains.
//!
//! The secondary chunks in this instance are 30,000 "abridged blocks" from the head
//! of the chain, which serve as an indication of valid chain.
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use blockchain::{BlockChain, BlockChainDB, BlockProvider};
use engine::Engine;
use snapshot::block::AbridgedBlock;
use ethereum_types::H256;
use kvdb::KeyValueDB;
use bytes::Bytes;
use rlp::{RlpStream, Rlp};
use rand::rngs::OsRng;
use types::{
encoded,
engines::epoch::Transition as EpochTransition,
errors::{SnapshotError, EthcoreError},
snapshot::{ChunkSink, ManifestData, Progress},
};
use snapshot::{SnapshotComponents, Rebuilder};
/// Snapshot creation and restoration for PoW chains.
/// This includes blocks from the head of the chain as a
/// loose assurance that the chain is valid.
#[derive(Clone, Copy, PartialEq)]
pub struct PowSnapshot {
/// Number of blocks from the head of the chain
/// to include in the snapshot.
pub blocks: u64,
/// Number of blocks to allow in the snapshot when restoring.
pub max_restore_blocks: u64,
}
impl PowSnapshot {
/// Create a new instance.
pub fn new(blocks: u64, max_restore_blocks: u64) -> PowSnapshot {
PowSnapshot { blocks, max_restore_blocks }
}
}
impl SnapshotComponents for PowSnapshot {
fn chunk_all(
&mut self,
chain: &BlockChain,
block_at: H256,
chunk_sink: &mut ChunkSink,
progress: &Progress,
preferred_size: usize,
) -> Result<(), SnapshotError> {
PowWorker {
chain,
rlps: VecDeque::new(),
current_hash: block_at,
writer: chunk_sink,
progress,
preferred_size,
}.chunk_all(self.blocks)
}
fn rebuilder(
&self,
chain: BlockChain,
db: Arc<dyn BlockChainDB>,
manifest: &ManifestData,
) -> Result<Box<dyn Rebuilder>, EthcoreError> {
PowRebuilder::new(
chain,
db.key_value().clone(),
manifest,
self.max_restore_blocks
).map(|r| Box::new(r) as Box<_>)
}
fn min_supported_version(&self) -> u64 { ::snapshot::MIN_SUPPORTED_STATE_CHUNK_VERSION }
fn current_version(&self) -> u64 { ::snapshot::STATE_CHUNK_VERSION }
}
/// Used to build block chunks.
struct PowWorker<'a> {
chain: &'a BlockChain,
// block, receipt rlp pairs.
rlps: VecDeque<Bytes>,
current_hash: H256,
writer: &'a mut ChunkSink<'a>,
progress: &'a Progress,
preferred_size: usize,
}
impl<'a> PowWorker<'a> {
// Repeatedly fill the buffers and writes out chunks, moving backwards from starting block hash.
// Loops until we reach the first desired block, and writes out the remainder.
fn chunk_all(&mut self, snapshot_blocks: u64) -> Result<(), SnapshotError> {
let mut loaded_size = 0;
let mut last = self.current_hash;
let genesis_hash = self.chain.genesis_hash();
for _ in 0..snapshot_blocks {
if self.current_hash == genesis_hash { break }
let (block, receipts) = self.chain.block(&self.current_hash)
.and_then(|b| self.chain.block_receipts(&self.current_hash).map(|r| (b, r)))
.ok_or_else(||SnapshotError::BlockNotFound(self.current_hash))?;
let abridged_rlp = AbridgedBlock::from_block_view(&block.view()).into_inner();
let pair = {
let mut pair_stream = RlpStream::new_list(2);
pair_stream.append_raw(&abridged_rlp, 1).append(&receipts);
pair_stream.out()
};
let new_loaded_size = loaded_size + pair.len();
// cut off the chunk if too large.
if new_loaded_size > self.preferred_size && !self.rlps.is_empty() {
self.write_chunk(last)?;
loaded_size = pair.len();
} else {
loaded_size = new_loaded_size;
}
self.rlps.push_front(pair);
last = self.current_hash;
self.current_hash = block.header_view().parent_hash();
self.progress.blocks.fetch_add(1, Ordering::SeqCst);
}
if loaded_size != 0 {
self.write_chunk(last)?;
}
Ok(())
}
// write out the data in the buffers to a chunk on disk
//
// we preface each chunk with the parent of the first block's details,
// obtained from the details of the last block written.
fn write_chunk(&mut self, last: H256) -> Result<(), SnapshotError> {
trace!(target: "snapshot", "prepared block chunk with {} blocks", self.rlps.len());
let (last_header, last_details) = self.chain.block_header_data(&last)
.and_then(|n| self.chain.block_details(&last).map(|d| (n, d)))
.ok_or_else(||SnapshotError::BlockNotFound(last))?;
let parent_number = last_header.number() - 1;
let parent_hash = last_header.parent_hash();
let parent_total_difficulty = last_details.total_difficulty - last_header.difficulty();
trace!(target: "snapshot", "parent last written block: {}", parent_hash);
let num_entries = self.rlps.len();
let mut rlp_stream = RlpStream::new_list(3 + num_entries);
rlp_stream.append(&parent_number).append(&parent_hash).append(&parent_total_difficulty);
for pair in self.rlps.drain(..) {
rlp_stream.append_raw(&pair, 1);
}
let raw_data = rlp_stream.out();
(self.writer)(&raw_data)?;
Ok(())
}
}
/// Rebuilder for proof-of-work chains.
/// Does basic verification for all blocks, but `PoW` verification for some.
/// Blocks must be fed in-order.
///
/// The first block in every chunk is disconnected from the last block in the
/// chunk before it, as chunks may be submitted out-of-order.
///
/// After all chunks have been submitted, we "glue" the chunks together.
pub struct PowRebuilder {
chain: BlockChain,
db: Arc<dyn KeyValueDB>,
rng: OsRng,
disconnected: Vec<(u64, H256)>,
best_number: u64,
best_hash: H256,
best_root: H256,
fed_blocks: u64,
snapshot_blocks: u64,
}
impl PowRebuilder {
/// Create a new PowRebuilder.
fn new(chain: BlockChain, db: Arc<dyn KeyValueDB>, manifest: &ManifestData, snapshot_blocks: u64) -> Result<Self, EthcoreError> {
Ok(PowRebuilder {
chain,
db,
rng: OsRng::new().map_err(|e| format!("{}", e))?,
disconnected: Vec::new(),
best_number: manifest.block_number,
best_hash: manifest.block_hash,
best_root: manifest.state_root,
fed_blocks: 0,
snapshot_blocks,
})
}
}
impl Rebuilder for PowRebuilder {
/// Feed the rebuilder an uncompressed block chunk.
/// Returns the number of blocks fed or any errors.
fn feed(&mut self, chunk: &[u8], engine: &dyn Engine, abort_flag: &AtomicBool) -> Result<(), EthcoreError> {
use snapshot::verify_old_block;
use ethereum_types::U256;
use triehash::ordered_trie_root;
let rlp = Rlp::new(chunk);
let item_count = rlp.item_count()?;
let num_blocks = (item_count - 3) as u64;
trace!(target: "snapshot", "restoring block chunk with {} blocks.", num_blocks);
if self.fed_blocks + num_blocks > self.snapshot_blocks {
return Err(SnapshotError::TooManyBlocks(self.snapshot_blocks, self.fed_blocks + num_blocks).into())
}
// todo: assert here that these values are consistent with chunks being in order.
let mut cur_number = rlp.val_at::<u64>(0)? + 1;
let mut parent_hash = rlp.val_at::<H256>(1)?;
let parent_total_difficulty = rlp.val_at::<U256>(2)?;
for idx in 3..item_count {
if !abort_flag.load(Ordering::SeqCst) { return Err(SnapshotError::RestorationAborted.into()) }
let pair = rlp.at(idx)?;
let abridged_rlp = pair.at(0)?.as_raw().to_owned();
let abridged_block = AbridgedBlock::from_raw(abridged_rlp);
let receipts: Vec<::types::receipt::Receipt> = pair.list_at(1)?;
let receipts_root = ordered_trie_root(pair.at(1)?.iter().map(|r| r.as_raw()));
let block = abridged_block.to_block(parent_hash, cur_number, receipts_root)?;
let block_bytes = encoded::Block::new(block.rlp_bytes());
let is_best = cur_number == self.best_number;
if is_best {
if block.header.hash() != self.best_hash {
return Err(SnapshotError::WrongBlockHash(cur_number, self.best_hash, block.header.hash()).into())
}
if block.header.state_root() != &self.best_root {
return Err(SnapshotError::WrongStateRoot(self.best_root, *block.header.state_root()).into())
}
}
verify_old_block(
&mut self.rng,
&block.header,
engine,
&self.chain,
is_best
)?;
let mut batch = self.db.transaction();
// special-case the first block in each chunk.
if idx == 3 {
if self.chain.insert_unordered_block(&mut batch, block_bytes, receipts, Some(parent_total_difficulty), is_best, false) {
self.disconnected.push((cur_number, block.header.hash()));
}
} else {
self.chain.insert_unordered_block(&mut batch, block_bytes, receipts, None, is_best, false);
}
self.db.write_buffered(batch);
self.chain.commit();
parent_hash = block.header.hash();
cur_number += 1;
}
self.fed_blocks += num_blocks;
Ok(())
}
/// Glue together any disconnected chunks and check that the chain is complete.
fn finalize(&mut self) -> Result<(), EthcoreError> {
let mut batch = self.db.transaction();
trace!(target: "snapshot", "rebuilder, finalize: inserting {} disconnected chunks", self.disconnected.len());
for (first_num, first_hash) in self.disconnected.drain(..) {
let parent_num = first_num - 1;
// check if the parent is even in the chain.
// since we don't restore every single block in the chain,
// the first block of the first chunks has nothing to connect to.
if let Some(parent_hash) = self.chain.block_hash(parent_num) {
// if so, add the child to it.
self.chain.add_child(&mut batch, parent_hash, first_hash);
}
}
let genesis_hash = self.chain.genesis_hash();
self.chain.insert_epoch_transition(&mut batch, 0, EpochTransition {
block_number: 0,
block_hash: genesis_hash,
proof: vec![],
});
self.db.write_buffered(batch);
Ok(())
}
}

View File

@@ -1,409 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! Snapshot i/o.
//! Ways of writing and reading snapshots. This module supports writing and reading
//! snapshots of two different formats: packed and loose.
//! Packed snapshots are written to a single file, and loose snapshots are
//! written to multiple files in one directory.
use std::collections::HashMap;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::fs::{self, File};
use std::path::{Path, PathBuf};
use bytes::Bytes;
use client_traits::SnapshotWriter;
use ethereum_types::H256;
use rlp::{RlpStream, Rlp};
use types::{
errors::{SnapshotError, EthcoreError},
snapshot::ManifestData,
};
const SNAPSHOT_VERSION: u64 = 2;
// (hash, len, offset)
#[derive(RlpEncodable, RlpDecodable)]
struct ChunkInfo(H256, u64, u64);
/// A packed snapshot writer. This writes snapshots to a single concatenated file.
///
/// The file format is very simple and consists of three parts:
/// [Concatenated chunk data]
/// [manifest as RLP]
/// [manifest start offset (8 bytes little-endian)]
///
/// The manifest contains all the same information as a standard `ManifestData`,
/// but also maps chunk hashes to their lengths and offsets in the file
/// for easy reading.
pub struct PackedWriter {
file: File,
state_hashes: Vec<ChunkInfo>,
block_hashes: Vec<ChunkInfo>,
cur_len: u64,
}
impl PackedWriter {
/// Create a new "PackedWriter", to write into the file at the given path.
pub fn new(path: &Path) -> io::Result<Self> {
Ok(PackedWriter {
file: File::create(path)?,
state_hashes: Vec::new(),
block_hashes: Vec::new(),
cur_len: 0,
})
}
}
impl SnapshotWriter for PackedWriter {
fn write_state_chunk(&mut self, hash: H256, chunk: &[u8]) -> io::Result<()> {
self.file.write_all(chunk)?;
let len = chunk.len() as u64;
self.state_hashes.push(ChunkInfo(hash, len, self.cur_len));
self.cur_len += len;
Ok(())
}
fn write_block_chunk(&mut self, hash: H256, chunk: &[u8]) -> io::Result<()> {
self.file.write_all(chunk)?;
let len = chunk.len() as u64;
self.block_hashes.push(ChunkInfo(hash, len, self.cur_len));
self.cur_len += len;
Ok(())
}
fn finish(mut self, manifest: ManifestData) -> io::Result<()> {
// we ignore the hashes fields of the manifest under the assumption that
// they are consistent with ours.
let mut stream = RlpStream::new_list(6);
stream
.append(&SNAPSHOT_VERSION)
.append_list(&self.state_hashes)
.append_list(&self.block_hashes)
.append(&manifest.state_root)
.append(&manifest.block_number)
.append(&manifest.block_hash);
let manifest_rlp = stream.out();
self.file.write_all(&manifest_rlp)?;
let off = self.cur_len;
trace!(target: "snapshot_io", "writing manifest of len {} to offset {}", manifest_rlp.len(), off);
let off_bytes: [u8; 8] =
[
off as u8,
(off >> 8) as u8,
(off >> 16) as u8,
(off >> 24) as u8,
(off >> 32) as u8,
(off >> 40) as u8,
(off >> 48) as u8,
(off >> 56) as u8,
];
self.file.write_all(&off_bytes[..])?;
Ok(())
}
}
/// A "loose" writer writes chunk files into a directory.
pub struct LooseWriter {
dir: PathBuf,
}
impl LooseWriter {
/// Create a new LooseWriter which will write into the given directory,
/// creating it if it doesn't exist.
pub fn new(path: PathBuf) -> io::Result<Self> {
fs::create_dir_all(&path)?;
Ok(LooseWriter {
dir: path,
})
}
// writing logic is the same for both kinds of chunks.
fn write_chunk(&mut self, hash: H256, chunk: &[u8]) -> io::Result<()> {
let file_path = self.dir.join(format!("{:x}", hash));
let mut file = File::create(file_path)?;
file.write_all(chunk)?;
Ok(())
}
}
impl SnapshotWriter for LooseWriter {
fn write_state_chunk(&mut self, hash: H256, chunk: &[u8]) -> io::Result<()> {
self.write_chunk(hash, chunk)
}
fn write_block_chunk(&mut self, hash: H256, chunk: &[u8]) -> io::Result<()> {
self.write_chunk(hash, chunk)
}
fn finish(self, manifest: ManifestData) -> io::Result<()> {
let rlp = manifest.into_rlp();
let mut path = self.dir.clone();
path.push("MANIFEST");
let mut file = File::create(path)?;
file.write_all(&rlp[..])?;
Ok(())
}
}
/// Something which can read compressed snapshots.
pub trait SnapshotReader {
/// Get the manifest data for this snapshot.
fn manifest(&self) -> &ManifestData;
/// Get raw chunk data by hash. implementation defined behavior
/// if a chunk not in the manifest is requested.
fn chunk(&self, hash: H256) -> io::Result<Bytes>;
}
/// Packed snapshot reader.
pub struct PackedReader {
file: File,
state_hashes: HashMap<H256, (u64, u64)>, // len, offset
block_hashes: HashMap<H256, (u64, u64)>, // len, offset
manifest: ManifestData,
}
impl PackedReader {
/// Create a new `PackedReader` for the file at the given path.
/// This will fail if any io errors are encountered or the file
/// is not a valid packed snapshot.
pub fn new(path: &Path) -> Result<Option<Self>, SnapshotError> {
let mut file = File::open(path)?;
let file_len = file.metadata()?.len();
if file_len < 8 {
// ensure we don't seek before beginning.
return Ok(None);
}
file.seek(SeekFrom::End(-8))?;
let mut off_bytes = [0u8; 8];
file.read_exact(&mut off_bytes[..])?;
let manifest_off: u64 =
((off_bytes[7] as u64) << 56) +
((off_bytes[6] as u64) << 48) +
((off_bytes[5] as u64) << 40) +
((off_bytes[4] as u64) << 32) +
((off_bytes[3] as u64) << 24) +
((off_bytes[2] as u64) << 16) +
((off_bytes[1] as u64) << 8) +
(off_bytes[0] as u64);
let manifest_len = file_len - manifest_off - 8;
trace!(target: "snapshot", "loading manifest of length {} from offset {}", manifest_len, manifest_off);
let mut manifest_buf = vec![0; manifest_len as usize];
file.seek(SeekFrom::Start(manifest_off))?;
file.read_exact(&mut manifest_buf)?;
let rlp = Rlp::new(&manifest_buf);
let (start, version) = if rlp.item_count()? == 5 {
(0, 1)
} else {
(1, rlp.val_at(0)?)
};
if version > SNAPSHOT_VERSION {
return Err(SnapshotError::VersionNotSupported(version));
}
let state: Vec<ChunkInfo> = rlp.list_at(0 + start)?;
let blocks: Vec<ChunkInfo> = rlp.list_at(1 + start)?;
let manifest = ManifestData {
version: version,
state_hashes: state.iter().map(|c| c.0).collect(),
block_hashes: blocks.iter().map(|c| c.0).collect(),
state_root: rlp.val_at(2 + start)?,
block_number: rlp.val_at(3 + start)?,
block_hash: rlp.val_at(4 + start)?,
};
Ok(Some(PackedReader {
file: file,
state_hashes: state.into_iter().map(|c| (c.0, (c.1, c.2))).collect(),
block_hashes: blocks.into_iter().map(|c| (c.0, (c.1, c.2))).collect(),
manifest: manifest
}))
}
}
impl SnapshotReader for PackedReader {
fn manifest(&self) -> &ManifestData {
&self.manifest
}
fn chunk(&self, hash: H256) -> io::Result<Bytes> {
let &(len, off) = self.state_hashes.get(&hash).or_else(|| self.block_hashes.get(&hash))
.expect("only chunks in the manifest can be requested; qed");
let mut file = &self.file;
file.seek(SeekFrom::Start(off))?;
let mut buf = vec![0; len as usize];
file.read_exact(&mut buf[..])?;
Ok(buf)
}
}
/// reader for "loose" snapshots
pub struct LooseReader {
dir: PathBuf,
manifest: ManifestData,
}
impl LooseReader {
/// Create a new `LooseReader` which will read the manifest and chunk data from
/// the given directory.
pub fn new(mut dir: PathBuf) -> Result<Self, EthcoreError> {
let mut manifest_buf = Vec::new();
dir.push("MANIFEST");
let mut manifest_file = File::open(&dir)?;
manifest_file.read_to_end(&mut manifest_buf)?;
let manifest = ManifestData::from_rlp(&manifest_buf[..])?;
dir.pop();
Ok(LooseReader { dir, manifest })
}
}
impl SnapshotReader for LooseReader {
fn manifest(&self) -> &ManifestData {
&self.manifest
}
fn chunk(&self, hash: H256) -> io::Result<Bytes> {
let path = self.dir.join(format!("{:x}", hash));
let mut buf = Vec::new();
let mut file = File::open(&path)?;
file.read_to_end(&mut buf)?;
Ok(buf)
}
}
#[cfg(test)]
mod tests {
use tempdir::TempDir;
use hash::keccak;
use snapshot::ManifestData;
use super::{SnapshotWriter, SnapshotReader, PackedWriter, PackedReader, LooseWriter, LooseReader, SNAPSHOT_VERSION};
const STATE_CHUNKS: &'static [&'static [u8]] = &[b"dog", b"cat", b"hello world", b"hi", b"notarealchunk"];
const BLOCK_CHUNKS: &'static [&'static [u8]] = &[b"hello!", b"goodbye!", b"abcdefg", b"hijklmnop", b"qrstuvwxy", b"and", b"z"];
#[test]
fn packed_write_and_read() {
let tempdir = TempDir::new("").unwrap();
let path = tempdir.path().join("packed");
let mut writer = PackedWriter::new(&path).unwrap();
let mut state_hashes = Vec::new();
let mut block_hashes = Vec::new();
for chunk in STATE_CHUNKS {
let hash = keccak(&chunk);
state_hashes.push(hash.clone());
writer.write_state_chunk(hash, chunk).unwrap();
}
for chunk in BLOCK_CHUNKS {
let hash = keccak(&chunk);
block_hashes.push(hash.clone());
writer.write_block_chunk(keccak(&chunk), chunk).unwrap();
}
let manifest = ManifestData {
version: SNAPSHOT_VERSION,
state_hashes: state_hashes,
block_hashes: block_hashes,
state_root: keccak(b"notarealroot"),
block_number: 12345678987654321,
block_hash: keccak(b"notarealblock"),
};
writer.finish(manifest.clone()).unwrap();
let reader = PackedReader::new(&path).unwrap().unwrap();
assert_eq!(reader.manifest(), &manifest);
for hash in manifest.state_hashes.iter().chain(&manifest.block_hashes) {
reader.chunk(hash.clone()).unwrap();
}
}
#[test]
fn loose_write_and_read() {
let tempdir = TempDir::new("").unwrap();
let mut writer = LooseWriter::new(tempdir.path().into()).unwrap();
let mut state_hashes = Vec::new();
let mut block_hashes = Vec::new();
for chunk in STATE_CHUNKS {
let hash = keccak(&chunk);
state_hashes.push(hash.clone());
writer.write_state_chunk(hash, chunk).unwrap();
}
for chunk in BLOCK_CHUNKS {
let hash = keccak(&chunk);
block_hashes.push(hash.clone());
writer.write_block_chunk(keccak(&chunk), chunk).unwrap();
}
let manifest = ManifestData {
version: SNAPSHOT_VERSION,
state_hashes: state_hashes,
block_hashes: block_hashes,
state_root: keccak(b"notarealroot"),
block_number: 12345678987654321,
block_hash: keccak(b"notarealblock)"),
};
writer.finish(manifest.clone()).unwrap();
let reader = LooseReader::new(tempdir.path().into()).unwrap();
assert_eq!(reader.manifest(), &manifest);
for hash in manifest.state_hashes.iter().chain(&manifest.block_hashes) {
reader.chunk(hash.clone()).unwrap();
}
}
}

View File

@@ -1,557 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! Snapshot creation, restoration, and network service.
//!
//! Documentation of the format can be found at
//! https://wiki.parity.io/Warp-Sync-Snapshot-Format
use std::collections::{HashMap, HashSet};
use std::cmp;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY};
use account_db::{AccountDB, AccountDBMut};
use blockchain::{BlockChain, BlockProvider};
use types::{
ids::BlockId,
header::Header,
errors::{SnapshotError as Error, EthcoreError},
snapshot::Progress,
};
use ethereum_types::{H256, U256};
use hash_db::HashDB;
use keccak_hasher::KeccakHasher;
use snappy;
use bytes::Bytes;
use parking_lot::Mutex;
use journaldb::{self, Algorithm, JournalDB};
use kvdb::{KeyValueDB, DBValue};
use trie::{Trie, TrieMut};
use ethtrie::{TrieDB, TrieDBMut};
use rlp::{RlpStream, Rlp};
use bloom_journal::Bloom;
use num_cpus;
use types::snapshot::ManifestData;
// todo[dvdplm] put back in snapshots once it's extracted
use client_traits::SnapshotWriter;
use super::state_db::StateDB;
use account_state::Account as StateAccount;
use engine::Engine;
use crossbeam_utils::thread;
use rand::{Rng, rngs::OsRng};
pub use self::consensus::*;
pub use self::service::Service;
pub use self::traits::{SnapshotService, SnapshotComponents, Rebuilder};
pub use self::watcher::Watcher;
pub use types::basic_account::BasicAccount;
pub mod io;
pub mod service;
mod account;
mod block;
mod consensus;
mod watcher;
#[cfg(test)]
mod tests;
mod traits;
// Try to have chunks be around 4MB (before compression)
const PREFERRED_CHUNK_SIZE: usize = 4 * 1024 * 1024;
// Maximal chunk size (decompressed)
// Snappy::decompressed_len estimation may sometimes yield results greater
// than PREFERRED_CHUNK_SIZE so allow some threshold here.
const MAX_CHUNK_SIZE: usize = PREFERRED_CHUNK_SIZE / 4 * 5;
// Minimum supported state chunk version.
const MIN_SUPPORTED_STATE_CHUNK_VERSION: u64 = 1;
// current state chunk version.
const STATE_CHUNK_VERSION: u64 = 2;
/// number of snapshot subparts, must be a power of 2 in [1; 256]
const SNAPSHOT_SUBPARTS: usize = 16;
/// Maximum number of snapshot subparts (must be a multiple of `SNAPSHOT_SUBPARTS`)
const MAX_SNAPSHOT_SUBPARTS: usize = 256;
/// Configuration for the Snapshot service
#[derive(Debug, Clone, PartialEq)]
pub struct SnapshotConfiguration {
/// If `true`, no periodic snapshots will be created
pub no_periodic: bool,
/// Number of threads for creating snapshots
pub processing_threads: usize,
}
impl Default for SnapshotConfiguration {
fn default() -> Self {
SnapshotConfiguration {
no_periodic: false,
processing_threads: ::std::cmp::max(1, num_cpus::get_physical() / 2),
}
}
}
/// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer.
pub fn take_snapshot<W: SnapshotWriter + Send>(
chunker: Box<dyn SnapshotComponents>,
chain: &BlockChain,
block_hash: H256,
state_db: &dyn HashDB<KeccakHasher, DBValue>,
writer: W,
p: &Progress,
processing_threads: usize,
) -> Result<(), Error> {
let start_header = chain.block_header_data(&block_hash)
.ok_or_else(|| Error::InvalidStartingBlock(BlockId::Hash(block_hash)))?;
let state_root = start_header.state_root();
let block_number = start_header.number();
info!("Taking snapshot starting at block {}", block_number);
let version = chunker.current_version();
let writer = Mutex::new(writer);
let (state_hashes, block_hashes) = thread::scope(|scope| -> Result<(Vec<H256>, Vec<H256>), Error> {
let writer = &writer;
let block_guard = scope.spawn(move |_| {
chunk_secondary(chunker, chain, block_hash, writer, p)
});
// The number of threads must be between 1 and SNAPSHOT_SUBPARTS
assert!(processing_threads >= 1, "Cannot use less than 1 threads for creating snapshots");
let num_threads: usize = cmp::min(processing_threads, SNAPSHOT_SUBPARTS);
info!(target: "snapshot", "Using {} threads for Snapshot creation.", num_threads);
let mut state_guards = Vec::with_capacity(num_threads as usize);
for thread_idx in 0..num_threads {
let state_guard = scope.spawn(move |_| -> Result<Vec<H256>, Error> {
let mut chunk_hashes = Vec::new();
for part in (thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads) {
debug!(target: "snapshot", "Chunking part {} in thread {}", part, thread_idx);
let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part), thread_idx)?;
chunk_hashes.append(&mut hashes);
}
Ok(chunk_hashes)
});
state_guards.push(state_guard);
}
let block_hashes = block_guard.join().expect("Sub-thread never panics; qed")?;
let mut state_hashes = Vec::new();
for guard in state_guards {
let part_state_hashes = guard.join().expect("Sub-thread never panics; qed")?;
state_hashes.extend(part_state_hashes);
}
debug!(target: "snapshot", "Took a snapshot of {} accounts", p.accounts.load(Ordering::SeqCst));
Ok((state_hashes, block_hashes))
}).expect("Sub-thread never panics; qed")?;
info!(target: "snapshot", "produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len());
let manifest_data = ManifestData {
version,
state_hashes,
block_hashes,
state_root,
block_number,
block_hash,
};
writer.into_inner().finish(manifest_data)?;
p.done.store(true, Ordering::SeqCst);
Ok(())
}
/// Create and write out all secondary chunks to disk, returning a vector of all
/// the hashes of secondary chunks created.
///
/// Secondary chunks are engine-specific, but they intend to corroborate the state data
/// in the state chunks.
/// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis.
pub fn chunk_secondary<'a>(
mut chunker: Box<dyn SnapshotComponents>,
chain: &'a BlockChain,
start_hash: H256,
writer: &Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress
) -> Result<Vec<H256>, Error> {
let mut chunk_hashes = Vec::new();
let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)];
{
let mut chunk_sink = |raw_data: &[u8]| {
let compressed_size = snappy::compress_into(raw_data, &mut snappy_buffer);
let compressed = &snappy_buffer[..compressed_size];
let hash = keccak(&compressed);
let size = compressed.len();
writer.lock().write_block_chunk(hash, compressed)?;
trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}",
hash, size, raw_data.len());
progress.size.fetch_add(size as u64, Ordering::SeqCst);
chunk_hashes.push(hash);
Ok(())
};
chunker.chunk_all(
chain,
start_hash,
&mut chunk_sink,
progress,
PREFERRED_CHUNK_SIZE,
)?;
}
Ok(chunk_hashes)
}
/// State trie chunker.
struct StateChunker<'a> {
hashes: Vec<H256>,
rlps: Vec<Bytes>,
cur_size: usize,
snappy_buffer: Vec<u8>,
writer: &'a Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress,
thread_idx: usize,
}
impl<'a> StateChunker<'a> {
// Push a key, value pair to be encoded.
//
// If the buffer is greater than the desired chunk size,
// this will write out the data to disk.
fn push(&mut self, data: Bytes) -> Result<(), Error> {
self.cur_size += data.len();
self.rlps.push(data);
Ok(())
}
// Write out the buffer to disk, pushing the created chunk's hash to
// the list.
fn write_chunk(&mut self) -> Result<(), Error> {
let num_entries = self.rlps.len();
let mut stream = RlpStream::new_list(num_entries);
for rlp in self.rlps.drain(..) {
stream.append_raw(&rlp, 1);
}
let raw_data = stream.out();
let compressed_size = snappy::compress_into(&raw_data, &mut self.snappy_buffer);
let compressed = &self.snappy_buffer[..compressed_size];
let hash = keccak(&compressed);
self.writer.lock().write_state_chunk(hash, compressed)?;
trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len());
self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst);
self.progress.size.fetch_add(compressed_size as u64, Ordering::SeqCst);
self.hashes.push(hash);
self.cur_size = 0;
Ok(())
}
// Get current chunk size.
fn chunk_size(&self) -> usize {
self.cur_size
}
}
/// Walk the given state database starting from the given root,
/// creating chunks and writing them out.
/// `part` is a number between 0 and 15, which describe which part of
/// the tree should be chunked.
///
/// Returns a list of hashes of chunks created, or any error it may
/// have encountered.
pub fn chunk_state<'a>(
db: &dyn HashDB<KeccakHasher, DBValue>,
root: &H256,
writer: &Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress,
part: Option<usize>,
thread_idx: usize,
) -> Result<Vec<H256>, Error> {
let account_trie = TrieDB::new(&db, &root)?;
let mut chunker = StateChunker {
hashes: Vec::new(),
rlps: Vec::new(),
cur_size: 0,
snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)],
writer,
progress,
thread_idx,
};
let mut used_code = HashSet::new();
// account_key here is the address' hash.
let mut account_iter = account_trie.iter()?;
let mut seek_to = None;
if let Some(part) = part {
assert!(part < 16, "Wrong chunk state part number (must be <16) in snapshot creation.");
let part_offset = MAX_SNAPSHOT_SUBPARTS / SNAPSHOT_SUBPARTS;
let mut seek_from = vec![0; 32];
seek_from[0] = (part * part_offset) as u8;
account_iter.seek(&seek_from)?;
// Set the upper-bound, except for the last part
if part < SNAPSHOT_SUBPARTS - 1 {
seek_to = Some(((part + 1) * part_offset) as u8)
}
}
for item in account_iter {
let (account_key, account_data) = item?;
let account_key_hash = H256::from_slice(&account_key);
if seek_to.map_or(false, |seek_to| account_key[0] >= seek_to) {
break;
}
let account = ::rlp::decode(&*account_data)?;
let account_db = AccountDB::from_hash(db, account_key_hash);
let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE, progress)?;
for (i, fat_rlp) in fat_rlps.into_iter().enumerate() {
if i > 0 {
chunker.write_chunk()?;
}
chunker.push(fat_rlp)?;
}
}
if chunker.cur_size != 0 {
chunker.write_chunk()?;
}
Ok(chunker.hashes)
}
/// Used to rebuild the state trie piece by piece.
pub struct StateRebuilder {
db: Box<dyn JournalDB>,
state_root: H256,
known_code: HashMap<H256, H256>, // code hashes mapped to first account with this code.
missing_code: HashMap<H256, Vec<H256>>, // maps code hashes to lists of accounts missing that code.
bloom: Bloom,
known_storage_roots: HashMap<H256, H256>, // maps account hashes to last known storage root. Only filled for last account per chunk.
}
impl StateRebuilder {
/// Create a new state rebuilder to write into the given backing DB.
pub fn new(db: Arc<dyn KeyValueDB>, pruning: Algorithm) -> Self {
StateRebuilder {
db: journaldb::new(db.clone(), pruning, ::db::COL_STATE),
state_root: KECCAK_NULL_RLP,
known_code: HashMap::new(),
missing_code: HashMap::new(),
bloom: StateDB::load_bloom(&*db),
known_storage_roots: HashMap::new(),
}
}
/// Feed an uncompressed state chunk into the rebuilder.
pub fn feed(&mut self, chunk: &[u8], flag: &AtomicBool) -> Result<(), EthcoreError> {
let rlp = Rlp::new(chunk);
let empty_rlp = StateAccount::new_basic(U256::zero(), U256::zero()).rlp();
let mut pairs = Vec::with_capacity(rlp.item_count()?);
// initialize the pairs vector with empty values so we have slots to write into.
pairs.resize(rlp.item_count()?, (H256::zero(), Vec::new()));
let status = rebuild_accounts(
self.db.as_hash_db_mut(),
rlp,
&mut pairs,
&self.known_code,
&mut self.known_storage_roots,
flag
)?;
for (addr_hash, code_hash) in status.missing_code {
self.missing_code.entry(code_hash).or_insert_with(Vec::new).push(addr_hash);
}
// patch up all missing code. must be done after collecting all new missing code entries.
for (code_hash, code, first_with) in status.new_code {
for addr_hash in self.missing_code.remove(&code_hash).unwrap_or_else(Vec::new) {
let mut db = AccountDBMut::from_hash(self.db.as_hash_db_mut(), addr_hash);
db.emplace(code_hash, hash_db::EMPTY_PREFIX, DBValue::from_slice(&code));
}
self.known_code.insert(code_hash, first_with);
}
let backing = self.db.backing().clone();
// batch trie writes
{
let mut account_trie = if self.state_root != KECCAK_NULL_RLP {
TrieDBMut::from_existing(self.db.as_hash_db_mut(), &mut self.state_root)?
} else {
TrieDBMut::new(self.db.as_hash_db_mut(), &mut self.state_root)
};
for (hash, thin_rlp) in pairs {
if !flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) }
if &thin_rlp[..] != &empty_rlp[..] {
self.bloom.set(hash.as_bytes());
}
account_trie.insert(hash.as_bytes(), &thin_rlp)?;
}
}
let bloom_journal = self.bloom.drain_journal();
let mut batch = backing.transaction();
StateDB::commit_bloom(&mut batch, bloom_journal)?;
self.db.inject(&mut batch)?;
backing.write_buffered(batch);
trace!(target: "snapshot", "current state root: {:?}", self.state_root);
Ok(())
}
/// Finalize the restoration. Check for accounts missing code and make a dummy
/// journal entry.
/// Once all chunks have been fed, there should be nothing missing.
pub fn finalize(mut self, era: u64, id: H256) -> Result<Box<dyn JournalDB>, EthcoreError> {
let missing = self.missing_code.keys().cloned().collect::<Vec<_>>();
if !missing.is_empty() { return Err(Error::MissingCode(missing).into()) }
let mut batch = self.db.backing().transaction();
self.db.journal_under(&mut batch, era, &id)?;
self.db.backing().write_buffered(batch);
Ok(self.db)
}
/// Get the state root of the rebuilder.
pub fn state_root(&self) -> H256 { self.state_root }
}
#[derive(Default)]
struct RebuiltStatus {
// new code that's become available. (code_hash, code, addr_hash)
new_code: Vec<(H256, Bytes, H256)>,
missing_code: Vec<(H256, H256)>, // accounts that are missing code.
}
// rebuild a set of accounts and their storage.
// returns a status detailing newly-loaded code and accounts missing code.
fn rebuild_accounts(
db: &mut dyn HashDB<KeccakHasher, DBValue>,
account_fat_rlps: Rlp,
out_chunk: &mut [(H256, Bytes)],
known_code: &HashMap<H256, H256>,
known_storage_roots: &mut HashMap<H256, H256>,
abort_flag: &AtomicBool,
) -> Result<RebuiltStatus, EthcoreError> {
let mut status = RebuiltStatus::default();
for (account_rlp, out) in account_fat_rlps.into_iter().zip(out_chunk.iter_mut()) {
if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) }
let hash: H256 = account_rlp.val_at(0)?;
let fat_rlp = account_rlp.at(1)?;
let thin_rlp = {
// fill out the storage trie and code while decoding.
let (acc, maybe_code) = {
let mut acct_db = AccountDBMut::from_hash(db, hash);
let storage_root = known_storage_roots.get(&hash).cloned().unwrap_or_default();
account::from_fat_rlp(&mut acct_db, fat_rlp, storage_root)?
};
let code_hash = acc.code_hash.clone();
match maybe_code {
// new inline code
Some(code) => status.new_code.push((code_hash, code, hash)),
None => {
if code_hash != KECCAK_EMPTY {
// see if this code has already been included inline
match known_code.get(&code_hash) {
Some(&first_with) => {
// if so, load it from the database.
let code = AccountDB::from_hash(db, first_with)
.get(&code_hash, hash_db::EMPTY_PREFIX)
.ok_or_else(|| Error::MissingCode(vec![first_with]))?;
// and write it again under a different mangled key
AccountDBMut::from_hash(db, hash).emplace(code_hash, hash_db::EMPTY_PREFIX, code);
}
// if not, queue it up to be filled later
None => status.missing_code.push((hash, code_hash)),
}
}
}
}
::rlp::encode(&acc)
};
*out = (hash, thin_rlp);
}
if let Some(&(ref hash, ref rlp)) = out_chunk.iter().last() {
known_storage_roots.insert(*hash, ::rlp::decode::<BasicAccount>(rlp)?.storage_root);
}
if let Some(&(ref hash, ref rlp)) = out_chunk.iter().next() {
known_storage_roots.insert(*hash, ::rlp::decode::<BasicAccount>(rlp)?.storage_root);
}
Ok(status)
}
/// Proportion of blocks which we will verify `PoW` for.
const POW_VERIFY_RATE: f32 = 0.02;
/// Verify an old block with the given header, engine, blockchain, body. If `always` is set, it will perform
/// the fullest verification possible. If not, it will take a random sample to determine whether it will
/// do heavy or light verification.
pub fn verify_old_block(rng: &mut OsRng, header: &Header, engine: &dyn Engine, chain: &BlockChain, always: bool) -> Result<(), EthcoreError> {
engine.verify_block_basic(header)?;
if always || rng.gen::<f32>() <= POW_VERIFY_RATE {
engine.verify_block_unordered(header)?;
match chain.block_header_data(header.parent_hash()) {
Some(parent) => engine.verify_block_family(header, &parent.decode()?).map_err(Into::into),
None => Ok(()),
}
} else {
Ok(())
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,193 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! Snapshot test helpers. These are used to build blockchains and state tries
//! which can be queried before and after a full snapshot/restore cycle.
extern crate trie_standardmap;
use std::sync::Arc;
use hash::{KECCAK_NULL_RLP};
use account_db::AccountDBMut;
use types::basic_account::BasicAccount;
use blockchain::{BlockChain, BlockChainDB};
use client::Client;
use client_traits::{ChainInfo, SnapshotClient};
use engine::Engine;
use snapshot::{StateRebuilder};
use snapshot::io::{SnapshotReader, PackedWriter, PackedReader};
use tempdir::TempDir;
use rand::Rng;
use kvdb::DBValue;
use ethereum_types::H256;
use hash_db::HashDB;
use keccak_hasher::KeccakHasher;
use journaldb;
use trie::{TrieMut, Trie};
use ethtrie::{SecTrieDBMut, TrieDB, TrieDBMut};
use self::trie_standardmap::{Alphabet, StandardMap, ValueMode};
use types::errors::EthcoreError;
// the proportion of accounts we will alter each tick.
const ACCOUNT_CHURN: f32 = 0.01;
/// This structure will incrementally alter a state given an rng.
pub struct StateProducer {
state_root: H256,
storage_seed: H256,
}
impl StateProducer {
/// Create a new `StateProducer`.
pub fn new() -> Self {
StateProducer {
state_root: KECCAK_NULL_RLP,
storage_seed: H256::zero(),
}
}
/// Tick the state producer. This alters the state, writing new data into
/// the database.
pub fn tick<R: Rng>(&mut self, rng: &mut R, db: &mut dyn HashDB<KeccakHasher, DBValue>) {
// modify existing accounts.
let mut accounts_to_modify: Vec<_> = {
let trie = TrieDB::new(&db, &self.state_root).unwrap();
let temp = trie.iter().unwrap() // binding required due to complicated lifetime stuff
.filter(|_| rng.gen::<f32>() < ACCOUNT_CHURN)
.map(Result::unwrap)
.map(|(k, v)| (H256::from_slice(&k), v.to_owned()))
.collect();
temp
};
// sweep once to alter storage tries.
for &mut (ref mut address_hash, ref mut account_data) in &mut accounts_to_modify {
let mut account: BasicAccount = ::rlp::decode(&*account_data).expect("error decoding basic account");
let acct_db = AccountDBMut::from_hash(db, *address_hash);
fill_storage(acct_db, &mut account.storage_root, &mut self.storage_seed);
*account_data = DBValue::from_vec(::rlp::encode(&account));
}
// sweep again to alter account trie.
let mut trie = TrieDBMut::from_existing(db, &mut self.state_root).unwrap();
for (address_hash, account_data) in accounts_to_modify {
trie.insert(&address_hash[..], &account_data).unwrap();
}
// add between 0 and 5 new accounts each tick.
let new_accs = rng.gen::<u32>() % 5;
for _ in 0..new_accs {
let address_hash = H256(rng.gen());
let balance: usize = rng.gen();
let nonce: usize = rng.gen();
let acc = account_state::Account::new_basic(balance.into(), nonce.into()).rlp();
trie.insert(&address_hash[..], &acc).unwrap();
}
}
/// Get the current state root.
pub fn state_root(&self) -> H256 {
self.state_root
}
}
/// Fill the storage of an account.
pub fn fill_storage(mut db: AccountDBMut, root: &mut H256, seed: &mut H256) {
let map = StandardMap {
alphabet: Alphabet::All,
min_key: 6,
journal_key: 6,
value_mode: ValueMode::Random,
count: 100,
};
{
let mut trie = if *root == KECCAK_NULL_RLP {
SecTrieDBMut::new(&mut db, root)
} else {
SecTrieDBMut::from_existing(&mut db, root).unwrap()
};
for (k, v) in map.make_with(&mut seed.to_fixed_bytes()) {
trie.insert(&k, &v).unwrap();
}
}
}
/// Take a snapshot from the given client into a temporary file.
/// Return a snapshot reader for it.
pub fn snap(client: &Client) -> (Box<dyn SnapshotReader>, TempDir) {
use types::ids::BlockId;
let tempdir = TempDir::new("").unwrap();
let path = tempdir.path().join("file");
let writer = PackedWriter::new(&path).unwrap();
let progress = Default::default();
let hash = client.chain_info().best_block_hash;
client.take_snapshot(writer, BlockId::Hash(hash), &progress).unwrap();
let reader = PackedReader::new(&path).unwrap().unwrap();
(Box::new(reader), tempdir)
}
/// Restore a snapshot into a given database. This will read chunks from the given reader
/// write into the given database.
pub fn restore(
db: Arc<dyn BlockChainDB>,
engine: &dyn Engine,
reader: &dyn SnapshotReader,
genesis: &[u8],
) -> Result<(), EthcoreError> {
use std::sync::atomic::AtomicBool;
let flag = AtomicBool::new(true);
let chunker = crate::snapshot::chunker(engine.snapshot_mode()).expect("the engine used here supports snapshots");
let manifest = reader.manifest();
let mut state = StateRebuilder::new(db.key_value().clone(), journaldb::Algorithm::Archive);
let mut secondary = {
let chain = BlockChain::new(Default::default(), genesis, db.clone());
chunker.rebuilder(chain, db, manifest).unwrap()
};
let mut snappy_buffer = Vec::new();
trace!(target: "snapshot", "restoring state");
for state_chunk_hash in manifest.state_hashes.iter() {
trace!(target: "snapshot", "state chunk hash: {}", state_chunk_hash);
let chunk = reader.chunk(*state_chunk_hash).unwrap();
let len = snappy::decompress_into(&chunk, &mut snappy_buffer).unwrap();
state.feed(&snappy_buffer[..len], &flag)?;
}
trace!(target: "snapshot", "restoring secondary");
for chunk_hash in manifest.block_hashes.iter() {
let chunk = reader.chunk(*chunk_hash).unwrap();
let len = snappy::decompress_into(&chunk, &mut snappy_buffer).unwrap();
secondary.feed(&snappy_buffer[..len], engine, &flag)?;
}
trace!(target: "snapshot", "finalizing");
state.finalize(manifest.block_number, manifest.block_hash)?;
secondary.finalize()
}

View File

@@ -1,40 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! Snapshot tests.
mod proof_of_work;
mod proof_of_authority;
mod state;
mod service;
pub mod helpers;
use super::ManifestData;
#[test]
fn manifest_rlp() {
let manifest = ManifestData {
version: 2,
block_hashes: Vec::new(),
state_hashes: Vec::new(),
block_number: 1234567,
state_root: Default::default(),
block_hash: Default::default(),
};
let raw = manifest.clone().into_rlp();
assert_eq!(ManifestData::from_rlp(&raw).unwrap(), manifest);
}

View File

@@ -1,265 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! PoA block chunker and rebuilder tests.
use std::cell::RefCell;
use std::sync::Arc;
use std::str::FromStr;
use accounts::AccountProvider;
use client::Client;
use client_traits::{BlockChainClient, ChainInfo};
use ethkey::Secret;
use snapshot::tests::helpers as snapshot_helpers;
use spec::Spec;
use test_helpers::generate_dummy_client_with_spec;
use types::transaction::{Transaction, Action, SignedTransaction};
use tempdir::TempDir;
use ethereum_types::Address;
use test_helpers;
use_contract!(test_validator_set, "res/contracts/test_validator_set.json");
const PASS: &'static str = "";
const TRANSITION_BLOCK_1: usize = 2; // block at which the contract becomes activated.
const TRANSITION_BLOCK_2: usize = 10; // block at which the second contract activates.
macro_rules! secret {
($e: expr) => { Secret::from($crate::hash::keccak($e).0) }
}
lazy_static! {
// contract addresses.
static ref CONTRACT_ADDR_1: Address = Address::from_str("0000000000000000000000000000000000000005").unwrap();
static ref CONTRACT_ADDR_2: Address = Address::from_str("0000000000000000000000000000000000000006").unwrap();
// secret: `keccak(1)`, and initial validator.
static ref RICH_ADDR: Address = Address::from_str("7d577a597b2742b498cb5cf0c26cdcd726d39e6e").unwrap();
// rich address' secret.
static ref RICH_SECRET: Secret = secret!("1");
}
/// Contract code used here: https://gist.github.com/anonymous/2a43783647e0f0dfcc359bd6fd81d6d9
/// Account with secrets keccak("1") is initially the validator.
/// Transitions to the contract at block 2, initially same validator set.
/// Create a new Spec with AuthorityRound which uses a contract at address 5 to determine the current validators using `getValidators`.
/// `test_validator_set::ValidatorSet` provides a native wrapper for the ABi.
fn spec_fixed_to_contract() -> Spec {
let data = include_bytes!("test_validator_contract.json");
let tempdir = TempDir::new("").unwrap();
Spec::load(&tempdir.path(), &data[..]).unwrap()
}
// creates an account provider, filling it with accounts from all the given
// secrets and password `PASS`.
// returns addresses corresponding to secrets.
fn make_accounts(secrets: &[Secret]) -> (Arc<AccountProvider>, Vec<Address>) {
let provider = AccountProvider::transient_provider();
let addrs = secrets.iter()
.cloned()
.map(|s| provider.insert_account(s, &PASS.into()).unwrap())
.collect();
(Arc::new(provider), addrs)
}
// validator transition. block number and new validators. must be after `TRANSITION_BLOCK`.
// all addresses in the set must be in the account provider.
enum Transition {
// manual transition via transaction
Manual(usize, Vec<Address>),
// implicit transition via multi-set
Implicit(usize, Vec<Address>),
}
// create a chain with the given transitions and some blocks beyond that transition.
fn make_chain(accounts: Arc<AccountProvider>, blocks_beyond: usize, transitions: Vec<Transition>) -> Arc<Client> {
let client = generate_dummy_client_with_spec(spec_fixed_to_contract);
let mut cur_signers = vec![*RICH_ADDR];
{
let engine = client.engine();
engine.register_client(Arc::downgrade(&client) as _);
}
{
// push a block with given number, signed by one of the signers, with given transactions.
let push_block = |signers: &[Address], n, txs: Vec<SignedTransaction>| {
use miner::{self, MinerService};
let idx = n as usize % signers.len();
trace!(target: "snapshot", "Pushing block #{}, {} txs, author={}",
n, txs.len(), signers[idx]);
let signer = Box::new((accounts.clone(), signers[idx], PASS.into()));
client.miner().set_author(miner::Author::Sealer(signer));
client.miner().import_external_transactions(&*client,
txs.into_iter().map(Into::into).collect());
client.engine().step();
assert_eq!(client.chain_info().best_block_number, n);
};
// execution callback for native contract: push transaction to be sealed.
let nonce = RefCell::new(client.engine().account_start_nonce(0));
// create useless transactions vector so we don't have to dig in
// and force sealing.
let make_useless_transactions = || {
let mut nonce = nonce.borrow_mut();
let transaction = Transaction {
nonce: *nonce,
gas_price: 1.into(),
gas: 21_000.into(),
action: Action::Call(Address::zero()),
value: 1.into(),
data: Vec::new(),
}.sign(&*RICH_SECRET, client.signing_chain_id());
*nonce = *nonce + 1;
vec![transaction]
};
// apply all transitions.
for transition in transitions {
let (num, manual, new_set) = match transition {
Transition::Manual(num, new_set) => (num, true, new_set),
Transition::Implicit(num, new_set) => (num, false, new_set),
};
if num < TRANSITION_BLOCK_1 {
panic!("Bad test: issued epoch change before transition to contract.");
}
if (num as u64) < client.chain_info().best_block_number {
panic!("Bad test: issued epoch change before previous transition finalized.");
}
for number in client.chain_info().best_block_number + 1 .. num as u64 {
push_block(&cur_signers, number, make_useless_transactions());
}
let pending = if manual {
trace!(target: "snapshot", "applying set transition at block #{}", num);
let address = match num >= TRANSITION_BLOCK_2 {
true => &CONTRACT_ADDR_2 as &Address,
false => &CONTRACT_ADDR_1 as &Address,
};
let data = test_validator_set::functions::set_validators::encode_input(new_set.clone());
let mut nonce = nonce.borrow_mut();
let transaction = Transaction {
nonce: *nonce,
gas_price: 0.into(),
gas: 1_000_000.into(),
action: Action::Call(*address),
value: 0.into(),
data,
}.sign(&*RICH_SECRET, client.signing_chain_id());
*nonce = *nonce + 1;
vec![transaction]
} else {
make_useless_transactions()
};
// push transition block.
push_block(&cur_signers, num as u64, pending);
// push blocks to finalize transition
for finalization_count in 1.. {
if finalization_count * 2 > cur_signers.len() { break }
push_block(&cur_signers, (num + finalization_count) as u64, make_useless_transactions());
}
cur_signers = new_set;
}
// make blocks beyond.
for number in (client.chain_info().best_block_number..).take(blocks_beyond) {
push_block(&cur_signers, number + 1, make_useless_transactions());
}
}
client
}
#[test]
fn fixed_to_contract_only() {
let (provider, addrs) = make_accounts(&[
RICH_SECRET.clone(),
secret!("foo"),
secret!("bar"),
secret!("test"),
secret!("signer"),
secret!("crypto"),
secret!("wizard"),
secret!("dog42"),
]);
assert!(provider.has_account(*RICH_ADDR));
let client = make_chain(provider, 3, vec![
Transition::Manual(3, vec![addrs[2], addrs[3], addrs[5], addrs[7]]),
Transition::Manual(6, vec![addrs[0], addrs[1], addrs[4], addrs[6]]),
]);
// 6, 7, 8 prove finality for transition at 6.
// 3 beyond gets us to 11.
assert_eq!(client.chain_info().best_block_number, 11);
let (reader, _tempdir) = snapshot_helpers::snap(&*client);
let new_db = test_helpers::new_db();
let spec = spec_fixed_to_contract();
// ensure fresh engine's step matches.
for _ in 0..11 { spec.engine.step() }
snapshot_helpers::restore(new_db, &*spec.engine, &*reader, &spec.genesis_block()).unwrap();
}
#[test]
fn fixed_to_contract_to_contract() {
let (provider, addrs) = make_accounts(&[
RICH_SECRET.clone(),
secret!("foo"),
secret!("bar"),
secret!("test"),
secret!("signer"),
secret!("crypto"),
secret!("wizard"),
secret!("dog42"),
]);
assert!(provider.has_account(*RICH_ADDR));
let client = make_chain(provider, 3, vec![
Transition::Manual(3, vec![addrs[2], addrs[3], addrs[5], addrs[7]]),
Transition::Manual(6, vec![addrs[0], addrs[1], addrs[4], addrs[6]]),
Transition::Implicit(10, vec![addrs[0]]),
Transition::Manual(13, vec![addrs[2], addrs[4], addrs[6], addrs[7]]),
]);
assert_eq!(client.chain_info().best_block_number, 16);
let (reader, _tempdir) = snapshot_helpers::snap(&*client);
let new_db = test_helpers::new_db();
let spec = spec_fixed_to_contract();
for _ in 0..16 { spec.engine.step() }
snapshot_helpers::restore(new_db, &*spec.engine, &*reader, &spec.genesis_block()).unwrap();
}

View File

@@ -1,155 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! PoW block chunker and rebuilder tests.
use std::sync::atomic::AtomicBool;
use tempdir::TempDir;
use types::{
errors::EthcoreError as Error,
engines::ForkChoice,
snapshot::Progress,
};
use blockchain::generator::{BlockGenerator, BlockBuilder};
use blockchain::{BlockChain, ExtrasInsert};
use client_traits::SnapshotWriter;
use snapshot::{chunk_secondary, Error as SnapshotError, SnapshotComponents};
use snapshot::io::{PackedReader, PackedWriter, SnapshotReader};
use parking_lot::Mutex;
use snappy;
use kvdb::DBTransaction;
use test_helpers;
use spec;
const SNAPSHOT_MODE: ::snapshot::PowSnapshot = ::snapshot::PowSnapshot { blocks: 30000, max_restore_blocks: 30000 };
fn chunk_and_restore(amount: u64) {
let genesis = BlockBuilder::genesis();
let rest = genesis.add_blocks(amount as usize);
let generator = BlockGenerator::new(vec![rest]);
let genesis = genesis.last();
let engine = spec::new_test().engine;
let tempdir = TempDir::new("").unwrap();
let snapshot_path = tempdir.path().join("SNAP");
let old_db = test_helpers::new_db();
let bc = BlockChain::new(Default::default(), genesis.encoded().raw(), old_db.clone());
// build the blockchain.
let mut batch = DBTransaction::new();
for block in generator {
bc.insert_block(&mut batch, block.encoded(), vec![], ExtrasInsert {
fork_choice: ForkChoice::New,
is_finalized: false,
});
bc.commit();
}
old_db.key_value().write(batch).unwrap();
let best_hash = bc.best_block_hash();
// snapshot it.
let writer = Mutex::new(PackedWriter::new(&snapshot_path).unwrap());
let block_hashes = chunk_secondary(
Box::new(SNAPSHOT_MODE),
&bc,
best_hash,
&writer,
&Progress::default()
).unwrap();
let manifest = ::snapshot::ManifestData {
version: 2,
state_hashes: Vec::new(),
block_hashes: block_hashes,
state_root: ::hash::KECCAK_NULL_RLP,
block_number: amount,
block_hash: best_hash,
};
writer.into_inner().finish(manifest.clone()).unwrap();
// restore it.
let new_db = test_helpers::new_db();
let new_chain = BlockChain::new(Default::default(), genesis.encoded().raw(), new_db.clone());
let mut rebuilder = SNAPSHOT_MODE.rebuilder(new_chain, new_db.clone(), &manifest).unwrap();
let reader = PackedReader::new(&snapshot_path).unwrap().unwrap();
let flag = AtomicBool::new(true);
for chunk_hash in &reader.manifest().block_hashes {
let compressed = reader.chunk(*chunk_hash).unwrap();
let chunk = snappy::decompress(&compressed).unwrap();
rebuilder.feed(&chunk, engine.as_ref(), &flag).unwrap();
}
rebuilder.finalize().unwrap();
drop(rebuilder);
// and test it.
let new_chain = BlockChain::new(Default::default(), genesis.encoded().raw(), new_db);
assert_eq!(new_chain.best_block_hash(), best_hash);
}
#[test]
fn chunk_and_restore_500() {
chunk_and_restore(500)
}
#[test]
fn chunk_and_restore_4k() {
chunk_and_restore(4000)
}
#[test]
fn checks_flag() {
use rlp::RlpStream;
use ethereum_types::H256;
let mut stream = RlpStream::new_list(5);
stream.append(&100u64)
.append(&H256::zero())
.append(&(!0u64));
stream.append_empty_data().append_empty_data();
let genesis = BlockBuilder::genesis();
let chunk = stream.out();
let db = test_helpers::new_db();
let engine = spec::new_test().engine;
let chain = BlockChain::new(Default::default(), genesis.last().encoded().raw(), db.clone());
let manifest = ::snapshot::ManifestData {
version: 2,
state_hashes: Vec::new(),
block_hashes: Vec::new(),
state_root: ::hash::KECCAK_NULL_RLP,
block_number: 102,
block_hash: H256::zero(),
};
let mut rebuilder = SNAPSHOT_MODE.rebuilder(chain, db.clone(), &manifest).unwrap();
match rebuilder.feed(&chunk, engine.as_ref(), &AtomicBool::new(false)) {
Err(Error::Snapshot(SnapshotError::RestorationAborted)) => {}
_ => panic!("Wrong result on abort flag set")
}
}

View File

@@ -1,353 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! Tests for the snapshot service.
use std::fs;
use std::sync::Arc;
use tempdir::TempDir;
use blockchain::BlockProvider;
use client::{Client, ClientConfig};
use client_traits::{BlockInfo, ImportBlock, SnapshotWriter};
use types::{
ids::BlockId,
snapshot::Progress,
verification::Unverified,
snapshot::{ManifestData, RestorationStatus},
};
use snapshot::io::{PackedReader, PackedWriter, SnapshotReader};
use snapshot::service::{Service, ServiceParams};
use snapshot::{chunk_state, chunk_secondary, SnapshotService};
use spec;
use test_helpers::{new_db, new_temp_db, generate_dummy_client_with_spec_and_data, restoration_db_handler};
use parking_lot::Mutex;
use io::IoChannel;
use kvdb_rocksdb::DatabaseConfig;
#[test]
fn restored_is_equivalent() {
let _ = ::env_logger::try_init();
const NUM_BLOCKS: u32 = 400;
const TX_PER: usize = 5;
let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()];
let client = generate_dummy_client_with_spec_and_data(spec::new_null, NUM_BLOCKS, TX_PER, &gas_prices);
let tempdir = TempDir::new("").unwrap();
let client_db = tempdir.path().join("client_db");
let path = tempdir.path().join("snapshot");
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let restoration = restoration_db_handler(db_config);
let blockchain_db = restoration.open(&client_db).unwrap();
let spec = spec::new_null();
let client2 = Client::new(
Default::default(),
&spec,
blockchain_db,
Arc::new(::miner::Miner::new_for_tests(&spec, None)),
IoChannel::disconnected(),
).unwrap();
let service_params = ServiceParams {
engine: spec.engine.clone(),
genesis_block: spec.genesis_block(),
restoration_db_handler: restoration,
pruning: ::journaldb::Algorithm::Archive,
channel: IoChannel::disconnected(),
snapshot_root: path,
client: client2.clone(),
};
let service = Service::new(service_params).unwrap();
service.take_snapshot(&*client, NUM_BLOCKS as u64).unwrap();
let manifest = service.manifest().unwrap();
service.init_restore(manifest.clone(), true).unwrap();
assert!(service.init_restore(manifest.clone(), true).is_ok());
for hash in manifest.state_hashes {
let chunk = service.chunk(hash).unwrap();
service.feed_state_chunk(hash, &chunk);
}
for hash in manifest.block_hashes {
let chunk = service.chunk(hash).unwrap();
service.feed_block_chunk(hash, &chunk);
}
assert_eq!(service.status(), RestorationStatus::Inactive);
for x in 0..NUM_BLOCKS {
let block1 = client.block(BlockId::Number(x as u64)).unwrap();
let block2 = client2.block(BlockId::Number(x as u64)).unwrap();
assert_eq!(block1, block2);
}
}
// on windows the guards deletion (remove_dir_all)
// is not happening (error directory is not empty).
// So the test is disabled until windows api behave.
#[cfg(not(target_os = "windows"))]
#[test]
fn guards_delete_folders() {
let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()];
let client = generate_dummy_client_with_spec_and_data(spec::new_null, 400, 5, &gas_prices);
let spec = spec::new_null();
let tempdir = TempDir::new("").unwrap();
let service_params = ServiceParams {
engine: spec.engine.clone(),
genesis_block: spec.genesis_block(),
restoration_db_handler: restoration_db_handler(DatabaseConfig::with_columns(::db::NUM_COLUMNS)),
pruning: ::journaldb::Algorithm::Archive,
channel: IoChannel::disconnected(),
snapshot_root: tempdir.path().to_owned(),
client: client,
};
let service = Service::new(service_params).unwrap();
let path = tempdir.path().join("restoration");
let manifest = ManifestData {
version: 2,
state_hashes: vec![],
block_hashes: vec![],
block_number: 0,
block_hash: Default::default(),
state_root: Default::default(),
};
service.init_restore(manifest.clone(), true).unwrap();
assert!(path.exists());
// The `db` folder should have been deleted,
// while the `temp` one kept
service.abort_restore();
assert!(!path.join("db").exists());
assert!(path.join("temp").exists());
service.init_restore(manifest.clone(), true).unwrap();
assert!(path.exists());
drop(service);
assert!(!path.join("db").exists());
assert!(path.join("temp").exists());
}
#[test]
fn keep_ancient_blocks() {
let _ = ::env_logger::try_init();
// Test variables
const NUM_BLOCKS: u64 = 500;
const NUM_SNAPSHOT_BLOCKS: u64 = 300;
const SNAPSHOT_MODE: ::snapshot::PowSnapshot = ::snapshot::PowSnapshot { blocks: NUM_SNAPSHOT_BLOCKS, max_restore_blocks: NUM_SNAPSHOT_BLOCKS };
// Temporary folders
let tempdir = TempDir::new("").unwrap();
let snapshot_path = tempdir.path().join("SNAP");
// Generate blocks
let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()];
let spec_f = spec::new_null;
let spec = spec_f();
let client = generate_dummy_client_with_spec_and_data(spec_f, NUM_BLOCKS as u32, 5, &gas_prices);
let bc = client.chain();
// Create the Snapshot
let best_hash = bc.best_block_hash();
let writer = Mutex::new(PackedWriter::new(&snapshot_path).unwrap());
let block_hashes = chunk_secondary(
Box::new(SNAPSHOT_MODE),
&bc,
best_hash,
&writer,
&Progress::default()
).unwrap();
let state_db = client.state_db().journal_db().boxed_clone();
let start_header = bc.block_header_data(&best_hash).unwrap();
let state_root = start_header.state_root();
let state_hashes = chunk_state(
state_db.as_hash_db(),
&state_root,
&writer,
&Progress::default(),
None,
0
).unwrap();
let manifest = ::snapshot::ManifestData {
version: 2,
state_hashes,
state_root,
block_hashes,
block_number: NUM_BLOCKS,
block_hash: best_hash,
};
writer.into_inner().finish(manifest.clone()).unwrap();
// Initialize the Client
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client_db = new_temp_db(&tempdir.path());
let client2 = Client::new(
ClientConfig::default(),
&spec,
client_db,
Arc::new(::miner::Miner::new_for_tests(&spec, None)),
IoChannel::disconnected(),
).unwrap();
// Add some ancient blocks
for block_number in 1..50 {
let block_hash = bc.block_hash(block_number).unwrap();
let block = bc.block(&block_hash).unwrap();
client2.import_block(Unverified::from_rlp(block.into_inner()).unwrap()).unwrap();
}
client2.flush_queue();
// Restore the Snapshot
let reader = PackedReader::new(&snapshot_path).unwrap().unwrap();
let service_params = ServiceParams {
engine: spec.engine.clone(),
genesis_block: spec.genesis_block(),
restoration_db_handler: restoration_db_handler(db_config),
pruning: ::journaldb::Algorithm::Archive,
channel: IoChannel::disconnected(),
snapshot_root: tempdir.path().to_owned(),
client: client2.clone(),
};
let service = Service::new(service_params).unwrap();
service.init_restore(manifest.clone(), false).unwrap();
for hash in &manifest.block_hashes {
let chunk = reader.chunk(*hash).unwrap();
service.feed_block_chunk(*hash, &chunk);
}
for hash in &manifest.state_hashes {
let chunk = reader.chunk(*hash).unwrap();
service.feed_state_chunk(*hash, &chunk);
}
match service.status() {
RestorationStatus::Inactive => (),
RestorationStatus::Failed => panic!("Snapshot Restoration has failed."),
RestorationStatus::Ongoing { .. } => panic!("Snapshot Restoration should be done."),
_ => panic!("Invalid Snapshot Service status."),
}
// Check that the latest block number is the right one
assert_eq!(client2.block(BlockId::Latest).unwrap().number(), NUM_BLOCKS as u64);
// Check that we have blocks in [NUM_BLOCKS - NUM_SNAPSHOT_BLOCKS + 1 ; NUM_BLOCKS]
// but none before
assert!(client2.block(BlockId::Number(NUM_BLOCKS - NUM_SNAPSHOT_BLOCKS + 1)).is_some());
assert!(client2.block(BlockId::Number(100)).is_none());
// Check that the first 50 blocks have been migrated
for block_number in 1..49 {
assert!(client2.block(BlockId::Number(block_number)).is_some());
}
}
#[test]
fn recover_aborted_recovery() {
let _ = ::env_logger::try_init();
const NUM_BLOCKS: u32 = 400;
let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()];
let client = generate_dummy_client_with_spec_and_data(spec::new_null, NUM_BLOCKS, 5, &gas_prices);
let spec = spec::new_null();
let tempdir = TempDir::new("").unwrap();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client_db = new_db();
let client2 = Client::new(
Default::default(),
&spec,
client_db,
Arc::new(::miner::Miner::new_for_tests(&spec, None)),
IoChannel::disconnected(),
).unwrap();
let service_params = ServiceParams {
engine: spec.engine.clone(),
genesis_block: spec.genesis_block(),
restoration_db_handler: restoration_db_handler(db_config),
pruning: ::journaldb::Algorithm::Archive,
channel: IoChannel::disconnected(),
snapshot_root: tempdir.path().to_owned(),
client: client2.clone(),
};
let service = Service::new(service_params).unwrap();
service.take_snapshot(&*client, NUM_BLOCKS as u64).unwrap();
let manifest = service.manifest().unwrap();
service.init_restore(manifest.clone(), true).unwrap();
// Restore only the state chunks
for hash in &manifest.state_hashes {
let chunk = service.chunk(*hash).unwrap();
service.feed_state_chunk(*hash, &chunk);
}
match service.status() {
RestorationStatus::Ongoing { block_chunks_done, state_chunks_done, .. } => {
assert_eq!(state_chunks_done, manifest.state_hashes.len() as u32);
assert_eq!(block_chunks_done, 0);
},
e => panic!("Snapshot restoration must be ongoing ; {:?}", e),
}
// Abort the restore...
service.abort_restore();
// And try again!
service.init_restore(manifest.clone(), true).unwrap();
match service.status() {
RestorationStatus::Ongoing { block_chunks_done, state_chunks_done, .. } => {
assert_eq!(state_chunks_done, manifest.state_hashes.len() as u32);
assert_eq!(block_chunks_done, 0);
},
e => panic!("Snapshot restoration must be ongoing ; {:?}", e),
}
// Remove the snapshot directory, and restart the restoration
// It shouldn't have restored any previous blocks
fs::remove_dir_all(tempdir.path()).unwrap();
// And try again!
service.init_restore(manifest.clone(), true).unwrap();
match service.status() {
RestorationStatus::Ongoing { block_chunks_done, state_chunks_done, .. } => {
assert_eq!(block_chunks_done, 0);
assert_eq!(state_chunks_done, 0);
},
_ => panic!("Snapshot restoration must be ongoing"),
}
}

View File

@@ -1,208 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! State snapshotting tests.
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use hash::{KECCAK_NULL_RLP, keccak};
use types::{
basic_account::BasicAccount,
errors::EthcoreError as Error,
};
use client_traits::SnapshotWriter;
use snapshot::account;
use snapshot::{chunk_state, Error as SnapshotError, Progress, StateRebuilder, SNAPSHOT_SUBPARTS};
use snapshot::io::{PackedReader, PackedWriter, SnapshotReader};
use super::helpers::StateProducer;
use rand::SeedableRng;
use rand_xorshift::XorShiftRng;
use ethereum_types::H256;
use journaldb::{self, Algorithm};
use kvdb_rocksdb::{Database, DatabaseConfig};
use parking_lot::Mutex;
use tempdir::TempDir;
const RNG_SEED: [u8; 16] = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16];
#[test]
fn snap_and_restore() {
use hash_db::{HashDB, EMPTY_PREFIX};
let mut producer = StateProducer::new();
let mut rng = XorShiftRng::from_seed(RNG_SEED);
let mut old_db = journaldb::new_memory_db();
let db_cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
for _ in 0..150 {
producer.tick(&mut rng, &mut old_db);
}
let tempdir = TempDir::new("").unwrap();
let snap_file = tempdir.path().join("SNAP");
let state_root = producer.state_root();
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
let mut state_hashes = Vec::new();
for part in 0..SNAPSHOT_SUBPARTS {
let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part), 0).unwrap();
state_hashes.append(&mut hashes);
}
writer.into_inner().finish(::snapshot::ManifestData {
version: 2,
state_hashes: state_hashes,
block_hashes: Vec::new(),
state_root: state_root,
block_number: 1000,
block_hash: H256::zero(),
}).unwrap();
let db_path = tempdir.path().join("db");
let db = {
let new_db = Arc::new(Database::open(&db_cfg, &db_path.to_string_lossy()).unwrap());
let mut rebuilder = StateRebuilder::new(new_db.clone(), Algorithm::OverlayRecent);
let reader = PackedReader::new(&snap_file).unwrap().unwrap();
let flag = AtomicBool::new(true);
for chunk_hash in &reader.manifest().state_hashes {
let raw = reader.chunk(*chunk_hash).unwrap();
let chunk = ::snappy::decompress(&raw).unwrap();
rebuilder.feed(&chunk, &flag).unwrap();
}
assert_eq!(rebuilder.state_root(), state_root);
rebuilder.finalize(1000, H256::zero()).unwrap();
new_db
};
let new_db = journaldb::new(db, Algorithm::OverlayRecent, ::db::COL_STATE);
assert_eq!(new_db.earliest_era(), Some(1000));
let keys = old_db.keys();
for key in keys.keys() {
assert_eq!(old_db.get(&key, EMPTY_PREFIX).unwrap(), new_db.as_hash_db().get(&key, EMPTY_PREFIX).unwrap());
}
}
#[test]
fn get_code_from_prev_chunk() {
use std::collections::HashSet;
use rlp::RlpStream;
use ethereum_types::{H256, U256};
use hash_db::{HashDB, EMPTY_PREFIX};
use account_db::{AccountDBMut, AccountDB};
let code = b"this is definitely code";
let mut used_code = HashSet::new();
let mut acc_stream = RlpStream::new_list(4);
acc_stream.append(&U256::default())
.append(&U256::default())
.append(&KECCAK_NULL_RLP)
.append(&keccak(code));
let (h1, h2) = (H256::random(), H256::random());
// two accounts with the same code, one per chunk.
// first one will have code inlined,
// second will just have its hash.
let thin_rlp = acc_stream.out();
let acc: BasicAccount = ::rlp::decode(&thin_rlp).expect("error decoding basic account");
let mut make_chunk = |acc, hash| {
let mut db = journaldb::new_memory_db();
AccountDBMut::from_hash(&mut db, hash).insert(EMPTY_PREFIX, &code[..]);
let p = Progress::default();
let fat_rlp = account::to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value(), &p).unwrap();
let mut stream = RlpStream::new_list(1);
stream.append_raw(&fat_rlp[0], 1);
stream.out()
};
let chunk1 = make_chunk(acc.clone(), h1);
let chunk2 = make_chunk(acc, h2);
let tempdir = TempDir::new("").unwrap();
let db_cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let new_db = Arc::new(Database::open(&db_cfg, tempdir.path().to_str().unwrap()).unwrap());
{
let mut rebuilder = StateRebuilder::new(new_db.clone(), Algorithm::OverlayRecent);
let flag = AtomicBool::new(true);
rebuilder.feed(&chunk1, &flag).unwrap();
rebuilder.feed(&chunk2, &flag).unwrap();
rebuilder.finalize(1000, H256::random()).unwrap();
}
let state_db = journaldb::new(new_db, Algorithm::OverlayRecent, ::db::COL_STATE);
assert_eq!(state_db.earliest_era(), Some(1000));
}
#[test]
fn checks_flag() {
let mut producer = StateProducer::new();
let mut rng = XorShiftRng::from_seed(RNG_SEED);
let mut old_db = journaldb::new_memory_db();
let db_cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
for _ in 0..10 {
producer.tick(&mut rng, &mut old_db);
}
let tempdir = TempDir::new("").unwrap();
let snap_file = tempdir.path().join("SNAP");
let state_root = producer.state_root();
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None, 0).unwrap();
writer.into_inner().finish(::snapshot::ManifestData {
version: 2,
state_hashes,
block_hashes: Vec::new(),
state_root,
block_number: 0,
block_hash: H256::zero(),
}).unwrap();
let tempdir = TempDir::new("").unwrap();
let db_path = tempdir.path().join("db");
{
let new_db = Arc::new(Database::open(&db_cfg, &db_path.to_string_lossy()).unwrap());
let mut rebuilder = StateRebuilder::new(new_db.clone(), Algorithm::OverlayRecent);
let reader = PackedReader::new(&snap_file).unwrap().unwrap();
let flag = AtomicBool::new(false);
for chunk_hash in &reader.manifest().state_hashes {
let raw = reader.chunk(*chunk_hash).unwrap();
let chunk = ::snappy::decompress(&raw).unwrap();
match rebuilder.feed(&chunk, &flag) {
Err(Error::Snapshot(SnapshotError::RestorationAborted)) => {},
_ => panic!("unexpected result when feeding with flag off"),
}
}
}
}

View File

@@ -1,55 +0,0 @@
{
"name": "TestValidatorContract",
"engine": {
"authorityRound": {
"params": {
"stepDuration": 1,
"startStep": 0,
"validators": {
"multi": {
"0": { "list": ["0x7d577a597b2742b498cb5cf0c26cdcd726d39e6e"] },
"2": { "contract": "0x0000000000000000000000000000000000000005" },
"10": { "contract": "0x0000000000000000000000000000000000000006" }
}
}
}
}
},
"params": {
"gasLimitBoundDivisor": "0x0400",
"accountStartNonce": "0x0",
"maximumExtraDataSize": "0x20",
"minGasLimit": "0x1388",
"networkID" : "0x69"
},
"genesis": {
"seal": {
"authorityRound": {
"step": "0x0",
"signature": "0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
}
},
"difficulty": "0x20000",
"author": "0x0000000000000000000000000000000000000000",
"timestamp": "0x00",
"parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"extraData": "0x",
"gasLimit": "0x2fefd8"
},
"accounts": {
"0000000000000000000000000000000000000001": { "balance": "1", "builtin": { "name": "ecrecover", "pricing": { "linear": { "base": 3000, "word": 0 } } } },
"0000000000000000000000000000000000000002": { "balance": "1", "builtin": { "name": "sha256", "pricing": { "linear": { "base": 60, "word": 12 } } } },
"0000000000000000000000000000000000000003": { "balance": "1", "builtin": { "name": "ripemd160", "pricing": { "linear": { "base": 600, "word": 120 } } } },
"0000000000000000000000000000000000000004": { "balance": "1", "builtin": { "name": "identity", "pricing": { "linear": { "base": 15, "word": 3 } } } },
"0000000000000000000000000000000000000005": {
"balance": "1",
"constructor": "6060604052602060405190810160405280737d577a597b2742b498cb5cf0c26cdcd726d39e6e73ffffffffffffffffffffffffffffffffffffffff16815250600090600161004e92919061005c565b50341561005757fe5b610129565b8280548282559060005260206000209081019282156100d5579160200282015b828111156100d45782518260006101000a81548173ffffffffffffffffffffffffffffffffffffffff021916908373ffffffffffffffffffffffffffffffffffffffff1602179055509160200191906001019061007c565b5b5090506100e291906100e6565b5090565b61012691905b8082111561012257600081816101000a81549073ffffffffffffffffffffffffffffffffffffffff0219169055506001016100ec565b5090565b90565b61056f806101386000396000f30060606040526000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff16806375286211146100675780639300c92614610079578063b7ab4db5146100d0578063c476dd4014610145578063d69f13bb146101c7575bfe5b341561006f57fe5b610077610206565b005b341561008157fe5b6100ce600480803590602001908201803590602001908080602002602001604051908101604052809392919081815260200183836020028082843782019150505050505091905050610275565b005b34156100d857fe5b6100e061031f565b6040518080602001828103825283818151815260200191508051906020019060200280838360008314610132575b8051825260208311156101325760208201915060208101905060208303925061010e565b5050509050019250505060405180910390f35b341561014d57fe5b6101c5600480803573ffffffffffffffffffffffffffffffffffffffff1690602001909190803590602001909190803590602001908201803590602001908080601f016020809104026020016040519081016040528093929190818152602001838380828437820191505050505050919050506103b4565b005b34156101cf57fe5b610204600480803573ffffffffffffffffffffffffffffffffffffffff169060200190919080359060200190919050506103ba565b005b73fffffffffffffffffffffffffffffffffffffffe3373ffffffffffffffffffffffffffffffffffffffff1614151561023f5760006000fd5b600060018054905014151561027257600160009080546102609291906103bf565b5060006001816102709190610411565b505b5b565b806001908051906020019061028b92919061043d565b506001430340600019167f55252fa6eee4741b4e24a74a70e9c11fd2c2281df8d6ea13126ff845f7825c89826040518080602001828103825283818151815260200191508051906020019060200280838360008314610309575b805182526020831115610309576020820191506020810190506020830392506102e5565b5050509050019250505060405180910390a25b50565b6103276104c7565b60008054806020026020016040519081016040528092919081815260200182805480156103a957602002820191906000526020600020905b8160009054906101000a900473ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff168152602001906001019080831161035f575b505050505090505b90565b5b505050565b5b5050565b8280548282559060005260206000209081019282156104005760005260206000209182015b828111156103ff5782548255916001019190600101906103e4565b5b50905061040d91906104db565b5090565b81548183558181151161043857818360005260206000209182019101610437919061051e565b5b505050565b8280548282559060005260206000209081019282156104b6579160200282015b828111156104b55782518260006101000a81548173ffffffffffffffffffffffffffffffffffffffff021916908373ffffffffffffffffffffffffffffffffffffffff1602179055509160200191906001019061045d565b5b5090506104c391906104db565b5090565b602060405190810160405280600081525090565b61051b91905b8082111561051757600081816101000a81549073ffffffffffffffffffffffffffffffffffffffff0219169055506001016104e1565b5090565b90565b61054091905b8082111561053c576000816000905550600101610524565b5090565b905600a165627a7a7230582041ce7e5c820bc89b1a330a3233c4f3013e77433ecba368fa234adf758d87fe1d0029"
},
"0000000000000000000000000000000000000006": {
"balance": "1",
"constructor": "6060604052602060405190810160405280737d577a597b2742b498cb5cf0c26cdcd726d39e6e73ffffffffffffffffffffffffffffffffffffffff16815250600090600161004e92919061005c565b50341561005757fe5b610129565b8280548282559060005260206000209081019282156100d5579160200282015b828111156100d45782518260006101000a81548173ffffffffffffffffffffffffffffffffffffffff021916908373ffffffffffffffffffffffffffffffffffffffff1602179055509160200191906001019061007c565b5b5090506100e291906100e6565b5090565b61012691905b8082111561012257600081816101000a81549073ffffffffffffffffffffffffffffffffffffffff0219169055506001016100ec565b5090565b90565b61056f806101386000396000f30060606040526000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff16806375286211146100675780639300c92614610079578063b7ab4db5146100d0578063c476dd4014610145578063d69f13bb146101c7575bfe5b341561006f57fe5b610077610206565b005b341561008157fe5b6100ce600480803590602001908201803590602001908080602002602001604051908101604052809392919081815260200183836020028082843782019150505050505091905050610275565b005b34156100d857fe5b6100e061031f565b6040518080602001828103825283818151815260200191508051906020019060200280838360008314610132575b8051825260208311156101325760208201915060208101905060208303925061010e565b5050509050019250505060405180910390f35b341561014d57fe5b6101c5600480803573ffffffffffffffffffffffffffffffffffffffff1690602001909190803590602001909190803590602001908201803590602001908080601f016020809104026020016040519081016040528093929190818152602001838380828437820191505050505050919050506103b4565b005b34156101cf57fe5b610204600480803573ffffffffffffffffffffffffffffffffffffffff169060200190919080359060200190919050506103ba565b005b73fffffffffffffffffffffffffffffffffffffffe3373ffffffffffffffffffffffffffffffffffffffff1614151561023f5760006000fd5b600060018054905014151561027257600160009080546102609291906103bf565b5060006001816102709190610411565b505b5b565b806001908051906020019061028b92919061043d565b506001430340600019167f55252fa6eee4741b4e24a74a70e9c11fd2c2281df8d6ea13126ff845f7825c89826040518080602001828103825283818151815260200191508051906020019060200280838360008314610309575b805182526020831115610309576020820191506020810190506020830392506102e5565b5050509050019250505060405180910390a25b50565b6103276104c7565b60008054806020026020016040519081016040528092919081815260200182805480156103a957602002820191906000526020600020905b8160009054906101000a900473ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff168152602001906001019080831161035f575b505050505090505b90565b5b505050565b5b5050565b8280548282559060005260206000209081019282156104005760005260206000209182015b828111156103ff5782548255916001019190600101906103e4565b5b50905061040d91906104db565b5090565b81548183558181151161043857818360005260206000209182019101610437919061051e565b5b505050565b8280548282559060005260206000209081019282156104b6579160200282015b828111156104b55782518260006101000a81548173ffffffffffffffffffffffffffffffffffffffff021916908373ffffffffffffffffffffffffffffffffffffffff1602179055509160200191906001019061045d565b5b5090506104c391906104db565b5090565b602060405190810160405280600081525090565b61051b91905b8082111561051757600081816101000a81549073ffffffffffffffffffffffffffffffffffffffff0219169055506001016104e1565b5090565b90565b61054091905b8082111561053c576000816000905550600101610524565b5090565b905600a165627a7a7230582041ce7e5c820bc89b1a330a3233c4f3013e77433ecba368fa234adf758d87fe1d0029"
},
"0x7d577a597b2742b498cb5cf0c26cdcd726d39e6e": { "balance": "1606938044258990275541962092341162602522202993782792835301376" },
"0x82a978b3f5962a5b0957d9ee9eef472ee55b42f1": { "balance": "1606938044258990275541962092341162602522202993782792835301376" }
}
}

View File

@@ -1,132 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
use std::sync::{Arc, atomic::AtomicBool};
use blockchain::{BlockChain, BlockChainDB};
use bytes::Bytes;
use ethereum_types::H256;
use types::{
errors::{EthcoreError as Error, SnapshotError},
snapshot::{ManifestData, ChunkSink, Progress, RestorationStatus},
};
/// The interface for a snapshot network service.
/// This handles:
/// - restoration of snapshots to temporary databases.
/// - responding to queries for snapshot manifests and chunks
pub trait SnapshotService : Sync + Send {
/// Query the most recent manifest data.
fn manifest(&self) -> Option<ManifestData>;
/// Get the supported range of snapshot version numbers.
/// `None` indicates warp sync isn't supported by the consensus engine.
fn supported_versions(&self) -> Option<(u64, u64)>;
/// Returns a list of the completed chunks
fn completed_chunks(&self) -> Option<Vec<H256>>;
/// Get raw chunk for a given hash.
fn chunk(&self, hash: H256) -> Option<Bytes>;
/// Ask the snapshot service for the restoration status.
fn status(&self) -> RestorationStatus;
/// Begin snapshot restoration.
/// If restoration in-progress, this will reset it.
/// From this point on, any previous snapshot may become unavailable.
fn begin_restore(&self, manifest: ManifestData);
/// Abort an in-progress restoration if there is one.
fn abort_restore(&self);
/// Feed a raw state chunk to the service to be processed asynchronously.
/// no-op if not currently restoring.
fn restore_state_chunk(&self, hash: H256, chunk: Bytes);
/// Feed a raw block chunk to the service to be processed asynchronously.
/// no-op if currently restoring.
fn restore_block_chunk(&self, hash: H256, chunk: Bytes);
/// Abort in-progress snapshotting if there is one.
fn abort_snapshot(&self);
/// Shutdown the Snapshot Service by aborting any ongoing restore
fn shutdown(&self);
}
use crate::engine::Engine;
/// Restore from secondary snapshot chunks.
pub trait Rebuilder: Send {
/// Feed a chunk, potentially out of order.
///
/// Check `abort_flag` periodically while doing heavy work. If set to `false`, should bail with
/// `Error::RestorationAborted`.
fn feed(
&mut self,
chunk: &[u8],
engine: &dyn Engine,
abort_flag: &AtomicBool,
) -> Result<(), Error>;
/// Finalize the restoration. Will be done after all chunks have been
/// fed successfully.
///
/// This should apply the necessary "glue" between chunks,
/// and verify against the restored state.
fn finalize(&mut self) -> Result<(), Error>;
}
/// Components necessary for snapshot creation and restoration.
pub trait SnapshotComponents: Send {
/// Create secondary snapshot chunks; these corroborate the state data
/// in the state chunks.
///
/// Chunks shouldn't exceed the given preferred size, and should be fed
/// uncompressed into the sink.
///
/// This will vary by consensus engine, so it's exposed as a trait.
fn chunk_all(
&mut self,
chain: &BlockChain,
block_at: H256,
chunk_sink: &mut ChunkSink,
progress: &Progress,
preferred_size: usize,
) -> Result<(), SnapshotError>;
/// Create a rebuilder, which will have chunks fed into it in arbitrary
/// order and then be finalized.
///
/// The manifest, a database, and fresh `BlockChain` are supplied.
///
/// The engine passed to the `Rebuilder` methods will be the same instance
/// that created the `SnapshotComponents`.
fn rebuilder(
&self,
chain: BlockChain,
db: Arc<dyn BlockChainDB>,
manifest: &ManifestData,
) -> Result<Box<dyn Rebuilder>, Error>;
/// Minimum supported snapshot version number.
fn min_supported_version(&self) -> u64;
/// Current version number
fn current_version(&self) -> u64;
}

View File

@@ -1,200 +0,0 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! Watcher for snapshot-related chain events.
use parking_lot::Mutex;
use client::{Client, ChainNotify, NewBlocks};
use client_traits::BlockInfo;
use types::{
ids::BlockId,
io_message::ClientIoMessage,
};
use io::IoChannel;
use ethereum_types::H256;
use std::sync::Arc;
// helper trait for transforming hashes to numbers and checking if syncing.
trait Oracle: Send + Sync {
fn to_number(&self, hash: H256) -> Option<u64>;
fn is_major_importing(&self) -> bool;
}
struct StandardOracle<F> where F: 'static + Send + Sync + Fn() -> bool {
client: Arc<Client>,
sync_status: F,
}
impl<F> Oracle for StandardOracle<F>
where F: Send + Sync + Fn() -> bool
{
fn to_number(&self, hash: H256) -> Option<u64> {
self.client.block_header(BlockId::Hash(hash)).map(|h| h.number())
}
fn is_major_importing(&self) -> bool {
(self.sync_status)()
}
}
// helper trait for broadcasting a block to take a snapshot at.
trait Broadcast: Send + Sync {
fn take_at(&self, num: Option<u64>);
}
impl Broadcast for Mutex<IoChannel<ClientIoMessage<Client>>> {
fn take_at(&self, num: Option<u64>) {
let num = match num {
Some(n) => n,
None => return,
};
trace!(target: "snapshot_watcher", "broadcast: {}", num);
if let Err(e) = self.lock().send(ClientIoMessage::TakeSnapshot(num)) {
warn!("Snapshot watcher disconnected from IoService: {}", e);
}
}
}
/// A `ChainNotify` implementation which will trigger a snapshot event
/// at certain block numbers.
pub struct Watcher {
oracle: Box<dyn Oracle>,
broadcast: Box<dyn Broadcast>,
period: u64,
history: u64,
}
impl Watcher {
/// Create a new `Watcher` which will trigger a snapshot event
/// once every `period` blocks, but only after that block is
/// `history` blocks old.
pub fn new<F>(client: Arc<Client>, sync_status: F, channel: IoChannel<ClientIoMessage<Client>>, period: u64, history: u64) -> Self
where F: 'static + Send + Sync + Fn() -> bool
{
Watcher {
oracle: Box::new(StandardOracle {
client: client,
sync_status: sync_status,
}),
broadcast: Box::new(Mutex::new(channel)),
period: period,
history: history,
}
}
}
impl ChainNotify for Watcher {
fn new_blocks(&self, new_blocks: NewBlocks) {
if self.oracle.is_major_importing() || new_blocks.has_more_blocks_to_import { return }
trace!(target: "snapshot_watcher", "{} imported", new_blocks.imported.len());
let highest = new_blocks.imported.into_iter()
.filter_map(|h| self.oracle.to_number(h))
.filter(|&num| num >= self.period + self.history)
.map(|num| num - self.history)
.filter(|num| num % self.period == 0)
.fold(0, ::std::cmp::max);
match highest {
0 => self.broadcast.take_at(None),
_ => self.broadcast.take_at(Some(highest)),
}
}
}
#[cfg(test)]
mod tests {
use super::{Broadcast, Oracle, Watcher};
use client::{ChainNotify, NewBlocks, ChainRoute};
use ethereum_types::{H256, U256, BigEndianHash};
use std::collections::HashMap;
use std::time::Duration;
struct TestOracle(HashMap<H256, u64>);
impl Oracle for TestOracle {
fn to_number(&self, hash: H256) -> Option<u64> {
self.0.get(&hash).cloned()
}
fn is_major_importing(&self) -> bool { false }
}
struct TestBroadcast(Option<u64>);
impl Broadcast for TestBroadcast {
fn take_at(&self, num: Option<u64>) {
if num != self.0 {
panic!("Watcher broadcast wrong number. Expected {:?}, found {:?}", self.0, num);
}
}
}
// helper harness for tests which expect a notification.
fn harness(numbers: Vec<u64>, period: u64, history: u64, expected: Option<u64>) {
const DURATION_ZERO: Duration = Duration::from_millis(0);
let hashes: Vec<_> = numbers.clone().into_iter().map(|x| BigEndianHash::from_uint(&U256::from(x))).collect();
let map = hashes.clone().into_iter().zip(numbers).collect();
let watcher = Watcher {
oracle: Box::new(TestOracle(map)),
broadcast: Box::new(TestBroadcast(expected)),
period: period,
history: history,
};
watcher.new_blocks(NewBlocks::new(
hashes,
vec![],
ChainRoute::default(),
vec![],
vec![],
DURATION_ZERO,
false
));
}
// helper
#[test]
fn should_not_fire() {
harness(vec![0], 5, 0, None);
}
#[test]
fn fires_once_for_two() {
harness(vec![14, 15], 10, 5, Some(10));
}
#[test]
fn finds_highest() {
harness(vec![15, 25], 10, 5, Some(20));
}
#[test]
fn doesnt_fire_before_history() {
harness(vec![10, 11], 10, 5, None);
}
}

View File

@@ -34,6 +34,7 @@ use parking_lot::RwLock;
use rlp::{self, RlpStream};
use tempdir::TempDir;
use types::{
chain_notify::ChainMessageType,
transaction::{Action, Transaction, SignedTransaction},
encoded,
engines::ForkChoice,
@@ -44,8 +45,8 @@ use types::{
};
use block::{OpenBlock, Drain};
use client::{Client, ClientConfig, ChainNotify, ChainMessageType, PrepareOpenBlock};
use client_traits::{ChainInfo, ImportBlock};
use client::{Client, ClientConfig, PrepareOpenBlock};
use client_traits::{ChainInfo, ChainNotify, ImportBlock};
use trie_vm_factories::Factories;
use miner::Miner;
use spec::{Spec, self};