add and remove column families dynamically

This commit is contained in:
Robert Habermeier 2017-02-26 18:29:35 +01:00
parent 1bf2b27708
commit 3cc007b4d6
2 changed files with 132 additions and 37 deletions

View File

@ -410,6 +410,29 @@ struct DBAndColumns {
cfs: Vec<Column>, cfs: Vec<Column>,
} }
// get column family configuration from database config.
fn col_config(col: u32, config: &DatabaseConfig) -> Options {
// default cache size for columns not specified.
const DEFAULT_CACHE: usize = 2;
let mut opts = Options::new();
opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
opts.set_target_file_size_base(config.compaction.initial_file_size);
opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier);
let col_opt = config.columns.map(|_| col);
{
let cache_size = config.cache_sizes.get(&col_opt).cloned().unwrap_or(DEFAULT_CACHE);
let mut block_opts = BlockBasedOptions::new();
// all goes to read cache.
block_opts.set_cache(Cache::new(cache_size * 1024 * 1024));
opts.set_block_based_table_factory(&block_opts);
}
opts
}
/// Key-Value database. /// Key-Value database.
pub struct Database { pub struct Database {
db: RwLock<Option<DBAndColumns>>, db: RwLock<Option<DBAndColumns>>,
@ -434,9 +457,6 @@ impl Database {
/// Open database file. Creates if it does not exist. /// Open database file. Creates if it does not exist.
pub fn open(config: &DatabaseConfig, path: &str) -> Result<Database, String> { pub fn open(config: &DatabaseConfig, path: &str) -> Result<Database, String> {
// default cache size for columns not specified.
const DEFAULT_CACHE: usize = 2;
let mut opts = Options::new(); let mut opts = Options::new();
if let Some(rate_limit) = config.compaction.write_rate_limit { if let Some(rate_limit) = config.compaction.write_rate_limit {
opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit))?; opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit))?;
@ -460,22 +480,7 @@ impl Database {
let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect(); let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
for col in 0 .. config.columns.unwrap_or(0) { for col in 0 .. config.columns.unwrap_or(0) {
let mut opts = Options::new(); cf_options.push(col_config(col, &config));
opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
opts.set_target_file_size_base(config.compaction.initial_file_size);
opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier);
let col_opt = config.columns.map(|_| col);
{
let cache_size = config.cache_sizes.get(&col_opt).cloned().unwrap_or(DEFAULT_CACHE);
let mut block_opts = BlockBasedOptions::new();
// all goes to read cache.
block_opts.set_cache(Cache::new(cache_size * 1024 * 1024));
opts.set_block_based_table_factory(&block_opts);
}
cf_options.push(opts);
} }
let mut write_opts = WriteOptions::new(); let mut write_opts = WriteOptions::new();
@ -768,6 +773,42 @@ impl Database {
*self.flushing.write() = mem::replace(&mut *db.flushing.write(), Vec::new()); *self.flushing.write() = mem::replace(&mut *db.flushing.write(), Vec::new());
Ok(()) Ok(())
} }
/// The number of non-default column families.
pub fn num_columns(&self) -> u32 {
self.db.read().as_ref()
.and_then(|db| if db.cfs.is_empty() { None } else { Some(db.cfs.len()) } )
.map(|n| n as u32)
.unwrap_or(0)
}
/// Drop a column family.
pub fn drop_column(&self) -> Result<(), String> {
match *self.db.write() {
Some(DBAndColumns { ref mut db, ref mut cfs }) => {
if let Some(col) = cfs.pop() {
let name = format!("col{}", cfs.len());
drop(col);
db.drop_cf(&name)?;
}
Ok(())
},
None => Ok(()),
}
}
/// Add a column family.
pub fn add_column(&self) -> Result<(), String> {
match *self.db.write() {
Some(DBAndColumns { ref mut db, ref mut cfs }) => {
let col = cfs.len() as u32;
let name = format!("col{}", col);
cfs.push(db.create_cf(&name, &col_config(col, &self.config))?);
Ok(())
},
None => Ok(()),
}
}
} }
// duplicate declaration of methods here to avoid trait import in certain existing cases // duplicate declaration of methods here to avoid trait import in certain existing cases
@ -886,4 +927,40 @@ mod tests {
let expected_output = Some(PathBuf::from("/sys/block/sda/queue/rotational")); let expected_output = Some(PathBuf::from("/sys/block/sda/queue/rotational"));
assert_eq!(rotational_from_df_output(example_df), expected_output); assert_eq!(rotational_from_df_output(example_df), expected_output);
} }
#[test]
fn dynamic_add_drop_columns() {
let config = DatabaseConfig::default();
let config_5 = DatabaseConfig::with_columns(Some(5));
let path = RandomTempPath::create_dir();
// open empty, add 5.
{
let db = Database::open(&config, path.as_path().to_str().unwrap()).unwrap();
assert_eq!(db.num_columns(), 0);
for i in 0..5 {
db.add_column().unwrap();
assert_eq!(db.num_columns(), i + 1);
}
}
// open 5, remove all.
{
let db = Database::open(&config_5, path.as_path().to_str().unwrap()).unwrap();
assert_eq!(db.num_columns(), 5);
for i in (0..5).rev() {
db.drop_column().unwrap();
assert_eq!(db.num_columns(), i);
}
}
// reopen as 0.
{
let db = Database::open(&config, path.as_path().to_str().unwrap()).unwrap();
assert_eq!(db.num_columns(), 0);
}
}
} }

View File

@ -127,6 +127,9 @@ pub trait Migration: 'static {
fn pre_columns(&self) -> Option<u32> { self.columns() } fn pre_columns(&self) -> Option<u32> { self.columns() }
/// Number of columns in database after the migration. /// Number of columns in database after the migration.
fn columns(&self) -> Option<u32>; fn columns(&self) -> Option<u32>;
/// Whether this migration alters any existing columns.
/// if not, then column families will simply be added and `migrate` will never be called.
fn alters_existing(&self) -> bool { true }
/// Version of the database after the migration. /// Version of the database after the migration.
fn version(&self) -> u32; fn version(&self) -> u32;
/// Migrate a source to a destination. /// Migrate a source to a destination.
@ -149,6 +152,8 @@ impl<T: SimpleMigration> Migration for T {
fn version(&self) -> u32 { SimpleMigration::version(self) } fn version(&self) -> u32 { SimpleMigration::version(self) }
fn alters_existing(&self) -> bool { true }
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> { fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> {
let mut batch = Batch::new(config, col); let mut batch = Batch::new(config, col);
@ -256,12 +261,13 @@ impl Manager {
let current_columns = db_config.columns; let current_columns = db_config.columns;
db_config.columns = migration.columns(); db_config.columns = migration.columns();
// slow migrations: alter existing data.
if migration.alters_existing() {
// open the target temporary database. // open the target temporary database.
temp_path = temp_idx.path(&db_root); temp_path = temp_idx.path(&db_root);
let temp_path_str = temp_path.to_str().ok_or(Error::MigrationImpossible)?; let temp_path_str = temp_path.to_str().ok_or(Error::MigrationImpossible)?;
let mut new_db = Database::open(&db_config, temp_path_str).map_err(Error::Custom)?; let mut new_db = Database::open(&db_config, temp_path_str).map_err(Error::Custom)?;
// perform the migration from cur_db to new_db.
match current_columns { match current_columns {
// migrate only default column // migrate only default column
None => migration.migrate(cur_db.clone(), &config, &mut new_db, None)?, None => migration.migrate(cur_db.clone(), &config, &mut new_db, None)?,
@ -278,6 +284,18 @@ impl Manager {
// remove the other temporary migration database. // remove the other temporary migration database.
let _ = fs::remove_dir_all(temp_idx.path(&db_root)); let _ = fs::remove_dir_all(temp_idx.path(&db_root));
} else {
// migrations which simply add or remove column families.
// we can do this in-place.
let goal_columns = migration.columns().unwrap_or(0);
while cur_db.num_columns() < goal_columns {
cur_db.add_column().map_err(Error::Custom)?;
}
while cur_db.num_columns() > goal_columns {
cur_db.drop_column().map_err(Error::Custom)?;
}
}
} }
Ok(temp_path) Ok(temp_path)
} }