From 1c19a807d9996e30e9e2f772c5a7f73e7e80ff07 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 25 Aug 2016 22:20:44 +0200 Subject: [PATCH] Take control of recovered snapshots, start restoration asynchronously (#2010) * take control of given snapshot * start snapshot restoration asynchronously, --- ethcore/src/service.rs | 8 + ethcore/src/snapshot/mod.rs | 8 +- ethcore/src/snapshot/service.rs | 240 ++++++++++++++++++--------- ethcore/src/snapshot/tests/blocks.rs | 1 - ethcore/src/snapshot/tests/state.rs | 3 +- parity/snapshot.rs | 6 +- 6 files changed, 181 insertions(+), 85 deletions(-) diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 355c7d580..e2e4772a4 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -22,6 +22,7 @@ use spec::Spec; use error::*; use client::{Client, ClientConfig, ChainNotify}; use miner::Miner; +use snapshot::ManifestData; use snapshot::service::Service as SnapshotService; use std::sync::atomic::AtomicBool; @@ -39,6 +40,8 @@ pub enum ClientIoMessage { BlockVerified, /// New transaction RLPs are ready to be imported NewTransactions(Vec), + /// Begin snapshot restoration + BeginRestoration(ManifestData), /// Feed a state chunk to the snapshot service FeedStateChunk(H256, Bytes), /// Feed a block chunk to the snapshot service @@ -160,6 +163,11 @@ impl IoHandler for ClientIoHandler { match *net_message { ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); } ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); } + ClientIoMessage::BeginRestoration(ref manifest) => { + if let Err(e) = self.snapshot.init_restore(manifest.clone()) { + warn!("Failed to initialize snapshot restoration: {}", e); + } + } ClientIoMessage::FeedStateChunk(ref hash, ref chunk) => self.snapshot.feed_state_chunk(*hash, chunk), ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => self.snapshot.feed_block_chunk(*hash, chunk), _ => {} // ignore other messages diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index a1f9812d5..d1ad077fe 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -501,7 +501,7 @@ impl StateRebuilder { /// Check for accounts missing code. Once all chunks have been fed, there should /// be none. - pub fn check_missing(&self) -> Result<(), Error> { + pub fn check_missing(self) -> Result<(), Error> { let missing = self.missing_code.keys().cloned().collect::>(); match missing.is_empty() { true => Ok(()), @@ -640,8 +640,8 @@ impl BlockRebuilder { } /// Glue together any disconnected chunks. To be called at the end. - pub fn glue_chunks(&mut self) { - for &(ref first_num, ref first_hash) in &self.disconnected { + pub fn glue_chunks(self) { + for (first_num, first_hash) in self.disconnected { let parent_num = first_num - 1; // check if the parent is even in the chain. @@ -649,7 +649,7 @@ impl BlockRebuilder { // the first block of the first chunks has nothing to connect to. if let Some(parent_hash) = self.chain.block_hash(parent_num) { // if so, add the child to it. - self.chain.add_child(parent_hash, *first_hash); + self.chain.add_child(parent_hash, first_hash); } } } diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index 45e1184b4..9f2b3f34a 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use super::{ManifestData, StateRebuilder, BlockRebuilder}; -use super::io::{SnapshotReader, LooseReader}; +use super::io::{SnapshotReader, LooseReader, SnapshotWriter, LooseWriter}; use blockchain::BlockChain; use engines::Engine; @@ -34,7 +34,7 @@ use spec::Spec; use io::IoChannel; -use util::{Bytes, H256, Mutex, UtilError}; +use util::{Bytes, H256, Mutex, RwLock, UtilError}; use util::journaldb::Algorithm; use util::kvdb::{Database, DatabaseConfig}; use util::snappy; @@ -50,8 +50,6 @@ pub enum RestorationStatus { Failed, } -/// Restoration info. - /// The interface for a snapshot network service. /// This handles: /// - restoration of snapshots to temporary databases. @@ -74,8 +72,10 @@ pub trait SnapshotService { /// Begin snapshot restoration. /// If restoration in-progress, this will reset it. /// From this point on, any previous snapshot may become unavailable. - /// Returns true if successful, false otherwise. - fn begin_restore(&self, manifest: ManifestData) -> bool; + fn begin_restore(&self, manifest: ManifestData); + + /// Abort an in-progress restoration if there is one. + fn abort_restore(&self); /// Feed a raw state chunk to the service to be processed asynchronously. /// no-op if not currently restoring. @@ -88,51 +88,59 @@ pub trait SnapshotService { /// State restoration manager. struct Restoration { + manifest: ManifestData, state_chunks_left: HashSet, block_chunks_left: HashSet, state: StateRebuilder, blocks: BlockRebuilder, + writer: LooseWriter, snappy_buffer: Bytes, final_state_root: H256, } +struct RestorationParams<'a> { + manifest: ManifestData, // manifest to base restoration on. + pruning: Algorithm, // pruning algorithm for the database. + db_path: PathBuf, // database path + writer: LooseWriter, // writer for recovered snapshot. + genesis: &'a [u8], // genesis block of the chain. +} + impl Restoration { - // make a new restoration, building databases in the given path. - fn new(manifest: &ManifestData, pruning: Algorithm, path: &Path, gb: &[u8]) -> Result { + // make a new restoration using the given parameters. + fn new(params: RestorationParams) -> Result { + let manifest = params.manifest; + + let state_chunks = manifest.state_hashes.iter().cloned().collect(); + let block_chunks = manifest.block_hashes.iter().cloned().collect(); + let cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS); - let raw_db = Arc::new(try!(Database::open(&cfg, &*path.to_string_lossy()) + let raw_db = Arc::new(try!(Database::open(&cfg, &*params.db_path.to_string_lossy()) .map_err(UtilError::SimpleString))); - let chain = BlockChain::new(Default::default(), gb, raw_db.clone()); + let chain = BlockChain::new(Default::default(), params.genesis, raw_db.clone()); let blocks = try!(BlockRebuilder::new(chain, manifest.block_number)); + let root = manifest.state_root.clone(); Ok(Restoration { - state_chunks_left: manifest.state_hashes.iter().cloned().collect(), - block_chunks_left: manifest.block_hashes.iter().cloned().collect(), - state: StateRebuilder::new(raw_db, pruning), + manifest: manifest, + state_chunks_left: state_chunks, + block_chunks_left: block_chunks, + state: StateRebuilder::new(raw_db, params.pruning), blocks: blocks, + writer: params.writer, snappy_buffer: Vec::new(), - final_state_root: manifest.state_root, + final_state_root: root, }) } // feeds a state chunk fn feed_state(&mut self, hash: H256, chunk: &[u8]) -> Result<(), Error> { - use util::trie::TrieError; - if self.state_chunks_left.remove(&hash) { - let len = try!(snappy::decompress_into(&chunk, &mut self.snappy_buffer)); + let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer)); + try!(self.state.feed(&self.snappy_buffer[..len])); - - if self.state_chunks_left.is_empty() { - try!(self.state.check_missing()); - - let root = self.state.state_root(); - if root != self.final_state_root { - warn!("Final restored state has wrong state root: expected {:?}, got {:?}", root, self.final_state_root); - return Err(TrieError::InvalidStateRoot(root).into()); - } - } + try!(self.writer.write_state_chunk(hash, chunk)); } Ok(()) @@ -141,18 +149,39 @@ impl Restoration { // feeds a block chunk fn feed_blocks(&mut self, hash: H256, chunk: &[u8], engine: &Engine) -> Result<(), Error> { if self.block_chunks_left.remove(&hash) { - let len = try!(snappy::decompress_into(&chunk, &mut self.snappy_buffer)); - try!(self.blocks.feed(&self.snappy_buffer[..len], engine)); + let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer)); - if self.block_chunks_left.is_empty() { - // connect out-of-order chunks. - self.blocks.glue_chunks(); - } + try!(self.blocks.feed(&self.snappy_buffer[..len], engine)); + try!(self.writer.write_block_chunk(hash, chunk)); } Ok(()) } + // finish up restoration. + fn finalize(self) -> Result<(), Error> { + use util::trie::TrieError; + + if !self.is_done() { return Ok(()) } + + // verify final state root. + let root = self.state.state_root(); + if root != self.final_state_root { + warn!("Final restored state has wrong state root: expected {:?}, got {:?}", root, self.final_state_root); + return Err(TrieError::InvalidStateRoot(root).into()); + } + + // check for missing code. + try!(self.state.check_missing()); + + // connect out-of-order chunks. + self.blocks.glue_chunks(); + + try!(self.writer.finish(self.manifest)); + + Ok(()) + } + // is everything done? fn is_done(&self) -> bool { self.block_chunks_left.is_empty() && self.state_chunks_left.is_empty() @@ -174,7 +203,7 @@ pub struct Service { io_channel: Channel, pruning: Algorithm, status: Mutex, - reader: Option, + reader: RwLock>, engine: Arc, genesis_block: Bytes, state_chunks: AtomicUsize, @@ -190,6 +219,7 @@ impl Service { let reader = { let mut snapshot_path = db_path.clone(); snapshot_path.push("snapshot"); + snapshot_path.push("current"); LooseReader::new(snapshot_path).ok() }; @@ -201,15 +231,15 @@ impl Service { io_channel: io_channel, pruning: pruning, status: Mutex::new(RestorationStatus::Inactive), - reader: reader, + reader: RwLock::new(reader), engine: spec.engine.clone(), genesis_block: spec.genesis_block(), state_chunks: AtomicUsize::new(0), block_chunks: AtomicUsize::new(0), }; - // create the snapshot dir if it doesn't exist. - if let Err(e) = fs::create_dir_all(service.snapshot_dir()) { + // create the root snapshot dir if it doesn't exist. + if let Err(e) = fs::create_dir_all(service.root_dir()) { if e.kind() != ErrorKind::AlreadyExists { return Err(e.into()) } @@ -225,16 +255,23 @@ impl Service { Ok(service) } - // get the snapshot path. - fn snapshot_dir(&self) -> PathBuf { + // get the root path. + fn root_dir(&self) -> PathBuf { let mut dir = self.db_path.clone(); dir.push("snapshot"); dir } + // get the current snapshot dir. + fn snapshot_dir(&self) -> PathBuf { + let mut dir = self.root_dir(); + dir.push("current"); + dir + } + // get the restoration directory. fn restoration_dir(&self) -> PathBuf { - let mut dir = self.snapshot_dir(); + let mut dir = self.root_dir(); dir.push("restoration"); dir } @@ -246,6 +283,13 @@ impl Service { dir } + // temporary snapshot recovery path. + fn temp_recovery_dir(&self) -> PathBuf { + let mut dir = self.restoration_dir(); + dir.push("temp"); + dir + } + // replace one the client's database with our own. fn replace_client_db(&self) -> Result<(), Error> { let our_db = self.restoration_db(); @@ -284,6 +328,42 @@ impl Service { } } + /// Initialize the restoration synchronously. + pub fn init_restore(&self, manifest: ManifestData) -> Result<(), Error> { + let rest_dir = self.restoration_dir(); + + let mut res = self.restoration.lock(); + + // tear down existing restoration. + *res = None; + + // delete and restore the restoration dir. + if let Err(e) = fs::remove_dir_all(&rest_dir) { + match e.kind() { + ErrorKind::NotFound => {}, + _ => return Err(e.into()), + } + } + + try!(fs::create_dir_all(&rest_dir)); + + // make new restoration. + let writer = try!(LooseWriter::new(self.temp_recovery_dir())); + + let params = RestorationParams { + manifest: manifest, + pruning: self.pruning, + db_path: self.restoration_db(), + writer: writer, + genesis: &self.genesis_block, + }; + + *res = Some(try!(Restoration::new(params))); + + *self.status.lock() = RestorationStatus::Ongoing; + Ok(()) + } + // finalize the restoration. this accepts an already-locked // restoration as an argument -- so acquiring it again _will_ // lead to deadlock. @@ -293,27 +373,52 @@ impl Service { self.state_chunks.store(0, Ordering::SeqCst); self.block_chunks.store(0, Ordering::SeqCst); - // destroy the restoration before replacing databases. - *rest = None; - + // destroy the restoration before replacing databases and snapshot. + try!(rest.take().map(Restoration::finalize).unwrap_or(Ok(()))); try!(self.replace_client_db()); - *self.status.lock() = RestorationStatus::Inactive; + let mut reader = self.reader.write(); + *reader = None; // destroy the old reader if it existed. + + let snapshot_dir = self.snapshot_dir(); + + trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy()); + if let Err(e) = fs::remove_dir_all(&snapshot_dir) { + match e.kind() { + ErrorKind::NotFound => {} + _ => return Err(e.into()), + } + } + + try!(fs::create_dir(&snapshot_dir)); + + trace!(target: "snapshot", "copying restored snapshot files over"); + for maybe_file in try!(fs::read_dir(self.temp_recovery_dir())) { + let path = try!(maybe_file).path(); + if let Some(name) = path.file_name().map(|x| x.to_owned()) { + let mut new_path = snapshot_dir.clone(); + new_path.push(name); + try!(fs::rename(path, new_path)); + } + } - // TODO: take control of restored snapshot. let _ = fs::remove_dir_all(self.restoration_dir()); + *reader = Some(try!(LooseReader::new(snapshot_dir))); + + *self.status.lock() = RestorationStatus::Inactive; + Ok(()) } /// Feed a chunk of either kind. no-op if no restoration or status is wrong. fn feed_chunk(&self, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> { + // TODO: be able to process block chunks and state chunks at same time? + let mut restoration = self.restoration.lock(); + match self.status() { RestorationStatus::Inactive | RestorationStatus::Failed => Ok(()), RestorationStatus::Ongoing => { - // TODO: be able to process block chunks and state chunks at same time? - let mut restoration = self.restoration.lock(); - let res = { let rest = match *restoration { Some(ref mut r) => r, @@ -373,11 +478,11 @@ impl Service { impl SnapshotService for Service { fn manifest(&self) -> Option { - self.reader.as_ref().map(|r| r.manifest().clone()) + self.reader.read().as_ref().map(|r| r.manifest().clone()) } fn chunk(&self, hash: H256) -> Option { - self.reader.as_ref().and_then(|r| r.chunk(hash).ok()) + self.reader.read().as_ref().and_then(|r| r.chunk(hash).ok()) } fn status(&self) -> RestorationStatus { @@ -388,37 +493,20 @@ impl SnapshotService for Service { (self.state_chunks.load(Ordering::Relaxed), self.block_chunks.load(Ordering::Relaxed)) } - fn begin_restore(&self, manifest: ManifestData) -> bool { - let rest_dir = self.restoration_dir(); + fn begin_restore(&self, manifest: ManifestData) { + self.io_channel.send(ClientIoMessage::BeginRestoration(manifest)) + .expect("snapshot service and io service are kept alive by client service; qed"); + } - let mut res = self.restoration.lock(); - - // tear down existing restoration. - *res = None; - - // delete and restore the restoration dir. - if let Err(e) = fs::remove_dir_all(&rest_dir).and_then(|_| fs::create_dir_all(&rest_dir)) { + fn abort_restore(&self) { + *self.restoration.lock() = None; + *self.status.lock() = RestorationStatus::Inactive; + if let Err(e) = fs::remove_dir_all(&self.restoration_dir()) { match e.kind() { ErrorKind::NotFound => {}, - _ => { - warn!("encountered error {} while beginning snapshot restoration.", e); - return false; - } + _ => warn!("encountered error {} while deleting snapshot restoration dir.", e), } } - - // make new restoration. - let db_path = self.restoration_db(); - *res = match Restoration::new(&manifest, self.pruning, &db_path, &self.genesis_block) { - Ok(b) => Some(b), - Err(e) => { - warn!("encountered error {} while beginning snapshot restoration.", e); - return false; - } - }; - - *self.status.lock() = RestorationStatus::Ongoing; - true } fn restore_state_chunk(&self, hash: H256, chunk: Bytes) { diff --git a/ethcore/src/snapshot/tests/blocks.rs b/ethcore/src/snapshot/tests/blocks.rs index ac9880263..6c4344b6e 100644 --- a/ethcore/src/snapshot/tests/blocks.rs +++ b/ethcore/src/snapshot/tests/blocks.rs @@ -79,7 +79,6 @@ fn chunk_and_restore(amount: u64) { } rebuilder.glue_chunks(); - drop(rebuilder); // and test it. let new_chain = BlockChain::new(Default::default(), &genesis, new_db); diff --git a/ethcore/src/snapshot/tests/state.rs b/ethcore/src/snapshot/tests/state.rs index a293cdb44..fba6d56f6 100644 --- a/ethcore/src/snapshot/tests/state.rs +++ b/ethcore/src/snapshot/tests/state.rs @@ -72,8 +72,9 @@ fn snap_and_restore() { rebuilder.feed(&chunk).unwrap(); } - rebuilder.check_missing().unwrap(); assert_eq!(rebuilder.state_root(), state_root); + rebuilder.check_missing().unwrap(); + new_db }; diff --git a/parity/snapshot.rs b/parity/snapshot.rs index 650123d73..ecc463a2e 100644 --- a/parity/snapshot.rs +++ b/parity/snapshot.rs @@ -121,9 +121,9 @@ impl SnapshotCommand { // drop the client so we don't restore while it has open DB handles. drop(service); - if !snapshot.begin_restore(manifest.clone()) { - return Err("Failed to begin restoration.".into()); - } + try!(snapshot.init_restore(manifest.clone()).map_err(|e| { + format!("Failed to begin restoration: {}", e) + })); let (num_state, num_blocks) = (manifest.state_hashes.len(), manifest.block_hashes.len());