diff --git a/db/src/database.rs b/db/src/database.rs index 4abc98467..d535f1f56 100644 --- a/db/src/database.rs +++ b/db/src/database.rs @@ -18,15 +18,13 @@ use traits::*; use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBIterator, - IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction}; -use std::collections::BTreeMap; -use std::sync::{RwLock}; +IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction}; +use std::sync::{RwLock, Arc}; use std::convert::From; use ipc::IpcConfig; -use std::ops::*; use std::mem; use ipc::binary::BinaryConvertError; -use std::collections::VecDeque; +use std::collections::{VecDeque, HashMap, BTreeMap}; impl From for Error { fn from(s: String) -> Error { @@ -34,20 +32,136 @@ impl From for Error { } } +enum WriteCacheEntry { + Remove, + Write(Vec), +} + +pub struct WriteCache { + entries: HashMap, WriteCacheEntry>, + preferred_len: usize, +} + +const FLUSH_BATCH_SIZE: usize = 4096; + +impl WriteCache { + fn new(cache_len: usize) -> WriteCache { + WriteCache { + entries: HashMap::new(), + preferred_len: cache_len, + } + } + + fn write(&mut self, key: Vec, val: Vec) { + self.entries.insert(key, WriteCacheEntry::Write(val)); + } + + fn remove(&mut self, key: Vec) { + self.entries.insert(key, WriteCacheEntry::Remove); + } + + fn get(&self, key: &Vec) -> Option> { + self.entries.get(key).and_then( + |vec_ref| match vec_ref { + &WriteCacheEntry::Write(ref val) => Some(val.clone()), + &WriteCacheEntry::Remove => None + }) + } + + /// WriteCache should be locked for this + fn flush(&mut self, db: &DB, amount: usize) -> Result<(), Error> { + let batch = WriteBatch::new(); + let mut removed_so_far = 0; + while removed_so_far < amount { + if self.entries.len() == 0 { break; } + let removed_key = { + let (key, cache_entry) = self.entries.iter().nth(0) + .expect("if entries.len == 0, we should have break in the loop, still we got here somehow"); + + match *cache_entry { + WriteCacheEntry::Write(ref val) => { + try!(batch.put(&key, val)); + }, + WriteCacheEntry::Remove => { + try!(batch.delete(&key)); + }, + } + key.clone() + }; + + self.entries.remove(&removed_key); + + removed_so_far = removed_so_far + 1; + } + if removed_so_far > 0 { + try!(db.write(batch)); + } + Ok(()) + } + + /// flushes until cache is empty + fn flush_all(&mut self, db: &DB) -> Result<(), Error> { + while !self.is_empty() { try!(self.flush(db, FLUSH_BATCH_SIZE)); } + Ok(()) + } + + fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + fn try_shrink(&mut self, db: &DB) -> Result<(), Error> { + if self.entries.len() > self.preferred_len { + try!(self.flush(db, FLUSH_BATCH_SIZE)); + } + Ok(()) + } +} + pub struct Database { db: RwLock>, - transactions: RwLock>, + /// Iterators - dont't use between threads! iterators: RwLock>, + write_cache: RwLock, } +unsafe impl Send for Database {} +unsafe impl Sync for Database {} + impl Database { pub fn new() -> Database { Database { db: RwLock::new(None), - transactions: RwLock::new(BTreeMap::new()), iterators: RwLock::new(BTreeMap::new()), + write_cache: RwLock::new(WriteCache::new(DEFAULT_CACHE_LEN)), } } + + pub fn flush(&self) -> Result<(), Error> { + let mut cache_lock = self.write_cache.write().unwrap(); + let db_lock = self.db.read().unwrap(); + if db_lock.is_none() { return Ok(()); } + let db = db_lock.as_ref().unwrap(); + + try!(cache_lock.try_shrink(&db)); + Ok(()) + } + + pub fn flush_all(&self) -> Result<(), Error> { + let mut cache_lock = self.write_cache.write().unwrap(); + let db_lock = self.db.read().unwrap(); + if db_lock.is_none() { return Ok(()); } + let db = db_lock.as_ref().expect("we should have exited with Ok(()) on the previous step"); + + try!(cache_lock.flush_all(&db)); + Ok(()) + + } +} + +impl Drop for Database { + fn drop(&mut self) { + self.flush().unwrap(); + } } #[derive(Ipc)] @@ -72,51 +186,64 @@ impl DatabaseService for Database { Ok(()) } + /// Opens database in the specified path with the default config + fn open_default(&self, path: String) -> Result<(), Error> { + self.open(DatabaseConfig::default(), path) + } + fn close(&self) -> Result<(), Error> { + try!(self.flush_all()); + let mut db = self.db.write().unwrap(); if db.is_none() { return Err(Error::IsClosed); } - // TODO: wait for transactions to expire/close here? - if self.transactions.read().unwrap().len() > 0 { return Err(Error::UncommitedTransactions); } - *db = None; Ok(()) } fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { - let db_lock = self.db.read().unwrap(); - let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); - - try!(db.put(key, value)); + let mut cache_lock = self.write_cache.write().unwrap(); + cache_lock.write(key.to_vec(), value.to_vec()); Ok(()) } fn delete(&self, key: &[u8]) -> Result<(), Error> { - let db_lock = self.db.read().unwrap(); - let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); - - try!(db.delete(key)); + let mut cache_lock = self.write_cache.write().unwrap(); + cache_lock.remove(key.to_vec()); Ok(()) } - fn write(&self, handle: TransactionHandle) -> Result<(), Error> { - let db_lock = self.db.read().unwrap(); - let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); + fn write(&self, transaction: DBTransaction) -> Result<(), Error> { + let mut cache_lock = self.write_cache.write().unwrap(); - let mut transactions = self.transactions.write().unwrap(); - let batch = try!( - transactions.remove(&handle).ok_or(Error::TransactionUnknown) - ); - try!(db.write(batch)); + let mut writes = transaction.writes.borrow_mut(); + for kv in writes.drain(..) { + cache_lock.write(kv.key, kv.value); + } + + let mut removes = transaction.removes.borrow_mut(); + for k in removes.drain(..) { + cache_lock.remove(k); + } Ok(()) } fn get(&self, key: &[u8]) -> Result>, Error> { + { + let key_vec = key.to_vec(); + let cache_hit = self.write_cache.read().unwrap().get(&key_vec); + + if cache_hit.is_some() { + return Ok(Some(cache_hit.expect("cache_hit.is_some() = true, still there is none somehow here"))) + } + } let db_lock = self.db.read().unwrap(); let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); match try!(db.get(key)) { - Some(db_vec) => Ok(Some(db_vec.to_vec())), + Some(db_vec) => { + Ok(Some(db_vec.to_vec())) + }, None => Ok(None), } } @@ -166,37 +293,35 @@ impl DatabaseService for Database { }) } - fn transaction_put(&self, transaction: TransactionHandle, key: &[u8], value: &[u8]) -> Result<(), Error> - { - let mut transactions = self.transactions.write().unwrap(); - let batch = try!( - transactions.get_mut(&transaction).ok_or(Error::TransactionUnknown) - ); - try!(batch.put(&key, &value)); + fn dispose_iter(&self, handle: IteratorHandle) -> Result<(), Error> { + let mut iterators = self.iterators.write().unwrap(); + iterators.remove(&handle); Ok(()) } - - fn transaction_delete(&self, transaction: TransactionHandle, key: &[u8]) -> Result<(), Error> { - let mut transactions = self.transactions.write().unwrap(); - let batch = try!( - transactions.get_mut(&transaction).ok_or(Error::TransactionUnknown) - ); - try!(batch.delete(&key)); - Ok(()) - } - - fn new_transaction(&self) -> TransactionHandle { - let mut transactions = self.transactions.write().unwrap(); - let next_transaction = transactions.keys().last().unwrap_or(&0) + 1; - transactions.insert(next_transaction, WriteBatch::new()); - - next_transaction - } } // TODO : put proper at compile-time impl IpcConfig for Database {} +/// Database iterator +pub struct DatabaseIterator { + client: Arc>, + handle: IteratorHandle, +} + +impl Iterator for DatabaseIterator { + type Item = (Vec, Vec); + + fn next(&mut self) -> Option { + self.client.iter_next(self.handle).and_then(|kv| Some((kv.key, kv.value))) + } +} + +impl Drop for DatabaseIterator { + fn drop(&mut self) { + self.client.dispose_iter(self.handle).unwrap(); + } +} #[cfg(test)] mod test { @@ -215,7 +340,7 @@ mod test { fn can_be_open_empty() { let db = Database::new(); let path = RandomTempPath::create_dir(); - db.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); + db.open_default(path.as_str().to_owned()).unwrap(); assert!(db.is_empty().is_ok()); } @@ -224,9 +349,10 @@ mod test { fn can_store_key() { let db = Database::new(); let path = RandomTempPath::create_dir(); - db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap(); + db.open_default(path.as_str().to_owned()).unwrap(); db.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); + db.flush_all().unwrap(); assert!(!db.is_empty().unwrap()); } @@ -234,15 +360,37 @@ mod test { fn can_retrieve() { let db = Database::new(); let path = RandomTempPath::create_dir(); - db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap(); + db.open_default(path.as_str().to_owned()).unwrap(); db.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); db.close().unwrap(); - db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap(); + db.open_default(path.as_str().to_owned()).unwrap(); assert_eq!(db.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec()); } } +#[cfg(test)] +mod write_cache_tests { + use super::Database; + use traits::*; + use devtools::*; + + #[test] + fn cache_write_flush() { + let db = Database::new(); + let path = RandomTempPath::create_dir(); + + db.open_default(path.as_str().to_owned()).unwrap(); + db.put("100500".as_bytes(), "1".as_bytes()).unwrap(); + db.delete("100500".as_bytes()).unwrap(); + db.flush_all().unwrap(); + + let val = db.get("100500".as_bytes()).unwrap(); + assert!(val.is_none()); + } + +} + #[cfg(test)] mod client_tests { use super::{DatabaseClient, Database}; @@ -251,6 +399,8 @@ mod client_tests { use nanoipc; use std::sync::Arc; use std::sync::atomic::{Ordering, AtomicBool}; + use crossbeam; + use run_worker; fn init_worker(addr: &str) -> nanoipc::Worker { let mut worker = nanoipc::Worker::::new(&Arc::new(Database::new())); @@ -268,7 +418,7 @@ mod client_tests { ::std::thread::spawn(move || { let mut worker = init_worker(url); - while !c_worker_should_exit.load(Ordering::Relaxed) { + while !c_worker_should_exit.load(Ordering::Relaxed) { worker.poll(); c_worker_is_ready.store(true, Ordering::Relaxed); } @@ -295,7 +445,7 @@ mod client_tests { ::std::thread::spawn(move || { let mut worker = init_worker(url); - while !c_worker_should_exit.load(Ordering::Relaxed) { + while !c_worker_should_exit.load(Ordering::Relaxed) { worker.poll(); c_worker_is_ready.store(true, Ordering::Relaxed); } @@ -304,7 +454,7 @@ mod client_tests { while !worker_is_ready.load(Ordering::Relaxed) { } let client = nanoipc::init_duplex_client::>(url).unwrap(); - client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); + client.open_default(path.as_str().to_owned()).unwrap(); assert!(client.is_empty().unwrap()); worker_should_exit.store(true, Ordering::Relaxed); } @@ -314,27 +464,16 @@ mod client_tests { let url = "ipc:///tmp/parity-db-ipc-test-30.ipc"; let path = RandomTempPath::create_dir(); - let worker_should_exit = Arc::new(AtomicBool::new(false)); - let worker_is_ready = Arc::new(AtomicBool::new(false)); - let c_worker_should_exit = worker_should_exit.clone(); - let c_worker_is_ready = worker_is_ready.clone(); + crossbeam::scope(move |scope| { + let stop = Arc::new(AtomicBool::new(false)); + run_worker(scope, stop.clone(), url); + let client = nanoipc::init_client::>(url).unwrap(); + client.open_default(path.as_str().to_owned()).unwrap(); + client.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); + client.close().unwrap(); - ::std::thread::spawn(move || { - let mut worker = init_worker(url); - while !c_worker_should_exit.load(Ordering::Relaxed) { - worker.poll(); - c_worker_is_ready.store(true, Ordering::Relaxed); - } + stop.store(true, Ordering::Relaxed); }); - - while !worker_is_ready.load(Ordering::Relaxed) { } - let client = nanoipc::init_duplex_client::>(url).unwrap(); - - client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); - client.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); - client.close().unwrap(); - - worker_should_exit.store(true, Ordering::Relaxed); } #[test] @@ -342,29 +481,93 @@ mod client_tests { let url = "ipc:///tmp/parity-db-ipc-test-40.ipc"; let path = RandomTempPath::create_dir(); - let worker_should_exit = Arc::new(AtomicBool::new(false)); - let worker_is_ready = Arc::new(AtomicBool::new(false)); - let c_worker_should_exit = worker_should_exit.clone(); - let c_worker_is_ready = worker_is_ready.clone(); + crossbeam::scope(move |scope| { + let stop = Arc::new(AtomicBool::new(false)); + run_worker(scope, stop.clone(), url); + let client = nanoipc::init_client::>(url).unwrap(); - ::std::thread::spawn(move || { - let mut worker = init_worker(url); - while !c_worker_should_exit.load(Ordering::Relaxed) { - worker.poll(); - c_worker_is_ready.store(true, Ordering::Relaxed); + client.open_default(path.as_str().to_owned()).unwrap(); + client.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); + client.close().unwrap(); + + client.open_default(path.as_str().to_owned()).unwrap(); + assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec()); + + stop.store(true, Ordering::Relaxed); + }); + } + + #[test] + fn can_read_empty() { + let url = "ipc:///tmp/parity-db-ipc-test-45.ipc"; + let path = RandomTempPath::create_dir(); + + crossbeam::scope(move |scope| { + let stop = Arc::new(AtomicBool::new(false)); + run_worker(scope, stop.clone(), url); + let client = nanoipc::init_client::>(url).unwrap(); + + client.open_default(path.as_str().to_owned()).unwrap(); + assert!(client.get("xxx".as_bytes()).unwrap().is_none()); + + stop.store(true, Ordering::Relaxed); + }); + } + + + #[test] + fn can_commit_client_transaction() { + let url = "ipc:///tmp/parity-db-ipc-test-60.ipc"; + let path = RandomTempPath::create_dir(); + + crossbeam::scope(move |scope| { + let stop = Arc::new(AtomicBool::new(false)); + run_worker(scope, stop.clone(), url); + let client = nanoipc::init_client::>(url).unwrap(); + client.open_default(path.as_str().to_owned()).unwrap(); + + let transaction = DBTransaction::new(); + transaction.put("xxx".as_bytes(), "1".as_bytes()); + client.write(transaction).unwrap(); + + client.close().unwrap(); + + client.open_default(path.as_str().to_owned()).unwrap(); + assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec()); + + stop.store(true, Ordering::Relaxed); + }); + } + + #[test] + fn key_write_read_ipc() { + let url = "ipc:///tmp/parity-db-ipc-test-70.ipc"; + let path = RandomTempPath::create_dir(); + + crossbeam::scope(|scope| { + let stop = StopGuard::new(); + run_worker(&scope, stop.share(), url); + + let client = nanoipc::init_client::>(url).unwrap(); + + client.open_default(path.as_str().to_owned()).unwrap(); + let mut batch = Vec::new(); + for _ in 0..100 { + batch.push((random_str(256).as_bytes().to_vec(), random_str(256).as_bytes().to_vec())); + batch.push((random_str(256).as_bytes().to_vec(), random_str(2048).as_bytes().to_vec())); + batch.push((random_str(2048).as_bytes().to_vec(), random_str(2048).as_bytes().to_vec())); + batch.push((random_str(2048).as_bytes().to_vec(), random_str(256).as_bytes().to_vec())); + } + + for &(ref k, ref v) in batch.iter() { + client.put(k, v).unwrap(); + } + client.close().unwrap(); + + client.open_default(path.as_str().to_owned()).unwrap(); + for &(ref k, ref v) in batch.iter() { + assert_eq!(v, &client.get(k).unwrap().unwrap()); } }); - - while !worker_is_ready.load(Ordering::Relaxed) { } - let client = nanoipc::init_duplex_client::>(url).unwrap(); - - client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); - client.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); - client.close().unwrap(); - - client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); - assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec()); - - worker_should_exit.store(true, Ordering::Relaxed); } } diff --git a/db/src/service.rs b/db/src/service.rs deleted file mode 100644 index e69de29bb..000000000