From 4bf1c205b41b4471b018c511f3b093b43ea14476 Mon Sep 17 00:00:00 2001
From: arkpar
Date: Thu, 21 Jan 2016 23:33:52 +0100
Subject: [PATCH] DB commit queue

---
 src/bin/client/main.rs           |  8 +--
 src/{queue.rs => block_queue.rs} |  4 +-
 src/client.rs                    | 95 ++++++++++++++++++++++----------
 src/lib.rs                       |  3 +-
 src/service.rs                   | 26 ++++++---
 src/sync/io.rs                   | 10 ++--
 src/sync/mod.rs                  | 33 ++++-------
 util/src/io/service.rs           |  5 ++
 util/src/journaldb.rs            | 10 ++++
 util/src/network/host.rs         |  8 +--
 10 files changed, 127 insertions(+), 75 deletions(-)
 rename src/{queue.rs => block_queue.rs} (98%)

diff --git a/src/bin/client/main.rs b/src/bin/client/main.rs
index e49dc2dbc..3ebf4e080 100644
--- a/src/bin/client/main.rs
+++ b/src/bin/client/main.rs
@@ -10,10 +10,9 @@ use log::{LogLevelFilter};
 use env_logger::LogBuilder;
 use util::*;
 use ethcore::client::*;
-use ethcore::service::ClientService;
+use ethcore::service::{ClientService, NetSyncMessage};
 use ethcore::ethereum;
 use ethcore::blockchain::CacheSize;
-use ethcore::sync::*;

 fn setup_log() {
 	let mut builder = LogBuilder::new();
@@ -90,7 +89,7 @@ impl Informant {
 const INFO_TIMER: TimerToken = 0;

 struct ClientIoHandler {
-	client: Arc<RwLock<Client>>,
+	client: Arc<Client>,
 	info: Informant,
 }

@@ -101,8 +100,7 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {

 	fn timeout(&self, _io: &IoContext<NetSyncMessage>, timer: TimerToken) {
 		if INFO_TIMER == timer {
-			let client = self.client.read().unwrap();
-			self.info.tick(client.deref());
+			self.info.tick(&self.client);
 		}
 	}
 }
diff --git a/src/queue.rs b/src/block_queue.rs
similarity index 98%
rename from src/queue.rs
rename to src/block_queue.rs
index 7c74b56d7..0bb184a1b 100644
--- a/src/queue.rs
+++ b/src/block_queue.rs
@@ -1,12 +1,14 @@
+//! A queue of blocks. Sits between network or other I/O and the BlockChain.
+//! Sorts them ready for blockchain insertion.
 use std::thread::{JoinHandle, self};
 use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
 use util::*;
 use verification::*;
 use error::*;
 use engine::Engine;
-use sync::*;
 use views::*;
 use header::*;
+use service::*;

 /// A queue of blocks. Sits between network or other I/O and the BlockChain.
 /// Sorts them ready for blockchain insertion.
diff --git a/src/client.rs b/src/client.rs
index 226a022ca..cf8b0fd7c 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -6,8 +6,9 @@ use error::*;
 use header::BlockNumber;
 use spec::Spec;
 use engine::Engine;
-use queue::BlockQueue;
-use sync::NetSyncMessage;
+use block_queue::BlockQueue;
+use db_queue::{DbQueue, StateDBCommit};
+use service::NetSyncMessage;
 use env_info::LastHashes;
 use verification::*;
 use block::*;
@@ -95,13 +96,13 @@ pub trait BlockChainClient : Sync + Send {
 	fn block_receipts(&self, hash: &H256) -> Option<Bytes>;

 	/// Import a block into the blockchain.
-	fn import_block(&mut self, bytes: Bytes) -> ImportResult;
+	fn import_block(&self, bytes: Bytes) -> ImportResult;

 	/// Get block queue information.
 	fn queue_status(&self) -> BlockQueueStatus;

 	/// Clear block queue and abort all import activity.
-	fn clear_queue(&mut self);
+	fn clear_queue(&self);

 	/// Get blockchain information.
 	fn chain_info(&self) -> BlockChainInfo;
@@ -132,19 +133,24 @@ pub struct Client {
 	chain: Arc<RwLock<BlockChain>>,
 	engine: Arc<Box<Engine>>,
 	state_db: JournalDB,
-	queue: BlockQueue,
-	report: ClientReport,
+	block_queue: RwLock<BlockQueue>,
+	db_queue: RwLock<DbQueue>,
+	report: RwLock<ClientReport>,
+	uncommited_states: RwLock<HashMap<H256, JournalDB>>,
+	import_lock: Mutex<()>
 }

 const HISTORY: u64 = 1000;

 impl Client {
 	/// Create a new client with given spec and DB path.
-	pub fn new(spec: Spec, path: &Path, message_channel: IoChannel<NetSyncMessage> ) -> Result<Client, Error> {
+	pub fn new(spec: Spec, path: &Path, message_channel: IoChannel<NetSyncMessage> ) -> Result<Arc<Client>, Error> {
 		let chain = Arc::new(RwLock::new(BlockChain::new(&spec.genesis_block(), path)));
 		let mut opts = Options::new();
 		opts.set_max_open_files(256);
 		opts.create_if_missing(true);
+		opts.set_disable_data_sync(true);
+		opts.set_disable_auto_compactions(true);
 		/*opts.set_use_fsync(false);
 		opts.set_bytes_per_sync(8388608);
 		opts.set_disable_data_sync(false);
@@ -164,37 +170,46 @@ impl Client {
 		let mut state_path = path.to_path_buf();
 		state_path.push("state");

-		let db = DB::open(&opts, state_path.to_str().unwrap()).unwrap();
-		let mut state_db = JournalDB::new(db);
+		let db = Arc::new(DB::open(&opts, state_path.to_str().unwrap()).unwrap());
 		let engine = Arc::new(try!(spec.to_engine()));
-		if engine.spec().ensure_db_good(&mut state_db) {
-			state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB");
+		{
+			let mut state_db = JournalDB::new_with_arc(db.clone());
+			if engine.spec().ensure_db_good(&mut state_db) {
+				state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB");
+			}
 		}
+		let state_db = JournalDB::new_with_arc(db);

-//		chain.write().unwrap().ensure_good(&state_db);
-
-		Ok(Client {
+		let client = Arc::new(Client {
 			chain: chain,
 			engine: engine.clone(),
 			state_db: state_db,
-			queue: BlockQueue::new(engine, message_channel),
-			report: Default::default(),
-		})
+			block_queue: RwLock::new(BlockQueue::new(engine, message_channel)),
+			db_queue: RwLock::new(DbQueue::new()),
+			report: RwLock::new(Default::default()),
+			uncommited_states: RwLock::new(HashMap::new()),
+			import_lock: Mutex::new(()),
+		});
+
+		let weak = Arc::downgrade(&client);
+		client.db_queue.read().unwrap().start(weak);
+		Ok(client)
 	}

 	/// This is triggered by a message coming from a block queue when the block is ready for insertion
-	pub fn import_verified_blocks(&mut self) {
+	pub fn import_verified_blocks(&self, _io: &IoChannel<NetSyncMessage>) {
 		let mut bad = HashSet::new();
-		let blocks = self.queue.drain(128);
+		let _import_lock = self.import_lock.lock();
+		let blocks = self.block_queue.write().unwrap().drain(128);
 		if blocks.is_empty() {
 			return;
 		}

 		for block in blocks {
 			if bad.contains(&block.header.parent_hash) {
-				self.queue.mark_as_bad(&block.header.hash());
+				self.block_queue.write().unwrap().mark_as_bad(&block.header.hash());
 				bad.insert(block.header.hash());
 				continue;
 			}
@@ -202,7 +217,7 @@ impl Client {
 			let header = &block.header;
 			if let Err(e) = verify_block_family(&header, &block.bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) {
 				warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
-				self.queue.mark_as_bad(&header.hash());
+				self.block_queue.write().unwrap().mark_as_bad(&header.hash());
 				bad.insert(block.header.hash());
 				return;
 			};
@@ -210,7 +225,7 @@ impl Client {
 				Some(p) => p,
 				None => {
 					warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash);
-					self.queue.mark_as_bad(&header.hash());
+					self.block_queue.write().unwrap().mark_as_bad(&header.hash());
 					bad.insert(block.header.hash());
 					return;
 				},
@@ -228,18 +243,23 @@ impl Client {
 				}
 			}

-			let result = match enact_verified(&block, self.engine.deref().deref(), self.state_db.clone(), &parent, &last_hashes) {
+			let db = match self.uncommited_states.read().unwrap().get(&header.parent_hash) {
+				Some(db) => db.clone(),
+				None => self.state_db.clone(),
+			};
+
+			let result = match enact_verified(&block, self.engine.deref().deref(), db, &parent, &last_hashes) {
 				Ok(b) => b,
 				Err(e) => {
 					warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
 					bad.insert(block.header.hash());
-					self.queue.mark_as_bad(&header.hash());
+					self.block_queue.write().unwrap().mark_as_bad(&header.hash());
 					return;
 				}
 			};

 			if let Err(e) = verify_block_final(&header, result.block().header()) {
 				warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
-				self.queue.mark_as_bad(&header.hash());
+				self.block_queue.write().unwrap().mark_as_bad(&header.hash());
 				return;
 			}
@@ -252,11 +272,25 @@ impl Client {
 				return;
 			}
 		}
-			self.report.accrue_block(&block);
+			/*
+			let db = result.drain();
+			self.uncommited_states.write().unwrap().insert(header.hash(), db.clone());
+			self.db_queue.write().unwrap().queue(StateDBCommit {
+				now: header.number(),
+				hash: header.hash().clone(),
+				end: ancient.map(|n|(n, self.chain.read().unwrap().block_hash(n).unwrap())),
+				db: db,
+			});*/
+			self.report.write().unwrap().accrue_block(&block);
 			trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
 		}
 	}

+	/// Clear cached state overlay
+	pub fn clear_state(&self, hash: &H256) {
+		self.uncommited_states.write().unwrap().remove(hash);
+	}
+
 	/// Get info on the cache.
 	pub fn cache_info(&self) -> CacheSize {
 		self.chain.read().unwrap().cache_size()
 	}
@@ -264,7 +298,7 @@ impl Client {

 	/// Get the report.
 	pub fn report(&self) -> ClientReport {
-		self.report.clone()
+		self.report.read().unwrap().clone()
 	}

 	/// Tick the client.
@@ -327,12 +361,12 @@ impl BlockChainClient for Client {
 		unimplemented!();
 	}

-	fn import_block(&mut self, bytes: Bytes) -> ImportResult {
+	fn import_block(&self, bytes: Bytes) -> ImportResult {
 		let header = BlockView::new(&bytes).header();
 		if self.chain.read().unwrap().is_known(&header.hash()) {
 			return Err(ImportError::AlreadyInChain);
 		}
-		self.queue.import_block(bytes)
+		self.block_queue.write().unwrap().import_block(bytes)
 	}

 	fn queue_status(&self) -> BlockQueueStatus {
@@ -341,7 +375,8 @@ impl BlockChainClient for Client {
 		}
 	}

-	fn clear_queue(&mut self) {
+	fn clear_queue(&self) {
+		self.block_queue.write().unwrap().clear();
 	}

 	fn chain_info(&self) -> BlockChainInfo {
diff --git a/src/lib.rs b/src/lib.rs
index a5b6c3dae..58d84764a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -150,5 +150,6 @@ pub mod block;
 /// TODO [arkpar] Please document me
 pub mod verification;
 /// TODO [debris] Please document me
-pub mod queue;
+pub mod db_queue;
+pub mod block_queue;
 pub mod ethereum;
diff --git a/src/service.rs b/src/service.rs
index b97c1cb69..4034ce841 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -5,10 +5,22 @@ use error::*;
 use std::env;
 use client::Client;

+/// Message type for external and internal events
+#[derive(Clone)]
+pub enum SyncMessage {
+	/// New block has been imported into the blockchain
+	NewChainBlock(Bytes), //TODO: use Cow
+	/// A block is ready
+	BlockVerified,
+}
+
+/// TODO [arkpar] Please document me
+pub type NetSyncMessage = NetworkIoMessage<SyncMessage>;
+
 /// Client service setup. Creates and registers client and network services with the IO subsystem.
 pub struct ClientService {
 	net_service: NetworkService<SyncMessage>,
-	client: Arc<RwLock<Client>>,
+	client: Arc<Client>,
 }

 impl ClientService {
@@ -20,7 +32,7 @@ impl ClientService {
 		let mut dir = env::home_dir().unwrap();
 		dir.push(".parity");
 		dir.push(H64::from(spec.genesis_header().hash()).hex());
-		let client = Arc::new(RwLock::new(try!(Client::new(spec, &dir, net_service.io().channel()))));
+		let client = try!(Client::new(spec, &dir, net_service.io().channel()));
 		EthSync::register(&mut net_service, client.clone());
 		let client_io = Arc::new(ClientIoHandler {
 			client: client.clone()
@@ -39,14 +51,14 @@ impl ClientService {
 	}

 	/// TODO [arkpar] Please document me
-	pub fn client(&self) -> Arc<RwLock<Client>> {
+	pub fn client(&self) -> Arc<Client> {
 		self.client.clone()
 	}
 }

 /// IO interface for the Client handler
 struct ClientIoHandler {
-	client: Arc<RwLock<Client>>
+	client: Arc<Client>
 }

 const CLIENT_TICK_TIMER: TimerToken = 0;
@@ -59,16 +71,16 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {

 	fn timeout(&self, _io: &IoContext<NetSyncMessage>, timer: TimerToken) {
 		if timer == CLIENT_TICK_TIMER {
-			self.client.read().unwrap().tick();
+			self.client.tick();
 		}
 	}

-	fn message(&self, _io: &IoContext<NetSyncMessage>, net_message: &NetSyncMessage) {
+	fn message(&self, io: &IoContext<NetSyncMessage>, net_message: &NetSyncMessage) {
 		match net_message {
 			&UserMessage(ref message) => {
 				match message {
 					&SyncMessage::BlockVerified => {
-						self.client.write().unwrap().import_verified_blocks();
+						self.client.import_verified_blocks(&io.channel());
 					},
 					_ => {}, // ignore other messages
 				}
diff --git a/src/sync/io.rs b/src/sync/io.rs
index f49591a9f..754e3add5 100644
--- a/src/sync/io.rs
+++ b/src/sync/io.rs
@@ -1,7 +1,7 @@
 use client::BlockChainClient;
 use util::{NetworkContext, PeerId, PacketId,};
 use util::error::UtilError;
-use sync::SyncMessage;
+use service::SyncMessage;

 /// IO interface for the syning handler.
 /// Provides peer connection management and an interface to the blockchain client.
@@ -14,7 +14,7 @@ pub trait SyncIo {
 	/// Send a packet to a peer.
 	fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError>;
 	/// Get the blockchain
-	fn chain<'s>(&'s mut self) -> &'s mut BlockChainClient;
+	fn chain<'s>(&'s self) -> &'s BlockChainClient;
 	/// Returns peer client identifier string
 	fn peer_info(&self, peer_id: PeerId) -> String {
 		peer_id.to_string()
@@ -24,12 +24,12 @@ pub trait SyncIo {
 /// Wraps `NetworkContext` and the blockchain client
 pub struct NetSyncIo<'s, 'h> where 'h: 's {
 	network: &'s NetworkContext<'h, SyncMessage>,
-	chain: &'s mut BlockChainClient
+	chain: &'s BlockChainClient
 }

 impl<'s, 'h> NetSyncIo<'s, 'h> {
 	/// Creates a new instance from the `NetworkContext` and the blockchain client reference.
-	pub fn new(network: &'s NetworkContext<'h, SyncMessage>, chain: &'s mut BlockChainClient) -> NetSyncIo<'s, 'h> {
+	pub fn new(network: &'s NetworkContext<'h, SyncMessage>, chain: &'s BlockChainClient) -> NetSyncIo<'s, 'h> {
 		NetSyncIo {
 			network: network,
 			chain: chain,
@@ -50,7 +50,7 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
 		self.network.send(peer_id, packet_id, data)
 	}

-	fn chain<'a>(&'a mut self) -> &'a mut BlockChainClient {
+	fn chain<'a>(&'a self) -> &'a BlockChainClient {
 		self.chain
 	}

diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 9bb18a1c0..c87dee569 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -25,9 +25,10 @@ use std::ops::*;
 use std::sync::*;
 use client::Client;
-use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, NetworkIoMessage};
+use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
 use sync::chain::ChainSync;
-use util::{Bytes, TimerToken};
+use util::TimerToken;
+use service::SyncMessage;
 use sync::io::NetSyncIo;

 mod chain;
 mod io;
@@ -39,22 +40,10 @@ mod tests;

 const SYNC_TIMER: usize = 0;

-/// Message type for external events
-#[derive(Clone)]
-pub enum SyncMessage {
-	/// New block has been imported into the blockchain
-	NewChainBlock(Bytes), //TODO: use Cow
-	/// A block is ready
-	BlockVerified,
-}
-
-/// TODO [arkpar] Please document me
-pub type NetSyncMessage = NetworkIoMessage<SyncMessage>;
-
 /// Ethereum network protocol handler
 pub struct EthSync {
 	/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
-	chain: Arc<RwLock<Client>>,
+	chain: Arc<Client>,
 	/// Sync strategy
 	sync: RwLock<ChainSync>
 }
@@ -63,7 +52,7 @@ pub use self::chain::SyncStatus;

 impl EthSync {
 	/// Creates and register protocol with the network service
-	pub fn register(service: &mut NetworkService<SyncMessage>, chain: Arc<RwLock<Client>>) {
+	pub fn register(service: &mut NetworkService<SyncMessage>, chain: Arc<Client>) {
 		let sync = Arc::new(EthSync {
 			chain: chain,
 			sync: RwLock::new(ChainSync::new()),
@@ -78,12 +67,12 @@ impl EthSync {

 	/// Stop sync
 	pub fn stop(&mut self, io: &mut NetworkContext<SyncMessage>) {
-		self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()));
+		self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref()));
 	}

 	/// Restart sync
 	pub fn restart(&mut self, io: &mut NetworkContext<SyncMessage>) {
-		self.sync.write().unwrap().restart(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()));
+		self.sync.write().unwrap().restart(&mut NetSyncIo::new(io, self.chain.deref()));
 	}
 }

@@ -93,20 +82,20 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
 	}

 	fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
-		self.sync.write().unwrap().on_packet(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()) , *peer, packet_id, data);
+		self.sync.write().unwrap().on_packet(&mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data);
 	}

 	fn connected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
-		self.sync.write().unwrap().on_peer_connected(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()), *peer);
+		self.sync.write().unwrap().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer);
 	}

 	fn disconnected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
-		self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()), *peer);
+		self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer);
 	}

 	fn timeout(&self, io: &NetworkContext<SyncMessage>, timer: TimerToken) {
 		if timer == SYNC_TIMER {
-			self.sync.write().unwrap().maintain_sync(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()));
+			self.sync.write().unwrap().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref()));
 		}
 	}
 }
diff --git a/util/src/io/service.rs b/util/src/io/service.rs
index a229e4022..fab0f113d 100644
--- a/util/src/io/service.rs
+++ b/util/src/io/service.rs
@@ -102,6 +102,11 @@ impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
 	pub fn message(&self, message: Message) {
 		self.channel.send(message).expect("Error seding message");
 	}
+
+	/// Get message channel
+	pub fn channel(&self) -> IoChannel<Message> {
+		self.channel.clone()
+	}
 }

 #[derive(Clone)]
diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs
index ada9c0d2b..9115c4362 100644
--- a/util/src/journaldb.rs
+++ b/util/src/journaldb.rs
@@ -34,6 +34,16 @@ impl JournalDB {
 		}
 	}

+	/// Create a new instance given a shared `backing` database.
+	pub fn new_with_arc(backing: Arc<DB>) -> JournalDB {
+		JournalDB {
+			forward: OverlayDB::new_with_arc(backing.clone()),
+			backing: backing,
+			inserts: vec![],
+			removes: vec![],
+		}
+	}
+
 	/// Create a new instance with an anonymous temporary database.
 	pub fn new_temp() -> JournalDB {
 		let mut dir = env::temp_dir();
diff --git a/util/src/network/host.rs b/util/src/network/host.rs
index f83cb1908..b9b9496c4 100644
--- a/util/src/network/host.rs
+++ b/util/src/network/host.rs
@@ -675,13 +675,13 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
 			if let Some(connection) = self.connections.read().unwrap().get(stream).map(|c| c.clone()) {
 				match connection.lock().unwrap().deref() {
 					&ConnectionEntry::Handshake(ref h) => h.register_socket(reg, event_loop).expect("Error registering socket"),
-					_ => warn!("Unexpected session stream registration")
+					&ConnectionEntry::Session(_) => warn!("Unexpected session stream registration")
 				}
-			} else { warn!("Unexpected stream registration")}
+			} else {} // expired
 		}
 		NODETABLE_RECEIVE => event_loop.register(self.udp_socket.lock().unwrap().deref(), Token(NODETABLE_RECEIVE), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
 		TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
-		_ => warn!("Unexpected stream regitration")
+		_ => warn!("Unexpected stream registration")
 	}
 }
@@ -693,7 +693,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
 					&ConnectionEntry::Handshake(ref h) => h.update_socket(reg, event_loop).expect("Error updating socket"),
 					&ConnectionEntry::Session(ref s) => s.update_socket(reg, event_loop).expect("Error updating socket"),
 				}
-			} else { warn!("Unexpected stream update")}
+			} else {} // expired
 		}
 		NODETABLE_RECEIVE => event_loop.reregister(self.udp_socket.lock().unwrap().deref(), Token(NODETABLE_RECEIVE), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
 		TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),