From 1b759c14621da6a69d2a19f4d6656689e8f71b78 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 5 Jul 2016 12:36:35 +0200 Subject: [PATCH] finish altering migration framework --- util/src/migration/mod.rs | 106 ++++++++++++++------------------------ 1 file changed, 38 insertions(+), 68 deletions(-) diff --git a/util/src/migration/mod.rs b/util/src/migration/mod.rs index fdf771050..80ca3b401 100644 --- a/util/src/migration/mod.rs +++ b/util/src/migration/mod.rs @@ -57,51 +57,12 @@ impl From<::std::io::Error> for Error { } } -/// Migration source. -pub trait Source { - /// Get an iterator over the key, value pairs. - fn iter(&self) -> Box, Vec)>>; - - /// Query a specific key. - fn get(&self, key: &[u8]) -> Option>; -} - -impl Source for Database { - fn iter(&self) -> Box, Vec)>> { - Box::new(self.iter().map(|(k, v)| (k.to_vec(), v.to_vec()))) - } - - fn get(&self, key: &[u8]) -> Option> { - self.get(key).ok().and_then(|x| x).map(|x| x.to_owned()) - } -} - - - -/// Object being migrated -pub trait Migrateable { - /// Called on destination to commit batch of migrated entries. - fn commit(&mut self, batch: &BTreeMap, Vec>) -> Result<(), Error>; -} - -impl Migrateable for Database { - fn commit(&mut self, batch: &BTreeMap, Vec>) -> Result<(), Error> { - let transaction = DBTransaction::new(); - - for keypair in batch { - try!(transaction.put(&keypair.0, &keypair.1).map_err(Error::Custom)) - } - - self.write(transaction).map_err(Error::Custom) - } -} - /// A generalized migration from the given db to a destination db. pub trait Migration: 'static { /// Version of the database after the migration. fn version(&self) -> u32; /// Migrate a source to a destination. - fn migrate(&self, source: &Source, config: &Config, destination: &mut Destination) -> Result<(), Error>; + fn migrate(&self, source: &Database, config: &Config, destination: &mut Database) -> Result<(), Error>; } /// A simple migration over key-value pairs. @@ -116,7 +77,7 @@ pub trait SimpleMigration: 'static { impl Migration for T { fn version(&self) -> u32 { SimpleMigration::version(self) } - fn migrate(&self, source: &Source, config: &Config, destination: &mut Destination) -> Result<(), Error> { + fn migrate(&self, source: &Database, config: &Config, dest: &mut Database) -> Result<(), Error> { let mut batch: BTreeMap, Vec> = BTreeMap::new(); for (key, value) in source.iter() { @@ -126,17 +87,27 @@ impl Migration for T { } if batch.len() == config.batch_size { - try!(destination.commit(&batch)); - batch.clear(); + try!(commit_batch(dest, &batch)); } } - try!(destination.commit(&batch)); + try!(commit_batch(dest, &batch)); Ok(()) } } +/// Commit a batch of writes to a database. +pub fn commit_batch(db: &mut Database, batch: &BTreeMap, Vec>) -> Result<(), Error> { + let transaction = DBTransaction::new(); + + for keypair in batch { + try!(transaction.put(&keypair.0, &keypair.1).map_err(Error::Custom)); + } + + db.write(transaction).map_err(Error::Custom) +} + /// Get the path where all databases reside. fn database_path(path: &Path) -> PathBuf { let mut temp_path = path.to_owned(); @@ -144,30 +115,29 @@ fn database_path(path: &Path) -> PathBuf { temp_path } -// which temp database we are currently using -enum TempDB { +enum TempIdx { One, Two, } -impl TempDB { - // given the path that all databases reside in, - // return the path of this one. - fn path(&self, db_path: &Path) -> PathBuf { - let mut path = db_path.to_owned(); +impl TempIdx { + fn swap(&mut self) { match *self { - TempDB::One => path.push("migration_1"), - TempDB::Two => path.push("migration_2"), + TempIdx::One => *self = TempIdx::Two, + TempIdx::Two => *self = TempIdx::One, } - - path } - fn swap(&mut self) { - *self = match *self { - TempDB::One => TempDB::Two, - TempDB::Two => TempDB::One, + // given the path to the old database, get the path of this one. + fn path(&self, db_root: &Path) -> PathBuf { + let mut buf = db_root.to_owned(); + + match *self { + TempIdx::One => buf.push("temp_migration_1"), + TempIdx::Two => buf.push("temp_migration_2"), }; + + buf } } @@ -199,10 +169,9 @@ impl Manager { } } - /// Performs migrations in order. This alternates migrating between two - /// temporary databases, and returns the path to the final one used. - /// The other is deleted by this function. - pub fn execute(&self, db_path: PathBuf, version: u32) -> Result { + /// Performs migration in order, starting with a source path, migrating between two temporary databases, + /// and producing a path where the final migration lives. + pub fn execute(&self, old_path: &Path, version: u32) -> Result { let migrations = try!(self.migrations_from(version).ok_or(Error::MigrationImpossible)); let db_config = DatabaseConfig { prefix_size: None, @@ -211,15 +180,16 @@ impl Manager { compaction: CompactionProfile::default(), }; - let mut temp_db = TempDB::One; - let mut temp_path = temp_db.path(&db_path); + let db_root = database_path(old_path); + let mut temp_idx = TempIdx::One; + let mut temp_path = temp_idx.path(&db_root); // start with the old db. let old_path_str = try!(old_path.to_str().ok_or(Error::MigrationImpossible)); let mut cur_db = try!(Database::open(&db_config, old_path_str).map_err(|s| Error::Custom(s))); for migration in migrations { // open the target temporary database. - temp_path = temp_db.path(&db_path); + temp_path = temp_idx.path(&db_root); let temp_path_str = try!(temp_path.to_str().ok_or(Error::MigrationImpossible)); let mut new_db = try!(Database::open(&db_config, temp_path_str).map_err(|s| Error::Custom(s))); @@ -227,10 +197,10 @@ impl Manager { try!(migration.migrate(&cur_db, &self.config, &mut new_db)); // next iteration, we will migrate from this db into the other temp. cur_db = new_db; - temp_db.swap(); + temp_idx.swap(); // remove the other temporary migration database. - let _ = fs::remove_dir_all(temp_db.path(&db_path)); + let _ = fs::remove_dir_all(temp_idx.path(&db_root)); } Ok(temp_path) }