ethcore-db crate (#1097)
* trait * implentated, lifetime issue still * full api * test mod * working open * get/retrieve * fix warnings and bug * working serialization of &[u8] parameters * client attributes * fix empty payload ser/de * [ci skip] debug assert out * extra deserialization test * extra serialization test * extra serialization test * serialization fixes, nupdate rocksdb * open test working * result bug & remove some scaffolds * fix warnings * more simple tests * consistent quotes * get rid of dedicated is_open flag * hashmap -> btreemap
This commit is contained in:
370
db/src/database.rs
Normal file
370
db/src/database.rs
Normal file
@@ -0,0 +1,370 @@
|
||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Ethcore rocksdb ipc service
|
||||
|
||||
use traits::*;
|
||||
use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBIterator,
|
||||
IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction};
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::{RwLock};
|
||||
use std::convert::From;
|
||||
use ipc::IpcConfig;
|
||||
use std::ops::*;
|
||||
use std::mem;
|
||||
use ipc::binary::BinaryConvertError;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
impl From<String> for Error {
|
||||
fn from(s: String) -> Error {
|
||||
Error::RocksDb(s)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Database {
|
||||
db: RwLock<Option<DB>>,
|
||||
transactions: RwLock<BTreeMap<TransactionHandle, WriteBatch>>,
|
||||
iterators: RwLock<BTreeMap<IteratorHandle, DBIterator>>,
|
||||
}
|
||||
|
||||
impl Database {
|
||||
pub fn new() -> Database {
|
||||
Database {
|
||||
db: RwLock::new(None),
|
||||
transactions: RwLock::new(BTreeMap::new()),
|
||||
iterators: RwLock::new(BTreeMap::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Ipc)]
|
||||
impl DatabaseService for Database {
|
||||
fn open(&self, config: DatabaseConfig, path: String) -> Result<(), Error> {
|
||||
let mut db = self.db.write().unwrap();
|
||||
if db.is_some() { return Err(Error::AlreadyOpen); }
|
||||
|
||||
let mut opts = Options::new();
|
||||
opts.set_max_open_files(256);
|
||||
opts.create_if_missing(true);
|
||||
opts.set_use_fsync(false);
|
||||
opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
|
||||
if let Some(size) = config.prefix_size {
|
||||
let mut block_opts = BlockBasedOptions::new();
|
||||
block_opts.set_index_type(IndexType::HashSearch);
|
||||
opts.set_block_based_table_factory(&block_opts);
|
||||
opts.set_prefix_extractor_fixed_size(size);
|
||||
}
|
||||
*db = Some(try!(DB::open(&opts, &path)));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn close(&self) -> Result<(), Error> {
|
||||
let mut db = self.db.write().unwrap();
|
||||
if db.is_none() { return Err(Error::IsClosed); }
|
||||
|
||||
// TODO: wait for transactions to expire/close here?
|
||||
if self.transactions.read().unwrap().len() > 0 { return Err(Error::UncommitedTransactions); }
|
||||
|
||||
*db = None;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
|
||||
let db_lock = self.db.read().unwrap();
|
||||
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed));
|
||||
|
||||
try!(db.put(key, value));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn delete(&self, key: &[u8]) -> Result<(), Error> {
|
||||
let db_lock = self.db.read().unwrap();
|
||||
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed));
|
||||
|
||||
try!(db.delete(key));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write(&self, handle: TransactionHandle) -> Result<(), Error> {
|
||||
let db_lock = self.db.read().unwrap();
|
||||
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed));
|
||||
|
||||
let mut transactions = self.transactions.write().unwrap();
|
||||
let batch = try!(
|
||||
transactions.remove(&handle).ok_or(Error::TransactionUnknown)
|
||||
);
|
||||
try!(db.write(batch));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
|
||||
let db_lock = self.db.read().unwrap();
|
||||
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed));
|
||||
|
||||
match try!(db.get(key)) {
|
||||
Some(db_vec) => Ok(Some(db_vec.to_vec())),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_by_prefix(&self, prefix: &[u8]) -> Result<Option<Vec<u8>>, Error> {
|
||||
let db_lock = self.db.read().unwrap();
|
||||
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed));
|
||||
|
||||
let mut iter = db.iterator(IteratorMode::From(prefix, Direction::Forward));
|
||||
match iter.next() {
|
||||
// TODO: use prefix_same_as_start read option (not availabele in C API currently)
|
||||
Some((k, v)) => if k[0 .. prefix.len()] == prefix[..] { Ok(Some(v.to_vec())) } else { Ok(None) },
|
||||
_ => Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> Result<bool, Error> {
|
||||
let db_lock = self.db.read().unwrap();
|
||||
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed));
|
||||
|
||||
Ok(db.iterator(IteratorMode::Start).next().is_none())
|
||||
}
|
||||
|
||||
fn iter(&self) -> Result<IteratorHandle, Error> {
|
||||
let db_lock = self.db.read().unwrap();
|
||||
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed));
|
||||
|
||||
let mut iterators = self.iterators.write().unwrap();
|
||||
let next_iterator = iterators.keys().last().unwrap_or(&0) + 1;
|
||||
iterators.insert(next_iterator, db.iterator(IteratorMode::Start));
|
||||
Ok(next_iterator)
|
||||
}
|
||||
|
||||
fn iter_next(&self, handle: IteratorHandle) -> Option<KeyValue>
|
||||
{
|
||||
let mut iterators = self.iterators.write().unwrap();
|
||||
let mut iterator = match iterators.get_mut(&handle) {
|
||||
Some(some_iterator) => some_iterator,
|
||||
None => { return None; },
|
||||
};
|
||||
|
||||
iterator.next().and_then(|(some_key, some_val)| {
|
||||
Some(KeyValue {
|
||||
key: some_key.to_vec(),
|
||||
value: some_val.to_vec(),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
fn transaction_put(&self, transaction: TransactionHandle, key: &[u8], value: &[u8]) -> Result<(), Error>
|
||||
{
|
||||
let mut transactions = self.transactions.write().unwrap();
|
||||
let batch = try!(
|
||||
transactions.get_mut(&transaction).ok_or(Error::TransactionUnknown)
|
||||
);
|
||||
try!(batch.put(&key, &value));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn transaction_delete(&self, transaction: TransactionHandle, key: &[u8]) -> Result<(), Error> {
|
||||
let mut transactions = self.transactions.write().unwrap();
|
||||
let batch = try!(
|
||||
transactions.get_mut(&transaction).ok_or(Error::TransactionUnknown)
|
||||
);
|
||||
try!(batch.delete(&key));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn new_transaction(&self) -> TransactionHandle {
|
||||
let mut transactions = self.transactions.write().unwrap();
|
||||
let next_transaction = transactions.keys().last().unwrap_or(&0) + 1;
|
||||
transactions.insert(next_transaction, WriteBatch::new());
|
||||
|
||||
next_transaction
|
||||
}
|
||||
}
|
||||
|
||||
// TODO : put proper at compile-time
|
||||
impl IpcConfig for Database {}
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
||||
use super::Database;
|
||||
use traits::*;
|
||||
use devtools::*;
|
||||
|
||||
#[test]
|
||||
fn can_be_created() {
|
||||
let db = Database::new();
|
||||
assert!(db.is_empty().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_be_open_empty() {
|
||||
let db = Database::new();
|
||||
let path = RandomTempPath::create_dir();
|
||||
db.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap();
|
||||
|
||||
assert!(db.is_empty().is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_store_key() {
|
||||
let db = Database::new();
|
||||
let path = RandomTempPath::create_dir();
|
||||
db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap();
|
||||
|
||||
db.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
|
||||
assert!(!db.is_empty().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_retrieve() {
|
||||
let db = Database::new();
|
||||
let path = RandomTempPath::create_dir();
|
||||
db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap();
|
||||
db.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
|
||||
db.close().unwrap();
|
||||
|
||||
db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap();
|
||||
assert_eq!(db.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod client_tests {
|
||||
use super::{DatabaseClient, Database};
|
||||
use traits::*;
|
||||
use devtools::*;
|
||||
use nanoipc;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{Ordering, AtomicBool};
|
||||
|
||||
fn init_worker(addr: &str) -> nanoipc::Worker<Database> {
|
||||
let mut worker = nanoipc::Worker::<Database>::new(&Arc::new(Database::new()));
|
||||
worker.add_duplex(addr).unwrap();
|
||||
worker
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_call_handshake() {
|
||||
let url = "ipc:///tmp/parity-db-ipc-test-10.ipc";
|
||||
let worker_should_exit = Arc::new(AtomicBool::new(false));
|
||||
let worker_is_ready = Arc::new(AtomicBool::new(false));
|
||||
let c_worker_should_exit = worker_should_exit.clone();
|
||||
let c_worker_is_ready = worker_is_ready.clone();
|
||||
|
||||
::std::thread::spawn(move || {
|
||||
let mut worker = init_worker(url);
|
||||
while !c_worker_should_exit.load(Ordering::Relaxed) {
|
||||
worker.poll();
|
||||
c_worker_is_ready.store(true, Ordering::Relaxed);
|
||||
}
|
||||
});
|
||||
|
||||
while !worker_is_ready.load(Ordering::Relaxed) { }
|
||||
let client = nanoipc::init_duplex_client::<DatabaseClient<_>>(url).unwrap();
|
||||
|
||||
let hs = client.handshake();
|
||||
|
||||
worker_should_exit.store(true, Ordering::Relaxed);
|
||||
assert!(hs.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_open_db() {
|
||||
let url = "ipc:///tmp/parity-db-ipc-test-20.ipc";
|
||||
let path = RandomTempPath::create_dir();
|
||||
|
||||
let worker_should_exit = Arc::new(AtomicBool::new(false));
|
||||
let worker_is_ready = Arc::new(AtomicBool::new(false));
|
||||
let c_worker_should_exit = worker_should_exit.clone();
|
||||
let c_worker_is_ready = worker_is_ready.clone();
|
||||
|
||||
::std::thread::spawn(move || {
|
||||
let mut worker = init_worker(url);
|
||||
while !c_worker_should_exit.load(Ordering::Relaxed) {
|
||||
worker.poll();
|
||||
c_worker_is_ready.store(true, Ordering::Relaxed);
|
||||
}
|
||||
});
|
||||
|
||||
while !worker_is_ready.load(Ordering::Relaxed) { }
|
||||
let client = nanoipc::init_duplex_client::<DatabaseClient<_>>(url).unwrap();
|
||||
|
||||
client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap();
|
||||
assert!(client.is_empty().unwrap());
|
||||
worker_should_exit.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_put() {
|
||||
let url = "ipc:///tmp/parity-db-ipc-test-30.ipc";
|
||||
let path = RandomTempPath::create_dir();
|
||||
|
||||
let worker_should_exit = Arc::new(AtomicBool::new(false));
|
||||
let worker_is_ready = Arc::new(AtomicBool::new(false));
|
||||
let c_worker_should_exit = worker_should_exit.clone();
|
||||
let c_worker_is_ready = worker_is_ready.clone();
|
||||
|
||||
::std::thread::spawn(move || {
|
||||
let mut worker = init_worker(url);
|
||||
while !c_worker_should_exit.load(Ordering::Relaxed) {
|
||||
worker.poll();
|
||||
c_worker_is_ready.store(true, Ordering::Relaxed);
|
||||
}
|
||||
});
|
||||
|
||||
while !worker_is_ready.load(Ordering::Relaxed) { }
|
||||
let client = nanoipc::init_duplex_client::<DatabaseClient<_>>(url).unwrap();
|
||||
|
||||
client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap();
|
||||
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
|
||||
client.close().unwrap();
|
||||
|
||||
worker_should_exit.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_put_and_read() {
|
||||
let url = "ipc:///tmp/parity-db-ipc-test-40.ipc";
|
||||
let path = RandomTempPath::create_dir();
|
||||
|
||||
let worker_should_exit = Arc::new(AtomicBool::new(false));
|
||||
let worker_is_ready = Arc::new(AtomicBool::new(false));
|
||||
let c_worker_should_exit = worker_should_exit.clone();
|
||||
let c_worker_is_ready = worker_is_ready.clone();
|
||||
|
||||
::std::thread::spawn(move || {
|
||||
let mut worker = init_worker(url);
|
||||
while !c_worker_should_exit.load(Ordering::Relaxed) {
|
||||
worker.poll();
|
||||
c_worker_is_ready.store(true, Ordering::Relaxed);
|
||||
}
|
||||
});
|
||||
|
||||
while !worker_is_ready.load(Ordering::Relaxed) { }
|
||||
let client = nanoipc::init_duplex_client::<DatabaseClient<_>>(url).unwrap();
|
||||
|
||||
client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap();
|
||||
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
|
||||
client.close().unwrap();
|
||||
|
||||
client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap();
|
||||
assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());
|
||||
|
||||
worker_should_exit.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
20
db/src/lib.rs
Normal file
20
db/src/lib.rs
Normal file
@@ -0,0 +1,20 @@
|
||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Database ipc service
|
||||
|
||||
#![allow(dead_code, unused_assignments, unused_variables)] // codegen issues
|
||||
include!(concat!(env!("OUT_DIR"), "/lib.rs"));
|
||||
25
db/src/lib.rs.in
Normal file
25
db/src/lib.rs.in
Normal file
@@ -0,0 +1,25 @@
|
||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
extern crate ethcore_ipc as ipc;
|
||||
extern crate rocksdb;
|
||||
extern crate ethcore_devtools as devtools;
|
||||
extern crate semver;
|
||||
extern crate ethcore_ipc_nano as nanoipc;
|
||||
extern crate ethcore_util as util;
|
||||
|
||||
pub mod database;
|
||||
pub mod traits;
|
||||
0
db/src/service.rs
Normal file
0
db/src/service.rs
Normal file
73
db/src/traits.rs
Normal file
73
db/src/traits.rs
Normal file
@@ -0,0 +1,73 @@
|
||||
//! Ethcore database trait
|
||||
|
||||
use ipc::BinaryConvertable;
|
||||
use std::mem;
|
||||
use ipc::binary::BinaryConvertError;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
pub type TransactionHandle = u32;
|
||||
pub type IteratorHandle = u32;
|
||||
|
||||
#[derive(Binary)]
|
||||
pub struct KeyValue {
|
||||
pub key: Vec<u8>,
|
||||
pub value: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Binary)]
|
||||
pub enum Error {
|
||||
AlreadyOpen,
|
||||
IsClosed,
|
||||
RocksDb(String),
|
||||
TransactionUnknown,
|
||||
IteratorUnknown,
|
||||
UncommitedTransactions,
|
||||
}
|
||||
|
||||
/// Database configuration
|
||||
#[derive(Binary)]
|
||||
pub struct DatabaseConfig {
|
||||
/// Optional prefix size in bytes. Allows lookup by partial key.
|
||||
pub prefix_size: Option<usize>
|
||||
}
|
||||
|
||||
pub trait DatabaseService {
|
||||
/// Opens database in the specified path
|
||||
fn open(&self, config: DatabaseConfig, path: String) -> Result<(), Error>;
|
||||
|
||||
/// Closes database
|
||||
fn close(&self) -> Result<(), Error>;
|
||||
|
||||
/// Insert a key-value pair in the transaction. Any existing value value will be overwritten.
|
||||
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error>;
|
||||
|
||||
/// Delete value by key.
|
||||
fn delete(&self, key: &[u8]) -> Result<(), Error>;
|
||||
|
||||
/// Insert a key-value pair in the transaction. Any existing value value will be overwritten.
|
||||
fn transaction_put(&self, transaction: TransactionHandle, key: &[u8], value: &[u8]) -> Result<(), Error>;
|
||||
|
||||
/// Delete value by key using transaction
|
||||
fn transaction_delete(&self, transaction: TransactionHandle, key: &[u8]) -> Result<(), Error>;
|
||||
|
||||
/// Commit transaction to database.
|
||||
fn write(&self, tr: TransactionHandle) -> Result<(), Error>;
|
||||
|
||||
/// Initiate new transaction on database
|
||||
fn new_transaction(&self) -> TransactionHandle;
|
||||
|
||||
/// Get value by key.
|
||||
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error>;
|
||||
|
||||
/// Get value by partial key. Prefix size should match configured prefix size.
|
||||
fn get_by_prefix(&self, prefix: &[u8]) -> Result<Option<Vec<u8>>, Error>;
|
||||
|
||||
/// Check if there is anything in the database.
|
||||
fn is_empty(&self) -> Result<bool, Error>;
|
||||
|
||||
/// Get handle to iterate through keys
|
||||
fn iter(&self) -> Result<IteratorHandle, Error>;
|
||||
|
||||
/// Next key-value for the the given iterator
|
||||
fn iter_next(&self, iterator: IteratorHandle) -> Option<KeyValue>;
|
||||
}
|
||||
Reference in New Issue
Block a user