Local snapshot restore (#2058)

* restore from local snapshot

* update status with chunks done

* rework local restore trigger
This commit is contained in:
Robert Habermeier
2016-09-11 14:05:59 +02:00
committed by Arkadiy Paronyan
parent fd4361e284
commit dcfd7eab6d
5 changed files with 130 additions and 86 deletions

View File

@@ -186,7 +186,7 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); }
ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); }
ClientIoMessage::BeginRestoration(ref manifest) => {
if let Err(e) = self.snapshot.init_restore(manifest.clone()) {
if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) {
warn!("Failed to initialize snapshot restoration: {}", e);
}
}

View File

@@ -35,7 +35,7 @@ use service::ClientIoMessage;
use io::IoChannel;
use util::{Bytes, H256, Mutex, RwLock, UtilError};
use util::{Bytes, H256, Mutex, RwLock, RwLockReadGuard, UtilError};
use util::journaldb::Algorithm;
use util::kvdb::{Database, DatabaseConfig};
use util::snappy;
@@ -70,7 +70,7 @@ struct Restoration {
block_chunks_left: HashSet<H256>,
state: StateRebuilder,
blocks: BlockRebuilder,
writer: LooseWriter,
writer: Option<LooseWriter>,
snappy_buffer: Bytes,
final_state_root: H256,
guard: Guard,
@@ -80,8 +80,8 @@ struct RestorationParams<'a> {
manifest: ManifestData, // manifest to base restoration on.
pruning: Algorithm, // pruning algorithm for the database.
db_path: PathBuf, // database path
db_config: &'a DatabaseConfig,
writer: LooseWriter, // writer for recovered snapshot.
db_config: &'a DatabaseConfig, // configuration for the database.
writer: Option<LooseWriter>, // writer for recovered snapshot.
genesis: &'a [u8], // genesis block of the chain.
guard: Guard, // guard for the restoration directory.
}
@@ -120,7 +120,10 @@ impl Restoration {
let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer));
try!(self.state.feed(&self.snappy_buffer[..len]));
try!(self.writer.write_state_chunk(hash, chunk));
if let Some(ref mut writer) = self.writer.as_mut() {
try!(writer.write_state_chunk(hash, chunk));
}
}
Ok(())
@@ -132,7 +135,9 @@ impl Restoration {
let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer));
try!(self.blocks.feed(&self.snappy_buffer[..len], engine));
try!(self.writer.write_block_chunk(hash, chunk));
if let Some(ref mut writer) = self.writer.as_mut() {
try!(writer.write_block_chunk(hash, chunk));
}
}
Ok(())
@@ -157,7 +162,9 @@ impl Restoration {
// connect out-of-order chunks.
self.blocks.glue_chunks();
try!(self.writer.finish(self.manifest));
if let Some(writer) = self.writer {
try!(writer.finish(self.manifest));
}
self.guard.disarm();
Ok(())
@@ -300,6 +307,11 @@ impl Service {
Ok(())
}
/// Get a reference to the snapshot reader.
pub fn reader(&self) -> RwLockReadGuard<Option<LooseReader>> {
self.reader.read()
}
/// Tick the snapshot service. This will log any active snapshot
/// being taken.
pub fn tick(&self) {
@@ -351,11 +363,15 @@ impl Service {
}
/// Initialize the restoration synchronously.
pub fn init_restore(&self, manifest: ManifestData) -> Result<(), Error> {
/// The recover flag indicates whether to recover the restored snapshot.
pub fn init_restore(&self, manifest: ManifestData, recover: bool) -> Result<(), Error> {
let rest_dir = self.restoration_dir();
let mut res = self.restoration.lock();
self.state_chunks.store(0, Ordering::SeqCst);
self.block_chunks.store(0, Ordering::SeqCst);
// tear down existing restoration.
*res = None;
@@ -370,7 +386,10 @@ impl Service {
try!(fs::create_dir_all(&rest_dir));
// make new restoration.
let writer = try!(LooseWriter::new(self.temp_recovery_dir()));
let writer = match recover {
true => Some(try!(LooseWriter::new(self.temp_recovery_dir()))),
false => None
};
let params = RestorationParams {
manifest: manifest,
@@ -385,8 +404,8 @@ impl Service {
*res = Some(try!(Restoration::new(params)));
*self.status.lock() = RestorationStatus::Ongoing {
state_chunks_done: self.state_chunks.load(Ordering::Relaxed) as u32,
block_chunks_done: self.block_chunks.load(Ordering::Relaxed) as u32,
state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
};
Ok(())
}
@@ -397,35 +416,35 @@ impl Service {
fn finalize_restoration(&self, rest: &mut Option<Restoration>) -> Result<(), Error> {
trace!(target: "snapshot", "finalizing restoration");
self.state_chunks.store(0, Ordering::SeqCst);
self.block_chunks.store(0, Ordering::SeqCst);
let recover = rest.as_ref().map_or(false, |rest| rest.writer.is_some());
// destroy the restoration before replacing databases and snapshot.
try!(rest.take().map(Restoration::finalize).unwrap_or(Ok(())));
try!(self.replace_client_db());
let mut reader = self.reader.write();
*reader = None; // destroy the old reader if it existed.
if recover {
let mut reader = self.reader.write();
*reader = None; // destroy the old reader if it existed.
let snapshot_dir = self.snapshot_dir();
let snapshot_dir = self.snapshot_dir();
trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy());
if let Err(e) = fs::remove_dir_all(&snapshot_dir) {
match e.kind() {
ErrorKind::NotFound => {}
_ => return Err(e.into()),
trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy());
if let Err(e) = fs::remove_dir_all(&snapshot_dir) {
match e.kind() {
ErrorKind::NotFound => {}
_ => return Err(e.into()),
}
}
try!(fs::create_dir(&snapshot_dir));
trace!(target: "snapshot", "copying restored snapshot files over");
try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir));
*reader = Some(try!(LooseReader::new(snapshot_dir)));
}
try!(fs::create_dir(&snapshot_dir));
trace!(target: "snapshot", "copying restored snapshot files over");
try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir));
let _ = fs::remove_dir_all(self.restoration_dir());
*reader = Some(try!(LooseReader::new(snapshot_dir)));
*self.status.lock() = RestorationStatus::Inactive;
Ok(())
@@ -506,7 +525,13 @@ impl SnapshotService for Service {
}
fn status(&self) -> RestorationStatus {
*self.status.lock()
let mut cur_status = self.status.lock();
if let RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done } = *cur_status {
*state_chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32;
*block_chunks_done = self.block_chunks.load(Ordering::SeqCst) as u32;
}
cur_status.clone()
}
fn begin_restore(&self, manifest: ManifestData) {