diff --git a/src/bin/client/main.rs b/src/bin/client/main.rs
index 7d673f8d3..147ea2be2 100644
--- a/src/bin/client/main.rs
+++ b/src/bin/client/main.rs
@@ -14,6 +14,7 @@ use ethcore::client::*;
 use ethcore::service::{ClientService, NetSyncMessage};
 use ethcore::ethereum;
 use ethcore::blockchain::CacheSize;
+use ethcore::sync::EthSync;

 fn setup_log() {
 	let mut builder = LogBuilder::new();
@@ -30,7 +31,7 @@ fn main() {
 	setup_log();
 	let spec = ethereum::new_frontier();
 	let mut service = ClientService::start(spec).unwrap();
-	let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default() });
+	let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: service.sync() });
 	service.io().register_handler(io_handler).expect("Error registering IO handler");

 	let exit = Arc::new(Condvar::new());
@@ -60,22 +61,29 @@ impl Default for Informant {
 }

 impl Informant {
-	pub fn tick(&self, client: &Client) {
+	pub fn tick(&self, client: &Client, sync: &EthSync) {
 		// 5 seconds betwen calls. TODO: calculate this properly.
 		let dur = 5usize;

 		let chain_info = client.chain_info();
+		let queue_info = client.queue_info();
 		let cache_info = client.cache_info();
 		let report = client.report();
+		let sync_info = sync.status();

 		if let (_, &Some(ref last_cache_info), &Some(ref last_report)) = (self.chain_info.read().unwrap().deref(), self.cache_info.read().unwrap().deref(), self.report.read().unwrap().deref()) {
-			println!("[ {} {} ]---[ {} blk/s | {} tx/s | {} gas/s //···{}···// {} ({}) bl {} ({}) ex ]",
+			println!("[ {} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, {} downloaded, {} queued ···// {} ({}) bl {} ({}) ex ]",
 				chain_info.best_block_number,
 				chain_info.best_block_hash,
 				(report.blocks_imported - last_report.blocks_imported) / dur,
 				(report.transactions_applied - last_report.transactions_applied) / dur,
 				(report.gas_processed - last_report.gas_processed) / From::from(dur),
-				0, // TODO: peers
+
+				sync_info.num_active_peers,
+				sync_info.num_peers,
+				sync_info.blocks_received,
+				queue_info.queue_size,
+
 				cache_info.blocks,
 				cache_info.blocks as isize - last_cache_info.blocks as isize,
 				cache_info.block_details,
@@ -93,6 +101,7 @@ const INFO_TIMER: TimerToken = 0;

 struct ClientIoHandler {
 	client: Arc<Client>,
+	sync: Arc<EthSync>,
 	info: Informant,
 }

@@ -103,7 +112,7 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {

 	fn timeout(&self, _io: &IoContext<NetSyncMessage>, timer: TimerToken) {
 		if INFO_TIMER == timer {
-			self.info.tick(&self.client);
+			self.info.tick(&self.client, &self.sync);
 		}
 	}
 }
diff --git a/src/block_queue.rs b/src/block_queue.rs
index 0bb184a1b..1ffd0f7ec 100644
--- a/src/block_queue.rs
+++ b/src/block_queue.rs
@@ -10,6 +10,15 @@ use views::*;
 use header::*;
 use service::*;

+/// Block queue status
+#[derive(Debug)]
+pub struct BlockQueueInfo {
+	/// Indicates that queue is full
+	pub full: bool,
+	/// Number of queued blocks
+	pub queue_size: usize,
+}
+
 /// A queue of blocks. Sits between network or other I/O and the BlockChain.
 /// Sorts them ready for blockchain insertion.
 pub struct BlockQueue {
@@ -65,14 +74,15 @@ impl BlockQueue {
 		let deleting = Arc::new(AtomicBool::new(false));

 		let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
-		let thread_count = max(::num_cpus::get(), 2) - 1;
-		for _ in 0..thread_count {
+		let thread_count = max(::num_cpus::get(), 3) - 2;
+		for i in 0..thread_count {
 			let verification = verification.clone();
 			let engine = engine.clone();
 			let more_to_verify = more_to_verify.clone();
 			let ready_signal = ready_signal.clone();
 			let deleting = deleting.clone();
-			verifiers.push(thread::spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting)));
+			verifiers.push(thread::Builder::new().name(format!("Verifier #{}", i)).spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting))
+				.expect("Error starting block verification thread"));
 		}
 		BlockQueue {
 			engine: engine,
@@ -206,7 +216,7 @@ impl BlockQueue {
 		verification.verified = new_verified;
 	}

-	/// TODO [arkpar] Please document me
+	/// Removes up to `max` verified blocks from the queue
 	pub fn drain(&mut self, max: usize) -> Vec<PreVerifiedBlock> {
 		let mut verification = self.verification.lock().unwrap();
 		let count = min(max, verification.verified.len());
@@ -217,8 +227,19 @@ impl BlockQueue {
 			result.push(block);
 		}
 		self.ready_signal.reset();
+		if !verification.verified.is_empty() {
+			self.ready_signal.set();
+		}
 		result
 	}
+
+	/// Get queue status.
+	pub fn queue_info(&self) -> BlockQueueInfo {
+		BlockQueueInfo {
+			full: false,
+			queue_size: self.verification.lock().unwrap().unverified.len(),
+		}
+	}
 }

 impl Drop for BlockQueue {
diff --git a/src/blockchain.rs b/src/blockchain.rs
index 27abe9ee3..0720d7229 100644
--- a/src/blockchain.rs
+++ b/src/blockchain.rs
@@ -342,19 +342,19 @@ impl BlockChain {
 			Some(h) => h,
 			None => return None,
 		};
-		Some(self._tree_route((from_details, from), (to_details, to)))
+		Some(self._tree_route((&from_details, &from), (&to_details, &to)))
 	}

 	/// Similar to `tree_route` function, but can be used to return a route
 	/// between blocks which may not be in database yet.
-	fn _tree_route(&self, from: (BlockDetails, H256), to: (BlockDetails, H256)) -> TreeRoute {
+	fn _tree_route(&self, from: (&BlockDetails, &H256), to: (&BlockDetails, &H256)) -> TreeRoute {
 		let mut from_branch = vec![];
 		let mut to_branch = vec![];

-		let mut from_details = from.0;
-		let mut to_details = to.0;
-		let mut current_from = from.1;
-		let mut current_to = to.1;
+		let mut from_details = from.0.clone();
+		let mut to_details = to.0.clone();
+		let mut current_from = from.1.clone();
+		let mut current_to = to.1.clone();

 		// reset from && to to the same level
 		while from_details.number > to_details.number {
@@ -409,7 +409,7 @@ impl BlockChain {

 		// store block in db
 		self.blocks_db.put(&hash, &bytes).unwrap();
-		let (batch, new_best) = self.block_to_extras_insert_batch(bytes);
+		let (batch, new_best, details) = self.block_to_extras_insert_batch(bytes);

 		// update best block
 		let mut best_block = self.best_block.write().unwrap();
@@ -420,6 +420,8 @@ impl BlockChain {
 		// update caches
 		let mut write = self.block_details.write().unwrap();
 		write.remove(&header.parent_hash());
+		write.insert(hash.clone(), details);
+		self.note_used(CacheID::Block(hash));

 		// update extras database
 		self.extras_db.write(batch).unwrap();
@@ -427,7 +429,7 @@ impl BlockChain {

 	/// Transforms block into WriteBatch that may be written into database
 	/// Additionally, if it's new best block it returns new best block object.
-	fn block_to_extras_insert_batch(&self, bytes: &[u8]) -> (WriteBatch, Option<BestBlock>) {
+	fn block_to_extras_insert_batch(&self, bytes: &[u8]) -> (WriteBatch, Option<BestBlock>, BlockDetails) {
 		// create views onto rlp
 		let block = BlockView::new(bytes);
 		let header = block.header_view();
@@ -459,7 +461,7 @@ impl BlockChain {

 		// if it's not new best block, just return
 		if !is_new_best {
-			return (batch, None);
+			return (batch, None, details);
 		}

 		// if its new best block we need to make sure that all ancestors
@@ -467,7 +469,7 @@ impl BlockChain {
 		// find the route between old best block and the new one
 		let best_hash = self.best_block_hash();
 		let best_details = self.block_details(&best_hash).expect("best block hash is invalid!");
-		let route = self._tree_route((best_details, best_hash), (details, hash.clone()));
+		let route = self._tree_route((&best_details, &best_hash), (&details, &hash));

 		match route.blocks.len() {
 			// its our parent
@@ -494,7 +496,7 @@ impl BlockChain {
 			total_difficulty: total_difficulty
 		};

-		(batch, Some(best_block))
+		(batch, Some(best_block), details)
 	}

 	/// Returns true if transaction is known.
diff --git a/src/client.rs b/src/client.rs
index cf8b0fd7c..04d372786 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -6,8 +6,8 @@ use error::*;
 use header::BlockNumber;
 use spec::Spec;
 use engine::Engine;
-use block_queue::BlockQueue;
-use db_queue::{DbQueue, StateDBCommit};
+use block_queue::{BlockQueue, BlockQueueInfo};
+use db_queue::{DbQueue};
 use service::NetSyncMessage;
 use env_info::LastHashes;
 use verification::*;
@@ -47,13 +47,6 @@ impl fmt::Display for BlockChainInfo {
 	}
 }

-/// Block queue status
-#[derive(Debug)]
-pub struct BlockQueueStatus {
-	/// TODO [arkpar] Please document me
-	pub full: bool,
-}
-
 /// TODO [arkpar] Please document me
 pub type TreeRoute = ::blockchain::TreeRoute;

@@ -99,7 +92,7 @@ pub trait BlockChainClient : Sync + Send {
 	fn import_block(&self, bytes: Bytes) -> ImportResult;

 	/// Get block queue information.
-	fn queue_status(&self) -> BlockQueueStatus;
+	fn queue_info(&self) -> BlockQueueInfo;

 	/// Clear block queue and abort all import activity.
 	fn clear_queue(&self);
@@ -149,8 +142,6 @@ impl Client {
 		let mut opts = Options::new();
 		opts.set_max_open_files(256);
 		opts.create_if_missing(true);
-		opts.set_disable_data_sync(true);
-		opts.set_disable_auto_compactions(true);
 		/*opts.set_use_fsync(false);
 		opts.set_bytes_per_sync(8388608);
 		opts.set_disable_data_sync(false);
@@ -199,7 +190,6 @@ impl Client {

 	/// This is triggered by a message coming from a block queue when the block is ready for insertion
 	pub fn import_verified_blocks(&self, _io: &IoChannel<NetSyncMessage>) {
-		let mut bad = HashSet::new();
 		let _import_lock = self.import_lock.lock();
 		let blocks = self.block_queue.write().unwrap().drain(128);

@@ -243,11 +233,7 @@ impl Client {
 				}
 			}

-			let db = match self.uncommited_states.read().unwrap().get(&header.parent_hash) {
-				Some(db) => db.clone(),
-				None => self.state_db.clone(),
-			};
-
+			let db = self.state_db.clone();
 			let result = match enact_verified(&block, self.engine.deref().deref(), db, &parent, &last_hashes) {
 				Ok(b) => b,
 				Err(e) => {
@@ -272,15 +258,6 @@ impl Client {
 					return;
 				}
 			}
-			/*
-			let db = result.drain();
-			self.uncommited_states.write().unwrap().insert(header.hash(), db.clone());
-			self.db_queue.write().unwrap().queue(StateDBCommit {
-				now: header.number(),
-				hash: header.hash().clone(),
-				end: ancient.map(|n|(n, self.chain.read().unwrap().block_hash(n).unwrap())),
-				db: db,
-			});*/
 			self.report.write().unwrap().accrue_block(&block);
 			trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
 		}
@@ -369,10 +346,8 @@ impl BlockChainClient for Client {
 		self.block_queue.write().unwrap().import_block(bytes)
 	}

-	fn queue_status(&self) -> BlockQueueStatus {
-		BlockQueueStatus {
-			full: false
-		}
+	fn queue_info(&self) -> BlockQueueInfo {
+		self.block_queue.read().unwrap().queue_info()
 	}

 	fn clear_queue(&self) {
diff --git a/src/service.rs b/src/service.rs
index 4034ce841..b9b510d5e 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -21,6 +21,7 @@ pub type NetSyncMessage = NetworkIoMessage<SyncMessage>;
 pub struct ClientService {
 	net_service: NetworkService<SyncMessage>,
 	client: Arc<Client>,
+	sync: Arc<EthSync>,
 }

 impl ClientService {
@@ -33,7 +34,7 @@ impl ClientService {
 		dir.push(".parity");
 		dir.push(H64::from(spec.genesis_header().hash()).hex());
 		let client = try!(Client::new(spec, &dir, net_service.io().channel()));
-		EthSync::register(&mut net_service, client.clone());
+		let sync = EthSync::register(&mut net_service, client.clone());
 		let client_io = Arc::new(ClientIoHandler {
 			client: client.clone()
 		});
@@ -42,6 +43,7 @@ impl ClientService {
 		Ok(ClientService {
 			net_service: net_service,
 			client: client,
+			sync: sync,
 		})
 	}

@@ -53,6 +55,12 @@ impl ClientService {
 	/// TODO [arkpar] Please document me
 	pub fn client(&self) -> Arc<Client> {
 		self.client.clone()
+
+	}
+
+	/// Get shared sync handler
+	pub fn sync(&self) -> Arc<EthSync> {
+		self.sync.clone()
 	}
 }

diff --git a/src/sync/chain.rs b/src/sync/chain.rs
index ce748da08..f44f058c8 100644
--- a/src/sync/chain.rs
+++ b/src/sync/chain.rs
@@ -107,6 +107,10 @@ pub struct SyncStatus {
 	pub blocks_total: usize,
 	/// Number of blocks downloaded so far.
 	pub blocks_received: usize,
+	/// Total number of connected peers
+	pub num_peers: usize,
+	/// Total number of active peers
+	pub num_active_peers: usize,
 }

 #[derive(PartialEq, Eq, Debug)]
@@ -195,8 +199,10 @@ impl ChainSync {
 			start_block_number: self.starting_block,
 			last_imported_block_number: self.last_imported_block,
 			highest_block_number: self.highest_block,
-			blocks_total: (self.last_imported_block - self.starting_block) as usize,
-			blocks_received: (self.highest_block - self.starting_block) as usize,
+			blocks_received: (self.last_imported_block - self.starting_block) as usize,
+			blocks_total: (self.highest_block - self.starting_block) as usize,
+			num_peers: self.peers.len(),
+			num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(),
 		}
 	}

@@ -544,7 +550,7 @@ impl ChainSync {
 	fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId) {
 		self.clear_peer_download(peer_id);

-		if io.chain().queue_status().full {
+		if io.chain().queue_info().full {
 			self.pause_sync();
 			return;
 		}
@@ -971,7 +977,7 @@ impl ChainSync {
 	}

 	/// Maintain other peers. Send out any new blocks and transactions
-	pub fn maintain_sync(&mut self, _io: &mut SyncIo) {
+	pub fn _maintain_sync(&mut self, _io: &mut SyncIo) {
 	}
 }

diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index c87dee569..078100084 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -27,7 +27,6 @@ use std::sync::*;
 use client::Client;
 use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
 use sync::chain::ChainSync;
-use util::TimerToken;
 use service::SyncMessage;
 use sync::io::NetSyncIo;

@@ -38,8 +37,6 @@ mod range_collection;
 #[cfg(test)]
 mod tests;

-const SYNC_TIMER: usize = 0;
-
 /// Ethereum network protocol handler
 pub struct EthSync {
 	/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
@@ -52,12 +49,13 @@ pub use self::chain::SyncStatus;

 impl EthSync {
 	/// Creates and register protocol with the network service
-	pub fn register(service: &mut NetworkService<SyncMessage>, chain: Arc<Client>) {
+	pub fn register(service: &mut NetworkService<SyncMessage>, chain: Arc<Client>) -> Arc<EthSync> {
 		let sync = Arc::new(EthSync {
 			chain: chain,
 			sync: RwLock::new(ChainSync::new()),
 		});
 		service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
+		sync
 	}

 	/// Get sync status
@@ -77,8 +75,7 @@ impl EthSync {

 impl NetworkProtocolHandler<SyncMessage> for EthSync {
-	fn initialize(&self, io: &NetworkContext<SyncMessage>) {
-		io.register_timer(SYNC_TIMER, 1000).unwrap();
+	fn initialize(&self, _io: &NetworkContext<SyncMessage>) {
 	}

 	fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
@@ -92,12 +89,6 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
 	fn disconnected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
 		self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer);
 	}
-
-	fn timeout(&self, io: &NetworkContext<SyncMessage>, timer: TimerToken) {
-		if timer == SYNC_TIMER {
-			self.sync.write().unwrap().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref()));
-		}
-	}
 }
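
For illustration only, not part of the patch: a minimal sketch of how a caller might consume the two status APIs this change introduces, BlockChainClient::queue_info() returning BlockQueueInfo and EthSync::status() returning the extended SyncStatus. The helper name print_node_status is hypothetical; the field names are the ones added in the diff above.

// Hypothetical helper, sketched against the APIs added in this patch.
use ethcore::client::{BlockChainClient, Client};
use ethcore::sync::EthSync;

fn print_node_status(client: &Client, sync: &EthSync) {
	// BlockQueueInfo { full, queue_size } reports the state of the verification queue.
	let queue = client.queue_info();
	// SyncStatus now also carries num_peers, num_active_peers and blocks_received.
	let status = sync.status();

	if queue.full {
		// ChainSync::request_blocks() pauses syncing while the queue reports full.
		println!("block queue full, sync paused");
	}

	println!("{}/{} peers active, {} blocks received, {} blocks awaiting verification",
		status.num_active_peers,
		status.num_peers,
		status.blocks_received,
		queue.queue_size);
}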