database & write que import
This commit is contained in:
parent
134f48cdfb
commit
1d5f407a29
@ -18,15 +18,13 @@
|
|||||||
|
|
||||||
use traits::*;
|
use traits::*;
|
||||||
use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBIterator,
|
use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBIterator,
|
||||||
IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction};
|
IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction};
|
||||||
use std::collections::BTreeMap;
|
use std::sync::{RwLock, Arc};
|
||||||
use std::sync::{RwLock};
|
|
||||||
use std::convert::From;
|
use std::convert::From;
|
||||||
use ipc::IpcConfig;
|
use ipc::IpcConfig;
|
||||||
use std::ops::*;
|
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use ipc::binary::BinaryConvertError;
|
use ipc::binary::BinaryConvertError;
|
||||||
use std::collections::VecDeque;
|
use std::collections::{VecDeque, HashMap, BTreeMap};
|
||||||
|
|
||||||
impl From<String> for Error {
|
impl From<String> for Error {
|
||||||
fn from(s: String) -> Error {
|
fn from(s: String) -> Error {
|
||||||
@ -34,20 +32,136 @@ impl From<String> for Error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum WriteCacheEntry {
|
||||||
|
Remove,
|
||||||
|
Write(Vec<u8>),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct WriteCache {
|
||||||
|
entries: HashMap<Vec<u8>, WriteCacheEntry>,
|
||||||
|
preferred_len: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
const FLUSH_BATCH_SIZE: usize = 4096;
|
||||||
|
|
||||||
|
impl WriteCache {
|
||||||
|
fn new(cache_len: usize) -> WriteCache {
|
||||||
|
WriteCache {
|
||||||
|
entries: HashMap::new(),
|
||||||
|
preferred_len: cache_len,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write(&mut self, key: Vec<u8>, val: Vec<u8>) {
|
||||||
|
self.entries.insert(key, WriteCacheEntry::Write(val));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove(&mut self, key: Vec<u8>) {
|
||||||
|
self.entries.insert(key, WriteCacheEntry::Remove);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get(&self, key: &Vec<u8>) -> Option<Vec<u8>> {
|
||||||
|
self.entries.get(key).and_then(
|
||||||
|
|vec_ref| match vec_ref {
|
||||||
|
&WriteCacheEntry::Write(ref val) => Some(val.clone()),
|
||||||
|
&WriteCacheEntry::Remove => None
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// WriteCache should be locked for this
|
||||||
|
fn flush(&mut self, db: &DB, amount: usize) -> Result<(), Error> {
|
||||||
|
let batch = WriteBatch::new();
|
||||||
|
let mut removed_so_far = 0;
|
||||||
|
while removed_so_far < amount {
|
||||||
|
if self.entries.len() == 0 { break; }
|
||||||
|
let removed_key = {
|
||||||
|
let (key, cache_entry) = self.entries.iter().nth(0)
|
||||||
|
.expect("if entries.len == 0, we should have break in the loop, still we got here somehow");
|
||||||
|
|
||||||
|
match *cache_entry {
|
||||||
|
WriteCacheEntry::Write(ref val) => {
|
||||||
|
try!(batch.put(&key, val));
|
||||||
|
},
|
||||||
|
WriteCacheEntry::Remove => {
|
||||||
|
try!(batch.delete(&key));
|
||||||
|
},
|
||||||
|
}
|
||||||
|
key.clone()
|
||||||
|
};
|
||||||
|
|
||||||
|
self.entries.remove(&removed_key);
|
||||||
|
|
||||||
|
removed_so_far = removed_so_far + 1;
|
||||||
|
}
|
||||||
|
if removed_so_far > 0 {
|
||||||
|
try!(db.write(batch));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// flushes until cache is empty
|
||||||
|
fn flush_all(&mut self, db: &DB) -> Result<(), Error> {
|
||||||
|
while !self.is_empty() { try!(self.flush(db, FLUSH_BATCH_SIZE)); }
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_empty(&self) -> bool {
|
||||||
|
self.entries.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_shrink(&mut self, db: &DB) -> Result<(), Error> {
|
||||||
|
if self.entries.len() > self.preferred_len {
|
||||||
|
try!(self.flush(db, FLUSH_BATCH_SIZE));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct Database {
|
pub struct Database {
|
||||||
db: RwLock<Option<DB>>,
|
db: RwLock<Option<DB>>,
|
||||||
transactions: RwLock<BTreeMap<TransactionHandle, WriteBatch>>,
|
/// Iterators - dont't use between threads!
|
||||||
iterators: RwLock<BTreeMap<IteratorHandle, DBIterator>>,
|
iterators: RwLock<BTreeMap<IteratorHandle, DBIterator>>,
|
||||||
|
write_cache: RwLock<WriteCache>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
unsafe impl Send for Database {}
|
||||||
|
unsafe impl Sync for Database {}
|
||||||
|
|
||||||
impl Database {
|
impl Database {
|
||||||
pub fn new() -> Database {
|
pub fn new() -> Database {
|
||||||
Database {
|
Database {
|
||||||
db: RwLock::new(None),
|
db: RwLock::new(None),
|
||||||
transactions: RwLock::new(BTreeMap::new()),
|
|
||||||
iterators: RwLock::new(BTreeMap::new()),
|
iterators: RwLock::new(BTreeMap::new()),
|
||||||
|
write_cache: RwLock::new(WriteCache::new(DEFAULT_CACHE_LEN)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn flush(&self) -> Result<(), Error> {
|
||||||
|
let mut cache_lock = self.write_cache.write().unwrap();
|
||||||
|
let db_lock = self.db.read().unwrap();
|
||||||
|
if db_lock.is_none() { return Ok(()); }
|
||||||
|
let db = db_lock.as_ref().unwrap();
|
||||||
|
|
||||||
|
try!(cache_lock.try_shrink(&db));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn flush_all(&self) -> Result<(), Error> {
|
||||||
|
let mut cache_lock = self.write_cache.write().unwrap();
|
||||||
|
let db_lock = self.db.read().unwrap();
|
||||||
|
if db_lock.is_none() { return Ok(()); }
|
||||||
|
let db = db_lock.as_ref().expect("we should have exited with Ok(()) on the previous step");
|
||||||
|
|
||||||
|
try!(cache_lock.flush_all(&db));
|
||||||
|
Ok(())
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Database {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.flush().unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Ipc)]
|
#[derive(Ipc)]
|
||||||
@ -72,51 +186,64 @@ impl DatabaseService for Database {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Opens database in the specified path with the default config
|
||||||
|
fn open_default(&self, path: String) -> Result<(), Error> {
|
||||||
|
self.open(DatabaseConfig::default(), path)
|
||||||
|
}
|
||||||
|
|
||||||
fn close(&self) -> Result<(), Error> {
|
fn close(&self) -> Result<(), Error> {
|
||||||
|
try!(self.flush_all());
|
||||||
|
|
||||||
let mut db = self.db.write().unwrap();
|
let mut db = self.db.write().unwrap();
|
||||||
if db.is_none() { return Err(Error::IsClosed); }
|
if db.is_none() { return Err(Error::IsClosed); }
|
||||||
|
|
||||||
// TODO: wait for transactions to expire/close here?
|
|
||||||
if self.transactions.read().unwrap().len() > 0 { return Err(Error::UncommitedTransactions); }
|
|
||||||
|
|
||||||
*db = None;
|
*db = None;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
|
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
|
||||||
let db_lock = self.db.read().unwrap();
|
let mut cache_lock = self.write_cache.write().unwrap();
|
||||||
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed));
|
cache_lock.write(key.to_vec(), value.to_vec());
|
||||||
|
|
||||||
try!(db.put(key, value));
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn delete(&self, key: &[u8]) -> Result<(), Error> {
|
fn delete(&self, key: &[u8]) -> Result<(), Error> {
|
||||||
let db_lock = self.db.read().unwrap();
|
let mut cache_lock = self.write_cache.write().unwrap();
|
||||||
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed));
|
cache_lock.remove(key.to_vec());
|
||||||
|
|
||||||
try!(db.delete(key));
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write(&self, handle: TransactionHandle) -> Result<(), Error> {
|
fn write(&self, transaction: DBTransaction) -> Result<(), Error> {
|
||||||
let db_lock = self.db.read().unwrap();
|
let mut cache_lock = self.write_cache.write().unwrap();
|
||||||
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed));
|
|
||||||
|
|
||||||
let mut transactions = self.transactions.write().unwrap();
|
let mut writes = transaction.writes.borrow_mut();
|
||||||
let batch = try!(
|
for kv in writes.drain(..) {
|
||||||
transactions.remove(&handle).ok_or(Error::TransactionUnknown)
|
cache_lock.write(kv.key, kv.value);
|
||||||
);
|
}
|
||||||
try!(db.write(batch));
|
|
||||||
|
let mut removes = transaction.removes.borrow_mut();
|
||||||
|
for k in removes.drain(..) {
|
||||||
|
cache_lock.remove(k);
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
|
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
|
||||||
|
{
|
||||||
|
let key_vec = key.to_vec();
|
||||||
|
let cache_hit = self.write_cache.read().unwrap().get(&key_vec);
|
||||||
|
|
||||||
|
if cache_hit.is_some() {
|
||||||
|
return Ok(Some(cache_hit.expect("cache_hit.is_some() = true, still there is none somehow here")))
|
||||||
|
}
|
||||||
|
}
|
||||||
let db_lock = self.db.read().unwrap();
|
let db_lock = self.db.read().unwrap();
|
||||||
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed));
|
let db = try!(db_lock.as_ref().ok_or(Error::IsClosed));
|
||||||
|
|
||||||
match try!(db.get(key)) {
|
match try!(db.get(key)) {
|
||||||
Some(db_vec) => Ok(Some(db_vec.to_vec())),
|
Some(db_vec) => {
|
||||||
|
Ok(Some(db_vec.to_vec()))
|
||||||
|
},
|
||||||
None => Ok(None),
|
None => Ok(None),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -166,37 +293,35 @@ impl DatabaseService for Database {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn transaction_put(&self, transaction: TransactionHandle, key: &[u8], value: &[u8]) -> Result<(), Error>
|
fn dispose_iter(&self, handle: IteratorHandle) -> Result<(), Error> {
|
||||||
{
|
let mut iterators = self.iterators.write().unwrap();
|
||||||
let mut transactions = self.transactions.write().unwrap();
|
iterators.remove(&handle);
|
||||||
let batch = try!(
|
|
||||||
transactions.get_mut(&transaction).ok_or(Error::TransactionUnknown)
|
|
||||||
);
|
|
||||||
try!(batch.put(&key, &value));
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn transaction_delete(&self, transaction: TransactionHandle, key: &[u8]) -> Result<(), Error> {
|
|
||||||
let mut transactions = self.transactions.write().unwrap();
|
|
||||||
let batch = try!(
|
|
||||||
transactions.get_mut(&transaction).ok_or(Error::TransactionUnknown)
|
|
||||||
);
|
|
||||||
try!(batch.delete(&key));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new_transaction(&self) -> TransactionHandle {
|
|
||||||
let mut transactions = self.transactions.write().unwrap();
|
|
||||||
let next_transaction = transactions.keys().last().unwrap_or(&0) + 1;
|
|
||||||
transactions.insert(next_transaction, WriteBatch::new());
|
|
||||||
|
|
||||||
next_transaction
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO : put proper at compile-time
|
// TODO : put proper at compile-time
|
||||||
impl IpcConfig for Database {}
|
impl IpcConfig for Database {}
|
||||||
|
|
||||||
|
/// Database iterator
|
||||||
|
pub struct DatabaseIterator {
|
||||||
|
client: Arc<DatabaseClient<::nanomsg::Socket>>,
|
||||||
|
handle: IteratorHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Iterator for DatabaseIterator {
|
||||||
|
type Item = (Vec<u8>, Vec<u8>);
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
self.client.iter_next(self.handle).and_then(|kv| Some((kv.key, kv.value)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for DatabaseIterator {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.client.dispose_iter(self.handle).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
@ -215,7 +340,7 @@ mod test {
|
|||||||
fn can_be_open_empty() {
|
fn can_be_open_empty() {
|
||||||
let db = Database::new();
|
let db = Database::new();
|
||||||
let path = RandomTempPath::create_dir();
|
let path = RandomTempPath::create_dir();
|
||||||
db.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap();
|
db.open_default(path.as_str().to_owned()).unwrap();
|
||||||
|
|
||||||
assert!(db.is_empty().is_ok());
|
assert!(db.is_empty().is_ok());
|
||||||
}
|
}
|
||||||
@ -224,9 +349,10 @@ mod test {
|
|||||||
fn can_store_key() {
|
fn can_store_key() {
|
||||||
let db = Database::new();
|
let db = Database::new();
|
||||||
let path = RandomTempPath::create_dir();
|
let path = RandomTempPath::create_dir();
|
||||||
db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap();
|
db.open_default(path.as_str().to_owned()).unwrap();
|
||||||
|
|
||||||
db.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
|
db.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
|
||||||
|
db.flush_all().unwrap();
|
||||||
assert!(!db.is_empty().unwrap());
|
assert!(!db.is_empty().unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -234,15 +360,37 @@ mod test {
|
|||||||
fn can_retrieve() {
|
fn can_retrieve() {
|
||||||
let db = Database::new();
|
let db = Database::new();
|
||||||
let path = RandomTempPath::create_dir();
|
let path = RandomTempPath::create_dir();
|
||||||
db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap();
|
db.open_default(path.as_str().to_owned()).unwrap();
|
||||||
db.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
|
db.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
|
||||||
db.close().unwrap();
|
db.close().unwrap();
|
||||||
|
|
||||||
db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap();
|
db.open_default(path.as_str().to_owned()).unwrap();
|
||||||
assert_eq!(db.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());
|
assert_eq!(db.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod write_cache_tests {
|
||||||
|
use super::Database;
|
||||||
|
use traits::*;
|
||||||
|
use devtools::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cache_write_flush() {
|
||||||
|
let db = Database::new();
|
||||||
|
let path = RandomTempPath::create_dir();
|
||||||
|
|
||||||
|
db.open_default(path.as_str().to_owned()).unwrap();
|
||||||
|
db.put("100500".as_bytes(), "1".as_bytes()).unwrap();
|
||||||
|
db.delete("100500".as_bytes()).unwrap();
|
||||||
|
db.flush_all().unwrap();
|
||||||
|
|
||||||
|
let val = db.get("100500".as_bytes()).unwrap();
|
||||||
|
assert!(val.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod client_tests {
|
mod client_tests {
|
||||||
use super::{DatabaseClient, Database};
|
use super::{DatabaseClient, Database};
|
||||||
@ -251,6 +399,8 @@ mod client_tests {
|
|||||||
use nanoipc;
|
use nanoipc;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::{Ordering, AtomicBool};
|
use std::sync::atomic::{Ordering, AtomicBool};
|
||||||
|
use crossbeam;
|
||||||
|
use run_worker;
|
||||||
|
|
||||||
fn init_worker(addr: &str) -> nanoipc::Worker<Database> {
|
fn init_worker(addr: &str) -> nanoipc::Worker<Database> {
|
||||||
let mut worker = nanoipc::Worker::<Database>::new(&Arc::new(Database::new()));
|
let mut worker = nanoipc::Worker::<Database>::new(&Arc::new(Database::new()));
|
||||||
@ -268,7 +418,7 @@ mod client_tests {
|
|||||||
|
|
||||||
::std::thread::spawn(move || {
|
::std::thread::spawn(move || {
|
||||||
let mut worker = init_worker(url);
|
let mut worker = init_worker(url);
|
||||||
while !c_worker_should_exit.load(Ordering::Relaxed) {
|
while !c_worker_should_exit.load(Ordering::Relaxed) {
|
||||||
worker.poll();
|
worker.poll();
|
||||||
c_worker_is_ready.store(true, Ordering::Relaxed);
|
c_worker_is_ready.store(true, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
@ -295,7 +445,7 @@ mod client_tests {
|
|||||||
|
|
||||||
::std::thread::spawn(move || {
|
::std::thread::spawn(move || {
|
||||||
let mut worker = init_worker(url);
|
let mut worker = init_worker(url);
|
||||||
while !c_worker_should_exit.load(Ordering::Relaxed) {
|
while !c_worker_should_exit.load(Ordering::Relaxed) {
|
||||||
worker.poll();
|
worker.poll();
|
||||||
c_worker_is_ready.store(true, Ordering::Relaxed);
|
c_worker_is_ready.store(true, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
@ -304,7 +454,7 @@ mod client_tests {
|
|||||||
while !worker_is_ready.load(Ordering::Relaxed) { }
|
while !worker_is_ready.load(Ordering::Relaxed) { }
|
||||||
let client = nanoipc::init_duplex_client::<DatabaseClient<_>>(url).unwrap();
|
let client = nanoipc::init_duplex_client::<DatabaseClient<_>>(url).unwrap();
|
||||||
|
|
||||||
client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap();
|
client.open_default(path.as_str().to_owned()).unwrap();
|
||||||
assert!(client.is_empty().unwrap());
|
assert!(client.is_empty().unwrap());
|
||||||
worker_should_exit.store(true, Ordering::Relaxed);
|
worker_should_exit.store(true, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
@ -314,27 +464,16 @@ mod client_tests {
|
|||||||
let url = "ipc:///tmp/parity-db-ipc-test-30.ipc";
|
let url = "ipc:///tmp/parity-db-ipc-test-30.ipc";
|
||||||
let path = RandomTempPath::create_dir();
|
let path = RandomTempPath::create_dir();
|
||||||
|
|
||||||
let worker_should_exit = Arc::new(AtomicBool::new(false));
|
crossbeam::scope(move |scope| {
|
||||||
let worker_is_ready = Arc::new(AtomicBool::new(false));
|
let stop = Arc::new(AtomicBool::new(false));
|
||||||
let c_worker_should_exit = worker_should_exit.clone();
|
run_worker(scope, stop.clone(), url);
|
||||||
let c_worker_is_ready = worker_is_ready.clone();
|
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
|
||||||
|
client.open_default(path.as_str().to_owned()).unwrap();
|
||||||
|
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
|
||||||
|
client.close().unwrap();
|
||||||
|
|
||||||
::std::thread::spawn(move || {
|
stop.store(true, Ordering::Relaxed);
|
||||||
let mut worker = init_worker(url);
|
|
||||||
while !c_worker_should_exit.load(Ordering::Relaxed) {
|
|
||||||
worker.poll();
|
|
||||||
c_worker_is_ready.store(true, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
while !worker_is_ready.load(Ordering::Relaxed) { }
|
|
||||||
let client = nanoipc::init_duplex_client::<DatabaseClient<_>>(url).unwrap();
|
|
||||||
|
|
||||||
client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap();
|
|
||||||
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
|
|
||||||
client.close().unwrap();
|
|
||||||
|
|
||||||
worker_should_exit.store(true, Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -342,29 +481,93 @@ mod client_tests {
|
|||||||
let url = "ipc:///tmp/parity-db-ipc-test-40.ipc";
|
let url = "ipc:///tmp/parity-db-ipc-test-40.ipc";
|
||||||
let path = RandomTempPath::create_dir();
|
let path = RandomTempPath::create_dir();
|
||||||
|
|
||||||
let worker_should_exit = Arc::new(AtomicBool::new(false));
|
crossbeam::scope(move |scope| {
|
||||||
let worker_is_ready = Arc::new(AtomicBool::new(false));
|
let stop = Arc::new(AtomicBool::new(false));
|
||||||
let c_worker_should_exit = worker_should_exit.clone();
|
run_worker(scope, stop.clone(), url);
|
||||||
let c_worker_is_ready = worker_is_ready.clone();
|
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
|
||||||
|
|
||||||
::std::thread::spawn(move || {
|
client.open_default(path.as_str().to_owned()).unwrap();
|
||||||
let mut worker = init_worker(url);
|
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
|
||||||
while !c_worker_should_exit.load(Ordering::Relaxed) {
|
client.close().unwrap();
|
||||||
worker.poll();
|
|
||||||
c_worker_is_ready.store(true, Ordering::Relaxed);
|
client.open_default(path.as_str().to_owned()).unwrap();
|
||||||
|
assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());
|
||||||
|
|
||||||
|
stop.store(true, Ordering::Relaxed);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_read_empty() {
|
||||||
|
let url = "ipc:///tmp/parity-db-ipc-test-45.ipc";
|
||||||
|
let path = RandomTempPath::create_dir();
|
||||||
|
|
||||||
|
crossbeam::scope(move |scope| {
|
||||||
|
let stop = Arc::new(AtomicBool::new(false));
|
||||||
|
run_worker(scope, stop.clone(), url);
|
||||||
|
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
|
||||||
|
|
||||||
|
client.open_default(path.as_str().to_owned()).unwrap();
|
||||||
|
assert!(client.get("xxx".as_bytes()).unwrap().is_none());
|
||||||
|
|
||||||
|
stop.store(true, Ordering::Relaxed);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_commit_client_transaction() {
|
||||||
|
let url = "ipc:///tmp/parity-db-ipc-test-60.ipc";
|
||||||
|
let path = RandomTempPath::create_dir();
|
||||||
|
|
||||||
|
crossbeam::scope(move |scope| {
|
||||||
|
let stop = Arc::new(AtomicBool::new(false));
|
||||||
|
run_worker(scope, stop.clone(), url);
|
||||||
|
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
|
||||||
|
client.open_default(path.as_str().to_owned()).unwrap();
|
||||||
|
|
||||||
|
let transaction = DBTransaction::new();
|
||||||
|
transaction.put("xxx".as_bytes(), "1".as_bytes());
|
||||||
|
client.write(transaction).unwrap();
|
||||||
|
|
||||||
|
client.close().unwrap();
|
||||||
|
|
||||||
|
client.open_default(path.as_str().to_owned()).unwrap();
|
||||||
|
assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());
|
||||||
|
|
||||||
|
stop.store(true, Ordering::Relaxed);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn key_write_read_ipc() {
|
||||||
|
let url = "ipc:///tmp/parity-db-ipc-test-70.ipc";
|
||||||
|
let path = RandomTempPath::create_dir();
|
||||||
|
|
||||||
|
crossbeam::scope(|scope| {
|
||||||
|
let stop = StopGuard::new();
|
||||||
|
run_worker(&scope, stop.share(), url);
|
||||||
|
|
||||||
|
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
|
||||||
|
|
||||||
|
client.open_default(path.as_str().to_owned()).unwrap();
|
||||||
|
let mut batch = Vec::new();
|
||||||
|
for _ in 0..100 {
|
||||||
|
batch.push((random_str(256).as_bytes().to_vec(), random_str(256).as_bytes().to_vec()));
|
||||||
|
batch.push((random_str(256).as_bytes().to_vec(), random_str(2048).as_bytes().to_vec()));
|
||||||
|
batch.push((random_str(2048).as_bytes().to_vec(), random_str(2048).as_bytes().to_vec()));
|
||||||
|
batch.push((random_str(2048).as_bytes().to_vec(), random_str(256).as_bytes().to_vec()));
|
||||||
|
}
|
||||||
|
|
||||||
|
for &(ref k, ref v) in batch.iter() {
|
||||||
|
client.put(k, v).unwrap();
|
||||||
|
}
|
||||||
|
client.close().unwrap();
|
||||||
|
|
||||||
|
client.open_default(path.as_str().to_owned()).unwrap();
|
||||||
|
for &(ref k, ref v) in batch.iter() {
|
||||||
|
assert_eq!(v, &client.get(k).unwrap().unwrap());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
while !worker_is_ready.load(Ordering::Relaxed) { }
|
|
||||||
let client = nanoipc::init_duplex_client::<DatabaseClient<_>>(url).unwrap();
|
|
||||||
|
|
||||||
client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap();
|
|
||||||
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
|
|
||||||
client.close().unwrap();
|
|
||||||
|
|
||||||
client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap();
|
|
||||||
assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());
|
|
||||||
|
|
||||||
worker_should_exit.store(true, Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user