diff --git a/src/client.rs b/src/client.rs index 04d372786..6f47d0601 100644 --- a/src/client.rs +++ b/src/client.rs @@ -7,7 +7,6 @@ use header::BlockNumber; use spec::Spec; use engine::Engine; use block_queue::{BlockQueue, BlockQueueInfo}; -use db_queue::{DbQueue}; use service::NetSyncMessage; use env_info::LastHashes; use verification::*; @@ -127,7 +126,6 @@ pub struct Client { engine: Arc>, state_db: JournalDB, block_queue: RwLock, - db_queue: RwLock, report: RwLock, uncommited_states: RwLock>, import_lock: Mutex<()> @@ -172,20 +170,15 @@ impl Client { } let state_db = JournalDB::new_with_arc(db); - let client = Arc::new(Client { + Ok(Arc::new(Client { chain: chain, engine: engine.clone(), state_db: state_db, block_queue: RwLock::new(BlockQueue::new(engine, message_channel)), - db_queue: RwLock::new(DbQueue::new()), report: RwLock::new(Default::default()), uncommited_states: RwLock::new(HashMap::new()), import_lock: Mutex::new(()), - }); - - let weak = Arc::downgrade(&client); - client.db_queue.read().unwrap().start(weak); - Ok(client) + })) } /// This is triggered by a message coming from a block queue when the block is ready for insertion diff --git a/src/db_queue.rs b/src/db_queue.rs deleted file mode 100644 index 242fd9cc4..000000000 --- a/src/db_queue.rs +++ /dev/null @@ -1,111 +0,0 @@ -//! A queue of state changes that are written to database in background. -use std::thread::{JoinHandle, self}; -use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; -use util::*; -use engine::Engine; -use client::Client; - -/// State DB commit params -pub struct StateDBCommit { - /// Database to commit - pub db: JournalDB, - /// Starting block number - pub now: u64, - /// Block ahash - pub hash: H256, - /// End block number + hash - pub end: Option<(u64, H256)>, -} - -/// A queue of state changes that are written to database in background. -pub struct DbQueue { - more_to_write: Arc, - queue: Arc>>, - writer: Mutex>>, - deleting: Arc, -} - -impl DbQueue { - /// Creates a new queue instance. - pub fn new() -> DbQueue { - let queue = Arc::new(Mutex::new(VecDeque::new())); - let more_to_write = Arc::new(Condvar::new()); - let deleting = Arc::new(AtomicBool::new(false)); - - DbQueue { - more_to_write: more_to_write.clone(), - queue: queue.clone(), - writer: Mutex::new(None), - deleting: deleting.clone(), - } - } - - /// Start processing the queue - pub fn start(&self, client: Weak) { - let writer = { - let queue = self.queue.clone(); - let client = client.clone(); - let more_to_write = self.more_to_write.clone(); - let deleting = self.deleting.clone(); - thread::Builder::new().name("DB Writer".to_string()).spawn(move || DbQueue::writer_loop(client, queue, more_to_write, deleting)).expect("Error creating db writer thread") - }; - mem::replace(self.writer.lock().unwrap().deref_mut(), Some(writer)); - } - - fn writer_loop(client: Weak, queue: Arc>>, wait: Arc, deleting: Arc) { - while !deleting.load(AtomicOrdering::Relaxed) { - let mut batch = { - let mut locked = queue.lock().unwrap(); - while locked.is_empty() && !deleting.load(AtomicOrdering::Relaxed) { - locked = wait.wait(locked).unwrap(); - } - - if deleting.load(AtomicOrdering::Relaxed) { - return; - } - mem::replace(locked.deref_mut(), VecDeque::new()) - }; - - for mut state in batch.drain(..) { //TODO: make this a single write transaction - match state.db.commit(state.now, &state.hash, state.end.clone()) { - Ok(_) => (), - Err(e) => { - warn!(target: "client", "State DB commit failed: {:?}", e); - } - } - client.upgrade().unwrap().clear_state(&state.hash); - } - - } - } - - /// Add a state to the queue - pub fn queue(&self, state: StateDBCommit) { - let mut queue = self.queue.lock().unwrap(); - queue.push_back(state); - self.more_to_write.notify_all(); - } -} - -impl Drop for DbQueue { - fn drop(&mut self) { - self.deleting.store(true, AtomicOrdering::Relaxed); - self.more_to_write.notify_all(); - mem::replace(self.writer.lock().unwrap().deref_mut(), None).unwrap().join().unwrap(); - } -} - -#[cfg(test)] -mod tests { - use util::*; - use spec::*; - use queue::*; - - #[test] - fn test_block_queue() { - // TODO better test - let spec = Spec::new_test(); - let engine = spec.to_engine().unwrap(); - let _ = BlockQueue::new(Arc::new(engine), IoChannel::disconnected()); - } -} diff --git a/src/lib.rs b/src/lib.rs index 58d84764a..68c658267 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -149,7 +149,5 @@ pub mod sync; pub mod block; /// TODO [arkpar] Please document me pub mod verification; -/// TODO [debris] Please document me -pub mod db_queue; pub mod block_queue; pub mod ethereum;