diff --git a/util/src/kvdb.rs b/util/src/kvdb.rs index 1714ce22f..add9528cc 100644 --- a/util/src/kvdb.rs +++ b/util/src/kvdb.rs @@ -410,6 +410,29 @@ struct DBAndColumns { cfs: Vec, } +// get column family configuration from database config. +fn col_config(col: u32, config: &DatabaseConfig) -> Options { + // default cache size for columns not specified. + const DEFAULT_CACHE: usize = 2; + + let mut opts = Options::new(); + opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction); + opts.set_target_file_size_base(config.compaction.initial_file_size); + opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier); + + let col_opt = config.columns.map(|_| col); + + { + let cache_size = config.cache_sizes.get(&col_opt).cloned().unwrap_or(DEFAULT_CACHE); + let mut block_opts = BlockBasedOptions::new(); + // all goes to read cache. + block_opts.set_cache(Cache::new(cache_size * 1024 * 1024)); + opts.set_block_based_table_factory(&block_opts); + } + + opts +} + /// Key-Value database. pub struct Database { db: RwLock>, @@ -434,9 +457,6 @@ impl Database { /// Open database file. Creates if it does not exist. pub fn open(config: &DatabaseConfig, path: &str) -> Result { - // default cache size for columns not specified. - const DEFAULT_CACHE: usize = 2; - let mut opts = Options::new(); if let Some(rate_limit) = config.compaction.write_rate_limit { opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit))?; @@ -460,22 +480,7 @@ impl Database { let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect(); for col in 0 .. config.columns.unwrap_or(0) { - let mut opts = Options::new(); - opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction); - opts.set_target_file_size_base(config.compaction.initial_file_size); - opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier); - - let col_opt = config.columns.map(|_| col); - - { - let cache_size = config.cache_sizes.get(&col_opt).cloned().unwrap_or(DEFAULT_CACHE); - let mut block_opts = BlockBasedOptions::new(); - // all goes to read cache. - block_opts.set_cache(Cache::new(cache_size * 1024 * 1024)); - opts.set_block_based_table_factory(&block_opts); - } - - cf_options.push(opts); + cf_options.push(col_config(col, &config)); } let mut write_opts = WriteOptions::new(); @@ -768,6 +773,42 @@ impl Database { *self.flushing.write() = mem::replace(&mut *db.flushing.write(), Vec::new()); Ok(()) } + + /// The number of non-default column families. + pub fn num_columns(&self) -> u32 { + self.db.read().as_ref() + .and_then(|db| if db.cfs.is_empty() { None } else { Some(db.cfs.len()) } ) + .map(|n| n as u32) + .unwrap_or(0) + } + + /// Drop a column family. + pub fn drop_column(&self) -> Result<(), String> { + match *self.db.write() { + Some(DBAndColumns { ref mut db, ref mut cfs }) => { + if let Some(col) = cfs.pop() { + let name = format!("col{}", cfs.len()); + drop(col); + db.drop_cf(&name)?; + } + Ok(()) + }, + None => Ok(()), + } + } + + /// Add a column family. + pub fn add_column(&self) -> Result<(), String> { + match *self.db.write() { + Some(DBAndColumns { ref mut db, ref mut cfs }) => { + let col = cfs.len() as u32; + let name = format!("col{}", col); + cfs.push(db.create_cf(&name, &col_config(col, &self.config))?); + Ok(()) + }, + None => Ok(()), + } + } } // duplicate declaration of methods here to avoid trait import in certain existing cases @@ -886,4 +927,40 @@ mod tests { let expected_output = Some(PathBuf::from("/sys/block/sda/queue/rotational")); assert_eq!(rotational_from_df_output(example_df), expected_output); } + + #[test] + fn dynamic_add_drop_columns() { + let config = DatabaseConfig::default(); + let config_5 = DatabaseConfig::with_columns(Some(5)); + + let path = RandomTempPath::create_dir(); + + // open empty, add 5. + { + let db = Database::open(&config, path.as_path().to_str().unwrap()).unwrap(); + assert_eq!(db.num_columns(), 0); + + for i in 0..5 { + db.add_column().unwrap(); + assert_eq!(db.num_columns(), i + 1); + } + } + + // open 5, remove all. + { + let db = Database::open(&config_5, path.as_path().to_str().unwrap()).unwrap(); + assert_eq!(db.num_columns(), 5); + + for i in (0..5).rev() { + db.drop_column().unwrap(); + assert_eq!(db.num_columns(), i); + } + } + + // reopen as 0. + { + let db = Database::open(&config, path.as_path().to_str().unwrap()).unwrap(); + assert_eq!(db.num_columns(), 0); + } + } } diff --git a/util/src/migration/mod.rs b/util/src/migration/mod.rs index 50464444f..3af86fc5b 100644 --- a/util/src/migration/mod.rs +++ b/util/src/migration/mod.rs @@ -127,6 +127,9 @@ pub trait Migration: 'static { fn pre_columns(&self) -> Option { self.columns() } /// Number of columns in database after the migration. fn columns(&self) -> Option; + /// Whether this migration alters any existing columns. + /// if not, then column families will simply be added and `migrate` will never be called. + fn alters_existing(&self) -> bool { true } /// Version of the database after the migration. fn version(&self) -> u32; /// Migrate a source to a destination. @@ -149,6 +152,8 @@ impl Migration for T { fn version(&self) -> u32 { SimpleMigration::version(self) } + fn alters_existing(&self) -> bool { true } + fn migrate(&mut self, source: Arc, config: &Config, dest: &mut Database, col: Option) -> Result<(), Error> { let mut batch = Batch::new(config, col); @@ -256,28 +261,41 @@ impl Manager { let current_columns = db_config.columns; db_config.columns = migration.columns(); - // open the target temporary database. - temp_path = temp_idx.path(&db_root); - let temp_path_str = temp_path.to_str().ok_or(Error::MigrationImpossible)?; - let mut new_db = Database::open(&db_config, temp_path_str).map_err(Error::Custom)?; + // slow migrations: alter existing data. + if migration.alters_existing() { + // open the target temporary database. + temp_path = temp_idx.path(&db_root); + let temp_path_str = temp_path.to_str().ok_or(Error::MigrationImpossible)?; + let mut new_db = Database::open(&db_config, temp_path_str).map_err(Error::Custom)?; - // perform the migration from cur_db to new_db. - match current_columns { - // migrate only default column - None => migration.migrate(cur_db.clone(), &config, &mut new_db, None)?, - Some(v) => { - // Migrate all columns in previous DB - for col in 0..v { - migration.migrate(cur_db.clone(), &config, &mut new_db, Some(col))? + match current_columns { + // migrate only default column + None => migration.migrate(cur_db.clone(), &config, &mut new_db, None)?, + Some(v) => { + // Migrate all columns in previous DB + for col in 0..v { + migration.migrate(cur_db.clone(), &config, &mut new_db, Some(col))? + } } } - } - // next iteration, we will migrate from this db into the other temp. - cur_db = Arc::new(new_db); - temp_idx.swap(); + // next iteration, we will migrate from this db into the other temp. + cur_db = Arc::new(new_db); + temp_idx.swap(); - // remove the other temporary migration database. - let _ = fs::remove_dir_all(temp_idx.path(&db_root)); + // remove the other temporary migration database. + let _ = fs::remove_dir_all(temp_idx.path(&db_root)); + } else { + // migrations which simply add or remove column families. + // we can do this in-place. + let goal_columns = migration.columns().unwrap_or(0); + while cur_db.num_columns() < goal_columns { + cur_db.add_column().map_err(Error::Custom)?; + } + + while cur_db.num_columns() > goal_columns { + cur_db.drop_column().map_err(Error::Custom)?; + } + } } Ok(temp_path) }