batch abstraction for migration

This commit is contained in:
Robert Habermeier 2016-07-08 15:34:03 +02:00
parent 3e61d6f3f9
commit a160adadaa

View File

@ -39,6 +39,45 @@ impl Default for Config {
} }
} }
/// A batch of key-value pairs to be written into the database.
pub struct Batch {
inner: BTreeMap<Vec<u8>, Vec<u8>>,
batch_size: usize,
}
impl Batch {
/// Make a new batch with the given config.
pub fn new(config: &Config) -> Self {
Batch {
inner: BTreeMap::new(),
batch_size: config.batch_size,
}
}
/// Insert a value into the batch, committing if necessary.
pub fn insert(&mut self, key: Vec<u8>, value: Vec<u8>, dest: &mut Database) -> Result<(), Error> {
self.inner.insert(key, value);
if self.inner.len() == self.batch_size {
try!(self.commit(dest));
}
Ok(())
}
/// Commit all the items in the batch to the given database.
pub fn commit(&mut self, dest: &mut Database) -> Result<(), Error> {
if self.inner.is_empty() { return Ok(()) }
let transaction = DBTransaction::new();
for keypair in &self.inner {
try!(transaction.put(&keypair.0, &keypair.1).map_err(Error::Custom));
}
self.inner.clear();
dest.write(transaction).map_err(Error::Custom)
}
}
/// Migration error. /// Migration error.
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
@ -79,39 +118,18 @@ impl<T: SimpleMigration> Migration for T {
fn version(&self) -> u32 { SimpleMigration::version(self) } fn version(&self) -> u32 { SimpleMigration::version(self) }
fn migrate(&mut self, source: &Database, config: &Config, dest: &mut Database) -> Result<(), Error> { fn migrate(&mut self, source: &Database, config: &Config, dest: &mut Database) -> Result<(), Error> {
let mut batch: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new(); let mut batch = Batch::new(config);
for (key, value) in source.iter() { for (key, value) in source.iter() {
if let Some((key, value)) = self.simple_migrate(key.to_vec(), value.to_vec()) { if let Some((key, value)) = self.simple_migrate(key.to_vec(), value.to_vec()) {
batch.insert(key, value); try!(batch.insert(key, value, dest));
}
if batch.len() == config.batch_size {
try!(commit_batch(dest, &batch));
batch.clear();
} }
} }
if batch.len() != 0 { batch.commit(dest)
try!(commit_batch(dest, &batch));
}
Ok(())
} }
} }
/// Commit a batch of writes to a database.
pub fn commit_batch(db: &mut Database, batch: &BTreeMap<Vec<u8>, Vec<u8>>) -> Result<(), Error> {
let transaction = DBTransaction::new();
for keypair in batch {
try!(transaction.put(&keypair.0, &keypair.1).map_err(Error::Custom));
}
db.write(transaction).map_err(Error::Custom)
}
/// Get the path where all databases reside. /// Get the path where all databases reside.
fn database_path(path: &Path) -> PathBuf { fn database_path(path: &Path) -> PathBuf {
let mut temp_path = path.to_owned(); let mut temp_path = path.to_owned();