Even more snapshot validity checks (#2935)
* clarify "cancelled periodic snapshot" message * more rigorous checks for snapshot validity * verify ancient blocks on import * limit number of fed blocks * make it possible to feed snapshot service canonical hashes * fix failing test build * swap ethash DAG only when more recent
This commit is contained in:
		
							parent
							
								
									29ab4ecac1
								
							
						
					
					
						commit
						2806f1d4c9
					
				| @ -69,14 +69,19 @@ impl EthashManager { | |||||||
| 				Some(ref e) if *e == epoch => lights.recent.clone(), | 				Some(ref e) if *e == epoch => lights.recent.clone(), | ||||||
| 				_ => match lights.prev_epoch.clone() { | 				_ => match lights.prev_epoch.clone() { | ||||||
| 					Some(e) if e == epoch => { | 					Some(e) if e == epoch => { | ||||||
| 						// swap
 | 						// don't swap if recent is newer.
 | ||||||
| 						let t = lights.prev_epoch; | 						if lights.recent_epoch > lights.prev_epoch { | ||||||
| 						lights.prev_epoch = lights.recent_epoch; | 							None | ||||||
| 						lights.recent_epoch = t; | 						} else { | ||||||
| 						let t = lights.prev.clone(); | 							// swap
 | ||||||
| 						lights.prev = lights.recent.clone(); | 							let t = lights.prev_epoch; | ||||||
| 						lights.recent = t; | 							lights.prev_epoch = lights.recent_epoch; | ||||||
| 						lights.recent.clone() | 							lights.recent_epoch = t; | ||||||
|  | 							let t = lights.prev.clone(); | ||||||
|  | 							lights.prev = lights.recent.clone(); | ||||||
|  | 							lights.recent = t; | ||||||
|  | 							lights.recent.clone() | ||||||
|  | 						} | ||||||
| 					} | 					} | ||||||
| 					_ => None, | 					_ => None, | ||||||
| 				}, | 				}, | ||||||
|  | |||||||
| @ -792,11 +792,10 @@ impl BlockChain { | |||||||
| 	/// the chain and the child's parent is this block.
 | 	/// the chain and the child's parent is this block.
 | ||||||
| 	///
 | 	///
 | ||||||
| 	/// Used in snapshots to glue the chunks together at the end.
 | 	/// Used in snapshots to glue the chunks together at the end.
 | ||||||
| 	pub fn add_child(&self, block_hash: H256, child_hash: H256) { | 	pub fn add_child(&self, batch: &mut DBTransaction, block_hash: H256, child_hash: H256) { | ||||||
| 		let mut parent_details = self.block_details(&block_hash) | 		let mut parent_details = self.block_details(&block_hash) | ||||||
| 			.unwrap_or_else(|| panic!("Invalid block hash: {:?}", block_hash)); | 			.unwrap_or_else(|| panic!("Invalid block hash: {:?}", block_hash)); | ||||||
| 
 | 
 | ||||||
| 		let mut batch = self.db.transaction(); |  | ||||||
| 		parent_details.children.push(child_hash); | 		parent_details.children.push(child_hash); | ||||||
| 
 | 
 | ||||||
| 		let mut update = HashMap::new(); | 		let mut update = HashMap::new(); | ||||||
| @ -807,8 +806,6 @@ impl BlockChain { | |||||||
| 		batch.extend_with_cache(db::COL_EXTRA, &mut *write_details, update, CacheUpdatePolicy::Overwrite); | 		batch.extend_with_cache(db::COL_EXTRA, &mut *write_details, update, CacheUpdatePolicy::Overwrite); | ||||||
| 
 | 
 | ||||||
| 		self.cache_man.lock().note_used(CacheID::BlockDetails(block_hash)); | 		self.cache_man.lock().note_used(CacheID::BlockDetails(block_hash)); | ||||||
| 
 |  | ||||||
| 		self.db.write(batch).unwrap(); |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	#[cfg_attr(feature="dev", allow(similar_names))] | 	#[cfg_attr(feature="dev", allow(similar_names))] | ||||||
|  | |||||||
| @ -66,6 +66,7 @@ use snapshot::{self, io as snapshot_io}; | |||||||
| use factory::Factories; | use factory::Factories; | ||||||
| use rlp::{View, UntrustedRlp}; | use rlp::{View, UntrustedRlp}; | ||||||
| use state_db::StateDB; | use state_db::StateDB; | ||||||
|  | use rand::OsRng; | ||||||
| 
 | 
 | ||||||
| // re-export
 | // re-export
 | ||||||
| pub use types::blockchain_info::BlockChainInfo; | pub use types::blockchain_info::BlockChainInfo; | ||||||
| @ -144,6 +145,7 @@ pub struct Client { | |||||||
| 	last_hashes: RwLock<VecDeque<H256>>, | 	last_hashes: RwLock<VecDeque<H256>>, | ||||||
| 	factories: Factories, | 	factories: Factories, | ||||||
| 	history: u64, | 	history: u64, | ||||||
|  | 	rng: Mutex<OsRng>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl Client { | impl Client { | ||||||
| @ -239,6 +241,7 @@ impl Client { | |||||||
| 			last_hashes: RwLock::new(VecDeque::new()), | 			last_hashes: RwLock::new(VecDeque::new()), | ||||||
| 			factories: factories, | 			factories: factories, | ||||||
| 			history: history, | 			history: history, | ||||||
|  | 			rng: Mutex::new(try!(OsRng::new().map_err(::util::UtilError::StdIo))), | ||||||
| 		}; | 		}; | ||||||
| 		Ok(Arc::new(client)) | 		Ok(Arc::new(client)) | ||||||
| 	} | 	} | ||||||
| @ -434,14 +437,26 @@ impl Client { | |||||||
| 	/// Import a block with transaction receipts.
 | 	/// Import a block with transaction receipts.
 | ||||||
| 	/// The block is guaranteed to be the next best blocks in the first block sequence.
 | 	/// The block is guaranteed to be the next best blocks in the first block sequence.
 | ||||||
| 	/// Does no sealing or transaction validation.
 | 	/// Does no sealing or transaction validation.
 | ||||||
| 	fn import_old_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> H256 { | 	fn import_old_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, ::error::Error> { | ||||||
| 		let block = BlockView::new(&block_bytes); | 		let block = BlockView::new(&block_bytes); | ||||||
| 		let hash = block.header().hash(); | 		let header = block.header(); | ||||||
|  | 		let hash = header.hash(); | ||||||
| 		let _import_lock = self.import_lock.lock(); | 		let _import_lock = self.import_lock.lock(); | ||||||
| 		{ | 		{ | ||||||
| 			let _timer = PerfTimer::new("import_old_block"); | 			let _timer = PerfTimer::new("import_old_block"); | ||||||
|  | 			let mut rng = self.rng.lock(); | ||||||
| 			let chain = self.chain.read(); | 			let chain = self.chain.read(); | ||||||
| 
 | 
 | ||||||
|  | 			// verify block.
 | ||||||
|  | 			try!(::snapshot::verify_old_block( | ||||||
|  | 				&mut *rng, | ||||||
|  | 				&header, | ||||||
|  | 				&*self.engine, | ||||||
|  | 				&*chain, | ||||||
|  | 				Some(&block_bytes), | ||||||
|  | 				false, | ||||||
|  | 			)); | ||||||
|  | 
 | ||||||
| 			// Commit results
 | 			// Commit results
 | ||||||
| 			let receipts = ::rlp::decode(&receipts_bytes); | 			let receipts = ::rlp::decode(&receipts_bytes); | ||||||
| 			let mut batch = DBTransaction::new(&self.db.read()); | 			let mut batch = DBTransaction::new(&self.db.read()); | ||||||
| @ -451,7 +466,7 @@ impl Client { | |||||||
| 			chain.commit(); | 			chain.commit(); | ||||||
| 		} | 		} | ||||||
| 		self.db.read().flush().expect("DB flush failed."); | 		self.db.read().flush().expect("DB flush failed."); | ||||||
| 		hash | 		Ok(hash) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	fn commit_block<B>(&self, block: B, hash: &H256, block_data: &[u8]) -> ImportRoute where B: IsBlock + Drain { | 	fn commit_block<B>(&self, block: B, hash: &H256, block_data: &[u8]) -> ImportRoute where B: IsBlock + Drain { | ||||||
| @ -1036,7 +1051,7 @@ impl BlockChainClient for Client { | |||||||
| 				return Err(BlockImportError::Block(BlockError::UnknownParent(header.parent_hash()))); | 				return Err(BlockImportError::Block(BlockError::UnknownParent(header.parent_hash()))); | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		Ok(self.import_old_block(block_bytes, receipts_bytes)) | 		self.import_old_block(block_bytes, receipts_bytes).map_err(Into::into) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	fn queue_info(&self) -> BlockQueueInfo { | 	fn queue_info(&self) -> BlockQueueInfo { | ||||||
|  | |||||||
| @ -33,6 +33,12 @@ pub enum Error { | |||||||
| 	BlockNotFound(H256), | 	BlockNotFound(H256), | ||||||
| 	/// Incomplete chain.
 | 	/// Incomplete chain.
 | ||||||
| 	IncompleteChain, | 	IncompleteChain, | ||||||
|  | 	/// Best block has wrong state root.
 | ||||||
|  | 	WrongStateRoot(H256, H256), | ||||||
|  | 	/// Wrong block hash.
 | ||||||
|  | 	WrongBlockHash(u64, H256, H256), | ||||||
|  | 	/// Too many blocks contained within the snapshot.
 | ||||||
|  | 	TooManyBlocks(u64, u64), | ||||||
| 	/// Old starting block in a pruned database.
 | 	/// Old starting block in a pruned database.
 | ||||||
| 	OldBlockPrunedDB, | 	OldBlockPrunedDB, | ||||||
| 	/// Missing code.
 | 	/// Missing code.
 | ||||||
| @ -52,7 +58,11 @@ impl fmt::Display for Error { | |||||||
| 		match *self { | 		match *self { | ||||||
| 			Error::InvalidStartingBlock(ref id) => write!(f, "Invalid starting block: {:?}", id), | 			Error::InvalidStartingBlock(ref id) => write!(f, "Invalid starting block: {:?}", id), | ||||||
| 			Error::BlockNotFound(ref hash) => write!(f, "Block not found in chain: {}", hash), | 			Error::BlockNotFound(ref hash) => write!(f, "Block not found in chain: {}", hash), | ||||||
| 			Error::IncompleteChain => write!(f, "Cannot create snapshot due to incomplete chain."), | 			Error::IncompleteChain => write!(f, "Incomplete blockchain."), | ||||||
|  | 			Error::WrongStateRoot(ref expected, ref found) => write!(f, "Final block has wrong state root. Expected {:?}, got {:?}", expected, found), | ||||||
|  | 			Error::WrongBlockHash(ref num, ref expected, ref found) => | ||||||
|  | 				write!(f, "Block {} had wrong hash. expected {:?}, got {:?}", num, expected, found), | ||||||
|  | 			Error::TooManyBlocks(ref expected, ref found) => write!(f, "Snapshot contained too many blocks. Expected {}, got {}", expected, found), | ||||||
| 			Error::OldBlockPrunedDB => write!(f, "Attempted to create a snapshot at an old block while using \ | 			Error::OldBlockPrunedDB => write!(f, "Attempted to create a snapshot at an old block while using \ | ||||||
| 				a pruned database. Please re-run with the --pruning archive flag."),
 | 				a pruned database. Please re-run with the --pruning archive flag."),
 | ||||||
| 			Error::MissingCode(ref missing) => write!(f, "Incomplete snapshot: {} contract codes not found.", missing.len()), | 			Error::MissingCode(ref missing) => write!(f, "Incomplete snapshot: {} contract codes not found.", missing.len()), | ||||||
|  | |||||||
| @ -26,6 +26,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; | |||||||
| use account_db::{AccountDB, AccountDBMut}; | use account_db::{AccountDB, AccountDBMut}; | ||||||
| use blockchain::{BlockChain, BlockProvider}; | use blockchain::{BlockChain, BlockProvider}; | ||||||
| use engines::Engine; | use engines::Engine; | ||||||
|  | use header::Header; | ||||||
| use ids::BlockID; | use ids::BlockID; | ||||||
| use views::BlockView; | use views::BlockView; | ||||||
| 
 | 
 | ||||||
| @ -528,6 +529,20 @@ fn rebuild_accounts( | |||||||
| /// Proportion of blocks which we will verify `PoW` for.
 | /// Proportion of blocks which we will verify `PoW` for.
 | ||||||
| const POW_VERIFY_RATE: f32 = 0.02; | const POW_VERIFY_RATE: f32 = 0.02; | ||||||
| 
 | 
 | ||||||
|  | /// Verify an old block with the given header, engine, blockchain, body. If `always` is set, it will perform
 | ||||||
|  | /// the fullest verification possible. If not, it will take a random sample to determine whether it will
 | ||||||
|  | /// do heavy or light verification.
 | ||||||
|  | pub fn verify_old_block(rng: &mut OsRng, header: &Header, engine: &Engine, chain: &BlockChain, body: Option<&[u8]>, always: bool) -> Result<(), ::error::Error> { | ||||||
|  | 	if always || rng.gen::<f32>() <= POW_VERIFY_RATE { | ||||||
|  | 		match chain.block_header(header.parent_hash()) { | ||||||
|  | 			Some(parent) => engine.verify_block_family(&header, &parent, body), | ||||||
|  | 			None => engine.verify_block_seal(&header), | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		engine.verify_block_basic(&header, body) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
| /// Rebuilds the blockchain from chunks.
 | /// Rebuilds the blockchain from chunks.
 | ||||||
| ///
 | ///
 | ||||||
| /// Does basic verification for all blocks, but `PoW` verification for some.
 | /// Does basic verification for all blocks, but `PoW` verification for some.
 | ||||||
| @ -543,17 +558,23 @@ pub struct BlockRebuilder { | |||||||
| 	rng: OsRng, | 	rng: OsRng, | ||||||
| 	disconnected: Vec<(u64, H256)>, | 	disconnected: Vec<(u64, H256)>, | ||||||
| 	best_number: u64, | 	best_number: u64, | ||||||
|  | 	best_hash: H256, | ||||||
|  | 	best_root: H256, | ||||||
|  | 	fed_blocks: u64, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl BlockRebuilder { | impl BlockRebuilder { | ||||||
| 	/// Create a new BlockRebuilder.
 | 	/// Create a new BlockRebuilder.
 | ||||||
| 	pub fn new(chain: BlockChain, db: Arc<Database>, best_number: u64) -> Result<Self, ::error::Error> { | 	pub fn new(chain: BlockChain, db: Arc<Database>, manifest: &ManifestData) -> Result<Self, ::error::Error> { | ||||||
| 		Ok(BlockRebuilder { | 		Ok(BlockRebuilder { | ||||||
| 			chain: chain, | 			chain: chain, | ||||||
| 			db: db, | 			db: db, | ||||||
| 			rng: try!(OsRng::new()), | 			rng: try!(OsRng::new()), | ||||||
| 			disconnected: Vec::new(), | 			disconnected: Vec::new(), | ||||||
| 			best_number: best_number, | 			best_number: manifest.block_number, | ||||||
|  | 			best_hash: manifest.block_hash, | ||||||
|  | 			best_root: manifest.state_root, | ||||||
|  | 			fed_blocks: 0, | ||||||
| 		}) | 		}) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| @ -566,9 +587,14 @@ impl BlockRebuilder { | |||||||
| 
 | 
 | ||||||
| 		let rlp = UntrustedRlp::new(chunk); | 		let rlp = UntrustedRlp::new(chunk); | ||||||
| 		let item_count = rlp.item_count(); | 		let item_count = rlp.item_count(); | ||||||
|  | 		let num_blocks = (item_count - 3) as u64; | ||||||
| 
 | 
 | ||||||
| 		trace!(target: "snapshot", "restoring block chunk with {} blocks.", item_count - 3); | 		trace!(target: "snapshot", "restoring block chunk with {} blocks.", item_count - 3); | ||||||
| 
 | 
 | ||||||
|  | 		if self.fed_blocks + num_blocks > SNAPSHOT_BLOCKS { | ||||||
|  | 			return Err(Error::TooManyBlocks(SNAPSHOT_BLOCKS, self.fed_blocks).into()) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		// todo: assert here that these values are consistent with chunks being in order.
 | 		// todo: assert here that these values are consistent with chunks being in order.
 | ||||||
| 		let mut cur_number = try!(rlp.val_at::<u64>(0)) + 1; | 		let mut cur_number = try!(rlp.val_at::<u64>(0)) + 1; | ||||||
| 		let mut parent_hash = try!(rlp.val_at::<H256>(1)); | 		let mut parent_hash = try!(rlp.val_at::<H256>(1)); | ||||||
| @ -585,14 +611,27 @@ impl BlockRebuilder { | |||||||
| 
 | 
 | ||||||
| 			let block = try!(abridged_block.to_block(parent_hash, cur_number, receipts_root)); | 			let block = try!(abridged_block.to_block(parent_hash, cur_number, receipts_root)); | ||||||
| 			let block_bytes = block.rlp_bytes(With); | 			let block_bytes = block.rlp_bytes(With); | ||||||
|  | 			let is_best = cur_number == self.best_number; | ||||||
| 
 | 
 | ||||||
| 			if self.rng.gen::<f32>() <= POW_VERIFY_RATE { | 			if is_best { | ||||||
| 				try!(engine.verify_block_seal(&block.header)) | 				if block.header.hash() != self.best_hash { | ||||||
| 			} else { | 					return Err(Error::WrongBlockHash(cur_number, self.best_hash, block.header.hash()).into()) | ||||||
| 				try!(engine.verify_block_basic(&block.header, Some(&block_bytes))); | 				} | ||||||
|  | 
 | ||||||
|  | 				if block.header.state_root() != &self.best_root { | ||||||
|  | 					return Err(Error::WrongStateRoot(self.best_root, *block.header.state_root()).into()) | ||||||
|  | 				} | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			let is_best = cur_number == self.best_number; | 			try!(verify_old_block( | ||||||
|  | 				&mut self.rng, | ||||||
|  | 				&block.header, | ||||||
|  | 				engine, | ||||||
|  | 				&self.chain, | ||||||
|  | 				Some(&block_bytes), | ||||||
|  | 				is_best | ||||||
|  | 			)); | ||||||
|  | 
 | ||||||
| 			let mut batch = self.db.transaction(); | 			let mut batch = self.db.transaction(); | ||||||
| 
 | 
 | ||||||
| 			// special-case the first block in each chunk.
 | 			// special-case the first block in each chunk.
 | ||||||
| @ -610,11 +649,15 @@ impl BlockRebuilder { | |||||||
| 			cur_number += 1; | 			cur_number += 1; | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		Ok(item_count as u64 - 3) | 		self.fed_blocks += num_blocks; | ||||||
|  | 
 | ||||||
|  | 		Ok(num_blocks) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/// Glue together any disconnected chunks. To be called at the end.
 | 	/// Glue together any disconnected chunks and check that the chain is complete.
 | ||||||
| 	pub fn glue_chunks(self) { | 	pub fn finalize(self, canonical: HashMap<u64, H256>) -> Result<(), Error> { | ||||||
|  | 		let mut batch = self.db.transaction(); | ||||||
|  | 
 | ||||||
| 		for (first_num, first_hash) in self.disconnected { | 		for (first_num, first_hash) in self.disconnected { | ||||||
| 			let parent_num = first_num - 1; | 			let parent_num = first_num - 1; | ||||||
| 
 | 
 | ||||||
| @ -623,8 +666,23 @@ impl BlockRebuilder { | |||||||
| 			// the first block of the first chunks has nothing to connect to.
 | 			// the first block of the first chunks has nothing to connect to.
 | ||||||
| 			if let Some(parent_hash) = self.chain.block_hash(parent_num) { | 			if let Some(parent_hash) = self.chain.block_hash(parent_num) { | ||||||
| 				// if so, add the child to it.
 | 				// if so, add the child to it.
 | ||||||
| 				self.chain.add_child(parent_hash, first_hash); | 				self.chain.add_child(&mut batch, parent_hash, first_hash); | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  | 		self.db.write_buffered(batch); | ||||||
|  | 
 | ||||||
|  | 		let best_number = self.best_number; | ||||||
|  | 		for num in (0..self.fed_blocks).map(|x| best_number - x) { | ||||||
|  | 
 | ||||||
|  | 			let hash = try!(self.chain.block_hash(num).ok_or(Error::IncompleteChain)); | ||||||
|  | 
 | ||||||
|  | 			if let Some(canon_hash) = canonical.get(&num).cloned() { | ||||||
|  | 				if canon_hash != hash { | ||||||
|  | 					return Err(Error::WrongBlockHash(num, canon_hash, hash)); | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		Ok(()) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | |||||||
| @ -16,7 +16,7 @@ | |||||||
| 
 | 
 | ||||||
| //! Snapshot network service implementation.
 | //! Snapshot network service implementation.
 | ||||||
| 
 | 
 | ||||||
| use std::collections::HashSet; | use std::collections::{HashMap, HashSet}; | ||||||
| use std::io::ErrorKind; | use std::io::ErrorKind; | ||||||
| use std::fs; | use std::fs; | ||||||
| use std::path::PathBuf; | use std::path::PathBuf; | ||||||
| @ -74,6 +74,7 @@ struct Restoration { | |||||||
| 	snappy_buffer: Bytes, | 	snappy_buffer: Bytes, | ||||||
| 	final_state_root: H256, | 	final_state_root: H256, | ||||||
| 	guard: Guard, | 	guard: Guard, | ||||||
|  | 	canonical_hashes: HashMap<u64, H256>, | ||||||
| 	db: Arc<Database>, | 	db: Arc<Database>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -99,7 +100,7 @@ impl Restoration { | |||||||
| 			.map_err(UtilError::SimpleString))); | 			.map_err(UtilError::SimpleString))); | ||||||
| 
 | 
 | ||||||
| 		let chain = BlockChain::new(Default::default(), params.genesis, raw_db.clone()); | 		let chain = BlockChain::new(Default::default(), params.genesis, raw_db.clone()); | ||||||
| 		let blocks = try!(BlockRebuilder::new(chain, raw_db.clone(), manifest.block_number)); | 		let blocks = try!(BlockRebuilder::new(chain, raw_db.clone(), &manifest)); | ||||||
| 
 | 
 | ||||||
| 		let root = manifest.state_root.clone(); | 		let root = manifest.state_root.clone(); | ||||||
| 		Ok(Restoration { | 		Ok(Restoration { | ||||||
| @ -112,6 +113,7 @@ impl Restoration { | |||||||
| 			snappy_buffer: Vec::new(), | 			snappy_buffer: Vec::new(), | ||||||
| 			final_state_root: root, | 			final_state_root: root, | ||||||
| 			guard: params.guard, | 			guard: params.guard, | ||||||
|  | 			canonical_hashes: HashMap::new(), | ||||||
| 			db: raw_db, | 			db: raw_db, | ||||||
| 		}) | 		}) | ||||||
| 	} | 	} | ||||||
| @ -138,13 +140,18 @@ impl Restoration { | |||||||
| 
 | 
 | ||||||
| 			try!(self.blocks.feed(&self.snappy_buffer[..len], engine)); | 			try!(self.blocks.feed(&self.snappy_buffer[..len], engine)); | ||||||
| 			if let Some(ref mut writer) = self.writer.as_mut() { | 			if let Some(ref mut writer) = self.writer.as_mut() { | ||||||
| 				try!(writer.write_block_chunk(hash, chunk)); | 				 try!(writer.write_block_chunk(hash, chunk)); | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		Ok(()) | 		Ok(()) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	// note canonical hashes.
 | ||||||
|  | 	fn note_canonical(&mut self, hashes: &[(u64, H256)]) { | ||||||
|  | 		self.canonical_hashes.extend(hashes.iter().cloned()); | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	// finish up restoration.
 | 	// finish up restoration.
 | ||||||
| 	fn finalize(self) -> Result<(), Error> { | 	fn finalize(self) -> Result<(), Error> { | ||||||
| 		use util::trie::TrieError; | 		use util::trie::TrieError; | ||||||
| @ -161,8 +168,8 @@ impl Restoration { | |||||||
| 		// check for missing code.
 | 		// check for missing code.
 | ||||||
| 		try!(self.state.check_missing()); | 		try!(self.state.check_missing()); | ||||||
| 
 | 
 | ||||||
| 		// connect out-of-order chunks.
 | 		// connect out-of-order chunks and verify chain integrity.
 | ||||||
| 		self.blocks.glue_chunks(); | 		try!(self.blocks.finalize(self.canonical_hashes)); | ||||||
| 
 | 
 | ||||||
| 		if let Some(writer) = self.writer { | 		if let Some(writer) = self.writer { | ||||||
| 			try!(writer.finish(self.manifest)); | 			try!(writer.finish(self.manifest)); | ||||||
| @ -352,7 +359,8 @@ impl Service { | |||||||
| 				// "Cancelled" is mincing words a bit -- what really happened
 | 				// "Cancelled" is mincing words a bit -- what really happened
 | ||||||
| 				// is that the state we were snapshotting got pruned out
 | 				// is that the state we were snapshotting got pruned out
 | ||||||
| 				// before we could finish.
 | 				// before we could finish.
 | ||||||
| 				info!("Cancelled prematurely-started periodic snapshot."); | 				info!("Periodic snapshot failed: block state pruned.\ | ||||||
|  | 					Run with a longer `--pruning-history` or with `--no-periodic-snapshot`");
 | ||||||
| 				return Ok(()) | 				return Ok(()) | ||||||
| 			} else { | 			} else { | ||||||
| 				return Err(e); | 				return Err(e); | ||||||
| @ -580,6 +588,14 @@ impl SnapshotService for Service { | |||||||
| 			trace!("Error sending snapshot service message: {:?}", e); | 			trace!("Error sending snapshot service message: {:?}", e); | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	fn provide_canon_hashes(&self, canonical: &[(u64, H256)]) { | ||||||
|  | 		let mut rest = self.restoration.lock(); | ||||||
|  | 
 | ||||||
|  | 		if let Some(ref mut rest) = rest.as_mut() { | ||||||
|  | 			rest.note_canonical(canonical); | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl Drop for Service { | impl Drop for Service { | ||||||
|  | |||||||
| @ -48,6 +48,10 @@ pub trait SnapshotService : Sync + Send { | |||||||
| 	/// Feed a raw block chunk to the service to be processed asynchronously.
 | 	/// Feed a raw block chunk to the service to be processed asynchronously.
 | ||||||
| 	/// no-op if currently restoring.
 | 	/// no-op if currently restoring.
 | ||||||
| 	fn restore_block_chunk(&self, hash: H256, chunk: Bytes); | 	fn restore_block_chunk(&self, hash: H256, chunk: Bytes); | ||||||
|  | 
 | ||||||
|  | 	/// Give the restoration in-progress some canonical block hashes for
 | ||||||
|  | 	/// extra verification (performed at the end)
 | ||||||
|  | 	fn provide_canon_hashes(&self, canonical: &[(u64, H256)]); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl IpcConfig for SnapshotService { } | impl IpcConfig for SnapshotService { } | ||||||
|  | |||||||
| @ -26,6 +26,7 @@ use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter}; | |||||||
| use util::{Mutex, snappy}; | use util::{Mutex, snappy}; | ||||||
| use util::kvdb::{Database, DatabaseConfig}; | use util::kvdb::{Database, DatabaseConfig}; | ||||||
| 
 | 
 | ||||||
|  | use std::collections::HashMap; | ||||||
| use std::sync::Arc; | use std::sync::Arc; | ||||||
| 
 | 
 | ||||||
| fn chunk_and_restore(amount: u64) { | fn chunk_and_restore(amount: u64) { | ||||||
| @ -58,18 +59,20 @@ fn chunk_and_restore(amount: u64) { | |||||||
| 	// snapshot it.
 | 	// snapshot it.
 | ||||||
| 	let writer = Mutex::new(PackedWriter::new(&snapshot_path).unwrap()); | 	let writer = Mutex::new(PackedWriter::new(&snapshot_path).unwrap()); | ||||||
| 	let block_hashes = chunk_blocks(&bc, best_hash, &writer, &Progress::default()).unwrap(); | 	let block_hashes = chunk_blocks(&bc, best_hash, &writer, &Progress::default()).unwrap(); | ||||||
| 	writer.into_inner().finish(::snapshot::ManifestData { | 	let manifest = ::snapshot::ManifestData { | ||||||
| 		state_hashes: Vec::new(), | 		state_hashes: Vec::new(), | ||||||
| 		block_hashes: block_hashes, | 		block_hashes: block_hashes, | ||||||
| 		state_root: Default::default(), | 		state_root: ::util::sha3::SHA3_NULL_RLP, | ||||||
| 		block_number: amount, | 		block_number: amount, | ||||||
| 		block_hash: best_hash, | 		block_hash: best_hash, | ||||||
| 	}).unwrap(); | 	}; | ||||||
|  | 
 | ||||||
|  | 	writer.into_inner().finish(manifest.clone()).unwrap(); | ||||||
| 
 | 
 | ||||||
| 	// restore it.
 | 	// restore it.
 | ||||||
| 	let new_db = Arc::new(Database::open(&db_cfg, new_path.as_str()).unwrap()); | 	let new_db = Arc::new(Database::open(&db_cfg, new_path.as_str()).unwrap()); | ||||||
| 	let new_chain = BlockChain::new(Default::default(), &genesis, new_db.clone()); | 	let new_chain = BlockChain::new(Default::default(), &genesis, new_db.clone()); | ||||||
| 	let mut rebuilder = BlockRebuilder::new(new_chain, new_db.clone(), amount).unwrap(); | 	let mut rebuilder = BlockRebuilder::new(new_chain, new_db.clone(), &manifest).unwrap(); | ||||||
| 	let reader = PackedReader::new(&snapshot_path).unwrap().unwrap(); | 	let reader = PackedReader::new(&snapshot_path).unwrap().unwrap(); | ||||||
| 	let engine = ::engines::NullEngine::new(Default::default(), Default::default()); | 	let engine = ::engines::NullEngine::new(Default::default(), Default::default()); | ||||||
| 	for chunk_hash in &reader.manifest().block_hashes { | 	for chunk_hash in &reader.manifest().block_hashes { | ||||||
| @ -78,7 +81,7 @@ fn chunk_and_restore(amount: u64) { | |||||||
| 		rebuilder.feed(&chunk, &engine).unwrap(); | 		rebuilder.feed(&chunk, &engine).unwrap(); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	rebuilder.glue_chunks(); | 	rebuilder.finalize(HashMap::new()).unwrap(); | ||||||
| 
 | 
 | ||||||
| 	// and test it.
 | 	// and test it.
 | ||||||
| 	let new_chain = BlockChain::new(Default::default(), &genesis, new_db); | 	let new_chain = BlockChain::new(Default::default(), &genesis, new_db); | ||||||
|  | |||||||
| @ -23,6 +23,7 @@ use super::helpers::*; | |||||||
| pub struct TestSnapshotService { | pub struct TestSnapshotService { | ||||||
| 	manifest: Option<ManifestData>, | 	manifest: Option<ManifestData>, | ||||||
| 	chunks: HashMap<H256, Bytes>, | 	chunks: HashMap<H256, Bytes>, | ||||||
|  | 	canon_hashes: Mutex<HashMap<u64, H256>>, | ||||||
| 
 | 
 | ||||||
| 	restoration_manifest: Mutex<Option<ManifestData>>, | 	restoration_manifest: Mutex<Option<ManifestData>>, | ||||||
| 	state_restoration_chunks: Mutex<HashMap<H256, Bytes>>, | 	state_restoration_chunks: Mutex<HashMap<H256, Bytes>>, | ||||||
| @ -34,6 +35,7 @@ impl TestSnapshotService { | |||||||
| 		TestSnapshotService { | 		TestSnapshotService { | ||||||
| 			manifest: None, | 			manifest: None, | ||||||
| 			chunks: HashMap::new(), | 			chunks: HashMap::new(), | ||||||
|  | 			canon_hashes: Mutex::new(HashMap::new()), | ||||||
| 			restoration_manifest: Mutex::new(None), | 			restoration_manifest: Mutex::new(None), | ||||||
| 			state_restoration_chunks: Mutex::new(HashMap::new()), | 			state_restoration_chunks: Mutex::new(HashMap::new()), | ||||||
| 			block_restoration_chunks: Mutex::new(HashMap::new()), | 			block_restoration_chunks: Mutex::new(HashMap::new()), | ||||||
| @ -57,6 +59,7 @@ impl TestSnapshotService { | |||||||
| 		TestSnapshotService { | 		TestSnapshotService { | ||||||
| 			manifest: Some(manifest), | 			manifest: Some(manifest), | ||||||
| 			chunks: chunks, | 			chunks: chunks, | ||||||
|  | 			canon_hashes: Mutex::new(HashMap::new()), | ||||||
| 			restoration_manifest: Mutex::new(None), | 			restoration_manifest: Mutex::new(None), | ||||||
| 			state_restoration_chunks: Mutex::new(HashMap::new()), | 			state_restoration_chunks: Mutex::new(HashMap::new()), | ||||||
| 			block_restoration_chunks: Mutex::new(HashMap::new()), | 			block_restoration_chunks: Mutex::new(HashMap::new()), | ||||||
| @ -110,6 +113,10 @@ impl SnapshotService for TestSnapshotService { | |||||||
| 			self.block_restoration_chunks.lock().insert(hash, chunk); | 			self.block_restoration_chunks.lock().insert(hash, chunk); | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	fn provide_canon_hashes(&self, hashes: &[(u64, H256)]) { | ||||||
|  | 		self.canon_hashes.lock().extend(hashes.iter().cloned()); | ||||||
|  | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[test] | #[test] | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user