From 04dee54cb3b52342911a8ac7b0cf571f19c21ce4 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 2 Sep 2016 16:15:25 +0200 Subject: [PATCH 01/10] add take_snapshot to snapshot service --- ethcore/src/service.rs | 7 +++++ ethcore/src/snapshot/service.rs | 46 +++++++++++++++++++++++++++------ 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index e2e4772a4..de32256dd 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -46,6 +46,8 @@ pub enum ClientIoMessage { FeedStateChunk(H256, Bytes), /// Feed a block chunk to the snapshot service FeedBlockChunk(H256, Bytes), + /// Take a snapshot for the block with given number. + TakeSnapshot(u64), } /// Client service setup. Creates and registers client and network services with the IO subsystem. @@ -170,6 +172,11 @@ impl IoHandler for ClientIoHandler { } ClientIoMessage::FeedStateChunk(ref hash, ref chunk) => self.snapshot.feed_state_chunk(*hash, chunk), ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => self.snapshot.feed_block_chunk(*hash, chunk), + ClientIoMessage::TakeSnapshot(num) => { + if let Err(e) = self.snapshot.take_snapshot(&*self.client, num) { + warn!("Failed to take snapshot at block #{}: {}", num, e); + } + } _ => {} // ignore other messages } } diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index 9f2b3f34a..d2a9be046 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -27,8 +27,10 @@ use super::{ManifestData, StateRebuilder, BlockRebuilder}; use super::io::{SnapshotReader, LooseReader, SnapshotWriter, LooseWriter}; use blockchain::BlockChain; +use client::Client; use engines::Engine; use error::Error; +use ids::BlockID; use service::ClientIoMessage; use spec::Spec; @@ -269,6 +271,13 @@ impl Service { dir } + // get the temporary snapshot dir. + fn temp_snapshot_dir(&self) -> PathBuf { + let mut dir = self.root_dir(); + dir.push("in_progress"); + dir + } + // get the restoration directory. fn restoration_dir(&self) -> PathBuf { let mut dir = self.root_dir(); @@ -328,6 +337,34 @@ impl Service { } } + /// Take a snapshot at the block with the given number. + /// calling this while a restoration is in progress or vice versa + /// will lead to a race condition where the first one to finish will + /// have their produced snapshot overwritten. + pub fn take_snapshot(&self, client: &Client, num: u64) -> Result<(), Error> { + info!("Taking snapshot at #{}", num); + + let temp_dir = self.temp_snapshot_dir(); + let snapshot_dir = self.snapshot_dir(); + + let _ = fs::remove_dir_all(&temp_dir); + let writer = try!(LooseWriter::new(temp_dir.clone())); + let progress = Default::default(); + + // Todo [rob] log progress. + try!(client.take_snapshot(writer, BlockID::Number(num), &progress)); + let mut reader = self.reader.write(); + + // destroy the old snapshot reader. + *reader = None; + + try!(fs::rename(temp_dir, &snapshot_dir)); + + *reader = Some(try!(LooseReader::new(snapshot_dir))); + + Ok(()) + } + /// Initialize the restoration synchronously. pub fn init_restore(&self, manifest: ManifestData) -> Result<(), Error> { let rest_dir = self.restoration_dir(); @@ -393,14 +430,7 @@ impl Service { try!(fs::create_dir(&snapshot_dir)); trace!(target: "snapshot", "copying restored snapshot files over"); - for maybe_file in try!(fs::read_dir(self.temp_recovery_dir())) { - let path = try!(maybe_file).path(); - if let Some(name) = path.file_name().map(|x| x.to_owned()) { - let mut new_path = snapshot_dir.clone(); - new_path.push(name); - try!(fs::rename(path, new_path)); - } - } + try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir)); let _ = fs::remove_dir_all(self.restoration_dir()); From e3749b3bc4f31b0ebcc7c752161bd7dd1db413d6 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 2 Sep 2016 18:28:47 +0200 Subject: [PATCH 02/10] implement snapshot watcher --- ethcore/src/client/chain_notify.rs | 3 +- ethcore/src/snapshot/mod.rs | 2 + ethcore/src/snapshot/watcher.rs | 177 +++++++++++++++++++++++++++++ 3 files changed, 181 insertions(+), 1 deletion(-) create mode 100644 ethcore/src/snapshot/watcher.rs diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index 897c8cfac..737cd0153 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -20,7 +20,8 @@ use util::H256; /// Represents what has to be handled by actor listening to chain events #[derive(Ipc)] pub trait ChainNotify : Send + Sync { - /// fires when chain has new blocks + /// fires when chain has new blocks, not including those encountered during + /// a major sync. fn new_blocks(&self, _imported: Vec, _invalid: Vec, diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 89e4ed8ba..979089331 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -45,6 +45,7 @@ use rand::{Rng, OsRng}; pub use self::error::Error; pub use self::service::{RestorationStatus, Service, SnapshotService}; +pub use self::watcher::Watcher; pub mod io; pub mod service; @@ -52,6 +53,7 @@ pub mod service; mod account; mod block; mod error; +mod watcher; #[cfg(test)] mod tests; diff --git a/ethcore/src/snapshot/watcher.rs b/ethcore/src/snapshot/watcher.rs new file mode 100644 index 000000000..d2dd1d7dc --- /dev/null +++ b/ethcore/src/snapshot/watcher.rs @@ -0,0 +1,177 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Watcher for snapshot-related chain events. + +use client::{BlockChainClient, Client, ChainNotify}; +use ids::BlockID; +use service::ClientIoMessage; +use views::HeaderView; + +use io::IoChannel; +use util::hash::H256; + +use std::sync::Arc; + +// helper trait for transforming hashes to numbers. +trait HashToNumber: Send + Sync { + fn to_number(&self, hash: H256) -> Option; +} + +impl HashToNumber for Client { + fn to_number(&self, hash: H256) -> Option { + self.block_header(BlockID::Hash(hash)).map(|h| HeaderView::new(&h).number()) + } +} + +/// A `ChainNotify` implementation which will trigger a snapshot event +/// at certain block numbers. +pub struct Watcher { + oracle: Arc, + channel: IoChannel, + period: u64, + history: u64, +} + +impl Watcher { + /// Create a new `Watcher` which will trigger a snapshot event + /// once every `period` blocks, but only after that block is + /// `history` blocks old. + pub fn new(client: Arc, channel: IoChannel, period: u64, history: u64) -> Self { + Watcher { + oracle: client, + channel: channel, + period: period, + history: history, + } + } +} + +impl ChainNotify for Watcher { + fn new_blocks( + &self, + imported: Vec, + _: Vec, + _: Vec, + _: Vec, + _: Vec, + _duration: u64) + { + + let highest = imported.into_iter() + .filter_map(|h| self.oracle.to_number(h)) + .filter(|&num| num >= self.period + self.history) + .map(|num| num - self.history) + .filter(|num| num % self.period == 0) + .fold(0, ::std::cmp::max); + + if highest != 0 { + if let Err(e) = self.channel.send(ClientIoMessage::TakeSnapshot(highest)) { + warn!("Snapshot watcher disconnected from IoService: {}", e); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::{HashToNumber, Watcher}; + + use client::ChainNotify; + use service::ClientIoMessage; + + use util::{H256, U256, Mutex}; + use io::{IoContext, IoHandler, IoService}; + + use std::collections::HashMap; + use std::sync::Arc; + + struct TestOracle(HashMap); + + impl HashToNumber for TestOracle { + fn to_number(&self, hash: H256) -> Option { + self.0.get(&hash).cloned() + } + } + + struct Handler(Arc>>); + + impl IoHandler for Handler { + fn message(&self, _context: &IoContext, message: &ClientIoMessage) { + match *message { + ClientIoMessage::TakeSnapshot(num) => self.0.lock().push(num), + _ => {} + } + } + } + + // helper harness for tests. + fn harness(numbers: Vec, period: u64, history: u64) -> Vec { + let events = Arc::new(Mutex::new(Vec::new())); + + let service = IoService::start().unwrap(); + service.register_handler(Arc::new(Handler(events.clone()))).unwrap(); + + let hashes: Vec<_> = numbers.clone().into_iter().map(|x| H256::from(U256::from(x))).collect(); + let mut map = hashes.clone().into_iter().zip(numbers).collect(); + + let watcher = Watcher { + oracle: Arc::new(TestOracle(map)), + channel: service.channel(), + period: period, + history: history, + }; + + watcher.new_blocks( + hashes, + vec![], + vec![], + vec![], + vec![], + 0, + ); + + drop(service); + + // binding necessary for compilation. + let v = events.lock().clone(); + v + } + + #[test] + fn should_not_fire() { + let events = harness(vec![0], 5, 0); + assert_eq!(events, vec![]); + } + + #[test] + fn fires_once_for_two() { + let events = harness(vec![14, 15], 10, 5); + assert_eq!(events, vec![10]); + } + + #[test] + fn finds_highest() { + let events = harness(vec![15, 25], 10, 5); + assert_eq!(events, vec![20]); + } + + #[test] + fn doesnt_fire_before_history() { + let events = harness(vec![10, 11], 10, 5); + assert_eq!(events, vec![]); + } +} \ No newline at end of file From 1c450f616d59b8a001e22b892e7476a016f7883f Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 2 Sep 2016 18:48:07 +0200 Subject: [PATCH 03/10] register the watcher as a ChainNotify --- parity/run.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/parity/run.rs b/parity/run.rs index 71995cd5f..2d619d350 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -28,6 +28,7 @@ use ethcore::client::{Mode, Switch, DatabaseCompactionProfile, VMType, ChainNoti use ethcore::service::ClientService; use ethcore::account_provider::AccountProvider; use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions}; +use ethcore::snapshot; use ethsync::SyncConfig; use informant::Informant; @@ -46,6 +47,9 @@ use rpc_apis; use rpc; use url; +const SNAPSHOT_PERIOD: u64 = 10000; +const SNAPSHOT_HISTORY: u64 = 1000; + #[derive(Debug, PartialEq)] pub struct RunCmd { pub cache_config: CacheConfig, @@ -249,6 +253,15 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> { }); service.register_io_handler(io_handler).expect("Error registering IO handler"); + let watcher = snapshot::Watcher::new( + service.client(), + service.io().channel(), + SNAPSHOT_PERIOD, + SNAPSHOT_HISTORY, + ); + + service.add_notify(Arc::new(watcher)); + // start ui if cmd.ui { if !cmd.dapps_conf.enabled { From d9eb87cae7817253d04822f2289420caabe0b8c3 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 2 Sep 2016 19:00:20 +0200 Subject: [PATCH 04/10] add guard for temporary directories --- ethcore/src/snapshot/service.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index d2a9be046..449ced23f 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -41,6 +41,23 @@ use util::journaldb::Algorithm; use util::kvdb::{Database, DatabaseConfig}; use util::snappy; +/// Helper for removing directories in case of error. +struct Guard(bool, PathBuf); + +impl Guard { + fn new(path: PathBuf) -> Self { Guard(true, path) } + + fn disarm(mut self) { self.0 = false } +} + +impl Drop for Guard { + fn drop(&mut self) { + if self.0 { + let _ = fs::remove_dir_all(&self.1); + } + } +} + /// Statuses for restorations. #[derive(PartialEq, Clone, Copy, Debug)] pub enum RestorationStatus { @@ -98,6 +115,7 @@ struct Restoration { writer: LooseWriter, snappy_buffer: Bytes, final_state_root: H256, + guard: Guard, } struct RestorationParams<'a> { @@ -106,6 +124,7 @@ struct RestorationParams<'a> { db_path: PathBuf, // database path writer: LooseWriter, // writer for recovered snapshot. genesis: &'a [u8], // genesis block of the chain. + guard: Guard, // guard for the restoration directory. } impl Restoration { @@ -133,6 +152,7 @@ impl Restoration { writer: params.writer, snappy_buffer: Vec::new(), final_state_root: root, + guard: params.guard, }) } @@ -181,6 +201,7 @@ impl Restoration { try!(self.writer.finish(self.manifest)); + self.guard.disarm(); Ok(()) } @@ -348,10 +369,12 @@ impl Service { let snapshot_dir = self.snapshot_dir(); let _ = fs::remove_dir_all(&temp_dir); + let writer = try!(LooseWriter::new(temp_dir.clone())); let progress = Default::default(); // Todo [rob] log progress. + let guard = Guard::new(temp_dir.clone()); try!(client.take_snapshot(writer, BlockID::Number(num), &progress)); let mut reader = self.reader.write(); @@ -362,6 +385,7 @@ impl Service { *reader = Some(try!(LooseReader::new(snapshot_dir))); + guard.disarm(); Ok(()) } @@ -393,6 +417,7 @@ impl Service { db_path: self.restoration_db(), writer: writer, genesis: &self.genesis_block, + guard: Guard::new(rest_dir), }; *res = Some(try!(Restoration::new(params))); From a0541738aba2792a6eaf50af563946be32fc7408 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 2 Sep 2016 20:24:59 +0200 Subject: [PATCH 05/10] disabling of periodic snapshots with the --no-periodic-snapshot flag --- parity/cli.rs | 3 +++ parity/configuration.rs | 2 ++ parity/run.rs | 20 +++++++++++++------- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/parity/cli.rs b/parity/cli.rs index 8f33489dc..91b9c1620 100644 --- a/parity/cli.rs +++ b/parity/cli.rs @@ -243,6 +243,8 @@ Snapshot Options: index, hash, or 'latest'. Note that taking snapshots at non-recent blocks will only work with --pruning archive [default: latest] + --no-periodic-snapshot Disable automated snapshots which usually occur once + every 10000 blocks. Virtual Machine Options: --jitvm Enable the JIT VM. @@ -382,6 +384,7 @@ pub struct Args { pub flag_from: String, pub flag_to: String, pub flag_at: String, + pub flag_no_periodic_snapshot: bool, pub flag_format: Option, pub flag_jitvm: bool, pub flag_log_file: Option, diff --git a/parity/configuration.rs b/parity/configuration.rs index f2fd34853..51d637580 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -226,6 +226,7 @@ impl Configuration { ui: self.args.cmd_ui, name: self.args.flag_identity, custom_bootnodes: self.args.flag_bootnodes.is_some(), + no_periodic_snapshot: self.args.flag_no_periodic_snapshot, }; Cmd::Run(run_cmd) }; @@ -802,6 +803,7 @@ mod tests { ui: false, name: "".into(), custom_bootnodes: false, + no_periodic_snapshot: false, })); } diff --git a/parity/run.rs b/parity/run.rs index 2d619d350..d384a3a7c 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -47,7 +47,10 @@ use rpc_apis; use rpc; use url; +// how often to take periodic snapshots. const SNAPSHOT_PERIOD: u64 = 10000; + +// how many blocks to wait before starting a periodic snapshot. const SNAPSHOT_HISTORY: u64 = 1000; #[derive(Debug, PartialEq)] @@ -81,6 +84,7 @@ pub struct RunCmd { pub ui: bool, pub name: String, pub custom_bootnodes: bool, + pub no_periodic_snapshot: bool, } pub fn execute(cmd: RunCmd) -> Result<(), String> { @@ -253,14 +257,16 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> { }); service.register_io_handler(io_handler).expect("Error registering IO handler"); - let watcher = snapshot::Watcher::new( - service.client(), - service.io().channel(), - SNAPSHOT_PERIOD, - SNAPSHOT_HISTORY, - ); + if !cmd.no_periodic_snapshot { + let watcher = snapshot::Watcher::new( + service.client(), + service.io().channel(), + SNAPSHOT_PERIOD, + SNAPSHOT_HISTORY, + ); - service.add_notify(Arc::new(watcher)); + service.add_notify(Arc::new(watcher)); + } // start ui if cmd.ui { From 2bf235e226a6431bf38a4e270d454527405f30f3 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 5 Sep 2016 12:17:21 +0200 Subject: [PATCH 06/10] use more mocking in tests --- ethcore/src/client/chain_notify.rs | 3 +- ethcore/src/snapshot/watcher.rs | 81 +++++++++++++++--------------- 2 files changed, 41 insertions(+), 43 deletions(-) diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index 737cd0153..0c34382a0 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -20,8 +20,7 @@ use util::H256; /// Represents what has to be handled by actor listening to chain events #[derive(Ipc)] pub trait ChainNotify : Send + Sync { - /// fires when chain has new blocks, not including those encountered during - /// a major sync. + /// fires when chain has new blocks. fn new_blocks(&self, _imported: Vec, _invalid: Vec, diff --git a/ethcore/src/snapshot/watcher.rs b/ethcore/src/snapshot/watcher.rs index d2dd1d7dc..5bf157312 100644 --- a/ethcore/src/snapshot/watcher.rs +++ b/ethcore/src/snapshot/watcher.rs @@ -37,11 +37,29 @@ impl HashToNumber for Client { } } +// helper trait for broadcasting a block to take a snapshot at. +trait Broadcast: Send + Sync { + fn take_at(&self, num: Option); +} + +impl Broadcast for IoChannel { + fn take_at(&self, num: Option) { + let num = match num { + Some(n) => n, + None => return, + }; + + if let Err(e) = self.send(ClientIoMessage::TakeSnapshot(num)) { + warn!("Snapshot watcher disconnected from IoService: {}", e); + } + } +} + /// A `ChainNotify` implementation which will trigger a snapshot event /// at certain block numbers. pub struct Watcher { oracle: Arc, - channel: IoChannel, + broadcast: Box, period: u64, history: u64, } @@ -53,7 +71,7 @@ impl Watcher { pub fn new(client: Arc, channel: IoChannel, period: u64, history: u64) -> Self { Watcher { oracle: client, - channel: channel, + broadcast: Box::new(channel), period: period, history: history, } @@ -70,7 +88,6 @@ impl ChainNotify for Watcher { _: Vec, _duration: u64) { - let highest = imported.into_iter() .filter_map(|h| self.oracle.to_number(h)) .filter(|&num| num >= self.period + self.history) @@ -78,23 +95,20 @@ impl ChainNotify for Watcher { .filter(|num| num % self.period == 0) .fold(0, ::std::cmp::max); - if highest != 0 { - if let Err(e) = self.channel.send(ClientIoMessage::TakeSnapshot(highest)) { - warn!("Snapshot watcher disconnected from IoService: {}", e); - } + match highest { + 0 => self.broadcast.take_at(None), + _ => self.broadcast.take_at(Some(highest)), } } } #[cfg(test)] mod tests { - use super::{HashToNumber, Watcher}; + use super::{Broadcast, HashToNumber, Watcher}; use client::ChainNotify; - use service::ClientIoMessage; - use util::{H256, U256, Mutex}; - use io::{IoContext, IoHandler, IoService}; + use util::{H256, U256}; use std::collections::HashMap; use std::sync::Arc; @@ -107,30 +121,23 @@ mod tests { } } - struct Handler(Arc>>); - - impl IoHandler for Handler { - fn message(&self, _context: &IoContext, message: &ClientIoMessage) { - match *message { - ClientIoMessage::TakeSnapshot(num) => self.0.lock().push(num), - _ => {} + struct TestBroadcast(Option); + impl Broadcast for TestBroadcast { + fn take_at(&self, num: Option) { + if num != self.0 { + panic!("Watcher broadcast wrong number. Expected {:?}, found {:?}", self.0, num); } } } - // helper harness for tests. - fn harness(numbers: Vec, period: u64, history: u64) -> Vec { - let events = Arc::new(Mutex::new(Vec::new())); - - let service = IoService::start().unwrap(); - service.register_handler(Arc::new(Handler(events.clone()))).unwrap(); - + // helper harness for tests which expect a notification. + fn harness(numbers: Vec, period: u64, history: u64, expected: Option) { let hashes: Vec<_> = numbers.clone().into_iter().map(|x| H256::from(U256::from(x))).collect(); - let mut map = hashes.clone().into_iter().zip(numbers).collect(); + let map = hashes.clone().into_iter().zip(numbers).collect(); let watcher = Watcher { oracle: Arc::new(TestOracle(map)), - channel: service.channel(), + broadcast: Box::new(TestBroadcast(expected)), period: period, history: history, }; @@ -143,35 +150,27 @@ mod tests { vec![], 0, ); - - drop(service); - - // binding necessary for compilation. - let v = events.lock().clone(); - v } + // helper + #[test] fn should_not_fire() { - let events = harness(vec![0], 5, 0); - assert_eq!(events, vec![]); + harness(vec![0], 5, 0, None); } #[test] fn fires_once_for_two() { - let events = harness(vec![14, 15], 10, 5); - assert_eq!(events, vec![10]); + harness(vec![14, 15], 10, 5, Some(10)); } #[test] fn finds_highest() { - let events = harness(vec![15, 25], 10, 5); - assert_eq!(events, vec![20]); + harness(vec![15, 25], 10, 5, Some(20)); } #[test] fn doesnt_fire_before_history() { - let events = harness(vec![10, 11], 10, 5); - assert_eq!(events, vec![]); + harness(vec![10, 11], 10, 5, None); } } \ No newline at end of file From f0ef5e6943f90765a07a810263d79f37ebe3946a Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 5 Sep 2016 14:25:56 +0200 Subject: [PATCH 07/10] keep snapshot watcher alive --- ethcore/src/snapshot/service.rs | 7 +++++++ ethcore/src/snapshot/watcher.rs | 4 ++++ parity/run.rs | 16 ++++++++++++++++ 3 files changed, 27 insertions(+) diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index 449ced23f..ea5ec1a0a 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -275,6 +275,13 @@ impl Service { } } + // delete the temporary snapshot dir if it does exist. + if let Err(e) = fs::remove_dir_all(service.temp_snapshot_dir()) { + if e.kind() != ErrorKind { + return Err(e.into()) + } + } + Ok(service) } diff --git a/ethcore/src/snapshot/watcher.rs b/ethcore/src/snapshot/watcher.rs index 5bf157312..5a0c3eafc 100644 --- a/ethcore/src/snapshot/watcher.rs +++ b/ethcore/src/snapshot/watcher.rs @@ -49,6 +49,8 @@ impl Broadcast for IoChannel { None => return, }; + trace!(target: "snapshot_watcher", "broadcast: {}", num); + if let Err(e) = self.send(ClientIoMessage::TakeSnapshot(num)) { warn!("Snapshot watcher disconnected from IoService: {}", e); } @@ -88,6 +90,8 @@ impl ChainNotify for Watcher { _: Vec, _duration: u64) { + trace!(target: "snapshot_watcher", "{} imported", imported.len()); + let highest = imported.into_iter() .filter_map(|h| self.oracle.to_number(h)) .filter(|&num| num >= self.period + self.history) diff --git a/parity/run.rs b/parity/run.rs index d384a3a7c..618a9f0db 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -257,6 +257,22 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> { }); service.register_io_handler(io_handler).expect("Error registering IO handler"); + // the watcher must be kept alive. + let _watcher = match cmd.no_periodic_snapshot { + true => None, + false => { + let watcher = Arc::new(snapshot::Watcher::new( + service.client(), + service.io().channel(), + SNAPSHOT_PERIOD, + SNAPSHOT_HISTORY, + )); + + service.add_notify(watcher.clone()); + Some(watcher) + }, + }; + if !cmd.no_periodic_snapshot { let watcher = snapshot::Watcher::new( service.client(), From 09bc675e6a594e04b0761250716c5c26a30586ea Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 5 Sep 2016 22:59:34 +0200 Subject: [PATCH 08/10] address grumbles --- parity/cli.rs | 2 +- parity/run.rs | 11 ----------- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/parity/cli.rs b/parity/cli.rs index 91b9c1620..bb46bda13 100644 --- a/parity/cli.rs +++ b/parity/cli.rs @@ -384,7 +384,7 @@ pub struct Args { pub flag_from: String, pub flag_to: String, pub flag_at: String, - pub flag_no_periodic_snapshot: bool, + pub flag_no_periodic_snapshot: bool, pub flag_format: Option, pub flag_jitvm: bool, pub flag_log_file: Option, diff --git a/parity/run.rs b/parity/run.rs index 618a9f0db..7e81974c8 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -273,17 +273,6 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> { }, }; - if !cmd.no_periodic_snapshot { - let watcher = snapshot::Watcher::new( - service.client(), - service.io().channel(), - SNAPSHOT_PERIOD, - SNAPSHOT_HISTORY, - ); - - service.add_notify(Arc::new(watcher)); - } - // start ui if cmd.ui { if !cmd.dapps_conf.enabled { From 46581e173da1413bbead7a2a70c86471dbe513ce Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 6 Sep 2016 15:49:44 +0200 Subject: [PATCH 09/10] check block queue size before taking periodic snapshot --- ethcore/src/snapshot/watcher.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/ethcore/src/snapshot/watcher.rs b/ethcore/src/snapshot/watcher.rs index 5a0c3eafc..8f9d3833b 100644 --- a/ethcore/src/snapshot/watcher.rs +++ b/ethcore/src/snapshot/watcher.rs @@ -26,15 +26,23 @@ use util::hash::H256; use std::sync::Arc; -// helper trait for transforming hashes to numbers. -trait HashToNumber: Send + Sync { +// helper trait for transforming hashes to numbers and checking if syncing. +trait Oracle: Send + Sync { fn to_number(&self, hash: H256) -> Option; + + fn is_major_syncing(&self) -> bool; } -impl HashToNumber for Client { +impl Oracle for Client { fn to_number(&self, hash: H256) -> Option { self.block_header(BlockID::Hash(hash)).map(|h| HeaderView::new(&h).number()) } + + fn is_major_syncing(&self) -> bool { + let queue_info = self.queue_info(); + + queue_info.unverified_queue_size + queue_info.verified_queue_size > 3 + } } // helper trait for broadcasting a block to take a snapshot at. @@ -60,7 +68,7 @@ impl Broadcast for IoChannel { /// A `ChainNotify` implementation which will trigger a snapshot event /// at certain block numbers. pub struct Watcher { - oracle: Arc, + oracle: Arc, broadcast: Box, period: u64, history: u64, @@ -90,6 +98,8 @@ impl ChainNotify for Watcher { _: Vec, _duration: u64) { + if self.oracle.is_major_syncing() { return } + trace!(target: "snapshot_watcher", "{} imported", imported.len()); let highest = imported.into_iter() @@ -108,7 +118,7 @@ impl ChainNotify for Watcher { #[cfg(test)] mod tests { - use super::{Broadcast, HashToNumber, Watcher}; + use super::{Broadcast, Oracle, Watcher}; use client::ChainNotify; @@ -119,10 +129,12 @@ mod tests { struct TestOracle(HashMap); - impl HashToNumber for TestOracle { + impl Oracle for TestOracle { fn to_number(&self, hash: H256) -> Option { self.0.get(&hash).cloned() } + + fn is_major_syncing(&self) -> bool { false } } struct TestBroadcast(Option); From f054a7b8d5d5a8320c05495674840cab8e2620c7 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 6 Sep 2016 17:44:11 +0200 Subject: [PATCH 10/10] more info on current periodic snapshot --- ethcore/src/service.rs | 10 ++++++++-- ethcore/src/snapshot/mod.rs | 19 +++++++++++++++---- ethcore/src/snapshot/service.rs | 19 ++++++++++++++++--- 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 9981bce6e..1f377d0ae 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -147,16 +147,22 @@ struct ClientIoHandler { } const CLIENT_TICK_TIMER: TimerToken = 0; +const SNAPSHOT_TICK_TIMER: TimerToken = 1; + const CLIENT_TICK_MS: u64 = 5000; +const SNAPSHOT_TICK_MS: u64 = 10000; impl IoHandler for ClientIoHandler { fn initialize(&self, io: &IoContext) { io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer"); + io.register_timer(SNAPSHOT_TICK_TIMER, SNAPSHOT_TICK_MS).expect("Error registering snapshot timer"); } fn timeout(&self, _io: &IoContext, timer: TimerToken) { - if timer == CLIENT_TICK_TIMER { - self.client.tick(); + match timer { + CLIENT_TICK_TIMER => self.client.tick(), + SNAPSHOT_TICK_TIMER => self.snapshot.tick(), + _ => warn!("IO service triggered unregistered timer '{}'", timer), } } diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index be7ac2b64..43622fc51 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -83,17 +83,28 @@ pub struct Progress { } impl Progress { + /// Reset the progress. + pub fn reset(&self) { + self.accounts.store(0, Ordering::Release); + self.blocks.store(0, Ordering::Release); + self.size.store(0, Ordering::Release); + + // atomic fence here to ensure the others are written first? + // logs might very rarely get polluted if not. + self.done.store(false, Ordering::Release); + } + /// Get the number of accounts snapshotted thus far. - pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Relaxed) } + pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Acquire) } /// Get the number of blocks snapshotted thus far. - pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Relaxed) } + pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) } /// Get the written size of the snapshot in bytes. - pub fn size(&self) -> usize { self.size.load(Ordering::Relaxed) } + pub fn size(&self) -> usize { self.size.load(Ordering::Acquire) } /// Whether the snapshot is complete. - pub fn done(&self) -> bool { self.done.load(Ordering::SeqCst) } + pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) } } /// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer. diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index 729fc851e..ce34a0def 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -191,6 +191,7 @@ pub struct Service { state_chunks: AtomicUsize, block_chunks: AtomicUsize, db_restore: Arc, + progress: super::Progress, } impl Service { @@ -220,6 +221,7 @@ impl Service { state_chunks: AtomicUsize::new(0), block_chunks: AtomicUsize::new(0), db_restore: db_restore, + progress: Default::default(), }; // create the root snapshot dir if it doesn't exist. @@ -297,12 +299,22 @@ impl Service { Ok(()) } + /// Tick the snapshot service. This will log any active snapshot + /// being taken. + pub fn tick(&self) { + if self.progress.done() { return } + + let p = &self.progress; + info!("Snapshot: {} accounts {} blocks {} bytes", p.accounts(), p.blocks(), p.size()); + } + /// Take a snapshot at the block with the given number. /// calling this while a restoration is in progress or vice versa /// will lead to a race condition where the first one to finish will /// have their produced snapshot overwritten. pub fn take_snapshot(&self, client: &Client, num: u64) -> Result<(), Error> { info!("Taking snapshot at #{}", num); + self.progress.reset(); let temp_dir = self.temp_snapshot_dir(); let snapshot_dir = self.snapshot_dir(); @@ -310,11 +322,12 @@ impl Service { let _ = fs::remove_dir_all(&temp_dir); let writer = try!(LooseWriter::new(temp_dir.clone())); - let progress = Default::default(); - // Todo [rob] log progress. let guard = Guard::new(temp_dir.clone()); - try!(client.take_snapshot(writer, BlockID::Number(num), &progress)); + try!(client.take_snapshot(writer, BlockID::Number(num), &self.progress)); + + info!("Finished taking snapshot at #{}", num); + let mut reader = self.reader.write(); // destroy the old snapshot reader.