snapshot chunk and restore traits

parent 4d3f137e1e
commit 2ec3397b7d
				| @ -894,7 +894,7 @@ impl Client { | ||||
| 			}, | ||||
| 		}; | ||||
| 
 | ||||
| 		snapshot::take_snapshot(&self.chain.read(), start_hash, db.as_hashdb(), writer, p)?; | ||||
| 		snapshot::take_snapshot(&*self.engine, &self.chain.read(), start_hash, db.as_hashdb(), writer, p)?; | ||||
| 
 | ||||
| 		Ok(()) | ||||
| 	} | ||||
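The only change in this hunk is threading the engine into `take_snapshot`: `&*self.engine` reborrows the client's `Arc`'d engine as a plain trait reference. A minimal, self-contained sketch of that deref pattern, with a hypothetical stand-in trait (not the real ethcore `Engine`):

    use std::sync::Arc;

    trait Engine {
        fn name(&self) -> &'static str;
    }

    struct NullEngine; // stand-in engine for illustration

    impl Engine for NullEngine {
        fn name(&self) -> &'static str { "null" }
    }

    // A take_snapshot-style function that borrows the engine for the call.
    fn takes_engine(engine: &dyn Engine) -> &'static str {
        engine.name()
    }

    fn main() {
        let engine: Arc<dyn Engine> = Arc::new(NullEngine);
        // `&*engine` goes from Arc<dyn Engine> to &dyn Engine,
        // just as `&*self.engine` does in the hunk above.
        assert_eq!(takes_engine(&*engine), "null");
    }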
|  | ||||
| @ -45,6 +45,7 @@ use error::{Error, TransactionError}; | ||||
| use evm::Schedule; | ||||
| use header::Header; | ||||
| use spec::CommonParams; | ||||
| use snapshot::SnapshotComponents; | ||||
| use transaction::{UnverifiedTransaction, SignedTransaction}; | ||||
| use receipt::Receipt; | ||||
| 
 | ||||
| @ -294,4 +295,10 @@ pub trait Engine : Sync + Send { | ||||
| 
 | ||||
| 	/// Stops any services that may hold the Engine and makes it safe to drop.
 | ||||
| 	fn stop(&self) {} | ||||
| 
 | ||||
| 	/// Create a factory for building snapshot chunks and restoring from them.
 | ||||
| 	/// Returning `None` indicates that this engine doesn't support snapshot creation.
 | ||||
| 	fn snapshot_components(&self) -> Option<Box<SnapshotComponents>> { | ||||
| 		None | ||||
| 	} | ||||
| } | ||||
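Since `snapshot_components` defaults to `None`, snapshot support is strictly opt-in per engine. A hedged, self-contained sketch of that pattern with simplified stand-in types (`SealOnlyEngine` and `PowEngine` are hypothetical, not the real ethcore engines):

    trait SnapshotComponents: Send {}

    struct PowSnapshot;
    impl SnapshotComponents for PowSnapshot {}

    trait Engine {
        // Default: this engine doesn't support snapshot creation.
        fn snapshot_components(&self) -> Option<Box<dyn SnapshotComponents>> {
            None
        }
    }

    struct SealOnlyEngine; // hypothetical engine with no snapshot support
    impl Engine for SealOnlyEngine {}

    struct PowEngine; // hypothetical PoW-style engine opting in
    impl Engine for PowEngine {
        fn snapshot_components(&self) -> Option<Box<dyn SnapshotComponents>> {
            Some(Box::new(PowSnapshot))
        }
    }

    fn main() {
        assert!(SealOnlyEngine.snapshot_components().is_none());
        assert!(PowEngine.snapshot_components().is_some());
    }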
|  | ||||
| @ -60,4 +60,8 @@ impl Engine for NullEngine { | ||||
| 	fn schedule(&self, _env_info: &EnvInfo) -> Schedule { | ||||
| 		Schedule::new_homestead() | ||||
| 	} | ||||
| 
 | ||||
| 	fn snapshot_components(&self) -> Option<Box<::snapshot::SnapshotComponents>> { | ||||
| 		Some(Box::new(::snapshot::PowSnapshot)) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @ -405,6 +405,10 @@ impl Engine for Arc<Ethash> { | ||||
| 	fn epoch_verifier(&self, _header: &Header, _proof: &[u8]) -> Result<Box<::engines::EpochVerifier>, Error> { | ||||
| 		Ok(Box::new(self.clone())) | ||||
| 	} | ||||
| 
 | ||||
| 	fn snapshot_components(&self) -> Option<Box<::snapshot::SnapshotComponents>> { | ||||
| 		Some(Box::new(::snapshot::PowSnapshot)) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Try to round gas_limit a bit so that:
 | ||||
|  | ||||
349 ethcore/src/snapshot/consensus/mod.rs Normal file
							| @ -0,0 +1,349 @@ | ||||
| // Copyright 2015-2017 Parity Technologies (UK) Ltd.
 | ||||
| // This file is part of Parity.
 | ||||
| 
 | ||||
| // Parity is free software: you can redistribute it and/or modify
 | ||||
| // it under the terms of the GNU General Public License as published by
 | ||||
| // the Free Software Foundation, either version 3 of the License, or
 | ||||
| // (at your option) any later version.
 | ||||
| 
 | ||||
| // Parity is distributed in the hope that it will be useful,
 | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | ||||
| // GNU General Public License for more details.
 | ||||
| 
 | ||||
| // You should have received a copy of the GNU General Public License
 | ||||
| // along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | ||||
| 
 | ||||
| //! Secondary chunk creation and restoration, with implementations for different
 | ||||
| //! consensus engines.
 | ||||
| 
 | ||||
| use std::collections::VecDeque; | ||||
| use std::io; | ||||
| use std::sync::atomic::{AtomicBool, Ordering}; | ||||
| use std::sync::Arc; | ||||
| 
 | ||||
| use blockchain::{BlockChain, BlockProvider}; | ||||
| use engines::Engine; | ||||
| use snapshot::{Error, ManifestData}; | ||||
| use snapshot::block::AbridgedBlock; | ||||
| 
 | ||||
| use util::{Bytes, H256}; | ||||
| use util::kvdb::KeyValueDB; | ||||
| use rand::OsRng; | ||||
| use rlp::{RlpStream, UntrustedRlp}; | ||||
| 
 | ||||
| 
 | ||||
| /// A sink for produced chunks.
 | ||||
| pub type ChunkSink<'a> = FnMut(&[u8]) -> io::Result<()> + 'a; | ||||
| 
 | ||||
| // How many blocks to include in a snapshot, starting from the head of the chain.
 | ||||
| const SNAPSHOT_BLOCKS: u64 = 30000; | ||||
| 
 | ||||
| /// Components necessary for snapshot creation and restoration.
 | ||||
| pub trait SnapshotComponents: Send { | ||||
| 	/// Create secondary snapshot chunks; these corroborate the state data
 | ||||
| 	/// in the state chunks.
 | ||||
| 	///
 | ||||
| 	/// Chunks shouldn't exceed the given preferred size, and should be fed
 | ||||
| 	/// uncompressed into the sink.
 | ||||
| 	///
 | ||||
| 	/// This will vary by consensus engine, so it's exposed as a trait.
 | ||||
| 	fn chunk_all( | ||||
| 		&mut self, | ||||
| 		chain: &BlockChain, | ||||
| 		block_at: H256, | ||||
| 		chunk_sink: &mut ChunkSink, | ||||
| 		preferred_size: usize, | ||||
| 	) -> Result<(), Error>; | ||||
| 
 | ||||
| 	/// Create a rebuilder, which will have chunks fed into it in arbitrary
 | ||||
| 	/// order and then be finalized.
 | ||||
| 	///
 | ||||
| 	/// The manifest, a database, and a fresh `BlockChain` are supplied.
 | ||||
| 	// TODO: supply anything for state?
 | ||||
| 	fn rebuilder( | ||||
| 		&self, | ||||
| 		chain: BlockChain, | ||||
| 		db: Arc<KeyValueDB>, | ||||
| 		manifest: &ManifestData, | ||||
| 	) -> Result<Box<Rebuilder>, ::error::Error>; | ||||
| } | ||||
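`chunk_all` writes through the `ChunkSink` alias defined above, so any closure of the right shape can receive chunks. A simplified, self-contained sketch (a plain `FnMut` trait object stands in for `ChunkSink`; `emit_chunks` is hypothetical):

    use std::io;

    // Produce two dummy chunks through the sink, as chunk_all would.
    fn emit_chunks(sink: &mut dyn FnMut(&[u8]) -> io::Result<()>) -> io::Result<()> {
        sink(b"first chunk")?;
        sink(b"second chunk")
    }

    fn main() -> io::Result<()> {
        let mut sizes = Vec::new();
        // The sink here just records sizes; chunk_secondary below
        // compresses, hashes, and writes instead.
        let mut sink = |chunk: &[u8]| -> io::Result<()> {
            sizes.push(chunk.len());
            Ok(())
        };
        emit_chunks(&mut sink)?;
        assert_eq!(sizes, vec![11, 12]);
        Ok(())
    }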
| 
 | ||||
| 
 | ||||
| /// Restore from secondary snapshot chunks.
 | ||||
| pub trait Rebuilder: Send { | ||||
| 	/// Feed a chunk, potentially out of order.
 | ||||
| 	///
 | ||||
| 	/// Check `abort_flag` periodically while doing heavy work. If set to `false`, should bail with
 | ||||
| 	/// `Error::RestorationAborted`.
 | ||||
| 	fn feed( | ||||
| 		&mut self, | ||||
| 		chunk: &[u8], | ||||
| 		engine: &Engine, | ||||
| 		abort_flag: &AtomicBool, | ||||
| 	) -> Result<(), ::error::Error>; | ||||
| 
 | ||||
| 	/// Finalize the restoration. Will be done after all chunks have been
 | ||||
| 	/// fed successfully.
 | ||||
| 	/// This will apply the necessary "glue" between chunks.
 | ||||
| 	fn finalize(&mut self) -> Result<(), Error>; | ||||
| } | ||||
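The intended lifecycle, sketched with a hypothetical stand-in type under the same abort convention (`false` means bail): feed chunks in whatever order they arrive, then call `finalize` once to glue them together.

    use std::sync::atomic::{AtomicBool, Ordering};

    struct StubRebuilder {
        fed: usize,
    }

    impl StubRebuilder {
        fn feed(&mut self, _chunk: &[u8], abort_flag: &AtomicBool) -> Result<(), String> {
            // Mirror the convention above: a `false` flag means "abort".
            if !abort_flag.load(Ordering::SeqCst) {
                return Err("restoration aborted".into());
            }
            self.fed += 1;
            Ok(())
        }

        fn finalize(&mut self) -> Result<(), String> {
            // The real implementation glues disconnected chunks together here.
            Ok(())
        }
    }

    fn main() -> Result<(), String> {
        let keep_going = AtomicBool::new(true);
        let mut rebuilder = StubRebuilder { fed: 0 };
        for chunk in [&b"second"[..], &b"first"[..]] { // out of order is fine
            rebuilder.feed(chunk, &keep_going)?;
        }
        rebuilder.finalize()
    }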
| 
 | ||||
| /// Snapshot creation and restoration for PoW chains.
 | ||||
| /// This includes blocks from the head of the chain as a
 | ||||
| /// loose assurance that the chain is valid.
 | ||||
| pub struct PowSnapshot; | ||||
| 
 | ||||
| impl SnapshotComponents for PowSnapshot { | ||||
| 	fn chunk_all( | ||||
| 		&mut self, | ||||
| 		chain: &BlockChain, | ||||
| 		block_at: H256, | ||||
| 		chunk_sink: &mut ChunkSink, | ||||
| 		preferred_size: usize, | ||||
| 	) -> Result<(), Error> { | ||||
| 		PowWorker { | ||||
| 			chain: chain, | ||||
| 			rlps: VecDeque::new(), | ||||
| 			current_hash: block_at, | ||||
| 			writer: chunk_sink, | ||||
| 			preferred_size: preferred_size, | ||||
| 		}.chunk_all() | ||||
| 	} | ||||
| 
 | ||||
| 	fn rebuilder( | ||||
| 		&self, | ||||
| 		chain: BlockChain, | ||||
| 		db: Arc<KeyValueDB>, | ||||
| 		manifest: &ManifestData, | ||||
| 	) -> Result<Box<Rebuilder>, ::error::Error> { | ||||
| 		PowRebuilder::new(chain, db, manifest).map(|r| Box::new(r) as Box<_>) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| /// Used to build block chunks.
 | ||||
| struct PowWorker<'a> { | ||||
| 	chain: &'a BlockChain, | ||||
| 	// block, receipt rlp pairs.
 | ||||
| 	rlps: VecDeque<Bytes>, | ||||
| 	current_hash: H256, | ||||
| 	writer: &'a mut ChunkSink<'a>, | ||||
| 	preferred_size: usize, | ||||
| } | ||||
| 
 | ||||
| impl<'a> PowWorker<'a> { | ||||
| 	// Repeatedly fills the buffers and writes out chunks, moving backwards from the starting block hash.
 | ||||
| 	// Loops until we reach the first desired block, and writes out the remainder.
 | ||||
| 	fn chunk_all(&mut self) -> Result<(), Error> { | ||||
| 		let mut loaded_size = 0; | ||||
| 		let mut last = self.current_hash; | ||||
| 
 | ||||
| 		let genesis_hash = self.chain.genesis_hash(); | ||||
| 
 | ||||
| 		for _ in 0..SNAPSHOT_BLOCKS { | ||||
| 			if self.current_hash == genesis_hash { break } | ||||
| 
 | ||||
| 			let (block, receipts) = self.chain.block(&self.current_hash) | ||||
| 				.and_then(|b| self.chain.block_receipts(&self.current_hash).map(|r| (b, r))) | ||||
| 				.ok_or(Error::BlockNotFound(self.current_hash))?; | ||||
| 
 | ||||
| 			let abridged_rlp = AbridgedBlock::from_block_view(&block.view()).into_inner(); | ||||
| 
 | ||||
| 			let pair = { | ||||
| 				let mut pair_stream = RlpStream::new_list(2); | ||||
| 				pair_stream.append_raw(&abridged_rlp, 1).append(&receipts); | ||||
| 				pair_stream.out() | ||||
| 			}; | ||||
| 
 | ||||
| 			let new_loaded_size = loaded_size + pair.len(); | ||||
| 
 | ||||
| 			// cut off the chunk if too large.
 | ||||
| 
 | ||||
| 			if new_loaded_size > self.preferred_size && !self.rlps.is_empty() { | ||||
| 				self.write_chunk(last)?; | ||||
| 				loaded_size = pair.len(); | ||||
| 			} else { | ||||
| 				loaded_size = new_loaded_size; | ||||
| 			} | ||||
| 
 | ||||
| 			self.rlps.push_front(pair); | ||||
| 
 | ||||
| 			last = self.current_hash; | ||||
| 			self.current_hash = block.header_view().parent_hash(); | ||||
| 		} | ||||
| 
 | ||||
| 		if loaded_size != 0 { | ||||
| 			self.write_chunk(last)?; | ||||
| 		} | ||||
| 
 | ||||
| 		Ok(()) | ||||
| 	} | ||||
| 
 | ||||
| 	// write out the data in the buffers to a chunk on disk
 | ||||
| 	//
 | ||||
| 	// we preface each chunk with the details of the first block's parent,
 | ||||
| 	// obtained from the details of the last block written.
 | ||||
| 	fn write_chunk(&mut self, last: H256) -> Result<(), Error> { | ||||
| 		trace!(target: "snapshot", "prepared block chunk with {} blocks", self.rlps.len()); | ||||
| 
 | ||||
| 		let (last_header, last_details) = self.chain.block_header(&last) | ||||
| 			.and_then(|n| self.chain.block_details(&last).map(|d| (n, d))) | ||||
| 			.ok_or(Error::BlockNotFound(last))?; | ||||
| 
 | ||||
| 		let parent_number = last_header.number() - 1; | ||||
| 		let parent_hash = last_header.parent_hash(); | ||||
| 		let parent_total_difficulty = last_details.total_difficulty - *last_header.difficulty(); | ||||
| 
 | ||||
| 		trace!(target: "snapshot", "parent last written block: {}", parent_hash); | ||||
| 
 | ||||
| 		let num_entries = self.rlps.len(); | ||||
| 		let mut rlp_stream = RlpStream::new_list(3 + num_entries); | ||||
| 		rlp_stream.append(&parent_number).append(parent_hash).append(&parent_total_difficulty); | ||||
| 
 | ||||
| 		for pair in self.rlps.drain(..) { | ||||
| 			rlp_stream.append_raw(&pair, 1); | ||||
| 		} | ||||
| 
 | ||||
| 		let raw_data = rlp_stream.out(); | ||||
| 
 | ||||
| 		(self.writer)(&raw_data)?; | ||||
| 
 | ||||
| 		Ok(()) | ||||
| 	} | ||||
| } | ||||
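For reference, a hedged sketch of the chunk layout `write_chunk` emits, written against the `rlp` and `util` APIs this module already imports. `encode_chunk` is illustrative, not part of the module, and each `pair` is assumed to be an already-encoded (abridged block, receipts) list:

    use rlp::RlpStream;
    use util::{Bytes, H256, U256};

    // chunk = [parent_number, parent_hash, parent_total_difficulty, pair...]
    fn encode_chunk(parent_number: u64, parent_hash: H256, parent_td: U256, pairs: Vec<Bytes>) -> Bytes {
        let mut stream = RlpStream::new_list(3 + pairs.len());
        stream.append(&parent_number).append(&parent_hash).append(&parent_td);
        for pair in pairs {
            // Raw-append: the pair is already a complete RLP item.
            stream.append_raw(&pair, 1);
        }
        stream.out()
    }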
| 
 | ||||
| /// Rebuilder for proof-of-work chains.
 | ||||
| /// Does basic verification for all blocks, but `PoW` verification for some.
 | ||||
| /// Blocks must be fed in-order.
 | ||||
| ///
 | ||||
| /// The first block in every chunk is disconnected from the last block in the
 | ||||
| /// chunk before it, as chunks may be submitted out-of-order.
 | ||||
| ///
 | ||||
| /// After all chunks have been submitted, we "glue" the chunks together.
 | ||||
| pub struct PowRebuilder { | ||||
| 	chain: BlockChain, | ||||
| 	db: Arc<KeyValueDB>, | ||||
| 	rng: OsRng, | ||||
| 	disconnected: Vec<(u64, H256)>, | ||||
| 	best_number: u64, | ||||
| 	best_hash: H256, | ||||
| 	best_root: H256, | ||||
| 	fed_blocks: u64, | ||||
| } | ||||
| 
 | ||||
| impl PowRebuilder { | ||||
| 	/// Create a new PowRebuilder.
 | ||||
| 	fn new(chain: BlockChain, db: Arc<KeyValueDB>, manifest: &ManifestData) -> Result<Self, ::error::Error> { | ||||
| 		Ok(PowRebuilder { | ||||
| 			chain: chain, | ||||
| 			db: db, | ||||
| 			rng: OsRng::new()?, | ||||
| 			disconnected: Vec::new(), | ||||
| 			best_number: manifest.block_number, | ||||
| 			best_hash: manifest.block_hash, | ||||
| 			best_root: manifest.state_root, | ||||
| 			fed_blocks: 0, | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| impl Rebuilder for PowRebuilder { | ||||
| 	/// Feed the rebuilder an uncompressed block chunk.
 | ||||
| 	/// Returns an error if the chunk is malformed or the restoration is aborted.
 | ||||
| 	fn feed(&mut self, chunk: &[u8], engine: &Engine, abort_flag: &AtomicBool) -> Result<(), ::error::Error> { | ||||
| 		use basic_types::Seal::With; | ||||
| 		use views::BlockView; | ||||
| 		use snapshot::verify_old_block; | ||||
| 		use util::U256; | ||||
| 		use util::triehash::ordered_trie_root; | ||||
| 
 | ||||
| 		let rlp = UntrustedRlp::new(chunk); | ||||
| 		let item_count = rlp.item_count()?; | ||||
| 		let num_blocks = (item_count - 3) as u64; | ||||
| 
 | ||||
| 		trace!(target: "snapshot", "restoring block chunk with {} blocks.", item_count - 3); | ||||
| 
 | ||||
| 		if self.fed_blocks + num_blocks > SNAPSHOT_BLOCKS { | ||||
| 			return Err(Error::TooManyBlocks(SNAPSHOT_BLOCKS, self.fed_blocks).into()) | ||||
| 		} | ||||
| 
 | ||||
| 		// todo: assert here that these values are consistent with chunks being in order.
 | ||||
| 		let mut cur_number = rlp.val_at::<u64>(0)? + 1; | ||||
| 		let mut parent_hash = rlp.val_at::<H256>(1)?; | ||||
| 		let parent_total_difficulty = rlp.val_at::<U256>(2)?; | ||||
| 
 | ||||
| 		for idx in 3..item_count { | ||||
| 			if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) } | ||||
| 
 | ||||
| 			let pair = rlp.at(idx)?; | ||||
| 			let abridged_rlp = pair.at(0)?.as_raw().to_owned(); | ||||
| 			let abridged_block = AbridgedBlock::from_raw(abridged_rlp); | ||||
| 			let receipts: Vec<::receipt::Receipt> = pair.list_at(1)?; | ||||
| 			let receipts_root = ordered_trie_root( | ||||
| 				pair.at(1)?.iter().map(|r| r.as_raw().to_owned()) | ||||
| 			); | ||||
| 
 | ||||
| 			let block = abridged_block.to_block(parent_hash, cur_number, receipts_root)?; | ||||
| 			let block_bytes = block.rlp_bytes(With); | ||||
| 			let is_best = cur_number == self.best_number; | ||||
| 
 | ||||
| 			if is_best { | ||||
| 				if block.header.hash() != self.best_hash { | ||||
| 					return Err(Error::WrongBlockHash(cur_number, self.best_hash, block.header.hash()).into()) | ||||
| 				} | ||||
| 
 | ||||
| 				if block.header.state_root() != &self.best_root { | ||||
| 					return Err(Error::WrongStateRoot(self.best_root, *block.header.state_root()).into()) | ||||
| 				} | ||||
| 			} | ||||
| 
 | ||||
| 			verify_old_block( | ||||
| 				&mut self.rng, | ||||
| 				&block.header, | ||||
| 				engine, | ||||
| 				&self.chain, | ||||
| 				Some(&block_bytes), | ||||
| 				is_best | ||||
| 			)?; | ||||
| 
 | ||||
| 			let mut batch = self.db.transaction(); | ||||
| 
 | ||||
| 			// special-case the first block in each chunk.
 | ||||
| 			if idx == 3 { | ||||
| 				if self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, Some(parent_total_difficulty), is_best, false) { | ||||
| 					self.disconnected.push((cur_number, block.header.hash())); | ||||
| 				} | ||||
| 			} else { | ||||
| 				self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, is_best, false); | ||||
| 			} | ||||
| 			self.db.write_buffered(batch); | ||||
| 			self.chain.commit(); | ||||
| 
 | ||||
| 			parent_hash = BlockView::new(&block_bytes).hash(); | ||||
| 			cur_number += 1; | ||||
| 		} | ||||
| 
 | ||||
| 		self.fed_blocks += num_blocks; | ||||
| 
 | ||||
| 		Ok(()) | ||||
| 	} | ||||
| 
 | ||||
| 	/// Glue together any disconnected chunks and check that the chain is complete.
 | ||||
| 	fn finalize(&mut self) -> Result<(), Error> { | ||||
| 		let mut batch = self.db.transaction(); | ||||
| 
 | ||||
| 		for (first_num, first_hash) in self.disconnected.drain(..) { | ||||
| 			let parent_num = first_num - 1; | ||||
| 
 | ||||
| 			// check if the parent is even in the chain.
 | ||||
| 			// since we don't restore every single block in the chain,
 | ||||
| 		// the first block of the first chunk has nothing to connect to.
 | ||||
| 			if let Some(parent_hash) = self.chain.block_hash(parent_num) { | ||||
| 				// if so, add the child to it.
 | ||||
| 				self.chain.add_child(&mut batch, parent_hash, first_hash); | ||||
| 			} | ||||
| 		} | ||||
| 		self.db.write_buffered(batch); | ||||
| 		Ok(()) | ||||
| 	} | ||||
| } | ||||
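The decode side of the same three-item prefix, as `feed` reads it above; a sketch using the `UntrustedRlp` accessors this file already uses, with `chunk_prefix` being a hypothetical helper rather than a real module function:

    use rlp::{DecoderError, UntrustedRlp};
    use util::{H256, U256};

    // Returns (number of the chunk's first block, parent hash,
    // parent total difficulty, count of block/receipt pairs).
    fn chunk_prefix(chunk: &[u8]) -> Result<(u64, H256, U256, usize), DecoderError> {
        let rlp = UntrustedRlp::new(chunk);
        let first_number = rlp.val_at::<u64>(0)? + 1;
        let parent_hash = rlp.val_at::<H256>(1)?;
        let parent_td = rlp.val_at::<U256>(2)?;
        let num_pairs = rlp.item_count()? - 3;
        Ok((first_number, parent_hash, parent_td, num_pairs))
    }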
| @ -57,6 +57,8 @@ pub enum Error { | ||||
| 	VersionNotSupported(u64), | ||||
| 	/// Max chunk size is too small to fit basic account data.
 | ||||
| 	ChunkTooSmall, | ||||
| 	/// Snapshots not supported by the consensus engine.
 | ||||
| 	SnapshotsUnsupported, | ||||
| } | ||||
| 
 | ||||
| impl fmt::Display for Error { | ||||
| @ -79,6 +81,7 @@ impl fmt::Display for Error { | ||||
| 			Error::Trie(ref err) => err.fmt(f), | ||||
| 			Error::VersionNotSupported(ref ver) => write!(f, "Snapshot version {} is not supported.", ver), | ||||
| 			Error::ChunkTooSmall => write!(f, "Chunk size is too small."), | ||||
| 			Error::SnapshotsUnsupported => write!(f, "Snapshots unsupported by consensus engine."), | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @ -17,9 +17,9 @@ | ||||
| //! Snapshot creation, restoration, and network service.
 | ||||
| //!
 | ||||
| //! Documentation of the format can be found at
 | ||||
| //! https://github.com/paritytech/parity/wiki/%22PV64%22-Snapshot-Format
 | ||||
| //! https://github.com/paritytech/parity/wiki/Warp-Sync-Snapshot-Format
 | ||||
| 
 | ||||
| use std::collections::{HashMap, HashSet, VecDeque}; | ||||
| use std::collections::{HashMap, HashSet}; | ||||
| use std::sync::Arc; | ||||
| use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; | ||||
| 
 | ||||
| @ -28,7 +28,6 @@ use blockchain::{BlockChain, BlockProvider}; | ||||
| use engines::Engine; | ||||
| use header::Header; | ||||
| use ids::BlockId; | ||||
| use views::BlockView; | ||||
| 
 | ||||
| use util::{Bytes, Hashable, HashDB, DBValue, snappy, U256, Uint}; | ||||
| use util::Mutex; | ||||
| @ -40,7 +39,6 @@ use util::sha3::SHA3_NULL_RLP; | ||||
| use rlp::{RlpStream, UntrustedRlp}; | ||||
| use bloom_journal::Bloom; | ||||
| 
 | ||||
| use self::block::AbridgedBlock; | ||||
| use self::io::SnapshotWriter; | ||||
| 
 | ||||
| use super::state_db::StateDB; | ||||
| @ -51,6 +49,7 @@ use rand::{Rng, OsRng}; | ||||
| 
 | ||||
| pub use self::error::Error; | ||||
| 
 | ||||
| pub use self::consensus::*; | ||||
| pub use self::service::{Service, DatabaseRestore}; | ||||
| pub use self::traits::SnapshotService; | ||||
| pub use self::watcher::Watcher; | ||||
| @ -63,6 +62,7 @@ pub mod service; | ||||
| 
 | ||||
| mod account; | ||||
| mod block; | ||||
| mod consensus; | ||||
| mod error; | ||||
| mod watcher; | ||||
| 
 | ||||
| @ -83,9 +83,6 @@ mod traits { | ||||
| // Try to have chunks be around 4MB (before compression)
 | ||||
| const PREFERRED_CHUNK_SIZE: usize = 4 * 1024 * 1024; | ||||
| 
 | ||||
| // How many blocks to include in a snapshot, starting from the head of the chain.
 | ||||
| const SNAPSHOT_BLOCKS: u64 = 30000; | ||||
| 
 | ||||
| /// A progress indicator for snapshots.
 | ||||
| #[derive(Debug, Default)] | ||||
| pub struct Progress { | ||||
| @ -122,6 +119,7 @@ impl Progress { | ||||
| } | ||||
| /// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer.
 | ||||
| pub fn take_snapshot<W: SnapshotWriter + Send>( | ||||
| 	engine: &Engine, | ||||
| 	chain: &BlockChain, | ||||
| 	block_at: H256, | ||||
| 	state_db: &HashDB, | ||||
| @ -136,9 +134,11 @@ pub fn take_snapshot<W: SnapshotWriter + Send>( | ||||
| 	info!("Taking snapshot starting at block {}", number); | ||||
| 
 | ||||
| 	let writer = Mutex::new(writer); | ||||
| 	let chunker = engine.snapshot_components().ok_or(Error::SnapshotsUnsupported)?; | ||||
| 	let (state_hashes, block_hashes) = scope(|scope| { | ||||
| 		let block_guard = scope.spawn(|| chunk_blocks(chain, block_at, &writer, p)); | ||||
| 		let state_res = chunk_state(state_db, state_root, &writer, p); | ||||
| 		let writer = &writer; | ||||
| 		let block_guard = scope.spawn(move || chunk_secondary(chunker, chain, block_at, writer, p)); | ||||
| 		let state_res = chunk_state(state_db, state_root, writer, p); | ||||
| 
 | ||||
| 		state_res.and_then(|state_hashes| { | ||||
| 			block_guard.join().map(|block_hashes| (state_hashes, block_hashes)) | ||||
| @ -163,128 +163,41 @@ pub fn take_snapshot<W: SnapshotWriter + Send>( | ||||
| 	Ok(()) | ||||
| } | ||||
| 
 | ||||
| /// Used to build block chunks.
 | ||||
| struct BlockChunker<'a> { | ||||
| 	chain: &'a BlockChain, | ||||
| 	// block, receipt rlp pairs.
 | ||||
| 	rlps: VecDeque<Bytes>, | ||||
| 	current_hash: H256, | ||||
| 	hashes: Vec<H256>, | ||||
| 	snappy_buffer: Vec<u8>, | ||||
| 	writer: &'a Mutex<SnapshotWriter + 'a>, | ||||
| 	progress: &'a Progress, | ||||
| } | ||||
| 
 | ||||
| impl<'a> BlockChunker<'a> { | ||||
| 	// Repeatedly fill the buffers and writes out chunks, moving backwards from starting block hash.
 | ||||
| 	// Loops until we reach the first desired block, and writes out the remainder.
 | ||||
| 	fn chunk_all(&mut self) -> Result<(), Error> { | ||||
| 		let mut loaded_size = 0; | ||||
| 		let mut last = self.current_hash; | ||||
| 
 | ||||
| 		let genesis_hash = self.chain.genesis_hash(); | ||||
| 
 | ||||
| 		for _ in 0..SNAPSHOT_BLOCKS { | ||||
| 			if self.current_hash == genesis_hash { break } | ||||
| 
 | ||||
| 			let (block, receipts) = self.chain.block(&self.current_hash) | ||||
| 				.and_then(|b| self.chain.block_receipts(&self.current_hash).map(|r| (b, r))) | ||||
| 				.ok_or(Error::BlockNotFound(self.current_hash))?; | ||||
| 
 | ||||
| 			let abridged_rlp = AbridgedBlock::from_block_view(&block.view()).into_inner(); | ||||
| 
 | ||||
| 			let pair = { | ||||
| 				let mut pair_stream = RlpStream::new_list(2); | ||||
| 				pair_stream.append_raw(&abridged_rlp, 1).append(&receipts); | ||||
| 				pair_stream.out() | ||||
| 			}; | ||||
| 
 | ||||
| 			let new_loaded_size = loaded_size + pair.len(); | ||||
| 
 | ||||
| 			// cut off the chunk if too large.
 | ||||
| 
 | ||||
| 			if new_loaded_size > PREFERRED_CHUNK_SIZE && !self.rlps.is_empty() { | ||||
| 				self.write_chunk(last)?; | ||||
| 				loaded_size = pair.len(); | ||||
| 			} else { | ||||
| 				loaded_size = new_loaded_size; | ||||
| 			} | ||||
| 
 | ||||
| 			self.rlps.push_front(pair); | ||||
| 
 | ||||
| 			last = self.current_hash; | ||||
| 			self.current_hash = block.header_view().parent_hash(); | ||||
| 		} | ||||
| 
 | ||||
| 		if loaded_size != 0 { | ||||
| 			self.write_chunk(last)?; | ||||
| 		} | ||||
| 
 | ||||
| 		Ok(()) | ||||
| 	} | ||||
| 
 | ||||
| 	// write out the data in the buffers to a chunk on disk
 | ||||
| 	//
 | ||||
| 	// we preface each chunk with the parent of the first block's details,
 | ||||
| 	// obtained from the details of the last block written.
 | ||||
| 	fn write_chunk(&mut self, last: H256) -> Result<(), Error> { | ||||
| 		trace!(target: "snapshot", "prepared block chunk with {} blocks", self.rlps.len()); | ||||
| 
 | ||||
| 		let (last_header, last_details) = self.chain.block_header(&last) | ||||
| 			.and_then(|n| self.chain.block_details(&last).map(|d| (n, d))) | ||||
| 			.ok_or(Error::BlockNotFound(last))?; | ||||
| 
 | ||||
| 		let parent_number = last_header.number() - 1; | ||||
| 		let parent_hash = last_header.parent_hash(); | ||||
| 		let parent_total_difficulty = last_details.total_difficulty - *last_header.difficulty(); | ||||
| 
 | ||||
| 		trace!(target: "snapshot", "parent last written block: {}", parent_hash); | ||||
| 
 | ||||
| 		let num_entries = self.rlps.len(); | ||||
| 		let mut rlp_stream = RlpStream::new_list(3 + num_entries); | ||||
| 		rlp_stream.append(&parent_number).append(parent_hash).append(&parent_total_difficulty); | ||||
| 
 | ||||
| 		for pair in self.rlps.drain(..) { | ||||
| 			rlp_stream.append_raw(&pair, 1); | ||||
| 		} | ||||
| 
 | ||||
| 		let raw_data = rlp_stream.out(); | ||||
| 
 | ||||
| 		let size = snappy::compress_into(&raw_data, &mut self.snappy_buffer); | ||||
| 		let compressed = &self.snappy_buffer[..size]; | ||||
| 		let hash = compressed.sha3(); | ||||
| 
 | ||||
| 		self.writer.lock().write_block_chunk(hash, compressed)?; | ||||
| 		trace!(target: "snapshot", "wrote block chunk. hash: {}, size: {}, uncompressed size: {}", hash.hex(), size, raw_data.len()); | ||||
| 
 | ||||
| 		self.progress.size.fetch_add(size, Ordering::SeqCst); | ||||
| 		self.progress.blocks.fetch_add(num_entries, Ordering::SeqCst); | ||||
| 
 | ||||
| 		self.hashes.push(hash); | ||||
| 		Ok(()) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| /// Create and write out all block chunks to disk, returning a vector of all
 | ||||
| /// the hashes of block chunks created.
 | ||||
| /// Create and write out all secondary chunks to disk, returning a vector of all
 | ||||
| /// the hashes of secondary chunks created.
 | ||||
| ///
 | ||||
| /// The path parameter is the directory to store the block chunks in.
 | ||||
| /// This function assumes the directory exists already.
 | ||||
| /// Secondary chunks are engine-specific, but they intend to corroborate the state data
 | ||||
| /// in the state chunks.
 | ||||
| /// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis.
 | ||||
| pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_hash: H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> { | ||||
| 	let mut chunker = BlockChunker { | ||||
| 		chain: chain, | ||||
| 		rlps: VecDeque::new(), | ||||
| 		current_hash: start_hash, | ||||
| 		hashes: Vec::new(), | ||||
| 		snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)], | ||||
| 		writer: writer, | ||||
| 		progress: progress, | ||||
| pub fn chunk_secondary<'a>(mut chunker: Box<SnapshotComponents>, chain: &'a BlockChain, start_hash: H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> { | ||||
| 	let mut chunk_hashes = Vec::new(); | ||||
| 	let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)]; | ||||
| 
 | ||||
| 	{ | ||||
| 		let mut chunk_sink = |raw_data: &[u8]| { | ||||
| 			let compressed_size = snappy::compress_into(raw_data, &mut snappy_buffer); | ||||
| 			let compressed = &snappy_buffer[..compressed_size]; | ||||
| 			let hash = compressed.sha3(); | ||||
| 			let size = compressed.len(); | ||||
| 
 | ||||
| 			writer.lock().write_block_chunk(hash, compressed)?; | ||||
| 			trace!(target: "snapshot", "wrote secondary chunk. hash: {}, size: {}, uncompressed size: {}", | ||||
| 				hash.hex(), size, raw_data.len()); | ||||
| 
 | ||||
| 			progress.size.fetch_add(size, Ordering::SeqCst); | ||||
| 			chunk_hashes.push(hash); | ||||
| 			Ok(()) | ||||
| 		}; | ||||
| 
 | ||||
| 	chunker.chunk_all()?; | ||||
| 		chunker.chunk_all( | ||||
| 			chain, | ||||
| 			start_hash, | ||||
| 			&mut chunk_sink, | ||||
| 			PREFERRED_CHUNK_SIZE, | ||||
| 		)?; | ||||
| 	} | ||||
| 
 | ||||
| 	Ok(chunker.hashes) | ||||
| 	Ok(chunk_hashes) | ||||
| } | ||||
| 
 | ||||
| /// State trie chunker.
 | ||||
| @ -564,158 +477,15 @@ const POW_VERIFY_RATE: f32 = 0.02; | ||||
| /// the fullest verification possible. If not, it will take a random sample to determine whether it will
 | ||||
| /// do heavy or light verification.
 | ||||
| pub fn verify_old_block(rng: &mut OsRng, header: &Header, engine: &Engine, chain: &BlockChain, body: Option<&[u8]>, always: bool) -> Result<(), ::error::Error> { | ||||
| 	engine.verify_block_basic(header, body)?; | ||||
| 
 | ||||
| 	if always || rng.gen::<f32>() <= POW_VERIFY_RATE { | ||||
| 		engine.verify_block_unordered(header, body)?; | ||||
| 		match chain.block_header(header.parent_hash()) { | ||||
| 			Some(parent) => engine.verify_block_family(header, &parent, body), | ||||
| 			None => engine.verify_block_seal(header), // TODO: fetch validation proof as necessary.
 | ||||
| 			None => Ok(()), | ||||
| 		} | ||||
| 	} else { | ||||
| 		engine.verify_block_basic(header, body) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| /// Rebuilds the blockchain from chunks.
 | ||||
| ///
 | ||||
| /// Does basic verification for all blocks, but `PoW` verification for some.
 | ||||
| /// Blocks must be fed in-order.
 | ||||
| ///
 | ||||
| /// The first block in every chunk is disconnected from the last block in the
 | ||||
| /// chunk before it, as chunks may be submitted out-of-order.
 | ||||
| ///
 | ||||
| /// After all chunks have been submitted, we "glue" the chunks together.
 | ||||
| pub struct BlockRebuilder { | ||||
| 	chain: BlockChain, | ||||
| 	db: Arc<Database>, | ||||
| 	rng: OsRng, | ||||
| 	disconnected: Vec<(u64, H256)>, | ||||
| 	best_number: u64, | ||||
| 	best_hash: H256, | ||||
| 	best_root: H256, | ||||
| 	fed_blocks: u64, | ||||
| } | ||||
| 
 | ||||
| impl BlockRebuilder { | ||||
| 	/// Create a new BlockRebuilder.
 | ||||
| 	pub fn new(chain: BlockChain, db: Arc<Database>, manifest: &ManifestData) -> Result<Self, ::error::Error> { | ||||
| 		Ok(BlockRebuilder { | ||||
| 			chain: chain, | ||||
| 			db: db, | ||||
| 			rng: OsRng::new()?, | ||||
| 			disconnected: Vec::new(), | ||||
| 			best_number: manifest.block_number, | ||||
| 			best_hash: manifest.block_hash, | ||||
| 			best_root: manifest.state_root, | ||||
| 			fed_blocks: 0, | ||||
| 		}) | ||||
| 	} | ||||
| 
 | ||||
| 	/// Feed the rebuilder an uncompressed block chunk.
 | ||||
| 	/// Returns the number of blocks fed or any errors.
 | ||||
| 	pub fn feed(&mut self, chunk: &[u8], engine: &Engine, abort_flag: &AtomicBool) -> Result<u64, ::error::Error> { | ||||
| 		use basic_types::Seal::With; | ||||
| 		use util::U256; | ||||
| 		use util::triehash::ordered_trie_root; | ||||
| 
 | ||||
| 		let rlp = UntrustedRlp::new(chunk); | ||||
| 		let item_count = rlp.item_count()?; | ||||
| 		let num_blocks = (item_count - 3) as u64; | ||||
| 
 | ||||
| 		trace!(target: "snapshot", "restoring block chunk with {} blocks.", item_count - 3); | ||||
| 
 | ||||
| 		if self.fed_blocks + num_blocks > SNAPSHOT_BLOCKS { | ||||
| 			return Err(Error::TooManyBlocks(SNAPSHOT_BLOCKS, self.fed_blocks).into()) | ||||
| 		} | ||||
| 
 | ||||
| 		// todo: assert here that these values are consistent with chunks being in order.
 | ||||
| 		let mut cur_number = rlp.val_at::<u64>(0)? + 1; | ||||
| 		let mut parent_hash = rlp.val_at::<H256>(1)?; | ||||
| 		let parent_total_difficulty = rlp.val_at::<U256>(2)?; | ||||
| 
 | ||||
| 		for idx in 3..item_count { | ||||
| 			if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) } | ||||
| 
 | ||||
| 			let pair = rlp.at(idx)?; | ||||
| 			let abridged_rlp = pair.at(0)?.as_raw().to_owned(); | ||||
| 			let abridged_block = AbridgedBlock::from_raw(abridged_rlp); | ||||
| 			let receipts: Vec<::receipt::Receipt> = pair.list_at(1)?; | ||||
| 			let receipts_root = ordered_trie_root( | ||||
| 				pair.at(1)?.iter().map(|r| r.as_raw().to_owned()) | ||||
| 			); | ||||
| 
 | ||||
| 			let block = abridged_block.to_block(parent_hash, cur_number, receipts_root)?; | ||||
| 			let block_bytes = block.rlp_bytes(With); | ||||
| 			let is_best = cur_number == self.best_number; | ||||
| 
 | ||||
| 			if is_best { | ||||
| 				if block.header.hash() != self.best_hash { | ||||
| 					return Err(Error::WrongBlockHash(cur_number, self.best_hash, block.header.hash()).into()) | ||||
| 				} | ||||
| 
 | ||||
| 				if block.header.state_root() != &self.best_root { | ||||
| 					return Err(Error::WrongStateRoot(self.best_root, *block.header.state_root()).into()) | ||||
| 				} | ||||
| 			} | ||||
| 
 | ||||
| 			verify_old_block( | ||||
| 				&mut self.rng, | ||||
| 				&block.header, | ||||
| 				engine, | ||||
| 				&self.chain, | ||||
| 				Some(&block_bytes), | ||||
| 				is_best | ||||
| 			)?; | ||||
| 
 | ||||
| 			let mut batch = self.db.transaction(); | ||||
| 
 | ||||
| 			// special-case the first block in each chunk.
 | ||||
| 			if idx == 3 { | ||||
| 				if self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, Some(parent_total_difficulty), is_best, false) { | ||||
| 					self.disconnected.push((cur_number, block.header.hash())); | ||||
| 				} | ||||
| 			} else { | ||||
| 				self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, is_best, false); | ||||
| 			} | ||||
| 			self.db.write_buffered(batch); | ||||
| 			self.chain.commit(); | ||||
| 
 | ||||
| 			parent_hash = BlockView::new(&block_bytes).hash(); | ||||
| 			cur_number += 1; | ||||
| 		} | ||||
| 
 | ||||
| 		self.fed_blocks += num_blocks; | ||||
| 
 | ||||
| 		Ok(num_blocks) | ||||
| 	} | ||||
| 
 | ||||
| 	/// Glue together any disconnected chunks and check that the chain is complete.
 | ||||
| 	pub fn finalize(self, canonical: HashMap<u64, H256>) -> Result<(), Error> { | ||||
| 		let mut batch = self.db.transaction(); | ||||
| 
 | ||||
| 		for (first_num, first_hash) in self.disconnected { | ||||
| 			let parent_num = first_num - 1; | ||||
| 
 | ||||
| 			// check if the parent is even in the chain.
 | ||||
| 			// since we don't restore every single block in the chain,
 | ||||
| 			// the first block of the first chunks has nothing to connect to.
 | ||||
| 			if let Some(parent_hash) = self.chain.block_hash(parent_num) { | ||||
| 				// if so, add the child to it.
 | ||||
| 				self.chain.add_child(&mut batch, parent_hash, first_hash); | ||||
| 			} | ||||
| 		} | ||||
| 		self.db.write_buffered(batch); | ||||
| 
 | ||||
| 		let best_number = self.best_number; | ||||
| 		for num in (0..self.fed_blocks).map(|x| best_number - x) { | ||||
| 
 | ||||
| 			let hash = self.chain.block_hash(num).ok_or(Error::IncompleteChain)?; | ||||
| 
 | ||||
| 			if let Some(canon_hash) = canonical.get(&num).cloned() { | ||||
| 				if canon_hash != hash { | ||||
| 					return Err(Error::WrongBlockHash(num, canon_hash, hash)); | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		Ok(()) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @ -16,14 +16,14 @@ | ||||
| 
 | ||||
| //! Snapshot network service implementation.
 | ||||
| 
 | ||||
| use std::collections::{HashMap, HashSet}; | ||||
| use std::collections::HashSet; | ||||
| use std::io::ErrorKind; | ||||
| use std::fs; | ||||
| use std::path::PathBuf; | ||||
| use std::sync::Arc; | ||||
| use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; | ||||
| 
 | ||||
| use super::{ManifestData, StateRebuilder, BlockRebuilder, RestorationStatus, SnapshotService}; | ||||
| use super::{ManifestData, StateRebuilder, Rebuilder, RestorationStatus, SnapshotService}; | ||||
| use super::io::{SnapshotReader, LooseReader, SnapshotWriter, LooseWriter}; | ||||
| 
 | ||||
| use blockchain::BlockChain; | ||||
| @ -69,12 +69,11 @@ struct Restoration { | ||||
| 	state_chunks_left: HashSet<H256>, | ||||
| 	block_chunks_left: HashSet<H256>, | ||||
| 	state: StateRebuilder, | ||||
| 	blocks: BlockRebuilder, | ||||
| 	secondary: Box<Rebuilder>, | ||||
| 	writer: Option<LooseWriter>, | ||||
| 	snappy_buffer: Bytes, | ||||
| 	final_state_root: H256, | ||||
| 	guard: Guard, | ||||
| 	canonical_hashes: HashMap<u64, H256>, | ||||
| 	db: Arc<Database>, | ||||
| } | ||||
| 
 | ||||
| @ -86,6 +85,7 @@ struct RestorationParams<'a> { | ||||
| 	writer: Option<LooseWriter>, // writer for recovered snapshot.
 | ||||
| 	genesis: &'a [u8], // genesis block of the chain.
 | ||||
| 	guard: Guard, // guard for the restoration directory.
 | ||||
| 	engine: &'a Engine, | ||||
| } | ||||
| 
 | ||||
| impl Restoration { | ||||
| @ -100,7 +100,10 @@ impl Restoration { | ||||
| 			.map_err(UtilError::SimpleString)?); | ||||
| 
 | ||||
| 		let chain = BlockChain::new(Default::default(), params.genesis, raw_db.clone()); | ||||
| 		let blocks = BlockRebuilder::new(chain, raw_db.clone(), &manifest)?; | ||||
| 		let components = params.engine.snapshot_components() | ||||
| 			.ok_or_else(|| ::snapshot::Error::SnapshotsUnsupported)?; | ||||
| 
 | ||||
| 		let secondary = components.rebuilder(chain, raw_db.clone(), &manifest)?; | ||||
| 
 | ||||
| 		let root = manifest.state_root.clone(); | ||||
| 		Ok(Restoration { | ||||
| @ -108,12 +111,11 @@ impl Restoration { | ||||
| 			state_chunks_left: state_chunks, | ||||
| 			block_chunks_left: block_chunks, | ||||
| 			state: StateRebuilder::new(raw_db.clone(), params.pruning), | ||||
| 			blocks: blocks, | ||||
| 			secondary: secondary, | ||||
| 			writer: params.writer, | ||||
| 			snappy_buffer: Vec::new(), | ||||
| 			final_state_root: root, | ||||
| 			guard: params.guard, | ||||
| 			canonical_hashes: HashMap::new(), | ||||
| 			db: raw_db, | ||||
| 		}) | ||||
| 	} | ||||
| @ -138,7 +140,7 @@ impl Restoration { | ||||
| 		if self.block_chunks_left.remove(&hash) { | ||||
| 			let len = snappy::decompress_into(chunk, &mut self.snappy_buffer)?; | ||||
| 
 | ||||
| 			self.blocks.feed(&self.snappy_buffer[..len], engine, flag)?; | ||||
| 			self.secondary.feed(&self.snappy_buffer[..len], engine, flag)?; | ||||
| 			if let Some(ref mut writer) = self.writer.as_mut() { | ||||
| 				 writer.write_block_chunk(hash, chunk)?; | ||||
| 			} | ||||
| @ -147,13 +149,8 @@ impl Restoration { | ||||
| 		Ok(()) | ||||
| 	} | ||||
| 
 | ||||
| 	// note canonical hashes.
 | ||||
| 	fn note_canonical(&mut self, hashes: &[(u64, H256)]) { | ||||
| 		self.canonical_hashes.extend(hashes.iter().cloned()); | ||||
| 	} | ||||
| 
 | ||||
| 	// finish up restoration.
 | ||||
| 	fn finalize(self) -> Result<(), Error> { | ||||
| 	fn finalize(mut self) -> Result<(), Error> { | ||||
| 		use util::trie::TrieError; | ||||
| 
 | ||||
| 		if !self.is_done() { return Ok(()) } | ||||
| @ -169,7 +166,7 @@ impl Restoration { | ||||
| 		self.state.finalize(self.manifest.block_number, self.manifest.block_hash)?; | ||||
| 
 | ||||
| 		// connect out-of-order chunks and verify chain integrity.
 | ||||
| 		self.blocks.finalize(self.canonical_hashes)?; | ||||
| 		self.secondary.finalize()?; | ||||
| 
 | ||||
| 		if let Some(writer) = self.writer { | ||||
| 			writer.finish(self.manifest)?; | ||||
| @ -425,6 +422,7 @@ impl Service { | ||||
| 			writer: writer, | ||||
| 			genesis: &self.genesis_block, | ||||
| 			guard: Guard::new(rest_dir), | ||||
| 			engine: &*self.engine, | ||||
| 		}; | ||||
| 
 | ||||
| 		let state_chunks = params.manifest.state_hashes.len(); | ||||
| @ -593,14 +591,6 @@ impl SnapshotService for Service { | ||||
| 			trace!("Error sending snapshot service message: {:?}", e); | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	fn provide_canon_hashes(&self, canonical: &[(u64, H256)]) { | ||||
| 		let mut rest = self.restoration.lock(); | ||||
| 
 | ||||
| 		if let Some(ref mut rest) = rest.as_mut() { | ||||
| 			rest.note_canonical(canonical); | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| impl Drop for Service { | ||||
|  | ||||
| @ -48,10 +48,6 @@ pub trait SnapshotService : Sync + Send { | ||||
| 	/// Feed a raw block chunk to the service to be processed asynchronously.
 | ||||
| 	/// no-op if currently restoring.
 | ||||
| 	fn restore_block_chunk(&self, hash: H256, chunk: Bytes); | ||||
| 
 | ||||
| 	/// Give the restoration in-progress some canonical block hashes for
 | ||||
| 	/// extra verification (performed at the end)
 | ||||
| 	fn provide_canon_hashes(&self, canonical: &[(u64, H256)]); | ||||
| } | ||||
| 
 | ||||
| impl IpcConfig for SnapshotService { } | ||||
|  | ||||
| @ -21,13 +21,12 @@ use error::Error; | ||||
| 
 | ||||
| use blockchain::generator::{ChainGenerator, ChainIterator, BlockFinalizer}; | ||||
| use blockchain::BlockChain; | ||||
| use snapshot::{chunk_blocks, BlockRebuilder, Error as SnapshotError, Progress}; | ||||
| use snapshot::{chunk_secondary, Error as SnapshotError, Progress, SnapshotComponents}; | ||||
| use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter}; | ||||
| 
 | ||||
| use util::{Mutex, snappy}; | ||||
| use util::kvdb::{Database, DatabaseConfig}; | ||||
| use util::kvdb::{self, KeyValueDB, DBTransaction}; | ||||
| 
 | ||||
| use std::collections::HashMap; | ||||
| use std::sync::Arc; | ||||
| use std::sync::atomic::AtomicBool; | ||||
| 
 | ||||
| @ -35,19 +34,18 @@ fn chunk_and_restore(amount: u64) { | ||||
| 	let mut canon_chain = ChainGenerator::default(); | ||||
| 	let mut finalizer = BlockFinalizer::default(); | ||||
| 	let genesis = canon_chain.generate(&mut finalizer).unwrap(); | ||||
| 	let db_cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS); | ||||
| 	let components = ::snapshot::PowSnapshot; | ||||
| 
 | ||||
| 	let engine = Arc::new(::engines::NullEngine::default()); | ||||
| 	let orig_path = RandomTempPath::create_dir(); | ||||
| 	let new_path = RandomTempPath::create_dir(); | ||||
| 	let mut snapshot_path = new_path.as_path().to_owned(); | ||||
| 	snapshot_path.push("SNAP"); | ||||
| 
 | ||||
| 	let old_db = Arc::new(Database::open(&db_cfg, orig_path.as_str()).unwrap()); | ||||
| 	let old_db = Arc::new(kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))); | ||||
| 	let bc = BlockChain::new(Default::default(), &genesis, old_db.clone()); | ||||
| 
 | ||||
| 	// build the blockchain.
 | ||||
| 	let mut batch = old_db.transaction(); | ||||
| 	let mut batch = DBTransaction::new(); | ||||
| 	for _ in 0..amount { | ||||
| 		let block = canon_chain.generate(&mut finalizer).unwrap(); | ||||
| 		bc.insert_block(&mut batch, &block, vec![]); | ||||
| @ -56,12 +54,18 @@ fn chunk_and_restore(amount: u64) { | ||||
| 
 | ||||
| 	old_db.write(batch).unwrap(); | ||||
| 
 | ||||
| 
 | ||||
| 	let best_hash = bc.best_block_hash(); | ||||
| 
 | ||||
| 	// snapshot it.
 | ||||
| 	let writer = Mutex::new(PackedWriter::new(&snapshot_path).unwrap()); | ||||
| 	let block_hashes = chunk_blocks(&bc, best_hash, &writer, &Progress::default()).unwrap(); | ||||
| 	let block_hashes = chunk_secondary( | ||||
| 		Box::new(::snapshot::PowSnapshot), | ||||
| 		&bc, | ||||
| 		best_hash, | ||||
| 		&writer, | ||||
| 		&Progress::default() | ||||
| 	).unwrap(); | ||||
| 
 | ||||
| 	let manifest = ::snapshot::ManifestData { | ||||
| 		version: 2, | ||||
| 		state_hashes: Vec::new(), | ||||
| @ -74,9 +78,10 @@ fn chunk_and_restore(amount: u64) { | ||||
| 	writer.into_inner().finish(manifest.clone()).unwrap(); | ||||
| 
 | ||||
| 	// restore it.
 | ||||
| 	let new_db = Arc::new(Database::open(&db_cfg, new_path.as_str()).unwrap()); | ||||
| 	let new_db = Arc::new(kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))); | ||||
| 	let new_chain = BlockChain::new(Default::default(), &genesis, new_db.clone()); | ||||
| 	let mut rebuilder = BlockRebuilder::new(new_chain, new_db.clone(), &manifest).unwrap(); | ||||
| 	let mut rebuilder = components.rebuilder(new_chain, new_db.clone(), &manifest).unwrap(); | ||||
| 
 | ||||
| 	let reader = PackedReader::new(&snapshot_path).unwrap().unwrap(); | ||||
| 	let flag = AtomicBool::new(true); | ||||
| 	for chunk_hash in &reader.manifest().block_hashes { | ||||
| @ -85,7 +90,8 @@ fn chunk_and_restore(amount: u64) { | ||||
| 		rebuilder.feed(&chunk, engine.as_ref(), &flag).unwrap(); | ||||
| 	} | ||||
| 
 | ||||
| 	rebuilder.finalize(HashMap::new()).unwrap(); | ||||
| 	rebuilder.finalize().unwrap(); | ||||
| 	drop(rebuilder); | ||||
| 
 | ||||
| 	// and test it.
 | ||||
| 	let new_chain = BlockChain::new(Default::default(), &genesis, new_db); | ||||
| @ -118,10 +124,8 @@ fn checks_flag() { | ||||
| 	}; | ||||
| 
 | ||||
| 	let chunk = stream.out(); | ||||
| 	let path = RandomTempPath::create_dir(); | ||||
| 
 | ||||
| 	let db_cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS); | ||||
| 	let db = Arc::new(Database::open(&db_cfg, path.as_str()).unwrap()); | ||||
| 	let db = Arc::new(kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))); | ||||
| 	let engine = Arc::new(::engines::NullEngine::default()); | ||||
| 	let chain = BlockChain::new(Default::default(), &genesis, db.clone()); | ||||
| 
 | ||||
| @ -134,7 +138,7 @@ fn checks_flag() { | ||||
| 		block_hash: H256::default(), | ||||
| 	}; | ||||
| 
 | ||||
| 	let mut rebuilder = BlockRebuilder::new(chain, db.clone(), &manifest).unwrap(); | ||||
| 	let mut rebuilder = ::snapshot::PowSnapshot.rebuilder(chain, db.clone(), &manifest).unwrap(); | ||||
| 
 | ||||
| 	match rebuilder.feed(&chunk, engine.as_ref(), &AtomicBool::new(false)) { | ||||
| 		Err(Error::Snapshot(SnapshotError::RestorationAborted)) => {} | ||||
|  | ||||
| @ -47,5 +47,4 @@ impl SnapshotService for TestSnapshotService { | ||||
| 	fn abort_restore(&self) { } | ||||
| 	fn restore_state_chunk(&self, _hash: H256, _chunk: Bytes) { } | ||||
| 	fn restore_block_chunk(&self, _hash: H256, _chunk: Bytes) { } | ||||
| 	fn provide_canon_hashes(&self, _hashes: &[(u64, H256)]) { } | ||||
| } | ||||
| @ -24,7 +24,6 @@ use SyncConfig; | ||||
| pub struct TestSnapshotService { | ||||
| 	manifest: Option<ManifestData>, | ||||
| 	chunks: HashMap<H256, Bytes>, | ||||
| 	canon_hashes: Mutex<HashMap<u64, H256>>, | ||||
| 
 | ||||
| 	restoration_manifest: Mutex<Option<ManifestData>>, | ||||
| 	state_restoration_chunks: Mutex<HashMap<H256, Bytes>>, | ||||
| @ -36,7 +35,6 @@ impl TestSnapshotService { | ||||
| 		TestSnapshotService { | ||||
| 			manifest: None, | ||||
| 			chunks: HashMap::new(), | ||||
| 			canon_hashes: Mutex::new(HashMap::new()), | ||||
| 			restoration_manifest: Mutex::new(None), | ||||
| 			state_restoration_chunks: Mutex::new(HashMap::new()), | ||||
| 			block_restoration_chunks: Mutex::new(HashMap::new()), | ||||
| @ -61,7 +59,6 @@ impl TestSnapshotService { | ||||
| 		TestSnapshotService { | ||||
| 			manifest: Some(manifest), | ||||
| 			chunks: chunks, | ||||
| 			canon_hashes: Mutex::new(HashMap::new()), | ||||
| 			restoration_manifest: Mutex::new(None), | ||||
| 			state_restoration_chunks: Mutex::new(HashMap::new()), | ||||
| 			block_restoration_chunks: Mutex::new(HashMap::new()), | ||||
| @ -115,10 +112,6 @@ impl SnapshotService for TestSnapshotService { | ||||
| 			self.block_restoration_chunks.lock().insert(hash, chunk); | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	fn provide_canon_hashes(&self, hashes: &[(u64, H256)]) { | ||||
| 		self.canon_hashes.lock().extend(hashes.iter().cloned()); | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| #[test] | ||||
|  | ||||