From dcfd7eab6d44922017c65e184d02a3368056d40d Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sun, 11 Sep 2016 14:05:59 +0200 Subject: [PATCH] Local snapshot restore (#2058) * restore from local snapshot * update status with chunks done * rework local restore trigger --- ethcore/src/service.rs | 2 +- ethcore/src/snapshot/service.rs | 85 ++++++++++++++-------- parity/cli.rs | 2 +- parity/snapshot.rs | 125 +++++++++++++++++++------------- util/src/kvdb.rs | 2 - 5 files changed, 130 insertions(+), 86 deletions(-) diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index aee039996..9fa126cc7 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -186,7 +186,7 @@ impl IoHandler for ClientIoHandler { ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); } ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); } ClientIoMessage::BeginRestoration(ref manifest) => { - if let Err(e) = self.snapshot.init_restore(manifest.clone()) { + if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) { warn!("Failed to initialize snapshot restoration: {}", e); } } diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index acde28de3..30d5ab716 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -35,7 +35,7 @@ use service::ClientIoMessage; use io::IoChannel; -use util::{Bytes, H256, Mutex, RwLock, UtilError}; +use util::{Bytes, H256, Mutex, RwLock, RwLockReadGuard, UtilError}; use util::journaldb::Algorithm; use util::kvdb::{Database, DatabaseConfig}; use util::snappy; @@ -70,7 +70,7 @@ struct Restoration { block_chunks_left: HashSet, state: StateRebuilder, blocks: BlockRebuilder, - writer: LooseWriter, + writer: Option, snappy_buffer: Bytes, final_state_root: H256, guard: Guard, @@ -80,8 +80,8 @@ struct RestorationParams<'a> { manifest: ManifestData, // manifest to base restoration on. pruning: Algorithm, // pruning algorithm for the database. db_path: PathBuf, // database path - db_config: &'a DatabaseConfig, - writer: LooseWriter, // writer for recovered snapshot. + db_config: &'a DatabaseConfig, // configuration for the database. + writer: Option, // writer for recovered snapshot. genesis: &'a [u8], // genesis block of the chain. guard: Guard, // guard for the restoration directory. } @@ -120,7 +120,10 @@ impl Restoration { let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer)); try!(self.state.feed(&self.snappy_buffer[..len])); - try!(self.writer.write_state_chunk(hash, chunk)); + + if let Some(ref mut writer) = self.writer.as_mut() { + try!(writer.write_state_chunk(hash, chunk)); + } } Ok(()) @@ -132,7 +135,9 @@ impl Restoration { let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer)); try!(self.blocks.feed(&self.snappy_buffer[..len], engine)); - try!(self.writer.write_block_chunk(hash, chunk)); + if let Some(ref mut writer) = self.writer.as_mut() { + try!(writer.write_block_chunk(hash, chunk)); + } } Ok(()) @@ -157,7 +162,9 @@ impl Restoration { // connect out-of-order chunks. self.blocks.glue_chunks(); - try!(self.writer.finish(self.manifest)); + if let Some(writer) = self.writer { + try!(writer.finish(self.manifest)); + } self.guard.disarm(); Ok(()) @@ -300,6 +307,11 @@ impl Service { Ok(()) } + /// Get a reference to the snapshot reader. + pub fn reader(&self) -> RwLockReadGuard> { + self.reader.read() + } + /// Tick the snapshot service. This will log any active snapshot /// being taken. pub fn tick(&self) { @@ -351,11 +363,15 @@ impl Service { } /// Initialize the restoration synchronously. - pub fn init_restore(&self, manifest: ManifestData) -> Result<(), Error> { + /// The recover flag indicates whether to recover the restored snapshot. + pub fn init_restore(&self, manifest: ManifestData, recover: bool) -> Result<(), Error> { let rest_dir = self.restoration_dir(); let mut res = self.restoration.lock(); + self.state_chunks.store(0, Ordering::SeqCst); + self.block_chunks.store(0, Ordering::SeqCst); + // tear down existing restoration. *res = None; @@ -370,7 +386,10 @@ impl Service { try!(fs::create_dir_all(&rest_dir)); // make new restoration. - let writer = try!(LooseWriter::new(self.temp_recovery_dir())); + let writer = match recover { + true => Some(try!(LooseWriter::new(self.temp_recovery_dir()))), + false => None + }; let params = RestorationParams { manifest: manifest, @@ -385,8 +404,8 @@ impl Service { *res = Some(try!(Restoration::new(params))); *self.status.lock() = RestorationStatus::Ongoing { - state_chunks_done: self.state_chunks.load(Ordering::Relaxed) as u32, - block_chunks_done: self.block_chunks.load(Ordering::Relaxed) as u32, + state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32, + block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32, }; Ok(()) } @@ -397,35 +416,35 @@ impl Service { fn finalize_restoration(&self, rest: &mut Option) -> Result<(), Error> { trace!(target: "snapshot", "finalizing restoration"); - self.state_chunks.store(0, Ordering::SeqCst); - self.block_chunks.store(0, Ordering::SeqCst); + let recover = rest.as_ref().map_or(false, |rest| rest.writer.is_some()); // destroy the restoration before replacing databases and snapshot. try!(rest.take().map(Restoration::finalize).unwrap_or(Ok(()))); try!(self.replace_client_db()); - let mut reader = self.reader.write(); - *reader = None; // destroy the old reader if it existed. + if recover { + let mut reader = self.reader.write(); + *reader = None; // destroy the old reader if it existed. - let snapshot_dir = self.snapshot_dir(); + let snapshot_dir = self.snapshot_dir(); - trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy()); - if let Err(e) = fs::remove_dir_all(&snapshot_dir) { - match e.kind() { - ErrorKind::NotFound => {} - _ => return Err(e.into()), + trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy()); + if let Err(e) = fs::remove_dir_all(&snapshot_dir) { + match e.kind() { + ErrorKind::NotFound => {} + _ => return Err(e.into()), + } } + + try!(fs::create_dir(&snapshot_dir)); + + trace!(target: "snapshot", "copying restored snapshot files over"); + try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir)); + + *reader = Some(try!(LooseReader::new(snapshot_dir))); } - try!(fs::create_dir(&snapshot_dir)); - - trace!(target: "snapshot", "copying restored snapshot files over"); - try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir)); - let _ = fs::remove_dir_all(self.restoration_dir()); - - *reader = Some(try!(LooseReader::new(snapshot_dir))); - *self.status.lock() = RestorationStatus::Inactive; Ok(()) @@ -506,7 +525,13 @@ impl SnapshotService for Service { } fn status(&self) -> RestorationStatus { - *self.status.lock() + let mut cur_status = self.status.lock(); + if let RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done } = *cur_status { + *state_chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32; + *block_chunks_done = self.block_chunks.load(Ordering::SeqCst) as u32; + } + + cur_status.clone() } fn begin_restore(&self, manifest: ManifestData) { diff --git a/parity/cli.rs b/parity/cli.rs index bb46bda13..a234d9d7d 100644 --- a/parity/cli.rs +++ b/parity/cli.rs @@ -33,7 +33,7 @@ Usage: parity export [ ] [options] parity signer new-token [options] parity snapshot [options] - parity restore [options] + parity restore [ ] [options] Operating Options: --mode MODE Set the operating mode. MODE can be one of: diff --git a/parity/snapshot.rs b/parity/snapshot.rs index 8c0bdd8fc..73d06426f 100644 --- a/parity/snapshot.rs +++ b/parity/snapshot.rs @@ -21,8 +21,9 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use ethcore_logger::{setup_log, Config as LogConfig}; -use ethcore::snapshot::{Progress, RestorationStatus, SnapshotService}; +use ethcore::snapshot::{Progress, RestorationStatus, SnapshotService as SS}; use ethcore::snapshot::io::{SnapshotReader, PackedReader, PackedWriter}; +use ethcore::snapshot::service::Service as SnapshotService; use ethcore::service::ClientService; use ethcore::client::{Mode, DatabaseCompactionProfile, Switch, VMType}; use ethcore::miner::Miner; @@ -62,6 +63,60 @@ pub struct SnapshotCommand { pub block_at: BlockID, } +// helper for reading chunks from arbitrary reader and feeding them into the +// service. +fn restore_using(snapshot: Arc, reader: &R, recover: bool) -> Result<(), String> { + let manifest = reader.manifest(); + + info!("Restoring to block #{} (0x{:?})", manifest.block_number, manifest.block_hash); + + try!(snapshot.init_restore(manifest.clone(), recover).map_err(|e| { + format!("Failed to begin restoration: {}", e) + })); + + let (num_state, num_blocks) = (manifest.state_hashes.len(), manifest.block_hashes.len()); + + let informant_handle = snapshot.clone(); + ::std::thread::spawn(move || { + while let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done } = informant_handle.status() { + info!("Processed {}/{} state chunks and {}/{} block chunks.", + state_chunks_done, num_state, block_chunks_done, num_blocks); + ::std::thread::sleep(Duration::from_secs(5)); + } + }); + + info!("Restoring state"); + for &state_hash in &manifest.state_hashes { + if snapshot.status() == RestorationStatus::Failed { + return Err("Restoration failed".into()); + } + + let chunk = try!(reader.chunk(state_hash) + .map_err(|e| format!("Encountered error while reading chunk {:?}: {}", state_hash, e))); + snapshot.feed_state_chunk(state_hash, &chunk); + } + + info!("Restoring blocks"); + for &block_hash in &manifest.block_hashes { + if snapshot.status() == RestorationStatus::Failed { + return Err("Restoration failed".into()); + } + + let chunk = try!(reader.chunk(block_hash) + .map_err(|e| format!("Encountered error while reading chunk {:?}: {}", block_hash, e))); + snapshot.feed_block_chunk(block_hash, &chunk); + } + + match snapshot.status() { + RestorationStatus::Ongoing { .. } => Err("Snapshot file is incomplete and missing chunks.".into()), + RestorationStatus::Failed => Err("Snapshot restoration failed.".into()), + RestorationStatus::Inactive => { + info!("Restoration complete."); + Ok(()) + } + } +} + impl SnapshotCommand { // shared portion of snapshot commands: start the client service fn start_service(self) -> Result<(ClientService, Arc), String> { @@ -106,69 +161,35 @@ impl SnapshotCommand { /// restore from a snapshot pub fn restore(self) -> Result<(), String> { - let file = try!(self.file_path.clone().ok_or("No file path provided.".to_owned())); + let file = self.file_path.clone(); let (service, _panic_handler) = try!(self.start_service()); warn!("Snapshot restoration is experimental and the format may be subject to change."); warn!("On encountering an unexpected error, please ensure that you have a recent snapshot."); let snapshot = service.snapshot_service(); - let reader = PackedReader::new(Path::new(&file)) - .map_err(|e| format!("Couldn't open snapshot file: {}", e)) - .and_then(|x| x.ok_or("Snapshot file has invalid format.".into())); - let reader = try!(reader); - let manifest = reader.manifest(); + if let Some(file) = file { + info!("Attempting to restore from snapshot at '{}'", file); - // drop the client so we don't restore while it has open DB handles. - drop(service); + let reader = PackedReader::new(Path::new(&file)) + .map_err(|e| format!("Couldn't open snapshot file: {}", e)) + .and_then(|x| x.ok_or("Snapshot file has invalid format.".into())); - try!(snapshot.init_restore(manifest.clone()).map_err(|e| { - format!("Failed to begin restoration: {}", e) - })); + let reader = try!(reader); + try!(restore_using(snapshot, &reader, true)); + } else { + info!("Attempting to restore from local snapshot."); - let (num_state, num_blocks) = (manifest.state_hashes.len(), manifest.block_hashes.len()); - - let informant_handle = snapshot.clone(); - ::std::thread::spawn(move || { - while let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done } = informant_handle.status() { - info!("Processed {}/{} state chunks and {}/{} block chunks.", - state_chunks_done, num_state, block_chunks_done, num_blocks); - - ::std::thread::sleep(Duration::from_secs(5)); - } - }); - - info!("Restoring state"); - for &state_hash in &manifest.state_hashes { - if snapshot.status() == RestorationStatus::Failed { - return Err("Restoration failed".into()); - } - - let chunk = try!(reader.chunk(state_hash) - .map_err(|e| format!("Encountered error while reading chunk {:?}: {}", state_hash, e))); - snapshot.feed_state_chunk(state_hash, &chunk); - } - - info!("Restoring blocks"); - for &block_hash in &manifest.block_hashes { - if snapshot.status() == RestorationStatus::Failed { - return Err("Restoration failed".into()); - } - - let chunk = try!(reader.chunk(block_hash) - .map_err(|e| format!("Encountered error while reading chunk {:?}: {}", block_hash, e))); - snapshot.feed_block_chunk(block_hash, &chunk); - } - - match snapshot.status() { - RestorationStatus::Ongoing { .. } => Err("Snapshot file is incomplete and missing chunks.".into()), - RestorationStatus::Failed => Err("Snapshot restoration failed.".into()), - RestorationStatus::Inactive => { - info!("Restoration complete."); - Ok(()) + // attempting restoration with recovery will lead to deadlock + // as we currently hold a read lock on the service's reader. + match *snapshot.reader() { + Some(ref reader) => try!(restore_using(snapshot.clone(), reader, false)), + None => return Err("No local snapshot found.".into()), } } + + Ok(()) } /// Take a snapshot from the head of the chain. diff --git a/util/src/kvdb.rs b/util/src/kvdb.rs index 177df5fa0..3a89ae293 100644 --- a/util/src/kvdb.rs +++ b/util/src/kvdb.rs @@ -458,8 +458,6 @@ impl Database { let mut backup_db = PathBuf::from(&self.path); backup_db.pop(); backup_db.push("backup_db"); - println!("Path at {:?}", self.path); - println!("Backup at {:?}", backup_db); let existed = match fs::rename(&self.path, &backup_db) { Ok(_) => true,