2019-01-07 11:33:07 +01:00
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
2016-08-05 17:00:46 +02:00
2019-01-07 11:33:07 +01:00
// Parity Ethereum is free software: you can redistribute it and/or modify
2016-08-05 17:00:46 +02:00
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
2019-01-07 11:33:07 +01:00
// Parity Ethereum is distributed in the hope that it will be useful,
2016-08-05 17:00:46 +02:00
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
2019-01-07 11:33:07 +01:00
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
2016-08-05 17:00:46 +02:00
//! Snapshot network service implementation.
2017-04-19 20:31:53 +02:00
use std ::collections ::HashSet ;
2018-05-16 22:01:55 +02:00
use std ::io ::{ self , Read , ErrorKind } ;
use std ::fs ::{ self , File } ;
2016-09-07 15:27:28 +02:00
use std ::path ::PathBuf ;
2016-08-05 17:00:46 +02:00
use std ::sync ::Arc ;
2016-09-07 15:27:14 +02:00
use std ::sync ::atomic ::{ AtomicBool , AtomicUsize , Ordering } ;
2018-11-18 00:06:34 +01:00
use std ::cmp ;
2016-08-05 17:00:46 +02:00
2018-06-20 15:13:07 +02:00
use blockchain ::{ BlockChain , BlockChainDB , BlockChainDBHandler } ;
2019-09-03 11:29:25 +02:00
use bytes ::Bytes ;
use common_types ::{
2019-08-28 10:09:42 +02:00
io_message ::ClientIoMessage ,
2019-07-18 12:27:08 +02:00
errors ::{ EthcoreError as Error , SnapshotError , SnapshotError ::UnlinkedAncientBlockChain } ,
ids ::BlockId ,
2019-08-22 18:25:49 +02:00
snapshot ::{ ManifestData , Progress , RestorationStatus } ,
2019-07-18 12:27:08 +02:00
} ;
2019-09-10 22:44:33 +02:00
use client_traits ::ChainInfo ;
2019-09-03 11:29:25 +02:00
use engine ::Engine ;
2018-01-10 13:35:18 +01:00
use ethereum_types ::H256 ;
2019-09-03 11:29:25 +02:00
use ethcore_io ::IoChannel ;
2017-10-17 06:20:24 +02:00
use journaldb ::Algorithm ;
2019-09-03 11:29:25 +02:00
use keccak_hash ::keccak ;
2018-11-18 00:06:34 +01:00
use kvdb ::DBTransaction ;
2019-10-24 16:46:32 +02:00
use log ::{ debug , error , info , trace , warn } ;
2019-09-03 11:29:25 +02:00
use parking_lot ::{ Mutex , RwLock , RwLockReadGuard } ;
2017-10-10 20:01:27 +02:00
use snappy ;
2019-09-03 11:29:25 +02:00
use trie_db ::TrieError ;
2019-09-10 22:44:33 +02:00
use crate ::{ SnapshotClient , SnapshotWriter } ;
2019-09-03 11:29:25 +02:00
use super ::{
StateRebuilder ,
SnapshotService ,
Rebuilder ,
MAX_CHUNK_SIZE ,
io ::{ SnapshotReader , LooseReader , LooseWriter } ,
chunker ,
} ;
2016-08-05 17:00:46 +02:00
2016-09-02 19:00:20 +02:00
/// Helper for removing directories in case of error.
2019-09-10 22:44:33 +02:00
pub struct Guard ( bool , PathBuf ) ;
2016-09-02 19:00:20 +02:00
impl Guard {
fn new ( path : PathBuf ) -> Self { Guard ( true , path ) }
2019-09-10 22:44:33 +02:00
#[ cfg(any(test, feature = " test-helpers " )) ]
pub fn benign ( ) -> Self { Guard ( false , PathBuf ::default ( ) ) }
2017-07-21 17:24:53 +02:00
2016-09-02 19:00:20 +02:00
fn disarm ( mut self ) { self . 0 = false }
}
impl Drop for Guard {
fn drop ( & mut self ) {
if self . 0 {
let _ = fs ::remove_dir_all ( & self . 1 ) ;
}
}
}
2016-08-05 17:00:46 +02:00
/// State restoration manager.
2019-09-10 22:44:33 +02:00
pub struct Restoration {
2016-08-25 22:20:44 +02:00
manifest : ManifestData ,
2016-08-05 17:00:46 +02:00
state_chunks_left : HashSet < H256 > ,
block_chunks_left : HashSet < H256 > ,
state : StateRebuilder ,
2019-06-14 18:48:35 +02:00
secondary : Box < dyn Rebuilder > ,
2016-09-11 14:05:59 +02:00
writer : Option < LooseWriter > ,
2016-08-05 17:00:46 +02:00
snappy_buffer : Bytes ,
final_state_root : H256 ,
2016-09-02 19:00:20 +02:00
guard : Guard ,
2019-06-14 18:48:35 +02:00
db : Arc < dyn BlockChainDB > ,
2016-08-05 17:00:46 +02:00
}
2019-09-10 22:44:33 +02:00
/// Params to initialise restoration
pub struct RestorationParams < ' a > {
2016-08-25 22:20:44 +02:00
manifest : ManifestData , // manifest to base restoration on.
pruning : Algorithm , // pruning algorithm for the database.
2019-06-14 18:48:35 +02:00
db : Arc < dyn BlockChainDB > , // database
2016-09-11 14:05:59 +02:00
writer : Option < LooseWriter > , // writer for recovered snapshot.
2016-08-25 22:20:44 +02:00
genesis : & ' a [ u8 ] , // genesis block of the chain.
2016-09-02 19:00:20 +02:00
guard : Guard , // guard for the restoration directory.
2019-06-28 10:18:18 +02:00
engine : & ' a dyn Engine ,
2016-08-25 22:20:44 +02:00
}
2019-09-10 22:44:33 +02:00
#[ cfg(any(test, feature = " test-helpers " )) ]
impl < ' a > RestorationParams < ' a > {
pub fn new (
manifest : ManifestData ,
pruning : Algorithm ,
db : Arc < dyn BlockChainDB > ,
writer : Option < LooseWriter > ,
genesis : & ' a [ u8 ] ,
guard : Guard ,
engine : & ' a dyn Engine ,
) -> Self {
Self { manifest , pruning , db , writer , genesis , guard , engine }
}
}
2016-08-05 17:00:46 +02:00
impl Restoration {
2019-09-10 22:44:33 +02:00
/// Build a Restoration using the given parameters.
pub fn new ( params : RestorationParams ) -> Result < Self , Error > {
2016-08-25 22:20:44 +02:00
let manifest = params . manifest ;
let state_chunks = manifest . state_hashes . iter ( ) . cloned ( ) . collect ( ) ;
let block_chunks = manifest . block_hashes . iter ( ) . cloned ( ) . collect ( ) ;
2018-04-09 14:21:37 +02:00
let raw_db = params . db ;
2016-08-05 17:00:46 +02:00
2017-01-23 15:27:11 +01:00
let chain = BlockChain ::new ( Default ::default ( ) , params . genesis , raw_db . clone ( ) ) ;
2019-08-22 18:25:49 +02:00
let chunker = chunker ( params . engine . snapshot_mode ( ) )
. ok_or_else ( | | Error ::Snapshot ( SnapshotError ::SnapshotsUnsupported ) ) ? ;
2017-04-19 20:31:53 +02:00
2019-08-22 18:25:49 +02:00
let secondary = chunker . rebuilder ( chain , raw_db . clone ( ) , & manifest ) ? ;
2016-08-05 17:00:46 +02:00
2019-07-01 14:41:45 +02:00
let final_state_root = manifest . state_root . clone ( ) ;
2017-05-17 12:41:33 +02:00
2016-08-05 17:00:46 +02:00
Ok ( Restoration {
2019-07-01 14:41:45 +02:00
manifest ,
2016-08-25 22:20:44 +02:00
state_chunks_left : state_chunks ,
block_chunks_left : block_chunks ,
2018-06-20 15:13:07 +02:00
state : StateRebuilder ::new ( raw_db . key_value ( ) . clone ( ) , params . pruning ) ,
2019-07-01 14:41:45 +02:00
secondary ,
2016-08-25 22:20:44 +02:00
writer : params . writer ,
2016-08-05 17:00:46 +02:00
snappy_buffer : Vec ::new ( ) ,
2019-07-01 14:41:45 +02:00
final_state_root ,
2016-09-02 19:00:20 +02:00
guard : params . guard ,
2016-10-25 18:40:01 +02:00
db : raw_db ,
2016-08-05 17:00:46 +02:00
} )
}
2019-09-10 22:44:33 +02:00
/// Feeds a chunk of state data to the Restoration. Aborts early if `flag` becomes false.
pub fn feed_state ( & mut self , hash : H256 , chunk : & [ u8 ] , flag : & AtomicBool ) -> Result < ( ) , Error > {
2017-07-21 17:24:53 +02:00
if self . state_chunks_left . contains ( & hash ) {
2018-02-22 14:52:29 +01:00
let expected_len = snappy ::decompressed_len ( chunk ) ? ;
if expected_len > MAX_CHUNK_SIZE {
trace! ( target : " snapshot " , " Discarding large chunk: {} vs {} " , expected_len , MAX_CHUNK_SIZE ) ;
2019-09-03 11:29:25 +02:00
return Err ( SnapshotError ::ChunkTooLarge . into ( ) ) ;
2018-02-22 14:52:29 +01:00
}
2016-12-27 12:53:56 +01:00
let len = snappy ::decompress_into ( chunk , & mut self . snappy_buffer ) ? ;
2016-08-05 17:00:46 +02:00
2016-12-27 12:53:56 +01:00
self . state . feed ( & self . snappy_buffer [ .. len ] , flag ) ? ;
2016-09-11 14:05:59 +02:00
if let Some ( ref mut writer ) = self . writer . as_mut ( ) {
2016-12-27 12:53:56 +01:00
writer . write_state_chunk ( hash , chunk ) ? ;
2016-09-11 14:05:59 +02:00
}
2017-07-21 17:24:53 +02:00
self . state_chunks_left . remove ( & hash ) ;
2016-08-05 17:00:46 +02:00
}
Ok ( ( ) )
}
2019-09-10 22:44:33 +02:00
/// Feeds a chunk of block data to the `Restoration`. Aborts early if `flag` becomes false.
pub fn feed_blocks ( & mut self , hash : H256 , chunk : & [ u8 ] , engine : & dyn Engine , flag : & AtomicBool ) -> Result < ( ) , Error > {
2017-07-21 17:24:53 +02:00
if self . block_chunks_left . contains ( & hash ) {
2018-02-22 14:52:29 +01:00
let expected_len = snappy ::decompressed_len ( chunk ) ? ;
if expected_len > MAX_CHUNK_SIZE {
trace! ( target : " snapshot " , " Discarding large chunk: {} vs {} " , expected_len , MAX_CHUNK_SIZE ) ;
2019-09-03 11:29:25 +02:00
return Err ( SnapshotError ::ChunkTooLarge . into ( ) ) ;
2018-02-22 14:52:29 +01:00
}
2016-12-27 12:53:56 +01:00
let len = snappy ::decompress_into ( chunk , & mut self . snappy_buffer ) ? ;
2016-08-25 22:20:44 +02:00
2017-04-19 20:31:53 +02:00
self . secondary . feed ( & self . snappy_buffer [ .. len ] , engine , flag ) ? ;
2016-09-11 14:05:59 +02:00
if let Some ( ref mut writer ) = self . writer . as_mut ( ) {
2016-12-27 12:53:56 +01:00
writer . write_block_chunk ( hash , chunk ) ? ;
2016-09-11 14:05:59 +02:00
}
2017-07-21 17:24:53 +02:00
self . block_chunks_left . remove ( & hash ) ;
2016-08-25 22:20:44 +02:00
}
2016-08-05 17:00:46 +02:00
2016-08-25 22:20:44 +02:00
Ok ( ( ) )
}
// finish up restoration.
2019-07-01 14:41:45 +02:00
fn finalize ( mut self ) -> Result < ( ) , Error > {
2016-08-25 22:20:44 +02:00
if ! self . is_done ( ) { return Ok ( ( ) ) }
// verify final state root.
let root = self . state . state_root ( ) ;
if root ! = self . final_state_root {
2018-05-12 22:46:08 +02:00
warn! ( " Final restored state has wrong state root: expected {:?}, got {:?} " , self . final_state_root , root ) ;
2016-08-25 22:20:44 +02:00
return Err ( TrieError ::InvalidStateRoot ( root ) . into ( ) ) ;
2016-08-05 17:00:46 +02:00
}
2016-08-25 22:20:44 +02:00
// check for missing code.
2017-06-28 13:17:36 +02:00
self . state . finalize ( self . manifest . block_number , self . manifest . block_hash ) ? ;
2016-08-25 22:20:44 +02:00
2016-10-28 16:10:30 +02:00
// connect out-of-order chunks and verify chain integrity.
2019-07-01 14:41:45 +02:00
self . secondary . finalize ( ) ? ;
2016-08-25 22:20:44 +02:00
2016-09-11 14:05:59 +02:00
if let Some ( writer ) = self . writer {
2016-12-27 12:53:56 +01:00
writer . finish ( self . manifest ) ? ;
2016-09-11 14:05:59 +02:00
}
2016-08-25 22:20:44 +02:00
2016-09-02 19:00:20 +02:00
self . guard . disarm ( ) ;
2019-10-24 16:46:32 +02:00
trace! ( target : " snapshot " , " Restoration finalised correctly " ) ;
2016-08-05 17:00:46 +02:00
Ok ( ( ) )
}
2019-09-10 22:44:33 +02:00
/// Check if we're done restoring: no more block chunks and no more state chunks to process.
pub fn is_done ( & self ) -> bool {
2016-08-05 17:00:46 +02:00
self . block_chunks_left . is_empty ( ) & & self . state_chunks_left . is_empty ( )
}
}
/// Type alias for client io channel.
2019-09-03 11:29:25 +02:00
pub type Channel < C > = IoChannel < ClientIoMessage < C > > ;
2018-11-18 00:06:34 +01:00
2016-09-07 15:27:28 +02:00
/// Snapshot service parameters.
2019-09-03 11:29:25 +02:00
pub struct ServiceParams < C : 'static > {
2016-09-07 15:27:28 +02:00
/// The consensus engine this is built on.
2019-06-28 10:18:18 +02:00
pub engine : Arc < dyn Engine > ,
2016-09-07 15:27:28 +02:00
/// The chain's genesis block.
pub genesis_block : Bytes ,
/// State pruning algorithm.
pub pruning : Algorithm ,
2018-04-09 14:21:37 +02:00
/// Handler for opening a restoration DB.
2019-06-14 18:48:35 +02:00
pub restoration_db_handler : Box < dyn BlockChainDBHandler > ,
2016-09-07 15:27:28 +02:00
/// Async IO channel for sending messages.
2019-09-03 11:29:25 +02:00
pub channel : Channel < C > ,
2016-09-07 15:27:28 +02:00
/// The directory to put snapshots in.
/// Usually "<chain hash>/snapshot"
pub snapshot_root : PathBuf ,
/// A handle for database restoration.
2019-09-03 11:29:25 +02:00
pub client : Arc < C > ,
2016-09-07 15:27:28 +02:00
}
/// `SnapshotService` implementation.
/// This controls taking snapshots and restoring from them.
2019-09-03 11:29:25 +02:00
pub struct Service < C : Send + Sync + 'static > {
2016-08-05 17:00:46 +02:00
restoration : Mutex < Option < Restoration > > ,
2019-06-14 18:48:35 +02:00
restoration_db_handler : Box < dyn BlockChainDBHandler > ,
2016-09-07 15:27:28 +02:00
snapshot_root : PathBuf ,
2019-09-03 11:29:25 +02:00
io_channel : Mutex < Channel < C > > ,
2016-08-05 17:00:46 +02:00
pruning : Algorithm ,
status : Mutex < RestorationStatus > ,
2016-08-25 22:20:44 +02:00
reader : RwLock < Option < LooseReader > > ,
2019-06-28 10:18:18 +02:00
engine : Arc < dyn Engine > ,
2016-08-05 23:33:55 +02:00
genesis_block : Bytes ,
2016-08-05 17:00:46 +02:00
state_chunks : AtomicUsize ,
block_chunks : AtomicUsize ,
2019-09-03 11:29:25 +02:00
client : Arc < C > ,
2019-10-28 18:24:45 +01:00
progress : RwLock < Progress > ,
2016-09-07 15:27:14 +02:00
taking_snapshot : AtomicBool ,
2016-11-13 13:52:53 +01:00
restoring_snapshot : AtomicBool ,
2016-08-05 17:00:46 +02:00
}
2019-09-03 11:29:25 +02:00
impl < C > Service < C > where C : SnapshotClient + ChainInfo {
2016-09-07 15:27:28 +02:00
/// Create a new snapshot service from the given parameters.
2019-09-03 11:29:25 +02:00
pub fn new ( params : ServiceParams < C > ) -> Result < Self , Error > {
2016-09-07 15:27:28 +02:00
let mut service = Service {
2016-08-05 17:00:46 +02:00
restoration : Mutex ::new ( None ) ,
2018-04-09 14:21:37 +02:00
restoration_db_handler : params . restoration_db_handler ,
2016-09-07 15:27:28 +02:00
snapshot_root : params . snapshot_root ,
2016-10-30 09:56:34 +01:00
io_channel : Mutex ::new ( params . channel ) ,
2016-09-07 15:27:28 +02:00
pruning : params . pruning ,
2016-08-05 17:00:46 +02:00
status : Mutex ::new ( RestorationStatus ::Inactive ) ,
2016-09-07 15:27:28 +02:00
reader : RwLock ::new ( None ) ,
engine : params . engine ,
genesis_block : params . genesis_block ,
2016-08-05 17:00:46 +02:00
state_chunks : AtomicUsize ::new ( 0 ) ,
block_chunks : AtomicUsize ::new ( 0 ) ,
2018-11-18 00:06:34 +01:00
client : params . client ,
2019-10-28 18:24:45 +01:00
progress : RwLock ::new ( Progress ::new ( ) ) ,
2016-09-07 15:27:14 +02:00
taking_snapshot : AtomicBool ::new ( false ) ,
2016-11-13 13:52:53 +01:00
restoring_snapshot : AtomicBool ::new ( false ) ,
2016-08-05 17:00:46 +02:00
} ;
2016-08-25 22:20:44 +02:00
// create the root snapshot dir if it doesn't exist.
2016-09-07 15:27:28 +02:00
if let Err ( e ) = fs ::create_dir_all ( & service . snapshot_root ) {
2016-08-10 16:29:40 +02:00
if e . kind ( ) ! = ErrorKind ::AlreadyExists {
return Err ( e . into ( ) )
2016-08-05 17:00:46 +02:00
}
}
2018-05-16 22:01:55 +02:00
// delete the temporary restoration DB dir if it does exist.
if let Err ( e ) = fs ::remove_dir_all ( service . restoration_db ( ) ) {
2016-08-10 16:29:40 +02:00
if e . kind ( ) ! = ErrorKind ::NotFound {
return Err ( e . into ( ) )
2016-08-05 17:00:46 +02:00
}
}
2016-09-05 14:25:56 +02:00
// delete the temporary snapshot dir if it does exist.
if let Err ( e ) = fs ::remove_dir_all ( service . temp_snapshot_dir ( ) ) {
2016-09-05 14:28:28 +02:00
if e . kind ( ) ! = ErrorKind ::NotFound {
2016-09-05 14:25:56 +02:00
return Err ( e . into ( ) )
}
}
2016-09-07 15:27:28 +02:00
let reader = LooseReader ::new ( service . snapshot_dir ( ) ) . ok ( ) ;
* service . reader . get_mut ( ) = reader ;
2016-08-05 17:00:46 +02:00
2016-09-07 15:27:28 +02:00
Ok ( service )
2016-08-05 17:00:46 +02:00
}
2016-08-25 22:20:44 +02:00
// get the current snapshot dir.
fn snapshot_dir ( & self ) -> PathBuf {
2016-09-07 15:27:28 +02:00
let mut dir = self . snapshot_root . clone ( ) ;
2016-08-25 22:20:44 +02:00
dir . push ( " current " ) ;
dir
}
2016-09-02 16:15:25 +02:00
// get the temporary snapshot dir.
fn temp_snapshot_dir ( & self ) -> PathBuf {
2016-09-07 15:27:28 +02:00
let mut dir = self . snapshot_root . clone ( ) ;
2016-09-02 16:15:25 +02:00
dir . push ( " in_progress " ) ;
dir
}
2016-08-05 17:00:46 +02:00
// get the restoration directory.
fn restoration_dir ( & self ) -> PathBuf {
2016-09-07 15:27:28 +02:00
let mut dir = self . snapshot_root . clone ( ) ;
2016-08-05 17:00:46 +02:00
dir . push ( " restoration " ) ;
dir
}
// restoration db path.
fn restoration_db ( & self ) -> PathBuf {
let mut dir = self . restoration_dir ( ) ;
dir . push ( " db " ) ;
dir
}
2016-08-25 22:20:44 +02:00
// temporary snapshot recovery path.
fn temp_recovery_dir ( & self ) -> PathBuf {
let mut dir = self . restoration_dir ( ) ;
dir . push ( " temp " ) ;
dir
}
2018-05-16 22:01:55 +02:00
// previous snapshot chunks path.
fn prev_chunks_dir ( & self ) -> PathBuf {
let mut dir = self . snapshot_root . clone ( ) ;
dir . push ( " prev_chunks " ) ;
dir
}
2018-11-18 00:06:34 +01:00
// Migrate the blocks in the current DB into the new chain
fn migrate_blocks ( & self ) -> Result < usize , Error > {
// Count the number of migrated blocks
let mut count = 0 ;
let rest_db = self . restoration_db ( ) ;
let cur_chain_info = self . client . chain_info ( ) ;
let next_db = self . restoration_db_handler . open ( & rest_db ) ? ;
let next_chain = BlockChain ::new ( Default ::default ( ) , & [ ] , next_db . clone ( ) ) ;
let next_chain_info = next_chain . chain_info ( ) ;
// The old database looks like this:
// [genesis, best_ancient_block] ... [first_block, best_block]
2019-07-01 14:41:45 +02:00
// If we are fully synced neither `best_ancient_block` nor `first_block` is set, and we can
// assume that the whole range from [genesis, best_block] is imported.
// The new database only contains the tip of the chain ([new_first_block, new_best_block]),
2018-11-18 00:06:34 +01:00
// so the useful set of blocks is defined as:
// [0 ... min(new.first_block, best_ancient_block or best_block)]
2019-07-01 14:41:45 +02:00
//
// If, for whatever reason, the old db does not have ancient blocks (i.e.
// `best_ancient_block` is `None` AND a non-zero `first_block`), such that the old db looks
// like [old_first_block..old_best_block] (which may or may not partially overlap with
// [new_first_block..new_best_block]) we do the conservative thing and do not migrate the
// old blocks.
2018-11-18 00:06:34 +01:00
let find_range = | | -> Option < ( H256 , H256 ) > {
2019-07-01 14:41:45 +02:00
// In theory, if the current best_block is > new first_block (i.e. ranges overlap)
// we could salvage them but what if there's been a re-org at the boundary and the two
// chains do not match anymore? We'd have to check the existing blocks carefully.
if cur_chain_info . ancient_block_number . is_none ( ) & & cur_chain_info . first_block_number . unwrap_or ( 0 ) > 0 {
info! ( target : " blockchain " , " blocks in the current DB do not stretch back to genesis; can't salvage them into the new DB. In current DB, first block: #{:?}/{:?}, best block: #{:?}/{:?} " ,
cur_chain_info . first_block_number , cur_chain_info . first_block_hash ,
cur_chain_info . best_block_number , cur_chain_info . best_block_hash ) ;
return None ;
}
2018-11-18 00:06:34 +01:00
let next_available_from = next_chain_info . first_block_number ? ;
let cur_available_to = cur_chain_info . ancient_block_number . unwrap_or ( cur_chain_info . best_block_number ) ;
let highest_block_num = cmp ::min ( next_available_from . saturating_sub ( 1 ) , cur_available_to ) ;
if highest_block_num = = 0 {
return None ;
}
2019-07-01 14:41:45 +02:00
trace! ( target : " snapshot " , " Trying to import ancient blocks until {}. First block in new chain=#{}, first block in old chain=#{:?}, best block in old chain=#{} " ,
highest_block_num , next_available_from , cur_chain_info . first_block_number , cur_chain_info . best_block_number ) ;
2018-11-18 00:06:34 +01:00
// Here we start from the highest block number and go backward to 0,
2019-07-01 14:41:45 +02:00
// thus starting at `highest_block_num` and targeting `0`.
2018-11-18 00:06:34 +01:00
let target_hash = self . client . block_hash ( BlockId ::Number ( 0 ) ) ? ;
let start_hash = self . client . block_hash ( BlockId ::Number ( highest_block_num ) ) ? ;
Some ( ( start_hash , target_hash ) )
} ;
let ( start_hash , target_hash ) = match find_range ( ) {
Some ( x ) = > x ,
None = > return Ok ( 0 ) ,
} ;
2019-10-24 16:46:32 +02:00
info! ( target : " snapshot " , " Migrating blocks from old db to new. Start: #{}/{:?}, Target: #{}/{:?} " ,
self . client . block_number ( BlockId ::Hash ( start_hash ) ) . unwrap_or_default ( ) , start_hash ,
self . client . block_number ( BlockId ::Hash ( target_hash ) ) . unwrap_or_default ( ) , target_hash ,
) ;
2018-11-18 00:06:34 +01:00
let mut batch = DBTransaction ::new ( ) ;
let mut parent_hash = start_hash ;
while parent_hash ! = target_hash {
// Early return if restoration is aborted
if ! self . restoring_snapshot . load ( Ordering ::SeqCst ) {
return Ok ( count ) ;
}
2019-07-01 14:41:45 +02:00
let block = self . client . block ( BlockId ::Hash ( parent_hash ) ) . ok_or_else ( | | {
error! ( target : " snapshot " , " migrate_blocks: did not find block from parent_hash={:#x} (start_hash={:#x}) " , parent_hash , start_hash ) ;
UnlinkedAncientBlockChain ( parent_hash )
} ) ? ;
2018-11-18 00:06:34 +01:00
parent_hash = block . parent_hash ( ) ;
let block_number = block . number ( ) ;
let block_receipts = self . client . block_receipts ( & block . hash ( ) ) ;
let parent_total_difficulty = self . client . block_total_difficulty ( BlockId ::Hash ( parent_hash ) ) ;
match ( block_receipts , parent_total_difficulty ) {
( Some ( block_receipts ) , Some ( parent_total_difficulty ) ) = > {
let block_receipts = block_receipts . receipts ;
next_chain . insert_unordered_block ( & mut batch , block , block_receipts , Some ( parent_total_difficulty ) , false , true ) ;
count + = 1 ;
} ,
2019-07-01 14:41:45 +02:00
_ = > {
// We couldn't reach the targeted hash
error! ( target : " snapshot " , " migrate_blocks: failed to find receipts and parent total difficulty; cannot reach the target_hash ({:#x}). Block #{}, parent_hash={:#x}, parent_total_difficulty={:?}, start_hash={:#x}, ancient_block_number={:?}, best_block_number={:?} " ,
target_hash , block_number , parent_hash , parent_total_difficulty ,
start_hash , cur_chain_info . ancient_block_number , cur_chain_info . best_block_number ,
) ;
return Err ( UnlinkedAncientBlockChain ( parent_hash ) . into ( ) ) ;
} ,
2018-11-18 00:06:34 +01:00
}
2019-06-19 10:13:09 +02:00
// Writing changes to DB and logging every now and then
2018-11-18 00:06:34 +01:00
if block_number % 1_000 = = 0 {
next_db . key_value ( ) . write_buffered ( batch ) ;
next_chain . commit ( ) ;
next_db . key_value ( ) . flush ( ) . expect ( " DB flush failed. " ) ;
batch = DBTransaction ::new ( ) ;
2019-10-24 16:46:32 +02:00
if block_number % 10_000 = = 0 {
info! ( target : " snapshot " , " Block restoration at #{} " , block_number ) ;
}
2018-11-18 00:06:34 +01:00
}
}
// Final commit to the DB
next_db . key_value ( ) . write_buffered ( batch ) ;
next_chain . commit ( ) ;
next_db . key_value ( ) . flush ( ) . expect ( " DB flush failed. " ) ;
// Update best ancient block in the Next Chain
next_chain . update_best_ancient_block ( & start_hash ) ;
Ok ( count )
}
2016-09-11 14:05:59 +02:00
/// Get a reference to the snapshot reader.
pub fn reader ( & self ) -> RwLockReadGuard < Option < LooseReader > > {
self . reader . read ( )
}
2016-09-06 17:44:11 +02:00
/// Tick the snapshot service. This will log any active snapshot
/// being taken.
pub fn tick ( & self ) {
2019-10-28 18:24:45 +01:00
if self . progress . read ( ) . done ( ) | | ! self . taking_snapshot . load ( Ordering ::SeqCst ) { return }
2016-09-06 17:44:11 +02:00
2019-10-28 18:24:45 +01:00
let p = & self . progress . read ( ) ;
2019-10-24 16:46:32 +02:00
info! ( " Snapshot: {} accounts, {} blocks, {} bytes " , p . accounts ( ) , p . blocks ( ) , p . bytes ( ) ) ;
let rate = p . rate ( ) ;
debug! ( target : " snapshot " , " Current progress rate: {:.0} acc/s, {:.0} bytes/s (compressed) " , rate . 0 , rate . 1 ) ;
2016-09-06 17:44:11 +02:00
}
2016-09-02 16:15:25 +02:00
/// Take a snapshot at the block with the given number.
2019-10-24 16:46:32 +02:00
/// Calling this while a restoration is in progress or vice versa
2016-09-02 16:15:25 +02:00
/// will lead to a race condition where the first one to finish will
/// have their produced snapshot overwritten.
2019-09-03 11:29:25 +02:00
pub fn take_snapshot ( & self , client : & C , num : u64 ) -> Result < ( ) , Error > {
2016-09-07 15:27:14 +02:00
if self . taking_snapshot . compare_and_swap ( false , true , Ordering ::SeqCst ) {
info! ( " Skipping snapshot at #{} as another one is currently in-progress. " , num ) ;
return Ok ( ( ) ) ;
}
2016-09-02 16:15:25 +02:00
info! ( " Taking snapshot at #{} " , num ) ;
2019-10-24 16:46:32 +02:00
{
scopeguard ::defer! { {
self . taking_snapshot . store ( false , Ordering ::SeqCst ) ;
} }
let start_time = std ::time ::Instant ::now ( ) ;
2019-10-28 18:24:45 +01:00
* self . progress . write ( ) = Progress ::new ( ) ;
2019-10-24 16:46:32 +02:00
let temp_dir = self . temp_snapshot_dir ( ) ;
let snapshot_dir = self . snapshot_dir ( ) ;
2016-09-02 16:15:25 +02:00
2019-10-24 16:46:32 +02:00
let _ = fs ::remove_dir_all ( & temp_dir ) ; // expected to fail
2016-09-02 16:15:25 +02:00
2019-10-24 16:46:32 +02:00
let writer = LooseWriter ::new ( temp_dir . clone ( ) ) ? ;
2016-09-02 19:00:20 +02:00
2019-10-24 16:46:32 +02:00
let guard = Guard ::new ( temp_dir . clone ( ) ) ;
let _ = client . take_snapshot ( writer , BlockId ::Number ( num ) , & self . progress ) ? ;
info! ( " Finished taking snapshot at #{}, in {:.0?} " , num , start_time . elapsed ( ) ) ;
2016-09-02 16:15:25 +02:00
2019-10-24 16:46:32 +02:00
// destroy the old snapshot reader.
let mut reader = self . reader . write ( ) ;
* reader = None ;
2016-09-06 17:44:11 +02:00
2019-10-24 16:46:32 +02:00
if snapshot_dir . exists ( ) {
trace! ( target : " snapshot " , " Removing previous snapshot at {:?} " , & snapshot_dir ) ;
fs ::remove_dir_all ( & snapshot_dir ) ? ;
}
2016-09-02 16:15:25 +02:00
2019-10-24 16:46:32 +02:00
fs ::rename ( temp_dir , & snapshot_dir ) ? ;
trace! ( target : " snapshot " , " Moved new snapshot into place at {:?} " , & snapshot_dir ) ;
* reader = Some ( LooseReader ::new ( snapshot_dir ) ? ) ;
2016-09-02 16:15:25 +02:00
2019-10-24 16:46:32 +02:00
guard . disarm ( ) ;
Ok ( ( ) )
2016-09-13 10:33:03 +02:00
}
2016-09-02 16:15:25 +02:00
}
2016-08-25 22:20:44 +02:00
/// Initialize the restoration synchronously.
2016-09-11 14:05:59 +02:00
/// The recover flag indicates whether to recover the restored snapshot.
pub fn init_restore ( & self , manifest : ManifestData , recover : bool ) -> Result < ( ) , Error > {
2018-05-16 22:01:55 +02:00
let mut res = self . restoration . lock ( ) ;
2016-08-25 22:20:44 +02:00
let rest_dir = self . restoration_dir ( ) ;
2018-05-16 22:01:55 +02:00
let rest_db = self . restoration_db ( ) ;
let recovery_temp = self . temp_recovery_dir ( ) ;
let prev_chunks = self . prev_chunks_dir ( ) ;
2016-08-25 22:20:44 +02:00
2018-05-16 22:01:55 +02:00
// delete and restore the restoration dir.
if let Err ( e ) = fs ::remove_dir_all ( & prev_chunks ) {
match e . kind ( ) {
ErrorKind ::NotFound = > { } ,
_ = > return Err ( e . into ( ) ) ,
}
}
// Move the previous recovery temp directory
// to `prev_chunks` to be able to restart restoring
// with previously downloaded blocks
// This step is optional, so don't fail on error
fs ::rename ( & recovery_temp , & prev_chunks ) . ok ( ) ;
2016-08-25 22:20:44 +02:00
2016-09-11 14:05:59 +02:00
self . state_chunks . store ( 0 , Ordering ::SeqCst ) ;
self . block_chunks . store ( 0 , Ordering ::SeqCst ) ;
2016-08-25 22:20:44 +02:00
// tear down existing restoration.
* res = None ;
// delete and restore the restoration dir.
if let Err ( e ) = fs ::remove_dir_all ( & rest_dir ) {
match e . kind ( ) {
ErrorKind ::NotFound = > { } ,
_ = > return Err ( e . into ( ) ) ,
}
}
2018-05-16 22:01:55 +02:00
* self . status . lock ( ) = RestorationStatus ::Initializing {
chunks_done : 0 ,
2019-07-01 14:41:45 +02:00
state_chunks : manifest . state_hashes . len ( ) as u32 ,
block_chunks : manifest . block_hashes . len ( ) as u32 ,
2018-05-16 22:01:55 +02:00
} ;
2016-12-27 12:53:56 +01:00
fs ::create_dir_all ( & rest_dir ) ? ;
2016-08-25 22:20:44 +02:00
// make new restoration.
2016-09-11 14:05:59 +02:00
let writer = match recover {
2018-05-16 22:01:55 +02:00
true = > Some ( LooseWriter ::new ( recovery_temp ) ? ) ,
2016-09-11 14:05:59 +02:00
false = > None
} ;
2016-08-25 22:20:44 +02:00
let params = RestorationParams {
2018-05-16 22:01:55 +02:00
manifest : manifest . clone ( ) ,
2016-08-25 22:20:44 +02:00
pruning : self . pruning ,
2018-05-16 22:01:55 +02:00
db : self . restoration_db_handler . open ( & rest_db ) ? ,
2019-07-01 14:41:45 +02:00
writer ,
2016-08-25 22:20:44 +02:00
genesis : & self . genesis_block ,
2018-05-16 22:01:55 +02:00
guard : Guard ::new ( rest_db ) ,
2017-04-19 20:31:53 +02:00
engine : & * self . engine ,
2016-08-25 22:20:44 +02:00
} ;
2018-05-16 22:01:55 +02:00
let state_chunks = manifest . state_hashes . len ( ) ;
let block_chunks = manifest . block_hashes . len ( ) ;
2016-10-18 18:16:00 +02:00
2016-12-27 12:53:56 +01:00
* res = Some ( Restoration ::new ( params ) ? ) ;
2016-08-25 22:20:44 +02:00
2018-05-16 22:01:55 +02:00
self . restoring_snapshot . store ( true , Ordering ::SeqCst ) ;
// Import previous chunks, continue if it fails
self . import_prev_chunks ( & mut res , manifest ) . ok ( ) ;
2018-11-18 00:06:34 +01:00
// It could be that the restoration failed or completed in the meanwhile
let mut restoration_status = self . status . lock ( ) ;
if let RestorationStatus ::Initializing { .. } = * restoration_status {
* restoration_status = RestorationStatus ::Ongoing {
state_chunks : state_chunks as u32 ,
block_chunks : block_chunks as u32 ,
state_chunks_done : self . state_chunks . load ( Ordering ::SeqCst ) as u32 ,
block_chunks_done : self . block_chunks . load ( Ordering ::SeqCst ) as u32 ,
} ;
}
2016-11-13 13:52:53 +01:00
2016-08-25 22:20:44 +02:00
Ok ( ( ) )
}
2018-05-16 22:01:55 +02:00
/// Import the previous chunks into the current restoration
fn import_prev_chunks ( & self , restoration : & mut Option < Restoration > , manifest : ManifestData ) -> Result < ( ) , Error > {
let prev_chunks = self . prev_chunks_dir ( ) ;
// Restore previous snapshot chunks
let files = fs ::read_dir ( prev_chunks . as_path ( ) ) ? ;
let mut num_temp_chunks = 0 ;
for prev_chunk_file in files {
// Don't go over all the files if the restoration has been aborted
if ! self . restoring_snapshot . load ( Ordering ::SeqCst ) {
trace! ( target :" snapshot " , " Aborting importing previous chunks " ) ;
return Ok ( ( ) ) ;
}
// Import the chunk, don't fail and continue if one fails
match self . import_prev_chunk ( restoration , & manifest , prev_chunk_file ) {
Ok ( true ) = > num_temp_chunks + = 1 ,
Err ( e ) = > trace! ( target : " snapshot " , " Error importing chunk: {:?} " , e ) ,
_ = > ( ) ,
}
}
trace! ( target :" snapshot " , " Imported {} previous chunks " , num_temp_chunks ) ;
// Remove the prev temp directory
fs ::remove_dir_all ( & prev_chunks ) ? ;
Ok ( ( ) )
}
2019-10-24 16:46:32 +02:00
/// Import a previous chunk at the given path. Returns whether the chunk was imported or not
fn import_prev_chunk (
& self ,
restoration : & mut Option < Restoration > ,
manifest : & ManifestData ,
file : io ::Result < fs ::DirEntry >
) -> Result < bool , Error > {
2018-05-16 22:01:55 +02:00
let file = file ? ;
let path = file . path ( ) ;
let mut file = File ::open ( path . clone ( ) ) ? ;
2019-10-24 16:46:32 +02:00
let filesize = file . metadata ( ) ? . len ( ) ;
let mut buffer = Vec ::with_capacity ( filesize as usize + 1 ) ; // +1 for EOF
2018-05-16 22:01:55 +02:00
file . read_to_end ( & mut buffer ) ? ;
let hash = keccak ( & buffer ) ;
let is_state = if manifest . block_hashes . contains ( & hash ) {
false
} else if manifest . state_hashes . contains ( & hash ) {
true
} else {
2019-10-24 16:46:32 +02:00
warn! ( target : " snapshot " , " Hash of the content of {:?} not present in the manifest block/state hashes. " , path ) ;
2018-05-16 22:01:55 +02:00
return Ok ( false ) ;
} ;
self . feed_chunk_with_restoration ( restoration , hash , & buffer , is_state ) ? ;
trace! ( target : " snapshot " , " Fed chunk {:?} " , hash ) ;
Ok ( true )
}
2019-10-24 16:46:32 +02:00
// Finalize the restoration. This accepts an already-locked restoration as an argument -- so
// acquiring it again _will_ lead to deadlock.
2016-08-05 17:00:46 +02:00
fn finalize_restoration ( & self , rest : & mut Option < Restoration > ) -> Result < ( ) , Error > {
2019-10-24 16:46:32 +02:00
trace! ( target : " snapshot " , " Finalizing restoration " ) ;
2019-07-01 14:41:45 +02:00
* self . status . lock ( ) = RestorationStatus ::Finalizing ;
2016-08-05 17:00:46 +02:00
2016-09-11 14:05:59 +02:00
let recover = rest . as_ref ( ) . map_or ( false , | rest | rest . writer . is_some ( ) ) ;
2016-08-05 17:00:46 +02:00
2016-08-25 22:20:44 +02:00
// destroy the restoration before replacing databases and snapshot.
2017-05-17 12:41:33 +02:00
rest . take ( )
2019-07-01 14:41:45 +02:00
. map ( | r | r . finalize ( ) )
2017-05-17 12:41:33 +02:00
. unwrap_or ( Ok ( ( ) ) ) ? ;
2019-07-01 14:41:45 +02:00
let migrated_blocks = self . migrate_blocks ( ) ? ;
2019-10-24 16:46:32 +02:00
info! ( target : " snapshot " , " Migrated {} ancient blocks from the old DB " , migrated_blocks ) ;
2019-07-01 14:41:45 +02:00
// replace the Client's database with the new one (restart the Client).
self . client . restore_db ( & * self . restoration_db ( ) . to_string_lossy ( ) ) ? ;
2016-08-05 17:00:46 +02:00
2016-09-11 14:05:59 +02:00
if recover {
let mut reader = self . reader . write ( ) ;
* reader = None ; // destroy the old reader if it existed.
2016-08-25 22:20:44 +02:00
2016-09-11 14:05:59 +02:00
let snapshot_dir = self . snapshot_dir ( ) ;
2016-08-25 22:20:44 +02:00
2016-09-13 10:33:03 +02:00
if snapshot_dir . exists ( ) {
2019-10-24 16:46:32 +02:00
trace! ( target : " snapshot " , " Removing old snapshot dir at {} " , snapshot_dir . to_string_lossy ( ) ) ;
2016-12-27 12:53:56 +01:00
fs ::remove_dir_all ( & snapshot_dir ) ? ;
2016-08-25 22:20:44 +02:00
}
2019-10-24 16:46:32 +02:00
trace! ( target : " snapshot " , " Copying restored snapshot files over " ) ;
2016-12-27 12:53:56 +01:00
fs ::rename ( self . temp_recovery_dir ( ) , & snapshot_dir ) ? ;
2016-08-05 17:00:46 +02:00
2016-12-27 12:53:56 +01:00
* reader = Some ( LooseReader ::new ( snapshot_dir ) ? ) ;
2016-09-11 14:05:59 +02:00
}
2016-08-25 22:20:44 +02:00
2016-09-11 14:05:59 +02:00
let _ = fs ::remove_dir_all ( self . restoration_dir ( ) ) ;
2016-08-25 22:20:44 +02:00
* self . status . lock ( ) = RestorationStatus ::Inactive ;
2016-08-05 17:00:46 +02:00
Ok ( ( ) )
}
2018-09-10 15:26:52 +02:00
/// Feed a chunk of either kind (block or state). no-op if no restoration or status is wrong.
fn feed_chunk ( & self , hash : H256 , chunk : & [ u8 ] , is_state : bool ) {
2016-08-25 22:20:44 +02:00
// TODO: be able to process block chunks and state chunks at same time?
2019-07-01 14:41:45 +02:00
let r = {
let mut restoration = self . restoration . lock ( ) ;
self . feed_chunk_with_restoration ( & mut restoration , hash , chunk , is_state )
} ;
match r {
2018-09-10 15:26:52 +02:00
Ok ( ( ) ) |
2019-05-06 15:06:20 +02:00
Err ( Error ::Snapshot ( SnapshotError ::RestorationAborted ) ) = > ( ) ,
2018-09-10 15:26:52 +02:00
Err ( e ) = > {
2019-07-01 14:41:45 +02:00
// TODO: after this we're sometimes deadlocked
2018-09-10 15:26:52 +02:00
warn! ( " Encountered error during snapshot restoration: {} " , e ) ;
2019-07-01 14:41:45 +02:00
self . abort_restore ( ) ;
if let Some ( mut status ) = self . status . try_lock_for ( std ::time ::Duration ::from_millis ( 10 ) ) {
* status = RestorationStatus ::Failed ;
}
2018-09-10 15:26:52 +02:00
let _ = fs ::remove_dir_all ( self . restoration_dir ( ) ) ;
}
}
2018-05-16 22:01:55 +02:00
}
2016-10-25 18:40:01 +02:00
2018-05-16 22:01:55 +02:00
/// Feed a chunk with the Restoration
fn feed_chunk_with_restoration ( & self , restoration : & mut Option < Restoration > , hash : H256 , chunk : & [ u8 ] , is_state : bool ) -> Result < ( ) , Error > {
let ( result , db ) = {
2016-10-25 18:40:01 +02:00
match self . status ( ) {
2019-07-01 14:41:45 +02:00
RestorationStatus ::Inactive | RestorationStatus ::Failed | RestorationStatus ::Finalizing = > {
trace! ( target : " snapshot " , " Tried to restore chunk {:x} while inactive, failed or finalizing " , hash ) ;
2018-05-16 22:01:55 +02:00
return Ok ( ( ) ) ;
} ,
RestorationStatus ::Ongoing { .. } | RestorationStatus ::Initializing { .. } = > {
2016-10-25 18:40:01 +02:00
let ( res , db ) = {
let rest = match * restoration {
Some ( ref mut r ) = > r ,
None = > return Ok ( ( ) ) ,
2016-08-05 17:00:46 +02:00
} ;
2016-10-25 18:40:01 +02:00
( match is_state {
2016-11-13 13:52:53 +01:00
true = > rest . feed_state ( hash , chunk , & self . restoring_snapshot ) ,
false = > rest . feed_blocks ( hash , chunk , & * self . engine , & self . restoring_snapshot ) ,
2016-10-25 18:40:01 +02:00
} . map ( | _ | rest . is_done ( ) ) , rest . db . clone ( ) )
} ;
let res = match res {
Ok ( is_done ) = > {
match is_state {
true = > self . state_chunks . fetch_add ( 1 , Ordering ::SeqCst ) ,
false = > self . block_chunks . fetch_add ( 1 , Ordering ::SeqCst ) ,
} ;
match is_done {
true = > {
2018-07-06 15:09:39 +02:00
db . key_value ( ) . flush ( ) ? ;
2016-10-26 16:14:13 +02:00
drop ( db ) ;
return self . finalize_restoration ( & mut * restoration ) ;
2016-10-25 18:40:01 +02:00
} ,
false = > Ok ( ( ) )
}
2016-08-05 17:00:46 +02:00
}
2016-10-25 18:40:01 +02:00
other = > other . map ( drop ) ,
} ;
( res , db )
2016-08-05 17:00:46 +02:00
}
}
2016-10-25 18:40:01 +02:00
} ;
2018-07-06 15:09:39 +02:00
result ? ;
db . key_value ( ) . flush ( ) ? ;
Ok ( ( ) )
2016-08-05 17:00:46 +02:00
}
/// Feed a state chunk to be processed synchronously.
pub fn feed_state_chunk ( & self , hash : H256 , chunk : & [ u8 ] ) {
2018-09-10 15:26:52 +02:00
self . feed_chunk ( hash , chunk , true ) ;
2016-08-05 17:00:46 +02:00
}
/// Feed a block chunk to be processed synchronously.
pub fn feed_block_chunk ( & self , hash : H256 , chunk : & [ u8 ] ) {
2018-09-10 15:26:52 +02:00
self . feed_chunk ( hash , chunk , false ) ;
2016-08-05 17:00:46 +02:00
}
}
2019-09-03 11:29:25 +02:00
impl < C : Send + Sync > SnapshotService for Service < C > {
2016-08-05 17:00:46 +02:00
fn manifest ( & self ) -> Option < ManifestData > {
2016-08-25 22:20:44 +02:00
self . reader . read ( ) . as_ref ( ) . map ( | r | r . manifest ( ) . clone ( ) )
2016-08-05 17:00:46 +02:00
}
2017-07-18 16:59:33 +02:00
fn supported_versions ( & self ) -> Option < ( u64 , u64 ) > {
2019-08-22 18:25:49 +02:00
chunker ( self . engine . snapshot_mode ( ) )
2017-07-18 16:59:33 +02:00
. map ( | c | ( c . min_supported_version ( ) , c . current_version ( ) ) )
2017-05-17 12:41:33 +02:00
}
2018-05-16 22:01:55 +02:00
fn completed_chunks ( & self ) -> Option < Vec < H256 > > {
let restoration = self . restoration . lock ( ) ;
match * restoration {
Some ( ref restoration ) = > {
let completed_chunks = restoration . manifest . block_hashes
. iter ( )
. filter ( | h | ! restoration . block_chunks_left . contains ( h ) )
. chain (
restoration . manifest . state_hashes
. iter ( )
. filter ( | h | ! restoration . state_chunks_left . contains ( h ) )
)
. map ( | h | * h )
. collect ( ) ;
Some ( completed_chunks )
} ,
None = > None ,
}
}
2019-09-03 11:29:25 +02:00
fn chunk ( & self , hash : H256 ) -> Option < Bytes > {
self . reader . read ( ) . as_ref ( ) . and_then ( | r | r . chunk ( hash ) . ok ( ) )
}
2016-08-05 17:00:46 +02:00
fn status ( & self ) -> RestorationStatus {
2016-09-11 14:05:59 +02:00
let mut cur_status = self . status . lock ( ) ;
2018-05-16 22:01:55 +02:00
match * cur_status {
2019-07-01 14:41:45 +02:00
RestorationStatus ::Initializing { ref mut chunks_done , .. } = > {
2018-05-16 22:01:55 +02:00
* chunks_done = self . state_chunks . load ( Ordering ::SeqCst ) as u32 +
self . block_chunks . load ( Ordering ::SeqCst ) as u32 ;
}
RestorationStatus ::Ongoing { ref mut state_chunks_done , ref mut block_chunks_done , .. } = > {
* state_chunks_done = self . state_chunks . load ( Ordering ::SeqCst ) as u32 ;
* block_chunks_done = self . block_chunks . load ( Ordering ::SeqCst ) as u32 ;
} ,
_ = > ( ) ,
2016-09-11 14:05:59 +02:00
}
cur_status . clone ( )
2016-08-05 17:00:46 +02:00
}
2016-08-25 22:20:44 +02:00
fn begin_restore ( & self , manifest : ManifestData ) {
2016-10-30 09:56:34 +01:00
if let Err ( e ) = self . io_channel . lock ( ) . send ( ClientIoMessage ::BeginRestoration ( manifest ) ) {
2016-10-25 18:40:01 +02:00
trace! ( " Error sending snapshot service message: {:?} " , e ) ;
}
2016-08-25 22:20:44 +02:00
}
2016-08-05 17:00:46 +02:00
2016-08-25 22:20:44 +02:00
fn abort_restore ( & self ) {
2018-05-16 22:01:55 +02:00
trace! ( target : " snapshot " , " Aborting restore " ) ;
2016-11-13 13:52:53 +01:00
self . restoring_snapshot . store ( false , Ordering ::SeqCst ) ;
2016-08-25 22:20:44 +02:00
* self . restoration . lock ( ) = None ;
* self . status . lock ( ) = RestorationStatus ::Inactive ;
2016-08-05 17:00:46 +02:00
}
fn restore_state_chunk ( & self , hash : H256 , chunk : Bytes ) {
2016-10-30 09:56:34 +01:00
if let Err ( e ) = self . io_channel . lock ( ) . send ( ClientIoMessage ::FeedStateChunk ( hash , chunk ) ) {
2016-10-25 18:40:01 +02:00
trace! ( " Error sending snapshot service message: {:?} " , e ) ;
}
2016-08-05 17:00:46 +02:00
}
fn restore_block_chunk ( & self , hash : H256 , chunk : Bytes ) {
2016-10-30 09:56:34 +01:00
if let Err ( e ) = self . io_channel . lock ( ) . send ( ClientIoMessage ::FeedBlockChunk ( hash , chunk ) ) {
2016-10-25 18:40:01 +02:00
trace! ( " Error sending snapshot service message: {:?} " , e ) ;
}
2016-08-05 17:00:46 +02:00
}
2018-05-29 12:23:15 +02:00
2019-06-19 10:13:09 +02:00
fn abort_snapshot ( & self ) {
if self . taking_snapshot . load ( Ordering ::SeqCst ) {
trace! ( target : " snapshot " , " Aborting snapshot – Snapshot under way " ) ;
2019-10-28 18:24:45 +01:00
self . progress . write ( ) . abort = true ;
2019-06-19 10:13:09 +02:00
}
}
2018-05-29 12:23:15 +02:00
fn shutdown ( & self ) {
2019-06-19 10:13:09 +02:00
trace! ( target : " snapshot " , " Shut down SnapshotService " ) ;
2018-05-29 12:23:15 +02:00
self . abort_restore ( ) ;
2019-06-19 10:13:09 +02:00
trace! ( target : " snapshot " , " Shut down SnapshotService - restore aborted " ) ;
self . abort_snapshot ( ) ;
trace! ( target : " snapshot " , " Shut down SnapshotService - snapshot aborted " ) ;
2018-05-29 12:23:15 +02:00
}
2016-08-10 16:29:40 +02:00
}
2016-09-05 12:24:03 +02:00
2019-09-03 11:29:25 +02:00
impl < C : Send + Sync > Drop for Service < C > {
2016-09-06 15:41:56 +02:00
fn drop ( & mut self ) {
2019-06-19 10:13:09 +02:00
trace! ( target : " shutdown " , " Dropping Service " ) ;
2016-09-06 15:41:56 +02:00
self . abort_restore ( ) ;
2019-06-19 10:13:09 +02:00
trace! ( target : " shutdown " , " Dropping Service - restore aborted " ) ;
self . abort_snapshot ( ) ;
trace! ( target : " shutdown " , " Dropping Service - snapshot aborted " ) ;
2016-09-06 15:41:56 +02:00
}
}