Merge pull request #2044 from ethcore/periodic_snapshot

Periodic snapshots
This commit is contained in:
Robert Habermeier 2016-09-06 18:34:31 +02:00 committed by GitHub
commit 31cd965d66
8 changed files with 346 additions and 23 deletions

View File

@ -20,7 +20,7 @@ use util::H256;
/// Represents what has to be handled by actor listening to chain events /// Represents what has to be handled by actor listening to chain events
#[derive(Ipc)] #[derive(Ipc)]
pub trait ChainNotify : Send + Sync { pub trait ChainNotify : Send + Sync {
/// fires when chain has new blocks /// fires when chain has new blocks.
fn new_blocks(&self, fn new_blocks(&self,
_imported: Vec<H256>, _imported: Vec<H256>,
_invalid: Vec<H256>, _invalid: Vec<H256>,

View File

@ -46,6 +46,8 @@ pub enum ClientIoMessage {
FeedStateChunk(H256, Bytes), FeedStateChunk(H256, Bytes),
/// Feed a block chunk to the snapshot service /// Feed a block chunk to the snapshot service
FeedBlockChunk(H256, Bytes), FeedBlockChunk(H256, Bytes),
/// Take a snapshot for the block with given number.
TakeSnapshot(u64),
} }
/// Client service setup. Creates and registers client and network services with the IO subsystem. /// Client service setup. Creates and registers client and network services with the IO subsystem.
@ -145,16 +147,22 @@ struct ClientIoHandler {
} }
const CLIENT_TICK_TIMER: TimerToken = 0; const CLIENT_TICK_TIMER: TimerToken = 0;
const SNAPSHOT_TICK_TIMER: TimerToken = 1;
const CLIENT_TICK_MS: u64 = 5000; const CLIENT_TICK_MS: u64 = 5000;
const SNAPSHOT_TICK_MS: u64 = 10000;
impl IoHandler<ClientIoMessage> for ClientIoHandler { impl IoHandler<ClientIoMessage> for ClientIoHandler {
fn initialize(&self, io: &IoContext<ClientIoMessage>) { fn initialize(&self, io: &IoContext<ClientIoMessage>) {
io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer"); io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer");
io.register_timer(SNAPSHOT_TICK_TIMER, SNAPSHOT_TICK_MS).expect("Error registering snapshot timer");
} }
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) { fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
if timer == CLIENT_TICK_TIMER { match timer {
self.client.tick(); CLIENT_TICK_TIMER => self.client.tick(),
SNAPSHOT_TICK_TIMER => self.snapshot.tick(),
_ => warn!("IO service triggered unregistered timer '{}'", timer),
} }
} }
@ -170,6 +178,11 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
} }
ClientIoMessage::FeedStateChunk(ref hash, ref chunk) => self.snapshot.feed_state_chunk(*hash, chunk), ClientIoMessage::FeedStateChunk(ref hash, ref chunk) => self.snapshot.feed_state_chunk(*hash, chunk),
ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => self.snapshot.feed_block_chunk(*hash, chunk), ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => self.snapshot.feed_block_chunk(*hash, chunk),
ClientIoMessage::TakeSnapshot(num) => {
if let Err(e) = self.snapshot.take_snapshot(&*self.client, num) {
warn!("Failed to take snapshot at block #{}: {}", num, e);
}
}
_ => {} // ignore other messages _ => {} // ignore other messages
} }
} }

View File

@ -44,8 +44,10 @@ use crossbeam::{scope, ScopedJoinHandle};
use rand::{Rng, OsRng}; use rand::{Rng, OsRng};
pub use self::error::Error; pub use self::error::Error;
pub use self::service::{Service, DatabaseRestore}; pub use self::service::{Service, DatabaseRestore};
pub use self::traits::{SnapshotService, RemoteSnapshotService}; pub use self::traits::{SnapshotService, RemoteSnapshotService};
pub use self::watcher::Watcher;
pub use types::snapshot_manifest::ManifestData; pub use types::snapshot_manifest::ManifestData;
pub use types::restoration_status::RestorationStatus; pub use types::restoration_status::RestorationStatus;
@ -55,6 +57,7 @@ pub mod service;
mod account; mod account;
mod block; mod block;
mod error; mod error;
mod watcher;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
@ -80,17 +83,28 @@ pub struct Progress {
} }
impl Progress { impl Progress {
/// Reset the progress.
pub fn reset(&self) {
self.accounts.store(0, Ordering::Release);
self.blocks.store(0, Ordering::Release);
self.size.store(0, Ordering::Release);
// atomic fence here to ensure the others are written first?
// logs might very rarely get polluted if not.
self.done.store(false, Ordering::Release);
}
/// Get the number of accounts snapshotted thus far. /// Get the number of accounts snapshotted thus far.
pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Relaxed) } pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Acquire) }
/// Get the number of blocks snapshotted thus far. /// Get the number of blocks snapshotted thus far.
pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Relaxed) } pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) }
/// Get the written size of the snapshot in bytes. /// Get the written size of the snapshot in bytes.
pub fn size(&self) -> usize { self.size.load(Ordering::Relaxed) } pub fn size(&self) -> usize { self.size.load(Ordering::Acquire) }
/// Whether the snapshot is complete. /// Whether the snapshot is complete.
pub fn done(&self) -> bool { self.done.load(Ordering::SeqCst) } pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) }
} }
/// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer. /// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer.

View File

@ -27,8 +27,10 @@ use super::{ManifestData, StateRebuilder, BlockRebuilder, RestorationStatus, Sna
use super::io::{SnapshotReader, LooseReader, SnapshotWriter, LooseWriter}; use super::io::{SnapshotReader, LooseReader, SnapshotWriter, LooseWriter};
use blockchain::BlockChain; use blockchain::BlockChain;
use client::Client;
use engines::Engine; use engines::Engine;
use error::Error; use error::Error;
use ids::BlockID;
use service::ClientIoMessage; use service::ClientIoMessage;
use spec::Spec; use spec::Spec;
@ -39,8 +41,25 @@ use util::journaldb::Algorithm;
use util::kvdb::{Database, DatabaseConfig}; use util::kvdb::{Database, DatabaseConfig};
use util::snappy; use util::snappy;
/// Helper for removing directories in case of error.
struct Guard(bool, PathBuf);
impl Guard {
fn new(path: PathBuf) -> Self { Guard(true, path) }
fn disarm(mut self) { self.0 = false }
}
impl Drop for Guard {
fn drop(&mut self) {
if self.0 {
let _ = fs::remove_dir_all(&self.1);
}
}
}
/// External database restoration handler /// External database restoration handler
pub trait DatabaseRestore : Send + Sync { pub trait DatabaseRestore: Send + Sync {
/// Restart with a new backend. Takes ownership of passed database and moves it to a new location. /// Restart with a new backend. Takes ownership of passed database and moves it to a new location.
fn restore_db(&self, new_db: &str) -> Result<(), Error>; fn restore_db(&self, new_db: &str) -> Result<(), Error>;
} }
@ -55,6 +74,7 @@ struct Restoration {
writer: LooseWriter, writer: LooseWriter,
snappy_buffer: Bytes, snappy_buffer: Bytes,
final_state_root: H256, final_state_root: H256,
guard: Guard,
} }
struct RestorationParams<'a> { struct RestorationParams<'a> {
@ -63,6 +83,7 @@ struct RestorationParams<'a> {
db_path: PathBuf, // database path db_path: PathBuf, // database path
writer: LooseWriter, // writer for recovered snapshot. writer: LooseWriter, // writer for recovered snapshot.
genesis: &'a [u8], // genesis block of the chain. genesis: &'a [u8], // genesis block of the chain.
guard: Guard, // guard for the restoration directory.
} }
impl Restoration { impl Restoration {
@ -90,6 +111,7 @@ impl Restoration {
writer: params.writer, writer: params.writer,
snappy_buffer: Vec::new(), snappy_buffer: Vec::new(),
final_state_root: root, final_state_root: root,
guard: params.guard,
}) })
} }
@ -138,6 +160,7 @@ impl Restoration {
try!(self.writer.finish(self.manifest)); try!(self.writer.finish(self.manifest));
self.guard.disarm();
Ok(()) Ok(())
} }
@ -168,6 +191,7 @@ pub struct Service {
state_chunks: AtomicUsize, state_chunks: AtomicUsize,
block_chunks: AtomicUsize, block_chunks: AtomicUsize,
db_restore: Arc<DatabaseRestore>, db_restore: Arc<DatabaseRestore>,
progress: super::Progress,
} }
impl Service { impl Service {
@ -197,6 +221,7 @@ impl Service {
state_chunks: AtomicUsize::new(0), state_chunks: AtomicUsize::new(0),
block_chunks: AtomicUsize::new(0), block_chunks: AtomicUsize::new(0),
db_restore: db_restore, db_restore: db_restore,
progress: Default::default(),
}; };
// create the root snapshot dir if it doesn't exist. // create the root snapshot dir if it doesn't exist.
@ -213,6 +238,13 @@ impl Service {
} }
} }
// delete the temporary snapshot dir if it does exist.
if let Err(e) = fs::remove_dir_all(service.temp_snapshot_dir()) {
if e.kind() != ErrorKind::NotFound {
return Err(e.into())
}
}
Ok(service) Ok(service)
} }
@ -230,6 +262,13 @@ impl Service {
dir dir
} }
// get the temporary snapshot dir.
fn temp_snapshot_dir(&self) -> PathBuf {
let mut dir = self.root_dir();
dir.push("in_progress");
dir
}
// get the restoration directory. // get the restoration directory.
fn restoration_dir(&self) -> PathBuf { fn restoration_dir(&self) -> PathBuf {
let mut dir = self.root_dir(); let mut dir = self.root_dir();
@ -260,6 +299,48 @@ impl Service {
Ok(()) Ok(())
} }
/// Tick the snapshot service. This will log any active snapshot
/// being taken.
pub fn tick(&self) {
if self.progress.done() { return }
let p = &self.progress;
info!("Snapshot: {} accounts {} blocks {} bytes", p.accounts(), p.blocks(), p.size());
}
/// Take a snapshot at the block with the given number.
/// calling this while a restoration is in progress or vice versa
/// will lead to a race condition where the first one to finish will
/// have their produced snapshot overwritten.
pub fn take_snapshot(&self, client: &Client, num: u64) -> Result<(), Error> {
info!("Taking snapshot at #{}", num);
self.progress.reset();
let temp_dir = self.temp_snapshot_dir();
let snapshot_dir = self.snapshot_dir();
let _ = fs::remove_dir_all(&temp_dir);
let writer = try!(LooseWriter::new(temp_dir.clone()));
let guard = Guard::new(temp_dir.clone());
try!(client.take_snapshot(writer, BlockID::Number(num), &self.progress));
info!("Finished taking snapshot at #{}", num);
let mut reader = self.reader.write();
// destroy the old snapshot reader.
*reader = None;
try!(fs::rename(temp_dir, &snapshot_dir));
*reader = Some(try!(LooseReader::new(snapshot_dir)));
guard.disarm();
Ok(())
}
/// Initialize the restoration synchronously. /// Initialize the restoration synchronously.
pub fn init_restore(&self, manifest: ManifestData) -> Result<(), Error> { pub fn init_restore(&self, manifest: ManifestData) -> Result<(), Error> {
let rest_dir = self.restoration_dir(); let rest_dir = self.restoration_dir();
@ -288,6 +369,7 @@ impl Service {
db_path: self.restoration_db(), db_path: self.restoration_db(),
writer: writer, writer: writer,
genesis: &self.genesis_block, genesis: &self.genesis_block,
guard: Guard::new(rest_dir),
}; };
*res = Some(try!(Restoration::new(params))); *res = Some(try!(Restoration::new(params)));
@ -328,14 +410,7 @@ impl Service {
try!(fs::create_dir(&snapshot_dir)); try!(fs::create_dir(&snapshot_dir));
trace!(target: "snapshot", "copying restored snapshot files over"); trace!(target: "snapshot", "copying restored snapshot files over");
for maybe_file in try!(fs::read_dir(self.temp_recovery_dir())) { try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir));
let path = try!(maybe_file).path();
if let Some(name) = path.file_name().map(|x| x.to_owned()) {
let mut new_path = snapshot_dir.clone();
new_path.push(name);
try!(fs::rename(path, new_path));
}
}
let _ = fs::remove_dir_all(self.restoration_dir()); let _ = fs::remove_dir_all(self.restoration_dir());
@ -451,6 +526,12 @@ impl SnapshotService for Service {
} }
} }
impl Drop for Service {
fn drop(&mut self) {
self.abort_restore();
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::Arc; use std::sync::Arc;
@ -504,10 +585,4 @@ mod tests {
service.restore_state_chunk(Default::default(), vec![]); service.restore_state_chunk(Default::default(), vec![]);
service.restore_block_chunk(Default::default(), vec![]); service.restore_block_chunk(Default::default(), vec![]);
} }
} }
impl Drop for Service {
fn drop(&mut self) {
self.abort_restore();
}
}

View File

@ -0,0 +1,192 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Watcher for snapshot-related chain events.
use client::{BlockChainClient, Client, ChainNotify};
use ids::BlockID;
use service::ClientIoMessage;
use views::HeaderView;
use io::IoChannel;
use util::hash::H256;
use std::sync::Arc;
// helper trait for transforming hashes to numbers and checking if syncing.
trait Oracle: Send + Sync {
fn to_number(&self, hash: H256) -> Option<u64>;
fn is_major_syncing(&self) -> bool;
}
impl Oracle for Client {
fn to_number(&self, hash: H256) -> Option<u64> {
self.block_header(BlockID::Hash(hash)).map(|h| HeaderView::new(&h).number())
}
fn is_major_syncing(&self) -> bool {
let queue_info = self.queue_info();
queue_info.unverified_queue_size + queue_info.verified_queue_size > 3
}
}
// helper trait for broadcasting a block to take a snapshot at.
trait Broadcast: Send + Sync {
fn take_at(&self, num: Option<u64>);
}
impl Broadcast for IoChannel<ClientIoMessage> {
fn take_at(&self, num: Option<u64>) {
let num = match num {
Some(n) => n,
None => return,
};
trace!(target: "snapshot_watcher", "broadcast: {}", num);
if let Err(e) = self.send(ClientIoMessage::TakeSnapshot(num)) {
warn!("Snapshot watcher disconnected from IoService: {}", e);
}
}
}
/// A `ChainNotify` implementation which will trigger a snapshot event
/// at certain block numbers.
pub struct Watcher {
oracle: Arc<Oracle>,
broadcast: Box<Broadcast>,
period: u64,
history: u64,
}
impl Watcher {
/// Create a new `Watcher` which will trigger a snapshot event
/// once every `period` blocks, but only after that block is
/// `history` blocks old.
pub fn new(client: Arc<Client>, channel: IoChannel<ClientIoMessage>, period: u64, history: u64) -> Self {
Watcher {
oracle: client,
broadcast: Box::new(channel),
period: period,
history: history,
}
}
}
impl ChainNotify for Watcher {
fn new_blocks(
&self,
imported: Vec<H256>,
_: Vec<H256>,
_: Vec<H256>,
_: Vec<H256>,
_: Vec<H256>,
_duration: u64)
{
if self.oracle.is_major_syncing() { return }
trace!(target: "snapshot_watcher", "{} imported", imported.len());
let highest = imported.into_iter()
.filter_map(|h| self.oracle.to_number(h))
.filter(|&num| num >= self.period + self.history)
.map(|num| num - self.history)
.filter(|num| num % self.period == 0)
.fold(0, ::std::cmp::max);
match highest {
0 => self.broadcast.take_at(None),
_ => self.broadcast.take_at(Some(highest)),
}
}
}
#[cfg(test)]
mod tests {
use super::{Broadcast, Oracle, Watcher};
use client::ChainNotify;
use util::{H256, U256};
use std::collections::HashMap;
use std::sync::Arc;
struct TestOracle(HashMap<H256, u64>);
impl Oracle for TestOracle {
fn to_number(&self, hash: H256) -> Option<u64> {
self.0.get(&hash).cloned()
}
fn is_major_syncing(&self) -> bool { false }
}
struct TestBroadcast(Option<u64>);
impl Broadcast for TestBroadcast {
fn take_at(&self, num: Option<u64>) {
if num != self.0 {
panic!("Watcher broadcast wrong number. Expected {:?}, found {:?}", self.0, num);
}
}
}
// helper harness for tests which expect a notification.
fn harness(numbers: Vec<u64>, period: u64, history: u64, expected: Option<u64>) {
let hashes: Vec<_> = numbers.clone().into_iter().map(|x| H256::from(U256::from(x))).collect();
let map = hashes.clone().into_iter().zip(numbers).collect();
let watcher = Watcher {
oracle: Arc::new(TestOracle(map)),
broadcast: Box::new(TestBroadcast(expected)),
period: period,
history: history,
};
watcher.new_blocks(
hashes,
vec![],
vec![],
vec![],
vec![],
0,
);
}
// helper
#[test]
fn should_not_fire() {
harness(vec![0], 5, 0, None);
}
#[test]
fn fires_once_for_two() {
harness(vec![14, 15], 10, 5, Some(10));
}
#[test]
fn finds_highest() {
harness(vec![15, 25], 10, 5, Some(20));
}
#[test]
fn doesnt_fire_before_history() {
harness(vec![10, 11], 10, 5, None);
}
}

View File

@ -243,6 +243,8 @@ Snapshot Options:
index, hash, or 'latest'. Note that taking snapshots at index, hash, or 'latest'. Note that taking snapshots at
non-recent blocks will only work with --pruning archive non-recent blocks will only work with --pruning archive
[default: latest] [default: latest]
--no-periodic-snapshot Disable automated snapshots which usually occur once
every 10000 blocks.
Virtual Machine Options: Virtual Machine Options:
--jitvm Enable the JIT VM. --jitvm Enable the JIT VM.
@ -382,6 +384,7 @@ pub struct Args {
pub flag_from: String, pub flag_from: String,
pub flag_to: String, pub flag_to: String,
pub flag_at: String, pub flag_at: String,
pub flag_no_periodic_snapshot: bool,
pub flag_format: Option<String>, pub flag_format: Option<String>,
pub flag_jitvm: bool, pub flag_jitvm: bool,
pub flag_log_file: Option<String>, pub flag_log_file: Option<String>,

View File

@ -226,6 +226,7 @@ impl Configuration {
ui: self.args.cmd_ui, ui: self.args.cmd_ui,
name: self.args.flag_identity, name: self.args.flag_identity,
custom_bootnodes: self.args.flag_bootnodes.is_some(), custom_bootnodes: self.args.flag_bootnodes.is_some(),
no_periodic_snapshot: self.args.flag_no_periodic_snapshot,
}; };
Cmd::Run(run_cmd) Cmd::Run(run_cmd)
}; };
@ -802,6 +803,7 @@ mod tests {
ui: false, ui: false,
name: "".into(), name: "".into(),
custom_bootnodes: false, custom_bootnodes: false,
no_periodic_snapshot: false,
})); }));
} }

View File

@ -28,6 +28,7 @@ use ethcore::client::{Mode, Switch, DatabaseCompactionProfile, VMType, ChainNoti
use ethcore::service::ClientService; use ethcore::service::ClientService;
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions}; use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions};
use ethcore::snapshot;
use ethsync::SyncConfig; use ethsync::SyncConfig;
use informant::Informant; use informant::Informant;
@ -46,6 +47,12 @@ use rpc_apis;
use rpc; use rpc;
use url; use url;
// how often to take periodic snapshots.
const SNAPSHOT_PERIOD: u64 = 10000;
// how many blocks to wait before starting a periodic snapshot.
const SNAPSHOT_HISTORY: u64 = 1000;
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub struct RunCmd { pub struct RunCmd {
pub cache_config: CacheConfig, pub cache_config: CacheConfig,
@ -77,6 +84,7 @@ pub struct RunCmd {
pub ui: bool, pub ui: bool,
pub name: String, pub name: String,
pub custom_bootnodes: bool, pub custom_bootnodes: bool,
pub no_periodic_snapshot: bool,
} }
pub fn execute(cmd: RunCmd) -> Result<(), String> { pub fn execute(cmd: RunCmd) -> Result<(), String> {
@ -251,6 +259,22 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
}); });
service.register_io_handler(io_handler).expect("Error registering IO handler"); service.register_io_handler(io_handler).expect("Error registering IO handler");
// the watcher must be kept alive.
let _watcher = match cmd.no_periodic_snapshot {
true => None,
false => {
let watcher = Arc::new(snapshot::Watcher::new(
service.client(),
service.io().channel(),
SNAPSHOT_PERIOD,
SNAPSHOT_HISTORY,
));
service.add_notify(watcher.clone());
Some(watcher)
},
};
// start ui // start ui
if cmd.ui { if cmd.ui {
if !cmd.dapps_conf.enabled { if !cmd.dapps_conf.enabled {