From 2af4bd195ff8f0262af4ee13cbff26bd9a4bb2bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Fri, 19 Jan 2018 13:33:38 +0000 Subject: [PATCH] Improve handling of RocksDB corruption (#7630) * kvdb-rocksdb: update rust-rocksdb version * kvdb-rocksdb: mark corruptions and attempt repair on db open * kvdb-rocksdb: better corruption detection on open * kvdb-rocksdb: add corruption_file_name const * kvdb-rocksdb: rename mark_corruption to check_for_corruption --- Cargo.lock | 5 ++-- util/kvdb-rocksdb/src/lib.rs | 56 +++++++++++++++++++++++++++++------- 2 files changed, 49 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f5cffa8da..fe59a2605 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2679,7 +2679,7 @@ dependencies = [ [[package]] name = "rocksdb" version = "0.4.5" -source = "git+https://github.com/paritytech/rust-rocksdb#7adec2311d31387a832b0ef051472cdef906b480" +source = "git+https://github.com/paritytech/rust-rocksdb#ecf06adf3148ab10f6f7686b724498382ff4f36e" dependencies = [ "libc 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)", "local-encoding 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2689,10 +2689,11 @@ dependencies = [ [[package]] name = "rocksdb-sys" version = "0.3.0" -source = "git+https://github.com/paritytech/rust-rocksdb#7adec2311d31387a832b0ef051472cdef906b480" +source = "git+https://github.com/paritytech/rust-rocksdb#ecf06adf3148ab10f6f7686b724498382ff4f36e" dependencies = [ "cc 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)", + "local-encoding 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "snappy-sys 0.1.0 (git+https://github.com/paritytech/rust-snappy)", ] diff --git a/util/kvdb-rocksdb/src/lib.rs b/util/kvdb-rocksdb/src/lib.rs index df605855c..91cee86bd 100644 --- a/util/kvdb-rocksdb/src/lib.rs +++ b/util/kvdb-rocksdb/src/lib.rs @@ -32,7 +32,7 @@ use std::cmp; use std::collections::HashMap; use std::marker::PhantomData; use std::path::{PathBuf, Path}; -use std::{mem, fs, io}; +use std::{fs, io, mem, result}; use parking_lot::{Mutex, MutexGuard, RwLock}; use rocksdb::{ @@ -257,7 +257,25 @@ pub struct Database { flushing_lock: Mutex, } +#[inline] +fn check_for_corruption>(path: P, res: result::Result) -> result::Result { + if let Err(ref s) = res { + if s.starts_with("Corruption:") { + warn!("DB corrupted: {}. Repair will be triggered on next restart", s); + let _ = fs::File::create(path.as_ref().join(Database::CORRUPTION_FILE_NAME)); + } + } + + res +} + +fn is_corrupted(s: &str) -> bool { + s.starts_with("Corruption:") || s.starts_with("Invalid argument: You have to open all column families") +} + impl Database { + const CORRUPTION_FILE_NAME: &'static str = "CORRUPTED"; + /// Open database with default settings. pub fn open_default(path: &str) -> Result { Database::open(&DatabaseConfig::default(), path) @@ -287,6 +305,14 @@ impl Database { block_opts.set_cache(cache); } + // attempt database repair if it has been previously marked as corrupted + let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME); + if db_corrupted.exists() { + warn!("DB has been previously marked as corrupted, attempting repair"); + DB::repair(&opts, path)?; + fs::remove_file(db_corrupted)?; + } + let columns = config.columns.unwrap_or(0) as usize; let mut cf_options = Vec::with_capacity(columns); @@ -306,12 +332,11 @@ impl Database { let mut cfs: Vec = Vec::new(); let db = match config.columns { - Some(columns) => { + Some(_) => { match DB::open_cf(&opts, path, &cfnames, &cf_options) { Ok(db) => { cfs = cfnames.iter().map(|n| db.cf_handle(n) .expect("rocksdb opens a cf_handle for each cfname; qed")).collect(); - assert!(cfs.len() == columns as usize); Ok(db) } Err(_) => { @@ -321,7 +346,7 @@ impl Database { cfs = cfnames.iter().enumerate().map(|(i, n)| db.create_cf(n, &cf_options[i])).collect::<::std::result::Result<_, _>>()?; Ok(db) }, - err @ Err(_) => err, + err => err, } } } @@ -331,14 +356,18 @@ impl Database { let db = match db { Ok(db) => db, - Err(ref s) if s.starts_with("Corruption:") => { - info!("{}", s); - info!("Attempting DB repair for {}", path); + Err(ref s) if is_corrupted(s) => { + warn!("DB corrupted: {}, attempting repair", s); DB::repair(&opts, path)?; match cfnames.is_empty() { true => DB::open(&opts, path)?, - false => DB::open_cf(&opts, path, &cfnames, &cf_options)? + false => { + let db = DB::open_cf(&opts, path, &cfnames, &cf_options)?; + cfs = cfnames.iter().map(|n| db.cf_handle(n) + .expect("rocksdb opens a cf_handle for each cfname; qed")).collect(); + db + }, } }, Err(s) => { return Err(s.into()); } @@ -425,7 +454,11 @@ impl Database { } } } - db.write_opt(batch, &self.write_opts)?; + + check_for_corruption( + &self.path, + db.write_opt(batch, &self.write_opts))?; + for column in self.flushing.write().iter_mut() { column.clear(); column.shrink_to_fit(); @@ -471,7 +504,10 @@ impl Database { }, } } - db.write_opt(batch, &self.write_opts).map_err(Into::into) + + check_for_corruption( + &self.path, + db.write_opt(batch, &self.write_opts)).map_err(Into::into) }, None => Err("Database is closed".into()) }