2019-01-07 11:33:07 +01:00
|
|
|
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
|
|
|
|
// This file is part of Parity Ethereum.
|
2016-05-24 22:38:11 +02:00
|
|
|
|
2019-01-07 11:33:07 +01:00
|
|
|
// Parity Ethereum is free software: you can redistribute it and/or modify
|
2016-05-24 22:38:11 +02:00
|
|
|
// it under the terms of the GNU General Public License as published by
|
|
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
|
|
// (at your option) any later version.
|
|
|
|
|
2019-01-07 11:33:07 +01:00
|
|
|
// Parity Ethereum is distributed in the hope that it will be useful,
|
2016-05-24 22:38:11 +02:00
|
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
// GNU General Public License for more details.
|
|
|
|
|
|
|
|
// You should have received a copy of the GNU General Public License
|
2019-01-07 11:33:07 +01:00
|
|
|
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
|
2016-05-24 22:38:11 +02:00
|
|
|
|
|
|
|
//! DB Migration module.
|
|
|
|
|
2017-10-10 20:01:27 +02:00
|
|
|
#[macro_use]
|
|
|
|
extern crate log;
|
|
|
|
#[macro_use]
|
|
|
|
extern crate macros;
|
|
|
|
|
|
|
|
extern crate kvdb;
|
2017-10-12 15:36:27 +02:00
|
|
|
extern crate kvdb_rocksdb;
|
2017-10-10 20:01:27 +02:00
|
|
|
|
2016-05-24 22:38:11 +02:00
|
|
|
use std::collections::BTreeMap;
|
2016-07-06 12:05:23 +02:00
|
|
|
use std::path::{Path, PathBuf};
|
2016-10-01 14:33:19 +02:00
|
|
|
use std::sync::Arc;
|
2018-07-02 11:04:48 +02:00
|
|
|
use std::{fs, io, error};
|
2016-07-06 12:05:23 +02:00
|
|
|
|
2017-10-12 15:36:27 +02:00
|
|
|
use kvdb::DBTransaction;
|
|
|
|
use kvdb_rocksdb::{CompactionProfile, Database, DatabaseConfig};
|
2016-07-06 12:05:23 +02:00
|
|
|
|
2018-07-02 11:04:48 +02:00
|
|
|
/// Wrap any error convertible into a boxed `Error` as an `io::Error` of
/// kind `Other`.
///
/// Used throughout this module to map migration/database failures into the
/// `io::Result` type exposed by the public API. The error's message is
/// preserved via the `Into<Box<dyn Error>>` conversion.
fn other_io_err<E>(e: E) -> io::Error where E: Into<Box<dyn error::Error + Send + Sync>> {
	io::Error::new(io::ErrorKind::Other, e)
}
|
|
|
|
|
2016-07-06 12:05:23 +02:00
|
|
|
/// Migration config.
///
/// Controls how key-value pairs are batched during a migration and which
/// compaction profile the destination database is opened with.
#[derive(Clone)]
pub struct Config {
	/// Defines how many elements should be migrated at once.
	pub batch_size: usize,
	/// Database compaction profile.
	pub compaction_profile: CompactionProfile,
}
|
|
|
|
|
|
|
|
impl Default for Config {
|
|
|
|
fn default() -> Self {
|
|
|
|
Config {
|
|
|
|
batch_size: 1024,
|
2016-07-28 20:29:58 +02:00
|
|
|
compaction_profile: Default::default(),
|
2016-07-06 12:05:23 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-07-11 09:46:33 +02:00
|
|
|
/// A batch of key-value pairs to be written into the database.
///
/// Pairs are buffered in sorted order and flushed to the destination
/// database once `batch_size` entries have accumulated.
pub struct Batch {
	// Sorted buffer of pending key-value pairs.
	inner: BTreeMap<Vec<u8>, Vec<u8>>,
	// Number of entries to accumulate before an automatic commit.
	batch_size: usize,
	// Destination column family; `None` means the default column.
	column: Option<u32>,
}
|
|
|
|
|
|
|
|
impl Batch {
|
|
|
|
/// Make a new batch with the given config.
|
2016-07-28 23:46:24 +02:00
|
|
|
pub fn new(config: &Config, col: Option<u32>) -> Self {
|
2016-07-11 09:46:33 +02:00
|
|
|
Batch {
|
|
|
|
inner: BTreeMap::new(),
|
|
|
|
batch_size: config.batch_size,
|
2016-07-28 23:46:24 +02:00
|
|
|
column: col,
|
2016-07-11 09:46:33 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Insert a value into the batch, committing if necessary.
|
2018-07-02 11:04:48 +02:00
|
|
|
pub fn insert(&mut self, key: Vec<u8>, value: Vec<u8>, dest: &mut Database) -> io::Result<()> {
|
2016-07-11 09:46:33 +02:00
|
|
|
self.inner.insert(key, value);
|
|
|
|
if self.inner.len() == self.batch_size {
|
2016-12-27 12:53:56 +01:00
|
|
|
self.commit(dest)?;
|
2016-07-11 09:46:33 +02:00
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Commit all the items in the batch to the given database.
|
2018-07-02 11:04:48 +02:00
|
|
|
pub fn commit(&mut self, dest: &mut Database) -> io::Result<()> {
|
2016-07-11 09:46:33 +02:00
|
|
|
if self.inner.is_empty() { return Ok(()) }
|
|
|
|
|
2017-02-20 17:21:55 +01:00
|
|
|
let mut transaction = DBTransaction::new();
|
2016-07-11 09:46:33 +02:00
|
|
|
|
|
|
|
for keypair in &self.inner {
|
2016-08-18 09:43:56 +02:00
|
|
|
transaction.put(self.column, &keypair.0, &keypair.1);
|
2016-07-11 09:46:33 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
self.inner.clear();
|
2018-07-02 11:04:48 +02:00
|
|
|
dest.write(transaction)
|
2016-10-03 12:02:43 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-07-06 12:05:23 +02:00
|
|
|
/// A generalized migration from the given db to a destination db.
pub trait Migration: 'static {
	/// Number of columns in the database before the migration.
	/// Defaults to the post-migration column count.
	fn pre_columns(&self) -> Option<u32> { self.columns() }
	/// Number of columns in database after the migration.
	fn columns(&self) -> Option<u32>;
	/// Whether this migration alters any existing columns.
	/// if not, then column families will simply be added and `migrate` will never be called.
	fn alters_existing(&self) -> bool { true }
	/// Version of the database after the migration.
	fn version(&self) -> u32;
	/// Migrate a source to a destination.
	/// `col` is the column being migrated, or `None` for the default column.
	fn migrate(&mut self, source: Arc<Database>, config: &Config, destination: &mut Database, col: Option<u32>) -> io::Result<()>;
}
|
|
|
|
|
2018-02-22 14:53:10 +01:00
|
|
|
/// A simple migration over key-value pairs of a single column.
pub trait SimpleMigration: 'static {
	/// Number of columns in database after the migration.
	fn columns(&self) -> Option<u32>;
	/// Version of database after the migration.
	fn version(&self) -> u32;
	/// Index of column which should be migrated; `None` targets the
	/// default column.
	fn migrated_column_index(&self) -> Option<u32>;
	/// Should migrate existing object to new database.
	/// Returns `None` if the object does not exist in new version of database.
	fn simple_migrate(&mut self, key: Vec<u8>, value: Vec<u8>) -> Option<(Vec<u8>, Vec<u8>)>;
}
|
|
|
|
|
2016-07-06 12:05:23 +02:00
|
|
|
impl<T: SimpleMigration> Migration for T {
|
2016-07-28 23:46:24 +02:00
|
|
|
fn columns(&self) -> Option<u32> { SimpleMigration::columns(self) }
|
|
|
|
|
2016-07-06 12:05:23 +02:00
|
|
|
fn version(&self) -> u32 { SimpleMigration::version(self) }
|
|
|
|
|
2017-02-26 18:29:35 +01:00
|
|
|
fn alters_existing(&self) -> bool { true }
|
|
|
|
|
2018-07-02 11:04:48 +02:00
|
|
|
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> io::Result<()> {
|
2018-02-22 14:53:10 +01:00
|
|
|
let migration_needed = col == SimpleMigration::migrated_column_index(self);
|
2016-07-28 23:46:24 +02:00
|
|
|
let mut batch = Batch::new(config, col);
|
2016-07-06 12:05:23 +02:00
|
|
|
|
2017-04-18 15:45:15 +02:00
|
|
|
let iter = match source.iter(col) {
|
|
|
|
Some(iter) => iter,
|
|
|
|
None => return Ok(()),
|
|
|
|
};
|
|
|
|
|
|
|
|
for (key, value) in iter {
|
2018-02-22 14:53:10 +01:00
|
|
|
if migration_needed {
|
|
|
|
if let Some((key, value)) = self.simple_migrate(key.into_vec(), value.into_vec()) {
|
|
|
|
batch.insert(key, value, dest)?;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
batch.insert(key.into_vec(), value.into_vec(), dest)?;
|
2016-07-06 12:05:23 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-07-11 09:46:33 +02:00
|
|
|
batch.commit(dest)
|
2016-07-06 12:05:23 +02:00
|
|
|
}
|
2016-05-24 22:38:11 +02:00
|
|
|
}
|
2016-07-06 12:05:23 +02:00
|
|
|
|
2017-02-26 18:41:40 +01:00
|
|
|
/// An even simpler migration which just changes the number of columns.
///
/// No key-value data is rewritten; since `alters_existing` is `false`,
/// the manager adds or drops column families in place instead of calling
/// `migrate`.
pub struct ChangeColumns {
	/// The amount of columns before this migration.
	pub pre_columns: Option<u32>,
	/// The amount of columns after this migration.
	pub post_columns: Option<u32>,
	/// The version after this migration.
	pub version: u32,
}
|
|
|
|
|
|
|
|
impl Migration for ChangeColumns {
|
|
|
|
fn pre_columns(&self) -> Option<u32> { self.pre_columns }
|
|
|
|
fn columns(&self) -> Option<u32> { self.post_columns }
|
|
|
|
fn version(&self) -> u32 { self.version }
|
|
|
|
fn alters_existing(&self) -> bool { false }
|
2018-07-02 11:04:48 +02:00
|
|
|
fn migrate(&mut self, _: Arc<Database>, _: &Config, _: &mut Database, _: Option<u32>) -> io::Result<()> {
|
2017-02-26 18:41:40 +01:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-07-06 12:05:23 +02:00
|
|
|
/// Get the path where all databases reside.
///
/// This is simply the parent directory of `path`; when `path` has no
/// parent (e.g. the filesystem root), `path` itself is returned unchanged.
fn database_path(path: &Path) -> PathBuf {
	match path.parent() {
		Some(parent) => parent.to_path_buf(),
		None => path.to_path_buf(),
	}
}
|
|
|
|
|
|
|
|
// Identifies which of the two temporary scratch databases is the current
// migration target; migrations ping-pong between the two.
enum TempIndex {
	One,
	Two,
}

impl TempIndex {
	// Toggle to the other temporary database.
	fn swap(&mut self) {
		*self = match *self {
			TempIndex::One => TempIndex::Two,
			TempIndex::Two => TempIndex::One,
		};
	}

	// given the path to the old database, get the path of this one.
	fn path(&self, db_root: &Path) -> PathBuf {
		let dir = match *self {
			TempIndex::One => "temp_migration_1",
			TempIndex::Two => "temp_migration_2",
		};
		db_root.join(dir)
	}
}
|
|
|
|
|
|
|
|
/// Manages database migration.
pub struct Manager {
	// Shared batching/compaction configuration applied to every migration.
	config: Config,
	// Registered migrations; kept in ascending version order
	// (enforced by `add_migration`).
	migrations: Vec<Box<Migration>>,
}
|
|
|
|
|
|
|
|
impl Manager {
|
|
|
|
/// Creates new migration manager with given configuration.
|
|
|
|
pub fn new(config: Config) -> Self {
|
|
|
|
Manager {
|
|
|
|
config: config,
|
|
|
|
migrations: vec![],
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Adds new migration rules.
|
2018-07-02 11:04:48 +02:00
|
|
|
pub fn add_migration<T>(&mut self, migration: T) -> io::Result<()> where T: Migration {
|
2016-07-28 12:05:41 +02:00
|
|
|
let is_new = match self.migrations.last() {
|
|
|
|
Some(last) => migration.version() > last.version(),
|
2016-07-06 12:05:23 +02:00
|
|
|
None => true,
|
|
|
|
};
|
2016-09-23 18:36:50 +02:00
|
|
|
|
2016-07-28 12:05:41 +02:00
|
|
|
match is_new {
|
2016-07-06 12:05:23 +02:00
|
|
|
true => Ok(self.migrations.push(Box::new(migration))),
|
2018-07-02 11:04:48 +02:00
|
|
|
false => Err(other_io_err("Cannot add migration.")),
|
2016-07-06 12:05:23 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Performs migration in order, starting with a source path, migrating between two temporary databases,
|
|
|
|
/// and producing a path where the final migration lives.
|
2018-07-02 11:04:48 +02:00
|
|
|
pub fn execute(&mut self, old_path: &Path, version: u32) -> io::Result<PathBuf> {
|
2016-07-11 09:46:33 +02:00
|
|
|
let config = self.config.clone();
|
2016-07-28 23:46:24 +02:00
|
|
|
let migrations = self.migrations_from(version);
|
2016-10-03 12:02:43 +02:00
|
|
|
trace!(target: "migration", "Total migrations to execute for version {}: {}", version, migrations.len());
|
2017-10-16 12:11:35 +02:00
|
|
|
if migrations.is_empty() {
|
2018-07-02 11:04:48 +02:00
|
|
|
return Err(other_io_err("Migration impossible"));
|
2017-10-16 12:11:35 +02:00
|
|
|
};
|
2016-09-23 18:36:50 +02:00
|
|
|
|
2016-10-27 08:28:12 +02:00
|
|
|
let columns = migrations.get(0).and_then(|m| m.pre_columns());
|
2016-09-23 18:36:50 +02:00
|
|
|
|
2016-10-03 12:02:43 +02:00
|
|
|
trace!(target: "migration", "Expecting database to contain {:?} columns", columns);
|
2016-07-28 23:46:24 +02:00
|
|
|
let mut db_config = DatabaseConfig {
|
2016-07-06 12:05:23 +02:00
|
|
|
max_open_files: 64,
|
2018-01-03 11:00:37 +01:00
|
|
|
memory_budget: None,
|
2016-07-28 23:46:24 +02:00
|
|
|
compaction: config.compaction_profile,
|
|
|
|
columns: columns,
|
2016-07-06 12:05:23 +02:00
|
|
|
};
|
|
|
|
|
|
|
|
let db_root = database_path(old_path);
|
|
|
|
let mut temp_idx = TempIndex::One;
|
2017-02-26 19:30:54 +01:00
|
|
|
let mut temp_path = old_path.to_path_buf();
|
2016-07-06 12:05:23 +02:00
|
|
|
|
|
|
|
// start with the old db.
|
2018-07-02 11:04:48 +02:00
|
|
|
let old_path_str = old_path.to_str().ok_or_else(|| other_io_err("Migration impossible."))?;
|
2017-10-16 12:11:35 +02:00
|
|
|
let mut cur_db = Arc::new(Database::open(&db_config, old_path_str)?);
|
2016-07-28 23:46:24 +02:00
|
|
|
|
2016-07-06 12:05:23 +02:00
|
|
|
for migration in migrations {
|
2016-10-31 16:18:20 +01:00
|
|
|
trace!(target: "migration", "starting migration to version {}", migration.version());
|
2016-07-28 23:46:24 +02:00
|
|
|
// Change number of columns in new db
|
|
|
|
let current_columns = db_config.columns;
|
|
|
|
db_config.columns = migration.columns();
|
|
|
|
|
2017-02-26 18:29:35 +01:00
|
|
|
// slow migrations: alter existing data.
|
|
|
|
if migration.alters_existing() {
|
2017-02-26 19:30:54 +01:00
|
|
|
temp_path = temp_idx.path(&db_root);
|
|
|
|
|
2017-02-26 18:29:35 +01:00
|
|
|
// open the target temporary database.
|
2018-07-02 11:04:48 +02:00
|
|
|
let temp_path_str = temp_path.to_str().ok_or_else(|| other_io_err("Migration impossible."))?;
|
2017-10-16 12:11:35 +02:00
|
|
|
let mut new_db = Database::open(&db_config, temp_path_str)?;
|
2017-02-26 18:29:35 +01:00
|
|
|
|
|
|
|
match current_columns {
|
|
|
|
// migrate only default column
|
|
|
|
None => migration.migrate(cur_db.clone(), &config, &mut new_db, None)?,
|
|
|
|
Some(v) => {
|
|
|
|
// Migrate all columns in previous DB
|
|
|
|
for col in 0..v {
|
|
|
|
migration.migrate(cur_db.clone(), &config, &mut new_db, Some(col))?
|
|
|
|
}
|
2016-07-28 23:46:24 +02:00
|
|
|
}
|
|
|
|
}
|
2017-02-26 18:29:35 +01:00
|
|
|
// next iteration, we will migrate from this db into the other temp.
|
|
|
|
cur_db = Arc::new(new_db);
|
|
|
|
temp_idx.swap();
|
|
|
|
|
|
|
|
// remove the other temporary migration database.
|
|
|
|
let _ = fs::remove_dir_all(temp_idx.path(&db_root));
|
|
|
|
} else {
|
|
|
|
// migrations which simply add or remove column families.
|
|
|
|
// we can do this in-place.
|
|
|
|
let goal_columns = migration.columns().unwrap_or(0);
|
|
|
|
while cur_db.num_columns() < goal_columns {
|
2018-07-02 11:04:48 +02:00
|
|
|
cur_db.add_column().map_err(other_io_err)?;
|
2017-02-26 18:29:35 +01:00
|
|
|
}
|
2016-07-06 12:05:23 +02:00
|
|
|
|
2017-02-26 18:29:35 +01:00
|
|
|
while cur_db.num_columns() > goal_columns {
|
2018-07-02 11:04:48 +02:00
|
|
|
cur_db.drop_column().map_err(other_io_err)?;
|
2017-02-26 18:29:35 +01:00
|
|
|
}
|
|
|
|
}
|
2016-07-06 12:05:23 +02:00
|
|
|
}
|
|
|
|
Ok(temp_path)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns true if migration is needed.
|
|
|
|
pub fn is_needed(&self, version: u32) -> bool {
|
|
|
|
match self.migrations.last() {
|
|
|
|
Some(last) => version < last.version(),
|
|
|
|
None => false,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-07-28 12:05:41 +02:00
|
|
|
/// Find all needed migrations.
|
|
|
|
fn migrations_from(&mut self, version: u32) -> Vec<&mut Box<Migration>> {
|
|
|
|
self.migrations.iter_mut().filter(|m| m.version() > version).collect()
|
2016-07-06 12:05:23 +02:00
|
|
|
}
|
2016-07-28 23:46:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Prints a dot every `max` ticks
pub struct Progress {
	// Ticks counted since the last dot was emitted.
	current: usize,
	// Number of ticks between emitted dots.
	max: usize,
}
|
|
|
|
|
2016-07-28 23:46:24 +02:00
|
|
|
impl Default for Progress {
|
|
|
|
fn default() -> Self {
|
|
|
|
Progress {
|
|
|
|
current: 0,
|
|
|
|
max: 100_000,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Progress {
	/// Tick progress meter.
	///
	/// Increments the internal counter; once `max` ticks have accumulated,
	/// resets the counter and emits a dot. `flush!` is provided by the
	/// external `macros` crate — presumably it prints to stdout without a
	/// trailing newline; confirm against that crate.
	pub fn tick(&mut self) {
		self.current += 1;
		if self.current == self.max {
			self.current = 0;
			flush!(".");
		}
	}
}
|