Improve handling of RocksDB corruption (#7630)

* kvdb-rocksdb: update rust-rocksdb version

* kvdb-rocksdb: mark corruptions and attempt repair on db open

* kvdb-rocksdb: better corruption detection on open

* kvdb-rocksdb: add corruption_file_name const

* kvdb-rocksdb: rename mark_corruption to check_for_corruption
This commit is contained in:
André Silva 2018-01-19 13:33:38 +00:00 committed by Marek Kotewicz
parent 6bebb9e74a
commit 2af4bd195f
2 changed files with 49 additions and 12 deletions

5
Cargo.lock generated
View File

@ -2679,7 +2679,7 @@ dependencies = [
[[package]]
name = "rocksdb"
version = "0.4.5"
source = "git+https://github.com/paritytech/rust-rocksdb#7adec2311d31387a832b0ef051472cdef906b480"
source = "git+https://github.com/paritytech/rust-rocksdb#ecf06adf3148ab10f6f7686b724498382ff4f36e"
dependencies = [
"libc 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)",
"local-encoding 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2689,10 +2689,11 @@ dependencies = [
[[package]]
name = "rocksdb-sys"
version = "0.3.0"
source = "git+https://github.com/paritytech/rust-rocksdb#7adec2311d31387a832b0ef051472cdef906b480"
source = "git+https://github.com/paritytech/rust-rocksdb#ecf06adf3148ab10f6f7686b724498382ff4f36e"
dependencies = [
"cc 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)",
"local-encoding 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"snappy-sys 0.1.0 (git+https://github.com/paritytech/rust-snappy)",
]

View File

@ -32,7 +32,7 @@ use std::cmp;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::path::{PathBuf, Path};
use std::{mem, fs, io};
use std::{fs, io, mem, result};
use parking_lot::{Mutex, MutexGuard, RwLock};
use rocksdb::{
@ -257,7 +257,25 @@ pub struct Database {
flushing_lock: Mutex<bool>,
}
/// Passes `res` through unchanged, flagging the database directory when the
/// operation failed with a corruption error.
///
/// When `res` is an `Err` whose message starts with `"Corruption:"`, an empty
/// marker file named `Database::CORRUPTION_FILE_NAME` is created inside `path`.
/// Creating the marker is best-effort: if it fails, the failure is logged
/// (rather than silently dropped) and the original error is still returned.
///
/// * `path` - directory in which the marker file is created.
/// * `res`  - result of the database operation to inspect.
///
/// Returns `res` exactly as it was received.
#[inline]
fn check_for_corruption<T, P: AsRef<Path>>(path: P, res: result::Result<T, String>) -> result::Result<T, String> {
    if let Err(ref s) = res {
        if s.starts_with("Corruption:") {
            let marker = path.as_ref().join(Database::CORRUPTION_FILE_NAME);
            // Only announce the deferred repair when the marker was really
            // written; otherwise report why it could not be.
            match fs::File::create(&marker) {
                Ok(_) => warn!("DB corrupted: {}. Repair will be triggered on next restart", s),
                Err(e) => warn!(
                    "DB corrupted: {}. Failed to create corruption marker {}: {}",
                    s,
                    marker.display(),
                    e
                ),
            }
        }
    }
    res
}
/// Reports whether the error message `s` begins with one of the prefixes that
/// this module treats as a sign of database corruption.
fn is_corrupted(s: &str) -> bool {
    // Message prefixes that are treated as corruption.
    const CORRUPTION_PREFIXES: [&'static str; 2] = [
        "Corruption:",
        "Invalid argument: You have to open all column families",
    ];

    CORRUPTION_PREFIXES
        .iter()
        .any(|&prefix| s.starts_with(prefix))
}
impl Database {
const CORRUPTION_FILE_NAME: &'static str = "CORRUPTED";
/// Open database with default settings.
pub fn open_default(path: &str) -> Result<Database> {
Database::open(&DatabaseConfig::default(), path)
@ -287,6 +305,14 @@ impl Database {
block_opts.set_cache(cache);
}
// attempt database repair if it has been previously marked as corrupted
let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME);
if db_corrupted.exists() {
warn!("DB has been previously marked as corrupted, attempting repair");
DB::repair(&opts, path)?;
fs::remove_file(db_corrupted)?;
}
let columns = config.columns.unwrap_or(0) as usize;
let mut cf_options = Vec::with_capacity(columns);
@ -306,12 +332,11 @@ impl Database {
let mut cfs: Vec<Column> = Vec::new();
let db = match config.columns {
Some(columns) => {
Some(_) => {
match DB::open_cf(&opts, path, &cfnames, &cf_options) {
Ok(db) => {
cfs = cfnames.iter().map(|n| db.cf_handle(n)
.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
assert!(cfs.len() == columns as usize);
Ok(db)
}
Err(_) => {
@ -321,7 +346,7 @@ impl Database {
cfs = cfnames.iter().enumerate().map(|(i, n)| db.create_cf(n, &cf_options[i])).collect::<::std::result::Result<_, _>>()?;
Ok(db)
},
err @ Err(_) => err,
err => err,
}
}
}
@ -331,14 +356,18 @@ impl Database {
let db = match db {
Ok(db) => db,
Err(ref s) if s.starts_with("Corruption:") => {
info!("{}", s);
info!("Attempting DB repair for {}", path);
Err(ref s) if is_corrupted(s) => {
warn!("DB corrupted: {}, attempting repair", s);
DB::repair(&opts, path)?;
match cfnames.is_empty() {
true => DB::open(&opts, path)?,
false => DB::open_cf(&opts, path, &cfnames, &cf_options)?
false => {
let db = DB::open_cf(&opts, path, &cfnames, &cf_options)?;
cfs = cfnames.iter().map(|n| db.cf_handle(n)
.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
db
},
}
},
Err(s) => { return Err(s.into()); }
@ -425,7 +454,11 @@ impl Database {
}
}
}
db.write_opt(batch, &self.write_opts)?;
check_for_corruption(
&self.path,
db.write_opt(batch, &self.write_opts))?;
for column in self.flushing.write().iter_mut() {
column.clear();
column.shrink_to_fit();
@ -471,7 +504,10 @@ impl Database {
},
}
}
db.write_opt(batch, &self.write_opts).map_err(Into::into)
check_for_corruption(
&self.path,
db.write_opt(batch, &self.write_opts)).map_err(Into::into)
},
None => Err("Database is closed".into())
}