Use proper database configuration in snapshots. (#2052)

* use proper database config in snapshot service

* add snapshot path to parity directories struct

* fix RPC tests
This commit is contained in:
Robert Habermeier 2016-09-07 15:27:28 +02:00 committed by Arkadiy Paronyan
parent 541b14a4ab
commit 57d5c35bb6
12 changed files with 218 additions and 95 deletions

View File

@ -161,13 +161,10 @@ impl Client {
path: &Path,
miner: Arc<Miner>,
message_channel: IoChannel<ClientIoMessage>,
db_config: &DatabaseConfig,
) -> Result<Arc<Client>, ClientError> {
let path = path.to_path_buf();
let gb = spec.genesis_block();
let mut db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
db_config.cache_size = config.db_cache_size;
db_config.compaction = config.db_compaction.compaction_profile();
db_config.wal = config.db_wal;
let db = Arc::new(try!(Database::open(&db_config, &path.to_str().unwrap()).map_err(ClientError::Database)));
let chain = Arc::new(BlockChain::new(config.blockchain.clone(), &gb, db.clone()));

View File

@ -58,12 +58,14 @@ pub fn json_chain_test(json_data: &[u8], era: ChainEra) -> Vec<String> {
let temp = RandomTempPath::new();
{
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client = Client::new(
ClientConfig::default(),
&spec,
temp.as_path(),
Arc::new(Miner::with_spec(&spec)),
IoChannel::disconnected()
IoChannel::disconnected(),
&db_config,
).unwrap();
for b in &blockchain.blocks_rlp() {
if Block::is_good(&b) {

View File

@ -23,7 +23,7 @@ use error::*;
use client::{Client, ClientConfig, ChainNotify};
use miner::Miner;
use snapshot::ManifestData;
use snapshot::service::Service as SnapshotService;
use snapshot::service::{Service as SnapshotService, ServiceParams as SnapServiceParams};
use std::sync::atomic::AtomicBool;
#[cfg(feature="ipc")]
@ -60,11 +60,12 @@ pub struct ClientService {
}
impl ClientService {
/// Start the service in a separate thread.
/// Start the `ClientService`.
pub fn start(
config: ClientConfig,
spec: &Spec,
db_path: &Path,
client_path: &Path,
snapshot_path: &Path,
ipc_path: &Path,
miner: Arc<Miner>,
) -> Result<ClientService, Error>
@ -78,11 +79,25 @@ impl ClientService {
warn!("Your chain is an alternative fork. {}", Colour::Red.bold().paint("TRANSACTIONS MAY BE REPLAYED ON THE MAINNET!"));
}
let pruning = config.pruning;
let client = try!(Client::new(config, &spec, db_path, miner, io_service.channel()));
let snapshot = try!(SnapshotService::new(spec, pruning, db_path.into(), io_service.channel(), client.clone()));
let mut db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
db_config.cache_size = config.db_cache_size;
db_config.compaction = config.db_compaction.compaction_profile();
db_config.wal = config.db_wal;
let snapshot = Arc::new(snapshot);
let pruning = config.pruning;
let client = try!(Client::new(config, &spec, client_path, miner, io_service.channel(), &db_config));
let snapshot_params = SnapServiceParams {
engine: spec.engine.clone(),
genesis_block: spec.genesis_block(),
db_config: db_config,
pruning: pruning,
channel: io_service.channel(),
snapshot_root: snapshot_path.into(),
client_db: client_path.into(),
db_restore: client.clone(),
};
let snapshot = Arc::new(try!(SnapshotService::new(snapshot_params)));
panic_handler.forward_from(&*client);
let client_io = Arc::new(ClientIoHandler {
@ -232,15 +247,25 @@ mod tests {
#[test]
fn it_can_be_started() {
let temp_path = RandomTempPath::new();
let mut path = temp_path.as_path().to_owned();
path.push("pruning");
path.push("db");
let path = temp_path.as_path().to_owned();
let client_path = {
let mut path = path.to_owned();
path.push("client");
path
};
let snapshot_path = {
let mut path = path.to_owned();
path.push("snapshot");
path
};
let spec = get_test_spec();
let service = ClientService::start(
ClientConfig::default(),
&spec,
&path,
&client_path,
&snapshot_path,
&path,
Arc::new(Miner::with_spec(&spec)),
);

View File

@ -19,7 +19,7 @@
use std::collections::HashSet;
use std::io::ErrorKind;
use std::fs;
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@ -32,7 +32,6 @@ use engines::Engine;
use error::Error;
use ids::BlockID;
use service::ClientIoMessage;
use spec::Spec;
use io::IoChannel;
@ -81,6 +80,7 @@ struct RestorationParams<'a> {
manifest: ManifestData, // manifest to base restoration on.
pruning: Algorithm, // pruning algorithm for the database.
db_path: PathBuf, // database path
db_config: &'a DatabaseConfig,
writer: LooseWriter, // writer for recovered snapshot.
genesis: &'a [u8], // genesis block of the chain.
guard: Guard, // guard for the restoration directory.
@ -94,8 +94,7 @@ impl Restoration {
let state_chunks = manifest.state_hashes.iter().cloned().collect();
let block_chunks = manifest.block_hashes.iter().cloned().collect();
let cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let raw_db = Arc::new(try!(Database::open(&cfg, &*params.db_path.to_string_lossy())
let raw_db = Arc::new(try!(Database::open(params.db_config, &*params.db_path.to_string_lossy())
.map_err(UtilError::SimpleString)));
let chain = BlockChain::new(Default::default(), params.genesis, raw_db.clone());
@ -173,15 +172,35 @@ impl Restoration {
/// Type alias for client io channel.
pub type Channel = IoChannel<ClientIoMessage>;
/// Service implementation.
///
/// This will replace the client's state DB as soon as the last state chunk
/// is fed, and will replace the client's blocks DB when the last block chunk
/// is fed.
/// Snapshot service parameters.
pub struct ServiceParams {
/// The consensus engine this is built on.
pub engine: Arc<Engine>,
/// The chain's genesis block.
pub genesis_block: Bytes,
/// Database configuration options.
pub db_config: DatabaseConfig,
/// State pruning algorithm.
pub pruning: Algorithm,
/// Async IO channel for sending messages.
pub channel: Channel,
/// The directory to put snapshots in.
/// Usually "<chain hash>/snapshot"
pub snapshot_root: PathBuf,
/// The client's database directory.
/// Usually "<chain hash>/<pruning>/db".
pub client_db: PathBuf,
/// A handle for database restoration.
pub db_restore: Arc<DatabaseRestore>,
}
/// `SnapshotService` implementation.
/// This controls taking snapshots and restoring from them.
pub struct Service {
restoration: Mutex<Option<Restoration>>,
client_db: PathBuf, // "<chain hash>/<pruning>/db"
db_path: PathBuf, // "<chain hash>/"
client_db: PathBuf,
snapshot_root: PathBuf,
db_config: DatabaseConfig,
io_channel: Channel,
pruning: Algorithm,
status: Mutex<RestorationStatus>,
@ -196,38 +215,28 @@ pub struct Service {
}
impl Service {
/// Create a new snapshot service.
pub fn new(spec: &Spec, pruning: Algorithm, client_db: PathBuf, io_channel: Channel, db_restore: Arc<DatabaseRestore>) -> Result<Self, Error> {
let db_path = try!(client_db.parent().and_then(Path::parent)
.ok_or_else(|| UtilError::SimpleString("Failed to find database root.".into()))).to_owned();
let reader = {
let mut snapshot_path = db_path.clone();
snapshot_path.push("snapshot");
snapshot_path.push("current");
LooseReader::new(snapshot_path).ok()
};
let service = Service {
/// Create a new snapshot service from the given parameters.
pub fn new(params: ServiceParams) -> Result<Self, Error> {
let mut service = Service {
restoration: Mutex::new(None),
client_db: client_db,
db_path: db_path,
io_channel: io_channel,
pruning: pruning,
client_db: params.client_db,
snapshot_root: params.snapshot_root,
db_config: params.db_config,
io_channel: params.channel,
pruning: params.pruning,
status: Mutex::new(RestorationStatus::Inactive),
reader: RwLock::new(reader),
engine: spec.engine.clone(),
genesis_block: spec.genesis_block(),
reader: RwLock::new(None),
engine: params.engine,
genesis_block: params.genesis_block,
state_chunks: AtomicUsize::new(0),
block_chunks: AtomicUsize::new(0),
db_restore: db_restore,
db_restore: params.db_restore,
progress: Default::default(),
taking_snapshot: AtomicBool::new(false),
};
// create the root snapshot dir if it doesn't exist.
if let Err(e) = fs::create_dir_all(service.root_dir()) {
if let Err(e) = fs::create_dir_all(&service.snapshot_root) {
if e.kind() != ErrorKind::AlreadyExists {
return Err(e.into())
}
@ -247,33 +256,29 @@ impl Service {
}
}
Ok(service)
}
let reader = LooseReader::new(service.snapshot_dir()).ok();
*service.reader.get_mut() = reader;
// get the root path.
fn root_dir(&self) -> PathBuf {
let mut dir = self.db_path.clone();
dir.push("snapshot");
dir
Ok(service)
}
// get the current snapshot dir.
fn snapshot_dir(&self) -> PathBuf {
let mut dir = self.root_dir();
let mut dir = self.snapshot_root.clone();
dir.push("current");
dir
}
// get the temporary snapshot dir.
fn temp_snapshot_dir(&self) -> PathBuf {
let mut dir = self.root_dir();
let mut dir = self.snapshot_root.clone();
dir.push("in_progress");
dir
}
// get the restoration directory.
fn restoration_dir(&self) -> PathBuf {
let mut dir = self.root_dir();
let mut dir = self.snapshot_root.clone();
dir.push("restoration");
dir
}
@ -377,6 +382,7 @@ impl Service {
manifest: manifest,
pruning: self.pruning,
db_path: self.restoration_db(),
db_config: &self.db_config,
writer: writer,
genesis: &self.genesis_block,
guard: Guard::new(rest_dir),
@ -564,19 +570,26 @@ mod tests {
#[test]
fn sends_async_messages() {
let service = IoService::<ClientIoMessage>::start().unwrap();
let spec = get_test_spec();
let dir = RandomTempPath::new();
let mut dir = dir.as_path().to_owned();
dir.push("pruning");
dir.push("db");
let mut client_db = dir.clone();
dir.push("snapshot");
client_db.push("client");
let service = Service::new(
&get_test_spec(),
Algorithm::Archive,
dir,
service.channel(),
Arc::new(NoopDBRestore),
).unwrap();
let snapshot_params = ServiceParams {
engine: spec.engine.clone(),
genesis_block: spec.genesis_block(),
db_config: Default::default(),
pruning: Algorithm::Archive,
channel: service.channel(),
snapshot_root: dir,
client_db: client_db,
db_restore: Arc::new(NoopDBRestore),
};
let service = Service::new(snapshot_params).unwrap();
assert!(service.manifest().is_none());
assert!(service.chunk(Default::default()).is_none());

View File

@ -28,7 +28,16 @@ use rlp::{Rlp, View};
fn imports_from_empty() {
let dir = RandomTempPath::new();
let spec = get_test_spec();
let client = Client::new(ClientConfig::default(), &spec, dir.as_path(), Arc::new(Miner::with_spec(&spec)), IoChannel::disconnected()).unwrap();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client = Client::new(
ClientConfig::default(),
&spec,
dir.as_path(),
Arc::new(Miner::with_spec(&spec)),
IoChannel::disconnected(),
&db_config
).unwrap();
client.import_verified_blocks();
client.flush_queue();
}
@ -37,7 +46,16 @@ fn imports_from_empty() {
fn should_return_registrar() {
let dir = RandomTempPath::new();
let spec = ethereum::new_morden();
let client = Client::new(ClientConfig::default(), &spec, dir.as_path(), Arc::new(Miner::with_spec(&spec)), IoChannel::disconnected()).unwrap();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client = Client::new(
ClientConfig::default(),
&spec,
dir.as_path(),
Arc::new(Miner::with_spec(&spec)),
IoChannel::disconnected(),
&db_config
).unwrap();
assert_eq!(client.additional_params().get("registrar"), Some(&"8e4e9b13d4b45cb0befc93c3061b1408f67316b2".to_owned()));
}
@ -55,7 +73,16 @@ fn returns_state_root_basic() {
fn imports_good_block() {
let dir = RandomTempPath::new();
let spec = get_test_spec();
let client = Client::new(ClientConfig::default(), &spec, dir.as_path(), Arc::new(Miner::with_spec(&spec)), IoChannel::disconnected()).unwrap();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client = Client::new(
ClientConfig::default(),
&spec,
dir.as_path(),
Arc::new(Miner::with_spec(&spec)),
IoChannel::disconnected(),
&db_config
).unwrap();
let good_block = get_good_dummy_block();
if let Err(_) = client.import_block(good_block) {
panic!("error importing block being good by definition");
@ -71,8 +98,16 @@ fn imports_good_block() {
fn query_none_block() {
let dir = RandomTempPath::new();
let spec = get_test_spec();
let client = Client::new(ClientConfig::default(), &spec, dir.as_path(), Arc::new(Miner::with_spec(&spec)), IoChannel::disconnected()).unwrap();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client = Client::new(
ClientConfig::default(),
&spec,
dir.as_path(),
Arc::new(Miner::with_spec(&spec)),
IoChannel::disconnected(),
&db_config
).unwrap();
let non_existant = client.block_header(BlockID::Number(188));
assert!(non_existant.is_none());
}

View File

@ -133,9 +133,17 @@ pub fn generate_dummy_client_with_data(block_number: u32, txs_per_block: usize,
pub fn generate_dummy_client_with_spec_and_data<F>(get_test_spec: F, block_number: u32, txs_per_block: usize, tx_gas_prices: &[U256]) -> GuardedTempResult<Arc<Client>> where F: Fn()->Spec {
let dir = RandomTempPath::new();
let test_spec = get_test_spec();
let client = Client::new(ClientConfig::default(), &test_spec, dir.as_path(), Arc::new(Miner::with_spec(&test_spec)), IoChannel::disconnected()).unwrap();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client = Client::new(
ClientConfig::default(),
&test_spec,
dir.as_path(),
Arc::new(Miner::with_spec(&test_spec)),
IoChannel::disconnected(),
&db_config
).unwrap();
let test_engine = &*test_spec.engine;
let mut db_result = get_temp_journal_db();
@ -233,7 +241,17 @@ pub fn push_blocks_to_client(client: &Arc<Client>, timestamp_salt: u64, starting
pub fn get_test_client_with_blocks(blocks: Vec<Bytes>) -> GuardedTempResult<Arc<Client>> {
let dir = RandomTempPath::new();
let test_spec = get_test_spec();
let client = Client::new(ClientConfig::default(), &test_spec, dir.as_path(), Arc::new(Miner::with_spec(&test_spec)), IoChannel::disconnected()).unwrap();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client = Client::new(
ClientConfig::default(),
&test_spec,
dir.as_path(),
Arc::new(Miner::with_spec(&test_spec)),
IoChannel::disconnected(),
&db_config
).unwrap();
for block in &blocks {
if let Err(_) = client.import_block(block.clone()) {
panic!("panic importing block which is well-formed");

View File

@ -25,18 +25,23 @@ use devtools::*;
use miner::Miner;
use crossbeam;
use io::IoChannel;
use util::kvdb::DatabaseConfig;
pub fn run_test_worker(scope: &crossbeam::Scope, stop: Arc<AtomicBool>, socket_path: &str) {
let socket_path = socket_path.to_owned();
scope.spawn(move || {
let temp = RandomTempPath::create_dir();
let spec = get_test_spec();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client = Client::new(
ClientConfig::default(),
&spec,
temp.as_path(),
Arc::new(Miner::with_spec(&spec)),
IoChannel::disconnected()).unwrap();
IoChannel::disconnected(),
&db_config
).unwrap();
let mut worker = nanoipc::Worker::new(&(client as Arc<BlockChainClient>));
worker.add_reqrep(&socket_path).unwrap();
while !stop.load(Ordering::Relaxed) {

View File

@ -19,7 +19,6 @@ use std::{io, fs};
use std::io::{BufReader, BufRead};
use std::time::Duration;
use std::thread::sleep;
use std::path::Path;
use std::sync::Arc;
use rustc_serialize::hex::FromHex;
use ethcore_logger::{setup_log, Config as LogConfig};
@ -125,8 +124,9 @@ fn execute_import(cmd: ImportBlockchain) -> Result<String, String> {
// select pruning algorithm
let algorithm = cmd.pruning.to_algorithm(&cmd.dirs, genesis_hash, spec.fork_name.as_ref());
// prepare client_path
// prepare client and snapshot paths.
let client_path = cmd.dirs.client_path(genesis_hash, spec.fork_name.as_ref(), algorithm);
let snapshot_path = cmd.dirs.snapshot_path(genesis_hash, spec.fork_name.as_ref());
// execute upgrades
try!(execute_upgrades(&cmd.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile()));
@ -138,8 +138,9 @@ fn execute_import(cmd: ImportBlockchain) -> Result<String, String> {
let service = try!(ClientService::start(
client_config,
&spec,
Path::new(&client_path),
Path::new(&cmd.dirs.ipc_path()),
&client_path,
&snapshot_path,
&cmd.dirs.ipc_path(),
Arc::new(Miner::with_spec(&spec)),
).map_err(|e| format!("Client service error: {:?}", e)));
@ -237,8 +238,9 @@ fn execute_export(cmd: ExportBlockchain) -> Result<String, String> {
// select pruning algorithm
let algorithm = cmd.pruning.to_algorithm(&cmd.dirs, genesis_hash, spec.fork_name.as_ref());
// prepare client_path
// prepare client and snapshot paths.
let client_path = cmd.dirs.client_path(genesis_hash, spec.fork_name.as_ref(), algorithm);
let snapshot_path = cmd.dirs.snapshot_path(genesis_hash, spec.fork_name.as_ref());
// execute upgrades
try!(execute_upgrades(&cmd.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile()));
@ -249,8 +251,9 @@ fn execute_export(cmd: ExportBlockchain) -> Result<String, String> {
let service = try!(ClientService::start(
client_config,
&spec,
Path::new(&client_path),
Path::new(&cmd.dirs.ipc_path()),
&client_path,
&snapshot_path,
&cmd.dirs.ipc_path(),
Arc::new(Miner::with_spec(&spec)),
).map_err(|e| format!("Client service error: {:?}", e)));

View File

@ -52,10 +52,16 @@ impl Directories {
Ok(())
}
/// Get the root path for database
pub fn db_version_path(&self, genesis_hash: H256, fork_name: Option<&String>, pruning: Algorithm) -> PathBuf {
/// Get the chain's root path.
pub fn chain_path(&self, genesis_hash: H256, fork_name: Option<&String>) -> PathBuf {
let mut dir = Path::new(&self.db).to_path_buf();
dir.push(format!("{:?}{}", H64::from(genesis_hash), fork_name.map(|f| format!("-{}", f)).unwrap_or_default()));
dir
}
/// Get the root path for database
pub fn db_version_path(&self, genesis_hash: H256, fork_name: Option<&String>, pruning: Algorithm) -> PathBuf {
let mut dir = self.chain_path(genesis_hash, fork_name);
dir.push(format!("v{}-sec-{}", LEGACY_CLIENT_DB_VER_STR, pruning.as_internal_name_str()));
dir
}
@ -67,6 +73,13 @@ impl Directories {
dir
}
/// Get the path for the snapshot directory given the genesis hash and fork name.
pub fn snapshot_path(&self, genesis_hash: H256, fork_name: Option<&String>) -> PathBuf {
let mut dir = self.chain_path(genesis_hash, fork_name);
dir.push("snapshot");
dir
}
/// Get the ipc sockets path
pub fn ipc_path(&self) -> PathBuf {
let mut dir = Path::new(&self.db).to_path_buf();

View File

@ -15,7 +15,6 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::{Arc, Mutex, Condvar};
use std::path::Path;
use std::io::ErrorKind;
use ctrlc::CtrlC;
use fdlimit::raise_fd_limit;
@ -110,8 +109,9 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
// select pruning algorithm
let algorithm = cmd.pruning.to_algorithm(&cmd.dirs, genesis_hash, fork_name.as_ref());
// prepare client_path
// prepare client and snapshot paths.
let client_path = cmd.dirs.client_path(genesis_hash, fork_name.as_ref(), algorithm);
let snapshot_path = cmd.dirs.snapshot_path(genesis_hash, fork_name.as_ref());
// execute upgrades
try!(execute_upgrades(&cmd.dirs, genesis_hash, fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile()));
@ -171,14 +171,15 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
}
// create supervisor
let mut hypervisor = modules::hypervisor(Path::new(&cmd.dirs.ipc_path()));
let mut hypervisor = modules::hypervisor(&cmd.dirs.ipc_path());
// create client service.
let service = try!(ClientService::start(
client_config,
&spec,
Path::new(&client_path),
Path::new(&cmd.dirs.ipc_path()),
&client_path,
&snapshot_path,
&cmd.dirs.ipc_path(),
miner.clone(),
).map_err(|e| format!("Client service error: {:?}", e)));

View File

@ -82,8 +82,9 @@ impl SnapshotCommand {
// select pruning algorithm
let algorithm = self.pruning.to_algorithm(&self.dirs, genesis_hash, spec.fork_name.as_ref());
// prepare client_path
// prepare client and snapshot paths.
let client_path = self.dirs.client_path(genesis_hash, spec.fork_name.as_ref(), algorithm);
let snapshot_path = self.dirs.snapshot_path(genesis_hash, spec.fork_name.as_ref());
// execute upgrades
try!(execute_upgrades(&self.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, self.compaction.compaction_profile()));
@ -94,8 +95,9 @@ impl SnapshotCommand {
let service = try!(ClientService::start(
client_config,
&spec,
Path::new(&client_path),
Path::new(&self.dirs.ipc_path()),
&client_path,
&snapshot_path,
&self.dirs.ipc_path(),
Arc::new(Miner::with_spec(&spec))
).map_err(|e| format!("Client service error: {:?}", e)));

View File

@ -108,7 +108,16 @@ impl EthTester {
let dir = RandomTempPath::new();
let account_provider = account_provider();
let miner_service = miner_service(&spec, account_provider.clone());
let client = Client::new(ClientConfig::default(), &spec, dir.as_path(), miner_service.clone(), IoChannel::disconnected()).unwrap();
let db_config = ::util::kvdb::DatabaseConfig::with_columns(::ethcore::db::NUM_COLUMNS);
let client = Client::new(
ClientConfig::default(),
&spec,
dir.as_path(),
miner_service.clone(),
IoChannel::disconnected(),
&db_config
).unwrap();
let sync_provider = sync_provider();
let external_miner = Arc::new(ExternalMiner::default());