openethereum/bin/oe/run.rs
rakita 7ea5707904
Freeze pruning while creating snapshot (#205)
* Freeze pruning while creating snapshot
* Use scopeguard for snapshot generation
* Snapshot 1k blocks
* Snapshot number correction
2021-03-09 09:47:49 +01:00

738 lines
24 KiB
Rust

// Copyright 2015-2020 Parity Technologies (UK) Ltd.
// This file is part of OpenEthereum.
// OpenEthereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// OpenEthereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with OpenEthereum. If not, see <http://www.gnu.org/licenses/>.
use std::{
any::Any,
sync::{atomic, Arc, Weak},
thread,
time::{Duration, Instant},
};
use account_utils;
use ansi_term::Colour;
use cache::CacheConfig;
use db;
use dir::{DatabaseDirectories, Directories};
use ethcore::{
client::{BlockChainClient, BlockInfo, Client, DatabaseCompactionProfile, Mode, VMType},
miner::{self, stratum, Miner, MinerOptions, MinerService},
snapshot::{self, SnapshotConfiguration},
verification::queue::VerifierSettings,
};
use ethcore_logger::{Config as LogConfig, RotatingLogger};
use ethcore_service::ClientService;
use ethereum_types::{H256, U64};
use helpers::{execute_upgrades, passwords_from_files, to_client_config};
use informant::{FullNodeInformantData, Informant};
use journaldb::Algorithm;
use jsonrpc_core;
use metrics::{start_prometheus_metrics, MetricsConfiguration};
use miner::{external::ExternalMiner, work_notify::WorkPoster};
use modules;
use node_filter::NodeFilter;
use params::{
fatdb_switch_to_bool, mode_switch_to_bool, tracing_switch_to_bool, AccountsConfig,
GasPricerConfig, MinerExtras, Pruning, SpecType, Switch,
};
use parity_rpc::{
informant, is_major_importing, FutureOutput, FutureResponse, FutureResult, Metadata,
NetworkSettings, Origin, PubSubSession,
};
use parity_runtime::Runtime;
use parity_version::version;
use rpc;
use rpc_apis;
use secretstore;
use signer;
use sync::{self, SyncConfig};
use user_defaults::UserDefaults;
// How often we attempt to take a snapshot: only snapshot on blocknumbers that are multiples of this.
const SNAPSHOT_PERIOD: u64 = 20000;
// Start snapshoting from `tip`-`history, with this we want to bypass reorgs. Should be smaller than prunning history.
const SNAPSHOT_HISTORY: u64 = 50;
// Full client number of DNS threads
const FETCH_FULL_NUM_DNS_THREADS: usize = 4;
#[derive(Debug, PartialEq)]
pub struct RunCmd {
pub cache_config: CacheConfig,
pub dirs: Directories,
pub spec: SpecType,
pub pruning: Pruning,
pub pruning_history: u64,
pub pruning_memory: usize,
/// Some if execution should be daemonized. Contains pid_file path.
pub daemon: Option<String>,
pub logger_config: LogConfig,
pub miner_options: MinerOptions,
pub gas_price_percentile: usize,
pub poll_lifetime: u32,
pub ws_conf: rpc::WsConfiguration,
pub http_conf: rpc::HttpConfiguration,
pub ipc_conf: rpc::IpcConfiguration,
pub net_conf: sync::NetworkConfiguration,
pub network_id: Option<u64>,
pub warp_sync: bool,
pub warp_barrier: Option<u64>,
pub acc_conf: AccountsConfig,
pub gas_pricer_conf: GasPricerConfig,
pub miner_extras: MinerExtras,
pub mode: Option<Mode>,
pub tracing: Switch,
pub fat_db: Switch,
pub compaction: DatabaseCompactionProfile,
pub vm_type: VMType,
pub experimental_rpcs: bool,
pub net_settings: NetworkSettings,
pub secretstore_conf: secretstore::Configuration,
pub name: String,
pub custom_bootnodes: bool,
pub stratum: Option<stratum::Options>,
pub snapshot_conf: SnapshotConfiguration,
pub check_seal: bool,
pub allow_missing_blocks: bool,
pub download_old_blocks: bool,
pub verifier_settings: VerifierSettings,
pub no_persistent_txqueue: bool,
pub max_round_blocks_to_import: usize,
pub metrics_conf: MetricsConfiguration,
}
// node info fetcher for the local store.
struct FullNodeInfo {
miner: Option<Arc<Miner>>, // TODO: only TXQ needed, just use that after decoupling.
}
impl ::local_store::NodeInfo for FullNodeInfo {
fn pending_transactions(&self) -> Vec<::types::transaction::PendingTransaction> {
let miner = match self.miner.as_ref() {
Some(m) => m,
None => return Vec::new(),
};
miner
.local_transactions()
.values()
.filter_map(|status| match *status {
::miner::pool::local_transactions::Status::Pending(ref tx) => {
Some(tx.pending().clone())
}
_ => None,
})
.collect()
}
}
/// Executes the given run command.
///
/// On error, returns what to print on stderr.
pub fn execute(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<RunningClient, String> {
// load spec
let spec = cmd.spec.spec(&cmd.dirs.cache)?;
// load genesis hash
let genesis_hash = spec.genesis_header().hash();
// database paths
let db_dirs = cmd.dirs.database(
genesis_hash,
cmd.spec.legacy_fork_name(),
spec.data_dir.clone(),
);
// user defaults path
let user_defaults_path = db_dirs.user_defaults_path();
// load user defaults
let mut user_defaults = UserDefaults::load(&user_defaults_path)?;
// select pruning algorithm
let algorithm = cmd.pruning.to_algorithm(&user_defaults);
// check if tracing is on
let tracing = tracing_switch_to_bool(cmd.tracing, &user_defaults)?;
// check if fatdb is on
let fat_db = fatdb_switch_to_bool(cmd.fat_db, &user_defaults, algorithm)?;
// get the mode
let mode = mode_switch_to_bool(cmd.mode, &user_defaults)?;
trace!(target: "mode", "mode is {:?}", mode);
let network_enabled = match mode {
Mode::Dark(_) | Mode::Off => false,
_ => true,
};
// prepare client and snapshot paths.
let client_path = db_dirs.client_path(algorithm);
let snapshot_path = db_dirs.snapshot_path();
// execute upgrades
execute_upgrades(&cmd.dirs.base, &db_dirs, algorithm, &cmd.compaction)?;
// create dirs used by parity
cmd.dirs.create_dirs(
cmd.acc_conf.unlocked_accounts.len() == 0,
cmd.secretstore_conf.enabled,
)?;
//print out running parity environment
print_running_environment(&spec.data_dir, &cmd.dirs, &db_dirs);
// display info about used pruning algorithm
info!(
"State DB configuration: {}{}{}",
Colour::White.bold().paint(algorithm.as_str()),
match fat_db {
true => Colour::White.bold().paint(" +Fat").to_string(),
false => "".to_owned(),
},
match tracing {
true => Colour::White.bold().paint(" +Trace").to_string(),
false => "".to_owned(),
}
);
info!(
"Operating mode: {}",
Colour::White.bold().paint(format!("{}", mode))
);
// display warning about using experimental journaldb algorithm
if !algorithm.is_stable() {
warn!(
"Your chosen strategy is {}! You can re-run with --pruning to change.",
Colour::Red.bold().paint("unstable")
);
}
// create sync config
let mut sync_config = SyncConfig::default();
sync_config.network_id = match cmd.network_id {
Some(id) => id,
None => spec.network_id(),
};
if spec.subprotocol_name().len() > 8 {
warn!("Your chain specification's subprotocol length is more then 8. Ignoring.");
} else {
sync_config.subprotocol_name = U64::from(spec.subprotocol_name().as_bytes())
}
sync_config.fork_block = spec.fork_block();
let mut warp_sync = spec.engine.supports_warp() && cmd.warp_sync;
if warp_sync {
// Logging is not initialized yet, so we print directly to stderr
if fat_db {
warn!("Warning: Warp Sync is disabled because Fat DB is turned on.");
warp_sync = false;
} else if tracing {
warn!("Warning: Warp Sync is disabled because tracing is turned on.");
warp_sync = false;
} else if algorithm != Algorithm::OverlayRecent {
warn!("Warning: Warp Sync is disabled because of non-default pruning mode.");
warp_sync = false;
}
}
sync_config.warp_sync = match (warp_sync, cmd.warp_barrier) {
(true, Some(block)) => sync::WarpSync::OnlyAndAfter(block),
(true, _) => sync::WarpSync::Enabled,
_ => sync::WarpSync::Disabled,
};
sync_config.download_old_blocks = cmd.download_old_blocks;
let passwords = passwords_from_files(&cmd.acc_conf.password_files)?;
// prepare account provider
let account_provider = Arc::new(account_utils::prepare_account_provider(
&cmd.spec,
&cmd.dirs,
&spec.data_dir,
cmd.acc_conf,
&passwords,
)?);
// spin up event loop
let runtime = Runtime::with_default_thread_count();
// fetch service
let fetch = fetch::Client::new(FETCH_FULL_NUM_DNS_THREADS)
.map_err(|e| format!("Error starting fetch client: {:?}", e))?;
let txpool_size = cmd.miner_options.pool_limits.max_count;
// create miner
let miner = Arc::new(Miner::new(
cmd.miner_options,
cmd.gas_pricer_conf
.to_gas_pricer(fetch.clone(), runtime.executor()),
&spec,
(
cmd.miner_extras.local_accounts,
account_utils::miner_local_accounts(account_provider.clone()),
),
));
miner.set_author(miner::Author::External(cmd.miner_extras.author));
miner.set_gas_range_target(cmd.miner_extras.gas_range_target);
miner.set_extra_data(cmd.miner_extras.extra_data);
if !cmd.miner_extras.work_notify.is_empty() {
miner.add_work_listener(Box::new(WorkPoster::new(
&cmd.miner_extras.work_notify,
fetch.clone(),
runtime.executor(),
)));
}
let engine_signer = cmd.miner_extras.engine_signer;
if engine_signer != Default::default() {
if let Some(author) = account_utils::miner_author(
&cmd.spec,
&cmd.dirs,
&account_provider,
engine_signer,
&passwords,
)? {
miner.set_author(author);
}
}
// create client config
let mut client_config = to_client_config(
&cmd.cache_config,
spec.name.to_lowercase(),
mode.clone(),
tracing,
fat_db,
cmd.compaction,
cmd.vm_type,
cmd.name,
algorithm,
cmd.pruning_history,
cmd.pruning_memory,
cmd.check_seal,
cmd.max_round_blocks_to_import,
);
client_config.queue.verifier_settings = cmd.verifier_settings;
client_config.queue.verifier_settings.bad_hashes = verification_bad_blocks(&cmd.spec);
client_config.transaction_verification_queue_size = ::std::cmp::max(2048, txpool_size / 4);
client_config.snapshot = cmd.snapshot_conf.clone();
// set up bootnodes
let mut net_conf = cmd.net_conf;
if !cmd.custom_bootnodes {
net_conf.boot_nodes = spec.nodes.clone();
}
// set network path.
net_conf.net_config_path = Some(db_dirs.network_path().to_string_lossy().into_owned());
let restoration_db_handler = db::restoration_db_handler(&client_path, &client_config);
let client_db = restoration_db_handler
.open(&client_path)
.map_err(|e| format!("Failed to open database {:?}", e))?;
// create client service.
let service = ClientService::start(
client_config,
&spec,
client_db,
&snapshot_path,
restoration_db_handler,
&cmd.dirs.ipc_path(),
miner.clone(),
)
.map_err(|e| format!("Client service error: {:?}", e))?;
let connection_filter_address = spec.params().node_permission_contract;
// drop the spec to free up genesis state.
let forks = spec.hard_forks.clone();
drop(spec);
// take handle to client
let client = service.client();
// Update miners block gas limit
miner.update_transaction_queue_limits(*client.best_block_header().gas_limit());
let connection_filter = connection_filter_address.map(|a| {
Arc::new(NodeFilter::new(
Arc::downgrade(&client) as Weak<dyn BlockChainClient>,
a,
))
});
let snapshot_service = service.snapshot_service();
// initialize the local node information store.
let store = {
let db = service.db();
let node_info = FullNodeInfo {
miner: match cmd.no_persistent_txqueue {
true => None,
false => Some(miner.clone()),
},
};
let store = ::local_store::create(
db.key_value().clone(),
::ethcore_db::COL_NODE_INFO,
node_info,
);
if cmd.no_persistent_txqueue {
info!("Running without a persistent transaction queue.");
if let Err(e) = store.clear() {
warn!("Error clearing persistent transaction queue: {}", e);
}
}
// re-queue pending transactions.
match store.pending_transactions() {
Ok(pending) => {
for pending_tx in pending {
if let Err(e) = miner.import_own_transaction(&*client, pending_tx) {
warn!("Error importing saved transaction: {}", e)
}
}
}
Err(e) => warn!("Error loading cached pending transactions from disk: {}", e),
}
Arc::new(store)
};
// register it as an IO service to update periodically.
service
.register_io_handler(store)
.map_err(|_| "Unable to register local store handler".to_owned())?;
// create external miner
let external_miner = Arc::new(ExternalMiner::default());
// start stratum
if let Some(ref stratum_config) = cmd.stratum {
stratum::Stratum::register(stratum_config, miner.clone(), Arc::downgrade(&client))
.map_err(|e| format!("Stratum start error: {:?}", e))?;
}
// create sync object
let (sync_provider, manage_network, chain_notify, priority_tasks) = modules::sync(
sync_config,
net_conf.clone().into(),
client.clone(),
forks,
snapshot_service.clone(),
&cmd.logger_config,
connection_filter
.clone()
.map(|f| f as Arc<dyn crate::sync::ConnectionFilter + 'static>),
)
.map_err(|e| format!("Sync error: {}", e))?;
service.add_notify(chain_notify.clone());
// Propagate transactions as soon as they are imported.
let tx = ::parking_lot::Mutex::new(priority_tasks);
let is_ready = Arc::new(atomic::AtomicBool::new(true));
miner.add_transactions_listener(Box::new(move |_hashes| {
// we want to have only one PendingTransactions task in the queue.
if is_ready
.compare_exchange(
true,
false,
atomic::Ordering::SeqCst,
atomic::Ordering::SeqCst,
)
.is_ok()
{
let task =
::sync::PriorityTask::PropagateTransactions(Instant::now(), is_ready.clone());
// we ignore error cause it means that we are closing
let _ = tx.lock().send(task);
}
}));
// start network
if network_enabled {
chain_notify.start();
}
// set up dependencies for rpc servers
let rpc_stats = Arc::new(informant::RpcStats::default());
let secret_store = account_provider.clone();
let signer_service = Arc::new(signer::new_service(&cmd.ws_conf, &cmd.logger_config));
let deps_for_rpc_apis = Arc::new(rpc_apis::FullDependencies {
signer_service: signer_service,
snapshot: snapshot_service.clone(),
client: client.clone(),
sync: sync_provider.clone(),
net: manage_network.clone(),
accounts: secret_store,
miner: miner.clone(),
external_miner: external_miner.clone(),
logger: logger.clone(),
settings: Arc::new(cmd.net_settings.clone()),
net_service: manage_network.clone(),
experimental_rpcs: cmd.experimental_rpcs,
ws_address: cmd.ws_conf.address(),
fetch: fetch.clone(),
executor: runtime.executor(),
gas_price_percentile: cmd.gas_price_percentile,
poll_lifetime: cmd.poll_lifetime,
allow_missing_blocks: cmd.allow_missing_blocks,
no_ancient_blocks: !cmd.download_old_blocks,
});
let dependencies = rpc::Dependencies {
apis: deps_for_rpc_apis.clone(),
executor: runtime.executor(),
stats: rpc_stats.clone(),
};
// start rpc servers
let rpc_direct = rpc::setup_apis(rpc_apis::ApiSet::All, &dependencies);
let ws_server = rpc::new_ws(cmd.ws_conf.clone(), &dependencies)?;
let ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?;
// start the prometheus metrics server
start_prometheus_metrics(&cmd.metrics_conf, &dependencies)?;
let http_server = rpc::new_http(
"HTTP JSON-RPC",
"jsonrpc",
cmd.http_conf.clone(),
&dependencies,
)?;
// secret store key server
let secretstore_deps = secretstore::Dependencies {
client: client.clone(),
sync: sync_provider.clone(),
miner: miner.clone(),
account_provider,
accounts_passwords: &passwords,
};
let secretstore_key_server = secretstore::start(
cmd.secretstore_conf.clone(),
secretstore_deps,
runtime.executor(),
)?;
// the informant
let informant = Arc::new(Informant::new(
FullNodeInformantData {
client: service.client(),
sync: Some(sync_provider.clone()),
net: Some(manage_network.clone()),
},
Some(snapshot_service.clone()),
Some(rpc_stats.clone()),
cmd.logger_config.color,
));
service.add_notify(informant.clone());
service
.register_io_handler(informant.clone())
.map_err(|_| "Unable to register informant handler".to_owned())?;
// save user defaults
user_defaults.is_first_launch = false;
user_defaults.pruning = algorithm;
user_defaults.tracing = tracing;
user_defaults.fat_db = fat_db;
user_defaults.set_mode(mode);
user_defaults.save(&user_defaults_path)?;
// tell client how to save the default mode if it gets changed.
client.on_user_defaults_change(move |mode: Option<Mode>| {
if let Some(mode) = mode {
user_defaults.set_mode(mode);
}
let _ = user_defaults.save(&user_defaults_path); // discard failures - there's nothing we can do
});
// the watcher must be kept alive.
let watcher = match cmd.snapshot_conf.enable {
false => None,
true => {
let sync = sync_provider.clone();
let client = client.clone();
let watcher = Arc::new(snapshot::Watcher::new(
service.client(),
move || is_major_importing(Some(sync.status().state), client.queue_info()),
service.io().channel(),
SNAPSHOT_PERIOD,
SNAPSHOT_HISTORY,
));
service.add_notify(watcher.clone());
Some(watcher)
}
};
Ok(RunningClient {
inner: RunningClientInner::Full {
rpc: rpc_direct,
informant,
client,
client_service: Arc::new(service),
keep_alive: Box::new((
watcher,
ws_server,
http_server,
ipc_server,
secretstore_key_server,
runtime,
)),
},
})
}
/// Set bad blocks in VerificationQeueu. By omiting header we can omit particular fork of chain.
fn verification_bad_blocks(spec: &SpecType) -> Vec<H256> {
match *spec {
SpecType::Ropsten => {
vec!["1eac3d16c642411f13c287e29144c6f58fda859407c8f24c38deb168e1040714".into()]
}
_ => vec![],
}
}
/// Parity client currently executing in background threads.
///
/// Should be destroyed by calling `shutdown()`, otherwise execution will continue in the
/// background.
pub struct RunningClient {
inner: RunningClientInner,
}
enum RunningClientInner {
Full {
rpc:
jsonrpc_core::MetaIoHandler<Metadata, informant::Middleware<informant::ClientNotifier>>,
informant: Arc<Informant<FullNodeInformantData>>,
client: Arc<Client>,
client_service: Arc<ClientService>,
keep_alive: Box<dyn Any>,
},
}
impl RunningClient {
/// Performs an asynchronous RPC query.
// FIXME: [tomaka] This API should be better, with for example a Future
pub fn rpc_query(
&self,
request: &str,
session: Option<Arc<PubSubSession>>,
) -> FutureResult<FutureResponse, FutureOutput> {
let metadata = Metadata {
origin: Origin::CApi,
session,
};
match self.inner {
RunningClientInner::Full { ref rpc, .. } => rpc.handle_request(request, metadata),
}
}
/// Shuts down the client.
pub fn shutdown(self) {
match self.inner {
RunningClientInner::Full {
rpc,
informant,
client,
client_service,
keep_alive,
} => {
info!("Finishing work, please wait...");
// Create a weak reference to the client so that we can wait on shutdown
// until it is dropped
let weak_client = Arc::downgrade(&client);
// Shutdown and drop the ClientService
client_service.shutdown();
trace!(target: "shutdown", "ClientService shut down");
drop(client_service);
trace!(target: "shutdown", "ClientService dropped");
// drop this stuff as soon as exit detected.
drop(rpc);
trace!(target: "shutdown", "RPC dropped");
drop(keep_alive);
trace!(target: "shutdown", "KeepAlive dropped");
// to make sure timer does not spawn requests while shutdown is in progress
informant.shutdown();
trace!(target: "shutdown", "Informant shut down");
// just Arc is dropping here, to allow other reference release in its default time
drop(informant);
trace!(target: "shutdown", "Informant dropped");
drop(client);
trace!(target: "shutdown", "Client dropped");
// This may help when debugging ref cycles. Requires nightly-only `#![feature(weak_counts)]`
// trace!(target: "shutdown", "Waiting for refs to Client to shutdown, strong_count={:?}, weak_count={:?}", weak_client.strong_count(), weak_client.weak_count());
trace!(target: "shutdown", "Waiting for refs to Client to shutdown");
wait_for_drop(weak_client);
}
}
}
}
fn print_running_environment(data_dir: &str, dirs: &Directories, db_dirs: &DatabaseDirectories) {
info!("Starting {}", Colour::White.bold().paint(version()));
info!(
"Keys path {}",
Colour::White
.bold()
.paint(dirs.keys_path(data_dir).to_string_lossy().into_owned())
);
info!(
"DB path {}",
Colour::White
.bold()
.paint(db_dirs.db_root_path().to_string_lossy().into_owned())
);
}
fn wait_for_drop<T>(w: Weak<T>) {
const SLEEP_DURATION: Duration = Duration::from_secs(1);
const WARN_TIMEOUT: Duration = Duration::from_secs(60);
const MAX_TIMEOUT: Duration = Duration::from_secs(300);
let instant = Instant::now();
let mut warned = false;
while instant.elapsed() < MAX_TIMEOUT {
if w.upgrade().is_none() {
return;
}
if !warned && instant.elapsed() > WARN_TIMEOUT {
warned = true;
warn!("Shutdown is taking longer than expected.");
}
thread::sleep(SLEEP_DURATION);
// When debugging shutdown issues on a nightly build it can help to enable this with the
// `#![feature(weak_counts)]` added to lib.rs (TODO: enable when
// https://github.com/rust-lang/rust/issues/57977 is stable)
// trace!(target: "shutdown", "Waiting for client to drop, strong_count={:?}, weak_count={:?}", w.strong_count(), w.weak_count());
trace!(target: "shutdown", "Waiting for client to drop");
}
warn!("Shutdown timeout reached, exiting uncleanly.");
}