diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index c6fe14ecd..bf67a8772 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -18,6 +18,7 @@ use std::marker::PhantomData; use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; use util::*; use util::panics::*; use views::BlockView; @@ -50,6 +51,8 @@ pub use types::block_status::BlockStatus; use evm::Factory as EvmFactory; use miner::{Miner, MinerService, TransactionImportResult, AccountDetails}; +const MAX_TX_QUEUE_SIZE: usize = 4096; + impl fmt::Display for BlockChainInfo { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "#{}.{}", self.best_block_number, self.best_block_hash) @@ -92,6 +95,8 @@ pub struct Client where V: Verifier { verifier: PhantomData, vm_factory: Arc, miner: Arc, + io_channel: IoChannel, + queue_transactions: AtomicUsize, } const HISTORY: u64 = 1200; @@ -152,7 +157,7 @@ impl Client where V: Verifier { let engine = Arc::new(spec.engine); - let block_queue = BlockQueue::new(config.queue, engine.clone(), message_channel); + let block_queue = BlockQueue::new(config.queue, engine.clone(), message_channel.clone()); let panic_handler = PanicHandler::new_in_arc(); panic_handler.forward_from(&block_queue); @@ -168,6 +173,8 @@ impl Client where V: Verifier { verifier: PhantomData, vm_factory: Arc::new(EvmFactory::new(config.vm_type)), miner: miner, + io_channel: message_channel, + queue_transactions: AtomicUsize::new(0), }; Ok(Arc::new(client)) @@ -274,6 +281,7 @@ impl Client where V: Verifier { let mut import_results = Vec::with_capacity(max_blocks_to_import); let _import_lock = self.import_lock.lock(); + let _timer = PerfTimer::new("import_verified_blocks"); let blocks = self.block_queue.drain(max_blocks_to_import); let original_best = self.chain_info().best_block_hash; @@ -364,6 +372,19 @@ impl Client where V: Verifier { imported } + /// Import transactions from the IO queue + pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize { + let _timer = PerfTimer::new("import_queued_transactions"); + self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst); + let fetch_account = |a: &Address| AccountDetails { + nonce: self.latest_nonce(a), + balance: self.latest_balance(a), + }; + let tx = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect(); + let results = self.miner.import_transactions(tx, fetch_account); + results.len() + } + /// Attempt to get a copy of a specific block's state. /// /// This will not fail if given BlockID::Latest. @@ -753,6 +774,22 @@ impl BlockChainClient for Client where V: Verifier { self.miner.import_transactions(transactions, fetch_account) } + fn queue_transactions(&self, transactions: Vec) { + if self.queue_transactions.load(AtomicOrdering::Relaxed) > MAX_TX_QUEUE_SIZE { + debug!("Ignoring {} transactions: queue is full", transactions.len()); + } else { + let len = transactions.len(); + match self.io_channel.send(NetworkIoMessage::User(SyncMessage::NewTransactions(transactions))) { + Ok(_) => { + self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst); + } + Err(e) => { + debug!("Ignoring {} transactions: error queueing: {}", len, e); + } + } + } + } + fn all_transactions(&self) -> Vec { self.miner.all_transactions() } diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index 3fec68815..9318e0185 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -193,6 +193,9 @@ pub trait BlockChainClient : Sync + Send { /// import transactions from network/other 3rd party fn import_transactions(&self, transactions: Vec) -> Vec>; + /// Queue transactions for importing. + fn queue_transactions(&self, transactions: Vec); + /// list all transactions fn all_transactions(&self) -> Vec; diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index a9f7f6300..b6cc946fc 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -493,6 +493,12 @@ impl BlockChainClient for TestBlockChainClient { self.miner.import_transactions(transactions, &fetch_account) } + fn queue_transactions(&self, transactions: Vec) { + // import right here + let tx = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect(); + self.import_transactions(tx); + } + fn all_transactions(&self) -> Vec { self.miner.all_transactions() } diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index d9040113f..03a85ce13 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -41,6 +41,8 @@ pub enum SyncMessage { NewChainHead, /// A block is ready BlockVerified, + /// New transaction RLPs are ready to be imported + NewTransactions(Vec), /// Start network command. StartNetwork, /// Stop network command. @@ -136,6 +138,9 @@ impl IoHandler for ClientIoHandler { SyncMessage::BlockVerified => { self.client.import_verified_blocks(&io.channel()); }, + SyncMessage::NewTransactions(ref transactions) => { + self.client.import_queued_transactions(&transactions); + }, _ => {}, // ignore other messages } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 3b608610d..01640ec4d 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -95,7 +95,6 @@ use ethcore::views::{HeaderView, BlockView}; use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo}; use ethcore::error::*; -use ethcore::transaction::SignedTransaction; use ethcore::block::Block; use io::SyncIo; use time; @@ -940,15 +939,15 @@ impl ChainSync { return Ok(()); } - let item_count = r.item_count(); + let mut item_count = r.item_count(); trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); - + item_count = min(item_count, MAX_TX_TO_IMPORT); let mut transactions = Vec::with_capacity(item_count); - for i in 0 .. min(item_count, MAX_TX_TO_IMPORT) { - let tx: SignedTransaction = try!(r.val_at(i)); + for i in 0 .. item_count { + let tx = try!(r.at(i)).as_raw().to_vec(); transactions.push(tx); } - let _ = io.chain().import_transactions(transactions); + let _ = io.chain().queue_transactions(transactions); Ok(()) } diff --git a/util/src/lib.rs b/util/src/lib.rs index e43bbbab0..adaf08e77 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -154,6 +154,7 @@ pub mod panics; pub mod table; pub mod network_settings; pub mod path; +mod timer; pub use common::*; pub use misc::*; @@ -175,6 +176,7 @@ pub use network::*; pub use io::*; pub use log::*; pub use kvdb::*; +pub use timer::*; #[cfg(test)] mod tests { diff --git a/util/src/timer.rs b/util/src/timer.rs new file mode 100644 index 000000000..5d95ff7de --- /dev/null +++ b/util/src/timer.rs @@ -0,0 +1,51 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Performance timer with logging +use time::precise_time_ns; + +/// Performance timer with logging. Starts measuring time in the constructor, prints +/// elapsed time in the destructor or when `stop` is called. +pub struct PerfTimer { + name: &'static str, + start: u64, + stopped: bool, +} + +impl PerfTimer { + /// Create an instance with given name. + pub fn new(name: &'static str) -> PerfTimer { + PerfTimer { + name: name, + start: precise_time_ns(), + stopped: false, + } + } + + /// Stop the timer and print elapsed time on trace level with `perf` target. + pub fn stop(&mut self) { + if !self.stopped { + trace!(target: "perf", "{}: {:.2}ms", self.name, (precise_time_ns() - self.start) as f32 / 1000_000.0); + self.stopped = true; + } + } +} + +impl Drop for PerfTimer { + fn drop(&mut self) { + self.stop() + } +}