2020-09-22 14:53:52 +02:00
// Copyright 2015-2020 Parity Technologies (UK) Ltd.
// This file is part of OpenEthereum.
2016-08-05 17:00:46 +02:00
2020-09-22 14:53:52 +02:00
// OpenEthereum is free software: you can redistribute it and/or modify
2016-08-05 17:00:46 +02:00
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
2020-09-22 14:53:52 +02:00
// OpenEthereum is distributed in the hope that it will be useful,
2016-08-05 17:00:46 +02:00
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
2020-09-22 14:53:52 +02:00
// along with OpenEthereum. If not, see <http://www.gnu.org/licenses/>.
2016-08-05 17:00:46 +02:00
//! Snapshot network service implementation.
2017-04-19 20:31:53 +02:00
use std ::{
2020-08-05 06:08:03 +02:00
cmp ,
2017-04-19 20:31:53 +02:00
collections ::HashSet ,
2018-05-16 22:01:55 +02:00
fs ::{ self , File } ,
io ::{ self , ErrorKind , Read } ,
2016-09-07 15:27:28 +02:00
path ::PathBuf ,
2016-08-05 17:00:46 +02:00
sync ::{
2016-09-07 15:27:14 +02:00
atomic ::{ AtomicBool , AtomicUsize , Ordering } ,
2020-08-05 06:08:03 +02:00
Arc ,
} ,
2016-09-07 15:27:14 +02:00
} ;
2020-08-05 06:08:03 +02:00
2018-02-22 14:52:29 +01:00
use super ::{
2016-08-25 22:20:44 +02:00
io ::{ LooseReader , LooseWriter , SnapshotReader , SnapshotWriter } ,
2020-09-14 16:08:57 +02:00
CreationStatus , ManifestData , Rebuilder , RestorationStatus , SnapshotService , StateRebuilder ,
MAX_CHUNK_SIZE ,
2016-08-25 22:20:44 +02:00
} ;
2016-08-05 17:00:46 +02:00
2018-06-20 15:13:07 +02:00
use blockchain ::{ BlockChain , BlockChainDB , BlockChainDBHandler } ;
2018-11-18 00:06:34 +01:00
use client ::{ BlockChainClient , BlockInfo , ChainInfo , Client , ClientIoMessage } ;
2017-09-26 14:19:08 +02:00
use engines ::EthEngine ;
2018-09-10 15:26:52 +02:00
use error ::{ Error , ErrorKind as SnapshotErrorKind } ;
2018-05-16 22:01:55 +02:00
use hash ::keccak ;
2018-09-10 15:26:52 +02:00
use snapshot ::Error as SnapshotError ;
2019-01-04 14:05:46 +01:00
use types ::ids ::BlockId ;
2016-08-05 17:00:46 +02:00
use io ::IoChannel ;
2017-09-06 20:47:45 +02:00
use bytes ::Bytes ;
2018-01-10 13:35:18 +01:00
use ethereum_types ::H256 ;
2017-10-17 06:20:24 +02:00
use journaldb ::Algorithm ;
2018-11-18 00:06:34 +01:00
use kvdb ::DBTransaction ;
2017-09-02 20:09:13 +02:00
use parking_lot ::{ Mutex , RwLock , RwLockReadGuard } ;
2017-10-10 20:01:27 +02:00
use snappy ;
2016-08-05 17:00:46 +02:00
2016-09-02 19:00:20 +02:00
/// Clean-up guard for a directory that should disappear on failure.
///
/// Field `0` is the "armed" flag and field `1` the guarded path. While the
/// flag is set, dropping the guard recursively removes the path; the result
/// of that removal is ignored.
struct Guard(bool, PathBuf);

impl Guard {
    /// Creates an armed guard for `path`.
    fn new(path: PathBuf) -> Self {
        let armed = true;
        Guard(armed, path)
    }

    /// Creates a guard that is not armed and wraps an empty path (tests only).
    #[cfg(test)]
    fn benign() -> Self {
        Guard(false, PathBuf::new())
    }

    /// Consumes the guard with the flag cleared, so the path is left in place.
    fn disarm(mut self) {
        self.0 = false;
    }
}

impl Drop for Guard {
    fn drop(&mut self) {
        if !self.0 {
            return;
        }

        // Best effort: a failure to remove the directory is ignored.
        let _ = fs::remove_dir_all(&self.1);
    }
}
2016-09-06 15:31:13 +02:00
/// External database restoration handler
2016-09-06 15:41:56 +02:00
pub trait DatabaseRestore : Send + Sync {
2016-09-06 15:31:13 +02:00
/// Restart with a new backend. Takes ownership of passed database and moves it to a new location.
fn restore_db ( & self , new_db : & str ) -> Result < ( ) , Error > ;
2016-08-05 17:00:46 +02:00
}
/// State restoration manager.
struct Restoration {
2016-08-25 22:20:44 +02:00
manifest : ManifestData ,
2016-08-05 17:00:46 +02:00
state_chunks_left : HashSet < H256 > ,
block_chunks_left : HashSet < H256 > ,
state : StateRebuilder ,
2020-07-29 10:36:15 +02:00
secondary : Box < dyn Rebuilder > ,
2016-09-11 14:05:59 +02:00
writer : Option < LooseWriter > ,
2016-08-05 17:00:46 +02:00
snappy_buffer : Bytes ,
final_state_root : H256 ,
2016-09-02 19:00:20 +02:00
guard : Guard ,
2020-07-29 10:36:15 +02:00
db : Arc < dyn BlockChainDB > ,
2016-08-05 17:00:46 +02:00
}
2016-08-25 22:20:44 +02:00
struct RestorationParams < ' a > {
manifest : ManifestData , // manifest to base restoration on.
pruning : Algorithm , // pruning algorithm for the database.
2020-07-29 10:36:15 +02:00
db : Arc < dyn BlockChainDB > , // database
2016-09-11 14:05:59 +02:00
writer : Option < LooseWriter > , // writer for recovered snapshot.
2016-08-25 22:20:44 +02:00
genesis : & ' a [ u8 ] , // genesis block of the chain.
2016-09-02 19:00:20 +02:00
guard : Guard , // guard for the restoration directory.
2020-07-29 10:36:15 +02:00
engine : & ' a dyn EthEngine ,
2016-08-25 22:20:44 +02:00
}
2016-08-05 17:00:46 +02:00
impl Restoration {
2016-08-25 22:20:44 +02:00
// make a new restoration using the given parameters.
fn new ( params : RestorationParams ) -> Result < Self , Error > {
let manifest = params . manifest ;
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
let state_chunks = manifest . state_hashes . iter ( ) . cloned ( ) . collect ( ) ;
let block_chunks = manifest . block_hashes . iter ( ) . cloned ( ) . collect ( ) ;
2020-08-05 06:08:03 +02:00
2018-04-09 14:21:37 +02:00
let raw_db = params . db ;
2020-08-05 06:08:03 +02:00
2017-01-23 15:27:11 +01:00
let chain = BlockChain ::new ( Default ::default ( ) , params . genesis , raw_db . clone ( ) ) ;
2017-04-19 20:31:53 +02:00
let components = params
. engine
. snapshot_components ( )
. ok_or_else ( | | ::snapshot ::Error ::SnapshotsUnsupported ) ? ;
2020-08-05 06:08:03 +02:00
2017-04-19 20:31:53 +02:00
let secondary = components . rebuilder ( chain , raw_db . clone ( ) , & manifest ) ? ;
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
let root = manifest . state_root . clone ( ) ;
2020-08-05 06:08:03 +02:00
2016-08-05 17:00:46 +02:00
Ok ( Restoration {
2016-08-25 22:20:44 +02:00
manifest : manifest ,
state_chunks_left : state_chunks ,
block_chunks_left : block_chunks ,
2018-06-20 15:13:07 +02:00
state : StateRebuilder ::new ( raw_db . key_value ( ) . clone ( ) , params . pruning ) ,
2017-04-19 20:31:53 +02:00
secondary : secondary ,
2016-08-25 22:20:44 +02:00
writer : params . writer ,
2016-08-05 17:00:46 +02:00
snappy_buffer : Vec ::new ( ) ,
2016-08-25 22:20:44 +02:00
final_state_root : root ,
2016-09-02 19:00:20 +02:00
guard : params . guard ,
2016-10-25 18:40:01 +02:00
db : raw_db ,
2016-08-05 17:00:46 +02:00
} )
}
2020-08-05 06:08:03 +02:00
2016-11-13 13:52:53 +01:00
// feeds a state chunk, aborts early if `flag` becomes false.
fn feed_state ( & mut self , hash : H256 , chunk : & [ u8 ] , flag : & AtomicBool ) -> Result < ( ) , Error > {
2017-07-21 17:24:53 +02:00
if self . state_chunks_left . contains ( & hash ) {
2018-02-22 14:52:29 +01:00
let expected_len = snappy ::decompressed_len ( chunk ) ? ;
if expected_len > MAX_CHUNK_SIZE {
trace! ( target : " snapshot " , " Discarding large chunk: {} vs {} " , expected_len , MAX_CHUNK_SIZE ) ;
return Err ( ::snapshot ::Error ::ChunkTooLarge . into ( ) ) ;
}
2016-12-27 12:53:56 +01:00
let len = snappy ::decompress_into ( chunk , & mut self . snappy_buffer ) ? ;
2020-08-05 06:08:03 +02:00
2016-12-27 12:53:56 +01:00
self . state . feed ( & self . snappy_buffer [ .. len ] , flag ) ? ;
2020-08-05 06:08:03 +02:00
2016-09-11 14:05:59 +02:00
if let Some ( ref mut writer ) = self . writer . as_mut ( ) {
2016-12-27 12:53:56 +01:00
writer . write_state_chunk ( hash , chunk ) ? ;
2016-09-11 14:05:59 +02:00
}
2020-08-05 06:08:03 +02:00
2017-07-21 17:24:53 +02:00
self . state_chunks_left . remove ( & hash ) ;
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
2016-08-05 17:00:46 +02:00
Ok ( ( ) )
}
2020-08-05 06:08:03 +02:00
2016-08-05 17:00:46 +02:00
// feeds a block chunk
2017-09-26 14:19:08 +02:00
fn feed_blocks (
& mut self ,
hash : H256 ,
chunk : & [ u8 ] ,
2020-07-29 10:36:15 +02:00
engine : & dyn EthEngine ,
2017-09-26 14:19:08 +02:00
flag : & AtomicBool ,
) -> Result < ( ) , Error > {
2017-07-21 17:24:53 +02:00
if self . block_chunks_left . contains ( & hash ) {
2018-02-22 14:52:29 +01:00
let expected_len = snappy ::decompressed_len ( chunk ) ? ;
if expected_len > MAX_CHUNK_SIZE {
trace! ( target : " snapshot " , " Discarding large chunk: {} vs {} " , expected_len , MAX_CHUNK_SIZE ) ;
return Err ( ::snapshot ::Error ::ChunkTooLarge . into ( ) ) ;
}
2016-12-27 12:53:56 +01:00
let len = snappy ::decompress_into ( chunk , & mut self . snappy_buffer ) ? ;
2020-08-05 06:08:03 +02:00
2017-04-19 20:31:53 +02:00
self . secondary
. feed ( & self . snappy_buffer [ .. len ] , engine , flag ) ? ;
2016-09-11 14:05:59 +02:00
if let Some ( ref mut writer ) = self . writer . as_mut ( ) {
2016-12-27 12:53:56 +01:00
writer . write_block_chunk ( hash , chunk ) ? ;
2016-09-11 14:05:59 +02:00
}
2020-08-05 06:08:03 +02:00
2017-07-21 17:24:53 +02:00
self . block_chunks_left . remove ( & hash ) ;
2016-08-25 22:20:44 +02:00
}
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
Ok ( ( ) )
}
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
// finish up restoration.
2020-07-29 10:36:15 +02:00
fn finalize ( mut self , engine : & dyn EthEngine ) -> Result < ( ) , Error > {
2017-09-06 20:47:45 +02:00
use trie ::TrieError ;
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
if ! self . is_done ( ) {
return Ok ( ( ) ) ;
}
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
// verify final state root.
let root = self . state . state_root ( ) ;
if root ! = self . final_state_root {
2018-05-12 22:46:08 +02:00
warn! (
" Final restored state has wrong state root: expected {:?}, got {:?} " ,
self . final_state_root , root
) ;
2016-08-25 22:20:44 +02:00
return Err ( TrieError ::InvalidStateRoot ( root ) . into ( ) ) ;
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
// check for missing code.
2017-06-28 13:17:36 +02:00
self . state
. finalize ( self . manifest . block_number , self . manifest . block_hash ) ? ;
2020-08-05 06:08:03 +02:00
2016-10-28 16:10:30 +02:00
// connect out-of-order chunks and verify chain integrity.
2017-06-28 13:17:36 +02:00
self . secondary . finalize ( engine ) ? ;
2020-08-05 06:08:03 +02:00
2016-09-11 14:05:59 +02:00
if let Some ( writer ) = self . writer {
2016-12-27 12:53:56 +01:00
writer . finish ( self . manifest ) ? ;
2016-09-11 14:05:59 +02:00
}
2020-08-05 06:08:03 +02:00
2016-09-02 19:00:20 +02:00
self . guard . disarm ( ) ;
2016-08-05 17:00:46 +02:00
Ok ( ( ) )
}
2020-08-05 06:08:03 +02:00
2016-08-05 17:00:46 +02:00
// True once neither block chunks nor state chunks remain to be fed.
fn is_done(&self) -> bool {
    let blocks_pending = !self.block_chunks_left.is_empty();
    !blocks_pending && self.state_chunks_left.is_empty()
}
}
/// Type alias for the client io channel: an `IoChannel` carrying
/// `ClientIoMessage`s.
pub type Channel = IoChannel < ClientIoMessage > ;
2018-11-18 00:06:34 +01:00
/// Trait alias for the client service used by the snapshot service.
///
/// It declares no methods of its own; implementors must provide
/// `BlockChainClient`, `BlockInfo` and `DatabaseRestore`.
pub trait SnapshotClient : BlockChainClient + BlockInfo + DatabaseRestore { }
2016-09-07 15:27:28 +02:00
/// Snapshot service parameters.
pub struct ServiceParams {
/// The consensus engine this is built on.
2020-07-29 10:36:15 +02:00
pub engine : Arc < dyn EthEngine > ,
2016-09-07 15:27:28 +02:00
/// The chain's genesis block.
pub genesis_block : Bytes ,
/// State pruning algorithm.
pub pruning : Algorithm ,
2018-04-09 14:21:37 +02:00
/// Handler for opening a restoration DB.
2020-07-29 10:36:15 +02:00
pub restoration_db_handler : Box < dyn BlockChainDBHandler > ,
2016-09-07 15:27:28 +02:00
/// Async IO channel for sending messages.
pub channel : Channel ,
/// The directory to put snapshots in.
/// Usually "<chain hash>/snapshot"
pub snapshot_root : PathBuf ,
/// A handle for database restoration.
2020-07-29 10:36:15 +02:00
pub client : Arc < dyn SnapshotClient > ,
2016-09-07 15:27:28 +02:00
}
/// `SnapshotService` implementation.
/// This controls taking snapshots and restoring from them.
2016-08-05 17:00:46 +02:00
pub struct Service {
restoration : Mutex < Option < Restoration > > ,
2020-07-29 10:36:15 +02:00
restoration_db_handler : Box < dyn BlockChainDBHandler > ,
2016-09-07 15:27:28 +02:00
snapshot_root : PathBuf ,
2016-10-30 09:56:34 +01:00
io_channel : Mutex < Channel > ,
2016-08-05 17:00:46 +02:00
pruning : Algorithm ,
status : Mutex < RestorationStatus > ,
2016-08-25 22:20:44 +02:00
reader : RwLock < Option < LooseReader > > ,
2020-07-29 10:36:15 +02:00
engine : Arc < dyn EthEngine > ,
2016-08-05 23:33:55 +02:00
genesis_block : Bytes ,
2016-08-05 17:00:46 +02:00
state_chunks : AtomicUsize ,
block_chunks : AtomicUsize ,
2020-07-29 10:36:15 +02:00
client : Arc < dyn SnapshotClient > ,
2016-09-06 17:44:11 +02:00
progress : super ::Progress ,
2016-09-07 15:27:14 +02:00
taking_snapshot : AtomicBool ,
2020-09-14 16:08:57 +02:00
taking_snapshot_at : AtomicUsize ,
2016-11-13 13:52:53 +01:00
restoring_snapshot : AtomicBool ,
2016-08-05 17:00:46 +02:00
}
impl Service {
2016-09-07 15:27:28 +02:00
/// Create a new snapshot service from the given parameters.
pub fn new ( params : ServiceParams ) -> Result < Self , Error > {
let mut service = Service {
2016-08-05 17:00:46 +02:00
restoration : Mutex ::new ( None ) ,
2018-04-09 14:21:37 +02:00
restoration_db_handler : params . restoration_db_handler ,
2016-09-07 15:27:28 +02:00
snapshot_root : params . snapshot_root ,
2016-10-30 09:56:34 +01:00
io_channel : Mutex ::new ( params . channel ) ,
2016-09-07 15:27:28 +02:00
pruning : params . pruning ,
2016-08-05 17:00:46 +02:00
status : Mutex ::new ( RestorationStatus ::Inactive ) ,
2016-09-07 15:27:28 +02:00
reader : RwLock ::new ( None ) ,
engine : params . engine ,
genesis_block : params . genesis_block ,
2016-08-05 17:00:46 +02:00
state_chunks : AtomicUsize ::new ( 0 ) ,
block_chunks : AtomicUsize ::new ( 0 ) ,
2018-11-18 00:06:34 +01:00
client : params . client ,
2016-09-06 17:44:11 +02:00
progress : Default ::default ( ) ,
2016-09-07 15:27:14 +02:00
taking_snapshot : AtomicBool ::new ( false ) ,
2020-09-14 16:08:57 +02:00
taking_snapshot_at : AtomicUsize ::new ( 0 ) ,
2016-11-13 13:52:53 +01:00
restoring_snapshot : AtomicBool ::new ( false ) ,
2016-08-05 17:00:46 +02:00
} ;
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
// create the root snapshot dir if it doesn't exist.
2016-09-07 15:27:28 +02:00
if let Err ( e ) = fs ::create_dir_all ( & service . snapshot_root ) {
2016-08-10 16:29:40 +02:00
if e . kind ( ) ! = ErrorKind ::AlreadyExists {
return Err ( e . into ( ) ) ;
2020-08-05 06:08:03 +02:00
}
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
// delete the temporary restoration DB dir if it does exist.
if let Err ( e ) = fs ::remove_dir_all ( service . restoration_db ( ) ) {
2016-08-10 16:29:40 +02:00
if e . kind ( ) ! = ErrorKind ::NotFound {
return Err ( e . into ( ) ) ;
2020-08-05 06:08:03 +02:00
}
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
2016-09-05 14:25:56 +02:00
// delete the temporary snapshot dir if it does exist.
if let Err ( e ) = fs ::remove_dir_all ( service . temp_snapshot_dir ( ) ) {
2016-09-05 14:28:28 +02:00
if e . kind ( ) ! = ErrorKind ::NotFound {
2016-09-05 14:25:56 +02:00
return Err ( e . into ( ) ) ;
}
}
2020-08-05 06:08:03 +02:00
2016-09-07 15:27:28 +02:00
let reader = LooseReader ::new ( service . snapshot_dir ( ) ) . ok ( ) ;
* service . reader . get_mut ( ) = reader ;
2020-08-05 06:08:03 +02:00
2016-09-07 15:27:28 +02:00
Ok ( service )
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
// get the current snapshot dir.
fn snapshot_dir ( & self ) -> PathBuf {
2016-09-07 15:27:28 +02:00
let mut dir = self . snapshot_root . clone ( ) ;
2016-08-25 22:20:44 +02:00
dir . push ( " current " ) ;
dir
}
2020-08-05 06:08:03 +02:00
2016-09-02 16:15:25 +02:00
// get the temporary snapshot dir.
fn temp_snapshot_dir ( & self ) -> PathBuf {
2016-09-07 15:27:28 +02:00
let mut dir = self . snapshot_root . clone ( ) ;
2016-09-02 16:15:25 +02:00
dir . push ( " in_progress " ) ;
dir
}
2020-08-05 06:08:03 +02:00
2016-08-05 17:00:46 +02:00
// get the restoration directory.
fn restoration_dir ( & self ) -> PathBuf {
2016-09-07 15:27:28 +02:00
let mut dir = self . snapshot_root . clone ( ) ;
2016-08-05 17:00:46 +02:00
dir . push ( " restoration " ) ;
dir
}
2020-08-05 06:08:03 +02:00
2016-08-05 17:00:46 +02:00
// Path of the restoration database, inside the restoration directory.
// NOTE(review): the literal is kept exactly as found, surrounding spaces
// included — confirm the padding is intended.
fn restoration_db(&self) -> PathBuf {
    self.restoration_dir().join(" db ")
}
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
// Temporary path, inside the restoration directory, where a recovered
// snapshot is written.
// NOTE(review): the literal is kept exactly as found, surrounding spaces
// included — confirm the padding is intended.
fn temp_recovery_dir(&self) -> PathBuf {
    self.restoration_dir().join(" temp ")
}
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
// Directory under the snapshot root holding chunks kept from a previous
// restoration attempt.
// NOTE(review): the literal is kept exactly as found, surrounding spaces
// included — confirm the padding is intended.
fn prev_chunks_dir(&self) -> PathBuf {
    self.snapshot_root.join(" prev_chunks ")
}
2020-08-05 06:08:03 +02:00
2016-08-05 17:00:46 +02:00
// replace one the client's database with our own.
fn replace_client_db ( & self ) -> Result < ( ) , Error > {
2018-11-18 00:06:34 +01:00
let migrated_blocks = self . migrate_blocks ( ) ? ;
2019-02-07 14:34:24 +01:00
info! ( target : " snapshot " , " Migrated {} ancient blocks " , migrated_blocks ) ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
let rest_db = self . restoration_db ( ) ;
self . client . restore_db ( & * rest_db . to_string_lossy ( ) ) ? ;
2016-09-06 15:31:13 +02:00
Ok ( ( ) )
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
// Migrate the blocks in the current DB into the new chain
fn migrate_blocks ( & self ) -> Result < usize , Error > {
// Count the number of migrated blocks
let mut count = 0 ;
let rest_db = self . restoration_db ( ) ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
let cur_chain_info = self . client . chain_info ( ) ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
let next_db = self . restoration_db_handler . open ( & rest_db ) ? ;
let next_chain = BlockChain ::new ( Default ::default ( ) , & [ ] , next_db . clone ( ) ) ;
let next_chain_info = next_chain . chain_info ( ) ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
// The old database looks like this:
// [genesis, best_ancient_block] ... [first_block, best_block]
// If we are fully synced neither `best_ancient_block` nor `first_block` is set, and we can assume that the whole range from [genesis, best_block] is imported.
// The new database only contains the tip of the chain ([first_block, best_block]),
// so the useful set of blocks is defined as:
// [0 ... min(new.first_block, best_ancient_block or best_block)]
let find_range = | | -> Option < ( H256 , H256 ) > {
let next_available_from = next_chain_info . first_block_number ? ;
let cur_available_to = cur_chain_info
. ancient_block_number
. unwrap_or ( cur_chain_info . best_block_number ) ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
let highest_block_num =
cmp ::min ( next_available_from . saturating_sub ( 1 ) , cur_available_to ) ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
if highest_block_num = = 0 {
return None ;
}
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
trace! ( target : " snapshot " , " Trying to import ancient blocks until {} " , highest_block_num ) ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
// Here we start from the highest block number and go backward to 0,
// thus starting at `highest_block_num` and targetting `0`.
let target_hash = self . client . block_hash ( BlockId ::Number ( 0 ) ) ? ;
let start_hash = self . client . block_hash ( BlockId ::Number ( highest_block_num ) ) ? ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
Some ( ( start_hash , target_hash ) )
} ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
let ( start_hash , target_hash ) = match find_range ( ) {
Some ( x ) = > x ,
None = > return Ok ( 0 ) ,
} ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
let mut batch = DBTransaction ::new ( ) ;
let mut parent_hash = start_hash ;
while parent_hash ! = target_hash {
// Early return if restoration is aborted
if ! self . restoring_snapshot . load ( Ordering ::SeqCst ) {
return Ok ( count ) ;
}
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
let block = self
. client
. block ( BlockId ::Hash ( parent_hash ) )
. ok_or ( ::snapshot ::error ::Error ::UnlinkedAncientBlockChain ) ? ;
parent_hash = block . parent_hash ( ) ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
let block_number = block . number ( ) ;
let block_receipts = self . client . block_receipts ( & block . hash ( ) ) ;
let parent_total_difficulty = self
. client
. block_total_difficulty ( BlockId ::Hash ( parent_hash ) ) ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
match ( block_receipts , parent_total_difficulty ) {
( Some ( block_receipts ) , Some ( parent_total_difficulty ) ) = > {
let block_receipts = block_receipts . receipts ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
next_chain . insert_unordered_block (
& mut batch ,
block ,
block_receipts ,
Some ( parent_total_difficulty ) ,
false ,
true ,
) ;
count + = 1 ;
}
_ = > break ,
}
2020-08-05 06:08:03 +02:00
2019-06-25 15:38:29 +02:00
// Writing changes to DB and logging every now and then
2018-11-18 00:06:34 +01:00
if block_number % 1_000 = = 0 {
next_db . key_value ( ) . write_buffered ( batch ) ;
next_chain . commit ( ) ;
next_db . key_value ( ) . flush ( ) . expect ( " DB flush failed. " ) ;
batch = DBTransaction ::new ( ) ;
}
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
if block_number % 10_000 = = 0 {
2019-02-07 14:34:24 +01:00
info! ( target : " snapshot " , " Block restoration at #{} " , block_number ) ;
2020-08-05 06:08:03 +02:00
}
2018-11-18 00:06:34 +01:00
}
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
// Final commit to the DB
next_db . key_value ( ) . write_buffered ( batch ) ;
next_chain . commit ( ) ;
next_db . key_value ( ) . flush ( ) . expect ( " DB flush failed. " ) ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
// We couldn't reach the targeted hash
if parent_hash ! = target_hash {
return Err ( ::snapshot ::error ::Error ::UnlinkedAncientBlockChain . into ( ) ) ;
}
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
// Update best ancient block in the Next Chain
next_chain . update_best_ancient_block ( & start_hash ) ;
Ok ( count )
}
2020-08-05 06:08:03 +02:00
2016-09-11 14:05:59 +02:00
/// Acquire a read lock on the snapshot reader slot.
///
/// The guarded value is `None` when no snapshot reader is available.
pub fn reader(&self) -> RwLockReadGuard<Option<LooseReader>> {
    self.reader.read()
}
2020-08-05 06:08:03 +02:00
2016-09-06 17:44:11 +02:00
/// Tick the snapshot service. This will log any active snapshot
/// being taken.
pub fn tick ( & self ) {
2016-09-07 15:27:14 +02:00
if self . progress . done ( ) | | ! self . taking_snapshot . load ( Ordering ::SeqCst ) {
return ;
}
2020-08-05 06:08:03 +02:00
2016-09-06 17:44:11 +02:00
let p = & self . progress ;
info! (
" Snapshot: {} accounts {} blocks {} bytes " ,
p . accounts ( ) ,
p . blocks ( ) ,
p . size ( )
) ;
}
2020-08-05 06:08:03 +02:00
2016-09-02 16:15:25 +02:00
/// Take a snapshot at the block with the given number.
/// calling this while a restoration is in progress or vice versa
/// will lead to a race condition where the first one to finish will
/// have their produced snapshot overwritten.
pub fn take_snapshot ( & self , client : & Client , num : u64 ) -> Result < ( ) , Error > {
2016-09-07 15:27:14 +02:00
if self
. taking_snapshot
. compare_and_swap ( false , true , Ordering ::SeqCst )
{
info! (
" Skipping snapshot at #{} as another one is currently in-progress. " ,
num
) ;
return Ok ( ( ) ) ;
}
2020-08-05 06:08:03 +02:00
2020-09-14 16:08:57 +02:00
self . taking_snapshot_at
. store ( num as usize , Ordering ::SeqCst ) ;
2016-09-02 16:15:25 +02:00
info! ( " Taking snapshot at #{} " , num ) ;
2016-09-06 17:44:11 +02:00
self . progress . reset ( ) ;
2020-08-05 06:08:03 +02:00
2016-09-02 16:15:25 +02:00
let temp_dir = self . temp_snapshot_dir ( ) ;
let snapshot_dir = self . snapshot_dir ( ) ;
2020-08-05 06:08:03 +02:00
2016-09-02 16:15:25 +02:00
let _ = fs ::remove_dir_all ( & temp_dir ) ;
2020-08-05 06:08:03 +02:00
2016-12-27 12:53:56 +01:00
let writer = LooseWriter ::new ( temp_dir . clone ( ) ) ? ;
2020-08-05 06:08:03 +02:00
2016-09-02 19:00:20 +02:00
let guard = Guard ::new ( temp_dir . clone ( ) ) ;
2016-12-09 23:01:43 +01:00
let res = client . take_snapshot ( writer , BlockId ::Number ( num ) , & self . progress ) ;
2016-09-07 15:27:14 +02:00
self . taking_snapshot . store ( false , Ordering ::SeqCst ) ;
2016-09-21 12:56:13 +02:00
if let Err ( e ) = res {
2016-10-14 14:44:56 +02:00
if client . chain_info ( ) . best_block_number > = num + client . pruning_history ( ) {
2019-06-25 15:38:29 +02:00
// The state we were snapshotting was pruned before we could finish.
info! ( " Periodic snapshot failed: block state pruned. Run with a longer `--pruning-history` or with `--no-periodic-snapshot` " ) ;
return Err ( e ) ;
2016-09-21 12:56:13 +02:00
} else {
return Err ( e ) ;
}
2020-08-05 06:08:03 +02:00
}
2016-09-06 17:44:11 +02:00
info! ( " Finished taking snapshot at #{} " , num ) ;
2020-08-05 06:08:03 +02:00
2016-09-02 16:15:25 +02:00
let mut reader = self . reader . write ( ) ;
2020-08-05 06:08:03 +02:00
2016-09-02 16:15:25 +02:00
// destroy the old snapshot reader.
* reader = None ;
2020-08-05 06:08:03 +02:00
2016-09-13 10:33:03 +02:00
if snapshot_dir . exists ( ) {
2016-12-27 12:53:56 +01:00
fs ::remove_dir_all ( & snapshot_dir ) ? ;
2016-09-13 10:33:03 +02:00
}
2020-08-05 06:08:03 +02:00
2016-12-27 12:53:56 +01:00
fs ::rename ( temp_dir , & snapshot_dir ) ? ;
2020-08-05 06:08:03 +02:00
2016-12-27 12:53:56 +01:00
* reader = Some ( LooseReader ::new ( snapshot_dir ) ? ) ;
2020-08-05 06:08:03 +02:00
2016-09-02 19:00:20 +02:00
guard . disarm ( ) ;
2016-09-02 16:15:25 +02:00
Ok ( ( ) )
}
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
/// Initialize the restoration synchronously.
2016-09-11 14:05:59 +02:00
/// The recover flag indicates whether to recover the restored snapshot.
pub fn init_restore ( & self , manifest : ManifestData , recover : bool ) -> Result < ( ) , Error > {
2018-05-16 22:01:55 +02:00
let mut res = self . restoration . lock ( ) ;
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
let rest_dir = self . restoration_dir ( ) ;
2018-05-16 22:01:55 +02:00
let rest_db = self . restoration_db ( ) ;
let recovery_temp = self . temp_recovery_dir ( ) ;
let prev_chunks = self . prev_chunks_dir ( ) ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
// delete and restore the restoration dir.
if let Err ( e ) = fs ::remove_dir_all ( & prev_chunks ) {
match e . kind ( ) {
ErrorKind ::NotFound = > { }
_ = > return Err ( e . into ( ) ) ,
}
2020-08-05 06:08:03 +02:00
}
2018-05-16 22:01:55 +02:00
// Move the previous recovery temp directory
// to `prev_chunks` to be able to restart restoring
// with previously downloaded blocks
// This step is optional, so don't fail on error
fs ::rename ( & recovery_temp , & prev_chunks ) . ok ( ) ;
2020-08-05 06:08:03 +02:00
2016-09-11 14:05:59 +02:00
self . state_chunks . store ( 0 , Ordering ::SeqCst ) ;
self . block_chunks . store ( 0 , Ordering ::SeqCst ) ;
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
// tear down existing restoration.
* res = None ;
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
// delete and restore the restoration dir.
if let Err ( e ) = fs ::remove_dir_all ( & rest_dir ) {
match e . kind ( ) {
ErrorKind ::NotFound = > { }
_ = > return Err ( e . into ( ) ) ,
2020-08-05 06:08:03 +02:00
}
2016-08-25 22:20:44 +02:00
}
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
* self . status . lock ( ) = RestorationStatus ::Initializing { chunks_done : 0 } ;
2020-08-05 06:08:03 +02:00
2016-12-27 12:53:56 +01:00
fs ::create_dir_all ( & rest_dir ) ? ;
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
// make new restoration.
2016-09-11 14:05:59 +02:00
let writer = match recover {
2018-05-16 22:01:55 +02:00
true = > Some ( LooseWriter ::new ( recovery_temp ) ? ) ,
2016-09-11 14:05:59 +02:00
false = > None ,
} ;
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
let params = RestorationParams {
2018-05-16 22:01:55 +02:00
manifest : manifest . clone ( ) ,
2016-08-25 22:20:44 +02:00
pruning : self . pruning ,
2018-05-16 22:01:55 +02:00
db : self . restoration_db_handler . open ( & rest_db ) ? ,
2016-08-25 22:20:44 +02:00
writer : writer ,
genesis : & self . genesis_block ,
2018-05-16 22:01:55 +02:00
guard : Guard ::new ( rest_db ) ,
2017-04-19 20:31:53 +02:00
engine : & * self . engine ,
2016-08-25 22:20:44 +02:00
} ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
let state_chunks = manifest . state_hashes . len ( ) ;
let block_chunks = manifest . block_hashes . len ( ) ;
2020-08-05 06:08:03 +02:00
2016-12-27 12:53:56 +01:00
* res = Some ( Restoration ::new ( params ) ? ) ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
self . restoring_snapshot . store ( true , Ordering ::SeqCst ) ;
2020-08-05 06:08:03 +02:00
2020-09-14 16:08:57 +02:00
let block_number = manifest . block_number ;
2018-05-16 22:01:55 +02:00
// Import previous chunks, continue if it fails
self . import_prev_chunks ( & mut res , manifest ) . ok ( ) ;
2020-08-05 06:08:03 +02:00
2018-11-18 00:06:34 +01:00
// It could be that the restoration failed or completed in the meanwhile
let mut restoration_status = self . status . lock ( ) ;
if let RestorationStatus ::Initializing { .. } = * restoration_status {
* restoration_status = RestorationStatus ::Ongoing {
2020-09-14 16:08:57 +02:00
block_number ,
2018-11-18 00:06:34 +01:00
state_chunks : state_chunks as u32 ,
block_chunks : block_chunks as u32 ,
state_chunks_done : self . state_chunks . load ( Ordering ::SeqCst ) as u32 ,
block_chunks_done : self . block_chunks . load ( Ordering ::SeqCst ) as u32 ,
} ;
}
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
Ok ( ( ) )
}
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
/// Import the previous chunks into the current restoration
fn import_prev_chunks (
& self ,
restoration : & mut Option < Restoration > ,
manifest : ManifestData ,
) -> Result < ( ) , Error > {
let prev_chunks = self . prev_chunks_dir ( ) ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
// Restore previous snapshot chunks
let files = fs ::read_dir ( prev_chunks . as_path ( ) ) ? ;
let mut num_temp_chunks = 0 ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
for prev_chunk_file in files {
// Don't go over all the files if the restoration has been aborted
if ! self . restoring_snapshot . load ( Ordering ::SeqCst ) {
trace! ( target :" snapshot " , " Aborting importing previous chunks " ) ;
return Ok ( ( ) ) ;
}
// Import the chunk, don't fail and continue if one fails
match self . import_prev_chunk ( restoration , & manifest , prev_chunk_file ) {
Ok ( true ) = > num_temp_chunks + = 1 ,
Err ( e ) = > trace! ( target : " snapshot " , " Error importing chunk: {:?} " , e ) ,
_ = > ( ) ,
2020-08-05 06:08:03 +02:00
}
2018-05-16 22:01:55 +02:00
}
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
trace! ( target :" snapshot " , " Imported {} previous chunks " , num_temp_chunks ) ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
// Remove the prev temp directory
fs ::remove_dir_all ( & prev_chunks ) ? ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
Ok ( ( ) )
}
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
/// Import a previous chunk at the given path. Returns whether the block was imported or not
fn import_prev_chunk (
& self ,
restoration : & mut Option < Restoration > ,
manifest : & ManifestData ,
file : io ::Result < fs ::DirEntry > ,
) -> Result < bool , Error > {
let file = file ? ;
let path = file . path ( ) ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
let mut file = File ::open ( path . clone ( ) ) ? ;
let mut buffer = Vec ::new ( ) ;
file . read_to_end ( & mut buffer ) ? ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
let hash = keccak ( & buffer ) ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
let is_state = if manifest . block_hashes . contains ( & hash ) {
false
} else if manifest . state_hashes . contains ( & hash ) {
true
} else {
return Ok ( false ) ;
} ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
self . feed_chunk_with_restoration ( restoration , hash , & buffer , is_state ) ? ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
trace! ( target : " snapshot " , " Fed chunk {:?} " , hash ) ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
Ok ( true )
}
2020-08-05 06:08:03 +02:00
2016-08-05 17:00:46 +02:00
// finalize the restoration. this accepts an already-locked
// restoration as an argument -- so acquiring it again _will_
// lead to deadlock.
fn finalize_restoration ( & self , rest : & mut Option < Restoration > ) -> Result < ( ) , Error > {
trace! ( target : " snapshot " , " finalizing restoration " ) ;
2020-08-05 06:08:03 +02:00
2016-09-11 14:05:59 +02:00
let recover = rest . as_ref ( ) . map_or ( false , | rest | rest . writer . is_some ( ) ) ;
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
// destroy the restoration before replacing databases and snapshot.
2017-05-17 12:41:33 +02:00
rest . take ( )
. map ( | r | r . finalize ( & * self . engine ) )
. unwrap_or ( Ok ( ( ) ) ) ? ;
2020-08-05 06:08:03 +02:00
2016-12-27 12:53:56 +01:00
self . replace_client_db ( ) ? ;
2020-08-05 06:08:03 +02:00
2016-09-11 14:05:59 +02:00
if recover {
let mut reader = self . reader . write ( ) ;
* reader = None ; // destroy the old reader if it existed.
2020-08-05 06:08:03 +02:00
2016-09-11 14:05:59 +02:00
let snapshot_dir = self . snapshot_dir ( ) ;
2020-08-05 06:08:03 +02:00
2016-09-13 10:33:03 +02:00
if snapshot_dir . exists ( ) {
trace! ( target : " snapshot " , " removing old snapshot dir at {} " , snapshot_dir . to_string_lossy ( ) ) ;
2016-12-27 12:53:56 +01:00
fs ::remove_dir_all ( & snapshot_dir ) ? ;
2016-08-25 22:20:44 +02:00
}
2020-08-05 06:08:03 +02:00
2016-09-11 14:05:59 +02:00
trace! ( target : " snapshot " , " copying restored snapshot files over " ) ;
2016-12-27 12:53:56 +01:00
fs ::rename ( self . temp_recovery_dir ( ) , & snapshot_dir ) ? ;
2020-08-05 06:08:03 +02:00
2016-12-27 12:53:56 +01:00
* reader = Some ( LooseReader ::new ( snapshot_dir ) ? ) ;
2016-09-11 14:05:59 +02:00
}
2020-08-05 06:08:03 +02:00
2016-09-11 14:05:59 +02:00
let _ = fs ::remove_dir_all ( self . restoration_dir ( ) ) ;
2016-08-25 22:20:44 +02:00
* self . status . lock ( ) = RestorationStatus ::Inactive ;
2020-08-05 06:08:03 +02:00
2016-08-05 17:00:46 +02:00
Ok ( ( ) )
}
2020-08-05 06:08:03 +02:00
2018-09-10 15:26:52 +02:00
/// Feed a chunk of either kind (block or state). no-op if no restoration or status is wrong.
fn feed_chunk ( & self , hash : H256 , chunk : & [ u8 ] , is_state : bool ) {
2016-08-25 22:20:44 +02:00
// TODO: be able to process block chunks and state chunks at same time?
2018-05-16 22:01:55 +02:00
let mut restoration = self . restoration . lock ( ) ;
2018-09-10 15:26:52 +02:00
match self . feed_chunk_with_restoration ( & mut restoration , hash , chunk , is_state ) {
Ok ( ( ) )
| Err ( Error ( SnapshotErrorKind ::Snapshot ( SnapshotError ::RestorationAborted ) , _ ) ) = > ( ) ,
Err ( e ) = > {
warn! ( " Encountered error during snapshot restoration: {} " , e ) ;
* self . restoration . lock ( ) = None ;
* self . status . lock ( ) = RestorationStatus ::Failed ;
let _ = fs ::remove_dir_all ( self . restoration_dir ( ) ) ;
2020-08-05 06:08:03 +02:00
}
}
2018-09-10 15:26:52 +02:00
}
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
/// Feed a chunk with the Restoration
fn feed_chunk_with_restoration (
& self ,
restoration : & mut Option < Restoration > ,
hash : H256 ,
chunk : & [ u8 ] ,
is_state : bool ,
) -> Result < ( ) , Error > {
let ( result , db ) = {
2020-09-14 16:08:57 +02:00
match self . restoration_status ( ) {
2018-05-16 22:01:55 +02:00
RestorationStatus ::Inactive | RestorationStatus ::Failed = > {
trace! ( target : " snapshot " , " Tried to restore chunk {:x} while inactive or failed " , hash ) ;
return Ok ( ( ) ) ;
}
RestorationStatus ::Ongoing { .. } | RestorationStatus ::Initializing { .. } = > {
2016-10-25 18:40:01 +02:00
let ( res , db ) = {
let rest = match * restoration {
Some ( ref mut r ) = > r ,
None = > return Ok ( ( ) ) ,
2016-08-05 17:00:46 +02:00
} ;
2020-08-05 06:08:03 +02:00
2016-10-25 18:40:01 +02:00
(
match is_state {
2016-11-13 13:52:53 +01:00
true = > rest . feed_state ( hash , chunk , & self . restoring_snapshot ) ,
false = > rest . feed_blocks (
hash ,
chunk ,
& * self . engine ,
& self . restoring_snapshot ,
) ,
2016-10-25 18:40:01 +02:00
}
. map ( | _ | rest . is_done ( ) ) ,
rest . db . clone ( ) ,
)
} ;
2020-08-05 06:08:03 +02:00
2016-10-25 18:40:01 +02:00
let res = match res {
Ok ( is_done ) = > {
match is_state {
true = > self . state_chunks . fetch_add ( 1 , Ordering ::SeqCst ) ,
false = > self . block_chunks . fetch_add ( 1 , Ordering ::SeqCst ) ,
} ;
2020-08-05 06:08:03 +02:00
2016-10-25 18:40:01 +02:00
match is_done {
true = > {
2018-07-06 15:09:39 +02:00
db . key_value ( ) . flush ( ) ? ;
2016-10-26 16:14:13 +02:00
drop ( db ) ;
return self . finalize_restoration ( & mut * restoration ) ;
2016-10-25 18:40:01 +02:00
}
false = > Ok ( ( ) ) ,
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
}
2016-10-25 18:40:01 +02:00
other = > other . map ( drop ) ,
} ;
( res , db )
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
}
2016-10-25 18:40:01 +02:00
} ;
2020-08-05 06:08:03 +02:00
2018-07-06 15:09:39 +02:00
result ? ;
db . key_value ( ) . flush ( ) ? ;
Ok ( ( ) )
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
2016-08-05 17:00:46 +02:00
/// Synchronously process a state chunk identified by `hash`.
pub fn feed_state_chunk(&self, hash: H256, chunk: &[u8]) {
    let is_state = true;
    self.feed_chunk(hash, chunk, is_state);
}
2020-08-05 06:08:03 +02:00
2016-08-05 17:00:46 +02:00
/// Synchronously process a block chunk identified by `hash`.
pub fn feed_block_chunk(&self, hash: H256, chunk: &[u8]) {
    let is_state = false;
    self.feed_chunk(hash, chunk, is_state);
}
}
impl SnapshotService for Service {
fn manifest ( & self ) -> Option < ManifestData > {
2016-08-25 22:20:44 +02:00
self . reader . read ( ) . as_ref ( ) . map ( | r | r . manifest ( ) . clone ( ) )
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
2017-07-18 16:59:33 +02:00
fn supported_versions ( & self ) -> Option < ( u64 , u64 ) > {
2017-05-17 12:41:33 +02:00
self . engine
. snapshot_components ( )
2017-07-18 16:59:33 +02:00
. map ( | c | ( c . min_supported_version ( ) , c . current_version ( ) ) )
2017-05-17 12:41:33 +02:00
}
2020-08-05 06:08:03 +02:00
2016-08-05 17:00:46 +02:00
fn chunk ( & self , hash : H256 ) -> Option < Bytes > {
2016-08-25 22:20:44 +02:00
self . reader . read ( ) . as_ref ( ) . and_then ( | r | r . chunk ( hash ) . ok ( ) )
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
fn completed_chunks ( & self ) -> Option < Vec < H256 > > {
let restoration = self . restoration . lock ( ) ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
match * restoration {
Some ( ref restoration ) = > {
let completed_chunks = restoration
. manifest
. block_hashes
. iter ( )
. filter ( | h | ! restoration . block_chunks_left . contains ( h ) )
. chain (
restoration
. manifest
. state_hashes
. iter ( )
. filter ( | h | ! restoration . state_chunks_left . contains ( h ) ) ,
)
. map ( | h | * h )
. collect ( ) ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
Some ( completed_chunks )
}
None = > None ,
}
}
2020-08-05 06:08:03 +02:00
2020-09-14 16:08:57 +02:00
fn creation_status ( & self ) -> CreationStatus {
if self . taking_snapshot . load ( Ordering ::SeqCst ) {
CreationStatus ::Ongoing {
block_number : self . taking_snapshot_at . load ( Ordering ::SeqCst ) as u32 ,
}
} else {
CreationStatus ::Inactive
}
}
fn restoration_status ( & self ) -> RestorationStatus {
2016-09-11 14:05:59 +02:00
let mut cur_status = self . status . lock ( ) ;
2020-08-05 06:08:03 +02:00
2018-05-16 22:01:55 +02:00
match * cur_status {
RestorationStatus ::Initializing {
ref mut chunks_done ,
} = > {
* chunks_done = self . state_chunks . load ( Ordering ::SeqCst ) as u32
+ self . block_chunks . load ( Ordering ::SeqCst ) as u32 ;
}
RestorationStatus ::Ongoing {
ref mut state_chunks_done ,
ref mut block_chunks_done ,
..
} = > {
* state_chunks_done = self . state_chunks . load ( Ordering ::SeqCst ) as u32 ;
* block_chunks_done = self . block_chunks . load ( Ordering ::SeqCst ) as u32 ;
}
_ = > ( ) ,
2016-09-11 14:05:59 +02:00
}
2020-08-05 06:08:03 +02:00
2016-09-11 14:05:59 +02:00
cur_status . clone ( )
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
2016-08-25 22:20:44 +02:00
fn begin_restore ( & self , manifest : ManifestData ) {
2016-10-30 09:56:34 +01:00
if let Err ( e ) = self
. io_channel
. lock ( )
. send ( ClientIoMessage ::BeginRestoration ( manifest ) )
{
2016-10-25 18:40:01 +02:00
trace! ( " Error sending snapshot service message: {:?} " , e ) ;
2016-08-25 22:20:44 +02:00
}
2020-08-05 06:08:03 +02:00
}
2016-08-25 22:20:44 +02:00
fn abort_restore ( & self ) {
2018-05-16 22:01:55 +02:00
trace! ( target : " snapshot " , " Aborting restore " ) ;
2016-11-13 13:52:53 +01:00
self . restoring_snapshot . store ( false , Ordering ::SeqCst ) ;
2016-08-25 22:20:44 +02:00
* self . restoration . lock ( ) = None ;
* self . status . lock ( ) = RestorationStatus ::Inactive ;
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
2016-08-05 17:00:46 +02:00
fn restore_state_chunk ( & self , hash : H256 , chunk : Bytes ) {
2016-10-30 09:56:34 +01:00
if let Err ( e ) = self
. io_channel
. lock ( )
. send ( ClientIoMessage ::FeedStateChunk ( hash , chunk ) )
{
2016-10-25 18:40:01 +02:00
trace! ( " Error sending snapshot service message: {:?} " , e ) ;
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
}
2016-08-05 17:00:46 +02:00
fn restore_block_chunk ( & self , hash : H256 , chunk : Bytes ) {
2016-10-30 09:56:34 +01:00
if let Err ( e ) = self
. io_channel
. lock ( )
. send ( ClientIoMessage ::FeedBlockChunk ( hash , chunk ) )
{
2016-10-25 18:40:01 +02:00
trace! ( " Error sending snapshot service message: {:?} " , e ) ;
2016-08-05 17:00:46 +02:00
}
2020-08-05 06:08:03 +02:00
}
2019-06-25 15:38:29 +02:00
fn abort_snapshot ( & self ) {
if self . taking_snapshot . load ( Ordering ::SeqCst ) {
trace! ( target : " snapshot " , " Aborting snapshot – Snapshot under way " ) ;
self . progress . abort . store ( true , Ordering ::SeqCst ) ;
}
2020-08-05 06:08:03 +02:00
}
2018-05-29 12:23:15 +02:00
fn shutdown ( & self ) {
2019-06-25 15:38:29 +02:00
trace! ( target : " snapshot " , " Shut down SnapshotService " ) ;
2018-05-29 12:23:15 +02:00
self . abort_restore ( ) ;
2019-06-25 15:38:29 +02:00
trace! ( target : " snapshot " , " Shut down SnapshotService - restore aborted " ) ;
self . abort_snapshot ( ) ;
trace! ( target : " snapshot " , " Shut down SnapshotService - snapshot aborted " ) ;
2018-05-29 12:23:15 +02:00
}
2016-08-10 16:29:40 +02:00
}
2016-09-05 12:24:03 +02:00
2016-09-06 15:41:56 +02:00
impl Drop for Service {
fn drop ( & mut self ) {
2019-06-25 15:38:29 +02:00
trace! ( target : " shutdown " , " Dropping Service " ) ;
2016-09-06 15:41:56 +02:00
self . abort_restore ( ) ;
2019-06-25 15:38:29 +02:00
trace! ( target : " shutdown " , " Dropping Service - restore aborted " ) ;
self . abort_snapshot ( ) ;
trace! ( target : " shutdown " , " Dropping Service - snapshot aborted " ) ;
2016-09-06 15:41:56 +02:00
}
}
2016-09-05 12:24:03 +02:00
#[cfg(test)]
mod tests {
    use super::*;
    use client::ClientIoMessage;
    use io::IoService;
    use journaldb::Algorithm;
    use snapshot::{ManifestData, RestorationStatus, SnapshotService};
    use spec::Spec;
    use tempdir::TempDir;
    use test_helpers::{generate_dummy_client_with_spec_and_data, restoration_db_handler};

    /// A freshly created service has no manifest or chunks and is inactive; the
    /// message-sending entry points can be called on it without panicking.
    #[test]
    fn sends_async_messages() {
        let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()];
        let client = generate_dummy_client_with_spec_and_data(Spec::new_null, 400, 5, &gas_prices);
        let io_service = IoService::<ClientIoMessage>::start().unwrap();
        let spec = Spec::new_test();

        let tempdir = TempDir::new(" ").unwrap();
        let snapshot_root = tempdir.path().join(" snapshot ");

        let service = Service::new(ServiceParams {
            engine: spec.engine.clone(),
            genesis_block: spec.genesis_block(),
            restoration_db_handler: restoration_db_handler(Default::default()),
            pruning: Algorithm::Archive,
            channel: io_service.channel(),
            snapshot_root,
            client,
        })
        .unwrap();

        assert!(service.manifest().is_none());
        assert!(service.chunk(Default::default()).is_none());
        assert_eq!(service.restoration_status(), RestorationStatus::Inactive);

        let manifest = ManifestData {
            version: 2,
            state_hashes: vec![],
            block_hashes: vec![],
            state_root: Default::default(),
            block_number: 0,
            block_hash: Default::default(),
        };

        service.begin_restore(manifest);
        service.abort_restore();
        service.restore_state_chunk(Default::default(), vec![]);
        service.restore_block_chunk(Default::default(), vec![]);
    }

    /// Feeding garbage for every expected chunk must fail each time and must never
    /// leave the restoration in the "done" state.
    #[test]
    fn cannot_finish_with_invalid_chunks() {
        use ethereum_types::H256;
        use kvdb_rocksdb::DatabaseConfig;

        let spec = Spec::new_test();
        let tempdir = TempDir::new(" ").unwrap();

        let state_hashes: Vec<_> = (0..5).map(|_| H256::random()).collect();
        let block_hashes: Vec<_> = (0..5).map(|_| H256::random()).collect();
        let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
        let gb = spec.genesis_block();
        let flag = ::std::sync::atomic::AtomicBool::new(true);

        let manifest = ManifestData {
            version: 2,
            state_hashes: state_hashes.clone(),
            block_hashes: block_hashes.clone(),
            state_root: H256::default(),
            block_number: 100000,
            block_hash: H256::default(),
        };
        let db_path = tempdir.path().to_owned();
        let db = restoration_db_handler(db_config).open(&db_path).unwrap();

        let params = RestorationParams {
            manifest,
            pruning: Algorithm::Archive,
            db,
            writer: None,
            genesis: &gb,
            guard: Guard::benign(),
            engine: &*spec.engine,
        };

        let mut restoration = Restoration::new(params).unwrap();
        let definitely_bad_chunk = [1, 2, 3, 4, 5];

        for hash in state_hashes {
            let fed = restoration.feed_state(hash, &definitely_bad_chunk, &flag);
            assert!(fed.is_err());
            assert!(!restoration.is_done());
        }

        for hash in block_hashes {
            let fed = restoration.feed_blocks(hash, &definitely_bad_chunk, &*spec.engine, &flag);
            assert!(fed.is_err());
            assert!(!restoration.is_done());
        }
    }
}