Don't return deleted nodes that are not yet flushed (#1908)
This commit is contained in:
parent
286b67d54b
commit
417b70f90f
119
util/src/kvdb.rs
119
util/src/kvdb.rs
@ -105,10 +105,10 @@ impl DBTransaction {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct DBColumnOverlay {
|
enum KeyState {
|
||||||
insertions: HashMap<ElasticArray32<u8>, Bytes>,
|
Insert(Bytes),
|
||||||
compressed_insertions: HashMap<ElasticArray32<u8>, Bytes>,
|
InsertCompressed(Bytes),
|
||||||
deletions: HashSet<ElasticArray32<u8>>,
|
Delete,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Compaction profile for the database settings
|
/// Compaction profile for the database settings
|
||||||
@ -198,7 +198,7 @@ pub struct Database {
|
|||||||
db: DB,
|
db: DB,
|
||||||
write_opts: WriteOptions,
|
write_opts: WriteOptions,
|
||||||
cfs: Vec<Column>,
|
cfs: Vec<Column>,
|
||||||
overlay: RwLock<Vec<DBColumnOverlay>>,
|
overlay: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Database {
|
impl Database {
|
||||||
@ -275,11 +275,7 @@ impl Database {
|
|||||||
Ok(Database {
|
Ok(Database {
|
||||||
db: db,
|
db: db,
|
||||||
write_opts: write_opts,
|
write_opts: write_opts,
|
||||||
overlay: RwLock::new((0..(cfs.len() + 1)).map(|_| DBColumnOverlay {
|
overlay: RwLock::new((0..(cfs.len() + 1)).map(|_| HashMap::new()).collect()),
|
||||||
insertions: HashMap::new(),
|
|
||||||
compressed_insertions: HashMap::new(),
|
|
||||||
deletions: HashSet::new(),
|
|
||||||
}).collect()),
|
|
||||||
cfs: cfs,
|
cfs: cfs,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -302,21 +298,15 @@ impl Database {
|
|||||||
match op {
|
match op {
|
||||||
DBOp::Insert { col, key, value } => {
|
DBOp::Insert { col, key, value } => {
|
||||||
let c = Self::to_overlay_column(col);
|
let c = Self::to_overlay_column(col);
|
||||||
overlay[c].deletions.remove(&key);
|
overlay[c].insert(key, KeyState::Insert(value));
|
||||||
overlay[c].compressed_insertions.remove(&key);
|
|
||||||
overlay[c].insertions.insert(key, value);
|
|
||||||
},
|
},
|
||||||
DBOp::InsertCompressed { col, key, value } => {
|
DBOp::InsertCompressed { col, key, value } => {
|
||||||
let c = Self::to_overlay_column(col);
|
let c = Self::to_overlay_column(col);
|
||||||
overlay[c].deletions.remove(&key);
|
overlay[c].insert(key, KeyState::InsertCompressed(value));
|
||||||
overlay[c].insertions.remove(&key);
|
|
||||||
overlay[c].compressed_insertions.insert(key, value);
|
|
||||||
},
|
},
|
||||||
DBOp::Delete { col, key } => {
|
DBOp::Delete { col, key } => {
|
||||||
let c = Self::to_overlay_column(col);
|
let c = Self::to_overlay_column(col);
|
||||||
overlay[c].insertions.remove(&key);
|
overlay[c].insert(key, KeyState::Delete);
|
||||||
overlay[c].compressed_insertions.remove(&key);
|
|
||||||
overlay[c].deletions.insert(key);
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -328,34 +318,34 @@ impl Database {
|
|||||||
let batch = WriteBatch::new();
|
let batch = WriteBatch::new();
|
||||||
let mut overlay = self.overlay.write();
|
let mut overlay = self.overlay.write();
|
||||||
|
|
||||||
let mut c = 0;
|
for (c, column) in overlay.iter_mut().enumerate() {
|
||||||
for column in overlay.iter_mut() {
|
let column_data = mem::replace(column, HashMap::new());
|
||||||
let insertions = mem::replace(&mut column.insertions, HashMap::new());
|
for (key, state) in column_data.into_iter() {
|
||||||
let compressed_insertions = mem::replace(&mut column.compressed_insertions, HashMap::new());
|
match state {
|
||||||
let deletions = mem::replace(&mut column.deletions, HashSet::new());
|
KeyState::Delete => {
|
||||||
for d in deletions.into_iter() {
|
if c > 0 {
|
||||||
if c > 0 {
|
try!(batch.delete_cf(self.cfs[c - 1], &key));
|
||||||
try!(batch.delete_cf(self.cfs[c - 1], &d));
|
} else {
|
||||||
} else {
|
try!(batch.delete(&key));
|
||||||
try!(batch.delete(&d));
|
}
|
||||||
|
},
|
||||||
|
KeyState::Insert(value) => {
|
||||||
|
if c > 0 {
|
||||||
|
try!(batch.put_cf(self.cfs[c - 1], &key, &value));
|
||||||
|
} else {
|
||||||
|
try!(batch.put(&key, &value));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
KeyState::InsertCompressed(value) => {
|
||||||
|
let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
|
||||||
|
if c > 0 {
|
||||||
|
try!(batch.put_cf(self.cfs[c - 1], &key, &compressed));
|
||||||
|
} else {
|
||||||
|
try!(batch.put(&key, &value));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (key, value) in insertions.into_iter() {
|
|
||||||
if c > 0 {
|
|
||||||
try!(batch.put_cf(self.cfs[c - 1], &key, &value));
|
|
||||||
} else {
|
|
||||||
try!(batch.put(&key, &value));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (key, value) in compressed_insertions.into_iter() {
|
|
||||||
let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
|
|
||||||
if c > 0 {
|
|
||||||
try!(batch.put_cf(self.cfs[c - 1], &key, &compressed));
|
|
||||||
} else {
|
|
||||||
try!(batch.put(&key, &compressed));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c += 1;
|
|
||||||
}
|
}
|
||||||
self.db.write_opt(batch, &self.write_opts)
|
self.db.write_opt(batch, &self.write_opts)
|
||||||
}
|
}
|
||||||
@ -385,14 +375,19 @@ impl Database {
|
|||||||
/// Get value by key.
|
/// Get value by key.
|
||||||
pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<Bytes>, String> {
|
pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<Bytes>, String> {
|
||||||
let overlay = &self.overlay.read()[Self::to_overlay_column(col)];
|
let overlay = &self.overlay.read()[Self::to_overlay_column(col)];
|
||||||
overlay.insertions.get(key).or_else(|| overlay.compressed_insertions.get(key)).map_or_else(||
|
match overlay.get(key) {
|
||||||
col.map_or_else(
|
Some(&KeyState::Insert(ref value)) | Some(&KeyState::InsertCompressed(ref value)) => Ok(Some(value.clone())),
|
||||||
|| self.db.get(key).map(|r| r.map(|v| v.to_vec())),
|
Some(&KeyState::Delete) => Ok(None),
|
||||||
|c| self.db.get_cf(self.cfs[c as usize], key).map(|r| r.map(|v| v.to_vec()))),
|
None => {
|
||||||
|value| Ok(Some(value.clone())))
|
col.map_or_else(
|
||||||
|
|| self.db.get(key).map(|r| r.map(|v| v.to_vec())),
|
||||||
|
|c| self.db.get_cf(self.cfs[c as usize], key).map(|r| r.map(|v| v.to_vec())))
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get value by partial key. Prefix size should match configured prefix size.
|
/// Get value by partial key. Prefix size should match configured prefix size. Only searches flushed values.
|
||||||
|
// TODO: support prefix seek for unflushed ata
|
||||||
pub fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
|
pub fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
|
||||||
let mut iter = col.map_or_else(|| self.db.iterator(IteratorMode::From(prefix, Direction::Forward)),
|
let mut iter = col.map_or_else(|| self.db.iterator(IteratorMode::From(prefix, Direction::Forward)),
|
||||||
|c| self.db.iterator_cf(self.cfs[c as usize], IteratorMode::From(prefix, Direction::Forward)).unwrap());
|
|c| self.db.iterator_cf(self.cfs[c as usize], IteratorMode::From(prefix, Direction::Forward)).unwrap());
|
||||||
@ -403,13 +398,9 @@ impl Database {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if there is anything in the database.
|
/// Get database iterator for flushed data.
|
||||||
pub fn is_empty(&self, col: Option<u32>) -> bool {
|
|
||||||
self.iter(col).next().is_none()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get database iterator.
|
|
||||||
pub fn iter(&self, col: Option<u32>) -> DatabaseIterator {
|
pub fn iter(&self, col: Option<u32>) -> DatabaseIterator {
|
||||||
|
//TODO: iterate over overlay
|
||||||
col.map_or_else(|| DatabaseIterator { iter: self.db.iterator(IteratorMode::Start) },
|
col.map_or_else(|| DatabaseIterator { iter: self.db.iterator(IteratorMode::Start) },
|
||||||
|c| DatabaseIterator { iter: self.db.iterator_cf(self.cfs[c as usize], IteratorMode::Start).unwrap() })
|
|c| DatabaseIterator { iter: self.db.iterator_cf(self.cfs[c as usize], IteratorMode::Start).unwrap() })
|
||||||
}
|
}
|
||||||
@ -462,13 +453,23 @@ mod tests {
|
|||||||
|
|
||||||
assert_eq!(&*db.get_by_prefix(None, &key3).unwrap(), b"elephant");
|
assert_eq!(&*db.get_by_prefix(None, &key3).unwrap(), b"elephant");
|
||||||
assert_eq!(&*db.get_by_prefix(None, &key2).unwrap(), b"dog");
|
assert_eq!(&*db.get_by_prefix(None, &key2).unwrap(), b"dog");
|
||||||
|
|
||||||
|
let transaction = db.transaction();
|
||||||
|
transaction.put(None, &key1, b"horse").unwrap();
|
||||||
|
transaction.delete(None, &key3).unwrap();
|
||||||
|
db.write_buffered(transaction).unwrap();
|
||||||
|
assert!(db.get(None, &key3).unwrap().is_none());
|
||||||
|
assert_eq!(&*db.get(None, &key1).unwrap().unwrap(), b"horse");
|
||||||
|
|
||||||
|
db.flush().unwrap();
|
||||||
|
assert!(db.get(None, &key3).unwrap().is_none());
|
||||||
|
assert_eq!(&*db.get(None, &key1).unwrap().unwrap(), b"horse");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn kvdb() {
|
fn kvdb() {
|
||||||
let path = RandomTempPath::create_dir();
|
let path = RandomTempPath::create_dir();
|
||||||
let smoke = Database::open_default(path.as_path().to_str().unwrap()).unwrap();
|
let _ = Database::open_default(path.as_path().to_str().unwrap()).unwrap();
|
||||||
assert!(smoke.is_empty(None));
|
|
||||||
test_db(&DatabaseConfig::default());
|
test_db(&DatabaseConfig::default());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user