Merge branch 'master' into split-internal-seal
This commit is contained in:
commit
d6e5637459
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -570,6 +570,7 @@ name = "ethkey"
|
|||||||
version = "0.2.0"
|
version = "0.2.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bigint 0.1.0",
|
"bigint 0.1.0",
|
||||||
|
"docopt 0.6.80 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"eth-secp256k1 0.5.4 (git+https://github.com/ethcore/rust-secp256k1)",
|
"eth-secp256k1 0.5.4 (git+https://github.com/ethcore/rust-secp256k1)",
|
||||||
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
|
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
@ -581,6 +582,7 @@ dependencies = [
|
|||||||
name = "ethstore"
|
name = "ethstore"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"docopt 0.6.80 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"ethcrypto 0.1.0",
|
"ethcrypto 0.1.0",
|
||||||
"ethkey 0.2.0",
|
"ethkey 0.2.0",
|
||||||
"itertools 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)",
|
"itertools 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
@ -673,6 +673,8 @@ impl Client {
|
|||||||
impl snapshot::DatabaseRestore for Client {
|
impl snapshot::DatabaseRestore for Client {
|
||||||
/// Restart the client with a new backend
|
/// Restart the client with a new backend
|
||||||
fn restore_db(&self, new_db: &str) -> Result<(), EthcoreError> {
|
fn restore_db(&self, new_db: &str) -> Result<(), EthcoreError> {
|
||||||
|
trace!(target: "snapshot", "Replacing client database with {:?}", new_db);
|
||||||
|
|
||||||
let _import_lock = self.import_lock.lock();
|
let _import_lock = self.import_lock.lock();
|
||||||
let mut state_db = self.state_db.write();
|
let mut state_db = self.state_db.write();
|
||||||
let mut chain = self.chain.write();
|
let mut chain = self.chain.write();
|
||||||
|
@ -94,7 +94,6 @@ impl ClientService {
|
|||||||
pruning: pruning,
|
pruning: pruning,
|
||||||
channel: io_service.channel(),
|
channel: io_service.channel(),
|
||||||
snapshot_root: snapshot_path.into(),
|
snapshot_root: snapshot_path.into(),
|
||||||
client_db: client_path.into(),
|
|
||||||
db_restore: client.clone(),
|
db_restore: client.clone(),
|
||||||
};
|
};
|
||||||
let snapshot = Arc::new(try!(SnapshotService::new(snapshot_params)));
|
let snapshot = Arc::new(try!(SnapshotService::new(snapshot_params)));
|
||||||
@ -187,7 +186,7 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
|
|||||||
ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); }
|
ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); }
|
||||||
ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); }
|
ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); }
|
||||||
ClientIoMessage::BeginRestoration(ref manifest) => {
|
ClientIoMessage::BeginRestoration(ref manifest) => {
|
||||||
if let Err(e) = self.snapshot.init_restore(manifest.clone()) {
|
if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) {
|
||||||
warn!("Failed to initialize snapshot restoration: {}", e);
|
warn!("Failed to initialize snapshot restoration: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -35,7 +35,7 @@ use service::ClientIoMessage;
|
|||||||
|
|
||||||
use io::IoChannel;
|
use io::IoChannel;
|
||||||
|
|
||||||
use util::{Bytes, H256, Mutex, RwLock, UtilError};
|
use util::{Bytes, H256, Mutex, RwLock, RwLockReadGuard, UtilError};
|
||||||
use util::journaldb::Algorithm;
|
use util::journaldb::Algorithm;
|
||||||
use util::kvdb::{Database, DatabaseConfig};
|
use util::kvdb::{Database, DatabaseConfig};
|
||||||
use util::snappy;
|
use util::snappy;
|
||||||
@ -70,7 +70,7 @@ struct Restoration {
|
|||||||
block_chunks_left: HashSet<H256>,
|
block_chunks_left: HashSet<H256>,
|
||||||
state: StateRebuilder,
|
state: StateRebuilder,
|
||||||
blocks: BlockRebuilder,
|
blocks: BlockRebuilder,
|
||||||
writer: LooseWriter,
|
writer: Option<LooseWriter>,
|
||||||
snappy_buffer: Bytes,
|
snappy_buffer: Bytes,
|
||||||
final_state_root: H256,
|
final_state_root: H256,
|
||||||
guard: Guard,
|
guard: Guard,
|
||||||
@ -80,8 +80,8 @@ struct RestorationParams<'a> {
|
|||||||
manifest: ManifestData, // manifest to base restoration on.
|
manifest: ManifestData, // manifest to base restoration on.
|
||||||
pruning: Algorithm, // pruning algorithm for the database.
|
pruning: Algorithm, // pruning algorithm for the database.
|
||||||
db_path: PathBuf, // database path
|
db_path: PathBuf, // database path
|
||||||
db_config: &'a DatabaseConfig,
|
db_config: &'a DatabaseConfig, // configuration for the database.
|
||||||
writer: LooseWriter, // writer for recovered snapshot.
|
writer: Option<LooseWriter>, // writer for recovered snapshot.
|
||||||
genesis: &'a [u8], // genesis block of the chain.
|
genesis: &'a [u8], // genesis block of the chain.
|
||||||
guard: Guard, // guard for the restoration directory.
|
guard: Guard, // guard for the restoration directory.
|
||||||
}
|
}
|
||||||
@ -120,7 +120,10 @@ impl Restoration {
|
|||||||
let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer));
|
let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer));
|
||||||
|
|
||||||
try!(self.state.feed(&self.snappy_buffer[..len]));
|
try!(self.state.feed(&self.snappy_buffer[..len]));
|
||||||
try!(self.writer.write_state_chunk(hash, chunk));
|
|
||||||
|
if let Some(ref mut writer) = self.writer.as_mut() {
|
||||||
|
try!(writer.write_state_chunk(hash, chunk));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -132,7 +135,9 @@ impl Restoration {
|
|||||||
let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer));
|
let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer));
|
||||||
|
|
||||||
try!(self.blocks.feed(&self.snappy_buffer[..len], engine));
|
try!(self.blocks.feed(&self.snappy_buffer[..len], engine));
|
||||||
try!(self.writer.write_block_chunk(hash, chunk));
|
if let Some(ref mut writer) = self.writer.as_mut() {
|
||||||
|
try!(writer.write_block_chunk(hash, chunk));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -157,7 +162,9 @@ impl Restoration {
|
|||||||
// connect out-of-order chunks.
|
// connect out-of-order chunks.
|
||||||
self.blocks.glue_chunks();
|
self.blocks.glue_chunks();
|
||||||
|
|
||||||
try!(self.writer.finish(self.manifest));
|
if let Some(writer) = self.writer {
|
||||||
|
try!(writer.finish(self.manifest));
|
||||||
|
}
|
||||||
|
|
||||||
self.guard.disarm();
|
self.guard.disarm();
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -187,9 +194,6 @@ pub struct ServiceParams {
|
|||||||
/// The directory to put snapshots in.
|
/// The directory to put snapshots in.
|
||||||
/// Usually "<chain hash>/snapshot"
|
/// Usually "<chain hash>/snapshot"
|
||||||
pub snapshot_root: PathBuf,
|
pub snapshot_root: PathBuf,
|
||||||
/// The client's database directory.
|
|
||||||
/// Usually "<chain hash>/<pruning>/db".
|
|
||||||
pub client_db: PathBuf,
|
|
||||||
/// A handle for database restoration.
|
/// A handle for database restoration.
|
||||||
pub db_restore: Arc<DatabaseRestore>,
|
pub db_restore: Arc<DatabaseRestore>,
|
||||||
}
|
}
|
||||||
@ -198,7 +202,6 @@ pub struct ServiceParams {
|
|||||||
/// This controls taking snapshots and restoring from them.
|
/// This controls taking snapshots and restoring from them.
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
restoration: Mutex<Option<Restoration>>,
|
restoration: Mutex<Option<Restoration>>,
|
||||||
client_db: PathBuf,
|
|
||||||
snapshot_root: PathBuf,
|
snapshot_root: PathBuf,
|
||||||
db_config: DatabaseConfig,
|
db_config: DatabaseConfig,
|
||||||
io_channel: Channel,
|
io_channel: Channel,
|
||||||
@ -219,7 +222,6 @@ impl Service {
|
|||||||
pub fn new(params: ServiceParams) -> Result<Self, Error> {
|
pub fn new(params: ServiceParams) -> Result<Self, Error> {
|
||||||
let mut service = Service {
|
let mut service = Service {
|
||||||
restoration: Mutex::new(None),
|
restoration: Mutex::new(None),
|
||||||
client_db: params.client_db,
|
|
||||||
snapshot_root: params.snapshot_root,
|
snapshot_root: params.snapshot_root,
|
||||||
db_config: params.db_config,
|
db_config: params.db_config,
|
||||||
io_channel: params.channel,
|
io_channel: params.channel,
|
||||||
@ -301,11 +303,15 @@ impl Service {
|
|||||||
fn replace_client_db(&self) -> Result<(), Error> {
|
fn replace_client_db(&self) -> Result<(), Error> {
|
||||||
let our_db = self.restoration_db();
|
let our_db = self.restoration_db();
|
||||||
|
|
||||||
trace!(target: "snapshot", "replacing {:?} with {:?}", self.client_db, our_db);
|
try!(self.db_restore.restore_db(&*our_db.to_string_lossy()));
|
||||||
try!(self.db_restore.restore_db(our_db.to_str().unwrap()));
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a reference to the snapshot reader.
|
||||||
|
pub fn reader(&self) -> RwLockReadGuard<Option<LooseReader>> {
|
||||||
|
self.reader.read()
|
||||||
|
}
|
||||||
|
|
||||||
/// Tick the snapshot service. This will log any active snapshot
|
/// Tick the snapshot service. This will log any active snapshot
|
||||||
/// being taken.
|
/// being taken.
|
||||||
pub fn tick(&self) {
|
pub fn tick(&self) {
|
||||||
@ -357,11 +363,15 @@ impl Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Initialize the restoration synchronously.
|
/// Initialize the restoration synchronously.
|
||||||
pub fn init_restore(&self, manifest: ManifestData) -> Result<(), Error> {
|
/// The recover flag indicates whether to recover the restored snapshot.
|
||||||
|
pub fn init_restore(&self, manifest: ManifestData, recover: bool) -> Result<(), Error> {
|
||||||
let rest_dir = self.restoration_dir();
|
let rest_dir = self.restoration_dir();
|
||||||
|
|
||||||
let mut res = self.restoration.lock();
|
let mut res = self.restoration.lock();
|
||||||
|
|
||||||
|
self.state_chunks.store(0, Ordering::SeqCst);
|
||||||
|
self.block_chunks.store(0, Ordering::SeqCst);
|
||||||
|
|
||||||
// tear down existing restoration.
|
// tear down existing restoration.
|
||||||
*res = None;
|
*res = None;
|
||||||
|
|
||||||
@ -376,7 +386,10 @@ impl Service {
|
|||||||
try!(fs::create_dir_all(&rest_dir));
|
try!(fs::create_dir_all(&rest_dir));
|
||||||
|
|
||||||
// make new restoration.
|
// make new restoration.
|
||||||
let writer = try!(LooseWriter::new(self.temp_recovery_dir()));
|
let writer = match recover {
|
||||||
|
true => Some(try!(LooseWriter::new(self.temp_recovery_dir()))),
|
||||||
|
false => None
|
||||||
|
};
|
||||||
|
|
||||||
let params = RestorationParams {
|
let params = RestorationParams {
|
||||||
manifest: manifest,
|
manifest: manifest,
|
||||||
@ -391,8 +404,8 @@ impl Service {
|
|||||||
*res = Some(try!(Restoration::new(params)));
|
*res = Some(try!(Restoration::new(params)));
|
||||||
|
|
||||||
*self.status.lock() = RestorationStatus::Ongoing {
|
*self.status.lock() = RestorationStatus::Ongoing {
|
||||||
state_chunks_done: self.state_chunks.load(Ordering::Relaxed) as u32,
|
state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
|
||||||
block_chunks_done: self.block_chunks.load(Ordering::Relaxed) as u32,
|
block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
|
||||||
};
|
};
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -403,35 +416,35 @@ impl Service {
|
|||||||
fn finalize_restoration(&self, rest: &mut Option<Restoration>) -> Result<(), Error> {
|
fn finalize_restoration(&self, rest: &mut Option<Restoration>) -> Result<(), Error> {
|
||||||
trace!(target: "snapshot", "finalizing restoration");
|
trace!(target: "snapshot", "finalizing restoration");
|
||||||
|
|
||||||
self.state_chunks.store(0, Ordering::SeqCst);
|
let recover = rest.as_ref().map_or(false, |rest| rest.writer.is_some());
|
||||||
self.block_chunks.store(0, Ordering::SeqCst);
|
|
||||||
|
|
||||||
// destroy the restoration before replacing databases and snapshot.
|
// destroy the restoration before replacing databases and snapshot.
|
||||||
try!(rest.take().map(Restoration::finalize).unwrap_or(Ok(())));
|
try!(rest.take().map(Restoration::finalize).unwrap_or(Ok(())));
|
||||||
try!(self.replace_client_db());
|
try!(self.replace_client_db());
|
||||||
|
|
||||||
let mut reader = self.reader.write();
|
if recover {
|
||||||
*reader = None; // destroy the old reader if it existed.
|
let mut reader = self.reader.write();
|
||||||
|
*reader = None; // destroy the old reader if it existed.
|
||||||
|
|
||||||
let snapshot_dir = self.snapshot_dir();
|
let snapshot_dir = self.snapshot_dir();
|
||||||
|
|
||||||
trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy());
|
trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy());
|
||||||
if let Err(e) = fs::remove_dir_all(&snapshot_dir) {
|
if let Err(e) = fs::remove_dir_all(&snapshot_dir) {
|
||||||
match e.kind() {
|
match e.kind() {
|
||||||
ErrorKind::NotFound => {}
|
ErrorKind::NotFound => {}
|
||||||
_ => return Err(e.into()),
|
_ => return Err(e.into()),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try!(fs::create_dir(&snapshot_dir));
|
||||||
|
|
||||||
|
trace!(target: "snapshot", "copying restored snapshot files over");
|
||||||
|
try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir));
|
||||||
|
|
||||||
|
*reader = Some(try!(LooseReader::new(snapshot_dir)));
|
||||||
}
|
}
|
||||||
|
|
||||||
try!(fs::create_dir(&snapshot_dir));
|
|
||||||
|
|
||||||
trace!(target: "snapshot", "copying restored snapshot files over");
|
|
||||||
try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir));
|
|
||||||
|
|
||||||
let _ = fs::remove_dir_all(self.restoration_dir());
|
let _ = fs::remove_dir_all(self.restoration_dir());
|
||||||
|
|
||||||
*reader = Some(try!(LooseReader::new(snapshot_dir)));
|
|
||||||
|
|
||||||
*self.status.lock() = RestorationStatus::Inactive;
|
*self.status.lock() = RestorationStatus::Inactive;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -512,7 +525,13 @@ impl SnapshotService for Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn status(&self) -> RestorationStatus {
|
fn status(&self) -> RestorationStatus {
|
||||||
*self.status.lock()
|
let mut cur_status = self.status.lock();
|
||||||
|
if let RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done } = *cur_status {
|
||||||
|
*state_chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32;
|
||||||
|
*block_chunks_done = self.block_chunks.load(Ordering::SeqCst) as u32;
|
||||||
|
}
|
||||||
|
|
||||||
|
cur_status.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn begin_restore(&self, manifest: ManifestData) {
|
fn begin_restore(&self, manifest: ManifestData) {
|
||||||
@ -523,12 +542,6 @@ impl SnapshotService for Service {
|
|||||||
fn abort_restore(&self) {
|
fn abort_restore(&self) {
|
||||||
*self.restoration.lock() = None;
|
*self.restoration.lock() = None;
|
||||||
*self.status.lock() = RestorationStatus::Inactive;
|
*self.status.lock() = RestorationStatus::Inactive;
|
||||||
if let Err(e) = fs::remove_dir_all(&self.restoration_dir()) {
|
|
||||||
match e.kind() {
|
|
||||||
ErrorKind::NotFound => {},
|
|
||||||
_ => warn!("encountered error {} while deleting snapshot restoration dir.", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn restore_state_chunk(&self, hash: H256, chunk: Bytes) {
|
fn restore_state_chunk(&self, hash: H256, chunk: Bytes) {
|
||||||
@ -585,7 +598,6 @@ mod tests {
|
|||||||
pruning: Algorithm::Archive,
|
pruning: Algorithm::Archive,
|
||||||
channel: service.channel(),
|
channel: service.channel(),
|
||||||
snapshot_root: dir,
|
snapshot_root: dir,
|
||||||
client_db: client_db,
|
|
||||||
db_restore: Arc::new(NoopDBRestore),
|
db_restore: Arc::new(NoopDBRestore),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
mod blocks;
|
mod blocks;
|
||||||
mod state;
|
mod state;
|
||||||
|
mod service;
|
||||||
|
|
||||||
pub mod helpers;
|
pub mod helpers;
|
||||||
|
|
||||||
|
143
ethcore/src/snapshot/tests/service.rs
Normal file
143
ethcore/src/snapshot/tests/service.rs
Normal file
@ -0,0 +1,143 @@
|
|||||||
|
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||||
|
// This file is part of Parity.
|
||||||
|
|
||||||
|
// Parity is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Parity is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! Tests for the snapshot service.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use client::{BlockChainClient, Client};
|
||||||
|
use ids::BlockID;
|
||||||
|
use snapshot::service::{Service, ServiceParams};
|
||||||
|
use snapshot::{self, ManifestData, SnapshotService};
|
||||||
|
use spec::Spec;
|
||||||
|
use tests::helpers::generate_dummy_client_with_spec_and_data;
|
||||||
|
|
||||||
|
use devtools::RandomTempPath;
|
||||||
|
use io::IoChannel;
|
||||||
|
use util::kvdb::DatabaseConfig;
|
||||||
|
|
||||||
|
struct NoopDBRestore;
|
||||||
|
|
||||||
|
impl snapshot::DatabaseRestore for NoopDBRestore {
|
||||||
|
fn restore_db(&self, _new_db: &str) -> Result<(), ::error::Error> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn restored_is_equivalent() {
|
||||||
|
const NUM_BLOCKS: u32 = 400;
|
||||||
|
const TX_PER: usize = 5;
|
||||||
|
|
||||||
|
let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()];
|
||||||
|
|
||||||
|
let client = generate_dummy_client_with_spec_and_data(Spec::new_null, NUM_BLOCKS, TX_PER, &gas_prices);
|
||||||
|
|
||||||
|
let path = RandomTempPath::create_dir();
|
||||||
|
let mut path = path.as_path().clone();
|
||||||
|
let mut client_db = path.clone();
|
||||||
|
|
||||||
|
client_db.push("client_db");
|
||||||
|
path.push("snapshot");
|
||||||
|
|
||||||
|
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
|
||||||
|
|
||||||
|
let spec = Spec::new_null();
|
||||||
|
let client2 = Client::new(
|
||||||
|
Default::default(),
|
||||||
|
&spec,
|
||||||
|
&client_db,
|
||||||
|
Arc::new(::miner::Miner::with_spec(&spec)),
|
||||||
|
IoChannel::disconnected(),
|
||||||
|
&db_config,
|
||||||
|
).unwrap();
|
||||||
|
|
||||||
|
let service_params = ServiceParams {
|
||||||
|
engine: spec.engine.clone(),
|
||||||
|
genesis_block: spec.genesis_block(),
|
||||||
|
db_config: db_config,
|
||||||
|
pruning: ::util::journaldb::Algorithm::Archive,
|
||||||
|
channel: IoChannel::disconnected(),
|
||||||
|
snapshot_root: path,
|
||||||
|
db_restore: client2.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let service = Service::new(service_params).unwrap();
|
||||||
|
service.take_snapshot(&client, NUM_BLOCKS as u64).unwrap();
|
||||||
|
|
||||||
|
let manifest = service.manifest().unwrap();
|
||||||
|
|
||||||
|
service.init_restore(manifest.clone(), true).unwrap();
|
||||||
|
assert!(service.init_restore(manifest.clone(), true).is_ok());
|
||||||
|
|
||||||
|
for hash in manifest.state_hashes {
|
||||||
|
let chunk = service.chunk(hash).unwrap();
|
||||||
|
service.feed_state_chunk(hash, &chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
for hash in manifest.block_hashes {
|
||||||
|
let chunk = service.chunk(hash).unwrap();
|
||||||
|
service.feed_block_chunk(hash, &chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(service.status(), ::snapshot::RestorationStatus::Inactive);
|
||||||
|
|
||||||
|
for x in 0..NUM_BLOCKS {
|
||||||
|
let block1 = client.block(BlockID::Number(x as u64)).unwrap();
|
||||||
|
let block2 = client2.block(BlockID::Number(x as u64)).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(block1, block2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn guards_delete_folders() {
|
||||||
|
let spec = Spec::new_null();
|
||||||
|
let path = RandomTempPath::create_dir();
|
||||||
|
let mut path = path.as_path().clone();
|
||||||
|
let service_params = ServiceParams {
|
||||||
|
engine: spec.engine.clone(),
|
||||||
|
genesis_block: spec.genesis_block(),
|
||||||
|
db_config: DatabaseConfig::with_columns(::db::NUM_COLUMNS),
|
||||||
|
pruning: ::util::journaldb::Algorithm::Archive,
|
||||||
|
channel: IoChannel::disconnected(),
|
||||||
|
snapshot_root: path.clone(),
|
||||||
|
db_restore: Arc::new(NoopDBRestore),
|
||||||
|
};
|
||||||
|
|
||||||
|
let service = Service::new(service_params).unwrap();
|
||||||
|
path.push("restoration");
|
||||||
|
|
||||||
|
let manifest = ManifestData {
|
||||||
|
state_hashes: vec![],
|
||||||
|
block_hashes: vec![],
|
||||||
|
block_number: 0,
|
||||||
|
block_hash: Default::default(),
|
||||||
|
state_root: Default::default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
service.init_restore(manifest.clone(), true).unwrap();
|
||||||
|
assert!(path.exists());
|
||||||
|
|
||||||
|
service.abort_restore();
|
||||||
|
assert!(!path.exists());
|
||||||
|
|
||||||
|
service.init_restore(manifest.clone(), true).unwrap();
|
||||||
|
assert!(path.exists());
|
||||||
|
|
||||||
|
drop(service);
|
||||||
|
assert!(!path.exists());
|
||||||
|
}
|
@ -31,6 +31,7 @@ use ethcore::error::ImportError;
|
|||||||
use ethcore::miner::Miner;
|
use ethcore::miner::Miner;
|
||||||
use cache::CacheConfig;
|
use cache::CacheConfig;
|
||||||
use informant::Informant;
|
use informant::Informant;
|
||||||
|
use io_handler::ImportIoHandler;
|
||||||
use params::{SpecType, Pruning};
|
use params::{SpecType, Pruning};
|
||||||
use helpers::{to_client_config, execute_upgrades};
|
use helpers::{to_client_config, execute_upgrades};
|
||||||
use dir::Directories;
|
use dir::Directories;
|
||||||
@ -170,6 +171,10 @@ fn execute_import(cmd: ImportBlockchain) -> Result<String, String> {
|
|||||||
|
|
||||||
let informant = Informant::new(client.clone(), None, None, cmd.logger_config.color);
|
let informant = Informant::new(client.clone(), None, None, cmd.logger_config.color);
|
||||||
|
|
||||||
|
try!(service.register_io_handler(Arc::new(ImportIoHandler {
|
||||||
|
info: Arc::new(informant),
|
||||||
|
})).map_err(|_| "Unable to register informant handler".to_owned()));
|
||||||
|
|
||||||
let do_import = |bytes| {
|
let do_import = |bytes| {
|
||||||
while client.queue_info().is_full() { sleep(Duration::from_secs(1)); }
|
while client.queue_info().is_full() { sleep(Duration::from_secs(1)); }
|
||||||
match client.import_block(bytes) {
|
match client.import_block(bytes) {
|
||||||
@ -181,7 +186,6 @@ fn execute_import(cmd: ImportBlockchain) -> Result<String, String> {
|
|||||||
},
|
},
|
||||||
Ok(_) => {},
|
Ok(_) => {},
|
||||||
}
|
}
|
||||||
informant.tick();
|
|
||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -266,10 +270,10 @@ fn execute_export(cmd: ExportBlockchain) -> Result<String, String> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let from = try!(client.block_number(cmd.from_block).ok_or("From block could not be found"));
|
let from = try!(client.block_number(cmd.from_block).ok_or("From block could not be found"));
|
||||||
let to = try!(client.block_number(cmd.to_block).ok_or("From block could not be found"));
|
let to = try!(client.block_number(cmd.to_block).ok_or("To block could not be found"));
|
||||||
|
|
||||||
for i in from..(to + 1) {
|
for i in from..(to + 1) {
|
||||||
let b = client.block(BlockID::Number(i)).unwrap();
|
let b = try!(client.block(BlockID::Number(i)).ok_or("Error exporting incomplete chain"));
|
||||||
match format {
|
match format {
|
||||||
DataFormat::Binary => { out.write(&b).expect("Couldn't write to stream."); }
|
DataFormat::Binary => { out.write(&b).expect("Couldn't write to stream."); }
|
||||||
DataFormat::Hex => { out.write_fmt(format_args!("{}", b.pretty())).expect("Couldn't write to stream."); }
|
DataFormat::Hex => { out.write_fmt(format_args!("{}", b.pretty())).expect("Couldn't write to stream."); }
|
||||||
|
@ -33,7 +33,7 @@ Usage:
|
|||||||
parity export [ <file> ] [options]
|
parity export [ <file> ] [options]
|
||||||
parity signer new-token [options]
|
parity signer new-token [options]
|
||||||
parity snapshot <file> [options]
|
parity snapshot <file> [options]
|
||||||
parity restore <file> [options]
|
parity restore [ <file> ] [options]
|
||||||
|
|
||||||
Operating Options:
|
Operating Options:
|
||||||
--mode MODE Set the operating mode. MODE can be one of:
|
--mode MODE Set the operating mode. MODE can be one of:
|
||||||
|
@ -46,3 +46,19 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct ImportIoHandler {
|
||||||
|
pub info: Arc<Informant>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IoHandler<ClientIoMessage> for ImportIoHandler {
|
||||||
|
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
|
||||||
|
io.register_timer(INFO_TIMER, 5000).expect("Error registering timer");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
|
||||||
|
if let INFO_TIMER = timer {
|
||||||
|
self.info.tick()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -21,8 +21,9 @@ use std::path::{Path, PathBuf};
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use ethcore_logger::{setup_log, Config as LogConfig};
|
use ethcore_logger::{setup_log, Config as LogConfig};
|
||||||
use ethcore::snapshot::{Progress, RestorationStatus, SnapshotService};
|
use ethcore::snapshot::{Progress, RestorationStatus, SnapshotService as SS};
|
||||||
use ethcore::snapshot::io::{SnapshotReader, PackedReader, PackedWriter};
|
use ethcore::snapshot::io::{SnapshotReader, PackedReader, PackedWriter};
|
||||||
|
use ethcore::snapshot::service::Service as SnapshotService;
|
||||||
use ethcore::service::ClientService;
|
use ethcore::service::ClientService;
|
||||||
use ethcore::client::{Mode, DatabaseCompactionProfile, Switch, VMType};
|
use ethcore::client::{Mode, DatabaseCompactionProfile, Switch, VMType};
|
||||||
use ethcore::miner::Miner;
|
use ethcore::miner::Miner;
|
||||||
@ -62,6 +63,60 @@ pub struct SnapshotCommand {
|
|||||||
pub block_at: BlockID,
|
pub block_at: BlockID,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// helper for reading chunks from arbitrary reader and feeding them into the
|
||||||
|
// service.
|
||||||
|
fn restore_using<R: SnapshotReader>(snapshot: Arc<SnapshotService>, reader: &R, recover: bool) -> Result<(), String> {
|
||||||
|
let manifest = reader.manifest();
|
||||||
|
|
||||||
|
info!("Restoring to block #{} (0x{:?})", manifest.block_number, manifest.block_hash);
|
||||||
|
|
||||||
|
try!(snapshot.init_restore(manifest.clone(), recover).map_err(|e| {
|
||||||
|
format!("Failed to begin restoration: {}", e)
|
||||||
|
}));
|
||||||
|
|
||||||
|
let (num_state, num_blocks) = (manifest.state_hashes.len(), manifest.block_hashes.len());
|
||||||
|
|
||||||
|
let informant_handle = snapshot.clone();
|
||||||
|
::std::thread::spawn(move || {
|
||||||
|
while let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done } = informant_handle.status() {
|
||||||
|
info!("Processed {}/{} state chunks and {}/{} block chunks.",
|
||||||
|
state_chunks_done, num_state, block_chunks_done, num_blocks);
|
||||||
|
::std::thread::sleep(Duration::from_secs(5));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
info!("Restoring state");
|
||||||
|
for &state_hash in &manifest.state_hashes {
|
||||||
|
if snapshot.status() == RestorationStatus::Failed {
|
||||||
|
return Err("Restoration failed".into());
|
||||||
|
}
|
||||||
|
|
||||||
|
let chunk = try!(reader.chunk(state_hash)
|
||||||
|
.map_err(|e| format!("Encountered error while reading chunk {:?}: {}", state_hash, e)));
|
||||||
|
snapshot.feed_state_chunk(state_hash, &chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Restoring blocks");
|
||||||
|
for &block_hash in &manifest.block_hashes {
|
||||||
|
if snapshot.status() == RestorationStatus::Failed {
|
||||||
|
return Err("Restoration failed".into());
|
||||||
|
}
|
||||||
|
|
||||||
|
let chunk = try!(reader.chunk(block_hash)
|
||||||
|
.map_err(|e| format!("Encountered error while reading chunk {:?}: {}", block_hash, e)));
|
||||||
|
snapshot.feed_block_chunk(block_hash, &chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
match snapshot.status() {
|
||||||
|
RestorationStatus::Ongoing { .. } => Err("Snapshot file is incomplete and missing chunks.".into()),
|
||||||
|
RestorationStatus::Failed => Err("Snapshot restoration failed.".into()),
|
||||||
|
RestorationStatus::Inactive => {
|
||||||
|
info!("Restoration complete.");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl SnapshotCommand {
|
impl SnapshotCommand {
|
||||||
// shared portion of snapshot commands: start the client service
|
// shared portion of snapshot commands: start the client service
|
||||||
fn start_service(self) -> Result<(ClientService, Arc<PanicHandler>), String> {
|
fn start_service(self) -> Result<(ClientService, Arc<PanicHandler>), String> {
|
||||||
@ -106,69 +161,35 @@ impl SnapshotCommand {
|
|||||||
|
|
||||||
/// restore from a snapshot
|
/// restore from a snapshot
|
||||||
pub fn restore(self) -> Result<(), String> {
|
pub fn restore(self) -> Result<(), String> {
|
||||||
let file = try!(self.file_path.clone().ok_or("No file path provided.".to_owned()));
|
let file = self.file_path.clone();
|
||||||
let (service, _panic_handler) = try!(self.start_service());
|
let (service, _panic_handler) = try!(self.start_service());
|
||||||
|
|
||||||
warn!("Snapshot restoration is experimental and the format may be subject to change.");
|
warn!("Snapshot restoration is experimental and the format may be subject to change.");
|
||||||
warn!("On encountering an unexpected error, please ensure that you have a recent snapshot.");
|
warn!("On encountering an unexpected error, please ensure that you have a recent snapshot.");
|
||||||
|
|
||||||
let snapshot = service.snapshot_service();
|
let snapshot = service.snapshot_service();
|
||||||
let reader = PackedReader::new(Path::new(&file))
|
|
||||||
.map_err(|e| format!("Couldn't open snapshot file: {}", e))
|
|
||||||
.and_then(|x| x.ok_or("Snapshot file has invalid format.".into()));
|
|
||||||
|
|
||||||
let reader = try!(reader);
|
if let Some(file) = file {
|
||||||
let manifest = reader.manifest();
|
info!("Attempting to restore from snapshot at '{}'", file);
|
||||||
|
|
||||||
// drop the client so we don't restore while it has open DB handles.
|
let reader = PackedReader::new(Path::new(&file))
|
||||||
drop(service);
|
.map_err(|e| format!("Couldn't open snapshot file: {}", e))
|
||||||
|
.and_then(|x| x.ok_or("Snapshot file has invalid format.".into()));
|
||||||
|
|
||||||
try!(snapshot.init_restore(manifest.clone()).map_err(|e| {
|
let reader = try!(reader);
|
||||||
format!("Failed to begin restoration: {}", e)
|
try!(restore_using(snapshot, &reader, true));
|
||||||
}));
|
} else {
|
||||||
|
info!("Attempting to restore from local snapshot.");
|
||||||
|
|
||||||
let (num_state, num_blocks) = (manifest.state_hashes.len(), manifest.block_hashes.len());
|
// attempting restoration with recovery will lead to deadlock
|
||||||
|
// as we currently hold a read lock on the service's reader.
|
||||||
let informant_handle = snapshot.clone();
|
match *snapshot.reader() {
|
||||||
::std::thread::spawn(move || {
|
Some(ref reader) => try!(restore_using(snapshot.clone(), reader, false)),
|
||||||
while let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done } = informant_handle.status() {
|
None => return Err("No local snapshot found.".into()),
|
||||||
info!("Processed {}/{} state chunks and {}/{} block chunks.",
|
|
||||||
state_chunks_done, num_state, block_chunks_done, num_blocks);
|
|
||||||
|
|
||||||
::std::thread::sleep(Duration::from_secs(5));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
info!("Restoring state");
|
|
||||||
for &state_hash in &manifest.state_hashes {
|
|
||||||
if snapshot.status() == RestorationStatus::Failed {
|
|
||||||
return Err("Restoration failed".into());
|
|
||||||
}
|
|
||||||
|
|
||||||
let chunk = try!(reader.chunk(state_hash)
|
|
||||||
.map_err(|e| format!("Encountered error while reading chunk {:?}: {}", state_hash, e)));
|
|
||||||
snapshot.feed_state_chunk(state_hash, &chunk);
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Restoring blocks");
|
|
||||||
for &block_hash in &manifest.block_hashes {
|
|
||||||
if snapshot.status() == RestorationStatus::Failed {
|
|
||||||
return Err("Restoration failed".into());
|
|
||||||
}
|
|
||||||
|
|
||||||
let chunk = try!(reader.chunk(block_hash)
|
|
||||||
.map_err(|e| format!("Encountered error while reading chunk {:?}: {}", block_hash, e)));
|
|
||||||
snapshot.feed_block_chunk(block_hash, &chunk);
|
|
||||||
}
|
|
||||||
|
|
||||||
match snapshot.status() {
|
|
||||||
RestorationStatus::Ongoing { .. } => Err("Snapshot file is incomplete and missing chunks.".into()),
|
|
||||||
RestorationStatus::Failed => Err("Snapshot restoration failed.".into()),
|
|
||||||
RestorationStatus::Inactive => {
|
|
||||||
info!("Restoration complete.");
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Take a snapshot from the head of the chain.
|
/// Take a snapshot from the head of the chain.
|
||||||
|
@ -458,8 +458,6 @@ impl Database {
|
|||||||
let mut backup_db = PathBuf::from(&self.path);
|
let mut backup_db = PathBuf::from(&self.path);
|
||||||
backup_db.pop();
|
backup_db.pop();
|
||||||
backup_db.push("backup_db");
|
backup_db.push("backup_db");
|
||||||
println!("Path at {:?}", self.path);
|
|
||||||
println!("Backup at {:?}", backup_db);
|
|
||||||
|
|
||||||
let existed = match fs::rename(&self.path, &backup_db) {
|
let existed = match fs::rename(&self.path, &backup_db) {
|
||||||
Ok(_) => true,
|
Ok(_) => true,
|
||||||
|
Loading…
Reference in New Issue
Block a user