Merge pull request #1185 from ethcore/ipc-fixes

Database service upgrade (from the ipc branch)
This commit is contained in:
Nikolay Volf 2016-05-31 17:00:07 +02:00
commit 3dd642abe9
5 changed files with 446 additions and 118 deletions

View File

@ -8,17 +8,19 @@ authors = ["Ethcore <admin@ethcore.io>"]
build = "build.rs" build = "build.rs"
[build-dependencies] [build-dependencies]
syntex = "*" syntex = "0.32"
ethcore-ipc-codegen = { path = "../ipc/codegen" } ethcore-ipc-codegen = { path = "../ipc/codegen" }
[dependencies] [dependencies]
ethcore-util = { path = "../util" }
clippy = { version = "0.0.67", optional = true} clippy = { version = "0.0.67", optional = true}
ethcore-devtools = { path = "../devtools" } ethcore-devtools = { path = "../devtools" }
ethcore-ipc = { path = "../ipc/rpc" } ethcore-ipc = { path = "../ipc/rpc" }
rocksdb = { git = "https://github.com/ethcore/rust-rocksdb" } rocksdb = { git = "https://github.com/ethcore/rust-rocksdb" }
semver = "0.2" semver = "0.2"
ethcore-ipc-nano = { path = "../ipc/nano" } ethcore-ipc-nano = { path = "../ipc/nano" }
nanomsg = { git = "https://github.com/ethcore/nanomsg.rs.git" }
crossbeam = "0.2"
ethcore-util = { path = "../util" }
[features] [features]
dev = ["clippy"] dev = ["clippy"]

View File

@ -18,15 +18,13 @@
use traits::*; use traits::*;
use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBIterator, use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBIterator,
IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction}; IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction};
use std::collections::BTreeMap; use std::sync::{RwLock, Arc};
use std::sync::{RwLock};
use std::convert::From; use std::convert::From;
use ipc::IpcConfig; use ipc::IpcConfig;
use std::ops::*;
use std::mem; use std::mem;
use ipc::binary::BinaryConvertError; use ipc::binary::BinaryConvertError;
use std::collections::VecDeque; use std::collections::{VecDeque, HashMap, BTreeMap};
impl From<String> for Error { impl From<String> for Error {
fn from(s: String) -> Error { fn from(s: String) -> Error {
@ -34,20 +32,136 @@ impl From<String> for Error {
} }
} }
enum WriteCacheEntry {
Remove,
Write(Vec<u8>),
}
pub struct WriteCache {
entries: HashMap<Vec<u8>, WriteCacheEntry>,
preferred_len: usize,
}
const FLUSH_BATCH_SIZE: usize = 4096;
impl WriteCache {
fn new(cache_len: usize) -> WriteCache {
WriteCache {
entries: HashMap::new(),
preferred_len: cache_len,
}
}
fn write(&mut self, key: Vec<u8>, val: Vec<u8>) {
self.entries.insert(key, WriteCacheEntry::Write(val));
}
fn remove(&mut self, key: Vec<u8>) {
self.entries.insert(key, WriteCacheEntry::Remove);
}
fn get(&self, key: &Vec<u8>) -> Option<Vec<u8>> {
self.entries.get(key).and_then(
|vec_ref| match vec_ref {
&WriteCacheEntry::Write(ref val) => Some(val.clone()),
&WriteCacheEntry::Remove => None
})
}
/// WriteCache should be locked for this
fn flush(&mut self, db: &DB, amount: usize) -> Result<(), Error> {
let batch = WriteBatch::new();
let mut removed_so_far = 0;
while removed_so_far < amount {
if self.entries.len() == 0 { break; }
let removed_key = {
let (key, cache_entry) = self.entries.iter().nth(0)
.expect("if entries.len == 0, we should have break in the loop, still we got here somehow");
match *cache_entry {
WriteCacheEntry::Write(ref val) => {
try!(batch.put(&key, val));
},
WriteCacheEntry::Remove => {
try!(batch.delete(&key));
},
}
key.clone()
};
self.entries.remove(&removed_key);
removed_so_far = removed_so_far + 1;
}
if removed_so_far > 0 {
try!(db.write(batch));
}
Ok(())
}
/// flushes until cache is empty
fn flush_all(&mut self, db: &DB) -> Result<(), Error> {
while !self.is_empty() { try!(self.flush(db, FLUSH_BATCH_SIZE)); }
Ok(())
}
fn is_empty(&self) -> bool {
self.entries.is_empty()
}
fn try_shrink(&mut self, db: &DB) -> Result<(), Error> {
if self.entries.len() > self.preferred_len {
try!(self.flush(db, FLUSH_BATCH_SIZE));
}
Ok(())
}
}
pub struct Database { pub struct Database {
db: RwLock<Option<DB>>, db: RwLock<Option<DB>>,
transactions: RwLock<BTreeMap<TransactionHandle, WriteBatch>>, /// Iterators - dont't use between threads!
iterators: RwLock<BTreeMap<IteratorHandle, DBIterator>>, iterators: RwLock<BTreeMap<IteratorHandle, DBIterator>>,
write_cache: RwLock<WriteCache>,
} }
unsafe impl Send for Database {}
unsafe impl Sync for Database {}
impl Database { impl Database {
pub fn new() -> Database { pub fn new() -> Database {
Database { Database {
db: RwLock::new(None), db: RwLock::new(None),
transactions: RwLock::new(BTreeMap::new()),
iterators: RwLock::new(BTreeMap::new()), iterators: RwLock::new(BTreeMap::new()),
write_cache: RwLock::new(WriteCache::new(DEFAULT_CACHE_LEN)),
} }
} }
pub fn flush(&self) -> Result<(), Error> {
let mut cache_lock = self.write_cache.write().unwrap();
let db_lock = self.db.read().unwrap();
if db_lock.is_none() { return Ok(()); }
let db = db_lock.as_ref().unwrap();
try!(cache_lock.try_shrink(&db));
Ok(())
}
pub fn flush_all(&self) -> Result<(), Error> {
let mut cache_lock = self.write_cache.write().unwrap();
let db_lock = self.db.read().unwrap();
if db_lock.is_none() { return Ok(()); }
let db = db_lock.as_ref().expect("we should have exited with Ok(()) on the previous step");
try!(cache_lock.flush_all(&db));
Ok(())
}
}
impl Drop for Database {
fn drop(&mut self) {
self.flush().unwrap();
}
} }
#[derive(Ipc)] #[derive(Ipc)]
@ -72,51 +186,64 @@ impl DatabaseService for Database {
Ok(()) Ok(())
} }
/// Opens database in the specified path with the default config
fn open_default(&self, path: String) -> Result<(), Error> {
self.open(DatabaseConfig::default(), path)
}
fn close(&self) -> Result<(), Error> { fn close(&self) -> Result<(), Error> {
try!(self.flush_all());
let mut db = self.db.write().unwrap(); let mut db = self.db.write().unwrap();
if db.is_none() { return Err(Error::IsClosed); } if db.is_none() { return Err(Error::IsClosed); }
// TODO: wait for transactions to expire/close here?
if self.transactions.read().unwrap().len() > 0 { return Err(Error::UncommitedTransactions); }
*db = None; *db = None;
Ok(()) Ok(())
} }
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
let db_lock = self.db.read().unwrap(); let mut cache_lock = self.write_cache.write().unwrap();
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); cache_lock.write(key.to_vec(), value.to_vec());
try!(db.put(key, value));
Ok(()) Ok(())
} }
fn delete(&self, key: &[u8]) -> Result<(), Error> { fn delete(&self, key: &[u8]) -> Result<(), Error> {
let db_lock = self.db.read().unwrap(); let mut cache_lock = self.write_cache.write().unwrap();
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); cache_lock.remove(key.to_vec());
try!(db.delete(key));
Ok(()) Ok(())
} }
fn write(&self, handle: TransactionHandle) -> Result<(), Error> { fn write(&self, transaction: DBTransaction) -> Result<(), Error> {
let db_lock = self.db.read().unwrap(); let mut cache_lock = self.write_cache.write().unwrap();
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed));
let mut transactions = self.transactions.write().unwrap(); let mut writes = transaction.writes.borrow_mut();
let batch = try!( for kv in writes.drain(..) {
transactions.remove(&handle).ok_or(Error::TransactionUnknown) cache_lock.write(kv.key, kv.value);
); }
try!(db.write(batch));
let mut removes = transaction.removes.borrow_mut();
for k in removes.drain(..) {
cache_lock.remove(k);
}
Ok(()) Ok(())
} }
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> { fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
{
let key_vec = key.to_vec();
let cache_hit = self.write_cache.read().unwrap().get(&key_vec);
if cache_hit.is_some() {
return Ok(Some(cache_hit.expect("cache_hit.is_some() = true, still there is none somehow here")))
}
}
let db_lock = self.db.read().unwrap(); let db_lock = self.db.read().unwrap();
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); let db = try!(db_lock.as_ref().ok_or(Error::IsClosed));
match try!(db.get(key)) { match try!(db.get(key)) {
Some(db_vec) => Ok(Some(db_vec.to_vec())), Some(db_vec) => {
Ok(Some(db_vec.to_vec()))
},
None => Ok(None), None => Ok(None),
} }
} }
@ -166,37 +293,35 @@ impl DatabaseService for Database {
}) })
} }
fn transaction_put(&self, transaction: TransactionHandle, key: &[u8], value: &[u8]) -> Result<(), Error> fn dispose_iter(&self, handle: IteratorHandle) -> Result<(), Error> {
{ let mut iterators = self.iterators.write().unwrap();
let mut transactions = self.transactions.write().unwrap(); iterators.remove(&handle);
let batch = try!(
transactions.get_mut(&transaction).ok_or(Error::TransactionUnknown)
);
try!(batch.put(&key, &value));
Ok(()) Ok(())
} }
fn transaction_delete(&self, transaction: TransactionHandle, key: &[u8]) -> Result<(), Error> {
let mut transactions = self.transactions.write().unwrap();
let batch = try!(
transactions.get_mut(&transaction).ok_or(Error::TransactionUnknown)
);
try!(batch.delete(&key));
Ok(())
}
fn new_transaction(&self) -> TransactionHandle {
let mut transactions = self.transactions.write().unwrap();
let next_transaction = transactions.keys().last().unwrap_or(&0) + 1;
transactions.insert(next_transaction, WriteBatch::new());
next_transaction
}
} }
// TODO : put proper at compile-time // TODO : put proper at compile-time
impl IpcConfig for Database {} impl IpcConfig for Database {}
/// Database iterator
pub struct DatabaseIterator {
client: Arc<DatabaseClient<::nanomsg::Socket>>,
handle: IteratorHandle,
}
impl Iterator for DatabaseIterator {
type Item = (Vec<u8>, Vec<u8>);
fn next(&mut self) -> Option<Self::Item> {
self.client.iter_next(self.handle).and_then(|kv| Some((kv.key, kv.value)))
}
}
impl Drop for DatabaseIterator {
fn drop(&mut self) {
self.client.dispose_iter(self.handle).unwrap();
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
@ -215,7 +340,7 @@ mod test {
fn can_be_open_empty() { fn can_be_open_empty() {
let db = Database::new(); let db = Database::new();
let path = RandomTempPath::create_dir(); let path = RandomTempPath::create_dir();
db.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); db.open_default(path.as_str().to_owned()).unwrap();
assert!(db.is_empty().is_ok()); assert!(db.is_empty().is_ok());
} }
@ -224,9 +349,10 @@ mod test {
fn can_store_key() { fn can_store_key() {
let db = Database::new(); let db = Database::new();
let path = RandomTempPath::create_dir(); let path = RandomTempPath::create_dir();
db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap(); db.open_default(path.as_str().to_owned()).unwrap();
db.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); db.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
db.flush_all().unwrap();
assert!(!db.is_empty().unwrap()); assert!(!db.is_empty().unwrap());
} }
@ -234,15 +360,37 @@ mod test {
fn can_retrieve() { fn can_retrieve() {
let db = Database::new(); let db = Database::new();
let path = RandomTempPath::create_dir(); let path = RandomTempPath::create_dir();
db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap(); db.open_default(path.as_str().to_owned()).unwrap();
db.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); db.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
db.close().unwrap(); db.close().unwrap();
db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap(); db.open_default(path.as_str().to_owned()).unwrap();
assert_eq!(db.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec()); assert_eq!(db.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());
} }
} }
#[cfg(test)]
mod write_cache_tests {
use super::Database;
use traits::*;
use devtools::*;
#[test]
fn cache_write_flush() {
let db = Database::new();
let path = RandomTempPath::create_dir();
db.open_default(path.as_str().to_owned()).unwrap();
db.put("100500".as_bytes(), "1".as_bytes()).unwrap();
db.delete("100500".as_bytes()).unwrap();
db.flush_all().unwrap();
let val = db.get("100500".as_bytes()).unwrap();
assert!(val.is_none());
}
}
#[cfg(test)] #[cfg(test)]
mod client_tests { mod client_tests {
use super::{DatabaseClient, Database}; use super::{DatabaseClient, Database};
@ -251,6 +399,8 @@ mod client_tests {
use nanoipc; use nanoipc;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{Ordering, AtomicBool}; use std::sync::atomic::{Ordering, AtomicBool};
use crossbeam;
use run_worker;
fn init_worker(addr: &str) -> nanoipc::Worker<Database> { fn init_worker(addr: &str) -> nanoipc::Worker<Database> {
let mut worker = nanoipc::Worker::<Database>::new(&Arc::new(Database::new())); let mut worker = nanoipc::Worker::<Database>::new(&Arc::new(Database::new()));
@ -268,7 +418,7 @@ mod client_tests {
::std::thread::spawn(move || { ::std::thread::spawn(move || {
let mut worker = init_worker(url); let mut worker = init_worker(url);
while !c_worker_should_exit.load(Ordering::Relaxed) { while !c_worker_should_exit.load(Ordering::Relaxed) {
worker.poll(); worker.poll();
c_worker_is_ready.store(true, Ordering::Relaxed); c_worker_is_ready.store(true, Ordering::Relaxed);
} }
@ -295,7 +445,7 @@ mod client_tests {
::std::thread::spawn(move || { ::std::thread::spawn(move || {
let mut worker = init_worker(url); let mut worker = init_worker(url);
while !c_worker_should_exit.load(Ordering::Relaxed) { while !c_worker_should_exit.load(Ordering::Relaxed) {
worker.poll(); worker.poll();
c_worker_is_ready.store(true, Ordering::Relaxed); c_worker_is_ready.store(true, Ordering::Relaxed);
} }
@ -304,7 +454,7 @@ mod client_tests {
while !worker_is_ready.load(Ordering::Relaxed) { } while !worker_is_ready.load(Ordering::Relaxed) { }
let client = nanoipc::init_duplex_client::<DatabaseClient<_>>(url).unwrap(); let client = nanoipc::init_duplex_client::<DatabaseClient<_>>(url).unwrap();
client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); client.open_default(path.as_str().to_owned()).unwrap();
assert!(client.is_empty().unwrap()); assert!(client.is_empty().unwrap());
worker_should_exit.store(true, Ordering::Relaxed); worker_should_exit.store(true, Ordering::Relaxed);
} }
@ -314,27 +464,16 @@ mod client_tests {
let url = "ipc:///tmp/parity-db-ipc-test-30.ipc"; let url = "ipc:///tmp/parity-db-ipc-test-30.ipc";
let path = RandomTempPath::create_dir(); let path = RandomTempPath::create_dir();
let worker_should_exit = Arc::new(AtomicBool::new(false)); crossbeam::scope(move |scope| {
let worker_is_ready = Arc::new(AtomicBool::new(false)); let stop = Arc::new(AtomicBool::new(false));
let c_worker_should_exit = worker_should_exit.clone(); run_worker(scope, stop.clone(), url);
let c_worker_is_ready = worker_is_ready.clone(); let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap();
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
client.close().unwrap();
::std::thread::spawn(move || { stop.store(true, Ordering::Relaxed);
let mut worker = init_worker(url);
while !c_worker_should_exit.load(Ordering::Relaxed) {
worker.poll();
c_worker_is_ready.store(true, Ordering::Relaxed);
}
}); });
while !worker_is_ready.load(Ordering::Relaxed) { }
let client = nanoipc::init_duplex_client::<DatabaseClient<_>>(url).unwrap();
client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap();
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
client.close().unwrap();
worker_should_exit.store(true, Ordering::Relaxed);
} }
#[test] #[test]
@ -342,29 +481,93 @@ mod client_tests {
let url = "ipc:///tmp/parity-db-ipc-test-40.ipc"; let url = "ipc:///tmp/parity-db-ipc-test-40.ipc";
let path = RandomTempPath::create_dir(); let path = RandomTempPath::create_dir();
let worker_should_exit = Arc::new(AtomicBool::new(false)); crossbeam::scope(move |scope| {
let worker_is_ready = Arc::new(AtomicBool::new(false)); let stop = Arc::new(AtomicBool::new(false));
let c_worker_should_exit = worker_should_exit.clone(); run_worker(scope, stop.clone(), url);
let c_worker_is_ready = worker_is_ready.clone(); let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
::std::thread::spawn(move || { client.open_default(path.as_str().to_owned()).unwrap();
let mut worker = init_worker(url); client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
while !c_worker_should_exit.load(Ordering::Relaxed) { client.close().unwrap();
worker.poll();
c_worker_is_ready.store(true, Ordering::Relaxed); client.open_default(path.as_str().to_owned()).unwrap();
assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());
stop.store(true, Ordering::Relaxed);
});
}
#[test]
fn can_read_empty() {
let url = "ipc:///tmp/parity-db-ipc-test-45.ipc";
let path = RandomTempPath::create_dir();
crossbeam::scope(move |scope| {
let stop = Arc::new(AtomicBool::new(false));
run_worker(scope, stop.clone(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap();
assert!(client.get("xxx".as_bytes()).unwrap().is_none());
stop.store(true, Ordering::Relaxed);
});
}
#[test]
fn can_commit_client_transaction() {
let url = "ipc:///tmp/parity-db-ipc-test-60.ipc";
let path = RandomTempPath::create_dir();
crossbeam::scope(move |scope| {
let stop = Arc::new(AtomicBool::new(false));
run_worker(scope, stop.clone(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap();
let transaction = DBTransaction::new();
transaction.put("xxx".as_bytes(), "1".as_bytes());
client.write(transaction).unwrap();
client.close().unwrap();
client.open_default(path.as_str().to_owned()).unwrap();
assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());
stop.store(true, Ordering::Relaxed);
});
}
#[test]
fn key_write_read_ipc() {
let url = "ipc:///tmp/parity-db-ipc-test-70.ipc";
let path = RandomTempPath::create_dir();
crossbeam::scope(|scope| {
let stop = StopGuard::new();
run_worker(&scope, stop.share(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap();
let mut batch = Vec::new();
for _ in 0..100 {
batch.push((random_str(256).as_bytes().to_vec(), random_str(256).as_bytes().to_vec()));
batch.push((random_str(256).as_bytes().to_vec(), random_str(2048).as_bytes().to_vec()));
batch.push((random_str(2048).as_bytes().to_vec(), random_str(2048).as_bytes().to_vec()));
batch.push((random_str(2048).as_bytes().to_vec(), random_str(256).as_bytes().to_vec()));
}
for &(ref k, ref v) in batch.iter() {
client.put(k, v).unwrap();
}
client.close().unwrap();
client.open_default(path.as_str().to_owned()).unwrap();
for &(ref k, ref v) in batch.iter() {
assert_eq!(v, &client.get(k).unwrap().unwrap());
} }
}); });
while !worker_is_ready.load(Ordering::Relaxed) { }
let client = nanoipc::init_duplex_client::<DatabaseClient<_>>(url).unwrap();
client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap();
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
client.close().unwrap();
client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap();
assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());
worker_should_exit.store(true, Ordering::Relaxed);
} }
} }

View File

@ -19,7 +19,71 @@ extern crate rocksdb;
extern crate ethcore_devtools as devtools; extern crate ethcore_devtools as devtools;
extern crate semver; extern crate semver;
extern crate ethcore_ipc_nano as nanoipc; extern crate ethcore_ipc_nano as nanoipc;
extern crate nanomsg;
extern crate crossbeam;
extern crate ethcore_util as util; extern crate ethcore_util as util;
pub mod database; pub mod database;
pub mod traits; pub mod traits;
pub use traits::{DatabaseService, DBTransaction, Error};
pub use database::{Database, DatabaseClient, DatabaseIterator};
use std::sync::Arc;
use std::sync::atomic::*;
use std::path::PathBuf;
pub type DatabaseNanoClient = DatabaseClient<::nanomsg::Socket>;
pub type DatabaseConnection = nanoipc::GuardedSocket<DatabaseNanoClient>;
#[derive(Debug)]
pub enum ServiceError {
Io(std::io::Error),
Socket(nanoipc::SocketError),
}
impl std::convert::From<std::io::Error> for ServiceError {
fn from(io_error: std::io::Error) -> ServiceError { ServiceError::Io(io_error) }
}
impl std::convert::From<nanoipc::SocketError> for ServiceError {
fn from(socket_error: nanoipc::SocketError) -> ServiceError { ServiceError::Socket(socket_error) }
}
pub fn blocks_service_url(db_path: &str) -> Result<String, std::io::Error> {
let mut path = PathBuf::from(db_path);
try!(::std::fs::create_dir_all(db_path));
path.push("blocks.ipc");
Ok(format!("ipc://{}", path.to_str().unwrap()))
}
pub fn extras_service_url(db_path: &str) -> Result<String, ::std::io::Error> {
let mut path = PathBuf::from(db_path);
try!(::std::fs::create_dir_all(db_path));
path.push("extras.ipc");
Ok(format!("ipc://{}", path.to_str().unwrap()))
}
pub fn blocks_client(db_path: &str) -> Result<DatabaseConnection, ServiceError> {
let url = try!(blocks_service_url(db_path));
let client = try!(nanoipc::init_client::<DatabaseClient<_>>(&url));
Ok(client)
}
pub fn extras_client(db_path: &str) -> Result<DatabaseConnection, ServiceError> {
let url = try!(extras_service_url(db_path));
let client = try!(nanoipc::init_client::<DatabaseClient<_>>(&url));
Ok(client)
}
// for tests
pub fn run_worker(scope: &crossbeam::Scope, stop: Arc<AtomicBool>, socket_path: &str) {
let socket_path = socket_path.to_owned();
scope.spawn(move || {
let mut worker = nanoipc::Worker::new(&Arc::new(Database::new()));
worker.add_reqrep(&socket_path).unwrap();
while !stop.load(Ordering::Relaxed) {
worker.poll();
}
});
}

View File

View File

@ -1,21 +1,38 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Ethcore database trait //! Ethcore database trait
use ipc::BinaryConvertable;
use std::mem; use std::mem;
use ipc::binary::BinaryConvertError; use ipc::binary::BinaryConvertError;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::cell::RefCell;
pub type TransactionHandle = u32;
pub type IteratorHandle = u32; pub type IteratorHandle = u32;
pub const DEFAULT_CACHE_LEN: usize = 12288;
#[derive(Binary)] #[derive(Binary)]
pub struct KeyValue { pub struct KeyValue {
pub key: Vec<u8>, pub key: Vec<u8>,
pub value: Vec<u8>, pub value: Vec<u8>,
} }
#[derive(Debug, Binary)] #[derive(Debug, Binary)]
pub enum Error { pub enum Error {
AlreadyOpen, AlreadyOpen,
IsClosed, IsClosed,
RocksDb(String), RocksDb(String),
@ -28,13 +45,36 @@ pub enum Error {
#[derive(Binary)] #[derive(Binary)]
pub struct DatabaseConfig { pub struct DatabaseConfig {
/// Optional prefix size in bytes. Allows lookup by partial key. /// Optional prefix size in bytes. Allows lookup by partial key.
pub prefix_size: Option<usize> pub prefix_size: Option<usize>,
/// write cache length
pub cache: usize,
} }
pub trait DatabaseService { impl Default for DatabaseConfig {
fn default() -> DatabaseConfig {
DatabaseConfig {
prefix_size: None,
cache: DEFAULT_CACHE_LEN,
}
}
}
impl DatabaseConfig {
fn with_prefix(prefix: usize) -> DatabaseConfig {
DatabaseConfig {
prefix_size: Some(prefix),
cache: DEFAULT_CACHE_LEN,
}
}
}
pub trait DatabaseService : Sized {
/// Opens database in the specified path /// Opens database in the specified path
fn open(&self, config: DatabaseConfig, path: String) -> Result<(), Error>; fn open(&self, config: DatabaseConfig, path: String) -> Result<(), Error>;
/// Opens database in the specified path with the default config
fn open_default(&self, path: String) -> Result<(), Error>;
/// Closes database /// Closes database
fn close(&self) -> Result<(), Error>; fn close(&self) -> Result<(), Error>;
@ -44,18 +84,6 @@ pub trait DatabaseService {
/// Delete value by key. /// Delete value by key.
fn delete(&self, key: &[u8]) -> Result<(), Error>; fn delete(&self, key: &[u8]) -> Result<(), Error>;
/// Insert a key-value pair in the transaction. Any existing value value will be overwritten.
fn transaction_put(&self, transaction: TransactionHandle, key: &[u8], value: &[u8]) -> Result<(), Error>;
/// Delete value by key using transaction
fn transaction_delete(&self, transaction: TransactionHandle, key: &[u8]) -> Result<(), Error>;
/// Commit transaction to database.
fn write(&self, tr: TransactionHandle) -> Result<(), Error>;
/// Initiate new transaction on database
fn new_transaction(&self) -> TransactionHandle;
/// Get value by key. /// Get value by key.
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error>; fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error>;
@ -70,4 +98,35 @@ pub trait DatabaseService {
/// Next key-value for the the given iterator /// Next key-value for the the given iterator
fn iter_next(&self, iterator: IteratorHandle) -> Option<KeyValue>; fn iter_next(&self, iterator: IteratorHandle) -> Option<KeyValue>;
/// Dispose iteration that is no longer needed
fn dispose_iter(&self, handle: IteratorHandle) -> Result<(), Error>;
/// Write client transaction
fn write(&self, transaction: DBTransaction) -> Result<(), Error>;
}
#[derive(Binary)]
pub struct DBTransaction {
pub writes: RefCell<Vec<KeyValue>>,
pub removes: RefCell<Vec<Vec<u8>>>,
}
impl DBTransaction {
pub fn new() -> DBTransaction {
DBTransaction {
writes: RefCell::new(Vec::new()),
removes: RefCell::new(Vec::new()),
}
}
pub fn put(&self, key: &[u8], value: &[u8]) {
let mut brw = self.writes.borrow_mut();
brw.push(KeyValue { key: key.to_vec(), value: value.to_vec() });
}
pub fn delete(&self, key: &[u8]) {
let mut brw = self.removes.borrow_mut();
brw.push(key.to_vec());
}
} }