Merge branch 'master' into config-files
This commit is contained in:
@@ -19,7 +19,6 @@ use std::{io, fs};
|
||||
use std::io::{BufReader, BufRead};
|
||||
use std::time::Duration;
|
||||
use std::thread::sleep;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use rustc_serialize::hex::FromHex;
|
||||
use ethcore_logger::{setup_log, Config as LogConfig};
|
||||
@@ -32,6 +31,7 @@ use ethcore::error::ImportError;
|
||||
use ethcore::miner::Miner;
|
||||
use cache::CacheConfig;
|
||||
use informant::Informant;
|
||||
use io_handler::ImportIoHandler;
|
||||
use params::{SpecType, Pruning};
|
||||
use helpers::{to_client_config, execute_upgrades};
|
||||
use dir::Directories;
|
||||
@@ -125,8 +125,9 @@ fn execute_import(cmd: ImportBlockchain) -> Result<String, String> {
|
||||
// select pruning algorithm
|
||||
let algorithm = cmd.pruning.to_algorithm(&cmd.dirs, genesis_hash, spec.fork_name.as_ref());
|
||||
|
||||
// prepare client_path
|
||||
// prepare client and snapshot paths.
|
||||
let client_path = cmd.dirs.client_path(genesis_hash, spec.fork_name.as_ref(), algorithm);
|
||||
let snapshot_path = cmd.dirs.snapshot_path(genesis_hash, spec.fork_name.as_ref());
|
||||
|
||||
// execute upgrades
|
||||
try!(execute_upgrades(&cmd.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile()));
|
||||
@@ -138,8 +139,9 @@ fn execute_import(cmd: ImportBlockchain) -> Result<String, String> {
|
||||
let service = try!(ClientService::start(
|
||||
client_config,
|
||||
&spec,
|
||||
Path::new(&client_path),
|
||||
Path::new(&cmd.dirs.ipc_path()),
|
||||
&client_path,
|
||||
&snapshot_path,
|
||||
&cmd.dirs.ipc_path(),
|
||||
Arc::new(Miner::with_spec(&spec)),
|
||||
).map_err(|e| format!("Client service error: {:?}", e)));
|
||||
|
||||
@@ -169,6 +171,10 @@ fn execute_import(cmd: ImportBlockchain) -> Result<String, String> {
|
||||
|
||||
let informant = Informant::new(client.clone(), None, None, cmd.logger_config.color);
|
||||
|
||||
try!(service.register_io_handler(Arc::new(ImportIoHandler {
|
||||
info: Arc::new(informant),
|
||||
})).map_err(|_| "Unable to register informant handler".to_owned()));
|
||||
|
||||
let do_import = |bytes| {
|
||||
while client.queue_info().is_full() { sleep(Duration::from_secs(1)); }
|
||||
match client.import_block(bytes) {
|
||||
@@ -180,7 +186,6 @@ fn execute_import(cmd: ImportBlockchain) -> Result<String, String> {
|
||||
},
|
||||
Ok(_) => {},
|
||||
}
|
||||
informant.tick();
|
||||
Ok(())
|
||||
};
|
||||
|
||||
@@ -237,8 +242,9 @@ fn execute_export(cmd: ExportBlockchain) -> Result<String, String> {
|
||||
// select pruning algorithm
|
||||
let algorithm = cmd.pruning.to_algorithm(&cmd.dirs, genesis_hash, spec.fork_name.as_ref());
|
||||
|
||||
// prepare client_path
|
||||
// prepare client and snapshot paths.
|
||||
let client_path = cmd.dirs.client_path(genesis_hash, spec.fork_name.as_ref(), algorithm);
|
||||
let snapshot_path = cmd.dirs.snapshot_path(genesis_hash, spec.fork_name.as_ref());
|
||||
|
||||
// execute upgrades
|
||||
try!(execute_upgrades(&cmd.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile()));
|
||||
@@ -249,8 +255,9 @@ fn execute_export(cmd: ExportBlockchain) -> Result<String, String> {
|
||||
let service = try!(ClientService::start(
|
||||
client_config,
|
||||
&spec,
|
||||
Path::new(&client_path),
|
||||
Path::new(&cmd.dirs.ipc_path()),
|
||||
&client_path,
|
||||
&snapshot_path,
|
||||
&cmd.dirs.ipc_path(),
|
||||
Arc::new(Miner::with_spec(&spec)),
|
||||
).map_err(|e| format!("Client service error: {:?}", e)));
|
||||
|
||||
@@ -263,10 +270,10 @@ fn execute_export(cmd: ExportBlockchain) -> Result<String, String> {
|
||||
};
|
||||
|
||||
let from = try!(client.block_number(cmd.from_block).ok_or("From block could not be found"));
|
||||
let to = try!(client.block_number(cmd.to_block).ok_or("From block could not be found"));
|
||||
let to = try!(client.block_number(cmd.to_block).ok_or("To block could not be found"));
|
||||
|
||||
for i in from..(to + 1) {
|
||||
let b = client.block(BlockID::Number(i)).unwrap();
|
||||
let b = try!(client.block(BlockID::Number(i)).ok_or("Error exporting incomplete chain"));
|
||||
match format {
|
||||
DataFormat::Binary => { out.write(&b).expect("Couldn't write to stream."); }
|
||||
DataFormat::Hex => { out.write_fmt(format_args!("{}", b.pretty())).expect("Couldn't write to stream."); }
|
||||
|
||||
@@ -63,7 +63,7 @@ pub fn payload<B: ipc::BinaryConvertable>() -> Result<B, BootError> {
|
||||
}
|
||||
|
||||
pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> GuardedSocket<HypervisorServiceClient<NanoSocket>>{
|
||||
let hypervisor_client = nanoipc::init_client::<HypervisorServiceClient<_>>(hv_url).unwrap();
|
||||
let hypervisor_client = nanoipc::fast_client::<HypervisorServiceClient<_>>(hv_url).unwrap();
|
||||
hypervisor_client.handshake().unwrap();
|
||||
hypervisor_client.module_ready(module_id, control_url.to_owned());
|
||||
|
||||
@@ -73,7 +73,7 @@ pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> Guar
|
||||
pub fn dependency<C: WithSocket<NanoSocket>>(url: &str)
|
||||
-> Result<GuardedSocket<C>, BootError>
|
||||
{
|
||||
nanoipc::init_client::<C>(url).map_err(|socket_err| BootError::DependencyConnect(socket_err))
|
||||
nanoipc::generic_client::<C>(url).map_err(|socket_err| BootError::DependencyConnect(socket_err))
|
||||
}
|
||||
|
||||
pub fn main_thread() -> Arc<AtomicBool> {
|
||||
|
||||
@@ -13,7 +13,7 @@ Usage:
|
||||
parity export [ <file> ] [options]
|
||||
parity signer new-token [options]
|
||||
parity snapshot <file> [options]
|
||||
parity restore <file> [options]
|
||||
parity restore [ <file> ] [options]
|
||||
|
||||
Operating Options:
|
||||
--mode MODE Set the operating mode. MODE can be one of:
|
||||
|
||||
@@ -52,10 +52,16 @@ impl Directories {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the root path for database
|
||||
pub fn db_version_path(&self, genesis_hash: H256, fork_name: Option<&String>, pruning: Algorithm) -> PathBuf {
|
||||
/// Get the chain's root path.
|
||||
pub fn chain_path(&self, genesis_hash: H256, fork_name: Option<&String>) -> PathBuf {
|
||||
let mut dir = Path::new(&self.db).to_path_buf();
|
||||
dir.push(format!("{:?}{}", H64::from(genesis_hash), fork_name.map(|f| format!("-{}", f)).unwrap_or_default()));
|
||||
dir
|
||||
}
|
||||
|
||||
/// Get the root path for database
|
||||
pub fn db_version_path(&self, genesis_hash: H256, fork_name: Option<&String>, pruning: Algorithm) -> PathBuf {
|
||||
let mut dir = self.chain_path(genesis_hash, fork_name);
|
||||
dir.push(format!("v{}-sec-{}", LEGACY_CLIENT_DB_VER_STR, pruning.as_internal_name_str()));
|
||||
dir
|
||||
}
|
||||
@@ -67,6 +73,13 @@ impl Directories {
|
||||
dir
|
||||
}
|
||||
|
||||
/// Get the path for the snapshot directory given the genesis hash and fork name.
|
||||
pub fn snapshot_path(&self, genesis_hash: H256, fork_name: Option<&String>) -> PathBuf {
|
||||
let mut dir = self.chain_path(genesis_hash, fork_name);
|
||||
dir.push("snapshot");
|
||||
dir
|
||||
}
|
||||
|
||||
/// Get the ipc sockets path
|
||||
pub fn ipc_path(&self) -> PathBuf {
|
||||
let mut dir = Path::new(&self.db).to_path_buf();
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use ethcore::client::Client;
|
||||
use ethcore::service::ClientIoMessage;
|
||||
use ethsync::{SyncProvider, ManageNetwork};
|
||||
@@ -31,6 +32,7 @@ pub struct ClientIoHandler {
|
||||
pub net: Arc<ManageNetwork>,
|
||||
pub accounts: Arc<AccountProvider>,
|
||||
pub info: Arc<Informant>,
|
||||
pub shutdown: Arc<AtomicBool>
|
||||
}
|
||||
|
||||
impl IoHandler<ClientIoMessage> for ClientIoHandler {
|
||||
@@ -39,8 +41,24 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
|
||||
}
|
||||
|
||||
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
|
||||
if let INFO_TIMER = timer {
|
||||
if timer == INFO_TIMER && !self.shutdown.load(Ordering::SeqCst) {
|
||||
self.info.tick();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ImportIoHandler {
|
||||
pub info: Arc<Informant>,
|
||||
}
|
||||
|
||||
impl IoHandler<ClientIoMessage> for ImportIoHandler {
|
||||
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
|
||||
io.register_timer(INFO_TIMER, 5000).expect("Error registering timer");
|
||||
}
|
||||
|
||||
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
|
||||
if let INFO_TIMER = timer {
|
||||
self.info.tick()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -71,7 +71,7 @@ mod ipc_deps {
|
||||
pub use ethsync::{SyncClient, NetworkManagerClient, ServiceConfiguration};
|
||||
pub use ethcore::client::ChainNotifyClient;
|
||||
pub use hypervisor::{SYNC_MODULE_ID, BootArgs, HYPERVISOR_IPC_URL};
|
||||
pub use nanoipc::{GuardedSocket, NanoSocket, init_client};
|
||||
pub use nanoipc::{GuardedSocket, NanoSocket, generic_client, fast_client};
|
||||
pub use ipc::IpcSocket;
|
||||
pub use ipc::binary::serialize;
|
||||
}
|
||||
@@ -134,11 +134,11 @@ pub fn sync
|
||||
hypervisor.start();
|
||||
hypervisor.wait_for_startup();
|
||||
|
||||
let sync_client = init_client::<SyncClient<_>>(
|
||||
let sync_client = generic_client::<SyncClient<_>>(
|
||||
&service_urls::with_base(&hypervisor.io_path, service_urls::SYNC)).unwrap();
|
||||
let notify_client = init_client::<ChainNotifyClient<_>>(
|
||||
let notify_client = generic_client::<ChainNotifyClient<_>>(
|
||||
&service_urls::with_base(&hypervisor.io_path, service_urls::SYNC_NOTIFY)).unwrap();
|
||||
let manage_client = init_client::<NetworkManagerClient<_>>(
|
||||
let manage_client = generic_client::<NetworkManagerClient<_>>(
|
||||
&service_urls::with_base(&hypervisor.io_path, service_urls::NETWORK_MANAGER)).unwrap();
|
||||
|
||||
*hypervisor_ref = Some(hypervisor);
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
use std::str::FromStr;
|
||||
use std::fs;
|
||||
use std::time::Duration;
|
||||
use util::{contents, H256, Address, U256, version_data};
|
||||
use util::{H256, Address, U256, version_data};
|
||||
use util::journaldb::Algorithm;
|
||||
use ethcore::spec::Spec;
|
||||
use ethcore::ethereum;
|
||||
@@ -61,7 +61,10 @@ impl SpecType {
|
||||
SpecType::Testnet => Ok(ethereum::new_morden()),
|
||||
SpecType::Olympic => Ok(ethereum::new_olympic()),
|
||||
SpecType::Classic => Ok(ethereum::new_classic()),
|
||||
SpecType::Custom(ref file) => Ok(Spec::load(&try!(contents(file).map_err(|_| "Could not load specification file."))))
|
||||
SpecType::Custom(ref filename) => {
|
||||
let file = try!(fs::File::open(filename).map_err(|_| "Could not load specification file."));
|
||||
Spec::load(file)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::sync::{Arc, Mutex, Condvar};
|
||||
use std::path::Path;
|
||||
use std::io::ErrorKind;
|
||||
use ctrlc::CtrlC;
|
||||
use fdlimit::raise_fd_limit;
|
||||
@@ -29,7 +28,7 @@ use ethcore::service::ClientService;
|
||||
use ethcore::account_provider::AccountProvider;
|
||||
use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions};
|
||||
use ethcore::snapshot;
|
||||
use ethsync::SyncConfig;
|
||||
use ethsync::{SyncConfig, SyncProvider};
|
||||
use informant::Informant;
|
||||
|
||||
use rpc::{HttpServer, IpcServer, HttpConfiguration, IpcConfiguration};
|
||||
@@ -51,7 +50,7 @@ use url;
|
||||
const SNAPSHOT_PERIOD: u64 = 10000;
|
||||
|
||||
// how many blocks to wait before starting a periodic snapshot.
|
||||
const SNAPSHOT_HISTORY: u64 = 1000;
|
||||
const SNAPSHOT_HISTORY: u64 = 500;
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct RunCmd {
|
||||
@@ -110,8 +109,9 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
|
||||
// select pruning algorithm
|
||||
let algorithm = cmd.pruning.to_algorithm(&cmd.dirs, genesis_hash, fork_name.as_ref());
|
||||
|
||||
// prepare client_path
|
||||
// prepare client and snapshot paths.
|
||||
let client_path = cmd.dirs.client_path(genesis_hash, fork_name.as_ref(), algorithm);
|
||||
let snapshot_path = cmd.dirs.snapshot_path(genesis_hash, fork_name.as_ref());
|
||||
|
||||
// execute upgrades
|
||||
try!(execute_upgrades(&cmd.dirs, genesis_hash, fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile()));
|
||||
@@ -171,14 +171,15 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
|
||||
}
|
||||
|
||||
// create supervisor
|
||||
let mut hypervisor = modules::hypervisor(Path::new(&cmd.dirs.ipc_path()));
|
||||
let mut hypervisor = modules::hypervisor(&cmd.dirs.ipc_path());
|
||||
|
||||
// create client service.
|
||||
let service = try!(ClientService::start(
|
||||
client_config,
|
||||
&spec,
|
||||
Path::new(&client_path),
|
||||
Path::new(&cmd.dirs.ipc_path()),
|
||||
&client_path,
|
||||
&snapshot_path,
|
||||
&cmd.dirs.ipc_path(),
|
||||
miner.clone(),
|
||||
).map_err(|e| format!("Client service error: {:?}", e)));
|
||||
|
||||
@@ -256,15 +257,18 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
|
||||
sync: sync_provider.clone(),
|
||||
net: manage_network.clone(),
|
||||
accounts: account_provider.clone(),
|
||||
shutdown: Default::default(),
|
||||
});
|
||||
service.register_io_handler(io_handler).expect("Error registering IO handler");
|
||||
service.register_io_handler(io_handler.clone()).expect("Error registering IO handler");
|
||||
|
||||
// the watcher must be kept alive.
|
||||
let _watcher = match cmd.no_periodic_snapshot {
|
||||
true => None,
|
||||
false => {
|
||||
let sync = sync_provider.clone();
|
||||
let watcher = Arc::new(snapshot::Watcher::new(
|
||||
service.client(),
|
||||
move || sync.status().is_major_syncing(),
|
||||
service.io().channel(),
|
||||
SNAPSHOT_PERIOD,
|
||||
SNAPSHOT_HISTORY,
|
||||
@@ -286,6 +290,11 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
|
||||
// Handle exit
|
||||
wait_for_exit(panic_handler, http_server, ipc_server, dapps_server, signer_server);
|
||||
|
||||
// to make sure timer does not spawn requests while shutdown is in progress
|
||||
io_handler.shutdown.store(true, ::std::sync::atomic::Ordering::SeqCst);
|
||||
// just Arc is dropping here, to allow other reference release in its default time
|
||||
drop(io_handler);
|
||||
|
||||
// hypervisor should be shutdown first while everything still works and can be
|
||||
// terminated gracefully
|
||||
drop(hypervisor);
|
||||
|
||||
@@ -21,8 +21,9 @@ use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use ethcore_logger::{setup_log, Config as LogConfig};
|
||||
use ethcore::snapshot::{Progress, RestorationStatus, SnapshotService};
|
||||
use ethcore::snapshot::{Progress, RestorationStatus, SnapshotService as SS};
|
||||
use ethcore::snapshot::io::{SnapshotReader, PackedReader, PackedWriter};
|
||||
use ethcore::snapshot::service::Service as SnapshotService;
|
||||
use ethcore::service::ClientService;
|
||||
use ethcore::client::{Mode, DatabaseCompactionProfile, Switch, VMType};
|
||||
use ethcore::miner::Miner;
|
||||
@@ -62,6 +63,60 @@ pub struct SnapshotCommand {
|
||||
pub block_at: BlockID,
|
||||
}
|
||||
|
||||
// helper for reading chunks from arbitrary reader and feeding them into the
|
||||
// service.
|
||||
fn restore_using<R: SnapshotReader>(snapshot: Arc<SnapshotService>, reader: &R, recover: bool) -> Result<(), String> {
|
||||
let manifest = reader.manifest();
|
||||
|
||||
info!("Restoring to block #{} (0x{:?})", manifest.block_number, manifest.block_hash);
|
||||
|
||||
try!(snapshot.init_restore(manifest.clone(), recover).map_err(|e| {
|
||||
format!("Failed to begin restoration: {}", e)
|
||||
}));
|
||||
|
||||
let (num_state, num_blocks) = (manifest.state_hashes.len(), manifest.block_hashes.len());
|
||||
|
||||
let informant_handle = snapshot.clone();
|
||||
::std::thread::spawn(move || {
|
||||
while let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done } = informant_handle.status() {
|
||||
info!("Processed {}/{} state chunks and {}/{} block chunks.",
|
||||
state_chunks_done, num_state, block_chunks_done, num_blocks);
|
||||
::std::thread::sleep(Duration::from_secs(5));
|
||||
}
|
||||
});
|
||||
|
||||
info!("Restoring state");
|
||||
for &state_hash in &manifest.state_hashes {
|
||||
if snapshot.status() == RestorationStatus::Failed {
|
||||
return Err("Restoration failed".into());
|
||||
}
|
||||
|
||||
let chunk = try!(reader.chunk(state_hash)
|
||||
.map_err(|e| format!("Encountered error while reading chunk {:?}: {}", state_hash, e)));
|
||||
snapshot.feed_state_chunk(state_hash, &chunk);
|
||||
}
|
||||
|
||||
info!("Restoring blocks");
|
||||
for &block_hash in &manifest.block_hashes {
|
||||
if snapshot.status() == RestorationStatus::Failed {
|
||||
return Err("Restoration failed".into());
|
||||
}
|
||||
|
||||
let chunk = try!(reader.chunk(block_hash)
|
||||
.map_err(|e| format!("Encountered error while reading chunk {:?}: {}", block_hash, e)));
|
||||
snapshot.feed_block_chunk(block_hash, &chunk);
|
||||
}
|
||||
|
||||
match snapshot.status() {
|
||||
RestorationStatus::Ongoing { .. } => Err("Snapshot file is incomplete and missing chunks.".into()),
|
||||
RestorationStatus::Failed => Err("Snapshot restoration failed.".into()),
|
||||
RestorationStatus::Inactive => {
|
||||
info!("Restoration complete.");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SnapshotCommand {
|
||||
// shared portion of snapshot commands: start the client service
|
||||
fn start_service(self) -> Result<(ClientService, Arc<PanicHandler>), String> {
|
||||
@@ -82,8 +137,9 @@ impl SnapshotCommand {
|
||||
// select pruning algorithm
|
||||
let algorithm = self.pruning.to_algorithm(&self.dirs, genesis_hash, spec.fork_name.as_ref());
|
||||
|
||||
// prepare client_path
|
||||
// prepare client and snapshot paths.
|
||||
let client_path = self.dirs.client_path(genesis_hash, spec.fork_name.as_ref(), algorithm);
|
||||
let snapshot_path = self.dirs.snapshot_path(genesis_hash, spec.fork_name.as_ref());
|
||||
|
||||
// execute upgrades
|
||||
try!(execute_upgrades(&self.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, self.compaction.compaction_profile()));
|
||||
@@ -94,8 +150,9 @@ impl SnapshotCommand {
|
||||
let service = try!(ClientService::start(
|
||||
client_config,
|
||||
&spec,
|
||||
Path::new(&client_path),
|
||||
Path::new(&self.dirs.ipc_path()),
|
||||
&client_path,
|
||||
&snapshot_path,
|
||||
&self.dirs.ipc_path(),
|
||||
Arc::new(Miner::with_spec(&spec))
|
||||
).map_err(|e| format!("Client service error: {:?}", e)));
|
||||
|
||||
@@ -104,69 +161,35 @@ impl SnapshotCommand {
|
||||
|
||||
/// restore from a snapshot
|
||||
pub fn restore(self) -> Result<(), String> {
|
||||
let file = try!(self.file_path.clone().ok_or("No file path provided.".to_owned()));
|
||||
let file = self.file_path.clone();
|
||||
let (service, _panic_handler) = try!(self.start_service());
|
||||
|
||||
warn!("Snapshot restoration is experimental and the format may be subject to change.");
|
||||
warn!("On encountering an unexpected error, please ensure that you have a recent snapshot.");
|
||||
|
||||
let snapshot = service.snapshot_service();
|
||||
let reader = PackedReader::new(Path::new(&file))
|
||||
.map_err(|e| format!("Couldn't open snapshot file: {}", e))
|
||||
.and_then(|x| x.ok_or("Snapshot file has invalid format.".into()));
|
||||
|
||||
let reader = try!(reader);
|
||||
let manifest = reader.manifest();
|
||||
if let Some(file) = file {
|
||||
info!("Attempting to restore from snapshot at '{}'", file);
|
||||
|
||||
// drop the client so we don't restore while it has open DB handles.
|
||||
drop(service);
|
||||
let reader = PackedReader::new(Path::new(&file))
|
||||
.map_err(|e| format!("Couldn't open snapshot file: {}", e))
|
||||
.and_then(|x| x.ok_or("Snapshot file has invalid format.".into()));
|
||||
|
||||
try!(snapshot.init_restore(manifest.clone()).map_err(|e| {
|
||||
format!("Failed to begin restoration: {}", e)
|
||||
}));
|
||||
let reader = try!(reader);
|
||||
try!(restore_using(snapshot, &reader, true));
|
||||
} else {
|
||||
info!("Attempting to restore from local snapshot.");
|
||||
|
||||
let (num_state, num_blocks) = (manifest.state_hashes.len(), manifest.block_hashes.len());
|
||||
|
||||
let informant_handle = snapshot.clone();
|
||||
::std::thread::spawn(move || {
|
||||
while let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done } = informant_handle.status() {
|
||||
info!("Processed {}/{} state chunks and {}/{} block chunks.",
|
||||
state_chunks_done, num_state, block_chunks_done, num_blocks);
|
||||
|
||||
::std::thread::sleep(Duration::from_secs(5));
|
||||
}
|
||||
});
|
||||
|
||||
info!("Restoring state");
|
||||
for &state_hash in &manifest.state_hashes {
|
||||
if snapshot.status() == RestorationStatus::Failed {
|
||||
return Err("Restoration failed".into());
|
||||
}
|
||||
|
||||
let chunk = try!(reader.chunk(state_hash)
|
||||
.map_err(|e| format!("Encountered error while reading chunk {:?}: {}", state_hash, e)));
|
||||
snapshot.feed_state_chunk(state_hash, &chunk);
|
||||
}
|
||||
|
||||
info!("Restoring blocks");
|
||||
for &block_hash in &manifest.block_hashes {
|
||||
if snapshot.status() == RestorationStatus::Failed {
|
||||
return Err("Restoration failed".into());
|
||||
}
|
||||
|
||||
let chunk = try!(reader.chunk(block_hash)
|
||||
.map_err(|e| format!("Encountered error while reading chunk {:?}: {}", block_hash, e)));
|
||||
snapshot.feed_block_chunk(block_hash, &chunk);
|
||||
}
|
||||
|
||||
match snapshot.status() {
|
||||
RestorationStatus::Ongoing { .. } => Err("Snapshot file is incomplete and missing chunks.".into()),
|
||||
RestorationStatus::Failed => Err("Snapshot restoration failed.".into()),
|
||||
RestorationStatus::Inactive => {
|
||||
info!("Restoration complete.");
|
||||
Ok(())
|
||||
// attempting restoration with recovery will lead to deadlock
|
||||
// as we currently hold a read lock on the service's reader.
|
||||
match *snapshot.reader() {
|
||||
Some(ref reader) => try!(restore_using(snapshot.clone(), reader, false)),
|
||||
None => return Err("No local snapshot found.".into()),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Take a snapshot from the head of the chain.
|
||||
|
||||
Reference in New Issue
Block a user