finish altering migration framework

This commit is contained in:
Robert Habermeier 2016-07-05 12:36:35 +02:00
parent 2ea7a8d666
commit 1b759c1462

View File

@ -57,51 +57,12 @@ impl From<::std::io::Error> for Error {
} }
} }
/// Migration source.
pub trait Source {
/// Get an iterator over the key, value pairs.
fn iter(&self) -> Box<Iterator<Item=(Vec<u8>, Vec<u8>)>>;
/// Query a specific key.
fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
}
impl Source for Database {
fn iter(&self) -> Box<Iterator<Item=(Vec<u8>, Vec<u8>)>> {
Box::new(self.iter().map(|(k, v)| (k.to_vec(), v.to_vec())))
}
fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
self.get(key).ok().and_then(|x| x).map(|x| x.to_owned())
}
}
/// Object being migrated
pub trait Migrateable {
/// Called on destination to commit batch of migrated entries.
fn commit(&mut self, batch: &BTreeMap<Vec<u8>, Vec<u8>>) -> Result<(), Error>;
}
impl Migrateable for Database {
fn commit(&mut self, batch: &BTreeMap<Vec<u8>, Vec<u8>>) -> Result<(), Error> {
let transaction = DBTransaction::new();
for keypair in batch {
try!(transaction.put(&keypair.0, &keypair.1).map_err(Error::Custom))
}
self.write(transaction).map_err(Error::Custom)
}
}
/// A generalized migration from the given db to a destination db. /// A generalized migration from the given db to a destination db.
pub trait Migration: 'static { pub trait Migration: 'static {
/// Version of the database after the migration. /// Version of the database after the migration.
fn version(&self) -> u32; fn version(&self) -> u32;
/// Migrate a source to a destination. /// Migrate a source to a destination.
fn migrate(&self, source: &Source, config: &Config, destination: &mut Destination) -> Result<(), Error>; fn migrate(&self, source: &Database, config: &Config, destination: &mut Database) -> Result<(), Error>;
} }
/// A simple migration over key-value pairs. /// A simple migration over key-value pairs.
@ -116,7 +77,7 @@ pub trait SimpleMigration: 'static {
impl<T: SimpleMigration> Migration for T { impl<T: SimpleMigration> Migration for T {
fn version(&self) -> u32 { SimpleMigration::version(self) } fn version(&self) -> u32 { SimpleMigration::version(self) }
fn migrate(&self, source: &Source, config: &Config, destination: &mut Destination) -> Result<(), Error> { fn migrate(&self, source: &Database, config: &Config, dest: &mut Database) -> Result<(), Error> {
let mut batch: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new(); let mut batch: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
for (key, value) in source.iter() { for (key, value) in source.iter() {
@ -126,17 +87,27 @@ impl<T: SimpleMigration> Migration for T {
} }
if batch.len() == config.batch_size { if batch.len() == config.batch_size {
try!(destination.commit(&batch)); try!(commit_batch(dest, &batch));
batch.clear();
} }
} }
try!(destination.commit(&batch)); try!(commit_batch(dest, &batch));
Ok(()) Ok(())
} }
} }
/// Commit a batch of writes to a database.
pub fn commit_batch(db: &mut Database, batch: &BTreeMap<Vec<u8>, Vec<u8>>) -> Result<(), Error> {
let transaction = DBTransaction::new();
for keypair in batch {
try!(transaction.put(&keypair.0, &keypair.1).map_err(Error::Custom));
}
db.write(transaction).map_err(Error::Custom)
}
/// Get the path where all databases reside. /// Get the path where all databases reside.
fn database_path(path: &Path) -> PathBuf { fn database_path(path: &Path) -> PathBuf {
let mut temp_path = path.to_owned(); let mut temp_path = path.to_owned();
@ -144,30 +115,29 @@ fn database_path(path: &Path) -> PathBuf {
temp_path temp_path
} }
// which temp database we are currently using enum TempIdx {
enum TempDB {
One, One,
Two, Two,
} }
impl TempDB { impl TempIdx {
// given the path that all databases reside in,
// return the path of this one.
fn path(&self, db_path: &Path) -> PathBuf {
let mut path = db_path.to_owned();
match *self {
TempDB::One => path.push("migration_1"),
TempDB::Two => path.push("migration_2"),
}
path
}
fn swap(&mut self) { fn swap(&mut self) {
*self = match *self { match *self {
TempDB::One => TempDB::Two, TempIdx::One => *self = TempIdx::Two,
TempDB::Two => TempDB::One, TempIdx::Two => *self = TempIdx::One,
}
}
// given the path to the old database, get the path of this one.
fn path(&self, db_root: &Path) -> PathBuf {
let mut buf = db_root.to_owned();
match *self {
TempIdx::One => buf.push("temp_migration_1"),
TempIdx::Two => buf.push("temp_migration_2"),
}; };
buf
} }
} }
@ -199,10 +169,9 @@ impl Manager {
} }
} }
/// Performs migrations in order. This alternates migrating between two /// Performs migration in order, starting with a source path, migrating between two temporary databases,
/// temporary databases, and returns the path to the final one used. /// and producing a path where the final migration lives.
/// The other is deleted by this function. pub fn execute(&self, old_path: &Path, version: u32) -> Result<PathBuf, Error> {
pub fn execute(&self, db_path: PathBuf, version: u32) -> Result<PathBuf, Error> {
let migrations = try!(self.migrations_from(version).ok_or(Error::MigrationImpossible)); let migrations = try!(self.migrations_from(version).ok_or(Error::MigrationImpossible));
let db_config = DatabaseConfig { let db_config = DatabaseConfig {
prefix_size: None, prefix_size: None,
@ -211,15 +180,16 @@ impl Manager {
compaction: CompactionProfile::default(), compaction: CompactionProfile::default(),
}; };
let mut temp_db = TempDB::One; let db_root = database_path(old_path);
let mut temp_path = temp_db.path(&db_path); let mut temp_idx = TempIdx::One;
let mut temp_path = temp_idx.path(&db_root);
// start with the old db. // start with the old db.
let old_path_str = try!(old_path.to_str().ok_or(Error::MigrationImpossible)); let old_path_str = try!(old_path.to_str().ok_or(Error::MigrationImpossible));
let mut cur_db = try!(Database::open(&db_config, old_path_str).map_err(|s| Error::Custom(s))); let mut cur_db = try!(Database::open(&db_config, old_path_str).map_err(|s| Error::Custom(s)));
for migration in migrations { for migration in migrations {
// open the target temporary database. // open the target temporary database.
temp_path = temp_db.path(&db_path); temp_path = temp_idx.path(&db_root);
let temp_path_str = try!(temp_path.to_str().ok_or(Error::MigrationImpossible)); let temp_path_str = try!(temp_path.to_str().ok_or(Error::MigrationImpossible));
let mut new_db = try!(Database::open(&db_config, temp_path_str).map_err(|s| Error::Custom(s))); let mut new_db = try!(Database::open(&db_config, temp_path_str).map_err(|s| Error::Custom(s)));
@ -227,10 +197,10 @@ impl Manager {
try!(migration.migrate(&cur_db, &self.config, &mut new_db)); try!(migration.migrate(&cur_db, &self.config, &mut new_db));
// next iteration, we will migrate from this db into the other temp. // next iteration, we will migrate from this db into the other temp.
cur_db = new_db; cur_db = new_db;
temp_db.swap(); temp_idx.swap();
// remove the other temporary migration database. // remove the other temporary migration database.
let _ = fs::remove_dir_all(temp_db.path(&db_path)); let _ = fs::remove_dir_all(temp_idx.path(&db_root));
} }
Ok(temp_path) Ok(temp_path)
} }