Removed db_queue

This commit is contained in:
arkpar 2016-01-22 04:57:02 +01:00
parent 9bcb720f1f
commit 0ce15af91e
3 changed files with 2 additions and 122 deletions

View File

@ -7,7 +7,6 @@ use header::BlockNumber;
use spec::Spec; use spec::Spec;
use engine::Engine; use engine::Engine;
use block_queue::{BlockQueue, BlockQueueInfo}; use block_queue::{BlockQueue, BlockQueueInfo};
use db_queue::{DbQueue};
use service::NetSyncMessage; use service::NetSyncMessage;
use env_info::LastHashes; use env_info::LastHashes;
use verification::*; use verification::*;
@ -127,7 +126,6 @@ pub struct Client {
engine: Arc<Box<Engine>>, engine: Arc<Box<Engine>>,
state_db: JournalDB, state_db: JournalDB,
block_queue: RwLock<BlockQueue>, block_queue: RwLock<BlockQueue>,
db_queue: RwLock<DbQueue>,
report: RwLock<ClientReport>, report: RwLock<ClientReport>,
uncommited_states: RwLock<HashMap<H256, JournalDB>>, uncommited_states: RwLock<HashMap<H256, JournalDB>>,
import_lock: Mutex<()> import_lock: Mutex<()>
@ -172,20 +170,15 @@ impl Client {
} }
let state_db = JournalDB::new_with_arc(db); let state_db = JournalDB::new_with_arc(db);
let client = Arc::new(Client { Ok(Arc::new(Client {
chain: chain, chain: chain,
engine: engine.clone(), engine: engine.clone(),
state_db: state_db, state_db: state_db,
block_queue: RwLock::new(BlockQueue::new(engine, message_channel)), block_queue: RwLock::new(BlockQueue::new(engine, message_channel)),
db_queue: RwLock::new(DbQueue::new()),
report: RwLock::new(Default::default()), report: RwLock::new(Default::default()),
uncommited_states: RwLock::new(HashMap::new()), uncommited_states: RwLock::new(HashMap::new()),
import_lock: Mutex::new(()), import_lock: Mutex::new(()),
}); }))
let weak = Arc::downgrade(&client);
client.db_queue.read().unwrap().start(weak);
Ok(client)
} }
/// This is triggered by a message coming from a block queue when the block is ready for insertion /// This is triggered by a message coming from a block queue when the block is ready for insertion

View File

@ -1,111 +0,0 @@
//! A queue of state changes that are written to database in background.
use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use util::*;
use engine::Engine;
use client::Client;
/// State DB commit params
pub struct StateDBCommit {
/// Database to commit
pub db: JournalDB,
/// Starting block number
pub now: u64,
/// Block ahash
pub hash: H256,
/// End block number + hash
pub end: Option<(u64, H256)>,
}
/// A queue of state changes that are written to database in background.
pub struct DbQueue {
more_to_write: Arc<Condvar>,
queue: Arc<Mutex<VecDeque<StateDBCommit>>>,
writer: Mutex<Option<JoinHandle<()>>>,
deleting: Arc<AtomicBool>,
}
impl DbQueue {
/// Creates a new queue instance.
pub fn new() -> DbQueue {
let queue = Arc::new(Mutex::new(VecDeque::new()));
let more_to_write = Arc::new(Condvar::new());
let deleting = Arc::new(AtomicBool::new(false));
DbQueue {
more_to_write: more_to_write.clone(),
queue: queue.clone(),
writer: Mutex::new(None),
deleting: deleting.clone(),
}
}
/// Start processing the queue
pub fn start(&self, client: Weak<Client>) {
let writer = {
let queue = self.queue.clone();
let client = client.clone();
let more_to_write = self.more_to_write.clone();
let deleting = self.deleting.clone();
thread::Builder::new().name("DB Writer".to_string()).spawn(move || DbQueue::writer_loop(client, queue, more_to_write, deleting)).expect("Error creating db writer thread")
};
mem::replace(self.writer.lock().unwrap().deref_mut(), Some(writer));
}
fn writer_loop(client: Weak<Client>, queue: Arc<Mutex<VecDeque<StateDBCommit>>>, wait: Arc<Condvar>, deleting: Arc<AtomicBool>) {
while !deleting.load(AtomicOrdering::Relaxed) {
let mut batch = {
let mut locked = queue.lock().unwrap();
while locked.is_empty() && !deleting.load(AtomicOrdering::Relaxed) {
locked = wait.wait(locked).unwrap();
}
if deleting.load(AtomicOrdering::Relaxed) {
return;
}
mem::replace(locked.deref_mut(), VecDeque::new())
};
for mut state in batch.drain(..) { //TODO: make this a single write transaction
match state.db.commit(state.now, &state.hash, state.end.clone()) {
Ok(_) => (),
Err(e) => {
warn!(target: "client", "State DB commit failed: {:?}", e);
}
}
client.upgrade().unwrap().clear_state(&state.hash);
}
}
}
/// Add a state to the queue
pub fn queue(&self, state: StateDBCommit) {
let mut queue = self.queue.lock().unwrap();
queue.push_back(state);
self.more_to_write.notify_all();
}
}
impl Drop for DbQueue {
fn drop(&mut self) {
self.deleting.store(true, AtomicOrdering::Relaxed);
self.more_to_write.notify_all();
mem::replace(self.writer.lock().unwrap().deref_mut(), None).unwrap().join().unwrap();
}
}
#[cfg(test)]
mod tests {
use util::*;
use spec::*;
use queue::*;
#[test]
fn test_block_queue() {
// TODO better test
let spec = Spec::new_test();
let engine = spec.to_engine().unwrap();
let _ = BlockQueue::new(Arc::new(engine), IoChannel::disconnected());
}
}

View File

@ -149,7 +149,5 @@ pub mod sync;
pub mod block; pub mod block;
/// TODO [arkpar] Please document me /// TODO [arkpar] Please document me
pub mod verification; pub mod verification;
/// TODO [debris] Please document me
pub mod db_queue;
pub mod block_queue; pub mod block_queue;
pub mod ethereum; pub mod ethereum;