remove internal locking from DBTransaction (#2003)
This commit is contained in:
committed by
Arkadiy Paronyan
parent
b18407b9e3
commit
2aef81cf90
@@ -156,7 +156,7 @@ impl JournalDB for ArchiveDB {
|
||||
self.latest_era.is_none()
|
||||
}
|
||||
|
||||
fn commit(&mut self, batch: &DBTransaction, now: u64, _id: &H256, _end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
||||
fn commit(&mut self, batch: &mut DBTransaction, now: u64, _id: &H256, _end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
||||
let mut inserts = 0usize;
|
||||
let mut deletes = 0usize;
|
||||
|
||||
@@ -185,7 +185,7 @@ impl JournalDB for ArchiveDB {
|
||||
Ok((inserts + deletes) as u32)
|
||||
}
|
||||
|
||||
fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
|
||||
fn inject(&mut self, batch: &mut DBTransaction) -> Result<u32, UtilError> {
|
||||
let mut inserts = 0usize;
|
||||
let mut deletes = 0usize;
|
||||
|
||||
|
||||
@@ -101,13 +101,13 @@ impl EarlyMergeDB {
|
||||
}
|
||||
|
||||
// The next three are valid only as long as there is an insert operation of `key` in the journal.
|
||||
fn set_already_in(batch: &DBTransaction, col: Option<u32>, key: &H256) { batch.put(col, &Self::morph_key(key, 0), &[1u8]); }
|
||||
fn reset_already_in(batch: &DBTransaction, col: Option<u32>, key: &H256) { batch.delete(col, &Self::morph_key(key, 0)); }
|
||||
fn set_already_in(batch: &mut DBTransaction, col: Option<u32>, key: &H256) { batch.put(col, &Self::morph_key(key, 0), &[1u8]); }
|
||||
fn reset_already_in(batch: &mut DBTransaction, col: Option<u32>, key: &H256) { batch.delete(col, &Self::morph_key(key, 0)); }
|
||||
fn is_already_in(backing: &Database, col: Option<u32>, key: &H256) -> bool {
|
||||
backing.get(col, &Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?").is_some()
|
||||
}
|
||||
|
||||
fn insert_keys(inserts: &[(H256, Bytes)], backing: &Database, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>, batch: &DBTransaction, trace: bool) {
|
||||
fn insert_keys(inserts: &[(H256, Bytes)], backing: &Database, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>, batch: &mut DBTransaction, trace: bool) {
|
||||
for &(ref h, ref d) in inserts {
|
||||
if let Some(c) = refs.get_mut(h) {
|
||||
// already counting. increment.
|
||||
@@ -156,7 +156,7 @@ impl EarlyMergeDB {
|
||||
trace!(target: "jdb.fine", "replay_keys: (end) refs={:?}", refs);
|
||||
}
|
||||
|
||||
fn remove_keys(deletes: &[H256], refs: &mut HashMap<H256, RefInfo>, batch: &DBTransaction, col: Option<u32>, from: RemoveFrom, trace: bool) {
|
||||
fn remove_keys(deletes: &[H256], refs: &mut HashMap<H256, RefInfo>, batch: &mut DBTransaction, col: Option<u32>, from: RemoveFrom, trace: bool) {
|
||||
// with a remove on {queue_refs: 1, in_archive: true}, we have two options:
|
||||
// - convert to {queue_refs: 1, in_archive: false} (i.e. remove it from the conceptual archive)
|
||||
// - convert to {queue_refs: 0, in_archive: true} (i.e. remove it from the conceptual queue)
|
||||
@@ -337,7 +337,7 @@ impl JournalDB for EarlyMergeDB {
|
||||
}
|
||||
|
||||
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
|
||||
fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
||||
fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
||||
// journal format:
|
||||
// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
|
||||
// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
|
||||
@@ -514,7 +514,7 @@ impl JournalDB for EarlyMergeDB {
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
|
||||
fn inject(&mut self, batch: &mut DBTransaction) -> Result<u32, UtilError> {
|
||||
let mut ops = 0;
|
||||
for (key, (value, rc)) in self.overlay.drain() {
|
||||
if rc != 0 { ops += 1 }
|
||||
|
||||
@@ -222,7 +222,7 @@ impl JournalDB for OverlayRecentDB {
|
||||
.or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec()))
|
||||
}
|
||||
|
||||
fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
||||
fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
||||
// record new commit's details.
|
||||
trace!("commit: #{} ({}), end era: {:?}", now, id, end);
|
||||
let mut journal_overlay = self.journal_overlay.write();
|
||||
@@ -314,7 +314,7 @@ impl JournalDB for OverlayRecentDB {
|
||||
self.journal_overlay.write().pending_overlay.clear();
|
||||
}
|
||||
|
||||
fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
|
||||
fn inject(&mut self, batch: &mut DBTransaction) -> Result<u32, UtilError> {
|
||||
let mut ops = 0;
|
||||
for (key, (value, rc)) in self.transaction_overlay.drain() {
|
||||
if rc != 0 { ops += 1 }
|
||||
|
||||
@@ -109,7 +109,7 @@ impl JournalDB for RefCountedDB {
|
||||
self.backing.get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]).map(|b| b.to_vec())
|
||||
}
|
||||
|
||||
fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
||||
fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
||||
// journal format:
|
||||
// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
|
||||
// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
|
||||
@@ -182,11 +182,11 @@ impl JournalDB for RefCountedDB {
|
||||
}
|
||||
}
|
||||
|
||||
let r = try!(self.forward.commit_to_batch(&batch));
|
||||
let r = try!(self.forward.commit_to_batch(batch));
|
||||
Ok(r)
|
||||
}
|
||||
|
||||
fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
|
||||
fn inject(&mut self, batch: &mut DBTransaction) -> Result<u32, UtilError> {
|
||||
self.inserts.clear();
|
||||
for remove in self.removes.drain(..) {
|
||||
self.forward.remove(&remove);
|
||||
|
||||
@@ -37,7 +37,7 @@ pub trait JournalDB: HashDB {
|
||||
|
||||
/// Commit all recent insert operations and canonical historical commits' removals from the
|
||||
/// old era to the backing database, reverting any non-canonical historical commit's inserts.
|
||||
fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>;
|
||||
fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>;
|
||||
|
||||
/// Commit all queued insert and delete operations without affecting any journalling -- this requires that all insertions
|
||||
/// and deletions are indeed canonical and will likely lead to an invalid database if that assumption is violated.
|
||||
@@ -46,7 +46,7 @@ pub trait JournalDB: HashDB {
|
||||
/// by any previous `commit` operations. Essentially, this means that `inject` can be used
|
||||
/// either to restore a state to a fresh database, or to insert data which may only be journalled
|
||||
/// from this point onwards.
|
||||
fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError>;
|
||||
fn inject(&mut self, batch: &mut DBTransaction) -> Result<u32, UtilError>;
|
||||
|
||||
/// State data query
|
||||
fn state(&self, _id: &H256) -> Option<Bytes>;
|
||||
@@ -67,8 +67,8 @@ pub trait JournalDB: HashDB {
|
||||
/// Commit all changes in a single batch
|
||||
#[cfg(test)]
|
||||
fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
||||
let batch = self.backing().transaction();
|
||||
let res = try!(self.commit(&batch, now, id, end));
|
||||
let mut batch = self.backing().transaction();
|
||||
let res = try!(self.commit(&mut batch, now, id, end));
|
||||
let result = self.backing().write(batch).map(|_| res).map_err(Into::into);
|
||||
self.flush();
|
||||
result
|
||||
@@ -77,8 +77,8 @@ pub trait JournalDB: HashDB {
|
||||
/// Inject all changes in a single batch.
|
||||
#[cfg(test)]
|
||||
fn inject_batch(&mut self) -> Result<u32, UtilError> {
|
||||
let batch = self.backing().transaction();
|
||||
let res = try!(self.inject(&batch));
|
||||
let mut batch = self.backing().transaction();
|
||||
let res = try!(self.inject(&mut batch));
|
||||
self.backing().write(batch).map(|_| res).map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ const DB_BACKGROUND_COMPACTIONS: i32 = 2;
|
||||
|
||||
/// Write transaction. Batches a sequence of put/delete operations for efficiency.
|
||||
pub struct DBTransaction {
|
||||
ops: Mutex<Vec<DBOp>>,
|
||||
ops: Vec<DBOp>,
|
||||
}
|
||||
|
||||
enum DBOp {
|
||||
@@ -52,15 +52,15 @@ impl DBTransaction {
|
||||
/// Create new transaction.
|
||||
pub fn new(_db: &Database) -> DBTransaction {
|
||||
DBTransaction {
|
||||
ops: Mutex::new(Vec::with_capacity(256)),
|
||||
ops: Vec::with_capacity(256),
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert a key-value pair in the transaction. Any existing value value will be overwritten upon write.
|
||||
pub fn put(&self, col: Option<u32>, key: &[u8], value: &[u8]) {
|
||||
pub fn put(&mut self, col: Option<u32>, key: &[u8], value: &[u8]) {
|
||||
let mut ekey = ElasticArray32::new();
|
||||
ekey.append_slice(key);
|
||||
self.ops.lock().push(DBOp::Insert {
|
||||
self.ops.push(DBOp::Insert {
|
||||
col: col,
|
||||
key: ekey,
|
||||
value: value.to_vec(),
|
||||
@@ -68,10 +68,10 @@ impl DBTransaction {
|
||||
}
|
||||
|
||||
/// Insert a key-value pair in the transaction. Any existing value value will be overwritten upon write.
|
||||
pub fn put_vec(&self, col: Option<u32>, key: &[u8], value: Bytes) {
|
||||
pub fn put_vec(&mut self, col: Option<u32>, key: &[u8], value: Bytes) {
|
||||
let mut ekey = ElasticArray32::new();
|
||||
ekey.append_slice(key);
|
||||
self.ops.lock().push(DBOp::Insert {
|
||||
self.ops.push(DBOp::Insert {
|
||||
col: col,
|
||||
key: ekey,
|
||||
value: value,
|
||||
@@ -79,11 +79,11 @@ impl DBTransaction {
|
||||
}
|
||||
|
||||
/// Insert a key-value pair in the transaction. Any existing value value will be overwritten upon write.
|
||||
/// Value will be RLP-compressed on flush
|
||||
pub fn put_compressed(&self, col: Option<u32>, key: &[u8], value: Bytes) {
|
||||
/// Value will be RLP-compressed on flush
|
||||
pub fn put_compressed(&mut self, col: Option<u32>, key: &[u8], value: Bytes) {
|
||||
let mut ekey = ElasticArray32::new();
|
||||
ekey.append_slice(key);
|
||||
self.ops.lock().push(DBOp::InsertCompressed {
|
||||
self.ops.push(DBOp::InsertCompressed {
|
||||
col: col,
|
||||
key: ekey,
|
||||
value: value,
|
||||
@@ -91,10 +91,10 @@ impl DBTransaction {
|
||||
}
|
||||
|
||||
/// Delete value by key.
|
||||
pub fn delete(&self, col: Option<u32>, key: &[u8]) {
|
||||
pub fn delete(&mut self, col: Option<u32>, key: &[u8]) {
|
||||
let mut ekey = ElasticArray32::new();
|
||||
ekey.append_slice(key);
|
||||
self.ops.lock().push(DBOp::Delete {
|
||||
self.ops.push(DBOp::Delete {
|
||||
col: col,
|
||||
key: ekey,
|
||||
});
|
||||
@@ -299,7 +299,7 @@ impl Database {
|
||||
/// Commit transaction to database.
|
||||
pub fn write_buffered(&self, tr: DBTransaction) {
|
||||
let mut overlay = self.overlay.write();
|
||||
let ops = tr.ops.into_inner();
|
||||
let ops = tr.ops;
|
||||
for op in ops {
|
||||
match op {
|
||||
DBOp::Insert { col, key, value } => {
|
||||
@@ -359,7 +359,7 @@ impl Database {
|
||||
/// Commit transaction to database.
|
||||
pub fn write(&self, tr: DBTransaction) -> Result<(), String> {
|
||||
let batch = WriteBatch::new();
|
||||
let ops = tr.ops.into_inner();
|
||||
let ops = tr.ops;
|
||||
for op in ops {
|
||||
match op {
|
||||
DBOp::Insert { col, key, value } => {
|
||||
@@ -425,7 +425,7 @@ mod tests {
|
||||
let key2 = H256::from_str("03c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap();
|
||||
let key3 = H256::from_str("01c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap();
|
||||
|
||||
let batch = db.transaction();
|
||||
let mut batch = db.transaction();
|
||||
batch.put(None, &key1, b"cat");
|
||||
batch.put(None, &key2, b"dog");
|
||||
db.write(batch).unwrap();
|
||||
@@ -439,17 +439,17 @@ mod tests {
|
||||
assert_eq!(&*contents[1].0, &*key2);
|
||||
assert_eq!(&*contents[1].1, b"dog");
|
||||
|
||||
let batch = db.transaction();
|
||||
let mut batch = db.transaction();
|
||||
batch.delete(None, &key1);
|
||||
db.write(batch).unwrap();
|
||||
|
||||
assert!(db.get(None, &key1).unwrap().is_none());
|
||||
|
||||
let batch = db.transaction();
|
||||
let mut batch = db.transaction();
|
||||
batch.put(None, &key1, b"cat");
|
||||
db.write(batch).unwrap();
|
||||
|
||||
let transaction = db.transaction();
|
||||
let mut transaction = db.transaction();
|
||||
transaction.put(None, &key3, b"elephant");
|
||||
transaction.delete(None, &key1);
|
||||
db.write(transaction).unwrap();
|
||||
@@ -459,7 +459,7 @@ mod tests {
|
||||
assert_eq!(&*db.get_by_prefix(None, &key3).unwrap(), b"elephant");
|
||||
assert_eq!(&*db.get_by_prefix(None, &key2).unwrap(), b"dog");
|
||||
|
||||
let transaction = db.transaction();
|
||||
let mut transaction = db.transaction();
|
||||
transaction.put(None, &key1, b"horse");
|
||||
transaction.delete(None, &key3);
|
||||
db.write_buffered(transaction);
|
||||
|
||||
@@ -72,7 +72,7 @@ impl Batch {
|
||||
pub fn commit(&mut self, dest: &mut Database) -> Result<(), Error> {
|
||||
if self.inner.is_empty() { return Ok(()) }
|
||||
|
||||
let transaction = DBTransaction::new(dest);
|
||||
let mut transaction = DBTransaction::new(dest);
|
||||
|
||||
for keypair in &self.inner {
|
||||
transaction.put(self.column, &keypair.0, &keypair.1);
|
||||
|
||||
@@ -35,7 +35,7 @@ fn db_path(path: &Path) -> PathBuf {
|
||||
fn make_db(path: &Path, pairs: BTreeMap<Vec<u8>, Vec<u8>>) {
|
||||
let db = Database::open_default(path.to_str().unwrap()).expect("failed to open temp database");
|
||||
{
|
||||
let transaction = db.transaction();
|
||||
let mut transaction = db.transaction();
|
||||
for (k, v) in pairs {
|
||||
transaction.put(None, &k, &v);
|
||||
}
|
||||
|
||||
@@ -58,13 +58,13 @@ impl OverlayDB {
|
||||
/// Commit all operations in a single batch.
|
||||
#[cfg(test)]
|
||||
pub fn commit(&mut self) -> Result<u32, UtilError> {
|
||||
let batch = self.backing.transaction();
|
||||
let res = try!(self.commit_to_batch(&batch));
|
||||
let mut batch = self.backing.transaction();
|
||||
let res = try!(self.commit_to_batch(&mut batch));
|
||||
self.backing.write(batch).map(|_| res).map_err(|e| e.into())
|
||||
}
|
||||
|
||||
/// Commit all operations to given batch.
|
||||
pub fn commit_to_batch(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
|
||||
pub fn commit_to_batch(&mut self, batch: &mut DBTransaction) -> Result<u32, UtilError> {
|
||||
let mut ret = 0u32;
|
||||
let mut deletes = 0usize;
|
||||
for i in self.overlay.drain().into_iter() {
|
||||
@@ -111,7 +111,7 @@ impl OverlayDB {
|
||||
}
|
||||
|
||||
/// Put the refs and value of the given key, possibly deleting it from the db.
|
||||
fn put_payload_in_batch(&self, batch: &DBTransaction, key: &H256, payload: (Bytes, u32)) -> bool {
|
||||
fn put_payload_in_batch(&self, batch: &mut DBTransaction, key: &H256, payload: (Bytes, u32)) -> bool {
|
||||
if payload.1 > 0 {
|
||||
let mut s = RlpStream::new_list(2);
|
||||
s.append(&payload.1);
|
||||
@@ -195,8 +195,8 @@ impl HashDB for OverlayDB {
|
||||
fn overlaydb_revert() {
|
||||
let mut m = OverlayDB::new_temp();
|
||||
let foo = m.insert(b"foo"); // insert foo.
|
||||
let batch = m.backing.transaction();
|
||||
m.commit_to_batch(&batch).unwrap(); // commit - new operations begin here...
|
||||
let mut batch = m.backing.transaction();
|
||||
m.commit_to_batch(&mut batch).unwrap(); // commit - new operations begin here...
|
||||
m.backing.write(batch).unwrap();
|
||||
let bar = m.insert(b"bar"); // insert bar.
|
||||
m.remove(&foo); // remove foo.
|
||||
@@ -300,7 +300,7 @@ fn playpen() {
|
||||
use std::fs;
|
||||
{
|
||||
let db = Database::open_default("/tmp/test").unwrap();
|
||||
let batch = db.transaction();
|
||||
let mut batch = db.transaction();
|
||||
batch.put(None, b"test", b"test2");
|
||||
db.write(batch).unwrap();
|
||||
match db.get(None, b"test") {
|
||||
@@ -308,7 +308,7 @@ fn playpen() {
|
||||
Ok(None) => println!("No value for that key"),
|
||||
Err(..) => println!("Gah"),
|
||||
}
|
||||
let batch = db.transaction();
|
||||
let mut batch = db.transaction();
|
||||
batch.delete(None, b"test");
|
||||
db.write(batch).unwrap();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user