Keep existing blocks when restoring a Snapshot (#8643)
* Rename db_restore => client * First step: make it compile! * Second step: working implementation! * Refactoring * Fix tests * PR Grumbles * PR Grumbles WIP * Migrate ancient blocks interating backward * Early return in block migration if snapshot is aborted * Remove RwLock getter (PR Grumble I) * Remove dependency on `Client`: only used Traits * Add test for recovering aborted snapshot recovery * Add test for migrating old blocks * Fix build * PR Grumble I * PR Grumble II * PR Grumble III * PR Grumble IV * PR Grumble V * PR Grumble VI * Fix one test * Fix test * PR Grumble * PR Grumbles * PR Grumbles II * Fix tests * Release RwLock earlier * Revert Cargo.lock * Update _update ancient block_ logic: set local in `commit` * Update typo in ethcore/src/snapshot/service.rs Co-Authored-By: ngotchac <ngotchac@gmail.com>
This commit is contained in:
parent
5baed0c158
commit
9475a2e474
11
Cargo.lock
generated
11
Cargo.lock
generated
@ -527,6 +527,15 @@ dependencies = [
|
|||||||
"heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
"heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "env_logger"
|
||||||
|
version = "0.4.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
dependencies = [
|
||||||
|
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"regex 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "env_logger"
|
name = "env_logger"
|
||||||
version = "0.5.13"
|
version = "0.5.13"
|
||||||
@ -627,6 +636,7 @@ dependencies = [
|
|||||||
"common-types 0.1.0",
|
"common-types 0.1.0",
|
||||||
"criterion 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
"criterion 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
"crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"ethabi 6.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"ethabi 6.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"ethabi-contract 6.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"ethabi-contract 6.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
@ -4244,6 +4254,7 @@ dependencies = [
|
|||||||
"checksum edit-distance 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3bd26878c3d921f89797a4e1a1711919f999a9f6946bb6f5a4ffda126d297b7e"
|
"checksum edit-distance 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3bd26878c3d921f89797a4e1a1711919f999a9f6946bb6f5a4ffda126d297b7e"
|
||||||
"checksum either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3be565ca5c557d7f59e7cfcf1844f9e3033650c929c6566f511e8005f205c1d0"
|
"checksum either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3be565ca5c557d7f59e7cfcf1844f9e3033650c929c6566f511e8005f205c1d0"
|
||||||
"checksum elastic-array 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "88d4851b005ef16de812ea9acdb7bece2f0a40dd86c07b85631d7dafa54537bb"
|
"checksum elastic-array 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "88d4851b005ef16de812ea9acdb7bece2f0a40dd86c07b85631d7dafa54537bb"
|
||||||
|
"checksum env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3ddf21e73e016298f5cb37d6ef8e8da8e39f91f9ec8b0df44b7deb16a9f8cd5b"
|
||||||
"checksum env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)" = "15b0a4d2e39f8420210be8b27eeda28029729e2fd4291019455016c348240c38"
|
"checksum env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)" = "15b0a4d2e39f8420210be8b27eeda28029729e2fd4291019455016c348240c38"
|
||||||
"checksum error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "07e791d3be96241c77c43846b665ef1384606da2cd2a48730abe606a12906e02"
|
"checksum error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "07e791d3be96241c77c43846b665ef1384606da2cd2a48730abe606a12906e02"
|
||||||
"checksum eth-secp256k1 0.5.7 (git+https://github.com/paritytech/rust-secp256k1)" = "<none>"
|
"checksum eth-secp256k1 0.5.7 (git+https://github.com/paritytech/rust-secp256k1)" = "<none>"
|
||||||
|
@ -76,6 +76,7 @@ hardware-wallet = { path = "../hw" }
|
|||||||
fake-hardware-wallet = { path = "../util/fake-hardware-wallet" }
|
fake-hardware-wallet = { path = "../util/fake-hardware-wallet" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
env_logger = "0.4"
|
||||||
tempdir = "0.3"
|
tempdir = "0.3"
|
||||||
trie-standardmap = "0.1"
|
trie-standardmap = "0.1"
|
||||||
criterion = "0.2"
|
criterion = "0.2"
|
||||||
|
@ -176,8 +176,8 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn block_receipts(&self, req: request::CompleteReceiptsRequest) -> Option<request::ReceiptsResponse> {
|
fn block_receipts(&self, req: request::CompleteReceiptsRequest) -> Option<request::ReceiptsResponse> {
|
||||||
BlockChainClient::encoded_block_receipts(self, &req.hash)
|
BlockChainClient::block_receipts(self, &req.hash)
|
||||||
.map(|x| ::request::ReceiptsResponse { receipts: ::rlp::decode_list(&x) })
|
.map(|x| ::request::ReceiptsResponse { receipts: x.receipts })
|
||||||
}
|
}
|
||||||
|
|
||||||
fn account_proof(&self, req: request::CompleteAccountRequest) -> Option<request::AccountResponse> {
|
fn account_proof(&self, req: request::CompleteAccountRequest) -> Option<request::AccountResponse> {
|
||||||
|
@ -117,7 +117,7 @@ impl ClientService {
|
|||||||
pruning: pruning,
|
pruning: pruning,
|
||||||
channel: io_service.channel(),
|
channel: io_service.channel(),
|
||||||
snapshot_root: snapshot_path.into(),
|
snapshot_root: snapshot_path.into(),
|
||||||
db_restore: client.clone(),
|
client: client.clone(),
|
||||||
};
|
};
|
||||||
let snapshot = Arc::new(SnapshotService::new(snapshot_params)?);
|
let snapshot = Arc::new(SnapshotService::new(snapshot_params)?);
|
||||||
|
|
||||||
|
@ -229,6 +229,7 @@ pub struct BlockChain {
|
|||||||
|
|
||||||
cache_man: Mutex<CacheManager<CacheId>>,
|
cache_man: Mutex<CacheManager<CacheId>>,
|
||||||
|
|
||||||
|
pending_best_ancient_block: RwLock<Option<Option<BestAncientBlock>>>,
|
||||||
pending_best_block: RwLock<Option<BestBlock>>,
|
pending_best_block: RwLock<Option<BestBlock>>,
|
||||||
pending_block_hashes: RwLock<HashMap<BlockNumber, H256>>,
|
pending_block_hashes: RwLock<HashMap<BlockNumber, H256>>,
|
||||||
pending_block_details: RwLock<HashMap<H256, BlockDetails>>,
|
pending_block_details: RwLock<HashMap<H256, BlockDetails>>,
|
||||||
@ -538,6 +539,7 @@ impl BlockChain {
|
|||||||
block_receipts: RwLock::new(HashMap::new()),
|
block_receipts: RwLock::new(HashMap::new()),
|
||||||
db: db.clone(),
|
db: db.clone(),
|
||||||
cache_man: Mutex::new(cache_man),
|
cache_man: Mutex::new(cache_man),
|
||||||
|
pending_best_ancient_block: RwLock::new(None),
|
||||||
pending_best_block: RwLock::new(None),
|
pending_best_block: RwLock::new(None),
|
||||||
pending_block_hashes: RwLock::new(HashMap::new()),
|
pending_block_hashes: RwLock::new(HashMap::new()),
|
||||||
pending_block_details: RwLock::new(HashMap::new()),
|
pending_block_details: RwLock::new(HashMap::new()),
|
||||||
@ -808,18 +810,7 @@ impl BlockChain {
|
|||||||
}, is_best);
|
}, is_best);
|
||||||
|
|
||||||
if is_ancient {
|
if is_ancient {
|
||||||
let mut best_ancient_block = self.best_ancient_block.write();
|
self.set_best_ancient_block(block_number, &hash, batch);
|
||||||
let ancient_number = best_ancient_block.as_ref().map_or(0, |b| b.number);
|
|
||||||
if self.block_hash(block_number + 1).is_some() {
|
|
||||||
batch.delete(db::COL_EXTRA, b"ancient");
|
|
||||||
*best_ancient_block = None;
|
|
||||||
} else if block_number > ancient_number {
|
|
||||||
batch.put(db::COL_EXTRA, b"ancient", &hash);
|
|
||||||
*best_ancient_block = Some(BestAncientBlock {
|
|
||||||
hash: hash,
|
|
||||||
number: block_number,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
false
|
false
|
||||||
@ -860,6 +851,84 @@ impl BlockChain {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Update the best ancient block to the given hash, after checking that
|
||||||
|
/// it's directly linked to the currently known best ancient block
|
||||||
|
pub fn update_best_ancient_block(&self, hash: &H256) {
|
||||||
|
// Get the block view of the next ancient block (it must
|
||||||
|
// be in DB at this point)
|
||||||
|
let block_view = match self.block(hash) {
|
||||||
|
Some(v) => v,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
|
||||||
|
// So that `best_ancient_block` gets unlocked before calling
|
||||||
|
// `set_best_ancient_block`
|
||||||
|
{
|
||||||
|
// Get the target hash ; if there are no ancient block,
|
||||||
|
// it means that the chain is already fully linked
|
||||||
|
// Release the `best_ancient_block` RwLock
|
||||||
|
let target_hash = {
|
||||||
|
let best_ancient_block = self.best_ancient_block.read();
|
||||||
|
let cur_ancient_block = match *best_ancient_block {
|
||||||
|
Some(ref b) => b,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Ensure that the new best ancient block is after the current one
|
||||||
|
if block_view.number() <= cur_ancient_block.number {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
cur_ancient_block.hash.clone()
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut block_hash = *hash;
|
||||||
|
let mut is_linked = false;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if block_hash == target_hash {
|
||||||
|
is_linked = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
match self.block_details(&block_hash) {
|
||||||
|
Some(block_details) => {
|
||||||
|
block_hash = block_details.parent;
|
||||||
|
},
|
||||||
|
None => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !is_linked {
|
||||||
|
trace!(target: "blockchain", "The given block {:x} is not linked to the known ancient block {:x}", hash, target_hash);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut batch = self.db.key_value().transaction();
|
||||||
|
self.set_best_ancient_block(block_view.number(), hash, &mut batch);
|
||||||
|
self.db.key_value().write(batch).expect("Low level database error.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the best ancient block with the given value: private method
|
||||||
|
/// `best_ancient_block` must not be locked, otherwise a DeadLock would occur
|
||||||
|
fn set_best_ancient_block(&self, block_number: BlockNumber, block_hash: &H256, batch: &mut DBTransaction) {
|
||||||
|
let mut pending_best_ancient_block = self.pending_best_ancient_block.write();
|
||||||
|
let ancient_number = self.best_ancient_block.read().as_ref().map_or(0, |b| b.number);
|
||||||
|
if self.block_hash(block_number + 1).is_some() {
|
||||||
|
trace!(target: "blockchain", "The two ends of the chain have met.");
|
||||||
|
batch.delete(db::COL_EXTRA, b"ancient");
|
||||||
|
*pending_best_ancient_block = Some(None);
|
||||||
|
} else if block_number > ancient_number {
|
||||||
|
trace!(target: "blockchain", "Updating the best ancient block to {}.", block_number);
|
||||||
|
batch.put(db::COL_EXTRA, b"ancient", &block_hash);
|
||||||
|
*pending_best_ancient_block = Some(Some(BestAncientBlock {
|
||||||
|
hash: *block_hash,
|
||||||
|
number: block_number,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Insert an epoch transition. Provide an epoch number being transitioned to
|
/// Insert an epoch transition. Provide an epoch number being transitioned to
|
||||||
/// and epoch transition object.
|
/// and epoch transition object.
|
||||||
///
|
///
|
||||||
@ -1112,15 +1181,21 @@ impl BlockChain {
|
|||||||
|
|
||||||
/// Apply pending insertion updates
|
/// Apply pending insertion updates
|
||||||
pub fn commit(&self) {
|
pub fn commit(&self) {
|
||||||
|
let mut pending_best_ancient_block = self.pending_best_ancient_block.write();
|
||||||
let mut pending_best_block = self.pending_best_block.write();
|
let mut pending_best_block = self.pending_best_block.write();
|
||||||
let mut pending_write_hashes = self.pending_block_hashes.write();
|
let mut pending_write_hashes = self.pending_block_hashes.write();
|
||||||
let mut pending_block_details = self.pending_block_details.write();
|
let mut pending_block_details = self.pending_block_details.write();
|
||||||
let mut pending_write_txs = self.pending_transaction_addresses.write();
|
let mut pending_write_txs = self.pending_transaction_addresses.write();
|
||||||
|
|
||||||
|
let mut best_ancient_block = self.best_ancient_block.write();
|
||||||
let mut best_block = self.best_block.write();
|
let mut best_block = self.best_block.write();
|
||||||
let mut write_block_details = self.block_details.write();
|
let mut write_block_details = self.block_details.write();
|
||||||
let mut write_hashes = self.block_hashes.write();
|
let mut write_hashes = self.block_hashes.write();
|
||||||
let mut write_txs = self.transaction_addresses.write();
|
let mut write_txs = self.transaction_addresses.write();
|
||||||
|
// update best ancient block
|
||||||
|
if let Some(block_option) = pending_best_ancient_block.take() {
|
||||||
|
*best_ancient_block = block_option;
|
||||||
|
}
|
||||||
// update best block
|
// update best block
|
||||||
if let Some(block) = pending_best_block.take() {
|
if let Some(block) = pending_best_block.take() {
|
||||||
*best_block = block;
|
*best_block = block;
|
||||||
|
@ -32,7 +32,7 @@ use kvdb::{DBValue, KeyValueDB, DBTransaction};
|
|||||||
// other
|
// other
|
||||||
use ethereum_types::{H256, Address, U256};
|
use ethereum_types::{H256, Address, U256};
|
||||||
use block::{IsBlock, LockedBlock, Drain, ClosedBlock, OpenBlock, enact_verified, SealedBlock};
|
use block::{IsBlock, LockedBlock, Drain, ClosedBlock, OpenBlock, enact_verified, SealedBlock};
|
||||||
use blockchain::{BlockChain, BlockChainDB, BlockProvider, TreeRoute, ImportRoute, TransactionAddress, ExtrasInsert};
|
use blockchain::{BlockReceipts, BlockChain, BlockChainDB, BlockProvider, TreeRoute, ImportRoute, TransactionAddress, ExtrasInsert};
|
||||||
use client::ancient_import::AncientVerifier;
|
use client::ancient_import::AncientVerifier;
|
||||||
use client::{
|
use client::{
|
||||||
Nonce, Balance, ChainInfo, BlockInfo, CallContract, TransactionInfo,
|
Nonce, Balance, ChainInfo, BlockInfo, CallContract, TransactionInfo,
|
||||||
@ -66,7 +66,7 @@ use ethcore_miner::pool::VerifiedTransaction;
|
|||||||
use parking_lot::{Mutex, RwLock};
|
use parking_lot::{Mutex, RwLock};
|
||||||
use rand::OsRng;
|
use rand::OsRng;
|
||||||
use receipt::{Receipt, LocalizedReceipt};
|
use receipt::{Receipt, LocalizedReceipt};
|
||||||
use snapshot::{self, io as snapshot_io};
|
use snapshot::{self, io as snapshot_io, SnapshotClient};
|
||||||
use spec::Spec;
|
use spec::Spec;
|
||||||
use state_db::StateDB;
|
use state_db::StateDB;
|
||||||
use state::{self, State};
|
use state::{self, State};
|
||||||
@ -1005,6 +1005,16 @@ impl Client {
|
|||||||
self.importer.miner.clone()
|
self.importer.miner.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn state_db(&self) -> ::parking_lot::RwLockReadGuard<StateDB> {
|
||||||
|
self.state_db.read()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn chain(&self) -> Arc<BlockChain> {
|
||||||
|
self.chain.read().clone()
|
||||||
|
}
|
||||||
|
|
||||||
/// Replace io channel. Useful for testing.
|
/// Replace io channel. Useful for testing.
|
||||||
pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) {
|
pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) {
|
||||||
*self.io_channel.write() = io_channel;
|
*self.io_channel.write() = io_channel;
|
||||||
@ -1817,7 +1827,7 @@ impl BlockChainClient for Client {
|
|||||||
Some(receipt)
|
Some(receipt)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn block_receipts(&self, id: BlockId) -> Option<Vec<LocalizedReceipt>> {
|
fn localized_block_receipts(&self, id: BlockId) -> Option<Vec<LocalizedReceipt>> {
|
||||||
let hash = self.block_hash(id)?;
|
let hash = self.block_hash(id)?;
|
||||||
|
|
||||||
let chain = self.chain.read();
|
let chain = self.chain.read();
|
||||||
@ -1860,8 +1870,8 @@ impl BlockChainClient for Client {
|
|||||||
self.state_db.read().journal_db().state(hash)
|
self.state_db.read().journal_db().state(hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn encoded_block_receipts(&self, hash: &H256) -> Option<Bytes> {
|
fn block_receipts(&self, hash: &H256) -> Option<BlockReceipts> {
|
||||||
self.chain.read().block_receipts(hash).map(|receipts| ::rlp::encode(&receipts))
|
self.chain.read().block_receipts(hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn queue_info(&self) -> BlockQueueInfo {
|
fn queue_info(&self) -> BlockQueueInfo {
|
||||||
@ -2406,6 +2416,8 @@ impl ProvingBlockChainClient for Client {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl SnapshotClient for Client {}
|
||||||
|
|
||||||
impl Drop for Client {
|
impl Drop for Client {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.engine.stop();
|
self.engine.stop();
|
||||||
@ -2504,7 +2516,7 @@ mod tests {
|
|||||||
use test_helpers::{generate_dummy_client_with_data};
|
use test_helpers::{generate_dummy_client_with_data};
|
||||||
|
|
||||||
let client = generate_dummy_client_with_data(2, 2, &[1.into(), 1.into()]);
|
let client = generate_dummy_client_with_data(2, 2, &[1.into(), 1.into()]);
|
||||||
let receipts = client.block_receipts(BlockId::Latest).unwrap();
|
let receipts = client.localized_block_receipts(BlockId::Latest).unwrap();
|
||||||
|
|
||||||
assert_eq!(receipts.len(), 2);
|
assert_eq!(receipts.len(), 2);
|
||||||
assert_eq!(receipts[0].transaction_index, 0);
|
assert_eq!(receipts[0].transaction_index, 0);
|
||||||
|
@ -686,7 +686,7 @@ impl BlockChainClient for TestBlockChainClient {
|
|||||||
self.receipts.read().get(&id).cloned()
|
self.receipts.read().get(&id).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn block_receipts(&self, _id: BlockId) -> Option<Vec<LocalizedReceipt>> {
|
fn localized_block_receipts(&self, _id: BlockId) -> Option<Vec<LocalizedReceipt>> {
|
||||||
Some(self.receipts.read().values().cloned().collect())
|
Some(self.receipts.read().values().cloned().collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -789,16 +789,14 @@ impl BlockChainClient for TestBlockChainClient {
|
|||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
fn encoded_block_receipts(&self, hash: &H256) -> Option<Bytes> {
|
fn block_receipts(&self, hash: &H256) -> Option<BlockReceipts> {
|
||||||
// starts with 'f' ?
|
// starts with 'f' ?
|
||||||
if *hash > H256::from("f000000000000000000000000000000000000000000000000000000000000000") {
|
if *hash > H256::from("f000000000000000000000000000000000000000000000000000000000000000") {
|
||||||
let receipt = BlockReceipts::new(vec![Receipt::new(
|
let receipt = BlockReceipts::new(vec![Receipt::new(
|
||||||
TransactionOutcome::StateRoot(H256::zero()),
|
TransactionOutcome::StateRoot(H256::zero()),
|
||||||
U256::zero(),
|
U256::zero(),
|
||||||
vec![])]);
|
vec![])]);
|
||||||
let mut rlp = RlpStream::new();
|
return Some(receipt);
|
||||||
rlp.append(&receipt);
|
|
||||||
return Some(rlp.out());
|
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,7 @@ use std::sync::Arc;
|
|||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
|
|
||||||
use block::{OpenBlock, SealedBlock, ClosedBlock};
|
use block::{OpenBlock, SealedBlock, ClosedBlock};
|
||||||
use blockchain::TreeRoute;
|
use blockchain::{BlockReceipts, TreeRoute};
|
||||||
use client::Mode;
|
use client::Mode;
|
||||||
use encoded;
|
use encoded;
|
||||||
use vm::LastHashes;
|
use vm::LastHashes;
|
||||||
@ -282,7 +282,7 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
|
|||||||
fn transaction_receipt(&self, id: TransactionId) -> Option<LocalizedReceipt>;
|
fn transaction_receipt(&self, id: TransactionId) -> Option<LocalizedReceipt>;
|
||||||
|
|
||||||
/// Get localized receipts for all transaction in given block.
|
/// Get localized receipts for all transaction in given block.
|
||||||
fn block_receipts(&self, id: BlockId) -> Option<Vec<LocalizedReceipt>>;
|
fn localized_block_receipts(&self, id: BlockId) -> Option<Vec<LocalizedReceipt>>;
|
||||||
|
|
||||||
/// Get a tree route between `from` and `to`.
|
/// Get a tree route between `from` and `to`.
|
||||||
/// See `BlockChain::tree_route`.
|
/// See `BlockChain::tree_route`.
|
||||||
@ -294,8 +294,8 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
|
|||||||
/// Get latest state node
|
/// Get latest state node
|
||||||
fn state_data(&self, hash: &H256) -> Option<Bytes>;
|
fn state_data(&self, hash: &H256) -> Option<Bytes>;
|
||||||
|
|
||||||
/// Get raw block receipts data by block header hash.
|
/// Get block receipts data by block header hash.
|
||||||
fn encoded_block_receipts(&self, hash: &H256) -> Option<Bytes>;
|
fn block_receipts(&self, hash: &H256) -> Option<BlockReceipts>;
|
||||||
|
|
||||||
/// Get block queue information.
|
/// Get block queue information.
|
||||||
fn queue_info(&self) -> BlockQueueInfo;
|
fn queue_info(&self) -> BlockQueueInfo;
|
||||||
|
@ -138,6 +138,9 @@ extern crate trace_time;
|
|||||||
#[cfg_attr(test, macro_use)]
|
#[cfg_attr(test, macro_use)]
|
||||||
extern crate evm;
|
extern crate evm;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
extern crate env_logger;
|
||||||
|
|
||||||
pub extern crate ethstore;
|
pub extern crate ethstore;
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
|
@ -65,6 +65,8 @@ pub enum Error {
|
|||||||
BadEpochProof(u64),
|
BadEpochProof(u64),
|
||||||
/// Wrong chunk format.
|
/// Wrong chunk format.
|
||||||
WrongChunkFormat(String),
|
WrongChunkFormat(String),
|
||||||
|
/// Unlinked ancient block chain
|
||||||
|
UnlinkedAncientBlockChain,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for Error {
|
impl fmt::Display for Error {
|
||||||
@ -91,6 +93,7 @@ impl fmt::Display for Error {
|
|||||||
Error::SnapshotsUnsupported => write!(f, "Snapshots unsupported by consensus engine."),
|
Error::SnapshotsUnsupported => write!(f, "Snapshots unsupported by consensus engine."),
|
||||||
Error::BadEpochProof(i) => write!(f, "Bad epoch proof for transition to epoch {}", i),
|
Error::BadEpochProof(i) => write!(f, "Bad epoch proof for transition to epoch {}", i),
|
||||||
Error::WrongChunkFormat(ref msg) => write!(f, "Wrong chunk format: {}", msg),
|
Error::WrongChunkFormat(ref msg) => write!(f, "Wrong chunk format: {}", msg),
|
||||||
|
Error::UnlinkedAncientBlockChain => write!(f, "Unlinked ancient blocks chain"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -56,7 +56,7 @@ use rand::{Rng, OsRng};
|
|||||||
pub use self::error::Error;
|
pub use self::error::Error;
|
||||||
|
|
||||||
pub use self::consensus::*;
|
pub use self::consensus::*;
|
||||||
pub use self::service::{Service, DatabaseRestore};
|
pub use self::service::{SnapshotClient, Service, DatabaseRestore};
|
||||||
pub use self::traits::SnapshotService;
|
pub use self::traits::SnapshotService;
|
||||||
pub use self::watcher::Watcher;
|
pub use self::watcher::Watcher;
|
||||||
pub use types::snapshot_manifest::ManifestData;
|
pub use types::snapshot_manifest::ManifestData;
|
||||||
|
@ -22,12 +22,13 @@ use std::fs::{self, File};
|
|||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||||
|
use std::cmp;
|
||||||
|
|
||||||
use super::{ManifestData, StateRebuilder, Rebuilder, RestorationStatus, SnapshotService, MAX_CHUNK_SIZE};
|
use super::{ManifestData, StateRebuilder, Rebuilder, RestorationStatus, SnapshotService, MAX_CHUNK_SIZE};
|
||||||
use super::io::{SnapshotReader, LooseReader, SnapshotWriter, LooseWriter};
|
use super::io::{SnapshotReader, LooseReader, SnapshotWriter, LooseWriter};
|
||||||
|
|
||||||
use blockchain::{BlockChain, BlockChainDB, BlockChainDBHandler};
|
use blockchain::{BlockChain, BlockChainDB, BlockChainDBHandler};
|
||||||
use client::{Client, ChainInfo, ClientIoMessage};
|
use client::{BlockInfo, BlockChainClient, Client, ChainInfo, ClientIoMessage};
|
||||||
use engines::EthEngine;
|
use engines::EthEngine;
|
||||||
use error::{Error, ErrorKind as SnapshotErrorKind};
|
use error::{Error, ErrorKind as SnapshotErrorKind};
|
||||||
use snapshot::{Error as SnapshotError};
|
use snapshot::{Error as SnapshotError};
|
||||||
@ -40,6 +41,7 @@ use ethereum_types::H256;
|
|||||||
use parking_lot::{Mutex, RwLock, RwLockReadGuard};
|
use parking_lot::{Mutex, RwLock, RwLockReadGuard};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use journaldb::Algorithm;
|
use journaldb::Algorithm;
|
||||||
|
use kvdb::DBTransaction;
|
||||||
use snappy;
|
use snappy;
|
||||||
|
|
||||||
/// Helper for removing directories in case of error.
|
/// Helper for removing directories in case of error.
|
||||||
@ -203,6 +205,9 @@ impl Restoration {
|
|||||||
/// Type alias for client io channel.
|
/// Type alias for client io channel.
|
||||||
pub type Channel = IoChannel<ClientIoMessage>;
|
pub type Channel = IoChannel<ClientIoMessage>;
|
||||||
|
|
||||||
|
/// Trait alias for the Client Service used
|
||||||
|
pub trait SnapshotClient: BlockChainClient + BlockInfo + DatabaseRestore {}
|
||||||
|
|
||||||
/// Snapshot service parameters.
|
/// Snapshot service parameters.
|
||||||
pub struct ServiceParams {
|
pub struct ServiceParams {
|
||||||
/// The consensus engine this is built on.
|
/// The consensus engine this is built on.
|
||||||
@ -219,7 +224,7 @@ pub struct ServiceParams {
|
|||||||
/// Usually "<chain hash>/snapshot"
|
/// Usually "<chain hash>/snapshot"
|
||||||
pub snapshot_root: PathBuf,
|
pub snapshot_root: PathBuf,
|
||||||
/// A handle for database restoration.
|
/// A handle for database restoration.
|
||||||
pub db_restore: Arc<DatabaseRestore>,
|
pub client: Arc<SnapshotClient>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `SnapshotService` implementation.
|
/// `SnapshotService` implementation.
|
||||||
@ -236,7 +241,7 @@ pub struct Service {
|
|||||||
genesis_block: Bytes,
|
genesis_block: Bytes,
|
||||||
state_chunks: AtomicUsize,
|
state_chunks: AtomicUsize,
|
||||||
block_chunks: AtomicUsize,
|
block_chunks: AtomicUsize,
|
||||||
db_restore: Arc<DatabaseRestore>,
|
client: Arc<SnapshotClient>,
|
||||||
progress: super::Progress,
|
progress: super::Progress,
|
||||||
taking_snapshot: AtomicBool,
|
taking_snapshot: AtomicBool,
|
||||||
restoring_snapshot: AtomicBool,
|
restoring_snapshot: AtomicBool,
|
||||||
@ -257,7 +262,7 @@ impl Service {
|
|||||||
genesis_block: params.genesis_block,
|
genesis_block: params.genesis_block,
|
||||||
state_chunks: AtomicUsize::new(0),
|
state_chunks: AtomicUsize::new(0),
|
||||||
block_chunks: AtomicUsize::new(0),
|
block_chunks: AtomicUsize::new(0),
|
||||||
db_restore: params.db_restore,
|
client: params.client,
|
||||||
progress: Default::default(),
|
progress: Default::default(),
|
||||||
taking_snapshot: AtomicBool::new(false),
|
taking_snapshot: AtomicBool::new(false),
|
||||||
restoring_snapshot: AtomicBool::new(false),
|
restoring_snapshot: AtomicBool::new(false),
|
||||||
@ -334,12 +339,110 @@ impl Service {
|
|||||||
|
|
||||||
// replace one the client's database with our own.
|
// replace one the client's database with our own.
|
||||||
fn replace_client_db(&self) -> Result<(), Error> {
|
fn replace_client_db(&self) -> Result<(), Error> {
|
||||||
let our_db = self.restoration_db();
|
let migrated_blocks = self.migrate_blocks()?;
|
||||||
|
trace!(target: "snapshot", "Migrated {} ancient blocks", migrated_blocks);
|
||||||
|
|
||||||
self.db_restore.restore_db(&*our_db.to_string_lossy())?;
|
let rest_db = self.restoration_db();
|
||||||
|
self.client.restore_db(&*rest_db.to_string_lossy())?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Migrate the blocks in the current DB into the new chain
|
||||||
|
fn migrate_blocks(&self) -> Result<usize, Error> {
|
||||||
|
// Count the number of migrated blocks
|
||||||
|
let mut count = 0;
|
||||||
|
let rest_db = self.restoration_db();
|
||||||
|
|
||||||
|
let cur_chain_info = self.client.chain_info();
|
||||||
|
|
||||||
|
let next_db = self.restoration_db_handler.open(&rest_db)?;
|
||||||
|
let next_chain = BlockChain::new(Default::default(), &[], next_db.clone());
|
||||||
|
let next_chain_info = next_chain.chain_info();
|
||||||
|
|
||||||
|
// The old database looks like this:
|
||||||
|
// [genesis, best_ancient_block] ... [first_block, best_block]
|
||||||
|
// If we are fully synced neither `best_ancient_block` nor `first_block` is set, and we can assume that the whole range from [genesis, best_block] is imported.
|
||||||
|
// The new database only contains the tip of the chain ([first_block, best_block]),
|
||||||
|
// so the useful set of blocks is defined as:
|
||||||
|
// [0 ... min(new.first_block, best_ancient_block or best_block)]
|
||||||
|
let find_range = || -> Option<(H256, H256)> {
|
||||||
|
let next_available_from = next_chain_info.first_block_number?;
|
||||||
|
let cur_available_to = cur_chain_info.ancient_block_number.unwrap_or(cur_chain_info.best_block_number);
|
||||||
|
|
||||||
|
let highest_block_num = cmp::min(next_available_from.saturating_sub(1), cur_available_to);
|
||||||
|
|
||||||
|
if highest_block_num == 0 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
trace!(target: "snapshot", "Trying to import ancient blocks until {}", highest_block_num);
|
||||||
|
|
||||||
|
// Here we start from the highest block number and go backward to 0,
|
||||||
|
// thus starting at `highest_block_num` and targetting `0`.
|
||||||
|
let target_hash = self.client.block_hash(BlockId::Number(0))?;
|
||||||
|
let start_hash = self.client.block_hash(BlockId::Number(highest_block_num))?;
|
||||||
|
|
||||||
|
Some((start_hash, target_hash))
|
||||||
|
};
|
||||||
|
|
||||||
|
let (start_hash, target_hash) = match find_range() {
|
||||||
|
Some(x) => x,
|
||||||
|
None => return Ok(0),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut batch = DBTransaction::new();
|
||||||
|
let mut parent_hash = start_hash;
|
||||||
|
while parent_hash != target_hash {
|
||||||
|
// Early return if restoration is aborted
|
||||||
|
if !self.restoring_snapshot.load(Ordering::SeqCst) {
|
||||||
|
return Ok(count);
|
||||||
|
}
|
||||||
|
|
||||||
|
let block = self.client.block(BlockId::Hash(parent_hash)).ok_or(::snapshot::error::Error::UnlinkedAncientBlockChain)?;
|
||||||
|
parent_hash = block.parent_hash();
|
||||||
|
|
||||||
|
let block_number = block.number();
|
||||||
|
let block_receipts = self.client.block_receipts(&block.hash());
|
||||||
|
let parent_total_difficulty = self.client.block_total_difficulty(BlockId::Hash(parent_hash));
|
||||||
|
|
||||||
|
match (block_receipts, parent_total_difficulty) {
|
||||||
|
(Some(block_receipts), Some(parent_total_difficulty)) => {
|
||||||
|
let block_receipts = block_receipts.receipts;
|
||||||
|
|
||||||
|
next_chain.insert_unordered_block(&mut batch, block, block_receipts, Some(parent_total_difficulty), false, true);
|
||||||
|
count += 1;
|
||||||
|
},
|
||||||
|
_ => break,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Writting changes to DB and logging every now and then
|
||||||
|
if block_number % 1_000 == 0 {
|
||||||
|
next_db.key_value().write_buffered(batch);
|
||||||
|
next_chain.commit();
|
||||||
|
next_db.key_value().flush().expect("DB flush failed.");
|
||||||
|
batch = DBTransaction::new();
|
||||||
|
}
|
||||||
|
|
||||||
|
if block_number % 10_000 == 0 {
|
||||||
|
trace!(target: "snapshot", "Block restoration at #{}", block_number);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Final commit to the DB
|
||||||
|
next_db.key_value().write_buffered(batch);
|
||||||
|
next_chain.commit();
|
||||||
|
next_db.key_value().flush().expect("DB flush failed.");
|
||||||
|
|
||||||
|
// We couldn't reach the targeted hash
|
||||||
|
if parent_hash != target_hash {
|
||||||
|
return Err(::snapshot::error::Error::UnlinkedAncientBlockChain.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update best ancient block in the Next Chain
|
||||||
|
next_chain.update_best_ancient_block(&start_hash);
|
||||||
|
Ok(count)
|
||||||
|
}
|
||||||
|
|
||||||
/// Get a reference to the snapshot reader.
|
/// Get a reference to the snapshot reader.
|
||||||
pub fn reader(&self) -> RwLockReadGuard<Option<LooseReader>> {
|
pub fn reader(&self) -> RwLockReadGuard<Option<LooseReader>> {
|
||||||
self.reader.read()
|
self.reader.read()
|
||||||
@ -480,12 +583,16 @@ impl Service {
|
|||||||
// Import previous chunks, continue if it fails
|
// Import previous chunks, continue if it fails
|
||||||
self.import_prev_chunks(&mut res, manifest).ok();
|
self.import_prev_chunks(&mut res, manifest).ok();
|
||||||
|
|
||||||
*self.status.lock() = RestorationStatus::Ongoing {
|
// It could be that the restoration failed or completed in the meanwhile
|
||||||
state_chunks: state_chunks as u32,
|
let mut restoration_status = self.status.lock();
|
||||||
block_chunks: block_chunks as u32,
|
if let RestorationStatus::Initializing { .. } = *restoration_status {
|
||||||
state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
|
*restoration_status = RestorationStatus::Ongoing {
|
||||||
block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
|
state_chunks: state_chunks as u32,
|
||||||
};
|
block_chunks: block_chunks as u32,
|
||||||
|
state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
|
||||||
|
block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -752,26 +859,19 @@ impl Drop for Service {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::sync::Arc;
|
|
||||||
use client::ClientIoMessage;
|
use client::ClientIoMessage;
|
||||||
use io::{IoService};
|
use io::{IoService};
|
||||||
use spec::Spec;
|
use spec::Spec;
|
||||||
use journaldb::Algorithm;
|
use journaldb::Algorithm;
|
||||||
use error::Error;
|
|
||||||
use snapshot::{ManifestData, RestorationStatus, SnapshotService};
|
use snapshot::{ManifestData, RestorationStatus, SnapshotService};
|
||||||
use super::*;
|
use super::*;
|
||||||
use tempdir::TempDir;
|
use tempdir::TempDir;
|
||||||
use test_helpers::restoration_db_handler;
|
use test_helpers::{generate_dummy_client_with_spec_and_data, restoration_db_handler};
|
||||||
|
|
||||||
struct NoopDBRestore;
|
|
||||||
impl DatabaseRestore for NoopDBRestore {
|
|
||||||
fn restore_db(&self, _new_db: &str) -> Result<(), Error> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn sends_async_messages() {
|
fn sends_async_messages() {
|
||||||
|
let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()];
|
||||||
|
let client = generate_dummy_client_with_spec_and_data(Spec::new_null, 400, 5, &gas_prices);
|
||||||
let service = IoService::<ClientIoMessage>::start().unwrap();
|
let service = IoService::<ClientIoMessage>::start().unwrap();
|
||||||
let spec = Spec::new_test();
|
let spec = Spec::new_test();
|
||||||
|
|
||||||
@ -785,7 +885,7 @@ mod tests {
|
|||||||
pruning: Algorithm::Archive,
|
pruning: Algorithm::Archive,
|
||||||
channel: service.channel(),
|
channel: service.channel(),
|
||||||
snapshot_root: dir,
|
snapshot_root: dir,
|
||||||
db_restore: Arc::new(NoopDBRestore),
|
client: client,
|
||||||
};
|
};
|
||||||
|
|
||||||
let service = Service::new(snapshot_params).unwrap();
|
let service = Service::new(snapshot_params).unwrap();
|
||||||
|
@ -16,26 +16,23 @@
|
|||||||
|
|
||||||
//! Tests for the snapshot service.
|
//! Tests for the snapshot service.
|
||||||
|
|
||||||
|
use std::fs;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use tempdir::TempDir;
|
use tempdir::TempDir;
|
||||||
use client::{Client, BlockInfo};
|
use blockchain::BlockProvider;
|
||||||
|
use client::{Client, ClientConfig, ImportBlock, BlockInfo};
|
||||||
use ids::BlockId;
|
use ids::BlockId;
|
||||||
|
use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter};
|
||||||
use snapshot::service::{Service, ServiceParams};
|
use snapshot::service::{Service, ServiceParams};
|
||||||
use snapshot::{self, ManifestData, SnapshotService};
|
use snapshot::{chunk_state, chunk_secondary, ManifestData, Progress, SnapshotService, RestorationStatus};
|
||||||
use spec::Spec;
|
use spec::Spec;
|
||||||
use test_helpers::{generate_dummy_client_with_spec_and_data, restoration_db_handler};
|
use test_helpers::{new_db, new_temp_db, generate_dummy_client_with_spec_and_data, restoration_db_handler};
|
||||||
|
|
||||||
|
use parking_lot::Mutex;
|
||||||
use io::IoChannel;
|
use io::IoChannel;
|
||||||
use kvdb_rocksdb::DatabaseConfig;
|
use kvdb_rocksdb::DatabaseConfig;
|
||||||
|
use verification::queue::kind::blocks::Unverified;
|
||||||
struct NoopDBRestore;
|
|
||||||
|
|
||||||
impl snapshot::DatabaseRestore for NoopDBRestore {
|
|
||||||
fn restore_db(&self, _new_db: &str) -> Result<(), ::error::Error> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn restored_is_equivalent() {
|
fn restored_is_equivalent() {
|
||||||
@ -46,7 +43,6 @@ fn restored_is_equivalent() {
|
|||||||
const TX_PER: usize = 5;
|
const TX_PER: usize = 5;
|
||||||
|
|
||||||
let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()];
|
let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()];
|
||||||
|
|
||||||
let client = generate_dummy_client_with_spec_and_data(Spec::new_null, NUM_BLOCKS, TX_PER, &gas_prices);
|
let client = generate_dummy_client_with_spec_and_data(Spec::new_null, NUM_BLOCKS, TX_PER, &gas_prices);
|
||||||
|
|
||||||
let tempdir = TempDir::new("").unwrap();
|
let tempdir = TempDir::new("").unwrap();
|
||||||
@ -73,7 +69,7 @@ fn restored_is_equivalent() {
|
|||||||
pruning: ::journaldb::Algorithm::Archive,
|
pruning: ::journaldb::Algorithm::Archive,
|
||||||
channel: IoChannel::disconnected(),
|
channel: IoChannel::disconnected(),
|
||||||
snapshot_root: path,
|
snapshot_root: path,
|
||||||
db_restore: client2.clone(),
|
client: client2.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let service = Service::new(service_params).unwrap();
|
let service = Service::new(service_params).unwrap();
|
||||||
@ -94,7 +90,7 @@ fn restored_is_equivalent() {
|
|||||||
service.feed_block_chunk(hash, &chunk);
|
service.feed_block_chunk(hash, &chunk);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert_eq!(service.status(), ::snapshot::RestorationStatus::Inactive);
|
assert_eq!(service.status(), RestorationStatus::Inactive);
|
||||||
|
|
||||||
for x in 0..NUM_BLOCKS {
|
for x in 0..NUM_BLOCKS {
|
||||||
let block1 = client.block(BlockId::Number(x as u64)).unwrap();
|
let block1 = client.block(BlockId::Number(x as u64)).unwrap();
|
||||||
@ -111,6 +107,9 @@ fn restored_is_equivalent() {
|
|||||||
#[cfg(not(target_os = "windows"))]
|
#[cfg(not(target_os = "windows"))]
|
||||||
#[test]
|
#[test]
|
||||||
fn guards_delete_folders() {
|
fn guards_delete_folders() {
|
||||||
|
let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()];
|
||||||
|
let client = generate_dummy_client_with_spec_and_data(Spec::new_null, 400, 5, &gas_prices);
|
||||||
|
|
||||||
let spec = Spec::new_null();
|
let spec = Spec::new_null();
|
||||||
let tempdir = TempDir::new("").unwrap();
|
let tempdir = TempDir::new("").unwrap();
|
||||||
let service_params = ServiceParams {
|
let service_params = ServiceParams {
|
||||||
@ -120,7 +119,7 @@ fn guards_delete_folders() {
|
|||||||
pruning: ::journaldb::Algorithm::Archive,
|
pruning: ::journaldb::Algorithm::Archive,
|
||||||
channel: IoChannel::disconnected(),
|
channel: IoChannel::disconnected(),
|
||||||
snapshot_root: tempdir.path().to_owned(),
|
snapshot_root: tempdir.path().to_owned(),
|
||||||
db_restore: Arc::new(NoopDBRestore),
|
client: client,
|
||||||
};
|
};
|
||||||
|
|
||||||
let service = Service::new(service_params).unwrap();
|
let service = Service::new(service_params).unwrap();
|
||||||
@ -151,3 +150,201 @@ fn guards_delete_folders() {
|
|||||||
assert!(!path.join("db").exists());
|
assert!(!path.join("db").exists());
|
||||||
assert!(path.join("temp").exists());
|
assert!(path.join("temp").exists());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn keep_ancient_blocks() {
|
||||||
|
::env_logger::init().ok();
|
||||||
|
|
||||||
|
// Test variables
|
||||||
|
const NUM_BLOCKS: u64 = 500;
|
||||||
|
const NUM_SNAPSHOT_BLOCKS: u64 = 300;
|
||||||
|
const SNAPSHOT_MODE: ::snapshot::PowSnapshot = ::snapshot::PowSnapshot { blocks: NUM_SNAPSHOT_BLOCKS, max_restore_blocks: NUM_SNAPSHOT_BLOCKS };
|
||||||
|
|
||||||
|
// Temporary folders
|
||||||
|
let tempdir = TempDir::new("").unwrap();
|
||||||
|
let snapshot_path = tempdir.path().join("SNAP");
|
||||||
|
|
||||||
|
// Generate blocks
|
||||||
|
let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()];
|
||||||
|
let spec_f = Spec::new_null;
|
||||||
|
let spec = spec_f();
|
||||||
|
let client = generate_dummy_client_with_spec_and_data(spec_f, NUM_BLOCKS as u32, 5, &gas_prices);
|
||||||
|
|
||||||
|
let bc = client.chain();
|
||||||
|
|
||||||
|
// Create the Snapshot
|
||||||
|
let best_hash = bc.best_block_hash();
|
||||||
|
let writer = Mutex::new(PackedWriter::new(&snapshot_path).unwrap());
|
||||||
|
let block_hashes = chunk_secondary(
|
||||||
|
Box::new(SNAPSHOT_MODE),
|
||||||
|
&bc,
|
||||||
|
best_hash,
|
||||||
|
&writer,
|
||||||
|
&Progress::default()
|
||||||
|
).unwrap();
|
||||||
|
let state_db = client.state_db().journal_db().boxed_clone();
|
||||||
|
let start_header = bc.block_header_data(&best_hash).unwrap();
|
||||||
|
let state_root = start_header.state_root();
|
||||||
|
let state_hashes = chunk_state(
|
||||||
|
state_db.as_hashdb(),
|
||||||
|
&state_root,
|
||||||
|
&writer,
|
||||||
|
&Progress::default(),
|
||||||
|
None
|
||||||
|
).unwrap();
|
||||||
|
|
||||||
|
let manifest = ::snapshot::ManifestData {
|
||||||
|
version: 2,
|
||||||
|
state_hashes: state_hashes,
|
||||||
|
state_root: state_root,
|
||||||
|
block_hashes: block_hashes,
|
||||||
|
block_number: NUM_BLOCKS,
|
||||||
|
block_hash: best_hash,
|
||||||
|
};
|
||||||
|
|
||||||
|
writer.into_inner().finish(manifest.clone()).unwrap();
|
||||||
|
|
||||||
|
// Initialize the Client
|
||||||
|
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
|
||||||
|
let client_db = new_temp_db(&tempdir.path());
|
||||||
|
let client2 = Client::new(
|
||||||
|
ClientConfig::default(),
|
||||||
|
&spec,
|
||||||
|
client_db,
|
||||||
|
Arc::new(::miner::Miner::new_for_tests(&spec, None)),
|
||||||
|
IoChannel::disconnected(),
|
||||||
|
).unwrap();
|
||||||
|
|
||||||
|
// Add some ancient blocks
|
||||||
|
for block_number in 1..50 {
|
||||||
|
let block_hash = bc.block_hash(block_number).unwrap();
|
||||||
|
let block = bc.block(&block_hash).unwrap();
|
||||||
|
client2.import_block(Unverified::from_rlp(block.into_inner()).unwrap()).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
client2.import_verified_blocks();
|
||||||
|
client2.flush_queue();
|
||||||
|
|
||||||
|
// Restore the Snapshot
|
||||||
|
let reader = PackedReader::new(&snapshot_path).unwrap().unwrap();
|
||||||
|
let service_params = ServiceParams {
|
||||||
|
engine: spec.engine.clone(),
|
||||||
|
genesis_block: spec.genesis_block(),
|
||||||
|
restoration_db_handler: restoration_db_handler(db_config),
|
||||||
|
pruning: ::journaldb::Algorithm::Archive,
|
||||||
|
channel: IoChannel::disconnected(),
|
||||||
|
snapshot_root: tempdir.path().to_owned(),
|
||||||
|
client: client2.clone(),
|
||||||
|
};
|
||||||
|
let service = Service::new(service_params).unwrap();
|
||||||
|
service.init_restore(manifest.clone(), false).unwrap();
|
||||||
|
|
||||||
|
for hash in &manifest.block_hashes {
|
||||||
|
let chunk = reader.chunk(*hash).unwrap();
|
||||||
|
service.feed_block_chunk(*hash, &chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
for hash in &manifest.state_hashes {
|
||||||
|
let chunk = reader.chunk(*hash).unwrap();
|
||||||
|
service.feed_state_chunk(*hash, &chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
match service.status() {
|
||||||
|
RestorationStatus::Inactive => (),
|
||||||
|
RestorationStatus::Failed => panic!("Snapshot Restoration has failed."),
|
||||||
|
RestorationStatus::Ongoing { .. } => panic!("Snapshot Restoration should be done."),
|
||||||
|
_ => panic!("Invalid Snapshot Service status."),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the latest block number is the right one
|
||||||
|
assert_eq!(client2.block(BlockId::Latest).unwrap().number(), NUM_BLOCKS as u64);
|
||||||
|
|
||||||
|
// Check that we have blocks in [NUM_BLOCKS - NUM_SNAPSHOT_BLOCKS + 1 ; NUM_BLOCKS]
|
||||||
|
// but none before
|
||||||
|
assert!(client2.block(BlockId::Number(NUM_BLOCKS - NUM_SNAPSHOT_BLOCKS + 1)).is_some());
|
||||||
|
assert!(client2.block(BlockId::Number(100)).is_none());
|
||||||
|
|
||||||
|
// Check that the first 50 blocks have been migrated
|
||||||
|
for block_number in 1..49 {
|
||||||
|
assert!(client2.block(BlockId::Number(block_number)).is_some());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn recover_aborted_recovery() {
|
||||||
|
::env_logger::init().ok();
|
||||||
|
|
||||||
|
const NUM_BLOCKS: u32 = 400;
|
||||||
|
let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()];
|
||||||
|
let client = generate_dummy_client_with_spec_and_data(Spec::new_null, NUM_BLOCKS, 5, &gas_prices);
|
||||||
|
|
||||||
|
let spec = Spec::new_null();
|
||||||
|
let tempdir = TempDir::new("").unwrap();
|
||||||
|
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
|
||||||
|
let client_db = new_db();
|
||||||
|
let client2 = Client::new(
|
||||||
|
Default::default(),
|
||||||
|
&spec,
|
||||||
|
client_db,
|
||||||
|
Arc::new(::miner::Miner::new_for_tests(&spec, None)),
|
||||||
|
IoChannel::disconnected(),
|
||||||
|
).unwrap();
|
||||||
|
let service_params = ServiceParams {
|
||||||
|
engine: spec.engine.clone(),
|
||||||
|
genesis_block: spec.genesis_block(),
|
||||||
|
restoration_db_handler: restoration_db_handler(db_config),
|
||||||
|
pruning: ::journaldb::Algorithm::Archive,
|
||||||
|
channel: IoChannel::disconnected(),
|
||||||
|
snapshot_root: tempdir.path().to_owned(),
|
||||||
|
client: client2.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let service = Service::new(service_params).unwrap();
|
||||||
|
service.take_snapshot(&client, NUM_BLOCKS as u64).unwrap();
|
||||||
|
|
||||||
|
let manifest = service.manifest().unwrap();
|
||||||
|
service.init_restore(manifest.clone(), true).unwrap();
|
||||||
|
|
||||||
|
// Restore only the state chunks
|
||||||
|
for hash in &manifest.state_hashes {
|
||||||
|
let chunk = service.chunk(*hash).unwrap();
|
||||||
|
service.feed_state_chunk(*hash, &chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
match service.status() {
|
||||||
|
RestorationStatus::Ongoing { block_chunks_done, state_chunks_done, .. } => {
|
||||||
|
assert_eq!(state_chunks_done, manifest.state_hashes.len() as u32);
|
||||||
|
assert_eq!(block_chunks_done, 0);
|
||||||
|
},
|
||||||
|
e => panic!("Snapshot restoration must be ongoing ; {:?}", e),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Abort the restore...
|
||||||
|
service.abort_restore();
|
||||||
|
|
||||||
|
// And try again!
|
||||||
|
service.init_restore(manifest.clone(), true).unwrap();
|
||||||
|
|
||||||
|
match service.status() {
|
||||||
|
RestorationStatus::Ongoing { block_chunks_done, state_chunks_done, .. } => {
|
||||||
|
assert_eq!(state_chunks_done, manifest.state_hashes.len() as u32);
|
||||||
|
assert_eq!(block_chunks_done, 0);
|
||||||
|
},
|
||||||
|
e => panic!("Snapshot restoration must be ongoing ; {:?}", e),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the snapshot directory, and restart the restoration
|
||||||
|
// It shouldn't have restored any previous blocks
|
||||||
|
fs::remove_dir_all(tempdir.path()).unwrap();
|
||||||
|
|
||||||
|
// And try again!
|
||||||
|
service.init_restore(manifest.clone(), true).unwrap();
|
||||||
|
|
||||||
|
match service.status() {
|
||||||
|
RestorationStatus::Ongoing { block_chunks_done, state_chunks_done, .. } => {
|
||||||
|
assert_eq!(block_chunks_done, 0);
|
||||||
|
assert_eq!(state_chunks_done, 0);
|
||||||
|
},
|
||||||
|
_ => panic!("Snapshot restoration must be ongoing"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -41,7 +41,7 @@ use transaction::{Action, Transaction, SignedTransaction};
|
|||||||
use views::BlockView;
|
use views::BlockView;
|
||||||
use blooms_db;
|
use blooms_db;
|
||||||
use kvdb::KeyValueDB;
|
use kvdb::KeyValueDB;
|
||||||
use kvdb_rocksdb;
|
use kvdb_rocksdb::{self, Database, DatabaseConfig};
|
||||||
use tempdir::TempDir;
|
use tempdir::TempDir;
|
||||||
use verification::queue::kind::blocks::Unverified;
|
use verification::queue::kind::blocks::Unverified;
|
||||||
use encoded;
|
use encoded;
|
||||||
@ -263,30 +263,30 @@ pub fn get_test_client_with_blocks(blocks: Vec<Bytes>) -> Arc<Client> {
|
|||||||
client
|
client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct TestBlockChainDB {
|
||||||
|
_blooms_dir: TempDir,
|
||||||
|
_trace_blooms_dir: TempDir,
|
||||||
|
blooms: blooms_db::Database,
|
||||||
|
trace_blooms: blooms_db::Database,
|
||||||
|
key_value: Arc<KeyValueDB>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BlockChainDB for TestBlockChainDB {
|
||||||
|
fn key_value(&self) -> &Arc<KeyValueDB> {
|
||||||
|
&self.key_value
|
||||||
|
}
|
||||||
|
|
||||||
|
fn blooms(&self) -> &blooms_db::Database {
|
||||||
|
&self.blooms
|
||||||
|
}
|
||||||
|
|
||||||
|
fn trace_blooms(&self) -> &blooms_db::Database {
|
||||||
|
&self.trace_blooms
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Creates new test instance of `BlockChainDB`
|
/// Creates new test instance of `BlockChainDB`
|
||||||
pub fn new_db() -> Arc<BlockChainDB> {
|
pub fn new_db() -> Arc<BlockChainDB> {
|
||||||
struct TestBlockChainDB {
|
|
||||||
_blooms_dir: TempDir,
|
|
||||||
_trace_blooms_dir: TempDir,
|
|
||||||
blooms: blooms_db::Database,
|
|
||||||
trace_blooms: blooms_db::Database,
|
|
||||||
key_value: Arc<KeyValueDB>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BlockChainDB for TestBlockChainDB {
|
|
||||||
fn key_value(&self) -> &Arc<KeyValueDB> {
|
|
||||||
&self.key_value
|
|
||||||
}
|
|
||||||
|
|
||||||
fn blooms(&self) -> &blooms_db::Database {
|
|
||||||
&self.blooms
|
|
||||||
}
|
|
||||||
|
|
||||||
fn trace_blooms(&self) -> &blooms_db::Database {
|
|
||||||
&self.trace_blooms
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let blooms_dir = TempDir::new("").unwrap();
|
let blooms_dir = TempDir::new("").unwrap();
|
||||||
let trace_blooms_dir = TempDir::new("").unwrap();
|
let trace_blooms_dir = TempDir::new("").unwrap();
|
||||||
|
|
||||||
@ -301,6 +301,26 @@ pub fn new_db() -> Arc<BlockChainDB> {
|
|||||||
Arc::new(db)
|
Arc::new(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Creates a new temporary `BlockChainDB` on FS
|
||||||
|
pub fn new_temp_db(tempdir: &Path) -> Arc<BlockChainDB> {
|
||||||
|
let blooms_dir = TempDir::new("").unwrap();
|
||||||
|
let trace_blooms_dir = TempDir::new("").unwrap();
|
||||||
|
let key_value_dir = tempdir.join("key_value");
|
||||||
|
|
||||||
|
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
|
||||||
|
let key_value_db = Database::open(&db_config, key_value_dir.to_str().unwrap()).unwrap();
|
||||||
|
|
||||||
|
let db = TestBlockChainDB {
|
||||||
|
blooms: blooms_db::Database::open(blooms_dir.path()).unwrap(),
|
||||||
|
trace_blooms: blooms_db::Database::open(trace_blooms_dir.path()).unwrap(),
|
||||||
|
_blooms_dir: blooms_dir,
|
||||||
|
_trace_blooms_dir: trace_blooms_dir,
|
||||||
|
key_value: Arc::new(key_value_db)
|
||||||
|
};
|
||||||
|
|
||||||
|
Arc::new(db)
|
||||||
|
}
|
||||||
|
|
||||||
/// Creates new instance of KeyValueDBHandler
|
/// Creates new instance of KeyValueDBHandler
|
||||||
pub fn restoration_db_handler(config: kvdb_rocksdb::DatabaseConfig) -> Box<BlockChainDBHandler> {
|
pub fn restoration_db_handler(config: kvdb_rocksdb::DatabaseConfig) -> Box<BlockChainDBHandler> {
|
||||||
struct RestorationDBHandler {
|
struct RestorationDBHandler {
|
||||||
|
@ -970,6 +970,12 @@ impl ChainSync {
|
|||||||
self.state = SyncState::Blocks;
|
self.state = SyncState::Blocks;
|
||||||
self.continue_sync(io);
|
self.continue_sync(io);
|
||||||
},
|
},
|
||||||
|
SyncState::SnapshotData => match io.snapshot_service().status() {
|
||||||
|
RestorationStatus::Inactive | RestorationStatus::Failed => {
|
||||||
|
self.state = SyncState::SnapshotWaiting;
|
||||||
|
},
|
||||||
|
RestorationStatus::Initializing { .. } | RestorationStatus::Ongoing { .. } => (),
|
||||||
|
},
|
||||||
SyncState::SnapshotWaiting => {
|
SyncState::SnapshotWaiting => {
|
||||||
match io.snapshot_service().status() {
|
match io.snapshot_service().status() {
|
||||||
RestorationStatus::Inactive => {
|
RestorationStatus::Inactive => {
|
||||||
|
@ -226,7 +226,8 @@ impl SyncSupplier {
|
|||||||
let mut added_receipts = 0usize;
|
let mut added_receipts = 0usize;
|
||||||
let mut data = Bytes::new();
|
let mut data = Bytes::new();
|
||||||
for i in 0..count {
|
for i in 0..count {
|
||||||
if let Some(mut receipts_bytes) = io.chain().encoded_block_receipts(&rlp.val_at::<H256>(i)?) {
|
if let Some(receipts) = io.chain().block_receipts(&rlp.val_at::<H256>(i)?) {
|
||||||
|
let mut receipts_bytes = ::rlp::encode(&receipts);
|
||||||
data.append(&mut receipts_bytes);
|
data.append(&mut receipts_bytes);
|
||||||
added_receipts += receipts_bytes.len();
|
added_receipts += receipts_bytes.len();
|
||||||
added_headers += 1;
|
added_headers += 1;
|
||||||
|
@ -440,7 +440,7 @@ impl<C, M, U, S> Parity for ParityClient<C, M, U> where
|
|||||||
BlockNumber::Earliest => BlockId::Earliest,
|
BlockNumber::Earliest => BlockId::Earliest,
|
||||||
BlockNumber::Latest => BlockId::Latest,
|
BlockNumber::Latest => BlockId::Latest,
|
||||||
};
|
};
|
||||||
let receipts = try_bf!(self.client.block_receipts(id).ok_or_else(errors::unknown_block));
|
let receipts = try_bf!(self.client.localized_block_receipts(id).ok_or_else(errors::unknown_block));
|
||||||
Box::new(future::ok(receipts.into_iter().map(Into::into).collect()))
|
Box::new(future::ok(receipts.into_iter().map(Into::into).collect()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user