more info on current periodic snapshot
This commit is contained in:
parent
46581e173d
commit
f054a7b8d5
@ -147,16 +147,22 @@ struct ClientIoHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const CLIENT_TICK_TIMER: TimerToken = 0;
|
const CLIENT_TICK_TIMER: TimerToken = 0;
|
||||||
|
const SNAPSHOT_TICK_TIMER: TimerToken = 1;
|
||||||
|
|
||||||
const CLIENT_TICK_MS: u64 = 5000;
|
const CLIENT_TICK_MS: u64 = 5000;
|
||||||
|
const SNAPSHOT_TICK_MS: u64 = 10000;
|
||||||
|
|
||||||
impl IoHandler<ClientIoMessage> for ClientIoHandler {
|
impl IoHandler<ClientIoMessage> for ClientIoHandler {
|
||||||
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
|
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
|
||||||
io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer");
|
io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer");
|
||||||
|
io.register_timer(SNAPSHOT_TICK_TIMER, SNAPSHOT_TICK_MS).expect("Error registering snapshot timer");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
|
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
|
||||||
if timer == CLIENT_TICK_TIMER {
|
match timer {
|
||||||
self.client.tick();
|
CLIENT_TICK_TIMER => self.client.tick(),
|
||||||
|
SNAPSHOT_TICK_TIMER => self.snapshot.tick(),
|
||||||
|
_ => warn!("IO service triggered unregistered timer '{}'", timer),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,17 +83,28 @@ pub struct Progress {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Progress {
|
impl Progress {
|
||||||
|
/// Reset the progress.
|
||||||
|
pub fn reset(&self) {
|
||||||
|
self.accounts.store(0, Ordering::Release);
|
||||||
|
self.blocks.store(0, Ordering::Release);
|
||||||
|
self.size.store(0, Ordering::Release);
|
||||||
|
|
||||||
|
// atomic fence here to ensure the others are written first?
|
||||||
|
// logs might very rarely get polluted if not.
|
||||||
|
self.done.store(false, Ordering::Release);
|
||||||
|
}
|
||||||
|
|
||||||
/// Get the number of accounts snapshotted thus far.
|
/// Get the number of accounts snapshotted thus far.
|
||||||
pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Relaxed) }
|
pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Acquire) }
|
||||||
|
|
||||||
/// Get the number of blocks snapshotted thus far.
|
/// Get the number of blocks snapshotted thus far.
|
||||||
pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Relaxed) }
|
pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) }
|
||||||
|
|
||||||
/// Get the written size of the snapshot in bytes.
|
/// Get the written size of the snapshot in bytes.
|
||||||
pub fn size(&self) -> usize { self.size.load(Ordering::Relaxed) }
|
pub fn size(&self) -> usize { self.size.load(Ordering::Acquire) }
|
||||||
|
|
||||||
/// Whether the snapshot is complete.
|
/// Whether the snapshot is complete.
|
||||||
pub fn done(&self) -> bool { self.done.load(Ordering::SeqCst) }
|
pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) }
|
||||||
|
|
||||||
}
|
}
|
||||||
/// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer.
|
/// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer.
|
||||||
|
@ -191,6 +191,7 @@ pub struct Service {
|
|||||||
state_chunks: AtomicUsize,
|
state_chunks: AtomicUsize,
|
||||||
block_chunks: AtomicUsize,
|
block_chunks: AtomicUsize,
|
||||||
db_restore: Arc<DatabaseRestore>,
|
db_restore: Arc<DatabaseRestore>,
|
||||||
|
progress: super::Progress,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
@ -220,6 +221,7 @@ impl Service {
|
|||||||
state_chunks: AtomicUsize::new(0),
|
state_chunks: AtomicUsize::new(0),
|
||||||
block_chunks: AtomicUsize::new(0),
|
block_chunks: AtomicUsize::new(0),
|
||||||
db_restore: db_restore,
|
db_restore: db_restore,
|
||||||
|
progress: Default::default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// create the root snapshot dir if it doesn't exist.
|
// create the root snapshot dir if it doesn't exist.
|
||||||
@ -297,12 +299,22 @@ impl Service {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Tick the snapshot service. This will log any active snapshot
|
||||||
|
/// being taken.
|
||||||
|
pub fn tick(&self) {
|
||||||
|
if self.progress.done() { return }
|
||||||
|
|
||||||
|
let p = &self.progress;
|
||||||
|
info!("Snapshot: {} accounts {} blocks {} bytes", p.accounts(), p.blocks(), p.size());
|
||||||
|
}
|
||||||
|
|
||||||
/// Take a snapshot at the block with the given number.
|
/// Take a snapshot at the block with the given number.
|
||||||
/// calling this while a restoration is in progress or vice versa
|
/// calling this while a restoration is in progress or vice versa
|
||||||
/// will lead to a race condition where the first one to finish will
|
/// will lead to a race condition where the first one to finish will
|
||||||
/// have their produced snapshot overwritten.
|
/// have their produced snapshot overwritten.
|
||||||
pub fn take_snapshot(&self, client: &Client, num: u64) -> Result<(), Error> {
|
pub fn take_snapshot(&self, client: &Client, num: u64) -> Result<(), Error> {
|
||||||
info!("Taking snapshot at #{}", num);
|
info!("Taking snapshot at #{}", num);
|
||||||
|
self.progress.reset();
|
||||||
|
|
||||||
let temp_dir = self.temp_snapshot_dir();
|
let temp_dir = self.temp_snapshot_dir();
|
||||||
let snapshot_dir = self.snapshot_dir();
|
let snapshot_dir = self.snapshot_dir();
|
||||||
@ -310,11 +322,12 @@ impl Service {
|
|||||||
let _ = fs::remove_dir_all(&temp_dir);
|
let _ = fs::remove_dir_all(&temp_dir);
|
||||||
|
|
||||||
let writer = try!(LooseWriter::new(temp_dir.clone()));
|
let writer = try!(LooseWriter::new(temp_dir.clone()));
|
||||||
let progress = Default::default();
|
|
||||||
|
|
||||||
// Todo [rob] log progress.
|
|
||||||
let guard = Guard::new(temp_dir.clone());
|
let guard = Guard::new(temp_dir.clone());
|
||||||
try!(client.take_snapshot(writer, BlockID::Number(num), &progress));
|
try!(client.take_snapshot(writer, BlockID::Number(num), &self.progress));
|
||||||
|
|
||||||
|
info!("Finished taking snapshot at #{}", num);
|
||||||
|
|
||||||
let mut reader = self.reader.write();
|
let mut reader = self.reader.write();
|
||||||
|
|
||||||
// destroy the old snapshot reader.
|
// destroy the old snapshot reader.
|
||||||
|
Loading…
Reference in New Issue
Block a user