// Copyright 2015-2017 Parity Technologies (UK) Ltd. // This file is part of Parity. // Parity is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity. If not, see . //! Snapshot creation, restoration, and network service. //! //! Documentation of the format can be found at //! https://github.com/paritytech/parity/wiki/Warp-Sync-Snapshot-Format use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use account_db::{AccountDB, AccountDBMut}; use blockchain::{BlockChain, BlockProvider}; use engines::Engine; use header::Header; use ids::BlockId; use util::{Bytes, Hashable, HashDB, DBValue, snappy, U256}; use util::Mutex; use util::hash::{H256}; use util::journaldb::{self, Algorithm, JournalDB}; use util::kvdb::KeyValueDB; use util::trie::{TrieDB, TrieDBMut, Trie, TrieMut}; use util::sha3::SHA3_NULL_RLP; use rlp::{RlpStream, UntrustedRlp}; use bloom_journal::Bloom; use self::io::SnapshotWriter; use super::state_db::StateDB; use super::state::Account as StateAccount; use crossbeam::scope; use rand::{Rng, OsRng}; pub use self::error::Error; pub use self::consensus::*; pub use self::service::{Service, DatabaseRestore}; pub use self::traits::SnapshotService; pub use self::watcher::Watcher; pub use types::snapshot_manifest::ManifestData; pub use types::restoration_status::RestorationStatus; pub use types::basic_account::BasicAccount; pub mod io; pub mod service; mod account; mod block; mod consensus; mod 
error; mod watcher; #[cfg(test)] mod tests; /// IPC interfaces #[cfg(feature="ipc")] pub mod remote { pub use super::traits::RemoteSnapshotService; } mod traits { #![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues include!(concat!(env!("OUT_DIR"), "/snapshot_service_trait.rs")); } // Try to have chunks be around 4MB (before compression) const PREFERRED_CHUNK_SIZE: usize = 4 * 1024 * 1024; // Minimum supported state chunk version. const MIN_SUPPORTED_STATE_CHUNK_VERSION: u64 = 1; // current state chunk version. const STATE_CHUNK_VERSION: u64 = 2; /// A progress indicator for snapshots. #[derive(Debug, Default)] pub struct Progress { accounts: AtomicUsize, blocks: AtomicUsize, size: AtomicUsize, // Todo [rob] use Atomicu64 when it stabilizes. done: AtomicBool, } impl Progress { /// Reset the progress. pub fn reset(&self) { self.accounts.store(0, Ordering::Release); self.blocks.store(0, Ordering::Release); self.size.store(0, Ordering::Release); // atomic fence here to ensure the others are written first? // logs might very rarely get polluted if not. self.done.store(false, Ordering::Release); } /// Get the number of accounts snapshotted thus far. pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Acquire) } /// Get the number of blocks snapshotted thus far. pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) } /// Get the written size of the snapshot in bytes. pub fn size(&self) -> usize { self.size.load(Ordering::Acquire) } /// Whether the snapshot is complete. pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) } } /// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer. 
///
/// State chunks are produced on the calling thread while the engine's secondary
/// chunks are produced concurrently on a scoped thread; both share `writer`
/// through a mutex. Once both have finished, the manifest is handed to the
/// writer and `p` is marked as done.
///
/// # Errors
///
/// Returns `Error::InvalidStartingBlock` if `chain` has no header for `block_at`,
/// `Error::SnapshotsUnsupported` if the engine provides no snapshot components,
/// and otherwise propagates any error from chunking or from finishing the writer.
pub fn take_snapshot<W: SnapshotWriter + Send>(
    engine: &Engine,
    chain: &BlockChain,
    block_at: H256,
    state_db: &HashDB,
    writer: W,
    p: &Progress
) -> Result<(), Error> {
    let start_header = chain.block_header(&block_at)
        .ok_or(Error::InvalidStartingBlock(BlockId::Hash(block_at)))?;
    let state_root = start_header.state_root();
    let number = start_header.number();

    info!("Taking snapshot starting at block {}", number);

    // the writer is shared between the state chunker and the secondary chunker.
    let writer = Mutex::new(writer);
    let chunker = engine.snapshot_components().ok_or(Error::SnapshotsUnsupported)?;
    let snapshot_version = chunker.current_version();
    let (state_hashes, block_hashes) = scope(|scope| {
        let writer = &writer;
        // secondary chunks on a scoped thread, state chunks on this one.
        let block_guard = scope.spawn(move || chunk_secondary(chunker, chain, block_at, writer, p));
        let state_res = chunk_state(state_db, state_root, writer, p);

        // the secondary result is only inspected once state chunking succeeded.
        state_res.and_then(|state_hashes| {
            block_guard.join().map(|block_hashes| (state_hashes, block_hashes))
        })
    })?;

    info!("produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len());

    let manifest_data = ManifestData {
        version: snapshot_version,
        state_hashes: state_hashes,
        block_hashes: block_hashes,
        state_root: *state_root,
        block_number: number,
        block_hash: block_at,
    };

    writer.into_inner().finish(manifest_data)?;

    // only flag completion after the manifest has been written successfully.
    p.done.store(true, Ordering::SeqCst);

    Ok(())
}

/// Create and write out all secondary chunks to disk, returning a vector of all
/// the hashes of secondary chunks created.
///
/// Secondary chunks are engine-specific, but they intend to corroborate the state data
/// in the state chunks.
/// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis.
pub fn chunk_secondary<'a>(mut chunker: Box, chain: &'a BlockChain, start_hash: H256, writer: &Mutex, progress: &'a Progress) -> Result, Error> { let mut chunk_hashes = Vec::new(); let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)]; { let mut chunk_sink = |raw_data: &[u8]| { let compressed_size = snappy::compress_into(raw_data, &mut snappy_buffer); let compressed = &snappy_buffer[..compressed_size]; let hash = compressed.sha3(); let size = compressed.len(); writer.lock().write_block_chunk(hash, compressed)?; trace!(target: "snapshot", "wrote secondary chunk. hash: {}, size: {}, uncompressed size: {}", hash.hex(), size, raw_data.len()); progress.size.fetch_add(size, Ordering::SeqCst); chunk_hashes.push(hash); Ok(()) }; chunker.chunk_all( chain, start_hash, &mut chunk_sink, PREFERRED_CHUNK_SIZE, )?; } Ok(chunk_hashes) } /// State trie chunker. struct StateChunker<'a> { hashes: Vec, rlps: Vec, cur_size: usize, snappy_buffer: Vec, writer: &'a Mutex, progress: &'a Progress, } impl<'a> StateChunker<'a> { // Push a key, value pair to be encoded. // // If the buffer is greater than the desired chunk size, // this will write out the data to disk. fn push(&mut self, data: Bytes) -> Result<(), Error> { self.cur_size += data.len(); self.rlps.push(data); Ok(()) } // Write out the buffer to disk, pushing the created chunk's hash to // the list. fn write_chunk(&mut self) -> Result<(), Error> { let num_entries = self.rlps.len(); let mut stream = RlpStream::new_list(num_entries); for rlp in self.rlps.drain(..) { stream.append_raw(&rlp, 1); } let raw_data = stream.out(); let compressed_size = snappy::compress_into(&raw_data, &mut self.snappy_buffer); let compressed = &self.snappy_buffer[..compressed_size]; let hash = compressed.sha3(); self.writer.lock().write_state_chunk(hash, compressed)?; trace!(target: "snapshot", "wrote state chunk. 
size: {}, uncompressed size: {}", compressed_size, raw_data.len()); self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst); self.progress.size.fetch_add(compressed_size, Ordering::SeqCst); self.hashes.push(hash); self.cur_size = 0; Ok(()) } // Get current chunk size. fn chunk_size(&self) -> usize { self.cur_size } } /// Walk the given state database starting from the given root, /// creating chunks and writing them out. /// /// Returns a list of hashes of chunks created, or any error it may /// have encountered. pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex, progress: &'a Progress) -> Result, Error> { let account_trie = TrieDB::new(db, &root)?; let mut chunker = StateChunker { hashes: Vec::new(), rlps: Vec::new(), cur_size: 0, snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)], writer: writer, progress: progress, }; let mut used_code = HashSet::new(); // account_key here is the address' hash. for item in account_trie.iter()? { let (account_key, account_data) = item?; let account = ::rlp::decode(&*account_data); let account_key_hash = H256::from_slice(&account_key); let account_db = AccountDB::from_hash(db, account_key_hash); let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE)?; for (i, fat_rlp) in fat_rlps.into_iter().enumerate() { if i > 0 { chunker.write_chunk()?; } chunker.push(fat_rlp)?; } } if chunker.cur_size != 0 { chunker.write_chunk()?; } Ok(chunker.hashes) } /// Used to rebuild the state trie piece by piece. pub struct StateRebuilder { db: Box, state_root: H256, known_code: HashMap, // code hashes mapped to first account with this code. missing_code: HashMap>, // maps code hashes to lists of accounts missing that code. bloom: Bloom, known_storage_roots: HashMap, // maps account hashes to last known storage root. Only filled for last account per chunk. 
} impl StateRebuilder { /// Create a new state rebuilder to write into the given backing DB. pub fn new(db: Arc, pruning: Algorithm) -> Self { StateRebuilder { db: journaldb::new(db.clone(), pruning, ::db::COL_STATE), state_root: SHA3_NULL_RLP, known_code: HashMap::new(), missing_code: HashMap::new(), bloom: StateDB::load_bloom(&*db), known_storage_roots: HashMap::new(), } } /// Feed an uncompressed state chunk into the rebuilder. pub fn feed(&mut self, chunk: &[u8], flag: &AtomicBool) -> Result<(), ::error::Error> { let rlp = UntrustedRlp::new(chunk); let empty_rlp = StateAccount::new_basic(U256::zero(), U256::zero()).rlp(); let mut pairs = Vec::with_capacity(rlp.item_count()?); // initialize the pairs vector with empty values so we have slots to write into. pairs.resize(rlp.item_count()?, (H256::new(), Vec::new())); let status = rebuild_accounts( self.db.as_hashdb_mut(), rlp, &mut pairs, &self.known_code, &mut self.known_storage_roots, flag )?; for (addr_hash, code_hash) in status.missing_code { self.missing_code.entry(code_hash).or_insert_with(Vec::new).push(addr_hash); } // patch up all missing code. must be done after collecting all new missing code entries. for (code_hash, code, first_with) in status.new_code { for addr_hash in self.missing_code.remove(&code_hash).unwrap_or_else(Vec::new) { let mut db = AccountDBMut::from_hash(self.db.as_hashdb_mut(), addr_hash); db.emplace(code_hash, DBValue::from_slice(&code)); } self.known_code.insert(code_hash, first_with); } let backing = self.db.backing().clone(); // batch trie writes { let mut account_trie = if self.state_root != SHA3_NULL_RLP { TrieDBMut::from_existing(self.db.as_hashdb_mut(), &mut self.state_root)? } else { TrieDBMut::new(self.db.as_hashdb_mut(), &mut self.state_root) }; for (hash, thin_rlp) in pairs { if !flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) } if &thin_rlp[..] != &empty_rlp[..] 
{ self.bloom.set(&*hash); } account_trie.insert(&hash, &thin_rlp)?; } } let bloom_journal = self.bloom.drain_journal(); let mut batch = backing.transaction(); StateDB::commit_bloom(&mut batch, bloom_journal)?; self.db.inject(&mut batch)?; backing.write_buffered(batch); trace!(target: "snapshot", "current state root: {:?}", self.state_root); Ok(()) } /// Finalize the restoration. Check for accounts missing code and make a dummy /// journal entry. /// Once all chunks have been fed, there should be nothing missing. pub fn finalize(mut self, era: u64, id: H256) -> Result, ::error::Error> { let missing = self.missing_code.keys().cloned().collect::>(); if !missing.is_empty() { return Err(Error::MissingCode(missing).into()) } let mut batch = self.db.backing().transaction(); self.db.journal_under(&mut batch, era, &id)?; self.db.backing().write_buffered(batch); Ok(self.db) } /// Get the state root of the rebuilder. pub fn state_root(&self) -> H256 { self.state_root } } #[derive(Default)] struct RebuiltStatus { // new code that's become available. (code_hash, code, addr_hash) new_code: Vec<(H256, Bytes, H256)>, missing_code: Vec<(H256, H256)>, // accounts that are missing code. } // rebuild a set of accounts and their storage. // returns a status detailing newly-loaded code and accounts missing code. fn rebuild_accounts( db: &mut HashDB, account_fat_rlps: UntrustedRlp, out_chunk: &mut [(H256, Bytes)], known_code: &HashMap, known_storage_roots: &mut HashMap, abort_flag: &AtomicBool, ) -> Result { let mut status = RebuiltStatus::default(); for (account_rlp, out) in account_fat_rlps.into_iter().zip(out_chunk.iter_mut()) { if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) } let hash: H256 = account_rlp.val_at(0)?; let fat_rlp = account_rlp.at(1)?; let thin_rlp = { // fill out the storage trie and code while decoding. 
let (acc, maybe_code) = { let mut acct_db = AccountDBMut::from_hash(db, hash); let storage_root = known_storage_roots.get(&hash).cloned().unwrap_or(H256::zero()); account::from_fat_rlp(&mut acct_db, fat_rlp, storage_root)? }; let code_hash = acc.code_hash.clone(); match maybe_code { // new inline code Some(code) => status.new_code.push((code_hash, code, hash)), None => { if code_hash != ::util::SHA3_EMPTY { // see if this code has already been included inline match known_code.get(&code_hash) { Some(&first_with) => { // if so, load it from the database. let code = AccountDB::from_hash(db, first_with) .get(&code_hash) .ok_or_else(|| Error::MissingCode(vec![first_with]))?; // and write it again under a different mangled key AccountDBMut::from_hash(db, hash).emplace(code_hash, code); } // if not, queue it up to be filled later None => status.missing_code.push((hash, code_hash)), } } } } ::rlp::encode(&acc).to_vec() }; *out = (hash, thin_rlp); } if let Some(&(ref hash, ref rlp)) = out_chunk.iter().last() { known_storage_roots.insert(*hash, ::rlp::decode::(rlp).storage_root); } if let Some(&(ref hash, ref rlp)) = out_chunk.iter().next() { known_storage_roots.insert(*hash, ::rlp::decode::(rlp).storage_root); } Ok(status) } /// Proportion of blocks which we will verify `PoW` for. const POW_VERIFY_RATE: f32 = 0.02; /// Verify an old block with the given header, engine, blockchain, body. If `always` is set, it will perform /// the fullest verification possible. If not, it will take a random sample to determine whether it will /// do heavy or light verification. 
pub fn verify_old_block(rng: &mut OsRng, header: &Header, engine: &Engine, chain: &BlockChain, body: Option<&[u8]>, always: bool) -> Result<(), ::error::Error> { engine.verify_block_basic(header, body)?; if always || rng.gen::() <= POW_VERIFY_RATE { engine.verify_block_unordered(header, body)?; match chain.block_header(header.parent_hash()) { Some(parent) => engine.verify_block_family(header, &parent, body), None => Ok(()), } } else { Ok(()) } }