use mutex in dbtransaction (#1843)

This commit is contained in:
Robert Habermeier 2016-08-04 23:54:26 +02:00 committed by Arkadiy Paronyan
parent aa59aa439d
commit 8702a29e6f

View File

@ -28,7 +28,7 @@ const DB_BACKGROUND_COMPACTIONS: i32 = 2;
/// Write transaction. Batches a sequence of put/delete operations for efficiency. /// Write transaction. Batches a sequence of put/delete operations for efficiency.
pub struct DBTransaction { pub struct DBTransaction {
ops: RwLock<Vec<DBOp>>, ops: Mutex<Vec<DBOp>>,
} }
enum DBOp { enum DBOp {
@ -52,7 +52,7 @@ impl DBTransaction {
/// Create new transaction. /// Create new transaction.
pub fn new(_db: &Database) -> DBTransaction { pub fn new(_db: &Database) -> DBTransaction {
DBTransaction { DBTransaction {
ops: RwLock::new(Vec::with_capacity(256)), ops: Mutex::new(Vec::with_capacity(256)),
} }
} }
@ -60,7 +60,7 @@ impl DBTransaction {
pub fn put(&self, col: Option<u32>, key: &[u8], value: &[u8]) -> Result<(), String> { pub fn put(&self, col: Option<u32>, key: &[u8], value: &[u8]) -> Result<(), String> {
let mut ekey = ElasticArray32::new(); let mut ekey = ElasticArray32::new();
ekey.append_slice(key); ekey.append_slice(key);
self.ops.write().push(DBOp::Insert { self.ops.lock().push(DBOp::Insert {
col: col, col: col,
key: ekey, key: ekey,
value: value.to_vec(), value: value.to_vec(),
@ -72,7 +72,7 @@ impl DBTransaction {
pub fn put_vec(&self, col: Option<u32>, key: &[u8], value: Bytes) -> Result<(), String> { pub fn put_vec(&self, col: Option<u32>, key: &[u8], value: Bytes) -> Result<(), String> {
let mut ekey = ElasticArray32::new(); let mut ekey = ElasticArray32::new();
ekey.append_slice(key); ekey.append_slice(key);
self.ops.write().push(DBOp::Insert { self.ops.lock().push(DBOp::Insert {
col: col, col: col,
key: ekey, key: ekey,
value: value, value: value,
@ -85,7 +85,7 @@ impl DBTransaction {
pub fn put_compressed(&self, col: Option<u32>, key: &[u8], value: Bytes) -> Result<(), String> { pub fn put_compressed(&self, col: Option<u32>, key: &[u8], value: Bytes) -> Result<(), String> {
let mut ekey = ElasticArray32::new(); let mut ekey = ElasticArray32::new();
ekey.append_slice(key); ekey.append_slice(key);
self.ops.write().push(DBOp::InsertCompressed { self.ops.lock().push(DBOp::InsertCompressed {
col: col, col: col,
key: ekey, key: ekey,
value: value, value: value,
@ -97,7 +97,7 @@ impl DBTransaction {
pub fn delete(&self, col: Option<u32>, key: &[u8]) -> Result<(), String> { pub fn delete(&self, col: Option<u32>, key: &[u8]) -> Result<(), String> {
let mut ekey = ElasticArray32::new(); let mut ekey = ElasticArray32::new();
ekey.append_slice(key); ekey.append_slice(key);
self.ops.write().push(DBOp::Delete { self.ops.lock().push(DBOp::Delete {
col: col, col: col,
key: ekey, key: ekey,
}); });
@ -290,30 +290,30 @@ impl Database {
} }
fn to_overly_column(col: Option<u32>) -> usize { fn to_overlay_column(col: Option<u32>) -> usize {
col.map_or(0, |c| (c + 1) as usize) col.map_or(0, |c| (c + 1) as usize)
} }
/// Commit transaction to database. /// Commit transaction to database.
pub fn write_buffered(&self, tr: DBTransaction) -> Result<(), String> { pub fn write_buffered(&self, tr: DBTransaction) -> Result<(), String> {
let mut overlay = self.overlay.write(); let mut overlay = self.overlay.write();
let ops = mem::replace(&mut *tr.ops.write(), Vec::new()); let ops = tr.ops.into_inner();
for op in ops { for op in ops {
match op { match op {
DBOp::Insert { col, key, value } => { DBOp::Insert { col, key, value } => {
let c = Self::to_overly_column(col); let c = Self::to_overlay_column(col);
overlay[c].deletions.remove(&key); overlay[c].deletions.remove(&key);
overlay[c].compressed_insertions.remove(&key); overlay[c].compressed_insertions.remove(&key);
overlay[c].insertions.insert(key, value); overlay[c].insertions.insert(key, value);
}, },
DBOp::InsertCompressed { col, key, value } => { DBOp::InsertCompressed { col, key, value } => {
let c = Self::to_overly_column(col); let c = Self::to_overlay_column(col);
overlay[c].deletions.remove(&key); overlay[c].deletions.remove(&key);
overlay[c].insertions.remove(&key); overlay[c].insertions.remove(&key);
overlay[c].compressed_insertions.insert(key, value); overlay[c].compressed_insertions.insert(key, value);
}, },
DBOp::Delete { col, key } => { DBOp::Delete { col, key } => {
let c = Self::to_overly_column(col); let c = Self::to_overlay_column(col);
overlay[c].insertions.remove(&key); overlay[c].insertions.remove(&key);
overlay[c].compressed_insertions.remove(&key); overlay[c].compressed_insertions.remove(&key);
overlay[c].deletions.insert(key); overlay[c].deletions.insert(key);
@ -364,7 +364,7 @@ impl Database {
/// Commit transaction to database. /// Commit transaction to database.
pub fn write(&self, tr: DBTransaction) -> Result<(), String> { pub fn write(&self, tr: DBTransaction) -> Result<(), String> {
let batch = WriteBatch::new(); let batch = WriteBatch::new();
let ops = mem::replace(&mut *tr.ops.write(), Vec::new()); let ops = tr.ops.into_inner();
for op in ops { for op in ops {
match op { match op {
DBOp::Insert { col, key, value } => { DBOp::Insert { col, key, value } => {
@ -384,7 +384,7 @@ impl Database {
/// Get value by key. /// Get value by key.
pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<Bytes>, String> { pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<Bytes>, String> {
let overlay = &self.overlay.read()[Self::to_overly_column(col)]; let overlay = &self.overlay.read()[Self::to_overlay_column(col)];
overlay.insertions.get(key).or_else(|| overlay.compressed_insertions.get(key)).map_or_else(|| overlay.insertions.get(key).or_else(|| overlay.compressed_insertions.get(key)).map_or_else(||
col.map_or_else( col.map_or_else(
|| self.db.get(key).map(|r| r.map(|v| v.to_vec())), || self.db.get(key).map(|r| r.map(|v| v.to_vec())),