TX processing queue

This commit is contained in:
arkpar 2016-06-19 14:35:42 +02:00
parent 75a38500f1
commit 09b8116cde
7 changed files with 110 additions and 7 deletions

View File

@ -18,6 +18,7 @@
use std::marker::PhantomData;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use util::*;
use util::panics::*;
use views::BlockView;
@ -50,6 +51,8 @@ pub use types::block_status::BlockStatus;
use evm::Factory as EvmFactory;
use miner::{Miner, MinerService, TransactionImportResult, AccountDetails};
const MAX_TX_QUEUE_SIZE: usize = 4096;
impl fmt::Display for BlockChainInfo {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "#{}.{}", self.best_block_number, self.best_block_hash)
@ -92,6 +95,8 @@ pub struct Client<V = CanonVerifier> where V: Verifier {
verifier: PhantomData<V>,
vm_factory: Arc<EvmFactory>,
miner: Arc<Miner>,
io_channel: IoChannel<NetSyncMessage>,
queue_transactions: AtomicUsize,
}
const HISTORY: u64 = 1200;
@ -149,7 +154,7 @@ impl<V> Client<V> where V: Verifier {
let engine = Arc::new(spec.engine);
let block_queue = BlockQueue::new(config.queue, engine.clone(), message_channel);
let block_queue = BlockQueue::new(config.queue, engine.clone(), message_channel.clone());
let panic_handler = PanicHandler::new_in_arc();
panic_handler.forward_from(&block_queue);
@ -165,6 +170,8 @@ impl<V> Client<V> where V: Verifier {
verifier: PhantomData,
vm_factory: Arc::new(EvmFactory::new(config.vm_type)),
miner: miner,
io_channel: message_channel,
queue_transactions: AtomicUsize::new(0),
};
Ok(Arc::new(client))
@ -271,6 +278,7 @@ impl<V> Client<V> where V: Verifier {
let mut import_results = Vec::with_capacity(max_blocks_to_import);
let _import_lock = self.import_lock.lock();
let _timer = PerfTimer::new("import_verified_blocks");
let blocks = self.block_queue.drain(max_blocks_to_import);
let original_best = self.chain_info().best_block_hash;
@ -361,6 +369,19 @@ impl<V> Client<V> where V: Verifier {
imported
}
/// Import transactions from the IO queue
pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize {
let _timer = PerfTimer::new("import_queued_transactions");
self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst);
let fetch_account = |a: &Address| AccountDetails {
nonce: self.latest_nonce(a),
balance: self.latest_balance(a),
};
let tx = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
let results = self.miner.import_transactions(tx, fetch_account);
results.len()
}
/// Attempt to get a copy of a specific block's state.
///
/// This will not fail if given BlockID::Latest.
@ -750,6 +771,22 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
self.miner.import_transactions(transactions, fetch_account)
}
fn queue_transactions(&self, transactions: Vec<Bytes>) {
if self.queue_transactions.load(AtomicOrdering::Relaxed) > MAX_TX_QUEUE_SIZE {
debug!("Ignoring {} transactions: queue is full", transactions.len());
} else {
let len = transactions.len();
match self.io_channel.send(NetworkIoMessage::User(SyncMessage::NewTransactions(transactions))) {
Ok(_) => {
self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst);
}
Err(e) => {
debug!("Ignoring {} transactions: error queueing: {}", len, e);
}
}
}
}
fn all_transactions(&self) -> Vec<SignedTransaction> {
self.miner.all_transactions()
}

View File

@ -193,6 +193,9 @@ pub trait BlockChainClient : Sync + Send {
/// import transactions from network/other 3rd party
fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, EthError>>;
/// Queue transactions for importing.
fn queue_transactions(&self, transactions: Vec<Bytes>);
/// list all transactions
fn all_transactions(&self) -> Vec<SignedTransaction>;

View File

@ -493,6 +493,12 @@ impl BlockChainClient for TestBlockChainClient {
self.miner.import_transactions(transactions, &fetch_account)
}
fn queue_transactions(&self, transactions: Vec<Bytes>) {
// import right here
let tx = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
self.import_transactions(tx);
}
fn all_transactions(&self) -> Vec<SignedTransaction> {
self.miner.all_transactions()
}

View File

@ -41,6 +41,8 @@ pub enum SyncMessage {
NewChainHead,
/// A block is ready
BlockVerified,
/// New transaction RLPs are ready to be imported
NewTransactions(Vec<Bytes>),
/// Start network command.
StartNetwork,
/// Stop network command.
@ -136,6 +138,9 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {
SyncMessage::BlockVerified => {
self.client.import_verified_blocks(&io.channel());
},
SyncMessage::NewTransactions(ref transactions) => {
self.client.import_queued_transactions(&transactions);
},
_ => {}, // ignore other messages
}
}

View File

@ -95,7 +95,6 @@ use ethcore::views::{HeaderView, BlockView};
use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo};
use ethcore::error::*;
use ethcore::transaction::SignedTransaction;
use ethcore::block::Block;
use io::SyncIo;
use time;
@ -940,15 +939,15 @@ impl ChainSync {
return Ok(());
}
let item_count = r.item_count();
let mut item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
item_count = min(item_count, MAX_TX_TO_IMPORT);
let mut transactions = Vec::with_capacity(item_count);
for i in 0 .. min(item_count, MAX_TX_TO_IMPORT) {
let tx: SignedTransaction = try!(r.val_at(i));
for i in 0 .. item_count {
let tx = try!(r.at(i)).as_raw().to_vec();
transactions.push(tx);
}
let _ = io.chain().import_transactions(transactions);
let _ = io.chain().queue_transactions(transactions);
Ok(())
}

View File

@ -154,6 +154,7 @@ pub mod panics;
pub mod table;
pub mod network_settings;
pub mod path;
mod timer;
pub use common::*;
pub use misc::*;
@ -175,6 +176,7 @@ pub use network::*;
pub use io::*;
pub use log::*;
pub use kvdb::*;
pub use timer::*;
#[cfg(test)]
mod tests {

51
util/src/timer.rs Normal file
View File

@ -0,0 +1,51 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Performance timer with logging
use time::precise_time_ns;
/// Performance timer with logging. Starts measuring time in the constructor, prints
/// elapsed time in the destructor or when `stop` is called.
pub struct PerfTimer {
name: &'static str,
start: u64,
stopped: bool,
}
impl PerfTimer {
/// Create an instance with given name.
pub fn new(name: &'static str) -> PerfTimer {
PerfTimer {
name: name,
start: precise_time_ns(),
stopped: false,
}
}
/// Stop the timer and print elapsed time on trace level with `perf` target.
pub fn stop(&mut self) {
if !self.stopped {
trace!(target: "perf", "{}: {:.2}ms", self.name, (precise_time_ns() - self.start) as f32 / 1000_000.0);
self.stopped = true;
}
}
}
impl Drop for PerfTimer {
fn drop(&mut self) {
self.stop()
}
}