Take control of recovered snapshots, start restoration asynchronously (#2010)
* take control of given snapshot * start snapshot restoration asynchronously,
This commit is contained in:
		
							parent
							
								
									2aef81cf90
								
							
						
					
					
						commit
						1c19a807d9
					
				| @ -22,6 +22,7 @@ use spec::Spec; | |||||||
| use error::*; | use error::*; | ||||||
| use client::{Client, ClientConfig, ChainNotify}; | use client::{Client, ClientConfig, ChainNotify}; | ||||||
| use miner::Miner; | use miner::Miner; | ||||||
|  | use snapshot::ManifestData; | ||||||
| use snapshot::service::Service as SnapshotService; | use snapshot::service::Service as SnapshotService; | ||||||
| use std::sync::atomic::AtomicBool; | use std::sync::atomic::AtomicBool; | ||||||
| 
 | 
 | ||||||
| @ -39,6 +40,8 @@ pub enum ClientIoMessage { | |||||||
| 	BlockVerified, | 	BlockVerified, | ||||||
| 	/// New transaction RLPs are ready to be imported
 | 	/// New transaction RLPs are ready to be imported
 | ||||||
| 	NewTransactions(Vec<Bytes>), | 	NewTransactions(Vec<Bytes>), | ||||||
|  | 	/// Begin snapshot restoration
 | ||||||
|  | 	BeginRestoration(ManifestData), | ||||||
| 	/// Feed a state chunk to the snapshot service
 | 	/// Feed a state chunk to the snapshot service
 | ||||||
| 	FeedStateChunk(H256, Bytes), | 	FeedStateChunk(H256, Bytes), | ||||||
| 	/// Feed a block chunk to the snapshot service
 | 	/// Feed a block chunk to the snapshot service
 | ||||||
| @ -160,6 +163,11 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler { | |||||||
| 		match *net_message { | 		match *net_message { | ||||||
| 			ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); } | 			ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); } | ||||||
| 			ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); } | 			ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); } | ||||||
|  | 			ClientIoMessage::BeginRestoration(ref manifest) => { | ||||||
|  | 				if let Err(e) = self.snapshot.init_restore(manifest.clone()) { | ||||||
|  | 					warn!("Failed to initialize snapshot restoration: {}", e); | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
| 			ClientIoMessage::FeedStateChunk(ref hash, ref chunk) => self.snapshot.feed_state_chunk(*hash, chunk), | 			ClientIoMessage::FeedStateChunk(ref hash, ref chunk) => self.snapshot.feed_state_chunk(*hash, chunk), | ||||||
| 			ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => self.snapshot.feed_block_chunk(*hash, chunk), | 			ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => self.snapshot.feed_block_chunk(*hash, chunk), | ||||||
| 			_ => {} // ignore other messages
 | 			_ => {} // ignore other messages
 | ||||||
|  | |||||||
| @ -501,7 +501,7 @@ impl StateRebuilder { | |||||||
| 
 | 
 | ||||||
| 	/// Check for accounts missing code. Once all chunks have been fed, there should
 | 	/// Check for accounts missing code. Once all chunks have been fed, there should
 | ||||||
| 	/// be none.
 | 	/// be none.
 | ||||||
| 	pub fn check_missing(&self) -> Result<(), Error> { | 	pub fn check_missing(self) -> Result<(), Error> { | ||||||
| 		let missing = self.missing_code.keys().cloned().collect::<Vec<_>>(); | 		let missing = self.missing_code.keys().cloned().collect::<Vec<_>>(); | ||||||
| 		match missing.is_empty() { | 		match missing.is_empty() { | ||||||
| 			true => Ok(()), | 			true => Ok(()), | ||||||
| @ -640,8 +640,8 @@ impl BlockRebuilder { | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/// Glue together any disconnected chunks. To be called at the end.
 | 	/// Glue together any disconnected chunks. To be called at the end.
 | ||||||
| 	pub fn glue_chunks(&mut self) { | 	pub fn glue_chunks(self) { | ||||||
| 		for &(ref first_num, ref first_hash) in &self.disconnected { | 		for (first_num, first_hash) in self.disconnected { | ||||||
| 			let parent_num = first_num - 1; | 			let parent_num = first_num - 1; | ||||||
| 
 | 
 | ||||||
| 			// check if the parent is even in the chain.
 | 			// check if the parent is even in the chain.
 | ||||||
| @ -649,7 +649,7 @@ impl BlockRebuilder { | |||||||
| 			// the first block of the first chunks has nothing to connect to.
 | 			// the first block of the first chunks has nothing to connect to.
 | ||||||
| 			if let Some(parent_hash) = self.chain.block_hash(parent_num) { | 			if let Some(parent_hash) = self.chain.block_hash(parent_num) { | ||||||
| 				// if so, add the child to it.
 | 				// if so, add the child to it.
 | ||||||
| 				self.chain.add_child(parent_hash, *first_hash); | 				self.chain.add_child(parent_hash, first_hash); | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | |||||||
| @ -24,7 +24,7 @@ use std::sync::Arc; | |||||||
| use std::sync::atomic::{AtomicUsize, Ordering}; | use std::sync::atomic::{AtomicUsize, Ordering}; | ||||||
| 
 | 
 | ||||||
| use super::{ManifestData, StateRebuilder, BlockRebuilder}; | use super::{ManifestData, StateRebuilder, BlockRebuilder}; | ||||||
| use super::io::{SnapshotReader, LooseReader}; | use super::io::{SnapshotReader, LooseReader, SnapshotWriter, LooseWriter}; | ||||||
| 
 | 
 | ||||||
| use blockchain::BlockChain; | use blockchain::BlockChain; | ||||||
| use engines::Engine; | use engines::Engine; | ||||||
| @ -34,7 +34,7 @@ use spec::Spec; | |||||||
| 
 | 
 | ||||||
| use io::IoChannel; | use io::IoChannel; | ||||||
| 
 | 
 | ||||||
| use util::{Bytes, H256, Mutex, UtilError}; | use util::{Bytes, H256, Mutex, RwLock, UtilError}; | ||||||
| use util::journaldb::Algorithm; | use util::journaldb::Algorithm; | ||||||
| use util::kvdb::{Database, DatabaseConfig}; | use util::kvdb::{Database, DatabaseConfig}; | ||||||
| use util::snappy; | use util::snappy; | ||||||
| @ -50,8 +50,6 @@ pub enum RestorationStatus { | |||||||
| 	Failed, | 	Failed, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /// Restoration info.
 |  | ||||||
| 
 |  | ||||||
| /// The interface for a snapshot network service.
 | /// The interface for a snapshot network service.
 | ||||||
| /// This handles:
 | /// This handles:
 | ||||||
| ///    - restoration of snapshots to temporary databases.
 | ///    - restoration of snapshots to temporary databases.
 | ||||||
| @ -74,8 +72,10 @@ pub trait SnapshotService { | |||||||
| 	/// Begin snapshot restoration.
 | 	/// Begin snapshot restoration.
 | ||||||
| 	/// If restoration in-progress, this will reset it.
 | 	/// If restoration in-progress, this will reset it.
 | ||||||
| 	/// From this point on, any previous snapshot may become unavailable.
 | 	/// From this point on, any previous snapshot may become unavailable.
 | ||||||
| 	/// Returns true if successful, false otherwise.
 | 	fn begin_restore(&self, manifest: ManifestData); | ||||||
| 	fn begin_restore(&self, manifest: ManifestData) -> bool; | 
 | ||||||
|  | 	/// Abort an in-progress restoration if there is one.
 | ||||||
|  | 	fn abort_restore(&self); | ||||||
| 
 | 
 | ||||||
| 	/// Feed a raw state chunk to the service to be processed asynchronously.
 | 	/// Feed a raw state chunk to the service to be processed asynchronously.
 | ||||||
| 	/// no-op if not currently restoring.
 | 	/// no-op if not currently restoring.
 | ||||||
| @ -88,51 +88,59 @@ pub trait SnapshotService { | |||||||
| 
 | 
 | ||||||
| /// State restoration manager.
 | /// State restoration manager.
 | ||||||
| struct Restoration { | struct Restoration { | ||||||
|  | 	manifest: ManifestData, | ||||||
| 	state_chunks_left: HashSet<H256>, | 	state_chunks_left: HashSet<H256>, | ||||||
| 	block_chunks_left: HashSet<H256>, | 	block_chunks_left: HashSet<H256>, | ||||||
| 	state: StateRebuilder, | 	state: StateRebuilder, | ||||||
| 	blocks: BlockRebuilder, | 	blocks: BlockRebuilder, | ||||||
|  | 	writer: LooseWriter, | ||||||
| 	snappy_buffer: Bytes, | 	snappy_buffer: Bytes, | ||||||
| 	final_state_root: H256, | 	final_state_root: H256, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | struct RestorationParams<'a> { | ||||||
|  | 	manifest: ManifestData, // manifest to base restoration on.
 | ||||||
|  | 	pruning: Algorithm, // pruning algorithm for the database.
 | ||||||
|  | 	db_path: PathBuf, // database path
 | ||||||
|  | 	writer: LooseWriter, // writer for recovered snapshot.
 | ||||||
|  | 	genesis: &'a [u8], // genesis block of the chain.
 | ||||||
|  | } | ||||||
|  | 
 | ||||||
| impl Restoration { | impl Restoration { | ||||||
| 	// make a new restoration, building databases in the given path.
 | 	// make a new restoration using the given parameters.
 | ||||||
| 	fn new(manifest: &ManifestData, pruning: Algorithm, path: &Path, gb: &[u8]) -> Result<Self, Error> { | 	fn new(params: RestorationParams) -> Result<Self, Error> { | ||||||
|  | 		let manifest = params.manifest; | ||||||
|  | 
 | ||||||
|  | 		let state_chunks = manifest.state_hashes.iter().cloned().collect(); | ||||||
|  | 		let block_chunks = manifest.block_hashes.iter().cloned().collect(); | ||||||
|  | 
 | ||||||
| 		let cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS); | 		let cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS); | ||||||
| 		let raw_db = Arc::new(try!(Database::open(&cfg, &*path.to_string_lossy()) | 		let raw_db = Arc::new(try!(Database::open(&cfg, &*params.db_path.to_string_lossy()) | ||||||
| 			.map_err(UtilError::SimpleString))); | 			.map_err(UtilError::SimpleString))); | ||||||
| 
 | 
 | ||||||
| 		let chain = BlockChain::new(Default::default(), gb, raw_db.clone()); | 		let chain = BlockChain::new(Default::default(), params.genesis, raw_db.clone()); | ||||||
| 		let blocks = try!(BlockRebuilder::new(chain, manifest.block_number)); | 		let blocks = try!(BlockRebuilder::new(chain, manifest.block_number)); | ||||||
| 
 | 
 | ||||||
|  | 		let root = manifest.state_root.clone(); | ||||||
| 		Ok(Restoration { | 		Ok(Restoration { | ||||||
| 			state_chunks_left: manifest.state_hashes.iter().cloned().collect(), | 			manifest: manifest, | ||||||
| 			block_chunks_left: manifest.block_hashes.iter().cloned().collect(), | 			state_chunks_left: state_chunks, | ||||||
| 			state: StateRebuilder::new(raw_db, pruning), | 			block_chunks_left: block_chunks, | ||||||
|  | 			state: StateRebuilder::new(raw_db, params.pruning), | ||||||
| 			blocks: blocks, | 			blocks: blocks, | ||||||
|  | 			writer: params.writer, | ||||||
| 			snappy_buffer: Vec::new(), | 			snappy_buffer: Vec::new(), | ||||||
| 			final_state_root: manifest.state_root, | 			final_state_root: root, | ||||||
| 		}) | 		}) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// feeds a state chunk
 | 	// feeds a state chunk
 | ||||||
| 	fn feed_state(&mut self, hash: H256, chunk: &[u8]) -> Result<(), Error> { | 	fn feed_state(&mut self, hash: H256, chunk: &[u8]) -> Result<(), Error> { | ||||||
| 		use util::trie::TrieError; |  | ||||||
| 
 |  | ||||||
| 		if self.state_chunks_left.remove(&hash) { | 		if self.state_chunks_left.remove(&hash) { | ||||||
| 			let len = try!(snappy::decompress_into(&chunk, &mut self.snappy_buffer)); | 			let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer)); | ||||||
|  | 
 | ||||||
| 			try!(self.state.feed(&self.snappy_buffer[..len])); | 			try!(self.state.feed(&self.snappy_buffer[..len])); | ||||||
| 
 | 			try!(self.writer.write_state_chunk(hash, chunk)); | ||||||
| 			if self.state_chunks_left.is_empty() { |  | ||||||
| 				try!(self.state.check_missing()); |  | ||||||
| 
 |  | ||||||
| 				let root = self.state.state_root(); |  | ||||||
| 				if root != self.final_state_root { |  | ||||||
| 					warn!("Final restored state has wrong state root: expected {:?}, got {:?}", root, self.final_state_root); |  | ||||||
| 					return Err(TrieError::InvalidStateRoot(root).into()); |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		Ok(()) | 		Ok(()) | ||||||
| @ -141,14 +149,35 @@ impl Restoration { | |||||||
| 	// feeds a block chunk
 | 	// feeds a block chunk
 | ||||||
| 	fn feed_blocks(&mut self, hash: H256, chunk: &[u8], engine: &Engine) -> Result<(), Error> { | 	fn feed_blocks(&mut self, hash: H256, chunk: &[u8], engine: &Engine) -> Result<(), Error> { | ||||||
| 		if self.block_chunks_left.remove(&hash) { | 		if self.block_chunks_left.remove(&hash) { | ||||||
| 			let len = try!(snappy::decompress_into(&chunk, &mut self.snappy_buffer)); | 			let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer)); | ||||||
| 			try!(self.blocks.feed(&self.snappy_buffer[..len], engine)); | 
 | ||||||
|  | 			try!(self.blocks.feed(&self.snappy_buffer[..len], engine)); | ||||||
|  | 			try!(self.writer.write_block_chunk(hash, chunk)); | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		Ok(()) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// finish up restoration.
 | ||||||
|  | 	fn finalize(self) -> Result<(), Error> { | ||||||
|  | 		use util::trie::TrieError; | ||||||
|  | 
 | ||||||
|  | 		if !self.is_done() { return Ok(()) } | ||||||
|  | 
 | ||||||
|  | 		// verify final state root.
 | ||||||
|  | 		let root = self.state.state_root(); | ||||||
|  | 		if root != self.final_state_root { | ||||||
|  | 			warn!("Final restored state has wrong state root: expected {:?}, got {:?}", root, self.final_state_root); | ||||||
|  | 			return Err(TrieError::InvalidStateRoot(root).into()); | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// check for missing code.
 | ||||||
|  | 		try!(self.state.check_missing()); | ||||||
| 
 | 
 | ||||||
| 			if self.block_chunks_left.is_empty() { |  | ||||||
| 		// connect out-of-order chunks.
 | 		// connect out-of-order chunks.
 | ||||||
| 		self.blocks.glue_chunks(); | 		self.blocks.glue_chunks(); | ||||||
| 			} | 
 | ||||||
| 		} | 		try!(self.writer.finish(self.manifest)); | ||||||
| 
 | 
 | ||||||
| 		Ok(()) | 		Ok(()) | ||||||
| 	} | 	} | ||||||
| @ -174,7 +203,7 @@ pub struct Service { | |||||||
| 	io_channel: Channel, | 	io_channel: Channel, | ||||||
| 	pruning: Algorithm, | 	pruning: Algorithm, | ||||||
| 	status: Mutex<RestorationStatus>, | 	status: Mutex<RestorationStatus>, | ||||||
| 	reader: Option<LooseReader>, | 	reader: RwLock<Option<LooseReader>>, | ||||||
| 	engine: Arc<Engine>, | 	engine: Arc<Engine>, | ||||||
| 	genesis_block: Bytes, | 	genesis_block: Bytes, | ||||||
| 	state_chunks: AtomicUsize, | 	state_chunks: AtomicUsize, | ||||||
| @ -190,6 +219,7 @@ impl Service { | |||||||
| 		let reader = { | 		let reader = { | ||||||
| 			let mut snapshot_path = db_path.clone(); | 			let mut snapshot_path = db_path.clone(); | ||||||
| 			snapshot_path.push("snapshot"); | 			snapshot_path.push("snapshot"); | ||||||
|  | 			snapshot_path.push("current"); | ||||||
| 
 | 
 | ||||||
| 			LooseReader::new(snapshot_path).ok() | 			LooseReader::new(snapshot_path).ok() | ||||||
| 		}; | 		}; | ||||||
| @ -201,15 +231,15 @@ impl Service { | |||||||
| 			io_channel: io_channel, | 			io_channel: io_channel, | ||||||
| 			pruning: pruning, | 			pruning: pruning, | ||||||
| 			status: Mutex::new(RestorationStatus::Inactive), | 			status: Mutex::new(RestorationStatus::Inactive), | ||||||
| 			reader: reader, | 			reader: RwLock::new(reader), | ||||||
| 			engine: spec.engine.clone(), | 			engine: spec.engine.clone(), | ||||||
| 			genesis_block: spec.genesis_block(), | 			genesis_block: spec.genesis_block(), | ||||||
| 			state_chunks: AtomicUsize::new(0), | 			state_chunks: AtomicUsize::new(0), | ||||||
| 			block_chunks: AtomicUsize::new(0), | 			block_chunks: AtomicUsize::new(0), | ||||||
| 		}; | 		}; | ||||||
| 
 | 
 | ||||||
| 		// create the snapshot dir if it doesn't exist.
 | 		// create the root snapshot dir if it doesn't exist.
 | ||||||
| 		if let Err(e) = fs::create_dir_all(service.snapshot_dir()) { | 		if let Err(e) = fs::create_dir_all(service.root_dir()) { | ||||||
| 			if e.kind() != ErrorKind::AlreadyExists { | 			if e.kind() != ErrorKind::AlreadyExists { | ||||||
| 				return Err(e.into()) | 				return Err(e.into()) | ||||||
| 			} | 			} | ||||||
| @ -225,16 +255,23 @@ impl Service { | |||||||
| 		Ok(service) | 		Ok(service) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// get the snapshot path.
 | 	// get the root path.
 | ||||||
| 	fn snapshot_dir(&self) -> PathBuf { | 	fn root_dir(&self) -> PathBuf { | ||||||
| 		let mut dir = self.db_path.clone(); | 		let mut dir = self.db_path.clone(); | ||||||
| 		dir.push("snapshot"); | 		dir.push("snapshot"); | ||||||
| 		dir | 		dir | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	// get the current snapshot dir.
 | ||||||
|  | 	fn snapshot_dir(&self) -> PathBuf { | ||||||
|  | 		let mut dir = self.root_dir(); | ||||||
|  | 		dir.push("current"); | ||||||
|  | 		dir | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	// get the restoration directory.
 | 	// get the restoration directory.
 | ||||||
| 	fn restoration_dir(&self) -> PathBuf { | 	fn restoration_dir(&self) -> PathBuf { | ||||||
| 		let mut dir = self.snapshot_dir(); | 		let mut dir = self.root_dir(); | ||||||
| 		dir.push("restoration"); | 		dir.push("restoration"); | ||||||
| 		dir | 		dir | ||||||
| 	} | 	} | ||||||
| @ -246,6 +283,13 @@ impl Service { | |||||||
| 		dir | 		dir | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	// temporary snapshot recovery path.
 | ||||||
|  | 	fn temp_recovery_dir(&self) -> PathBuf { | ||||||
|  | 		let mut dir = self.restoration_dir(); | ||||||
|  | 		dir.push("temp"); | ||||||
|  | 		dir | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	// replace one the client's database with our own.
 | 	// replace one the client's database with our own.
 | ||||||
| 	fn replace_client_db(&self) -> Result<(), Error> { | 	fn replace_client_db(&self) -> Result<(), Error> { | ||||||
| 		let our_db = self.restoration_db(); | 		let our_db = self.restoration_db(); | ||||||
| @ -284,6 +328,42 @@ impl Service { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	/// Initialize the restoration synchronously.
 | ||||||
|  | 	pub fn init_restore(&self, manifest: ManifestData) -> Result<(), Error> { | ||||||
|  | 		let rest_dir = self.restoration_dir(); | ||||||
|  | 
 | ||||||
|  | 		let mut res = self.restoration.lock(); | ||||||
|  | 
 | ||||||
|  | 		// tear down existing restoration.
 | ||||||
|  | 		*res = None; | ||||||
|  | 
 | ||||||
|  | 		// delete and restore the restoration dir.
 | ||||||
|  | 		if let Err(e) = fs::remove_dir_all(&rest_dir) { | ||||||
|  | 			match e.kind() { | ||||||
|  | 				ErrorKind::NotFound => {}, | ||||||
|  | 				_ => return Err(e.into()), | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		try!(fs::create_dir_all(&rest_dir)); | ||||||
|  | 
 | ||||||
|  | 		// make new restoration.
 | ||||||
|  | 		let writer = try!(LooseWriter::new(self.temp_recovery_dir())); | ||||||
|  | 
 | ||||||
|  | 		let params = RestorationParams { | ||||||
|  | 			manifest: manifest, | ||||||
|  | 			pruning: self.pruning, | ||||||
|  | 			db_path: self.restoration_db(), | ||||||
|  | 			writer: writer, | ||||||
|  | 			genesis: &self.genesis_block, | ||||||
|  | 		}; | ||||||
|  | 
 | ||||||
|  | 		*res = Some(try!(Restoration::new(params))); | ||||||
|  | 
 | ||||||
|  | 		*self.status.lock() = RestorationStatus::Ongoing; | ||||||
|  | 		Ok(()) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	// finalize the restoration. this accepts an already-locked
 | 	// finalize the restoration. this accepts an already-locked
 | ||||||
| 	// restoration as an argument -- so acquiring it again _will_
 | 	// restoration as an argument -- so acquiring it again _will_
 | ||||||
| 	// lead to deadlock.
 | 	// lead to deadlock.
 | ||||||
| @ -293,27 +373,52 @@ impl Service { | |||||||
| 		self.state_chunks.store(0, Ordering::SeqCst); | 		self.state_chunks.store(0, Ordering::SeqCst); | ||||||
| 		self.block_chunks.store(0, Ordering::SeqCst); | 		self.block_chunks.store(0, Ordering::SeqCst); | ||||||
| 
 | 
 | ||||||
| 		// destroy the restoration before replacing databases.
 | 		// destroy the restoration before replacing databases and snapshot.
 | ||||||
| 		*rest = None; | 		try!(rest.take().map(Restoration::finalize).unwrap_or(Ok(()))); | ||||||
| 
 |  | ||||||
| 		try!(self.replace_client_db()); | 		try!(self.replace_client_db()); | ||||||
| 
 | 
 | ||||||
| 		*self.status.lock() = RestorationStatus::Inactive; | 		let mut reader = self.reader.write(); | ||||||
|  | 		*reader = None; // destroy the old reader if it existed.
 | ||||||
|  | 
 | ||||||
|  | 		let snapshot_dir = self.snapshot_dir(); | ||||||
|  | 
 | ||||||
|  | 		trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy()); | ||||||
|  | 		if let Err(e) = fs::remove_dir_all(&snapshot_dir) { | ||||||
|  | 			match e.kind() { | ||||||
|  | 				ErrorKind::NotFound => {} | ||||||
|  | 				_ => return Err(e.into()), | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		try!(fs::create_dir(&snapshot_dir)); | ||||||
|  | 
 | ||||||
|  | 		trace!(target: "snapshot", "copying restored snapshot files over"); | ||||||
|  | 		for maybe_file in try!(fs::read_dir(self.temp_recovery_dir())) { | ||||||
|  | 			let path = try!(maybe_file).path(); | ||||||
|  | 			if let Some(name) = path.file_name().map(|x| x.to_owned()) { | ||||||
|  | 				let mut new_path = snapshot_dir.clone(); | ||||||
|  | 				new_path.push(name); | ||||||
|  | 				try!(fs::rename(path, new_path)); | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
| 
 | 
 | ||||||
| 		// TODO: take control of restored snapshot.
 |  | ||||||
| 		let _ = fs::remove_dir_all(self.restoration_dir()); | 		let _ = fs::remove_dir_all(self.restoration_dir()); | ||||||
| 
 | 
 | ||||||
|  | 		*reader = Some(try!(LooseReader::new(snapshot_dir))); | ||||||
|  | 
 | ||||||
|  | 		*self.status.lock() = RestorationStatus::Inactive; | ||||||
|  | 
 | ||||||
| 		Ok(()) | 		Ok(()) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/// Feed a chunk of either kind. no-op if no restoration or status is wrong.
 | 	/// Feed a chunk of either kind. no-op if no restoration or status is wrong.
 | ||||||
| 	fn feed_chunk(&self, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> { | 	fn feed_chunk(&self, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> { | ||||||
| 		match self.status() { |  | ||||||
| 			RestorationStatus::Inactive | RestorationStatus::Failed => Ok(()), |  | ||||||
| 			RestorationStatus::Ongoing => { |  | ||||||
| 		// TODO: be able to process block chunks and state chunks at same time?
 | 		// TODO: be able to process block chunks and state chunks at same time?
 | ||||||
| 		let mut restoration = self.restoration.lock(); | 		let mut restoration = self.restoration.lock(); | ||||||
| 
 | 
 | ||||||
|  | 		match self.status() { | ||||||
|  | 			RestorationStatus::Inactive | RestorationStatus::Failed => Ok(()), | ||||||
|  | 			RestorationStatus::Ongoing => { | ||||||
| 				let res = { | 				let res = { | ||||||
| 					let rest = match *restoration { | 					let rest = match *restoration { | ||||||
| 						Some(ref mut r) => r, | 						Some(ref mut r) => r, | ||||||
| @ -373,11 +478,11 @@ impl Service { | |||||||
| 
 | 
 | ||||||
| impl SnapshotService for Service { | impl SnapshotService for Service { | ||||||
| 	fn manifest(&self) -> Option<ManifestData> { | 	fn manifest(&self) -> Option<ManifestData> { | ||||||
| 		self.reader.as_ref().map(|r| r.manifest().clone()) | 		self.reader.read().as_ref().map(|r| r.manifest().clone()) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	fn chunk(&self, hash: H256) -> Option<Bytes> { | 	fn chunk(&self, hash: H256) -> Option<Bytes> { | ||||||
| 		self.reader.as_ref().and_then(|r| r.chunk(hash).ok()) | 		self.reader.read().as_ref().and_then(|r| r.chunk(hash).ok()) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	fn status(&self) -> RestorationStatus { | 	fn status(&self) -> RestorationStatus { | ||||||
| @ -388,39 +493,22 @@ impl SnapshotService for Service { | |||||||
| 		(self.state_chunks.load(Ordering::Relaxed), self.block_chunks.load(Ordering::Relaxed)) | 		(self.state_chunks.load(Ordering::Relaxed), self.block_chunks.load(Ordering::Relaxed)) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	fn begin_restore(&self, manifest: ManifestData) -> bool { | 	fn begin_restore(&self, manifest: ManifestData) { | ||||||
| 		let rest_dir = self.restoration_dir(); | 		self.io_channel.send(ClientIoMessage::BeginRestoration(manifest)) | ||||||
|  | 			.expect("snapshot service and io service are kept alive by client service; qed"); | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
| 		let mut res = self.restoration.lock(); | 	fn abort_restore(&self) { | ||||||
| 
 | 		*self.restoration.lock() = None; | ||||||
| 		// tear down existing restoration.
 | 		*self.status.lock() = RestorationStatus::Inactive; | ||||||
| 		*res = None; | 		if let Err(e) = fs::remove_dir_all(&self.restoration_dir()) { | ||||||
| 
 |  | ||||||
| 		// delete and restore the restoration dir.
 |  | ||||||
| 		if let Err(e) = fs::remove_dir_all(&rest_dir).and_then(|_| fs::create_dir_all(&rest_dir)) { |  | ||||||
| 			match e.kind() { | 			match e.kind() { | ||||||
| 				ErrorKind::NotFound => {}, | 				ErrorKind::NotFound => {}, | ||||||
| 				_ => { | 				_ => warn!("encountered error {} while deleting snapshot restoration dir.", e), | ||||||
| 					warn!("encountered error {} while beginning snapshot restoration.", e); |  | ||||||
| 					return false; |  | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 		// make new restoration.
 |  | ||||||
| 		let db_path = self.restoration_db(); |  | ||||||
| 		*res = match Restoration::new(&manifest, self.pruning, &db_path, &self.genesis_block) { |  | ||||||
| 				Ok(b) => Some(b), |  | ||||||
| 				Err(e) => { |  | ||||||
| 					warn!("encountered error {} while beginning snapshot restoration.", e); |  | ||||||
| 					return false; |  | ||||||
| 				} |  | ||||||
| 		}; |  | ||||||
| 
 |  | ||||||
| 		*self.status.lock() = RestorationStatus::Ongoing; |  | ||||||
| 		true |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	fn restore_state_chunk(&self, hash: H256, chunk: Bytes) { | 	fn restore_state_chunk(&self, hash: H256, chunk: Bytes) { | ||||||
| 		self.io_channel.send(ClientIoMessage::FeedStateChunk(hash, chunk)) | 		self.io_channel.send(ClientIoMessage::FeedStateChunk(hash, chunk)) | ||||||
| 			.expect("snapshot service and io service are kept alive by client service; qed"); | 			.expect("snapshot service and io service are kept alive by client service; qed"); | ||||||
|  | |||||||
| @ -79,7 +79,6 @@ fn chunk_and_restore(amount: u64) { | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	rebuilder.glue_chunks(); | 	rebuilder.glue_chunks(); | ||||||
| 	drop(rebuilder); |  | ||||||
| 
 | 
 | ||||||
| 	// and test it.
 | 	// and test it.
 | ||||||
| 	let new_chain = BlockChain::new(Default::default(), &genesis, new_db); | 	let new_chain = BlockChain::new(Default::default(), &genesis, new_db); | ||||||
|  | |||||||
| @ -72,8 +72,9 @@ fn snap_and_restore() { | |||||||
| 			rebuilder.feed(&chunk).unwrap(); | 			rebuilder.feed(&chunk).unwrap(); | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		rebuilder.check_missing().unwrap(); |  | ||||||
| 		assert_eq!(rebuilder.state_root(), state_root); | 		assert_eq!(rebuilder.state_root(), state_root); | ||||||
|  | 		rebuilder.check_missing().unwrap(); | ||||||
|  | 
 | ||||||
| 		new_db | 		new_db | ||||||
| 	}; | 	}; | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -121,9 +121,9 @@ impl SnapshotCommand { | |||||||
| 		// drop the client so we don't restore while it has open DB handles.
 | 		// drop the client so we don't restore while it has open DB handles.
 | ||||||
| 		drop(service); | 		drop(service); | ||||||
| 
 | 
 | ||||||
| 		if !snapshot.begin_restore(manifest.clone()) { | 		try!(snapshot.init_restore(manifest.clone()).map_err(|e| { | ||||||
| 			return Err("Failed to begin restoration.".into()); | 			format!("Failed to begin restoration: {}", e) | ||||||
| 		} | 		})); | ||||||
| 
 | 
 | ||||||
| 		let (num_state, num_blocks) = (manifest.state_hashes.len(), manifest.block_hashes.len()); | 		let (num_state, num_blocks) = (manifest.state_hashes.len(), manifest.block_hashes.len()); | ||||||
| 
 | 
 | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user