From bdf44461737b9bb5e58ee8dc4856d372f866634b Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 11 Jul 2016 09:46:33 +0200 Subject: [PATCH] have AccountDB use address hash for uniqueness (#1533) * partially done alternate migration scheme * finish altering migration framework * migrate tests to new migration framework * address comments * remove superfluous newline [ci skip] * TempIdx -> TempIndex [ci skip] * modify account_db to work on address hash, not address * add a database migration for new accountdb * preserve first 96 bits of keys when combining * handle metadata keys in migration and preserve first 96 bits * fix comments and hash address instead of hash * different migrations based on pruning * migrations mutably borrow self * batch abstraction for migration * added missing licence headers * overlay recent v7 migration * better error handling, migrate version key as well * fix migration tests * commit final batch and migrate journaled insertions * two passes on journal to migrate all possible deleted keys --- ethcore/src/account_db.rs | 77 ++++++--- ethcore/src/migrations/extras/mod.rs | 18 +- ethcore/src/migrations/extras/v6.rs | 18 +- ethcore/src/migrations/mod.rs | 1 + ethcore/src/migrations/state/mod.rs | 21 +++ ethcore/src/migrations/state/v7.rs | 247 +++++++++++++++++++++++++++ parity/configuration.rs | 22 ++- parity/die.rs | 2 +- parity/main.rs | 2 +- parity/migration.rs | 56 ++++-- parity/rpc_apis.rs | 1 - util/src/migration/mod.rs | 82 +++++---- util/src/migration/tests.rs | 4 +- 13 files changed, 468 insertions(+), 83 deletions(-) create mode 100644 ethcore/src/migrations/state/mod.rs create mode 100644 ethcore/src/migrations/state/v7.rs diff --git a/ethcore/src/account_db.rs b/ethcore/src/account_db.rs index 7337940da..cee2b4d48 100644 --- a/ethcore/src/account_db.rs +++ b/ethcore/src/account_db.rs @@ -1,26 +1,59 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + //! DB backend wrapper for Account trie use util::*; static NULL_RLP_STATIC: [u8; 1] = [0x80; 1]; +// combines a key with an address hash to ensure uniqueness. +// leaves the first 96 bits untouched in order to support partial key lookup. +#[inline] +fn combine_key<'a>(address_hash: &'a H256, key: &'a H256) -> H256 { + let mut dst = key.clone(); + { + let last_src: &[u8] = &*address_hash; + let last_dst: &mut [u8] = &mut *dst; + for (k, a) in last_dst[12..].iter_mut().zip(&last_src[12..]) { + *k ^= *a + } + } + + dst +} + // TODO: introduce HashDBMut? /// DB backend wrapper for Account trie /// Transforms trie node keys for the database pub struct AccountDB<'db> { db: &'db HashDB, - address: H256, -} - -#[inline] -fn combine_key<'a>(address: &'a H256, key: &'a H256) -> H256 { - address ^ key + address_hash: H256, } impl<'db> AccountDB<'db> { - pub fn new(db: &'db HashDB, address: &Address) -> AccountDB<'db> { + /// Create a new AccountDB from an address. + pub fn new(db: &'db HashDB, address: &Address) -> Self { + Self::from_hash(db, address.sha3()) + } + + /// Create a new AcountDB from an address' hash. + pub fn from_hash(db: &'db HashDB, address_hash: H256) -> Self { AccountDB { db: db, - address: address.into(), + address_hash: address_hash, } } } @@ -34,14 +67,14 @@ impl<'db> HashDB for AccountDB<'db>{ if key == &SHA3_NULL_RLP { return Some(&NULL_RLP_STATIC); } - self.db.get(&combine_key(&self.address, key)) + self.db.get(&combine_key(&self.address_hash, key)) } fn contains(&self, key: &H256) -> bool { if key == &SHA3_NULL_RLP { return true; } - self.db.contains(&combine_key(&self.address, key)) + self.db.contains(&combine_key(&self.address_hash, key)) } fn insert(&mut self, _value: &[u8]) -> H256 { @@ -60,20 +93,26 @@ impl<'db> HashDB for AccountDB<'db>{ /// DB backend wrapper for Account trie pub struct AccountDBMut<'db> { db: &'db mut HashDB, - address: H256, + address_hash: H256, } impl<'db> AccountDBMut<'db> { - pub fn new(db: &'db mut HashDB, address: &Address) -> AccountDBMut<'db> { + /// Create a new AccountDB from an address. + pub fn new(db: &'db mut HashDB, address: &Address) -> Self { + Self::from_hash(db, address.sha3()) + } + + /// Create a new AcountDB from an address' hash. + pub fn from_hash(db: &'db mut HashDB, address_hash: H256) -> Self { AccountDBMut { db: db, - address: address.into(), + address_hash: address_hash, } } #[allow(dead_code)] pub fn immutable(&'db self) -> AccountDB<'db> { - AccountDB { db: self.db, address: self.address.clone() } + AccountDB { db: self.db, address_hash: self.address_hash.clone() } } } @@ -86,14 +125,14 @@ impl<'db> HashDB for AccountDBMut<'db>{ if key == &SHA3_NULL_RLP { return Some(&NULL_RLP_STATIC); } - self.db.get(&combine_key(&self.address, key)) + self.db.get(&combine_key(&self.address_hash, key)) } fn contains(&self, key: &H256) -> bool { if key == &SHA3_NULL_RLP { return true; } - self.db.contains(&combine_key(&self.address, key)) + self.db.contains(&combine_key(&self.address_hash, key)) } fn insert(&mut self, value: &[u8]) -> H256 { @@ -101,7 +140,7 @@ impl<'db> HashDB for AccountDBMut<'db>{ return SHA3_NULL_RLP.clone(); } let k = value.sha3(); - let ak = combine_key(&self.address, &k); + let ak = combine_key(&self.address_hash, &k); self.db.emplace(ak, value.to_vec()); k } @@ -110,7 +149,7 @@ impl<'db> HashDB for AccountDBMut<'db>{ if key == SHA3_NULL_RLP { return; } - let key = combine_key(&self.address, &key); + let key = combine_key(&self.address_hash, &key); self.db.emplace(key, value.to_vec()) } @@ -118,7 +157,7 @@ impl<'db> HashDB for AccountDBMut<'db>{ if key == &SHA3_NULL_RLP { return; } - let key = combine_key(&self.address, key); + let key = combine_key(&self.address_hash, key); self.db.remove(&key) } } diff --git a/ethcore/src/migrations/extras/mod.rs b/ethcore/src/migrations/extras/mod.rs index c4d4790dc..0635596ea 100644 --- a/ethcore/src/migrations/extras/mod.rs +++ b/ethcore/src/migrations/extras/mod.rs @@ -1,5 +1,21 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + //! Extras database migrations. mod v6; -pub use self::v6::ToV6; +pub use self::v6::ToV6; \ No newline at end of file diff --git a/ethcore/src/migrations/extras/v6.rs b/ethcore/src/migrations/extras/v6.rs index 3b8160e67..af2d0389b 100644 --- a/ethcore/src/migrations/extras/v6.rs +++ b/ethcore/src/migrations/extras/v6.rs @@ -1,3 +1,19 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + use util::migration::SimpleMigration; /// This migration reduces the sizes of keys and moves `ExtrasIndex` byte from back to the front. @@ -22,7 +38,7 @@ impl SimpleMigration for ToV6 { 6 } - fn simple_migrate(&self, key: Vec, value: Vec) -> Option<(Vec, Vec)> { + fn simple_migrate(&mut self, key: Vec, value: Vec) -> Option<(Vec, Vec)> { //// at this version all extras keys are 33 bytes long. if key.len() == 33 { diff --git a/ethcore/src/migrations/mod.rs b/ethcore/src/migrations/mod.rs index 1473ced9c..6d86a122f 100644 --- a/ethcore/src/migrations/mod.rs +++ b/ethcore/src/migrations/mod.rs @@ -1,3 +1,4 @@ //! Database migrations. pub mod extras; +pub mod state; diff --git a/ethcore/src/migrations/state/mod.rs b/ethcore/src/migrations/state/mod.rs new file mode 100644 index 000000000..9a6b9e086 --- /dev/null +++ b/ethcore/src/migrations/state/mod.rs @@ -0,0 +1,21 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! State database migrations. + +mod v7; + +pub use self::v7::{ArchiveV7, OverlayRecentV7}; \ No newline at end of file diff --git a/ethcore/src/migrations/state/v7.rs b/ethcore/src/migrations/state/v7.rs new file mode 100644 index 000000000..45a5669b2 --- /dev/null +++ b/ethcore/src/migrations/state/v7.rs @@ -0,0 +1,247 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! This migration migrates the state db to use an accountdb which ensures uniqueness +//! using an address' hash as opposed to the address itself. + +use std::collections::HashMap; + +use util::Bytes; +use util::hash::{Address, FixedHash, H256}; +use util::kvdb::Database; +use util::migration::{Batch, Config, Error, Migration, SimpleMigration}; +use util::rlp::{decode, Rlp, RlpStream, Stream, View}; +use util::sha3::Hashable; + +// attempt to migrate a key, value pair. None if migration not possible. +fn attempt_migrate(mut key_h: H256, val: &[u8]) -> Option { + let val_hash = val.sha3(); + + if key_h != val_hash { + // this is a key which has been xor'd with an address. + // recover the address. + let address = key_h ^ val_hash; + + // check that the address is actually a 20-byte value. + // the leftmost 12 bytes should be zero. + if &address[0..12] != &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] { + return None; + } + + let address_hash = Address::from(address).sha3(); + + // create the xor'd key in place. + key_h.copy_from_slice(&*val_hash); + assert_eq!(key_h, val_hash); + + { + let last_src: &[u8] = &*address_hash; + let last_dst: &mut [u8] = &mut *key_h; + for (k, a) in last_dst[12..].iter_mut().zip(&last_src[12..]) { + *k ^= *a; + } + } + + Some(key_h) + } else { + None + } +} + +/// Version for ArchiveDB. +pub struct ArchiveV7; + +impl SimpleMigration for ArchiveV7 { + fn version(&self) -> u32 { + 7 + } + + fn simple_migrate(&mut self, key: Vec, value: Vec) -> Option<(Vec, Vec)> { + if key.len() != 32 { + // metadata key, ignore. + return Some((key, value)); + } + + let key_h = H256::from_slice(&key[..]); + if let Some(new_key) = attempt_migrate(key_h, &value[..]) { + Some((new_key[..].to_owned(), value)) + } else { + Some((key, value)) + } + } +} + +// magic numbers and constants for overlay-recent at v6. +// re-written here because it may change in the journaldb module. +const V7_LATEST_ERA_KEY: &'static [u8] = &[ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ]; +const V7_VERSION_KEY: &'static [u8] = &[ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ]; +const DB_VERSION: u32 = 0x203; +const PADDING : [u8; 10] = [0u8; 10]; + +/// Version for OverlayRecent database. +/// more involved than the archive version because of journaling. +#[derive(Default)] +pub struct OverlayRecentV7 { + migrated_keys: HashMap, +} + +impl OverlayRecentV7 { + // walk all journal entries in the database backwards. + // find migrations for any possible inserted keys. + fn walk_journal(&mut self, source: &Database) -> Result<(), Error> { + if let Some(val) = try!(source.get(V7_LATEST_ERA_KEY).map_err(Error::Custom)) { + let mut era = decode::(&val); + loop { + let mut index: usize = 0; + loop { + let entry_key = { + let mut r = RlpStream::new_list(3); + r.append(&era).append(&index).append(&&PADDING[..]); + r.out() + }; + + if let Some(journal_raw) = try!(source.get(&entry_key).map_err(Error::Custom)) { + let rlp = Rlp::new(&journal_raw); + + // migrate all inserted keys. + for r in rlp.at(1).iter() { + let key: H256 = r.val_at(0); + let v: Bytes = r.val_at(1); + + if self.migrated_keys.get(&key).is_none() { + if let Some(new_key) = attempt_migrate(key, &v) { + self.migrated_keys.insert(key, new_key); + } + } + } + index += 1; + } else { + break; + } + } + + if index == 0 || era == 0 { + break; + } + era -= 1; + } + } + Ok(()) + } + + // walk all journal entries in the database backwards. + // replace all possible inserted/deleted keys with their migrated counterparts + // and commit the altered entries. + fn migrate_journal(&self, source: &Database, mut batch: Batch, dest: &mut Database) -> Result<(), Error> { + if let Some(val) = try!(source.get(V7_LATEST_ERA_KEY).map_err(Error::Custom)) { + try!(batch.insert(V7_LATEST_ERA_KEY.into(), val.to_owned(), dest)); + + let mut era = decode::(&val); + loop { + let mut index: usize = 0; + loop { + let entry_key = { + let mut r = RlpStream::new_list(3); + r.append(&era).append(&index).append(&&PADDING[..]); + r.out() + }; + + if let Some(journal_raw) = try!(source.get(&entry_key).map_err(Error::Custom)) { + let rlp = Rlp::new(&journal_raw); + let id: H256 = rlp.val_at(0); + let mut inserted_keys: Vec<(H256, Bytes)> = Vec::new(); + + // migrate all inserted keys. + for r in rlp.at(1).iter() { + let mut key: H256 = r.val_at(0); + let v: Bytes = r.val_at(1); + + if let Some(new_key) = self.migrated_keys.get(&key) { + key = *new_key; + } + + inserted_keys.push((key, v)); + } + + // migrate all deleted keys. + let mut deleted_keys: Vec = rlp.val_at(2); + for old_key in &mut deleted_keys { + if let Some(new) = self.migrated_keys.get(&*old_key) { + *old_key = new.clone(); + } + } + + // rebuild the journal entry rlp. + let mut stream = RlpStream::new_list(3); + stream.append(&id); + stream.begin_list(inserted_keys.len()); + for (k, v) in inserted_keys { + stream.begin_list(2).append(&k).append(&v); + } + + stream.append(&deleted_keys); + + // and insert it into the new database. + try!(batch.insert(entry_key, stream.out(), dest)); + + index += 1; + } else { + break; + } + } + + if index == 0 || era == 0 { + break; + } + era -= 1; + } + } + batch.commit(dest) + } +} + +impl Migration for OverlayRecentV7 { + fn version(&self) -> u32 { 7 } + + // walk all records in the database, attempting to migrate any possible and + // keeping records of those that we do. then migrate the journal using + // this information. + fn migrate(&mut self, source: &Database, config: &Config, dest: &mut Database) -> Result<(), Error> { + let mut batch = Batch::new(config); + + // check version metadata. + match try!(source.get(V7_VERSION_KEY).map_err(Error::Custom)) { + Some(ref version) if decode::(&*version) == DB_VERSION => {} + _ => return Err(Error::MigrationImpossible), // missing or wrong version + } + + for (key, value) in source.iter() { + let mut key = key.into_vec(); + if key.len() == 32 { + let key_h = H256::from_slice(&key[..]); + if let Some(new_key) = attempt_migrate(key_h.clone(), &value) { + self.migrated_keys.insert(key_h, new_key); + key.copy_from_slice(&new_key[..]); + } + } + + try!(batch.insert(key, value.into_vec(), dest)); + } + + try!(self.walk_journal(source)); + self.migrate_journal(source, batch, dest) + } +} \ No newline at end of file diff --git a/parity/configuration.rs b/parity/configuration.rs index 5bf632e41..3f4955d50 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -295,7 +295,7 @@ impl Configuration { ret } - pub fn find_best_db(&self, spec: &Spec) -> Option { + fn find_best_db(&self, spec: &Spec) -> Option { let mut ret = None; let mut latest_era = None; let jdb_types = [journaldb::Algorithm::Archive, journaldb::Algorithm::EarlyMerge, journaldb::Algorithm::OverlayRecent, journaldb::Algorithm::RefCounted]; @@ -314,6 +314,17 @@ impl Configuration { ret } + pub fn pruning_algorithm(&self, spec: &Spec) -> journaldb::Algorithm { + match self.args.flag_pruning.as_str() { + "archive" => journaldb::Algorithm::Archive, + "light" => journaldb::Algorithm::EarlyMerge, + "fast" => journaldb::Algorithm::OverlayRecent, + "basic" => journaldb::Algorithm::RefCounted, + "auto" => self.find_best_db(spec).unwrap_or(journaldb::Algorithm::OverlayRecent), + _ => { die!("Invalid pruning method given."); } + } + } + pub fn client_config(&self, spec: &Spec) -> ClientConfig { let mut client_config = ClientConfig::default(); @@ -341,14 +352,7 @@ impl Configuration { // forced trace db cache size if provided client_config.tracing.db_cache_size = self.args.flag_db_cache_size.and_then(|cs| Some(cs / 4)); - client_config.pruning = match self.args.flag_pruning.as_str() { - "archive" => journaldb::Algorithm::Archive, - "light" => journaldb::Algorithm::EarlyMerge, - "fast" => journaldb::Algorithm::OverlayRecent, - "basic" => journaldb::Algorithm::RefCounted, - "auto" => self.find_best_db(spec).unwrap_or(journaldb::Algorithm::OverlayRecent), - _ => { die!("Invalid pruning method given."); } - }; + client_config.pruning = self.pruning_algorithm(spec); if self.args.flag_fat_db { if let journaldb::Algorithm::Archive = client_config.pruning { diff --git a/parity/die.rs b/parity/die.rs index c38b041f3..80b31f619 100644 --- a/parity/die.rs +++ b/parity/die.rs @@ -22,7 +22,7 @@ use std::process::exit; #[macro_export] macro_rules! die { - ($($arg:tt)*) => (die_with_message(&format!("{}", format_args!($($arg)*)))); + ($($arg:tt)*) => (::die::die_with_message(&format!("{}", format_args!($($arg)*)))); } pub fn die_with_error(module: &'static str, e: ethcore::error::Error) -> ! { diff --git a/parity/main.rs b/parity/main.rs index 0cf7dd67c..45c60234d 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -174,7 +174,7 @@ fn execute_upgrades(conf: &Configuration, spec: &Spec, client_config: &ClientCon } let db_path = get_db_path(Path::new(&conf.path()), client_config.pruning, spec.genesis_header().hash()); - let result = migrate(&db_path); + let result = migrate(&db_path, client_config.pruning); if let Err(err) = result { die_with_message(&format!("{}", err)); } diff --git a/parity/migration.rs b/parity/migration.rs index 6b5c7de99..853cb6e35 100644 --- a/parity/migration.rs +++ b/parity/migration.rs @@ -17,15 +17,16 @@ use std::fs; use std::fs::File; use std::io::{Read, Write, Error as IoError, ErrorKind}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::fmt::{Display, Formatter, Error as FmtError}; +use util::journaldb::Algorithm; use util::migration::{Manager as MigrationManager, Config as MigrationConfig, Error as MigrationError}; use ethcore::migrations; /// Database is assumed to be at default version, when no version file is found. const DEFAULT_VERSION: u32 = 5; /// Current version of database models. -const CURRENT_VERSION: u32 = 6; +const CURRENT_VERSION: u32 = 7; /// Defines how many items are migrated to the new version of database at once. const BATCH_SIZE: usize = 1024; /// Version file name. @@ -74,15 +75,15 @@ impl From for Error { } /// Returns the version file path. -fn version_file_path(path: &PathBuf) -> PathBuf { - let mut file_path = path.clone(); +fn version_file_path(path: &Path) -> PathBuf { + let mut file_path = path.to_owned(); file_path.push(VERSION_FILE_NAME); file_path } /// Reads current database version from the file at given path. /// If the file does not exist returns `DEFAULT_VERSION`. -fn current_version(path: &PathBuf) -> Result { +fn current_version(path: &Path) -> Result { match File::open(version_file_path(path)) { Err(ref err) if err.kind() == ErrorKind::NotFound => Ok(DEFAULT_VERSION), Err(_) => Err(Error::UnknownDatabaseVersion), @@ -96,7 +97,7 @@ fn current_version(path: &PathBuf) -> Result { /// Writes current database version to the file. /// Creates a new file if the version file does not exist yet. -fn update_version(path: &PathBuf) -> Result<(), Error> { +fn update_version(path: &Path) -> Result<(), Error> { try!(fs::create_dir_all(path)); let mut file = try!(File::create(version_file_path(path))); try!(file.write_all(format!("{}", CURRENT_VERSION).as_bytes())); @@ -104,22 +105,29 @@ fn update_version(path: &PathBuf) -> Result<(), Error> { } /// Blocks database path. -fn blocks_database_path(path: &PathBuf) -> PathBuf { - let mut blocks_path = path.clone(); +fn blocks_database_path(path: &Path) -> PathBuf { + let mut blocks_path = path.to_owned(); blocks_path.push("blocks"); blocks_path } /// Extras database path. -fn extras_database_path(path: &PathBuf) -> PathBuf { - let mut extras_path = path.clone(); +fn extras_database_path(path: &Path) -> PathBuf { + let mut extras_path = path.to_owned(); extras_path.push("extras"); extras_path } +/// State database path. +fn state_database_path(path: &Path) -> PathBuf { + let mut state_path = path.to_owned(); + state_path.push("state"); + state_path +} + /// Database backup -fn backup_database_path(path: &PathBuf) -> PathBuf { - let mut backup_path = path.clone(); +fn backup_database_path(path: &Path) -> PathBuf { + let mut backup_path = path.to_owned(); backup_path.pop(); backup_path.push("temp_backup"); backup_path @@ -132,21 +140,34 @@ fn default_migration_settings() -> MigrationConfig { } } -/// Migrations on blocks database. +/// Migrations on the blocks database. fn blocks_database_migrations() -> Result { let manager = MigrationManager::new(default_migration_settings()); Ok(manager) } -/// Migrations on extras database. +/// Migrations on the extras database. fn extras_database_migrations() -> Result { let mut manager = MigrationManager::new(default_migration_settings()); try!(manager.add_migration(migrations::extras::ToV6).map_err(|_| Error::MigrationImpossible)); Ok(manager) } +/// Migrations on the state database. +fn state_database_migrations(pruning: Algorithm) -> Result { + let mut manager = MigrationManager::new(default_migration_settings()); + let res = match pruning { + Algorithm::Archive => manager.add_migration(migrations::state::ArchiveV7), + Algorithm::OverlayRecent => manager.add_migration(migrations::state::OverlayRecentV7::default()), + _ => die!("Unsupported pruning method for migration. Delete DB and resync"), + }; + + try!(res.map_err(|_| Error::MigrationImpossible)); + Ok(manager) +} + /// Migrates database at given position with given migration rules. -fn migrate_database(version: u32, db_path: PathBuf, migrations: MigrationManager) -> Result<(), Error> { +fn migrate_database(version: u32, db_path: PathBuf, mut migrations: MigrationManager) -> Result<(), Error> { // check if migration is needed if !migrations.is_needed(version) { return Ok(()) @@ -175,12 +196,12 @@ fn migrate_database(version: u32, db_path: PathBuf, migrations: MigrationManager Ok(()) } -fn exists(path: &PathBuf) -> bool { +fn exists(path: &Path) -> bool { fs::metadata(path).is_ok() } /// Migrates the database. -pub fn migrate(path: &PathBuf) -> Result<(), Error> { +pub fn migrate(path: &Path, pruning: Algorithm) -> Result<(), Error> { // read version file. let version = try!(current_version(path)); @@ -190,6 +211,7 @@ pub fn migrate(path: &PathBuf) -> Result<(), Error> { println!("Migrating database from version {} to {}", version, CURRENT_VERSION); try!(migrate_database(version, blocks_database_path(path), try!(blocks_database_migrations()))); try!(migrate_database(version, extras_database_path(path), try!(extras_database_migrations()))); + try!(migrate_database(version, state_database_path(path), try!(state_database_migrations(pruning)))); println!("Migration finished"); } diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 0187f4058..4b971c167 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -18,7 +18,6 @@ use std::collections::BTreeMap; use std::str::FromStr; use std::sync::Arc; -use die::*; use ethsync::EthSync; use ethcore::miner::{Miner, ExternalMiner}; use ethcore::client::Client; diff --git a/util/src/migration/mod.rs b/util/src/migration/mod.rs index fddb51fca..048441d7d 100644 --- a/util/src/migration/mod.rs +++ b/util/src/migration/mod.rs @@ -25,6 +25,7 @@ use std::path::{Path, PathBuf}; use ::kvdb::{CompactionProfile, Database, DatabaseConfig, DBTransaction}; /// Migration config. +#[derive(Clone)] pub struct Config { /// Defines how many elements should be migrated at once. pub batch_size: usize, @@ -38,6 +39,45 @@ impl Default for Config { } } +/// A batch of key-value pairs to be written into the database. +pub struct Batch { + inner: BTreeMap, Vec>, + batch_size: usize, +} + +impl Batch { + /// Make a new batch with the given config. + pub fn new(config: &Config) -> Self { + Batch { + inner: BTreeMap::new(), + batch_size: config.batch_size, + } + } + + /// Insert a value into the batch, committing if necessary. + pub fn insert(&mut self, key: Vec, value: Vec, dest: &mut Database) -> Result<(), Error> { + self.inner.insert(key, value); + if self.inner.len() == self.batch_size { + try!(self.commit(dest)); + } + Ok(()) + } + + /// Commit all the items in the batch to the given database. + pub fn commit(&mut self, dest: &mut Database) -> Result<(), Error> { + if self.inner.is_empty() { return Ok(()) } + + let transaction = DBTransaction::new(); + + for keypair in &self.inner { + try!(transaction.put(&keypair.0, &keypair.1).map_err(Error::Custom)); + } + + self.inner.clear(); + dest.write(transaction).map_err(Error::Custom) + } +} + /// Migration error. #[derive(Debug)] pub enum Error { @@ -62,7 +102,7 @@ pub trait Migration: 'static { /// Version of the database after the migration. fn version(&self) -> u32; /// Migrate a source to a destination. - fn migrate(&self, source: &Database, config: &Config, destination: &mut Database) -> Result<(), Error>; + fn migrate(&mut self, source: &Database, config: &Config, destination: &mut Database) -> Result<(), Error>; } /// A simple migration over key-value pairs. @@ -71,46 +111,25 @@ pub trait SimpleMigration: 'static { fn version(&self) -> u32; /// Should migrate existing object to new database. /// Returns `None` if the object does not exist in new version of database. - fn simple_migrate(&self, key: Vec, value: Vec) -> Option<(Vec, Vec)>; + fn simple_migrate(&mut self, key: Vec, value: Vec) -> Option<(Vec, Vec)>; } impl Migration for T { fn version(&self) -> u32 { SimpleMigration::version(self) } - fn migrate(&self, source: &Database, config: &Config, dest: &mut Database) -> Result<(), Error> { - let mut batch: BTreeMap, Vec> = BTreeMap::new(); + fn migrate(&mut self, source: &Database, config: &Config, dest: &mut Database) -> Result<(), Error> { + let mut batch = Batch::new(config); for (key, value) in source.iter() { - if let Some((key, value)) = self.simple_migrate(key.to_vec(), value.to_vec()) { - batch.insert(key, value); - } - - if batch.len() == config.batch_size { - try!(commit_batch(dest, &batch)); - batch.clear(); + try!(batch.insert(key, value, dest)); } } - if !batch.is_empty() { - try!(commit_batch(dest, &batch)); - } - - Ok(()) + batch.commit(dest) } } -/// Commit a batch of writes to a database. -pub fn commit_batch(db: &mut Database, batch: &BTreeMap, Vec>) -> Result<(), Error> { - let transaction = DBTransaction::new(); - - for keypair in batch { - try!(transaction.put(&keypair.0, &keypair.1).map_err(Error::Custom)); - } - - db.write(transaction).map_err(Error::Custom) -} - /// Get the path where all databases reside. fn database_path(path: &Path) -> PathBuf { let mut temp_path = path.to_owned(); @@ -174,7 +193,8 @@ impl Manager { /// Performs migration in order, starting with a source path, migrating between two temporary databases, /// and producing a path where the final migration lives. - pub fn execute(&self, old_path: &Path, version: u32) -> Result { + pub fn execute(&mut self, old_path: &Path, version: u32) -> Result { + let config = self.config.clone(); let migrations = try!(self.migrations_from(version).ok_or(Error::MigrationImpossible)); let db_config = DatabaseConfig { prefix_size: None, @@ -197,7 +217,7 @@ impl Manager { let mut new_db = try!(Database::open(&db_config, temp_path_str).map_err(Error::Custom)); // perform the migration from cur_db to new_db. - try!(migration.migrate(&cur_db, &self.config, &mut new_db)); + try!(migration.migrate(&cur_db, &config, &mut new_db)); // next iteration, we will migrate from this db into the other temp. cur_db = new_db; temp_idx.swap(); @@ -216,10 +236,10 @@ impl Manager { } } - fn migrations_from(&self, version: u32) -> Option<&[Box]> { + fn migrations_from(&mut self, version: u32) -> Option<&mut [Box]> { // index of the first required migration let position = self.migrations.iter().position(|m| m.version() == version + 1); - position.map(|p| &self.migrations[p..]) + position.map(move |p| &mut self.migrations[p..]) } } diff --git a/util/src/migration/tests.rs b/util/src/migration/tests.rs index f441b31e4..58d2c9008 100644 --- a/util/src/migration/tests.rs +++ b/util/src/migration/tests.rs @@ -62,7 +62,7 @@ impl SimpleMigration for Migration0 { 1 } - fn simple_migrate(&self, key: Vec, value: Vec) -> Option<(Vec, Vec)> { + fn simple_migrate(&mut self, key: Vec, value: Vec) -> Option<(Vec, Vec)> { let mut key = key; key.push(0x11); let mut value = value; @@ -78,7 +78,7 @@ impl SimpleMigration for Migration1 { 2 } - fn simple_migrate(&self, key: Vec, _value: Vec) -> Option<(Vec, Vec)> { + fn simple_migrate(&mut self, key: Vec, _value: Vec) -> Option<(Vec, Vec)> { Some((key, vec![])) } }