diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs
index 88fd181bb..e63b97dfb 100644
--- a/ethcore/src/client/client.rs
+++ b/ethcore/src/client/client.rs
@@ -894,7 +894,7 @@ impl Client {
 			},
 		};

-		snapshot::take_snapshot(&self.chain.read(), start_hash, db.as_hashdb(), writer, p)?;
+		snapshot::take_snapshot(&*self.engine, &self.chain.read(), start_hash, db.as_hashdb(), writer, p)?;

 		Ok(())
 	}
diff --git a/ethcore/src/engines/mod.rs b/ethcore/src/engines/mod.rs
index badac3cf2..ad573637c 100644
--- a/ethcore/src/engines/mod.rs
+++ b/ethcore/src/engines/mod.rs
@@ -45,6 +45,7 @@ use error::{Error, TransactionError};
 use evm::Schedule;
 use header::Header;
 use spec::CommonParams;
+use snapshot::SnapshotComponents;
 use transaction::{UnverifiedTransaction, SignedTransaction};
 use receipt::Receipt;

@@ -294,4 +295,10 @@ pub trait Engine : Sync + Send {

 	/// Stops any services that the may hold the Engine and makes it safe to drop.
 	fn stop(&self) {}
+
+	/// Create a factory for building snapshot chunks and restoring from them.
+	/// Returning `None` indicates that this engine doesn't support snapshot creation.
+	fn snapshot_components(&self) -> Option<Box<SnapshotComponents>> {
+		None
+	}
 }
diff --git a/ethcore/src/engines/null_engine.rs b/ethcore/src/engines/null_engine.rs
index 0611fc08e..9b9fb9469 100644
--- a/ethcore/src/engines/null_engine.rs
+++ b/ethcore/src/engines/null_engine.rs
@@ -60,4 +60,8 @@ impl Engine for NullEngine {
 	fn schedule(&self, _env_info: &EnvInfo) -> Schedule {
 		Schedule::new_homestead()
 	}
+
+	fn snapshot_components(&self) -> Option<Box<::snapshot::SnapshotComponents>> {
+		Some(Box::new(::snapshot::PowSnapshot))
+	}
 }
diff --git a/ethcore/src/ethereum/ethash.rs b/ethcore/src/ethereum/ethash.rs
index 548d0dbaa..0acea6f50 100644
--- a/ethcore/src/ethereum/ethash.rs
+++ b/ethcore/src/ethereum/ethash.rs
@@ -405,6 +405,10 @@ impl Engine for Arc<Ethash> {
 	fn epoch_verifier(&self, _header: &Header, _proof: &[u8]) -> Result<Box<EpochVerifier>, Error> {
 		Ok(Box::new(self.clone()))
 	}
+
+	fn snapshot_components(&self) -> Option<Box<::snapshot::SnapshotComponents>> {
+		Some(Box::new(::snapshot::PowSnapshot))
+	}
 }

 // Try to round gas_limit a bit so that:
diff --git a/ethcore/src/snapshot/consensus/mod.rs b/ethcore/src/snapshot/consensus/mod.rs
new file mode 100644
index 000000000..2e6b6b736
--- /dev/null
+++ b/ethcore/src/snapshot/consensus/mod.rs
@@ -0,0 +1,349 @@
+// Copyright 2015-2017 Parity Technologies (UK) Ltd.
+// This file is part of Parity.
+
+// Parity is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity. If not, see <http://www.gnu.org/licenses/>.
+
+//! Secondary chunk creation and restoration, implementations for different consensus
+//! engines.
+
+use std::collections::VecDeque;
+use std::io;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+
+use blockchain::{BlockChain, BlockProvider};
+use engines::Engine;
+use snapshot::{Error, ManifestData};
+use snapshot::block::AbridgedBlock;
+
+use util::{Bytes, H256};
+use util::kvdb::KeyValueDB;
+use rand::OsRng;
+use rlp::{RlpStream, UntrustedRlp};
+
+
+/// A sink for produced chunks.
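+///
+/// Anything satisfying `FnMut(&[u8]) -> io::Result<()>` works; the sink built by
+/// `chunk_secondary` in the parent module snappy-compresses each chunk and hands
+/// it to a `SnapshotWriter`. A minimal illustrative sink (hypothetical, for the
+/// docs only) could simply measure uncompressed sizes:
+///
+/// ```ignore
+/// let mut total = 0usize;
+/// let mut sink = |raw_chunk: &[u8]| -> ::std::io::Result<()> {
+///     total += raw_chunk.len();
+///     Ok(())
+/// };
+/// ```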
+pub type ChunkSink<'a> = FnMut(&[u8]) -> io::Result<()> + 'a;
+
+// How many blocks to include in a snapshot, starting from the head of the chain.
+const SNAPSHOT_BLOCKS: u64 = 30000;
+
+/// Components necessary for snapshot creation and restoration.
+pub trait SnapshotComponents: Send {
+	/// Create secondary snapshot chunks; these corroborate the state data
+	/// in the state chunks.
+	///
+	/// Chunks shouldn't exceed the given preferred size, and should be fed
+	/// uncompressed into the sink.
+	///
+	/// This will vary by consensus engine, so it's exposed as a trait.
+	fn chunk_all(
+		&mut self,
+		chain: &BlockChain,
+		block_at: H256,
+		chunk_sink: &mut ChunkSink,
+		preferred_size: usize,
+	) -> Result<(), Error>;
+
+	/// Create a rebuilder, which will have chunks fed into it in arbitrary
+	/// order and then be finalized.
+	///
+	/// The manifest, a database, and a fresh `BlockChain` are supplied.
+	// TODO: supply anything for state?
+	fn rebuilder(
+		&self,
+		chain: BlockChain,
+		db: Arc<KeyValueDB>,
+		manifest: &ManifestData,
+	) -> Result<Box<Rebuilder>, ::error::Error>;
+}
+
+
+/// Restore from secondary snapshot chunks.
+pub trait Rebuilder: Send {
+	/// Feed a chunk, potentially out of order.
+	///
+	/// Check `abort_flag` periodically while doing heavy work. If set to `false`, should bail with
+	/// `Error::RestorationAborted`.
+	fn feed(
+		&mut self,
+		chunk: &[u8],
+		engine: &Engine,
+		abort_flag: &AtomicBool,
+	) -> Result<(), ::error::Error>;
+
+	/// Finalize the restoration. Will be done after all chunks have been
+	/// fed successfully.
+	/// This will apply the necessary "glue" between chunks.
+	fn finalize(&mut self) -> Result<(), Error>;
+}
+
+/// Snapshot creation and restoration for PoW chains.
+/// This includes blocks from the head of the chain as a
+/// loose assurance that the chain is valid.
+pub struct PowSnapshot;
+
+impl SnapshotComponents for PowSnapshot {
+	fn chunk_all(
+		&mut self,
+		chain: &BlockChain,
+		block_at: H256,
+		chunk_sink: &mut ChunkSink,
+		preferred_size: usize,
+	) -> Result<(), Error> {
+		PowWorker {
+			chain: chain,
+			rlps: VecDeque::new(),
+			current_hash: block_at,
+			writer: chunk_sink,
+			preferred_size: preferred_size,
+		}.chunk_all()
+	}
+
+	fn rebuilder(
+		&self,
+		chain: BlockChain,
+		db: Arc<KeyValueDB>,
+		manifest: &ManifestData,
+	) -> Result<Box<Rebuilder>, ::error::Error> {
+		PowRebuilder::new(chain, db, manifest).map(|r| Box::new(r) as Box<_>)
+	}
+}
+
+/// Used to build block chunks.
+struct PowWorker<'a> {
+	chain: &'a BlockChain,
+	// block, receipt rlp pairs.
+	rlps: VecDeque<Bytes>,
+	current_hash: H256,
+	writer: &'a mut ChunkSink<'a>,
+	preferred_size: usize,
+}
+
+impl<'a> PowWorker<'a> {
+	// Repeatedly fills the buffers and writes out chunks, moving backwards from the starting block hash.
+	// Loops until we reach the first desired block, and writes out the remainder.
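+	//
+	// Each chunk this emits is an RLP list of the form
+	// [parent_number, parent_hash, parent_total_difficulty, (abridged_block, receipts), ...]
+	// (see `write_chunk` below), which gives a rebuilder enough context to
+	// reattach the chunk to the chain beneath it.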
+	fn chunk_all(&mut self) -> Result<(), Error> {
+		let mut loaded_size = 0;
+		let mut last = self.current_hash;
+
+		let genesis_hash = self.chain.genesis_hash();
+
+		for _ in 0..SNAPSHOT_BLOCKS {
+			if self.current_hash == genesis_hash { break }
+
+			let (block, receipts) = self.chain.block(&self.current_hash)
+				.and_then(|b| self.chain.block_receipts(&self.current_hash).map(|r| (b, r)))
+				.ok_or(Error::BlockNotFound(self.current_hash))?;
+
+			let abridged_rlp = AbridgedBlock::from_block_view(&block.view()).into_inner();
+
+			let pair = {
+				let mut pair_stream = RlpStream::new_list(2);
+				pair_stream.append_raw(&abridged_rlp, 1).append(&receipts);
+				pair_stream.out()
+			};
+
+			let new_loaded_size = loaded_size + pair.len();
+
+			// cut off the chunk if too large.
+
+			if new_loaded_size > self.preferred_size && !self.rlps.is_empty() {
+				self.write_chunk(last)?;
+				loaded_size = pair.len();
+			} else {
+				loaded_size = new_loaded_size;
+			}
+
+			self.rlps.push_front(pair);
+
+			last = self.current_hash;
+			self.current_hash = block.header_view().parent_hash();
+		}
+
+		if loaded_size != 0 {
+			self.write_chunk(last)?;
+		}
+
+		Ok(())
+	}
+
+	// write out the data in the buffers to a chunk on disk
+	//
+	// we preface each chunk with the parent of the first block's details,
+	// obtained from the details of the last block written.
+	fn write_chunk(&mut self, last: H256) -> Result<(), Error> {
+		trace!(target: "snapshot", "prepared block chunk with {} blocks", self.rlps.len());
+
+		let (last_header, last_details) = self.chain.block_header(&last)
+			.and_then(|n| self.chain.block_details(&last).map(|d| (n, d)))
+			.ok_or(Error::BlockNotFound(last))?;
+
+		let parent_number = last_header.number() - 1;
+		let parent_hash = last_header.parent_hash();
+		let parent_total_difficulty = last_details.total_difficulty - *last_header.difficulty();
+
+		trace!(target: "snapshot", "parent last written block: {}", parent_hash);
+
+		let num_entries = self.rlps.len();
+		let mut rlp_stream = RlpStream::new_list(3 + num_entries);
+		rlp_stream.append(&parent_number).append(parent_hash).append(&parent_total_difficulty);
+
+		for pair in self.rlps.drain(..) {
+			rlp_stream.append_raw(&pair, 1);
+		}
+
+		let raw_data = rlp_stream.out();
+
+		(self.writer)(&raw_data)?;
+
+		Ok(())
+	}
+}
+
+/// Rebuilder for proof-of-work chains.
+/// Does basic verification for all blocks, but `PoW` verification for some.
+/// Blocks must be fed in-order.
+///
+/// The first block in every chunk is disconnected from the last block in the
+/// chunk before it, as chunks may be submitted out-of-order.
+///
+/// After all chunks have been submitted, we "glue" the chunks together.
+pub struct PowRebuilder {
+	chain: BlockChain,
+	db: Arc<KeyValueDB>,
+	rng: OsRng,
+	disconnected: Vec<(u64, H256)>,
+	best_number: u64,
+	best_hash: H256,
+	best_root: H256,
+	fed_blocks: u64,
+}
+
+impl PowRebuilder {
+	/// Create a new PowRebuilder.
+	fn new(chain: BlockChain, db: Arc<KeyValueDB>, manifest: &ManifestData) -> Result<Self, ::error::Error> {
+		Ok(PowRebuilder {
+			chain: chain,
+			db: db,
+			rng: OsRng::new()?,
+			disconnected: Vec::new(),
+			best_number: manifest.block_number,
+			best_hash: manifest.block_hash,
+			best_root: manifest.state_root,
+			fed_blocks: 0,
+		})
+	}
+}
+
+impl Rebuilder for PowRebuilder {
+	/// Feed the rebuilder an uncompressed block chunk.
+	/// Returns any errors encountered.
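+	///
+	/// The chunk is expected in the format produced by `PowWorker::write_chunk`:
+	/// three parent-metadata items (number, hash, total difficulty) followed by
+	/// one (abridged block, receipts) pair per block.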
+	fn feed(&mut self, chunk: &[u8], engine: &Engine, abort_flag: &AtomicBool) -> Result<(), ::error::Error> {
+		use basic_types::Seal::With;
+		use views::BlockView;
+		use snapshot::verify_old_block;
+		use util::U256;
+		use util::triehash::ordered_trie_root;
+
+		let rlp = UntrustedRlp::new(chunk);
+		let item_count = rlp.item_count()?;
+		let num_blocks = (item_count - 3) as u64;
+
+		trace!(target: "snapshot", "restoring block chunk with {} blocks.", item_count - 3);
+
+		if self.fed_blocks + num_blocks > SNAPSHOT_BLOCKS {
+			return Err(Error::TooManyBlocks(SNAPSHOT_BLOCKS, self.fed_blocks).into())
+		}
+
+		// todo: assert here that these values are consistent with chunks being in order.
+		let mut cur_number = rlp.val_at::<u64>(0)? + 1;
+		let mut parent_hash = rlp.val_at::<H256>(1)?;
+		let parent_total_difficulty = rlp.val_at::<U256>(2)?;
+
+		for idx in 3..item_count {
+			if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) }
+
+			let pair = rlp.at(idx)?;
+			let abridged_rlp = pair.at(0)?.as_raw().to_owned();
+			let abridged_block = AbridgedBlock::from_raw(abridged_rlp);
+			let receipts: Vec<::receipt::Receipt> = pair.list_at(1)?;
+			let receipts_root = ordered_trie_root(
+				pair.at(1)?.iter().map(|r| r.as_raw().to_owned())
+			);
+
+			let block = abridged_block.to_block(parent_hash, cur_number, receipts_root)?;
+			let block_bytes = block.rlp_bytes(With);
+			let is_best = cur_number == self.best_number;
+
+			if is_best {
+				if block.header.hash() != self.best_hash {
+					return Err(Error::WrongBlockHash(cur_number, self.best_hash, block.header.hash()).into())
+				}
+
+				if block.header.state_root() != &self.best_root {
+					return Err(Error::WrongStateRoot(self.best_root, *block.header.state_root()).into())
+				}
+			}
+
+			verify_old_block(
+				&mut self.rng,
+				&block.header,
+				engine,
+				&self.chain,
+				Some(&block_bytes),
+				is_best
+			)?;
+
+			let mut batch = self.db.transaction();
+
+			// special-case the first block in each chunk.
+			if idx == 3 {
+				if self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, Some(parent_total_difficulty), is_best, false) {
+					self.disconnected.push((cur_number, block.header.hash()));
+				}
+			} else {
+				self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, is_best, false);
+			}
+			self.db.write_buffered(batch);
+			self.chain.commit();
+
+			parent_hash = BlockView::new(&block_bytes).hash();
+			cur_number += 1;
+		}
+
+		self.fed_blocks += num_blocks;
+
+		Ok(())
+	}
+
+	/// Glue together any disconnected chunks and check that the chain is complete.
+	fn finalize(&mut self) -> Result<(), Error> {
+		let mut batch = self.db.transaction();
+
+		for (first_num, first_hash) in self.disconnected.drain(..) {
+			let parent_num = first_num - 1;
+
+			// check if the parent is even in the chain.
+			// since we don't restore every single block in the chain,
+			// the first block of the first chunk has nothing to connect to.
+			if let Some(parent_hash) = self.chain.block_hash(parent_num) {
+				// if so, add the child to it.
+				self.chain.add_child(&mut batch, parent_hash, first_hash);
+			}
+		}
+		self.db.write_buffered(batch);
+		Ok(())
+	}
+}
diff --git a/ethcore/src/snapshot/error.rs b/ethcore/src/snapshot/error.rs
index c1391b300..7a3ffdca2 100644
--- a/ethcore/src/snapshot/error.rs
+++ b/ethcore/src/snapshot/error.rs
@@ -57,6 +57,8 @@ pub enum Error {
 	VersionNotSupported(u64),
 	/// Max chunk size is to small to fit basic account data.
 	ChunkTooSmall,
+	/// Snapshots not supported by the consensus engine.
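+	/// Returned when `Engine::snapshot_components` yields `None`, e.g. from
+	/// `take_snapshot` or when setting up a restoration.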
+	SnapshotsUnsupported,
 }

 impl fmt::Display for Error {
@@ -79,6 +81,7 @@ impl fmt::Display for Error {
 			Error::Trie(ref err) => err.fmt(f),
 			Error::VersionNotSupported(ref ver) => write!(f, "Snapshot version {} is not supprted.", ver),
 			Error::ChunkTooSmall => write!(f, "Chunk size is too small."),
+			Error::SnapshotsUnsupported => write!(f, "Snapshots unsupported by consensus engine."),
 		}
 	}
 }
diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs
index 1c3b4366b..db3bebde9 100644
--- a/ethcore/src/snapshot/mod.rs
+++ b/ethcore/src/snapshot/mod.rs
@@ -17,9 +17,9 @@
 //! Snapshot creation, restoration, and network service.
 //!
 //! Documentation of the format can be found at
-//! https://github.com/paritytech/parity/wiki/%22PV64%22-Snapshot-Format
+//! https://github.com/paritytech/parity/wiki/Warp-Sync-Snapshot-Format

-use std::collections::{HashMap, HashSet, VecDeque};
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

@@ -28,7 +28,6 @@ use blockchain::{BlockChain, BlockProvider};
 use engines::Engine;
 use header::Header;
 use ids::BlockId;
-use views::BlockView;

 use util::{Bytes, Hashable, HashDB, DBValue, snappy, U256, Uint};
 use util::Mutex;
@@ -40,7 +39,6 @@ use util::sha3::SHA3_NULL_RLP;
 use rlp::{RlpStream, UntrustedRlp};
 use bloom_journal::Bloom;

-use self::block::AbridgedBlock;
 use self::io::SnapshotWriter;

 use super::state_db::StateDB;
@@ -51,6 +49,7 @@ use rand::{Rng, OsRng};

 pub use self::error::Error;

+pub use self::consensus::*;
 pub use self::service::{Service, DatabaseRestore};
 pub use self::traits::SnapshotService;
 pub use self::watcher::Watcher;
@@ -63,6 +62,7 @@ pub mod service;

 mod account;
 mod block;
+mod consensus;
 mod error;
 mod watcher;

@@ -83,9 +83,6 @@ mod traits {
 // Try to have chunks be around 4MB (before compression)
 const PREFERRED_CHUNK_SIZE: usize = 4 * 1024 * 1024;

-// How many blocks to include in a snapshot, starting from the head of the chain.
-const SNAPSHOT_BLOCKS: u64 = 30000;
-
 /// A progress indicator for snapshots.
 #[derive(Debug, Default)]
 pub struct Progress {
@@ -122,6 +119,7 @@ impl Progress {
 }
 /// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer.
 pub fn take_snapshot(
+	engine: &Engine,
 	chain: &BlockChain,
 	block_at: H256,
 	state_db: &HashDB,
@@ -136,9 +134,11 @@ pub fn take_snapshot(
 	info!("Taking snapshot starting at block {}", number);

 	let writer = Mutex::new(writer);
+	let chunker = engine.snapshot_components().ok_or(Error::SnapshotsUnsupported)?;
 	let (state_hashes, block_hashes) = scope(|scope| {
-		let block_guard = scope.spawn(|| chunk_blocks(chain, block_at, &writer, p));
-		let state_res = chunk_state(state_db, state_root, &writer, p);
+		let writer = &writer;
+		let block_guard = scope.spawn(move || chunk_secondary(chunker, chain, block_at, writer, p));
+		let state_res = chunk_state(state_db, state_root, writer, p);

 		state_res.and_then(|state_hashes| {
 			block_guard.join().map(|block_hashes| (state_hashes, block_hashes))
@@ -163,128 +163,41 @@ pub fn take_snapshot(
 	Ok(())
 }

-/// Used to build block chunks.
-struct BlockChunker<'a> {
-	chain: &'a BlockChain,
-	// block, receipt rlp pairs.
-	rlps: VecDeque<Bytes>,
-	current_hash: H256,
-	hashes: Vec<H256>,
-	snappy_buffer: Vec<u8>,
-	writer: &'a Mutex<SnapshotWriter + 'a>,
-	progress: &'a Progress,
-}
-
-impl<'a> BlockChunker<'a> {
-	// Repeatedly fill the buffers and writes out chunks, moving backwards from starting block hash.
-	// Loops until we reach the first desired block, and writes out the remainder.
-	fn chunk_all(&mut self) -> Result<(), Error> {
-		let mut loaded_size = 0;
-		let mut last = self.current_hash;
-
-		let genesis_hash = self.chain.genesis_hash();
-
-		for _ in 0..SNAPSHOT_BLOCKS {
-			if self.current_hash == genesis_hash { break }
-
-			let (block, receipts) = self.chain.block(&self.current_hash)
-				.and_then(|b| self.chain.block_receipts(&self.current_hash).map(|r| (b, r)))
-				.ok_or(Error::BlockNotFound(self.current_hash))?;
-
-			let abridged_rlp = AbridgedBlock::from_block_view(&block.view()).into_inner();
-
-			let pair = {
-				let mut pair_stream = RlpStream::new_list(2);
-				pair_stream.append_raw(&abridged_rlp, 1).append(&receipts);
-				pair_stream.out()
-			};
-
-			let new_loaded_size = loaded_size + pair.len();
-
-			// cut off the chunk if too large.
-
-			if new_loaded_size > PREFERRED_CHUNK_SIZE && !self.rlps.is_empty() {
-				self.write_chunk(last)?;
-				loaded_size = pair.len();
-			} else {
-				loaded_size = new_loaded_size;
-			}
-
-			self.rlps.push_front(pair);
-
-			last = self.current_hash;
-			self.current_hash = block.header_view().parent_hash();
-		}
-
-		if loaded_size != 0 {
-			self.write_chunk(last)?;
-		}
-
-		Ok(())
-	}
-
-	// write out the data in the buffers to a chunk on disk
-	//
-	// we preface each chunk with the parent of the first block's details,
-	// obtained from the details of the last block written.
-	fn write_chunk(&mut self, last: H256) -> Result<(), Error> {
-		trace!(target: "snapshot", "prepared block chunk with {} blocks", self.rlps.len());
-
-		let (last_header, last_details) = self.chain.block_header(&last)
-			.and_then(|n| self.chain.block_details(&last).map(|d| (n, d)))
-			.ok_or(Error::BlockNotFound(last))?;
-
-		let parent_number = last_header.number() - 1;
-		let parent_hash = last_header.parent_hash();
-		let parent_total_difficulty = last_details.total_difficulty - *last_header.difficulty();
-
-		trace!(target: "snapshot", "parent last written block: {}", parent_hash);
-
-		let num_entries = self.rlps.len();
-		let mut rlp_stream = RlpStream::new_list(3 + num_entries);
-		rlp_stream.append(&parent_number).append(parent_hash).append(&parent_total_difficulty);
-
-		for pair in self.rlps.drain(..) {
-			rlp_stream.append_raw(&pair, 1);
-		}
-
-		let raw_data = rlp_stream.out();
-
-		let size = snappy::compress_into(&raw_data, &mut self.snappy_buffer);
-		let compressed = &self.snappy_buffer[..size];
-		let hash = compressed.sha3();
-
-		self.writer.lock().write_block_chunk(hash, compressed)?;
-		trace!(target: "snapshot", "wrote block chunk. hash: {}, size: {}, uncompressed size: {}", hash.hex(), size, raw_data.len());
-
-		self.progress.size.fetch_add(size, Ordering::SeqCst);
-		self.progress.blocks.fetch_add(num_entries, Ordering::SeqCst);
-
-		self.hashes.push(hash);
-		Ok(())
-	}
-}
-
-/// Create and write out all block chunks to disk, returning a vector of all
-/// the hashes of block chunks created.
+/// Create and write out all secondary chunks to disk, returning a vector of all
+/// the hashes of secondary chunks created.
 ///
-/// The path parameter is the directory to store the block chunks in.
-/// This function assumes the directory exists already.
+/// Secondary chunks are engine-specific, but they intend to corroborate the state data
+/// in the state chunks.
 /// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis.
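+///
+/// A sketch of the intended call site (as in `take_snapshot`; the engine,
+/// chain, writer, and progress values stand in for whatever the caller holds):
+///
+/// ```ignore
+/// let chunker = engine.snapshot_components().ok_or(Error::SnapshotsUnsupported)?;
+/// let hashes = chunk_secondary(chunker, &chain, best_hash, &writer, &progress)?;
+/// ```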
-pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_hash: H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> {
-	let mut chunker = BlockChunker {
-		chain: chain,
-		rlps: VecDeque::new(),
-		current_hash: start_hash,
-		hashes: Vec::new(),
-		snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)],
-		writer: writer,
-		progress: progress,
-	};
+pub fn chunk_secondary<'a>(mut chunker: Box<SnapshotComponents>, chain: &'a BlockChain, start_hash: H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> {
+	let mut chunk_hashes = Vec::new();
+	let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)];

-	chunker.chunk_all()?;
+	{
+		let mut chunk_sink = |raw_data: &[u8]| {
+			let compressed_size = snappy::compress_into(raw_data, &mut snappy_buffer);
+			let compressed = &snappy_buffer[..compressed_size];
+			let hash = compressed.sha3();
+			let size = compressed.len();

-	Ok(chunker.hashes)
+			writer.lock().write_block_chunk(hash, compressed)?;
+			trace!(target: "snapshot", "wrote secondary chunk. hash: {}, size: {}, uncompressed size: {}",
+				hash.hex(), size, raw_data.len());
+
+			progress.size.fetch_add(size, Ordering::SeqCst);
+			chunk_hashes.push(hash);
+			Ok(())
+		};
+
+		chunker.chunk_all(
+			chain,
+			start_hash,
+			&mut chunk_sink,
+			PREFERRED_CHUNK_SIZE,
+		)?;
+	}
+
+	Ok(chunk_hashes)
 }

 /// State trie chunker.
@@ -564,158 +477,15 @@ const POW_VERIFY_RATE: f32 = 0.02;
 /// the fullest verification possible. If not, it will take a random sample to determine whether it will
 /// do heavy or light verification.
 pub fn verify_old_block(rng: &mut OsRng, header: &Header, engine: &Engine, chain: &BlockChain, body: Option<&[u8]>, always: bool) -> Result<(), ::error::Error> {
+	engine.verify_block_basic(header, body)?;
+
 	if always || rng.gen::<f32>() <= POW_VERIFY_RATE {
+		engine.verify_block_unordered(header, body)?;
 		match chain.block_header(header.parent_hash()) {
 			Some(parent) => engine.verify_block_family(header, &parent, body),
-			None => engine.verify_block_seal(header), // TODO: fetch validation proof as necessary.
+			None => Ok(()),
 		}
 	} else {
-		engine.verify_block_basic(header, body)
-	}
-}
-
-/// Rebuilds the blockchain from chunks.
-///
-/// Does basic verification for all blocks, but `PoW` verification for some.
-/// Blocks must be fed in-order.
-///
-/// The first block in every chunk is disconnected from the last block in the
-/// chunk before it, as chunks may be submitted out-of-order.
-///
-/// After all chunks have been submitted, we "glue" the chunks together.
-pub struct BlockRebuilder {
-	chain: BlockChain,
-	db: Arc<KeyValueDB>,
-	rng: OsRng,
-	disconnected: Vec<(u64, H256)>,
-	best_number: u64,
-	best_hash: H256,
-	best_root: H256,
-	fed_blocks: u64,
-}
-
-impl BlockRebuilder {
-	/// Create a new BlockRebuilder.
-	pub fn new(chain: BlockChain, db: Arc<KeyValueDB>, manifest: &ManifestData) -> Result<Self, ::error::Error> {
-		Ok(BlockRebuilder {
-			chain: chain,
-			db: db,
-			rng: OsRng::new()?,
-			disconnected: Vec::new(),
-			best_number: manifest.block_number,
-			best_hash: manifest.block_hash,
-			best_root: manifest.state_root,
-			fed_blocks: 0,
-		})
-	}
-
-	/// Feed the rebuilder an uncompressed block chunk.
-	/// Returns the number of blocks fed or any errors.
-	pub fn feed(&mut self, chunk: &[u8], engine: &Engine, abort_flag: &AtomicBool) -> Result<u64, ::error::Error> {
-		use basic_types::Seal::With;
-		use util::U256;
-		use util::triehash::ordered_trie_root;
-
-		let rlp = UntrustedRlp::new(chunk);
-		let item_count = rlp.item_count()?;
-		let num_blocks = (item_count - 3) as u64;
-
-		trace!(target: "snapshot", "restoring block chunk with {} blocks.", item_count - 3);
-
-		if self.fed_blocks + num_blocks > SNAPSHOT_BLOCKS {
-			return Err(Error::TooManyBlocks(SNAPSHOT_BLOCKS, self.fed_blocks).into())
-		}
-
-		// todo: assert here that these values are consistent with chunks being in order.
-		let mut cur_number = rlp.val_at::<u64>(0)? + 1;
-		let mut parent_hash = rlp.val_at::<H256>(1)?;
-		let parent_total_difficulty = rlp.val_at::<U256>(2)?;
-
-		for idx in 3..item_count {
-			if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) }
-
-			let pair = rlp.at(idx)?;
-			let abridged_rlp = pair.at(0)?.as_raw().to_owned();
-			let abridged_block = AbridgedBlock::from_raw(abridged_rlp);
-			let receipts: Vec<::receipt::Receipt> = pair.list_at(1)?;
-			let receipts_root = ordered_trie_root(
-				pair.at(1)?.iter().map(|r| r.as_raw().to_owned())
-			);
-
-			let block = abridged_block.to_block(parent_hash, cur_number, receipts_root)?;
-			let block_bytes = block.rlp_bytes(With);
-			let is_best = cur_number == self.best_number;
-
-			if is_best {
-				if block.header.hash() != self.best_hash {
-					return Err(Error::WrongBlockHash(cur_number, self.best_hash, block.header.hash()).into())
-				}
-
-				if block.header.state_root() != &self.best_root {
-					return Err(Error::WrongStateRoot(self.best_root, *block.header.state_root()).into())
-				}
-			}
-
-			verify_old_block(
-				&mut self.rng,
-				&block.header,
-				engine,
-				&self.chain,
-				Some(&block_bytes),
-				is_best
-			)?;
-
-			let mut batch = self.db.transaction();
-
-			// special-case the first block in each chunk.
-			if idx == 3 {
-				if self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, Some(parent_total_difficulty), is_best, false) {
-					self.disconnected.push((cur_number, block.header.hash()));
-				}
-			} else {
-				self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, is_best, false);
-			}
-			self.db.write_buffered(batch);
-			self.chain.commit();
-
-			parent_hash = BlockView::new(&block_bytes).hash();
-			cur_number += 1;
-		}
-
-		self.fed_blocks += num_blocks;
-
-		Ok(num_blocks)
-	}
-
-	/// Glue together any disconnected chunks and check that the chain is complete.
-	pub fn finalize(self, canonical: HashMap<u64, H256>) -> Result<(), Error> {
-		let mut batch = self.db.transaction();
-
-		for (first_num, first_hash) in self.disconnected {
-			let parent_num = first_num - 1;
-
-			// check if the parent is even in the chain.
-			// since we don't restore every single block in the chain,
-			// the first block of the first chunks has nothing to connect to.
-			if let Some(parent_hash) = self.chain.block_hash(parent_num) {
-				// if so, add the child to it.
-				self.chain.add_child(&mut batch, parent_hash, first_hash);
-			}
-		}
-		self.db.write_buffered(batch);
-
-		let best_number = self.best_number;
-		for num in (0..self.fed_blocks).map(|x| best_number - x) {
-
-			let hash = self.chain.block_hash(num).ok_or(Error::IncompleteChain)?;
-
-			if let Some(canon_hash) = canonical.get(&num).cloned() {
-				if canon_hash != hash {
-					return Err(Error::WrongBlockHash(num, canon_hash, hash));
-				}
-			}
-		}
-
 		Ok(())
 	}
 }
diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs
index 06e659bc1..dc92b5427 100644
--- a/ethcore/src/snapshot/service.rs
+++ b/ethcore/src/snapshot/service.rs
@@ -16,14 +16,14 @@

 //! Snapshot network service implementation.

-use std::collections::{HashMap, HashSet};
+use std::collections::HashSet;
 use std::io::ErrorKind;
 use std::fs;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

-use super::{ManifestData, StateRebuilder, BlockRebuilder, RestorationStatus, SnapshotService};
+use super::{ManifestData, StateRebuilder, Rebuilder, RestorationStatus, SnapshotService};
 use super::io::{SnapshotReader, LooseReader, SnapshotWriter, LooseWriter};

 use blockchain::BlockChain;
@@ -69,12 +69,11 @@ struct Restoration {
 	state_chunks_left: HashSet<H256>,
 	block_chunks_left: HashSet<H256>,
 	state: StateRebuilder,
-	blocks: BlockRebuilder,
+	secondary: Box<Rebuilder>,
 	writer: Option<LooseWriter>,
 	snappy_buffer: Bytes,
 	final_state_root: H256,
 	guard: Guard,
-	canonical_hashes: HashMap<u64, H256>,
 	db: Arc<Database>,
 }

@@ -86,6 +85,7 @@ struct RestorationParams<'a> {
 	writer: Option<LooseWriter>, // writer for recovered snapshot.
 	genesis: &'a [u8], // genesis block of the chain.
 	guard: Guard, // guard for the restoration directory.
+	engine: &'a Engine,
 }

 impl Restoration {
@@ -100,7 +100,10 @@ impl Restoration {
 			.map_err(UtilError::SimpleString)?);

 		let chain = BlockChain::new(Default::default(), params.genesis, raw_db.clone());
-		let blocks = BlockRebuilder::new(chain, raw_db.clone(), &manifest)?;
+		let components = params.engine.snapshot_components()
+			.ok_or_else(|| ::snapshot::Error::SnapshotsUnsupported)?;
+
+		let secondary = components.rebuilder(chain, raw_db.clone(), &manifest)?;

 		let root = manifest.state_root.clone();
 		Ok(Restoration {
@@ -108,12 +111,11 @@ impl Restoration {
 			state_chunks_left: state_chunks,
 			block_chunks_left: block_chunks,
 			state: StateRebuilder::new(raw_db.clone(), params.pruning),
-			blocks: blocks,
+			secondary: secondary,
 			writer: params.writer,
 			snappy_buffer: Vec::new(),
 			final_state_root: root,
 			guard: params.guard,
-			canonical_hashes: HashMap::new(),
 			db: raw_db,
 		})
 	}
@@ -138,7 +140,7 @@ impl Restoration {
 		if self.block_chunks_left.remove(&hash) {
 			let len = snappy::decompress_into(chunk, &mut self.snappy_buffer)?;

-			self.blocks.feed(&self.snappy_buffer[..len], engine, flag)?;
+			self.secondary.feed(&self.snappy_buffer[..len], engine, flag)?;

 			if let Some(ref mut writer) = self.writer.as_mut() {
 				writer.write_block_chunk(hash, chunk)?;
@@ -147,13 +149,8 @@ impl Restoration {
 		Ok(())
 	}

-	// note canonical hashes.
-	fn note_canonical(&mut self, hashes: &[(u64, H256)]) {
-		self.canonical_hashes.extend(hashes.iter().cloned());
-	}
-
 	// finish up restoration.
-	fn finalize(self) -> Result<(), Error> {
+	fn finalize(mut self) -> Result<(), Error> {
 		use util::trie::TrieError;

 		if !self.is_done() { return Ok(()) }
@@ -169,7 +166,7 @@ impl Restoration {
 		self.state.finalize(self.manifest.block_number, self.manifest.block_hash)?;

 		// connect out-of-order chunks and verify chain integrity.
-		self.blocks.finalize(self.canonical_hashes)?;
+		self.secondary.finalize()?;

 		if let Some(writer) = self.writer {
 			writer.finish(self.manifest)?;
@@ -425,6 +422,7 @@ impl Service {
 			writer: writer,
 			genesis: &self.genesis_block,
 			guard: Guard::new(rest_dir),
+			engine: &*self.engine,
 		};

 		let state_chunks = params.manifest.state_hashes.len();
@@ -593,14 +591,6 @@ impl SnapshotService for Service {
 			trace!("Error sending snapshot service message: {:?}", e);
 		}
 	}
-
-	fn provide_canon_hashes(&self, canonical: &[(u64, H256)]) {
-		let mut rest = self.restoration.lock();
-
-		if let Some(ref mut rest) = rest.as_mut() {
-			rest.note_canonical(canonical);
-		}
-	}
 }

 impl Drop for Service {
diff --git a/ethcore/src/snapshot/snapshot_service_trait.rs b/ethcore/src/snapshot/snapshot_service_trait.rs
index 7b53ee9b9..67e96398e 100644
--- a/ethcore/src/snapshot/snapshot_service_trait.rs
+++ b/ethcore/src/snapshot/snapshot_service_trait.rs
@@ -48,10 +48,6 @@ pub trait SnapshotService : Sync + Send {
 	/// Feed a raw block chunk to the service to be processed asynchronously.
 	/// no-op if currently restoring.
 	fn restore_block_chunk(&self, hash: H256, chunk: Bytes);
-
-	/// Give the restoration in-progress some canonical block hashes for
-	/// extra verification (performed at the end)
-	fn provide_canon_hashes(&self, canonical: &[(u64, H256)]);
 }

 impl IpcConfig for SnapshotService { }
diff --git a/ethcore/src/snapshot/tests/blocks.rs b/ethcore/src/snapshot/tests/blocks.rs
index ff63afdfc..fae9ae75e 100644
--- a/ethcore/src/snapshot/tests/blocks.rs
+++ b/ethcore/src/snapshot/tests/blocks.rs
@@ -21,13 +21,12 @@ use error::Error;

 use blockchain::generator::{ChainGenerator, ChainIterator, BlockFinalizer};
 use blockchain::BlockChain;
-use snapshot::{chunk_blocks, BlockRebuilder, Error as SnapshotError, Progress};
+use snapshot::{chunk_secondary, Error as SnapshotError, Progress, SnapshotComponents};
 use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter};

 use util::{Mutex, snappy};
-use util::kvdb::{Database, DatabaseConfig};
+use util::kvdb::{self, KeyValueDB, DBTransaction};

-use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;

@@ -35,19 +34,18 @@ fn chunk_and_restore(amount: u64) {
 	let mut canon_chain = ChainGenerator::default();
 	let mut finalizer = BlockFinalizer::default();
 	let genesis = canon_chain.generate(&mut finalizer).unwrap();
-	let db_cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
+	let components = ::snapshot::PowSnapshot;

 	let engine = Arc::new(::engines::NullEngine::default());
-	let orig_path = RandomTempPath::create_dir();
 	let new_path = RandomTempPath::create_dir();
 	let mut snapshot_path = new_path.as_path().to_owned();
 	snapshot_path.push("SNAP");

-	let old_db = Arc::new(Database::open(&db_cfg, orig_path.as_str()).unwrap());
+	let old_db = Arc::new(kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0)));
 	let bc = BlockChain::new(Default::default(), &genesis, old_db.clone());

 	// build the blockchain.
-	let mut batch = old_db.transaction();
+	let mut batch = DBTransaction::new();
 	for _ in 0..amount {
 		let block = canon_chain.generate(&mut finalizer).unwrap();
 		bc.insert_block(&mut batch, &block, vec![]);
@@ -56,12 +54,18 @@ fn chunk_and_restore(amount: u64) {

 	old_db.write(batch).unwrap();
-
 	let best_hash = bc.best_block_hash();

 	// snapshot it.
 	let writer = Mutex::new(PackedWriter::new(&snapshot_path).unwrap());
-	let block_hashes = chunk_blocks(&bc, best_hash, &writer, &Progress::default()).unwrap();
+	let block_hashes = chunk_secondary(
+		Box::new(::snapshot::PowSnapshot),
+		&bc,
+		best_hash,
+		&writer,
+		&Progress::default()
+	).unwrap();
+
 	let manifest = ::snapshot::ManifestData {
 		version: 2,
 		state_hashes: Vec::new(),
@@ -74,9 +78,10 @@ fn chunk_and_restore(amount: u64) {
 	writer.into_inner().finish(manifest.clone()).unwrap();

 	// restore it.
-	let new_db = Arc::new(Database::open(&db_cfg, new_path.as_str()).unwrap());
+	let new_db = Arc::new(kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0)));
 	let new_chain = BlockChain::new(Default::default(), &genesis, new_db.clone());
-	let mut rebuilder = BlockRebuilder::new(new_chain, new_db.clone(), &manifest).unwrap();
+	let mut rebuilder = components.rebuilder(new_chain, new_db.clone(), &manifest).unwrap();
+
 	let reader = PackedReader::new(&snapshot_path).unwrap().unwrap();
 	let flag = AtomicBool::new(true);
 	for chunk_hash in &reader.manifest().block_hashes {
@@ -85,7 +90,8 @@ fn chunk_and_restore(amount: u64) {
 		rebuilder.feed(&chunk, engine.as_ref(), &flag).unwrap();
 	}

-	rebuilder.finalize(HashMap::new()).unwrap();
+	rebuilder.finalize().unwrap();
+	drop(rebuilder);

 	// and test it.
 	let new_chain = BlockChain::new(Default::default(), &genesis, new_db);
@@ -118,10 +124,8 @@ fn checks_flag() {
 	};

 	let chunk = stream.out();
-	let path = RandomTempPath::create_dir();
-	let db_cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS);

-	let db = Arc::new(Database::open(&db_cfg, path.as_str()).unwrap());
+	let db = Arc::new(kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0)));
 	let engine = Arc::new(::engines::NullEngine::default());
 	let chain = BlockChain::new(Default::default(), &genesis, db.clone());

@@ -134,7 +138,7 @@ fn checks_flag() {
 		block_hash: H256::default(),
 	};

-	let mut rebuilder = BlockRebuilder::new(chain, db.clone(), &manifest).unwrap();
+	let mut rebuilder = ::snapshot::PowSnapshot.rebuilder(chain, db.clone(), &manifest).unwrap();

 	match rebuilder.feed(&chunk, engine.as_ref(), &AtomicBool::new(false)) {
 		Err(Error::Snapshot(SnapshotError::RestorationAborted)) => {}
diff --git a/rpc/src/v1/tests/helpers/snapshot_service.rs b/rpc/src/v1/tests/helpers/snapshot_service.rs
index 2314cea10..f03eb3bc3 100644
--- a/rpc/src/v1/tests/helpers/snapshot_service.rs
+++ b/rpc/src/v1/tests/helpers/snapshot_service.rs
@@ -47,5 +47,4 @@ impl SnapshotService for TestSnapshotService {
 	fn abort_restore(&self) { }
 	fn restore_state_chunk(&self, _hash: H256, _chunk: Bytes) { }
 	fn restore_block_chunk(&self, _hash: H256, _chunk: Bytes) { }
-	fn provide_canon_hashes(&self, _hashes: &[(u64, H256)]) { }
-}
\ No newline at end of file
+}
diff --git a/sync/src/tests/snapshot.rs b/sync/src/tests/snapshot.rs
index 16114e216..0f97ec913 100644
--- a/sync/src/tests/snapshot.rs
+++ b/sync/src/tests/snapshot.rs
@@ -24,7 +24,6 @@ use SyncConfig;
 pub struct TestSnapshotService {
 	manifest: Option<ManifestData>,
 	chunks: HashMap<H256, Bytes>,
-	canon_hashes: Mutex<HashMap<u64, H256>>,

 	restoration_manifest: Mutex<Option<ManifestData>>,
 	state_restoration_chunks: Mutex<HashMap<H256, Bytes>>,
@@ -36,7 +35,6 @@ impl TestSnapshotService {
 		TestSnapshotService {
 			manifest: None,
 			chunks: HashMap::new(),
-			canon_hashes: Mutex::new(HashMap::new()),
 			restoration_manifest: Mutex::new(None),
 			state_restoration_chunks: Mutex::new(HashMap::new()),
 			block_restoration_chunks: Mutex::new(HashMap::new()),
@@ -61,7 +59,6 @@ impl TestSnapshotService {
 		TestSnapshotService {
 			manifest: Some(manifest),
 			chunks: chunks,
-			canon_hashes: Mutex::new(HashMap::new()),
 			restoration_manifest: Mutex::new(None),
 			state_restoration_chunks: Mutex::new(HashMap::new()),
 			block_restoration_chunks: Mutex::new(HashMap::new()),
@@ -115,10 +112,6 @@ impl SnapshotService for TestSnapshotService {
 			self.block_restoration_chunks.lock().insert(hash, chunk);
 		}
 	}
-
-	fn provide_canon_hashes(&self, hashes: &[(u64, H256)]) {
-		self.canon_hashes.lock().extend(hashes.iter().cloned());
-	}
 }

 #[test]