diff --git a/util/src/kvdb.rs b/util/src/kvdb.rs index 3a2652c3d..63d46d573 100644 --- a/util/src/kvdb.rs +++ b/util/src/kvdb.rs @@ -277,7 +277,8 @@ pub struct Database { // Values currently being flushed. Cleared when `flush` completes. flushing: RwLock, KeyState>>>, // Prevents concurrent flushes. - flushing_lock: Mutex<()>, + // Value indicates if a flush is in progress. + flushing_lock: Mutex, } impl Database { @@ -379,7 +380,7 @@ impl Database { write_opts: write_opts, overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), - flushing_lock: Mutex::new(()), + flushing_lock: Mutex::new((false)), path: path.to_owned(), read_opts: read_opts, }) @@ -417,11 +418,10 @@ impl Database { }; } - /// Commit buffered changes to database. - pub fn flush(&self) -> Result<(), String> { + /// Commit buffered changes to database. Must be called under `flush_lock` + fn write_flushing_with_lock(&self, _lock: &mut MutexGuard) -> Result<(), String> { match *self.db.read() { Some(DBAndColumns { ref db, ref cfs }) => { - let _lock = self.flushing_lock.lock(); let batch = WriteBatch::new(); mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write()); { @@ -464,6 +464,20 @@ impl Database { } } + /// Commit buffered changes to database. + pub fn flush(&self) -> Result<(), String> { + let mut lock = self.flushing_lock.lock(); + // If RocksDB batch allocation fails the thread gets terminated and the lock is released. + // The value inside the lock is used to detect that. + if *lock { + // This can only happen if another flushing thread is terminated unexpectedly. + return Err("Database write failure. Running low on memory perhaps?".to_owned()); + } + *lock = true; + let result = self.write_flushing_with_lock(&mut lock); + *lock = false; + result + } /// Commit transaction to database. pub fn write(&self, tr: DBTransaction) -> Result<(), String> {