From 9475a2e474d9175abf05b30167bc543bd25a4e3c Mon Sep 17 00:00:00 2001 From: Nicolas Gotchac Date: Sun, 18 Nov 2018 00:06:34 +0100 Subject: [PATCH] Keep existing blocks when restoring a Snapshot (#8643) * Rename db_restore => client * First step: make it compile! * Second step: working implementation! * Refactoring * Fix tests * PR Grumbles * PR Grumbles WIP * Migrate ancient blocks interating backward * Early return in block migration if snapshot is aborted * Remove RwLock getter (PR Grumble I) * Remove dependency on `Client`: only used Traits * Add test for recovering aborted snapshot recovery * Add test for migrating old blocks * Fix build * PR Grumble I * PR Grumble II * PR Grumble III * PR Grumble IV * PR Grumble V * PR Grumble VI * Fix one test * Fix test * PR Grumble * PR Grumbles * PR Grumbles II * Fix tests * Release RwLock earlier * Revert Cargo.lock * Update _update ancient block_ logic: set local in `commit` * Update typo in ethcore/src/snapshot/service.rs Co-Authored-By: ngotchac --- Cargo.lock | 11 ++ ethcore/Cargo.toml | 1 + ethcore/light/src/provider.rs | 4 +- ethcore/service/src/service.rs | 2 +- ethcore/src/blockchain/blockchain.rs | 99 +++++++++-- ethcore/src/client/client.rs | 24 ++- ethcore/src/client/test_client.rs | 8 +- ethcore/src/client/traits.rs | 8 +- ethcore/src/lib.rs | 3 + ethcore/src/snapshot/error.rs | 3 + ethcore/src/snapshot/mod.rs | 2 +- ethcore/src/snapshot/service.rs | 146 ++++++++++++++--- ethcore/src/snapshot/tests/service.rs | 227 ++++++++++++++++++++++++-- ethcore/src/test_helpers.rs | 66 +++++--- ethcore/sync/src/chain/mod.rs | 6 + ethcore/sync/src/chain/supplier.rs | 3 +- rpc/src/v1/impls/parity.rs | 2 +- 17 files changed, 521 insertions(+), 94 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6db83916b..2cb062ce5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -527,6 +527,15 @@ dependencies = [ "heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "env_logger" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "regex 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "env_logger" version = "0.5.13" @@ -627,6 +636,7 @@ dependencies = [ "common-types 0.1.0", "criterion 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethabi 6.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethabi-contract 6.0.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4244,6 +4254,7 @@ dependencies = [ "checksum edit-distance 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3bd26878c3d921f89797a4e1a1711919f999a9f6946bb6f5a4ffda126d297b7e" "checksum either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3be565ca5c557d7f59e7cfcf1844f9e3033650c929c6566f511e8005f205c1d0" "checksum elastic-array 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "88d4851b005ef16de812ea9acdb7bece2f0a40dd86c07b85631d7dafa54537bb" +"checksum env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3ddf21e73e016298f5cb37d6ef8e8da8e39f91f9ec8b0df44b7deb16a9f8cd5b" "checksum env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)" = "15b0a4d2e39f8420210be8b27eeda28029729e2fd4291019455016c348240c38" "checksum error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "07e791d3be96241c77c43846b665ef1384606da2cd2a48730abe606a12906e02" "checksum eth-secp256k1 0.5.7 (git+https://github.com/paritytech/rust-secp256k1)" = "" diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index aeae2f76c..0e3ea23a7 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -76,6 +76,7 @@ hardware-wallet = { path = "../hw" } fake-hardware-wallet = { path = "../util/fake-hardware-wallet" } [dev-dependencies] +env_logger = "0.4" tempdir = "0.3" trie-standardmap = "0.1" criterion = "0.2" diff --git a/ethcore/light/src/provider.rs b/ethcore/light/src/provider.rs index 723446206..bfd15dadc 100644 --- a/ethcore/light/src/provider.rs +++ b/ethcore/light/src/provider.rs @@ -176,8 +176,8 @@ impl Provider for T { } fn block_receipts(&self, req: request::CompleteReceiptsRequest) -> Option { - BlockChainClient::encoded_block_receipts(self, &req.hash) - .map(|x| ::request::ReceiptsResponse { receipts: ::rlp::decode_list(&x) }) + BlockChainClient::block_receipts(self, &req.hash) + .map(|x| ::request::ReceiptsResponse { receipts: x.receipts }) } fn account_proof(&self, req: request::CompleteAccountRequest) -> Option { diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index a6bbc4e10..1763b8fd5 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -117,7 +117,7 @@ impl ClientService { pruning: pruning, channel: io_service.channel(), snapshot_root: snapshot_path.into(), - db_restore: client.clone(), + client: client.clone(), }; let snapshot = Arc::new(SnapshotService::new(snapshot_params)?); diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index 5785aa103..e5af64208 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -229,6 +229,7 @@ pub struct BlockChain { cache_man: Mutex>, + pending_best_ancient_block: RwLock>>, pending_best_block: RwLock>, pending_block_hashes: RwLock>, pending_block_details: RwLock>, @@ -538,6 +539,7 @@ impl BlockChain { block_receipts: RwLock::new(HashMap::new()), db: db.clone(), cache_man: Mutex::new(cache_man), + pending_best_ancient_block: RwLock::new(None), pending_best_block: RwLock::new(None), pending_block_hashes: RwLock::new(HashMap::new()), pending_block_details: RwLock::new(HashMap::new()), @@ -808,18 +810,7 @@ impl BlockChain { }, is_best); if is_ancient { - let mut best_ancient_block = self.best_ancient_block.write(); - let ancient_number = best_ancient_block.as_ref().map_or(0, |b| b.number); - if self.block_hash(block_number + 1).is_some() { - batch.delete(db::COL_EXTRA, b"ancient"); - *best_ancient_block = None; - } else if block_number > ancient_number { - batch.put(db::COL_EXTRA, b"ancient", &hash); - *best_ancient_block = Some(BestAncientBlock { - hash: hash, - number: block_number, - }); - } + self.set_best_ancient_block(block_number, &hash, batch); } false @@ -860,6 +851,84 @@ impl BlockChain { } } + /// Update the best ancient block to the given hash, after checking that + /// it's directly linked to the currently known best ancient block + pub fn update_best_ancient_block(&self, hash: &H256) { + // Get the block view of the next ancient block (it must + // be in DB at this point) + let block_view = match self.block(hash) { + Some(v) => v, + None => return, + }; + + // So that `best_ancient_block` gets unlocked before calling + // `set_best_ancient_block` + { + // Get the target hash ; if there are no ancient block, + // it means that the chain is already fully linked + // Release the `best_ancient_block` RwLock + let target_hash = { + let best_ancient_block = self.best_ancient_block.read(); + let cur_ancient_block = match *best_ancient_block { + Some(ref b) => b, + None => return, + }; + + // Ensure that the new best ancient block is after the current one + if block_view.number() <= cur_ancient_block.number { + return; + } + + cur_ancient_block.hash.clone() + }; + + let mut block_hash = *hash; + let mut is_linked = false; + + loop { + if block_hash == target_hash { + is_linked = true; + break; + } + + match self.block_details(&block_hash) { + Some(block_details) => { + block_hash = block_details.parent; + }, + None => break, + } + } + + if !is_linked { + trace!(target: "blockchain", "The given block {:x} is not linked to the known ancient block {:x}", hash, target_hash); + return; + } + } + + let mut batch = self.db.key_value().transaction(); + self.set_best_ancient_block(block_view.number(), hash, &mut batch); + self.db.key_value().write(batch).expect("Low level database error."); + } + + /// Set the best ancient block with the given value: private method + /// `best_ancient_block` must not be locked, otherwise a DeadLock would occur + fn set_best_ancient_block(&self, block_number: BlockNumber, block_hash: &H256, batch: &mut DBTransaction) { + let mut pending_best_ancient_block = self.pending_best_ancient_block.write(); + let ancient_number = self.best_ancient_block.read().as_ref().map_or(0, |b| b.number); + if self.block_hash(block_number + 1).is_some() { + trace!(target: "blockchain", "The two ends of the chain have met."); + batch.delete(db::COL_EXTRA, b"ancient"); + *pending_best_ancient_block = Some(None); + } else if block_number > ancient_number { + trace!(target: "blockchain", "Updating the best ancient block to {}.", block_number); + batch.put(db::COL_EXTRA, b"ancient", &block_hash); + *pending_best_ancient_block = Some(Some(BestAncientBlock { + hash: *block_hash, + number: block_number, + })); + } + } + /// Insert an epoch transition. Provide an epoch number being transitioned to /// and epoch transition object. /// @@ -1112,15 +1181,21 @@ impl BlockChain { /// Apply pending insertion updates pub fn commit(&self) { + let mut pending_best_ancient_block = self.pending_best_ancient_block.write(); let mut pending_best_block = self.pending_best_block.write(); let mut pending_write_hashes = self.pending_block_hashes.write(); let mut pending_block_details = self.pending_block_details.write(); let mut pending_write_txs = self.pending_transaction_addresses.write(); + let mut best_ancient_block = self.best_ancient_block.write(); let mut best_block = self.best_block.write(); let mut write_block_details = self.block_details.write(); let mut write_hashes = self.block_hashes.write(); let mut write_txs = self.transaction_addresses.write(); + // update best ancient block + if let Some(block_option) = pending_best_ancient_block.take() { + *best_ancient_block = block_option; + } // update best block if let Some(block) = pending_best_block.take() { *best_block = block; diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index ff09f7c72..8412881a2 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -32,7 +32,7 @@ use kvdb::{DBValue, KeyValueDB, DBTransaction}; // other use ethereum_types::{H256, Address, U256}; use block::{IsBlock, LockedBlock, Drain, ClosedBlock, OpenBlock, enact_verified, SealedBlock}; -use blockchain::{BlockChain, BlockChainDB, BlockProvider, TreeRoute, ImportRoute, TransactionAddress, ExtrasInsert}; +use blockchain::{BlockReceipts, BlockChain, BlockChainDB, BlockProvider, TreeRoute, ImportRoute, TransactionAddress, ExtrasInsert}; use client::ancient_import::AncientVerifier; use client::{ Nonce, Balance, ChainInfo, BlockInfo, CallContract, TransactionInfo, @@ -66,7 +66,7 @@ use ethcore_miner::pool::VerifiedTransaction; use parking_lot::{Mutex, RwLock}; use rand::OsRng; use receipt::{Receipt, LocalizedReceipt}; -use snapshot::{self, io as snapshot_io}; +use snapshot::{self, io as snapshot_io, SnapshotClient}; use spec::Spec; use state_db::StateDB; use state::{self, State}; @@ -1005,6 +1005,16 @@ impl Client { self.importer.miner.clone() } + #[cfg(test)] + pub fn state_db(&self) -> ::parking_lot::RwLockReadGuard { + self.state_db.read() + } + + #[cfg(test)] + pub fn chain(&self) -> Arc { + self.chain.read().clone() + } + /// Replace io channel. Useful for testing. pub fn set_io_channel(&self, io_channel: IoChannel) { *self.io_channel.write() = io_channel; @@ -1817,7 +1827,7 @@ impl BlockChainClient for Client { Some(receipt) } - fn block_receipts(&self, id: BlockId) -> Option> { + fn localized_block_receipts(&self, id: BlockId) -> Option> { let hash = self.block_hash(id)?; let chain = self.chain.read(); @@ -1860,8 +1870,8 @@ impl BlockChainClient for Client { self.state_db.read().journal_db().state(hash) } - fn encoded_block_receipts(&self, hash: &H256) -> Option { - self.chain.read().block_receipts(hash).map(|receipts| ::rlp::encode(&receipts)) + fn block_receipts(&self, hash: &H256) -> Option { + self.chain.read().block_receipts(hash) } fn queue_info(&self) -> BlockQueueInfo { @@ -2406,6 +2416,8 @@ impl ProvingBlockChainClient for Client { } } +impl SnapshotClient for Client {} + impl Drop for Client { fn drop(&mut self) { self.engine.stop(); @@ -2504,7 +2516,7 @@ mod tests { use test_helpers::{generate_dummy_client_with_data}; let client = generate_dummy_client_with_data(2, 2, &[1.into(), 1.into()]); - let receipts = client.block_receipts(BlockId::Latest).unwrap(); + let receipts = client.localized_block_receipts(BlockId::Latest).unwrap(); assert_eq!(receipts.len(), 2); assert_eq!(receipts[0].transaction_index, 0); diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index c3d8c127b..3c422c49a 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -686,7 +686,7 @@ impl BlockChainClient for TestBlockChainClient { self.receipts.read().get(&id).cloned() } - fn block_receipts(&self, _id: BlockId) -> Option> { + fn localized_block_receipts(&self, _id: BlockId) -> Option> { Some(self.receipts.read().values().cloned().collect()) } @@ -789,16 +789,14 @@ impl BlockChainClient for TestBlockChainClient { None } - fn encoded_block_receipts(&self, hash: &H256) -> Option { + fn block_receipts(&self, hash: &H256) -> Option { // starts with 'f' ? if *hash > H256::from("f000000000000000000000000000000000000000000000000000000000000000") { let receipt = BlockReceipts::new(vec![Receipt::new( TransactionOutcome::StateRoot(H256::zero()), U256::zero(), vec![])]); - let mut rlp = RlpStream::new(); - rlp.append(&receipt); - return Some(rlp.out()); + return Some(receipt); } None } diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index ebe70dcd8..5b78a54b3 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use itertools::Itertools; use block::{OpenBlock, SealedBlock, ClosedBlock}; -use blockchain::TreeRoute; +use blockchain::{BlockReceipts, TreeRoute}; use client::Mode; use encoded; use vm::LastHashes; @@ -282,7 +282,7 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra fn transaction_receipt(&self, id: TransactionId) -> Option; /// Get localized receipts for all transaction in given block. - fn block_receipts(&self, id: BlockId) -> Option>; + fn localized_block_receipts(&self, id: BlockId) -> Option>; /// Get a tree route between `from` and `to`. /// See `BlockChain::tree_route`. @@ -294,8 +294,8 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra /// Get latest state node fn state_data(&self, hash: &H256) -> Option; - /// Get raw block receipts data by block header hash. - fn encoded_block_receipts(&self, hash: &H256) -> Option; + /// Get block receipts data by block header hash. + fn block_receipts(&self, hash: &H256) -> Option; /// Get block queue information. fn queue_info(&self) -> BlockQueueInfo; diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index fc6f38cb7..9a0315220 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -138,6 +138,9 @@ extern crate trace_time; #[cfg_attr(test, macro_use)] extern crate evm; +#[cfg(test)] +extern crate env_logger; + pub extern crate ethstore; #[macro_use] diff --git a/ethcore/src/snapshot/error.rs b/ethcore/src/snapshot/error.rs index 527b4e288..eb1a16278 100644 --- a/ethcore/src/snapshot/error.rs +++ b/ethcore/src/snapshot/error.rs @@ -65,6 +65,8 @@ pub enum Error { BadEpochProof(u64), /// Wrong chunk format. WrongChunkFormat(String), + /// Unlinked ancient block chain + UnlinkedAncientBlockChain, } impl fmt::Display for Error { @@ -91,6 +93,7 @@ impl fmt::Display for Error { Error::SnapshotsUnsupported => write!(f, "Snapshots unsupported by consensus engine."), Error::BadEpochProof(i) => write!(f, "Bad epoch proof for transition to epoch {}", i), Error::WrongChunkFormat(ref msg) => write!(f, "Wrong chunk format: {}", msg), + Error::UnlinkedAncientBlockChain => write!(f, "Unlinked ancient blocks chain"), } } } diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index fbe6d6a16..bd7e3cb51 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -56,7 +56,7 @@ use rand::{Rng, OsRng}; pub use self::error::Error; pub use self::consensus::*; -pub use self::service::{Service, DatabaseRestore}; +pub use self::service::{SnapshotClient, Service, DatabaseRestore}; pub use self::traits::SnapshotService; pub use self::watcher::Watcher; pub use types::snapshot_manifest::ManifestData; diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index 97fc516b8..f547faab1 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -22,12 +22,13 @@ use std::fs::{self, File}; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::cmp; use super::{ManifestData, StateRebuilder, Rebuilder, RestorationStatus, SnapshotService, MAX_CHUNK_SIZE}; use super::io::{SnapshotReader, LooseReader, SnapshotWriter, LooseWriter}; use blockchain::{BlockChain, BlockChainDB, BlockChainDBHandler}; -use client::{Client, ChainInfo, ClientIoMessage}; +use client::{BlockInfo, BlockChainClient, Client, ChainInfo, ClientIoMessage}; use engines::EthEngine; use error::{Error, ErrorKind as SnapshotErrorKind}; use snapshot::{Error as SnapshotError}; @@ -40,6 +41,7 @@ use ethereum_types::H256; use parking_lot::{Mutex, RwLock, RwLockReadGuard}; use bytes::Bytes; use journaldb::Algorithm; +use kvdb::DBTransaction; use snappy; /// Helper for removing directories in case of error. @@ -203,6 +205,9 @@ impl Restoration { /// Type alias for client io channel. pub type Channel = IoChannel; +/// Trait alias for the Client Service used +pub trait SnapshotClient: BlockChainClient + BlockInfo + DatabaseRestore {} + /// Snapshot service parameters. pub struct ServiceParams { /// The consensus engine this is built on. @@ -219,7 +224,7 @@ pub struct ServiceParams { /// Usually "/snapshot" pub snapshot_root: PathBuf, /// A handle for database restoration. - pub db_restore: Arc, + pub client: Arc, } /// `SnapshotService` implementation. @@ -236,7 +241,7 @@ pub struct Service { genesis_block: Bytes, state_chunks: AtomicUsize, block_chunks: AtomicUsize, - db_restore: Arc, + client: Arc, progress: super::Progress, taking_snapshot: AtomicBool, restoring_snapshot: AtomicBool, @@ -257,7 +262,7 @@ impl Service { genesis_block: params.genesis_block, state_chunks: AtomicUsize::new(0), block_chunks: AtomicUsize::new(0), - db_restore: params.db_restore, + client: params.client, progress: Default::default(), taking_snapshot: AtomicBool::new(false), restoring_snapshot: AtomicBool::new(false), @@ -334,12 +339,110 @@ impl Service { // replace one the client's database with our own. fn replace_client_db(&self) -> Result<(), Error> { - let our_db = self.restoration_db(); + let migrated_blocks = self.migrate_blocks()?; + trace!(target: "snapshot", "Migrated {} ancient blocks", migrated_blocks); - self.db_restore.restore_db(&*our_db.to_string_lossy())?; + let rest_db = self.restoration_db(); + self.client.restore_db(&*rest_db.to_string_lossy())?; Ok(()) } + // Migrate the blocks in the current DB into the new chain + fn migrate_blocks(&self) -> Result { + // Count the number of migrated blocks + let mut count = 0; + let rest_db = self.restoration_db(); + + let cur_chain_info = self.client.chain_info(); + + let next_db = self.restoration_db_handler.open(&rest_db)?; + let next_chain = BlockChain::new(Default::default(), &[], next_db.clone()); + let next_chain_info = next_chain.chain_info(); + + // The old database looks like this: + // [genesis, best_ancient_block] ... [first_block, best_block] + // If we are fully synced neither `best_ancient_block` nor `first_block` is set, and we can assume that the whole range from [genesis, best_block] is imported. + // The new database only contains the tip of the chain ([first_block, best_block]), + // so the useful set of blocks is defined as: + // [0 ... min(new.first_block, best_ancient_block or best_block)] + let find_range = || -> Option<(H256, H256)> { + let next_available_from = next_chain_info.first_block_number?; + let cur_available_to = cur_chain_info.ancient_block_number.unwrap_or(cur_chain_info.best_block_number); + + let highest_block_num = cmp::min(next_available_from.saturating_sub(1), cur_available_to); + + if highest_block_num == 0 { + return None; + } + + trace!(target: "snapshot", "Trying to import ancient blocks until {}", highest_block_num); + + // Here we start from the highest block number and go backward to 0, + // thus starting at `highest_block_num` and targetting `0`. + let target_hash = self.client.block_hash(BlockId::Number(0))?; + let start_hash = self.client.block_hash(BlockId::Number(highest_block_num))?; + + Some((start_hash, target_hash)) + }; + + let (start_hash, target_hash) = match find_range() { + Some(x) => x, + None => return Ok(0), + }; + + let mut batch = DBTransaction::new(); + let mut parent_hash = start_hash; + while parent_hash != target_hash { + // Early return if restoration is aborted + if !self.restoring_snapshot.load(Ordering::SeqCst) { + return Ok(count); + } + + let block = self.client.block(BlockId::Hash(parent_hash)).ok_or(::snapshot::error::Error::UnlinkedAncientBlockChain)?; + parent_hash = block.parent_hash(); + + let block_number = block.number(); + let block_receipts = self.client.block_receipts(&block.hash()); + let parent_total_difficulty = self.client.block_total_difficulty(BlockId::Hash(parent_hash)); + + match (block_receipts, parent_total_difficulty) { + (Some(block_receipts), Some(parent_total_difficulty)) => { + let block_receipts = block_receipts.receipts; + + next_chain.insert_unordered_block(&mut batch, block, block_receipts, Some(parent_total_difficulty), false, true); + count += 1; + }, + _ => break, + } + + // Writting changes to DB and logging every now and then + if block_number % 1_000 == 0 { + next_db.key_value().write_buffered(batch); + next_chain.commit(); + next_db.key_value().flush().expect("DB flush failed."); + batch = DBTransaction::new(); + } + + if block_number % 10_000 == 0 { + trace!(target: "snapshot", "Block restoration at #{}", block_number); + } + } + + // Final commit to the DB + next_db.key_value().write_buffered(batch); + next_chain.commit(); + next_db.key_value().flush().expect("DB flush failed."); + + // We couldn't reach the targeted hash + if parent_hash != target_hash { + return Err(::snapshot::error::Error::UnlinkedAncientBlockChain.into()); + } + + // Update best ancient block in the Next Chain + next_chain.update_best_ancient_block(&start_hash); + Ok(count) + } + /// Get a reference to the snapshot reader. pub fn reader(&self) -> RwLockReadGuard> { self.reader.read() @@ -480,12 +583,16 @@ impl Service { // Import previous chunks, continue if it fails self.import_prev_chunks(&mut res, manifest).ok(); - *self.status.lock() = RestorationStatus::Ongoing { - state_chunks: state_chunks as u32, - block_chunks: block_chunks as u32, - state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32, - block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32, - }; + // It could be that the restoration failed or completed in the meanwhile + let mut restoration_status = self.status.lock(); + if let RestorationStatus::Initializing { .. } = *restoration_status { + *restoration_status = RestorationStatus::Ongoing { + state_chunks: state_chunks as u32, + block_chunks: block_chunks as u32, + state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32, + block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32, + }; + } Ok(()) } @@ -752,26 +859,19 @@ impl Drop for Service { #[cfg(test)] mod tests { - use std::sync::Arc; use client::ClientIoMessage; use io::{IoService}; use spec::Spec; use journaldb::Algorithm; - use error::Error; use snapshot::{ManifestData, RestorationStatus, SnapshotService}; use super::*; use tempdir::TempDir; - use test_helpers::restoration_db_handler; - - struct NoopDBRestore; - impl DatabaseRestore for NoopDBRestore { - fn restore_db(&self, _new_db: &str) -> Result<(), Error> { - Ok(()) - } - } + use test_helpers::{generate_dummy_client_with_spec_and_data, restoration_db_handler}; #[test] fn sends_async_messages() { + let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()]; + let client = generate_dummy_client_with_spec_and_data(Spec::new_null, 400, 5, &gas_prices); let service = IoService::::start().unwrap(); let spec = Spec::new_test(); @@ -785,7 +885,7 @@ mod tests { pruning: Algorithm::Archive, channel: service.channel(), snapshot_root: dir, - db_restore: Arc::new(NoopDBRestore), + client: client, }; let service = Service::new(snapshot_params).unwrap(); diff --git a/ethcore/src/snapshot/tests/service.rs b/ethcore/src/snapshot/tests/service.rs index 1529ef001..fd070eab8 100644 --- a/ethcore/src/snapshot/tests/service.rs +++ b/ethcore/src/snapshot/tests/service.rs @@ -16,26 +16,23 @@ //! Tests for the snapshot service. +use std::fs; use std::sync::Arc; use tempdir::TempDir; -use client::{Client, BlockInfo}; +use blockchain::BlockProvider; +use client::{Client, ClientConfig, ImportBlock, BlockInfo}; use ids::BlockId; +use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter}; use snapshot::service::{Service, ServiceParams}; -use snapshot::{self, ManifestData, SnapshotService}; +use snapshot::{chunk_state, chunk_secondary, ManifestData, Progress, SnapshotService, RestorationStatus}; use spec::Spec; -use test_helpers::{generate_dummy_client_with_spec_and_data, restoration_db_handler}; +use test_helpers::{new_db, new_temp_db, generate_dummy_client_with_spec_and_data, restoration_db_handler}; +use parking_lot::Mutex; use io::IoChannel; use kvdb_rocksdb::DatabaseConfig; - -struct NoopDBRestore; - -impl snapshot::DatabaseRestore for NoopDBRestore { - fn restore_db(&self, _new_db: &str) -> Result<(), ::error::Error> { - Ok(()) - } -} +use verification::queue::kind::blocks::Unverified; #[test] fn restored_is_equivalent() { @@ -46,7 +43,6 @@ fn restored_is_equivalent() { const TX_PER: usize = 5; let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()]; - let client = generate_dummy_client_with_spec_and_data(Spec::new_null, NUM_BLOCKS, TX_PER, &gas_prices); let tempdir = TempDir::new("").unwrap(); @@ -73,7 +69,7 @@ fn restored_is_equivalent() { pruning: ::journaldb::Algorithm::Archive, channel: IoChannel::disconnected(), snapshot_root: path, - db_restore: client2.clone(), + client: client2.clone(), }; let service = Service::new(service_params).unwrap(); @@ -94,7 +90,7 @@ fn restored_is_equivalent() { service.feed_block_chunk(hash, &chunk); } - assert_eq!(service.status(), ::snapshot::RestorationStatus::Inactive); + assert_eq!(service.status(), RestorationStatus::Inactive); for x in 0..NUM_BLOCKS { let block1 = client.block(BlockId::Number(x as u64)).unwrap(); @@ -111,6 +107,9 @@ fn restored_is_equivalent() { #[cfg(not(target_os = "windows"))] #[test] fn guards_delete_folders() { + let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()]; + let client = generate_dummy_client_with_spec_and_data(Spec::new_null, 400, 5, &gas_prices); + let spec = Spec::new_null(); let tempdir = TempDir::new("").unwrap(); let service_params = ServiceParams { @@ -120,7 +119,7 @@ fn guards_delete_folders() { pruning: ::journaldb::Algorithm::Archive, channel: IoChannel::disconnected(), snapshot_root: tempdir.path().to_owned(), - db_restore: Arc::new(NoopDBRestore), + client: client, }; let service = Service::new(service_params).unwrap(); @@ -151,3 +150,201 @@ fn guards_delete_folders() { assert!(!path.join("db").exists()); assert!(path.join("temp").exists()); } + +#[test] +fn keep_ancient_blocks() { + ::env_logger::init().ok(); + + // Test variables + const NUM_BLOCKS: u64 = 500; + const NUM_SNAPSHOT_BLOCKS: u64 = 300; + const SNAPSHOT_MODE: ::snapshot::PowSnapshot = ::snapshot::PowSnapshot { blocks: NUM_SNAPSHOT_BLOCKS, max_restore_blocks: NUM_SNAPSHOT_BLOCKS }; + + // Temporary folders + let tempdir = TempDir::new("").unwrap(); + let snapshot_path = tempdir.path().join("SNAP"); + + // Generate blocks + let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()]; + let spec_f = Spec::new_null; + let spec = spec_f(); + let client = generate_dummy_client_with_spec_and_data(spec_f, NUM_BLOCKS as u32, 5, &gas_prices); + + let bc = client.chain(); + + // Create the Snapshot + let best_hash = bc.best_block_hash(); + let writer = Mutex::new(PackedWriter::new(&snapshot_path).unwrap()); + let block_hashes = chunk_secondary( + Box::new(SNAPSHOT_MODE), + &bc, + best_hash, + &writer, + &Progress::default() + ).unwrap(); + let state_db = client.state_db().journal_db().boxed_clone(); + let start_header = bc.block_header_data(&best_hash).unwrap(); + let state_root = start_header.state_root(); + let state_hashes = chunk_state( + state_db.as_hashdb(), + &state_root, + &writer, + &Progress::default(), + None + ).unwrap(); + + let manifest = ::snapshot::ManifestData { + version: 2, + state_hashes: state_hashes, + state_root: state_root, + block_hashes: block_hashes, + block_number: NUM_BLOCKS, + block_hash: best_hash, + }; + + writer.into_inner().finish(manifest.clone()).unwrap(); + + // Initialize the Client + let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let client_db = new_temp_db(&tempdir.path()); + let client2 = Client::new( + ClientConfig::default(), + &spec, + client_db, + Arc::new(::miner::Miner::new_for_tests(&spec, None)), + IoChannel::disconnected(), + ).unwrap(); + + // Add some ancient blocks + for block_number in 1..50 { + let block_hash = bc.block_hash(block_number).unwrap(); + let block = bc.block(&block_hash).unwrap(); + client2.import_block(Unverified::from_rlp(block.into_inner()).unwrap()).unwrap(); + } + + client2.import_verified_blocks(); + client2.flush_queue(); + + // Restore the Snapshot + let reader = PackedReader::new(&snapshot_path).unwrap().unwrap(); + let service_params = ServiceParams { + engine: spec.engine.clone(), + genesis_block: spec.genesis_block(), + restoration_db_handler: restoration_db_handler(db_config), + pruning: ::journaldb::Algorithm::Archive, + channel: IoChannel::disconnected(), + snapshot_root: tempdir.path().to_owned(), + client: client2.clone(), + }; + let service = Service::new(service_params).unwrap(); + service.init_restore(manifest.clone(), false).unwrap(); + + for hash in &manifest.block_hashes { + let chunk = reader.chunk(*hash).unwrap(); + service.feed_block_chunk(*hash, &chunk); + } + + for hash in &manifest.state_hashes { + let chunk = reader.chunk(*hash).unwrap(); + service.feed_state_chunk(*hash, &chunk); + } + + match service.status() { + RestorationStatus::Inactive => (), + RestorationStatus::Failed => panic!("Snapshot Restoration has failed."), + RestorationStatus::Ongoing { .. } => panic!("Snapshot Restoration should be done."), + _ => panic!("Invalid Snapshot Service status."), + } + + // Check that the latest block number is the right one + assert_eq!(client2.block(BlockId::Latest).unwrap().number(), NUM_BLOCKS as u64); + + // Check that we have blocks in [NUM_BLOCKS - NUM_SNAPSHOT_BLOCKS + 1 ; NUM_BLOCKS] + // but none before + assert!(client2.block(BlockId::Number(NUM_BLOCKS - NUM_SNAPSHOT_BLOCKS + 1)).is_some()); + assert!(client2.block(BlockId::Number(100)).is_none()); + + // Check that the first 50 blocks have been migrated + for block_number in 1..49 { + assert!(client2.block(BlockId::Number(block_number)).is_some()); + } +} + +#[test] +fn recover_aborted_recovery() { + ::env_logger::init().ok(); + + const NUM_BLOCKS: u32 = 400; + let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()]; + let client = generate_dummy_client_with_spec_and_data(Spec::new_null, NUM_BLOCKS, 5, &gas_prices); + + let spec = Spec::new_null(); + let tempdir = TempDir::new("").unwrap(); + let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let client_db = new_db(); + let client2 = Client::new( + Default::default(), + &spec, + client_db, + Arc::new(::miner::Miner::new_for_tests(&spec, None)), + IoChannel::disconnected(), + ).unwrap(); + let service_params = ServiceParams { + engine: spec.engine.clone(), + genesis_block: spec.genesis_block(), + restoration_db_handler: restoration_db_handler(db_config), + pruning: ::journaldb::Algorithm::Archive, + channel: IoChannel::disconnected(), + snapshot_root: tempdir.path().to_owned(), + client: client2.clone(), + }; + + let service = Service::new(service_params).unwrap(); + service.take_snapshot(&client, NUM_BLOCKS as u64).unwrap(); + + let manifest = service.manifest().unwrap(); + service.init_restore(manifest.clone(), true).unwrap(); + + // Restore only the state chunks + for hash in &manifest.state_hashes { + let chunk = service.chunk(*hash).unwrap(); + service.feed_state_chunk(*hash, &chunk); + } + + match service.status() { + RestorationStatus::Ongoing { block_chunks_done, state_chunks_done, .. } => { + assert_eq!(state_chunks_done, manifest.state_hashes.len() as u32); + assert_eq!(block_chunks_done, 0); + }, + e => panic!("Snapshot restoration must be ongoing ; {:?}", e), + } + + // Abort the restore... + service.abort_restore(); + + // And try again! + service.init_restore(manifest.clone(), true).unwrap(); + + match service.status() { + RestorationStatus::Ongoing { block_chunks_done, state_chunks_done, .. } => { + assert_eq!(state_chunks_done, manifest.state_hashes.len() as u32); + assert_eq!(block_chunks_done, 0); + }, + e => panic!("Snapshot restoration must be ongoing ; {:?}", e), + } + + // Remove the snapshot directory, and restart the restoration + // It shouldn't have restored any previous blocks + fs::remove_dir_all(tempdir.path()).unwrap(); + + // And try again! + service.init_restore(manifest.clone(), true).unwrap(); + + match service.status() { + RestorationStatus::Ongoing { block_chunks_done, state_chunks_done, .. } => { + assert_eq!(block_chunks_done, 0); + assert_eq!(state_chunks_done, 0); + }, + _ => panic!("Snapshot restoration must be ongoing"), + } +} diff --git a/ethcore/src/test_helpers.rs b/ethcore/src/test_helpers.rs index bf9ab0204..9f00f9662 100644 --- a/ethcore/src/test_helpers.rs +++ b/ethcore/src/test_helpers.rs @@ -41,7 +41,7 @@ use transaction::{Action, Transaction, SignedTransaction}; use views::BlockView; use blooms_db; use kvdb::KeyValueDB; -use kvdb_rocksdb; +use kvdb_rocksdb::{self, Database, DatabaseConfig}; use tempdir::TempDir; use verification::queue::kind::blocks::Unverified; use encoded; @@ -263,30 +263,30 @@ pub fn get_test_client_with_blocks(blocks: Vec) -> Arc { client } +struct TestBlockChainDB { + _blooms_dir: TempDir, + _trace_blooms_dir: TempDir, + blooms: blooms_db::Database, + trace_blooms: blooms_db::Database, + key_value: Arc, +} + +impl BlockChainDB for TestBlockChainDB { + fn key_value(&self) -> &Arc { + &self.key_value + } + + fn blooms(&self) -> &blooms_db::Database { + &self.blooms + } + + fn trace_blooms(&self) -> &blooms_db::Database { + &self.trace_blooms + } +} + /// Creates new test instance of `BlockChainDB` pub fn new_db() -> Arc { - struct TestBlockChainDB { - _blooms_dir: TempDir, - _trace_blooms_dir: TempDir, - blooms: blooms_db::Database, - trace_blooms: blooms_db::Database, - key_value: Arc, - } - - impl BlockChainDB for TestBlockChainDB { - fn key_value(&self) -> &Arc { - &self.key_value - } - - fn blooms(&self) -> &blooms_db::Database { - &self.blooms - } - - fn trace_blooms(&self) -> &blooms_db::Database { - &self.trace_blooms - } - } - let blooms_dir = TempDir::new("").unwrap(); let trace_blooms_dir = TempDir::new("").unwrap(); @@ -301,6 +301,26 @@ pub fn new_db() -> Arc { Arc::new(db) } +/// Creates a new temporary `BlockChainDB` on FS +pub fn new_temp_db(tempdir: &Path) -> Arc { + let blooms_dir = TempDir::new("").unwrap(); + let trace_blooms_dir = TempDir::new("").unwrap(); + let key_value_dir = tempdir.join("key_value"); + + let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let key_value_db = Database::open(&db_config, key_value_dir.to_str().unwrap()).unwrap(); + + let db = TestBlockChainDB { + blooms: blooms_db::Database::open(blooms_dir.path()).unwrap(), + trace_blooms: blooms_db::Database::open(trace_blooms_dir.path()).unwrap(), + _blooms_dir: blooms_dir, + _trace_blooms_dir: trace_blooms_dir, + key_value: Arc::new(key_value_db) + }; + + Arc::new(db) +} + /// Creates new instance of KeyValueDBHandler pub fn restoration_db_handler(config: kvdb_rocksdb::DatabaseConfig) -> Box { struct RestorationDBHandler { diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 1a0f99b92..e0fc8ecdd 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -970,6 +970,12 @@ impl ChainSync { self.state = SyncState::Blocks; self.continue_sync(io); }, + SyncState::SnapshotData => match io.snapshot_service().status() { + RestorationStatus::Inactive | RestorationStatus::Failed => { + self.state = SyncState::SnapshotWaiting; + }, + RestorationStatus::Initializing { .. } | RestorationStatus::Ongoing { .. } => (), + }, SyncState::SnapshotWaiting => { match io.snapshot_service().status() { RestorationStatus::Inactive => { diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs index 201e0d9f3..4bce0ef98 100644 --- a/ethcore/sync/src/chain/supplier.rs +++ b/ethcore/sync/src/chain/supplier.rs @@ -226,7 +226,8 @@ impl SyncSupplier { let mut added_receipts = 0usize; let mut data = Bytes::new(); for i in 0..count { - if let Some(mut receipts_bytes) = io.chain().encoded_block_receipts(&rlp.val_at::(i)?) { + if let Some(receipts) = io.chain().block_receipts(&rlp.val_at::(i)?) { + let mut receipts_bytes = ::rlp::encode(&receipts); data.append(&mut receipts_bytes); added_receipts += receipts_bytes.len(); added_headers += 1; diff --git a/rpc/src/v1/impls/parity.rs b/rpc/src/v1/impls/parity.rs index fe3eb320a..2671a0eab 100644 --- a/rpc/src/v1/impls/parity.rs +++ b/rpc/src/v1/impls/parity.rs @@ -440,7 +440,7 @@ impl Parity for ParityClient where BlockNumber::Earliest => BlockId::Earliest, BlockNumber::Latest => BlockId::Latest, }; - let receipts = try_bf!(self.client.block_receipts(id).ok_or_else(errors::unknown_block)); + let receipts = try_bf!(self.client.localized_block_receipts(id).ok_or_else(errors::unknown_block)); Box::new(future::ok(receipts.into_iter().map(Into::into).collect())) }