From 417b70f90f55420163a63b9e6e05735367394757 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Wed, 10 Aug 2016 20:49:26 +0200 Subject: [PATCH] Don't return deleted nodes that are not yet flushed (#1908) --- util/src/kvdb.rs | 119 ++++++++++++++++++++++++----------------------- 1 file changed, 60 insertions(+), 59 deletions(-) diff --git a/util/src/kvdb.rs b/util/src/kvdb.rs index 5e7c0930e..0aa48e45f 100644 --- a/util/src/kvdb.rs +++ b/util/src/kvdb.rs @@ -105,10 +105,10 @@ impl DBTransaction { } } -struct DBColumnOverlay { - insertions: HashMap, Bytes>, - compressed_insertions: HashMap, Bytes>, - deletions: HashSet>, +enum KeyState { + Insert(Bytes), + InsertCompressed(Bytes), + Delete, } /// Compaction profile for the database settings @@ -198,7 +198,7 @@ pub struct Database { db: DB, write_opts: WriteOptions, cfs: Vec, - overlay: RwLock>, + overlay: RwLock, KeyState>>>, } impl Database { @@ -275,11 +275,7 @@ impl Database { Ok(Database { db: db, write_opts: write_opts, - overlay: RwLock::new((0..(cfs.len() + 1)).map(|_| DBColumnOverlay { - insertions: HashMap::new(), - compressed_insertions: HashMap::new(), - deletions: HashSet::new(), - }).collect()), + overlay: RwLock::new((0..(cfs.len() + 1)).map(|_| HashMap::new()).collect()), cfs: cfs, }) } @@ -302,21 +298,15 @@ impl Database { match op { DBOp::Insert { col, key, value } => { let c = Self::to_overlay_column(col); - overlay[c].deletions.remove(&key); - overlay[c].compressed_insertions.remove(&key); - overlay[c].insertions.insert(key, value); + overlay[c].insert(key, KeyState::Insert(value)); }, DBOp::InsertCompressed { col, key, value } => { let c = Self::to_overlay_column(col); - overlay[c].deletions.remove(&key); - overlay[c].insertions.remove(&key); - overlay[c].compressed_insertions.insert(key, value); + overlay[c].insert(key, KeyState::InsertCompressed(value)); }, DBOp::Delete { col, key } => { let c = Self::to_overlay_column(col); - overlay[c].insertions.remove(&key); - overlay[c].compressed_insertions.remove(&key); - overlay[c].deletions.insert(key); + overlay[c].insert(key, KeyState::Delete); }, } }; @@ -328,34 +318,34 @@ impl Database { let batch = WriteBatch::new(); let mut overlay = self.overlay.write(); - let mut c = 0; - for column in overlay.iter_mut() { - let insertions = mem::replace(&mut column.insertions, HashMap::new()); - let compressed_insertions = mem::replace(&mut column.compressed_insertions, HashMap::new()); - let deletions = mem::replace(&mut column.deletions, HashSet::new()); - for d in deletions.into_iter() { - if c > 0 { - try!(batch.delete_cf(self.cfs[c - 1], &d)); - } else { - try!(batch.delete(&d)); + for (c, column) in overlay.iter_mut().enumerate() { + let column_data = mem::replace(column, HashMap::new()); + for (key, state) in column_data.into_iter() { + match state { + KeyState::Delete => { + if c > 0 { + try!(batch.delete_cf(self.cfs[c - 1], &key)); + } else { + try!(batch.delete(&key)); + } + }, + KeyState::Insert(value) => { + if c > 0 { + try!(batch.put_cf(self.cfs[c - 1], &key, &value)); + } else { + try!(batch.put(&key, &value)); + } + }, + KeyState::InsertCompressed(value) => { + let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks); + if c > 0 { + try!(batch.put_cf(self.cfs[c - 1], &key, &compressed)); + } else { + try!(batch.put(&key, &value)); + } + } } } - for (key, value) in insertions.into_iter() { - if c > 0 { - try!(batch.put_cf(self.cfs[c - 1], &key, &value)); - } else { - try!(batch.put(&key, &value)); - } - } - for (key, value) in compressed_insertions.into_iter() { - let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks); - if c > 0 { - try!(batch.put_cf(self.cfs[c - 1], &key, &compressed)); - } else { - try!(batch.put(&key, &compressed)); - } - } - c += 1; } self.db.write_opt(batch, &self.write_opts) } @@ -385,14 +375,19 @@ impl Database { /// Get value by key. pub fn get(&self, col: Option, key: &[u8]) -> Result, String> { let overlay = &self.overlay.read()[Self::to_overlay_column(col)]; - overlay.insertions.get(key).or_else(|| overlay.compressed_insertions.get(key)).map_or_else(|| - col.map_or_else( - || self.db.get(key).map(|r| r.map(|v| v.to_vec())), - |c| self.db.get_cf(self.cfs[c as usize], key).map(|r| r.map(|v| v.to_vec()))), - |value| Ok(Some(value.clone()))) + match overlay.get(key) { + Some(&KeyState::Insert(ref value)) | Some(&KeyState::InsertCompressed(ref value)) => Ok(Some(value.clone())), + Some(&KeyState::Delete) => Ok(None), + None => { + col.map_or_else( + || self.db.get(key).map(|r| r.map(|v| v.to_vec())), + |c| self.db.get_cf(self.cfs[c as usize], key).map(|r| r.map(|v| v.to_vec()))) + }, + } } - /// Get value by partial key. Prefix size should match configured prefix size. + /// Get value by partial key. Prefix size should match configured prefix size. Only searches flushed values. + // TODO: support prefix seek for unflushed ata pub fn get_by_prefix(&self, col: Option, prefix: &[u8]) -> Option> { let mut iter = col.map_or_else(|| self.db.iterator(IteratorMode::From(prefix, Direction::Forward)), |c| self.db.iterator_cf(self.cfs[c as usize], IteratorMode::From(prefix, Direction::Forward)).unwrap()); @@ -403,13 +398,9 @@ impl Database { } } - /// Check if there is anything in the database. - pub fn is_empty(&self, col: Option) -> bool { - self.iter(col).next().is_none() - } - - /// Get database iterator. + /// Get database iterator for flushed data. pub fn iter(&self, col: Option) -> DatabaseIterator { + //TODO: iterate over overlay col.map_or_else(|| DatabaseIterator { iter: self.db.iterator(IteratorMode::Start) }, |c| DatabaseIterator { iter: self.db.iterator_cf(self.cfs[c as usize], IteratorMode::Start).unwrap() }) } @@ -462,13 +453,23 @@ mod tests { assert_eq!(&*db.get_by_prefix(None, &key3).unwrap(), b"elephant"); assert_eq!(&*db.get_by_prefix(None, &key2).unwrap(), b"dog"); + + let transaction = db.transaction(); + transaction.put(None, &key1, b"horse").unwrap(); + transaction.delete(None, &key3).unwrap(); + db.write_buffered(transaction).unwrap(); + assert!(db.get(None, &key3).unwrap().is_none()); + assert_eq!(&*db.get(None, &key1).unwrap().unwrap(), b"horse"); + + db.flush().unwrap(); + assert!(db.get(None, &key3).unwrap().is_none()); + assert_eq!(&*db.get(None, &key1).unwrap().unwrap(), b"horse"); } #[test] fn kvdb() { let path = RandomTempPath::create_dir(); - let smoke = Database::open_default(path.as_path().to_str().unwrap()).unwrap(); - assert!(smoke.is_empty(None)); + let _ = Database::open_default(path.as_path().to_str().unwrap()).unwrap(); test_db(&DatabaseConfig::default()); } }