Merge pull request #4687 from ethcore/fast-migrate
Fast in-place migration for adding and removing column families
This commit is contained in:
commit
655c7ae7ae
@ -28,4 +28,4 @@ mod v10;
|
|||||||
pub use self::v10::ToV10;
|
pub use self::v10::ToV10;
|
||||||
|
|
||||||
mod v11;
|
mod v11;
|
||||||
pub use self::v11::ToV11;
|
pub use self::v11::TO_V11;
|
||||||
|
@ -14,33 +14,13 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
|
||||||
//! Adds a seventh column for node information.
|
//! Adds a seventh column for node information.
|
||||||
|
|
||||||
use util::kvdb::Database;
|
use util::migration::ChangeColumns;
|
||||||
use util::migration::{Batch, Config, Error, Migration, Progress};
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
/// Copies over data for all existing columns.
|
/// The migration from v10 to v11.
|
||||||
#[derive(Default)]
|
pub const TO_V11: ChangeColumns = ChangeColumns {
|
||||||
pub struct ToV11(Progress);
|
pre_columns: Some(6),
|
||||||
|
post_columns: Some(7),
|
||||||
|
version: 11,
|
||||||
impl Migration for ToV11 {
|
};
|
||||||
fn pre_columns(&self) -> Option<u32> { Some(6) }
|
|
||||||
fn columns(&self) -> Option<u32> { Some(7) }
|
|
||||||
|
|
||||||
fn version(&self) -> u32 { 11 }
|
|
||||||
|
|
||||||
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> {
|
|
||||||
// just copy everything over.
|
|
||||||
let mut batch = Batch::new(config, col);
|
|
||||||
|
|
||||||
for (key, value) in source.iter(col) {
|
|
||||||
self.0.tick();
|
|
||||||
batch.insert(key.to_vec(), value.to_vec(), dest)?
|
|
||||||
}
|
|
||||||
|
|
||||||
batch.commit(dest)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -146,7 +146,7 @@ pub fn default_migration_settings(compaction_profile: &CompactionProfile) -> Mig
|
|||||||
fn consolidated_database_migrations(compaction_profile: &CompactionProfile) -> Result<MigrationManager, Error> {
|
fn consolidated_database_migrations(compaction_profile: &CompactionProfile) -> Result<MigrationManager, Error> {
|
||||||
let mut manager = MigrationManager::new(default_migration_settings(compaction_profile));
|
let mut manager = MigrationManager::new(default_migration_settings(compaction_profile));
|
||||||
manager.add_migration(migrations::ToV10::new()).map_err(|_| Error::MigrationImpossible)?;
|
manager.add_migration(migrations::ToV10::new()).map_err(|_| Error::MigrationImpossible)?;
|
||||||
manager.add_migration(migrations::ToV11::default()).map_err(|_| Error::MigrationImpossible)?;
|
manager.add_migration(migrations::TO_V11).map_err(|_| Error::MigrationImpossible)?;
|
||||||
Ok(manager)
|
Ok(manager)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -201,6 +201,10 @@ fn migrate_database(version: u32, db_path: PathBuf, mut migrations: MigrationMan
|
|||||||
// migrate old database to the new one
|
// migrate old database to the new one
|
||||||
let temp_path = migrations.execute(&db_path, version)?;
|
let temp_path = migrations.execute(&db_path, version)?;
|
||||||
|
|
||||||
|
// completely in-place migration leads to the paths being equal.
|
||||||
|
// in that case, no need to shuffle directories.
|
||||||
|
if temp_path == db_path { return Ok(()) }
|
||||||
|
|
||||||
// create backup
|
// create backup
|
||||||
fs::rename(&db_path, &backup_path)?;
|
fs::rename(&db_path, &backup_path)?;
|
||||||
|
|
||||||
@ -212,9 +216,7 @@ fn migrate_database(version: u32, db_path: PathBuf, mut migrations: MigrationMan
|
|||||||
}
|
}
|
||||||
|
|
||||||
// remove backup
|
// remove backup
|
||||||
fs::remove_dir_all(&backup_path)?;
|
fs::remove_dir_all(&backup_path).map_err(Into::into)
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn exists(path: &Path) -> bool {
|
fn exists(path: &Path) -> bool {
|
||||||
|
129
util/src/kvdb.rs
129
util/src/kvdb.rs
@ -410,6 +410,29 @@ struct DBAndColumns {
|
|||||||
cfs: Vec<Column>,
|
cfs: Vec<Column>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// get column family configuration from database config.
|
||||||
|
fn col_config(col: u32, config: &DatabaseConfig) -> Options {
|
||||||
|
// default cache size for columns not specified.
|
||||||
|
const DEFAULT_CACHE: usize = 2;
|
||||||
|
|
||||||
|
let mut opts = Options::new();
|
||||||
|
opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
|
||||||
|
opts.set_target_file_size_base(config.compaction.initial_file_size);
|
||||||
|
opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier);
|
||||||
|
|
||||||
|
let col_opt = config.columns.map(|_| col);
|
||||||
|
|
||||||
|
{
|
||||||
|
let cache_size = config.cache_sizes.get(&col_opt).cloned().unwrap_or(DEFAULT_CACHE);
|
||||||
|
let mut block_opts = BlockBasedOptions::new();
|
||||||
|
// all goes to read cache.
|
||||||
|
block_opts.set_cache(Cache::new(cache_size * 1024 * 1024));
|
||||||
|
opts.set_block_based_table_factory(&block_opts);
|
||||||
|
}
|
||||||
|
|
||||||
|
opts
|
||||||
|
}
|
||||||
|
|
||||||
/// Key-Value database.
|
/// Key-Value database.
|
||||||
pub struct Database {
|
pub struct Database {
|
||||||
db: RwLock<Option<DBAndColumns>>,
|
db: RwLock<Option<DBAndColumns>>,
|
||||||
@ -434,9 +457,6 @@ impl Database {
|
|||||||
|
|
||||||
/// Open database file. Creates if it does not exist.
|
/// Open database file. Creates if it does not exist.
|
||||||
pub fn open(config: &DatabaseConfig, path: &str) -> Result<Database, String> {
|
pub fn open(config: &DatabaseConfig, path: &str) -> Result<Database, String> {
|
||||||
// default cache size for columns not specified.
|
|
||||||
const DEFAULT_CACHE: usize = 2;
|
|
||||||
|
|
||||||
let mut opts = Options::new();
|
let mut opts = Options::new();
|
||||||
if let Some(rate_limit) = config.compaction.write_rate_limit {
|
if let Some(rate_limit) = config.compaction.write_rate_limit {
|
||||||
opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit))?;
|
opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit))?;
|
||||||
@ -460,22 +480,7 @@ impl Database {
|
|||||||
let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
|
let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
|
||||||
|
|
||||||
for col in 0 .. config.columns.unwrap_or(0) {
|
for col in 0 .. config.columns.unwrap_or(0) {
|
||||||
let mut opts = Options::new();
|
cf_options.push(col_config(col, &config));
|
||||||
opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
|
|
||||||
opts.set_target_file_size_base(config.compaction.initial_file_size);
|
|
||||||
opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier);
|
|
||||||
|
|
||||||
let col_opt = config.columns.map(|_| col);
|
|
||||||
|
|
||||||
{
|
|
||||||
let cache_size = config.cache_sizes.get(&col_opt).cloned().unwrap_or(DEFAULT_CACHE);
|
|
||||||
let mut block_opts = BlockBasedOptions::new();
|
|
||||||
// all goes to read cache.
|
|
||||||
block_opts.set_cache(Cache::new(cache_size * 1024 * 1024));
|
|
||||||
opts.set_block_based_table_factory(&block_opts);
|
|
||||||
}
|
|
||||||
|
|
||||||
cf_options.push(opts);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut write_opts = WriteOptions::new();
|
let mut write_opts = WriteOptions::new();
|
||||||
@ -768,6 +773,42 @@ impl Database {
|
|||||||
*self.flushing.write() = mem::replace(&mut *db.flushing.write(), Vec::new());
|
*self.flushing.write() = mem::replace(&mut *db.flushing.write(), Vec::new());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The number of non-default column families.
|
||||||
|
pub fn num_columns(&self) -> u32 {
|
||||||
|
self.db.read().as_ref()
|
||||||
|
.and_then(|db| if db.cfs.is_empty() { None } else { Some(db.cfs.len()) } )
|
||||||
|
.map(|n| n as u32)
|
||||||
|
.unwrap_or(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Drop a column family.
|
||||||
|
pub fn drop_column(&self) -> Result<(), String> {
|
||||||
|
match *self.db.write() {
|
||||||
|
Some(DBAndColumns { ref mut db, ref mut cfs }) => {
|
||||||
|
if let Some(col) = cfs.pop() {
|
||||||
|
let name = format!("col{}", cfs.len());
|
||||||
|
drop(col);
|
||||||
|
db.drop_cf(&name)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
None => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a column family.
|
||||||
|
pub fn add_column(&self) -> Result<(), String> {
|
||||||
|
match *self.db.write() {
|
||||||
|
Some(DBAndColumns { ref mut db, ref mut cfs }) => {
|
||||||
|
let col = cfs.len() as u32;
|
||||||
|
let name = format!("col{}", col);
|
||||||
|
cfs.push(db.create_cf(&name, &col_config(col, &self.config))?);
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
None => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// duplicate declaration of methods here to avoid trait import in certain existing cases
|
// duplicate declaration of methods here to avoid trait import in certain existing cases
|
||||||
@ -886,4 +927,54 @@ mod tests {
|
|||||||
let expected_output = Some(PathBuf::from("/sys/block/sda/queue/rotational"));
|
let expected_output = Some(PathBuf::from("/sys/block/sda/queue/rotational"));
|
||||||
assert_eq!(rotational_from_df_output(example_df), expected_output);
|
assert_eq!(rotational_from_df_output(example_df), expected_output);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn add_columns() {
|
||||||
|
let config = DatabaseConfig::default();
|
||||||
|
let config_5 = DatabaseConfig::with_columns(Some(5));
|
||||||
|
|
||||||
|
let path = RandomTempPath::create_dir();
|
||||||
|
|
||||||
|
// open empty, add 5.
|
||||||
|
{
|
||||||
|
let db = Database::open(&config, path.as_path().to_str().unwrap()).unwrap();
|
||||||
|
assert_eq!(db.num_columns(), 0);
|
||||||
|
|
||||||
|
for i in 0..5 {
|
||||||
|
db.add_column().unwrap();
|
||||||
|
assert_eq!(db.num_columns(), i + 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// reopen as 5.
|
||||||
|
{
|
||||||
|
let db = Database::open(&config_5, path.as_path().to_str().unwrap()).unwrap();
|
||||||
|
assert_eq!(db.num_columns(), 5);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn drop_columns() {
|
||||||
|
let config = DatabaseConfig::default();
|
||||||
|
let config_5 = DatabaseConfig::with_columns(Some(5));
|
||||||
|
|
||||||
|
let path = RandomTempPath::create_dir();
|
||||||
|
|
||||||
|
// open 5, remove all.
|
||||||
|
{
|
||||||
|
let db = Database::open(&config_5, path.as_path().to_str().unwrap()).unwrap();
|
||||||
|
assert_eq!(db.num_columns(), 5);
|
||||||
|
|
||||||
|
for i in (0..5).rev() {
|
||||||
|
db.drop_column().unwrap();
|
||||||
|
assert_eq!(db.num_columns(), i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// reopen as 0.
|
||||||
|
{
|
||||||
|
let db = Database::open(&config, path.as_path().to_str().unwrap()).unwrap();
|
||||||
|
assert_eq!(db.num_columns(), 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -127,6 +127,9 @@ pub trait Migration: 'static {
|
|||||||
fn pre_columns(&self) -> Option<u32> { self.columns() }
|
fn pre_columns(&self) -> Option<u32> { self.columns() }
|
||||||
/// Number of columns in database after the migration.
|
/// Number of columns in database after the migration.
|
||||||
fn columns(&self) -> Option<u32>;
|
fn columns(&self) -> Option<u32>;
|
||||||
|
/// Whether this migration alters any existing columns.
|
||||||
|
/// if not, then column families will simply be added and `migrate` will never be called.
|
||||||
|
fn alters_existing(&self) -> bool { true }
|
||||||
/// Version of the database after the migration.
|
/// Version of the database after the migration.
|
||||||
fn version(&self) -> u32;
|
fn version(&self) -> u32;
|
||||||
/// Migrate a source to a destination.
|
/// Migrate a source to a destination.
|
||||||
@ -149,6 +152,8 @@ impl<T: SimpleMigration> Migration for T {
|
|||||||
|
|
||||||
fn version(&self) -> u32 { SimpleMigration::version(self) }
|
fn version(&self) -> u32 { SimpleMigration::version(self) }
|
||||||
|
|
||||||
|
fn alters_existing(&self) -> bool { true }
|
||||||
|
|
||||||
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> {
|
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> {
|
||||||
let mut batch = Batch::new(config, col);
|
let mut batch = Batch::new(config, col);
|
||||||
|
|
||||||
@ -162,6 +167,26 @@ impl<T: SimpleMigration> Migration for T {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// An even simpler migration which just changes the number of columns.
|
||||||
|
pub struct ChangeColumns {
|
||||||
|
/// The amount of columns before this migration.
|
||||||
|
pub pre_columns: Option<u32>,
|
||||||
|
/// The amount of columns after this migration.
|
||||||
|
pub post_columns: Option<u32>,
|
||||||
|
/// The version after this migration.
|
||||||
|
pub version: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Migration for ChangeColumns {
|
||||||
|
fn pre_columns(&self) -> Option<u32> { self.pre_columns }
|
||||||
|
fn columns(&self) -> Option<u32> { self.post_columns }
|
||||||
|
fn version(&self) -> u32 { self.version }
|
||||||
|
fn alters_existing(&self) -> bool { false }
|
||||||
|
fn migrate(&mut self, _: Arc<Database>, _: &Config, _: &mut Database, _: Option<u32>) -> Result<(), Error> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Get the path where all databases reside.
|
/// Get the path where all databases reside.
|
||||||
fn database_path(path: &Path) -> PathBuf {
|
fn database_path(path: &Path) -> PathBuf {
|
||||||
let mut temp_path = path.to_owned();
|
let mut temp_path = path.to_owned();
|
||||||
@ -244,7 +269,7 @@ impl Manager {
|
|||||||
|
|
||||||
let db_root = database_path(old_path);
|
let db_root = database_path(old_path);
|
||||||
let mut temp_idx = TempIndex::One;
|
let mut temp_idx = TempIndex::One;
|
||||||
let mut temp_path = temp_idx.path(&db_root);
|
let mut temp_path = old_path.to_path_buf();
|
||||||
|
|
||||||
// start with the old db.
|
// start with the old db.
|
||||||
let old_path_str = old_path.to_str().ok_or(Error::MigrationImpossible)?;
|
let old_path_str = old_path.to_str().ok_or(Error::MigrationImpossible)?;
|
||||||
@ -256,12 +281,14 @@ impl Manager {
|
|||||||
let current_columns = db_config.columns;
|
let current_columns = db_config.columns;
|
||||||
db_config.columns = migration.columns();
|
db_config.columns = migration.columns();
|
||||||
|
|
||||||
// open the target temporary database.
|
// slow migrations: alter existing data.
|
||||||
|
if migration.alters_existing() {
|
||||||
temp_path = temp_idx.path(&db_root);
|
temp_path = temp_idx.path(&db_root);
|
||||||
|
|
||||||
|
// open the target temporary database.
|
||||||
let temp_path_str = temp_path.to_str().ok_or(Error::MigrationImpossible)?;
|
let temp_path_str = temp_path.to_str().ok_or(Error::MigrationImpossible)?;
|
||||||
let mut new_db = Database::open(&db_config, temp_path_str).map_err(Error::Custom)?;
|
let mut new_db = Database::open(&db_config, temp_path_str).map_err(Error::Custom)?;
|
||||||
|
|
||||||
// perform the migration from cur_db to new_db.
|
|
||||||
match current_columns {
|
match current_columns {
|
||||||
// migrate only default column
|
// migrate only default column
|
||||||
None => migration.migrate(cur_db.clone(), &config, &mut new_db, None)?,
|
None => migration.migrate(cur_db.clone(), &config, &mut new_db, None)?,
|
||||||
@ -278,6 +305,18 @@ impl Manager {
|
|||||||
|
|
||||||
// remove the other temporary migration database.
|
// remove the other temporary migration database.
|
||||||
let _ = fs::remove_dir_all(temp_idx.path(&db_root));
|
let _ = fs::remove_dir_all(temp_idx.path(&db_root));
|
||||||
|
} else {
|
||||||
|
// migrations which simply add or remove column families.
|
||||||
|
// we can do this in-place.
|
||||||
|
let goal_columns = migration.columns().unwrap_or(0);
|
||||||
|
while cur_db.num_columns() < goal_columns {
|
||||||
|
cur_db.add_column().map_err(Error::Custom)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
while cur_db.num_columns() > goal_columns {
|
||||||
|
cur_db.drop_column().map_err(Error::Custom)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(temp_path)
|
Ok(temp_path)
|
||||||
}
|
}
|
||||||
|
@ -226,3 +226,26 @@ fn pre_columns() {
|
|||||||
// short of the one before it.
|
// short of the one before it.
|
||||||
manager.execute(&db_path, 0).unwrap();
|
manager.execute(&db_path, 0).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn change_columns() {
|
||||||
|
use kvdb::DatabaseConfig;
|
||||||
|
|
||||||
|
let mut manager = Manager::new(Config::default());
|
||||||
|
manager.add_migration(::migration::ChangeColumns {
|
||||||
|
pre_columns: None,
|
||||||
|
post_columns: Some(4),
|
||||||
|
version: 1,
|
||||||
|
}).unwrap();
|
||||||
|
|
||||||
|
let dir = RandomTempPath::create_dir();
|
||||||
|
let db_path = db_path(dir.as_path());
|
||||||
|
|
||||||
|
let new_path = manager.execute(&db_path, 0).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(db_path, new_path, "Changing columns is an in-place migration.");
|
||||||
|
|
||||||
|
let config = DatabaseConfig::with_columns(Some(4));
|
||||||
|
let db = Database::open(&config, new_path.to_str().unwrap()).unwrap();
|
||||||
|
assert_eq!(db.num_columns(), 4);
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user