This commit is contained in:
Denis S. Soldatov aka General-Beck
2018-01-25 02:48:19 +03:00
parent cf10450108
commit 568dc33a02
23 changed files with 1496 additions and 586 deletions

View File

@@ -32,7 +32,7 @@ use std::cmp;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::path::{PathBuf, Path};
use std::{fs, io, mem, result};
use std::{mem, fs, io};
use parking_lot::{Mutex, MutexGuard, RwLock};
use rocksdb::{
@@ -257,25 +257,7 @@ pub struct Database {
flushing_lock: Mutex<bool>,
}
#[inline]
fn check_for_corruption<T, P: AsRef<Path>>(path: P, res: result::Result<T, String>) -> result::Result<T, String> {
if let Err(ref s) = res {
if s.starts_with("Corruption:") {
warn!("DB corrupted: {}. Repair will be triggered on next restart", s);
let _ = fs::File::create(path.as_ref().join(Database::CORRUPTION_FILE_NAME));
}
}
res
}
fn is_corrupted(s: &str) -> bool {
s.starts_with("Corruption:") || s.starts_with("Invalid argument: You have to open all column families")
}
impl Database {
const CORRUPTION_FILE_NAME: &'static str = "CORRUPTED";
/// Open database with default settings.
pub fn open_default(path: &str) -> Result<Database> {
Database::open(&DatabaseConfig::default(), path)
@@ -305,14 +287,6 @@ impl Database {
block_opts.set_cache(cache);
}
// attempt database repair if it has been previously marked as corrupted
let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME);
if db_corrupted.exists() {
warn!("DB has been previously marked as corrupted, attempting repair");
DB::repair(&opts, path)?;
fs::remove_file(db_corrupted)?;
}
let columns = config.columns.unwrap_or(0) as usize;
let mut cf_options = Vec::with_capacity(columns);
@@ -332,11 +306,12 @@ impl Database {
let mut cfs: Vec<Column> = Vec::new();
let db = match config.columns {
Some(_) => {
Some(columns) => {
match DB::open_cf(&opts, path, &cfnames, &cf_options) {
Ok(db) => {
cfs = cfnames.iter().map(|n| db.cf_handle(n)
.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
assert!(cfs.len() == columns as usize);
Ok(db)
}
Err(_) => {
@@ -346,7 +321,7 @@ impl Database {
cfs = cfnames.iter().enumerate().map(|(i, n)| db.create_cf(n, &cf_options[i])).collect::<::std::result::Result<_, _>>()?;
Ok(db)
},
err => err,
err @ Err(_) => err,
}
}
}
@@ -356,18 +331,14 @@ impl Database {
let db = match db {
Ok(db) => db,
Err(ref s) if is_corrupted(s) => {
warn!("DB corrupted: {}, attempting repair", s);
Err(ref s) if s.starts_with("Corruption:") => {
info!("{}", s);
info!("Attempting DB repair for {}", path);
DB::repair(&opts, path)?;
match cfnames.is_empty() {
true => DB::open(&opts, path)?,
false => {
let db = DB::open_cf(&opts, path, &cfnames, &cf_options)?;
cfs = cfnames.iter().map(|n| db.cf_handle(n)
.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
db
},
false => DB::open_cf(&opts, path, &cfnames, &cf_options)?
}
},
Err(s) => { return Err(s.into()); }
@@ -454,11 +425,7 @@ impl Database {
}
}
}
check_for_corruption(
&self.path,
db.write_opt(batch, &self.write_opts))?;
db.write_opt(batch, &self.write_opts)?;
for column in self.flushing.write().iter_mut() {
column.clear();
column.shrink_to_fit();
@@ -504,10 +471,7 @@ impl Database {
},
}
}
check_for_corruption(
&self.path,
db.write_opt(batch, &self.write_opts)).map_err(Into::into)
db.write_opt(batch, &self.write_opts).map_err(Into::into)
},
None => Err("Database is closed".into())
}