iterate DB by prefix

This commit is contained in:
Robert Habermeier 2017-04-18 15:45:15 +02:00
parent f6f9816ef4
commit 6da6c755a5
2 changed files with 60 additions and 20 deletions

View File

@ -167,6 +167,10 @@ pub trait KeyValueDB: Sync + Send {
/// Iterate over flushed data for a given column. /// Iterate over flushed data for a given column.
fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>; fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>;
/// Iterate over flushed data for a given column, starting from a given prefix.
fn iter_from_prefix<'a>(&'a self, col: Option<u32>, prefix: &'a [u8])
-> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>;
/// Attempt to replace this database with a new one located at the given path. /// Attempt to replace this database with a new one located at the given path.
fn restore(&self, new_db: &str) -> Result<(), UtilError>; fn restore(&self, new_db: &str) -> Result<(), UtilError>;
} }
@ -247,7 +251,21 @@ impl KeyValueDB for InMemory {
.into_iter() .into_iter()
.map(|(k, v)| (k.into_boxed_slice(), v.to_vec().into_boxed_slice())) .map(|(k, v)| (k.into_boxed_slice(), v.to_vec().into_boxed_slice()))
), ),
None => Box::new(None.into_iter()) None => Box::new(None.into_iter()),
}
}
fn iter_from_prefix<'a>(&'a self, col: Option<u32>, prefix: &'a [u8])
-> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>
{
match self.columns.read().get(&col) {
Some(map) => Box::new(
map.clone()
.into_iter()
.skip_while(move |&(ref k, _)| !k.starts_with(prefix))
.map(|(k, v)| (k.into_boxed_slice(), v.to_vec().into_boxed_slice()))
),
None => Box::new(None.into_iter()),
} }
} }
@ -691,23 +709,17 @@ impl Database {
/// Get value by partial key. Prefix size should match configured prefix size. Only searches flushed values. /// Get value by partial key. Prefix size should match configured prefix size. Only searches flushed values.
// TODO: support prefix seek for unflushed data // TODO: support prefix seek for unflushed data
pub fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> { pub fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
match *self.db.read() { self.iter_from_prefix(col, prefix).and_then(|mut iter| {
Some(DBAndColumns { ref db, ref cfs }) => {
let mut iter = col.map_or_else(|| db.iterator_opt(IteratorMode::From(prefix, Direction::Forward), &self.read_opts),
|c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::From(prefix, Direction::Forward), &self.read_opts)
.expect("iterator params are valid; qed"));
match iter.next() { match iter.next() {
// TODO: use prefix_same_as_start read option (not availabele in C API currently) // TODO: use prefix_same_as_start read option (not availabele in C API currently)
Some((k, v)) => if k[0 .. prefix.len()] == prefix[..] { Some(v) } else { None }, Some((k, v)) => if k[0 .. prefix.len()] == prefix[..] { Some(v) } else { None },
_ => None _ => None
} }
}, })
None => None,
}
} }
/// Get database iterator for flushed data. /// Get database iterator for flushed data.
pub fn iter(&self, col: Option<u32>) -> DatabaseIterator { pub fn iter(&self, col: Option<u32>) -> Option<DatabaseIterator> {
//TODO: iterate over overlay //TODO: iterate over overlay
match *self.db.read() { match *self.db.read() {
Some(DBAndColumns { ref db, ref cfs }) => { Some(DBAndColumns { ref db, ref cfs }) => {
@ -717,12 +729,28 @@ impl Database {
.expect("iterator params are valid; qed") .expect("iterator params are valid; qed")
); );
DatabaseIterator { Some(DatabaseIterator {
iter: iter, iter: iter,
_marker: PhantomData, _marker: PhantomData,
} })
}, },
None => panic!("Not supported yet") //TODO: return an empty iterator or change return type None => None,
}
}
fn iter_from_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<DatabaseIterator> {
match *self.db.read() {
Some(DBAndColumns { ref db, ref cfs }) => {
let iter = col.map_or_else(|| db.iterator_opt(IteratorMode::From(prefix, Direction::Forward), &self.read_opts),
|c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::From(prefix, Direction::Forward), &self.read_opts)
.expect("iterator params are valid; qed"));
Some(DatabaseIterator {
iter: iter,
_marker: PhantomData,
})
},
None => None,
} }
} }
@ -836,7 +864,14 @@ impl KeyValueDB for Database {
fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a> { fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a> {
let unboxed = Database::iter(self, col); let unboxed = Database::iter(self, col);
Box::new(unboxed) Box::new(unboxed.into_iter().flat_map(|inner| inner))
}
fn iter_from_prefix<'a>(&'a self, col: Option<u32>, prefix: &'a [u8])
-> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>
{
let unboxed = Database::iter_from_prefix(self, col, prefix);
Box::new(unboxed.into_iter().flat_map(|inner| inner))
} }
fn restore(&self, new_db: &str) -> Result<(), UtilError> { fn restore(&self, new_db: &str) -> Result<(), UtilError> {

View File

@ -157,7 +157,12 @@ impl<T: SimpleMigration> Migration for T {
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> { fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> {
let mut batch = Batch::new(config, col); let mut batch = Batch::new(config, col);
for (key, value) in source.iter(col) { let iter = match source.iter(col) {
Some(iter) => iter,
None => return Ok(()),
};
for (key, value) in iter {
if let Some((key, value)) = self.simple_migrate(key.to_vec(), value.to_vec()) { if let Some((key, value)) = self.simple_migrate(key.to_vec(), value.to_vec()) {
batch.insert(key, value, dest)?; batch.insert(key, value, dest)?;
} }