Prevent database corruption on OOM (#2832)
* Prevent database corruption on OOM * Renamed write_flushing
This commit is contained in:
parent
1b42e9a9af
commit
edbd667696
@ -277,7 +277,8 @@ pub struct Database {
|
|||||||
// Values currently being flushed. Cleared when `flush` completes.
|
// Values currently being flushed. Cleared when `flush` completes.
|
||||||
flushing: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
|
flushing: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
|
||||||
// Prevents concurrent flushes.
|
// Prevents concurrent flushes.
|
||||||
flushing_lock: Mutex<()>,
|
// Value indicates if a flush is in progress.
|
||||||
|
flushing_lock: Mutex<bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Database {
|
impl Database {
|
||||||
@ -379,7 +380,7 @@ impl Database {
|
|||||||
write_opts: write_opts,
|
write_opts: write_opts,
|
||||||
overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
|
overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
|
||||||
flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
|
flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
|
||||||
flushing_lock: Mutex::new(()),
|
flushing_lock: Mutex::new((false)),
|
||||||
path: path.to_owned(),
|
path: path.to_owned(),
|
||||||
read_opts: read_opts,
|
read_opts: read_opts,
|
||||||
})
|
})
|
||||||
@ -417,11 +418,10 @@ impl Database {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Commit buffered changes to database.
|
/// Commit buffered changes to database. Must be called under `flush_lock`
|
||||||
pub fn flush(&self) -> Result<(), String> {
|
fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<bool>) -> Result<(), String> {
|
||||||
match *self.db.read() {
|
match *self.db.read() {
|
||||||
Some(DBAndColumns { ref db, ref cfs }) => {
|
Some(DBAndColumns { ref db, ref cfs }) => {
|
||||||
let _lock = self.flushing_lock.lock();
|
|
||||||
let batch = WriteBatch::new();
|
let batch = WriteBatch::new();
|
||||||
mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write());
|
mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write());
|
||||||
{
|
{
|
||||||
@ -464,6 +464,20 @@ impl Database {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Commit buffered changes to database.
|
||||||
|
pub fn flush(&self) -> Result<(), String> {
|
||||||
|
let mut lock = self.flushing_lock.lock();
|
||||||
|
// If RocksDB batch allocation fails the thread gets terminated and the lock is released.
|
||||||
|
// The value inside the lock is used to detect that.
|
||||||
|
if *lock {
|
||||||
|
// This can only happen if another flushing thread is terminated unexpectedly.
|
||||||
|
return Err("Database write failure. Running low on memory perhaps?".to_owned());
|
||||||
|
}
|
||||||
|
*lock = true;
|
||||||
|
let result = self.write_flushing_with_lock(&mut lock);
|
||||||
|
*lock = false;
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
/// Commit transaction to database.
|
/// Commit transaction to database.
|
||||||
pub fn write(&self, tr: DBTransaction) -> Result<(), String> {
|
pub fn write(&self, tr: DBTransaction) -> Result<(), String> {
|
||||||
|
Loading…
Reference in New Issue
Block a user