Remove old experimental remote db code (#4872)
parent 3fe3353696
commit 4af49038cd
@@ -1,25 +0,0 @@
[package]
description = "Ethcore Database"
homepage = "http://parity.io"
license = "GPL-3.0"
name = "ethcore-db"
version = "1.7.0"
authors = ["Parity Technologies <admin@parity.io>"]
build = "build.rs"

[build-dependencies]
ethcore-ipc-codegen = { path = "../ipc/codegen" }

[dependencies]
clippy = { version = "0.0.103", optional = true}
ethcore-devtools = { path = "../devtools" }
ethcore-ipc = { path = "../ipc/rpc" }
rocksdb = { git = "https://github.com/ethcore/rust-rocksdb" }
semver = "0.5"
ethcore-ipc-nano = { path = "../ipc/nano" }
nanomsg = { git = "https://github.com/ethcore/nanomsg.rs.git" }
crossbeam = "0.2"
ethcore-util = { path = "../util" }

[features]
dev = ["clippy"]
db/build.rs
@@ -1,38 +0,0 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

extern crate ethcore_ipc_codegen as codegen;

use std::env;
use std::path::Path;

pub fn main() {
	let out_dir = env::var_os("OUT_DIR").unwrap();

	// ipc pass
	{
		let src = Path::new("src/lib.rs.in");
		let dst = Path::new(&out_dir).join("lib.intermediate.rs.in");
		codegen::expand(&src, &dst);
	}

	// binary serialization pass
	{
		let src = Path::new(&out_dir).join("lib.intermediate.rs.in");
		let dst = Path::new(&out_dir).join("lib.rs");
		codegen::expand(&src, &dst);
	}
}
@@ -1,565 +0,0 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

//! Ethcore rocksdb ipc service

use traits::*;
use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBIterator, IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction};
use std::sync::{RwLock, Arc};
use std::convert::From;
use ipc::IpcConfig;
use std::mem;
use ipc::binary::BinaryConvertError;
use std::collections::{VecDeque, HashMap, BTreeMap};

enum WriteCacheEntry {
	Remove,
	Write(Vec<u8>),
}

pub struct WriteCache {
	entries: HashMap<Vec<u8>, WriteCacheEntry>,
	preferred_len: usize,
}

const FLUSH_BATCH_SIZE: usize = 4096;

impl WriteCache {
	fn new(cache_len: usize) -> WriteCache {
		WriteCache {
			entries: HashMap::new(),
			preferred_len: cache_len,
		}
	}

	fn write(&mut self, key: Vec<u8>, val: Vec<u8>) {
		self.entries.insert(key, WriteCacheEntry::Write(val));
	}

	fn remove(&mut self, key: Vec<u8>) {
		self.entries.insert(key, WriteCacheEntry::Remove);
	}

	fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
		self.entries.get(key).and_then(
			|vec_ref| match vec_ref {
				&WriteCacheEntry::Write(ref val) => Some(val.clone()),
				&WriteCacheEntry::Remove => None
			})
	}

	/// WriteCache should be locked for this
	fn flush(&mut self, db: &DB, amount: usize) -> Result<(), Error> {
		let batch = WriteBatch::new();
		let mut removed_so_far = 0;
		while removed_so_far < amount {
			if self.entries.len() == 0 { break; }
			let removed_key = {
				let (key, cache_entry) = self.entries.iter().nth(0)
					.expect("if entries.len == 0, we should have break in the loop, still we got here somehow");

				match *cache_entry {
					WriteCacheEntry::Write(ref val) => {
						batch.put(&key, val)?;
					},
					WriteCacheEntry::Remove => {
						batch.delete(&key)?;
					},
				}
				key.clone()
			};

			self.entries.remove(&removed_key);

			removed_so_far = removed_so_far + 1;
		}
		if removed_so_far > 0 {
			db.write(batch)?;
		}
		Ok(())
	}

	/// flushes until cache is empty
	fn flush_all(&mut self, db: &DB) -> Result<(), Error> {
		while !self.is_empty() { self.flush(db, FLUSH_BATCH_SIZE)?; }
		Ok(())
	}

	fn is_empty(&self) -> bool {
		self.entries.is_empty()
	}

	fn try_shrink(&mut self, db: &DB) -> Result<(), Error> {
		if self.entries.len() > self.preferred_len {
			self.flush(db, FLUSH_BATCH_SIZE)?;
		}
		Ok(())
	}
}

pub struct Database {
	db: RwLock<Option<DB>>,
	/// Iterators - don't use between threads!
	iterators: RwLock<BTreeMap<IteratorHandle, DBIterator>>,
	write_cache: RwLock<WriteCache>,
}

unsafe impl Send for Database {}
unsafe impl Sync for Database {}

impl Database {
	pub fn new() -> Database {
		Database {
			db: RwLock::new(None),
			iterators: RwLock::new(BTreeMap::new()),
			write_cache: RwLock::new(WriteCache::new(DEFAULT_CACHE_LEN)),
		}
	}

	pub fn flush(&self) -> Result<(), Error> {
		let mut cache_lock = self.write_cache.write();
		let db_lock = self.db.read();
		if db_lock.is_none() { return Ok(()); }
		let db = db_lock.as_ref().unwrap();

		cache_lock.try_shrink(&db)?;
		Ok(())
	}

	pub fn flush_all(&self) -> Result<(), Error> {
		let mut cache_lock = self.write_cache.write();
		let db_lock = self.db.read();
		if db_lock.is_none() { return Ok(()); }
		let db = db_lock.as_ref().expect("we should have exited with Ok(()) on the previous step");

		cache_lock.flush_all(&db)?;
		Ok(())
	}
}

impl Drop for Database {
	fn drop(&mut self) {
		self.flush().unwrap();
	}
}

#[ipc]
impl DatabaseService for Database {
	fn open(&self, config: DatabaseConfig, path: String) -> Result<(), Error> {
		let mut db = self.db.write();
		if db.is_some() { return Err(Error::AlreadyOpen); }

		let mut opts = Options::new();
		opts.set_max_open_files(256);
		opts.create_if_missing(true);
		opts.set_use_fsync(false);
		opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
		if let Some(size) = config.prefix_size {
			let mut block_opts = BlockBasedOptions::new();
			block_opts.set_index_type(IndexType::HashSearch);
			opts.set_block_based_table_factory(&block_opts);
			opts.set_prefix_extractor_fixed_size(size);
		}
		*db = Some(DB::open(&opts, &path)?);

		Ok(())
	}

	/// Opens database in the specified path with the default config
	fn open_default(&self, path: String) -> Result<(), Error> {
		self.open(DatabaseConfig::default(), path)
	}

	fn close(&self) -> Result<(), Error> {
		self.flush_all()?;

		let mut db = self.db.write();
		if db.is_none() { return Err(Error::IsClosed); }

		*db = None;
		Ok(())
	}

	fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
		let mut cache_lock = self.write_cache.write();
		cache_lock.write(key.to_vec(), value.to_vec());
		Ok(())
	}

	fn delete(&self, key: &[u8]) -> Result<(), Error> {
		let mut cache_lock = self.write_cache.write();
		cache_lock.remove(key.to_vec());
		Ok(())
	}

	fn write(&self, transaction: DBTransaction) -> Result<(), Error> {
		let mut cache_lock = self.write_cache.write();

		let mut writes = transaction.writes.borrow_mut();
		for kv in writes.drain(..) {
			cache_lock.write(kv.key, kv.value);
		}

		let mut removes = transaction.removes.borrow_mut();
		for k in removes.drain(..) {
			cache_lock.remove(k);
		}
		Ok(())
	}

	fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
		{
			let key_vec = key.to_vec();
			let cache_hit = self.write_cache.read().get(&key_vec);

			if cache_hit.is_some() {
				return Ok(Some(cache_hit.expect("cache_hit.is_some() = true, still there is none somehow here")))
			}
		}
		let db_lock = self.db.read();
		let db = db_lock.as_ref().ok_or(Error::IsClosed)?;

		match db.get(key)? {
			Some(db_vec) => {
				Ok(Some(db_vec.to_vec()))
			},
			None => Ok(None),
		}
	}

	fn get_by_prefix(&self, prefix: &[u8]) -> Result<Option<Vec<u8>>, Error> {
		let db_lock = self.db.read();
		let db = db_lock.as_ref().ok_or(Error::IsClosed)?;

		let mut iter = db.iterator(IteratorMode::From(prefix, Direction::Forward));
		match iter.next() {
			// TODO: use prefix_same_as_start read option (not available in C API currently)
			Some((k, v)) => if k[0 .. prefix.len()] == prefix[..] { Ok(Some(v.to_vec())) } else { Ok(None) },
			_ => Ok(None)
		}
	}

	fn is_empty(&self) -> Result<bool, Error> {
		let db_lock = self.db.read();
		let db = db_lock.as_ref().ok_or(Error::IsClosed)?;

		Ok(db.iterator(IteratorMode::Start).next().is_none())
	}

	fn iter(&self) -> Result<IteratorHandle, Error> {
		let db_lock = self.db.read();
		let db = db_lock.as_ref().ok_or(Error::IsClosed)?;

		let mut iterators = self.iterators.write();
		let next_iterator = iterators.keys().last().unwrap_or(&0) + 1;
		iterators.insert(next_iterator, db.iterator(IteratorMode::Start));
		Ok(next_iterator)
	}

	fn iter_next(&self, handle: IteratorHandle) -> Option<KeyValue> {
		let mut iterators = self.iterators.write();
		let mut iterator = match iterators.get_mut(&handle) {
			Some(some_iterator) => some_iterator,
			None => { return None; },
		};

		iterator.next().and_then(|(some_key, some_val)| {
			Some(KeyValue {
				key: some_key.to_vec(),
				value: some_val.to_vec(),
			})
		})
	}

	fn dispose_iter(&self, handle: IteratorHandle) -> Result<(), Error> {
		let mut iterators = self.iterators.write();
		iterators.remove(&handle);
		Ok(())
	}
}

// TODO : put proper at compile-time
impl IpcConfig for Database {}

/// Database iterator
pub struct DatabaseIterator {
	client: Arc<DatabaseClient<::nanomsg::Socket>>,
	handle: IteratorHandle,
}

impl Iterator for DatabaseIterator {
	type Item = (Vec<u8>, Vec<u8>);

	fn next(&mut self) -> Option<Self::Item> {
		self.client.iter_next(self.handle).and_then(|kv| Some((kv.key, kv.value)))
	}
}

impl Drop for DatabaseIterator {
	fn drop(&mut self) {
		self.client.dispose_iter(self.handle).unwrap();
	}
}

#[cfg(test)]
mod test {

	use super::Database;
	use traits::*;
	use devtools::*;

	#[test]
	fn can_be_created() {
		let db = Database::new();
		assert!(db.is_empty().is_err());
	}

	#[test]
	fn can_be_open_empty() {
		let db = Database::new();
		let path = RandomTempPath::create_dir();
		db.open_default(path.as_str().to_owned()).unwrap();

		assert!(db.is_empty().is_ok());
	}

	#[test]
	fn can_store_key() {
		let db = Database::new();
		let path = RandomTempPath::create_dir();
		db.open_default(path.as_str().to_owned()).unwrap();

		db.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
		db.flush_all().unwrap();
		assert!(!db.is_empty().unwrap());
	}

	#[test]
	fn can_retrieve() {
		let db = Database::new();
		let path = RandomTempPath::create_dir();
		db.open_default(path.as_str().to_owned()).unwrap();
		db.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
		db.close().unwrap();

		db.open_default(path.as_str().to_owned()).unwrap();
		assert_eq!(db.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());
	}
}

#[cfg(test)]
mod write_cache_tests {
	use super::Database;
	use traits::*;
	use devtools::*;

	#[test]
	fn cache_write_flush() {
		let db = Database::new();
		let path = RandomTempPath::create_dir();

		db.open_default(path.as_str().to_owned()).unwrap();
		db.put("100500".as_bytes(), "1".as_bytes()).unwrap();
		db.delete("100500".as_bytes()).unwrap();
		db.flush_all().unwrap();

		let val = db.get("100500".as_bytes()).unwrap();
		assert!(val.is_none());
	}

}

#[cfg(test)]
mod client_tests {
	use super::{DatabaseClient, Database};
	use traits::*;
	use devtools::*;
	use nanoipc;
	use std::sync::Arc;
	use std::sync::atomic::{Ordering, AtomicBool};
	use crossbeam;
	use run_worker;

	fn init_worker(addr: &str) -> nanoipc::Worker<Database> {
		let mut worker = nanoipc::Worker::<Database>::new(&Arc::new(Database::new()));
		worker.add_duplex(addr).unwrap();
		worker
	}

	#[test]
	fn can_call_handshake() {
		let url = "ipc:///tmp/parity-db-ipc-test-10.ipc";
		let worker_should_exit = Arc::new(AtomicBool::new(false));
		let worker_is_ready = Arc::new(AtomicBool::new(false));
		let c_worker_should_exit = worker_should_exit.clone();
		let c_worker_is_ready = worker_is_ready.clone();

		::std::thread::spawn(move || {
			let mut worker = init_worker(url);
			while !c_worker_should_exit.load(Ordering::Relaxed) {
				worker.poll();
				c_worker_is_ready.store(true, Ordering::Relaxed);
			}
		});

		while !worker_is_ready.load(Ordering::Relaxed) { }
		let client = nanoipc::init_duplex_client::<DatabaseClient<_>>(url).unwrap();

		let hs = client.handshake();

		worker_should_exit.store(true, Ordering::Relaxed);
		assert!(hs.is_ok());
	}

	#[test]
	fn can_open_db() {
		let url = "ipc:///tmp/parity-db-ipc-test-20.ipc";
		let path = RandomTempPath::create_dir();

		let worker_should_exit = Arc::new(AtomicBool::new(false));
		let worker_is_ready = Arc::new(AtomicBool::new(false));
		let c_worker_should_exit = worker_should_exit.clone();
		let c_worker_is_ready = worker_is_ready.clone();

		::std::thread::spawn(move || {
			let mut worker = init_worker(url);
			while !c_worker_should_exit.load(Ordering::Relaxed) {
				worker.poll();
				c_worker_is_ready.store(true, Ordering::Relaxed);
			}
		});

		while !worker_is_ready.load(Ordering::Relaxed) { }
		let client = nanoipc::init_duplex_client::<DatabaseClient<_>>(url).unwrap();

		client.open_default(path.as_str().to_owned()).unwrap();
		assert!(client.is_empty().unwrap());
		worker_should_exit.store(true, Ordering::Relaxed);
	}

	#[test]
	fn can_put() {
		let url = "ipc:///tmp/parity-db-ipc-test-30.ipc";
		let path = RandomTempPath::create_dir();

		crossbeam::scope(move |scope| {
			let stop = Arc::new(AtomicBool::new(false));
			run_worker(scope, stop.clone(), url);
			let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();
			client.open_default(path.as_str().to_owned()).unwrap();
			client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
			client.close().unwrap();

			stop.store(true, Ordering::Relaxed);
		});
	}

	#[test]
	fn can_put_and_read() {
		let url = "ipc:///tmp/parity-db-ipc-test-40.ipc";
		let path = RandomTempPath::create_dir();

		crossbeam::scope(move |scope| {
			let stop = Arc::new(AtomicBool::new(false));
			run_worker(scope, stop.clone(), url);
			let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();

			client.open_default(path.as_str().to_owned()).unwrap();
			client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
			client.close().unwrap();

			client.open_default(path.as_str().to_owned()).unwrap();
			assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());

			stop.store(true, Ordering::Relaxed);
		});
	}

	#[test]
	fn can_read_empty() {
		let url = "ipc:///tmp/parity-db-ipc-test-45.ipc";
		let path = RandomTempPath::create_dir();

		crossbeam::scope(move |scope| {
			let stop = Arc::new(AtomicBool::new(false));
			run_worker(scope, stop.clone(), url);
			let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();

			client.open_default(path.as_str().to_owned()).unwrap();
			assert!(client.get("xxx".as_bytes()).unwrap().is_none());

			stop.store(true, Ordering::Relaxed);
		});
	}


	#[test]
	fn can_commit_client_transaction() {
		let url = "ipc:///tmp/parity-db-ipc-test-60.ipc";
		let path = RandomTempPath::create_dir();

		crossbeam::scope(move |scope| {
			let stop = Arc::new(AtomicBool::new(false));
			run_worker(scope, stop.clone(), url);
			let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();
			client.open_default(path.as_str().to_owned()).unwrap();

			let transaction = DBTransaction::new();
			transaction.put("xxx".as_bytes(), "1".as_bytes());
			client.write(transaction).unwrap();

			client.close().unwrap();

			client.open_default(path.as_str().to_owned()).unwrap();
			assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());

			stop.store(true, Ordering::Relaxed);
		});
	}

	#[test]
	fn key_write_read_ipc() {
		let url = "ipc:///tmp/parity-db-ipc-test-70.ipc";
		let path = RandomTempPath::create_dir();

		crossbeam::scope(|scope| {
			let stop = StopGuard::new();
			run_worker(&scope, stop.share(), url);

			let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();

			client.open_default(path.as_str().to_owned()).unwrap();
			let mut batch = Vec::new();
			for _ in 0..100 {
				batch.push((random_str(256).as_bytes().to_vec(), random_str(256).as_bytes().to_vec()));
				batch.push((random_str(256).as_bytes().to_vec(), random_str(2048).as_bytes().to_vec()));
				batch.push((random_str(2048).as_bytes().to_vec(), random_str(2048).as_bytes().to_vec()));
				batch.push((random_str(2048).as_bytes().to_vec(), random_str(256).as_bytes().to_vec()));
			}

			for &(ref k, ref v) in batch.iter() {
				client.put(k, v).unwrap();
			}
			client.close().unwrap();

			client.open_default(path.as_str().to_owned()).unwrap();
			for &(ref k, ref v) in batch.iter() {
				assert_eq!(v, &client.get(k).unwrap().unwrap());
			}
		});
	}
}
@@ -1,20 +0,0 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

//! Database ipc service

#![allow(dead_code, unused_assignments, unused_variables)] // codegen issues
include!(concat!(env!("OUT_DIR"), "/lib.rs"));
@@ -1,89 +0,0 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

extern crate ethcore_ipc as ipc;
extern crate rocksdb;
extern crate ethcore_devtools as devtools;
extern crate semver;
extern crate ethcore_ipc_nano as nanoipc;
extern crate nanomsg;
extern crate crossbeam;
extern crate ethcore_util as util;

pub mod database;
pub mod traits;

pub use traits::{DatabaseService, DBTransaction, Error};
pub use database::{Database, DatabaseClient, DatabaseIterator};

use std::sync::Arc;
use std::sync::atomic::*;
use std::path::PathBuf;

pub type DatabaseNanoClient = DatabaseClient<::nanomsg::Socket>;
pub type DatabaseConnection = nanoipc::GuardedSocket<DatabaseNanoClient>;

#[derive(Debug)]
pub enum ServiceError {
	Io(std::io::Error),
	Socket(nanoipc::SocketError),
}

impl std::convert::From<std::io::Error> for ServiceError {
	fn from(io_error: std::io::Error) -> ServiceError { ServiceError::Io(io_error) }
}

impl std::convert::From<nanoipc::SocketError> for ServiceError {
	fn from(socket_error: nanoipc::SocketError) -> ServiceError { ServiceError::Socket(socket_error) }
}

pub fn blocks_service_url(db_path: &str) -> Result<String, std::io::Error> {
	let mut path = PathBuf::from(db_path);
	::std::fs::create_dir_all(db_path)?;
	path.push("blocks.ipc");
	Ok(format!("ipc://{}", path.to_str().unwrap()))
}

pub fn extras_service_url(db_path: &str) -> Result<String, ::std::io::Error> {
	let mut path = PathBuf::from(db_path);
	::std::fs::create_dir_all(db_path)?;
	path.push("extras.ipc");
	Ok(format!("ipc://{}", path.to_str().unwrap()))
}

pub fn blocks_client(db_path: &str) -> Result<DatabaseConnection, ServiceError> {
	let url = blocks_service_url(db_path)?;
	let client = nanoipc::generic_client::<DatabaseClient<_>>(&url)?;
	Ok(client)
}

pub fn extras_client(db_path: &str) -> Result<DatabaseConnection, ServiceError> {
	let url = extras_service_url(db_path)?;
	let client = nanoipc::generic_client::<DatabaseClient<_>>(&url)?;
	Ok(client)
}

// for tests
pub fn run_worker(scope: &crossbeam::Scope, stop: Arc<AtomicBool>, socket_path: &str) {
	let socket_path = socket_path.to_owned();
	scope.spawn(move || {
		let mut worker = nanoipc::Worker::new(&Arc::new(Database::new()));
		worker.add_reqrep(&socket_path).unwrap();
		while !stop.load(Ordering::Relaxed) {
			worker.poll();
		}
	});
}
db/src/traits.rs
@@ -1,135 +0,0 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

//! Ethcore database trait

use std::cell::RefCell;

pub type IteratorHandle = u32;

pub const DEFAULT_CACHE_LEN: usize = 12288;

#[derive(Binary)]
pub struct KeyValue {
	pub key: Vec<u8>,
	pub value: Vec<u8>,
}

#[derive(Debug, Binary)]
pub enum Error {
	AlreadyOpen,
	IsClosed,
	RocksDb(String),
	TransactionUnknown,
	IteratorUnknown,
	UncommitedTransactions,
}

impl From<String> for Error {
	fn from(s: String) -> Error {
		Error::RocksDb(s)
	}
}

/// Database configuration
#[derive(Binary)]
pub struct DatabaseConfig {
	/// Optional prefix size in bytes. Allows lookup by partial key.
	pub prefix_size: Option<usize>,
	/// write cache length
	pub cache: usize,
}

impl Default for DatabaseConfig {
	fn default() -> DatabaseConfig {
		DatabaseConfig {
			prefix_size: None,
			cache: DEFAULT_CACHE_LEN,
		}
	}
}

impl DatabaseConfig {
	fn with_prefix(prefix: usize) -> DatabaseConfig {
		DatabaseConfig {
			prefix_size: Some(prefix),
			cache: DEFAULT_CACHE_LEN,
		}
	}
}

pub trait DatabaseService : Sized {
	/// Opens database in the specified path
	fn open(&self, config: DatabaseConfig, path: String) -> Result<(), Error>;

	/// Opens database in the specified path with the default config
	fn open_default(&self, path: String) -> Result<(), Error>;

	/// Closes database
	fn close(&self) -> Result<(), Error>;

	/// Insert a key-value pair in the transaction. Any existing value will be overwritten.
	fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error>;

	/// Delete value by key.
	fn delete(&self, key: &[u8]) -> Result<(), Error>;

	/// Get value by key.
	fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error>;

	/// Get value by partial key. Prefix size should match configured prefix size.
	fn get_by_prefix(&self, prefix: &[u8]) -> Result<Option<Vec<u8>>, Error>;

	/// Check if there is anything in the database.
	fn is_empty(&self) -> Result<bool, Error>;

	/// Get handle to iterate through keys
	fn iter(&self) -> Result<IteratorHandle, Error>;

	/// Next key-value for the given iterator
	fn iter_next(&self, iterator: IteratorHandle) -> Option<KeyValue>;

	/// Dispose of an iterator that is no longer needed
	fn dispose_iter(&self, handle: IteratorHandle) -> Result<(), Error>;

	/// Write client transaction
	fn write(&self, transaction: DBTransaction) -> Result<(), Error>;
}

#[derive(Binary)]
pub struct DBTransaction {
	pub writes: RefCell<Vec<KeyValue>>,
	pub removes: RefCell<Vec<Vec<u8>>>,
}

impl DBTransaction {
	pub fn new() -> DBTransaction {
		DBTransaction {
			writes: RefCell::new(Vec::new()),
			removes: RefCell::new(Vec::new()),
		}
	}

	pub fn put(&self, key: &[u8], value: &[u8]) {
		let mut brw = self.writes.borrow_mut();
		brw.push(KeyValue { key: key.to_vec(), value: value.to_vec() });
	}

	pub fn delete(&self, key: &[u8]) {
		let mut brw = self.removes.borrow_mut();
		brw.push(key.to_vec());
	}
}