Merge pull request #3491 from ethcore/transactions-propagate

Propagations & local transactions tracking
This commit is contained in:
Gav Wood 2016-11-20 14:10:27 +01:00 committed by GitHub
commit efca40a733
37 changed files with 1921 additions and 148 deletions

1
Cargo.lock generated
View File

@ -297,6 +297,7 @@ dependencies = [
"heapsize 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.9.4 (git+https://github.com/ethcore/hyper)", "hyper 0.9.4 (git+https://github.com/ethcore/hyper)",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"lru-cache 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "lru-cache 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -27,6 +27,7 @@ time = "0.1"
rand = "0.3" rand = "0.3"
byteorder = "0.5" byteorder = "0.5"
transient-hashmap = "0.1" transient-hashmap = "0.1"
linked-hash-map = "0.3.0"
evmjit = { path = "../evmjit", optional = true } evmjit = { path = "../evmjit", optional = true }
clippy = { version = "0.0.96", optional = true} clippy = { version = "0.0.96", optional = true}
ethash = { path = "../ethash" } ethash = { path = "../ethash" }

View File

@ -891,12 +891,9 @@ impl BlockChainClient for Client {
let mut mode = self.mode.lock(); let mut mode = self.mode.lock();
*mode = new_mode.clone().into(); *mode = new_mode.clone().into();
trace!(target: "mode", "Mode now {:?}", &*mode); trace!(target: "mode", "Mode now {:?}", &*mode);
match *self.on_mode_change.lock() { if let Some(ref mut f) = *self.on_mode_change.lock() {
Some(ref mut f) => {
trace!(target: "mode", "Making callback..."); trace!(target: "mode", "Making callback...");
f(&*mode) f(&*mode)
},
_ => {}
} }
} }
match new_mode { match new_mode {

View File

@ -102,6 +102,7 @@ extern crate rlp;
extern crate ethcore_bloom_journal as bloom_journal; extern crate ethcore_bloom_journal as bloom_journal;
extern crate byteorder; extern crate byteorder;
extern crate transient_hashmap; extern crate transient_hashmap;
extern crate linked_hash_map;
#[macro_use] #[macro_use]
extern crate log; extern crate log;

View File

@ -0,0 +1,196 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Local Transactions List.
use linked_hash_map::LinkedHashMap;
use transaction::SignedTransaction;
use error::TransactionError;
use util::{U256, H256};
/// Status of local transaction.
/// Can indicate that the transaction is currently part of the queue (`Pending/Future`)
/// or gives a reason why the transaction was removed.
#[derive(Debug, PartialEq, Clone)]
pub enum Status {
/// The transaction is currently in the transaction queue.
Pending,
/// The transaction is in future part of the queue.
Future,
/// Transaction is already mined.
Mined(SignedTransaction),
/// Transaction is dropped because of limit
Dropped(SignedTransaction),
/// Replaced because of higher gas price of another transaction.
Replaced(SignedTransaction, U256, H256),
/// Transaction was never accepted to the queue.
Rejected(SignedTransaction, TransactionError),
/// Transaction is invalid.
Invalid(SignedTransaction),
}
impl Status {
fn is_current(&self) -> bool {
*self == Status::Pending || *self == Status::Future
}
}
/// Keeps track of local transactions that are in the queue or were mined/dropped recently.
#[derive(Debug)]
pub struct LocalTransactionsList {
max_old: usize,
transactions: LinkedHashMap<H256, Status>,
}
impl Default for LocalTransactionsList {
fn default() -> Self {
Self::new(10)
}
}
impl LocalTransactionsList {
pub fn new(max_old: usize) -> Self {
LocalTransactionsList {
max_old: max_old,
transactions: Default::default(),
}
}
pub fn mark_pending(&mut self, hash: H256) {
self.clear_old();
self.transactions.insert(hash, Status::Pending);
}
pub fn mark_future(&mut self, hash: H256) {
self.transactions.insert(hash, Status::Future);
self.clear_old();
}
pub fn mark_rejected(&mut self, tx: SignedTransaction, err: TransactionError) {
self.transactions.insert(tx.hash(), Status::Rejected(tx, err));
self.clear_old();
}
pub fn mark_replaced(&mut self, tx: SignedTransaction, gas_price: U256, hash: H256) {
self.transactions.insert(tx.hash(), Status::Replaced(tx, gas_price, hash));
self.clear_old();
}
pub fn mark_invalid(&mut self, tx: SignedTransaction) {
self.transactions.insert(tx.hash(), Status::Invalid(tx));
self.clear_old();
}
pub fn mark_dropped(&mut self, tx: SignedTransaction) {
self.transactions.insert(tx.hash(), Status::Dropped(tx));
self.clear_old();
}
pub fn mark_mined(&mut self, tx: SignedTransaction) {
self.transactions.insert(tx.hash(), Status::Mined(tx));
self.clear_old();
}
pub fn contains(&self, hash: &H256) -> bool {
self.transactions.contains_key(hash)
}
pub fn all_transactions(&self) -> &LinkedHashMap<H256, Status> {
&self.transactions
}
fn clear_old(&mut self) {
let number_of_old = self.transactions
.values()
.filter(|status| !status.is_current())
.count();
if self.max_old >= number_of_old {
return;
}
let to_remove = self.transactions
.iter()
.filter(|&(_, status)| !status.is_current())
.map(|(hash, _)| *hash)
.take(number_of_old - self.max_old)
.collect::<Vec<_>>();
for hash in to_remove {
self.transactions.remove(&hash);
}
}
}
#[cfg(test)]
mod tests {
use util::U256;
use ethkey::{Random, Generator};
use transaction::{Action, Transaction, SignedTransaction};
use super::{LocalTransactionsList, Status};
#[test]
fn should_add_transaction_as_pending() {
// given
let mut list = LocalTransactionsList::default();
// when
list.mark_pending(10.into());
list.mark_future(20.into());
// then
assert!(list.contains(&10.into()), "Should contain the transaction.");
assert!(list.contains(&20.into()), "Should contain the transaction.");
let statuses = list.all_transactions().values().cloned().collect::<Vec<Status>>();
assert_eq!(statuses, vec![Status::Pending, Status::Future]);
}
#[test]
fn should_clear_old_transactions() {
// given
let mut list = LocalTransactionsList::new(1);
let tx1 = new_tx(10.into());
let tx1_hash = tx1.hash();
let tx2 = new_tx(50.into());
let tx2_hash = tx2.hash();
list.mark_pending(10.into());
list.mark_invalid(tx1);
list.mark_dropped(tx2);
assert!(list.contains(&tx2_hash));
assert!(!list.contains(&tx1_hash));
assert!(list.contains(&10.into()));
// when
list.mark_future(15.into());
// then
assert!(list.contains(&10.into()));
assert!(list.contains(&15.into()));
}
fn new_tx(nonce: U256) -> SignedTransaction {
let keypair = Random.generate().unwrap();
Transaction {
action: Action::Create,
value: U256::from(100),
data: Default::default(),
gas: U256::from(10),
gas_price: U256::from(1245),
nonce: nonce
}.sign(keypair.secret(), None)
}
}

View File

@ -24,6 +24,7 @@ use views::{BlockView, HeaderView};
use header::Header; use header::Header;
use state::{State, CleanupMode}; use state::{State, CleanupMode};
use client::{MiningBlockChainClient, Executive, Executed, EnvInfo, TransactOptions, BlockID, CallAnalytics}; use client::{MiningBlockChainClient, Executive, Executed, EnvInfo, TransactOptions, BlockID, CallAnalytics};
use client::TransactionImportResult;
use executive::contract_address; use executive::contract_address;
use block::{ClosedBlock, SealedBlock, IsBlock, Block}; use block::{ClosedBlock, SealedBlock, IsBlock, Block};
use error::*; use error::*;
@ -34,8 +35,8 @@ use engines::Engine;
use miner::{MinerService, MinerStatus, TransactionQueue, PrioritizationStrategy, AccountDetails, TransactionOrigin}; use miner::{MinerService, MinerStatus, TransactionQueue, PrioritizationStrategy, AccountDetails, TransactionOrigin};
use miner::banning_queue::{BanningTransactionQueue, Threshold}; use miner::banning_queue::{BanningTransactionQueue, Threshold};
use miner::work_notify::WorkPoster; use miner::work_notify::WorkPoster;
use client::TransactionImportResult;
use miner::price_info::PriceInfo; use miner::price_info::PriceInfo;
use miner::local_transactions::{Status as LocalTransactionStatus};
use header::BlockNumber; use header::BlockNumber;
/// Different possible definitions for pending transaction set. /// Different possible definitions for pending transaction set.
@ -563,7 +564,7 @@ impl Miner {
prepare_new prepare_new
} }
fn add_transactions_to_queue(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, origin: TransactionOrigin, transaction_queue: &mut BanningTransactionQueue) -> fn add_transactions_to_queue(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, default_origin: TransactionOrigin, transaction_queue: &mut BanningTransactionQueue) ->
Vec<Result<TransactionImportResult, Error>> { Vec<Result<TransactionImportResult, Error>> {
let fetch_account = |a: &Address| AccountDetails { let fetch_account = |a: &Address| AccountDetails {
@ -571,6 +572,10 @@ impl Miner {
balance: chain.latest_balance(a), balance: chain.latest_balance(a),
}; };
let accounts = self.accounts.as_ref()
.and_then(|provider| provider.accounts().ok())
.map(|accounts| accounts.into_iter().collect::<HashSet<_>>());
let schedule = chain.latest_schedule(); let schedule = chain.latest_schedule();
let gas_required = |tx: &SignedTransaction| tx.gas_required(&schedule).into(); let gas_required = |tx: &SignedTransaction| tx.gas_required(&schedule).into();
let best_block_header: Header = ::rlp::decode(&chain.best_block_header()); let best_block_header: Header = ::rlp::decode(&chain.best_block_header());
@ -583,13 +588,22 @@ impl Miner {
} }
} }
) )
.map(|tx| match origin { .map(|tx| {
let origin = accounts.as_ref().and_then(|accounts| {
tx.sender().ok().and_then(|sender| match accounts.contains(&sender) {
true => Some(TransactionOrigin::Local),
false => None,
})
}).unwrap_or(default_origin);
match origin {
TransactionOrigin::Local | TransactionOrigin::RetractedBlock => { TransactionOrigin::Local | TransactionOrigin::RetractedBlock => {
transaction_queue.add(tx, origin, &fetch_account, &gas_required) transaction_queue.add(tx, origin, &fetch_account, &gas_required)
}, },
TransactionOrigin::External => { TransactionOrigin::External => {
transaction_queue.add_with_banlist(tx, &fetch_account, &gas_required) transaction_queue.add_with_banlist(tx, &fetch_account, &gas_required)
} }
}
}) })
.collect() .collect()
} }
@ -863,6 +877,14 @@ impl MinerService for Miner {
queue.top_transactions() queue.top_transactions()
} }
fn local_transactions(&self) -> BTreeMap<H256, LocalTransactionStatus> {
let queue = self.transaction_queue.lock();
queue.local_transactions()
.iter()
.map(|(hash, status)| (*hash, status.clone()))
.collect()
}
fn pending_transactions(&self, best_block: BlockNumber) -> Vec<SignedTransaction> { fn pending_transactions(&self, best_block: BlockNumber) -> Vec<SignedTransaction> {
let queue = self.transaction_queue.lock(); let queue = self.transaction_queue.lock();
match self.options.pending_set { match self.options.pending_set {

View File

@ -41,16 +41,18 @@
//! } //! }
//! ``` //! ```
mod miner;
mod external;
mod transaction_queue;
mod banning_queue; mod banning_queue;
mod work_notify; mod external;
mod local_transactions;
mod miner;
mod price_info; mod price_info;
mod transaction_queue;
mod work_notify;
pub use self::transaction_queue::{TransactionQueue, PrioritizationStrategy, AccountDetails, TransactionOrigin};
pub use self::miner::{Miner, MinerOptions, Banning, PendingSet, GasPricer, GasPriceCalibratorOptions, GasLimit};
pub use self::external::{ExternalMiner, ExternalMinerService}; pub use self::external::{ExternalMiner, ExternalMinerService};
pub use self::miner::{Miner, MinerOptions, Banning, PendingSet, GasPricer, GasPriceCalibratorOptions, GasLimit};
pub use self::transaction_queue::{TransactionQueue, PrioritizationStrategy, AccountDetails, TransactionOrigin};
pub use self::local_transactions::{Status as LocalTransactionStatus};
pub use client::TransactionImportResult; pub use client::TransactionImportResult;
use std::collections::BTreeMap; use std::collections::BTreeMap;
@ -145,6 +147,9 @@ pub trait MinerService : Send + Sync {
/// Get a list of all pending transactions. /// Get a list of all pending transactions.
fn pending_transactions(&self, best_block: BlockNumber) -> Vec<SignedTransaction>; fn pending_transactions(&self, best_block: BlockNumber) -> Vec<SignedTransaction>;
/// Get a list of local transactions with statuses.
fn local_transactions(&self) -> BTreeMap<H256, LocalTransactionStatus>;
/// Get a list of all pending receipts. /// Get a list of all pending receipts.
fn pending_receipts(&self, best_block: BlockNumber) -> BTreeMap<H256, Receipt>; fn pending_receipts(&self, best_block: BlockNumber) -> BTreeMap<H256, Receipt>;

View File

@ -86,11 +86,13 @@ use std::ops::Deref;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::cmp; use std::cmp;
use std::collections::{HashSet, HashMap, BTreeSet, BTreeMap}; use std::collections::{HashSet, HashMap, BTreeSet, BTreeMap};
use linked_hash_map::LinkedHashMap;
use util::{Address, H256, Uint, U256}; use util::{Address, H256, Uint, U256};
use util::table::Table; use util::table::Table;
use transaction::*; use transaction::*;
use error::{Error, TransactionError}; use error::{Error, TransactionError};
use client::TransactionImportResult; use client::TransactionImportResult;
use miner::local_transactions::{LocalTransactionsList, Status as LocalTransactionStatus};
/// Transaction origin /// Transaction origin
#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[derive(Clone, Copy, Debug, PartialEq, Eq)]
@ -125,6 +127,12 @@ impl Ord for TransactionOrigin {
} }
} }
impl TransactionOrigin {
fn is_local(&self) -> bool {
*self == TransactionOrigin::Local
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
/// Light structure used to identify transaction and its order /// Light structure used to identify transaction and its order
struct TransactionOrder { struct TransactionOrder {
@ -201,17 +209,16 @@ impl Ord for TransactionOrder {
return self.penalties.cmp(&b.penalties); return self.penalties.cmp(&b.penalties);
} }
// First check nonce_height
if self.nonce_height != b.nonce_height {
return self.nonce_height.cmp(&b.nonce_height);
}
// Local transactions should always have priority // Local transactions should always have priority
// NOTE nonce has to be checked first, cause otherwise the order might be wrong.
if self.origin != b.origin { if self.origin != b.origin {
return self.origin.cmp(&b.origin); return self.origin.cmp(&b.origin);
} }
// Check nonce_height
if self.nonce_height != b.nonce_height {
return self.nonce_height.cmp(&b.nonce_height);
}
match self.strategy { match self.strategy {
PrioritizationStrategy::GasAndGasPrice => { PrioritizationStrategy::GasAndGasPrice => {
if self.gas != b.gas { if self.gas != b.gas {
@ -242,6 +249,7 @@ impl Ord for TransactionOrder {
} }
/// Verified transaction (with sender) /// Verified transaction (with sender)
#[derive(Debug)]
struct VerifiedTransaction { struct VerifiedTransaction {
/// Transaction /// Transaction
transaction: SignedTransaction, transaction: SignedTransaction,
@ -352,7 +360,7 @@ impl TransactionSet {
/// ///
/// It drops transactions from this set but also removes associated `VerifiedTransaction`. /// It drops transactions from this set but also removes associated `VerifiedTransaction`.
/// Returns addresses and lowest nonces of transactions removed because of limit. /// Returns addresses and lowest nonces of transactions removed because of limit.
fn enforce_limit(&mut self, by_hash: &mut HashMap<H256, VerifiedTransaction>) -> Option<HashMap<Address, U256>> { fn enforce_limit(&mut self, by_hash: &mut HashMap<H256, VerifiedTransaction>, local: &mut LocalTransactionsList) -> Option<HashMap<Address, U256>> {
let mut count = 0; let mut count = 0;
let mut gas: U256 = 0.into(); let mut gas: U256 = 0.into();
let to_drop : Vec<(Address, U256)> = { let to_drop : Vec<(Address, U256)> = {
@ -379,9 +387,13 @@ impl TransactionSet {
.expect("Transaction has just been found in `by_priority`; so it is in `by_address` also."); .expect("Transaction has just been found in `by_priority`; so it is in `by_address` also.");
trace!(target: "txqueue", "Dropped out of limit transaction: {:?}", order.hash); trace!(target: "txqueue", "Dropped out of limit transaction: {:?}", order.hash);
by_hash.remove(&order.hash) let order = by_hash.remove(&order.hash)
.expect("hash is in `by_priorty`; all hashes in `by_priority` must be in `by_hash`; qed"); .expect("hash is in `by_priorty`; all hashes in `by_priority` must be in `by_hash`; qed");
if order.origin.is_local() {
local.mark_dropped(order.transaction);
}
let min = removed.get(&sender).map_or(nonce, |val| cmp::min(*val, nonce)); let min = removed.get(&sender).map_or(nonce, |val| cmp::min(*val, nonce));
removed.insert(sender, min); removed.insert(sender, min);
removed removed
@ -488,6 +500,8 @@ pub struct TransactionQueue {
by_hash: HashMap<H256, VerifiedTransaction>, by_hash: HashMap<H256, VerifiedTransaction>,
/// Last nonce of transaction in current (to quickly check next expected transaction) /// Last nonce of transaction in current (to quickly check next expected transaction)
last_nonces: HashMap<Address, U256>, last_nonces: HashMap<Address, U256>,
/// List of local transactions and their statuses.
local_transactions: LocalTransactionsList,
} }
impl Default for TransactionQueue { impl Default for TransactionQueue {
@ -529,6 +543,7 @@ impl TransactionQueue {
future: future, future: future,
by_hash: HashMap::new(), by_hash: HashMap::new(),
last_nonces: HashMap::new(), last_nonces: HashMap::new(),
local_transactions: LocalTransactionsList::default(),
} }
} }
@ -537,8 +552,8 @@ impl TransactionQueue {
self.current.set_limit(limit); self.current.set_limit(limit);
self.future.set_limit(limit); self.future.set_limit(limit);
// And ensure the limits // And ensure the limits
self.current.enforce_limit(&mut self.by_hash); self.current.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
self.future.enforce_limit(&mut self.by_hash); self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
} }
/// Returns current limit of transactions in the queue. /// Returns current limit of transactions in the queue.
@ -578,7 +593,7 @@ impl TransactionQueue {
pub fn set_total_gas_limit(&mut self, gas_limit: U256) { pub fn set_total_gas_limit(&mut self, gas_limit: U256) {
self.future.gas_limit = gas_limit; self.future.gas_limit = gas_limit;
self.current.gas_limit = gas_limit; self.current.gas_limit = gas_limit;
self.future.enforce_limit(&mut self.by_hash); self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
} }
/// Set the new limit for the amount of gas any individual transaction may have. /// Set the new limit for the amount of gas any individual transaction may have.
@ -609,6 +624,46 @@ impl TransactionQueue {
F: Fn(&Address) -> AccountDetails, F: Fn(&Address) -> AccountDetails,
G: Fn(&SignedTransaction) -> U256, G: Fn(&SignedTransaction) -> U256,
{ {
if origin == TransactionOrigin::Local {
let hash = tx.hash();
let cloned_tx = tx.clone();
let result = self.add_internal(tx, origin, fetch_account, gas_estimator);
match result {
Ok(TransactionImportResult::Current) => {
self.local_transactions.mark_pending(hash);
},
Ok(TransactionImportResult::Future) => {
self.local_transactions.mark_future(hash);
},
Err(Error::Transaction(ref err)) => {
// Sometimes transactions are re-imported, so
// don't overwrite transactions if they are already on the list
if !self.local_transactions.contains(&cloned_tx.hash()) {
self.local_transactions.mark_rejected(cloned_tx, err.clone());
}
},
Err(_) => {
self.local_transactions.mark_invalid(cloned_tx);
},
}
result
} else {
self.add_internal(tx, origin, fetch_account, gas_estimator)
}
}
/// Adds signed transaction to the queue.
fn add_internal<F, G>(
&mut self,
tx: SignedTransaction,
origin: TransactionOrigin,
fetch_account: &F,
gas_estimator: &G,
) -> Result<TransactionImportResult, Error> where
F: Fn(&Address) -> AccountDetails,
G: Fn(&SignedTransaction) -> U256,
{
if tx.gas_price < self.minimal_gas_price && origin != TransactionOrigin::Local { if tx.gas_price < self.minimal_gas_price && origin != TransactionOrigin::Local {
trace!(target: "txqueue", trace!(target: "txqueue",
@ -647,7 +702,6 @@ impl TransactionQueue {
self.gas_limit, self.gas_limit,
self.tx_gas_limit self.tx_gas_limit
); );
return Err(Error::Transaction(TransactionError::GasLimitExceeded { return Err(Error::Transaction(TransactionError::GasLimitExceeded {
limit: self.gas_limit, limit: self.gas_limit,
got: tx.gas, got: tx.gas,
@ -722,6 +776,12 @@ impl TransactionQueue {
None => return, None => return,
Some(t) => t, Some(t) => t,
}; };
// Never penalize local transactions
if transaction.origin.is_local() {
return;
}
let sender = transaction.sender(); let sender = transaction.sender();
// Penalize all transactions from this sender // Penalize all transactions from this sender
@ -766,6 +826,11 @@ impl TransactionQueue {
trace!(target: "txqueue", "Removing invalid transaction: {:?}", transaction.hash()); trace!(target: "txqueue", "Removing invalid transaction: {:?}", transaction.hash());
// Mark in locals
if self.local_transactions.contains(transaction_hash) {
self.local_transactions.mark_invalid(transaction.transaction.clone());
}
// Remove from future // Remove from future
let order = self.future.drop(&sender, &nonce); let order = self.future.drop(&sender, &nonce);
if order.is_some() { if order.is_some() {
@ -788,6 +853,33 @@ impl TransactionQueue {
} }
} }
/// Marks all transactions from particular sender as local transactions
fn mark_transactions_local(&mut self, sender: &Address) {
fn mark_local<F: FnMut(H256)>(sender: &Address, set: &mut TransactionSet, mut mark: F) {
// Mark all transactions from this sender as local
let nonces_from_sender = set.by_address.row(sender)
.map(|row_map| {
row_map.iter().filter_map(|(nonce, order)| if order.origin.is_local() {
None
} else {
Some(*nonce)
}).collect::<Vec<U256>>()
})
.unwrap_or_else(Vec::new);
for k in nonces_from_sender {
let mut order = set.drop(sender, &k).expect("transaction known to be in self.current/self.future; qed");
order.origin = TransactionOrigin::Local;
mark(order.hash);
set.insert(*sender, k, order);
}
}
let local = &mut self.local_transactions;
mark_local(sender, &mut self.current, |hash| local.mark_pending(hash));
mark_local(sender, &mut self.future, |hash| local.mark_future(hash));
}
/// Update height of all transactions in future transactions set. /// Update height of all transactions in future transactions set.
fn update_future(&mut self, sender: &Address, current_nonce: U256) { fn update_future(&mut self, sender: &Address, current_nonce: U256) {
// We need to drain all transactions for current sender from future and reinsert them with updated height // We need to drain all transactions for current sender from future and reinsert them with updated height
@ -821,15 +913,21 @@ impl TransactionQueue {
qed"); qed");
if k >= current_nonce { if k >= current_nonce {
let order = order.update_height(k, current_nonce); let order = order.update_height(k, current_nonce);
if order.origin.is_local() {
self.local_transactions.mark_future(order.hash);
}
if let Some(old) = self.future.insert(*sender, k, order.clone()) { if let Some(old) = self.future.insert(*sender, k, order.clone()) {
Self::replace_orders(*sender, k, old, order, &mut self.future, &mut self.by_hash); Self::replace_orders(*sender, k, old, order, &mut self.future, &mut self.by_hash, &mut self.local_transactions);
} }
} else { } else {
trace!(target: "txqueue", "Removing old transaction: {:?} (nonce: {} < {})", order.hash, k, current_nonce); trace!(target: "txqueue", "Removing old transaction: {:?} (nonce: {} < {})", order.hash, k, current_nonce);
self.by_hash.remove(&order.hash).expect("All transactions in `future` are also in `by_hash`"); let tx = self.by_hash.remove(&order.hash).expect("All transactions in `future` are also in `by_hash`");
if tx.origin.is_local() {
self.local_transactions.mark_mined(tx.transaction);
} }
} }
self.future.enforce_limit(&mut self.by_hash); }
self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
} }
/// Returns top transactions from the queue ordered by priority. /// Returns top transactions from the queue ordered by priority.
@ -841,6 +939,11 @@ impl TransactionQueue {
.collect() .collect()
} }
/// Returns local transactions (some of them might not be part of the queue anymore).
pub fn local_transactions(&self) -> &LinkedHashMap<H256, LocalTransactionStatus> {
self.local_transactions.all_transactions()
}
#[cfg(test)] #[cfg(test)]
fn future_transactions(&self) -> Vec<SignedTransaction> { fn future_transactions(&self) -> Vec<SignedTransaction> {
self.future.by_priority self.future.by_priority
@ -897,8 +1000,11 @@ impl TransactionQueue {
self.future.by_gas_price.remove(&order.gas_price, &order.hash); self.future.by_gas_price.remove(&order.gas_price, &order.hash);
// Put to current // Put to current
let order = order.update_height(current_nonce, first_nonce); let order = order.update_height(current_nonce, first_nonce);
if order.origin.is_local() {
self.local_transactions.mark_pending(order.hash);
}
if let Some(old) = self.current.insert(address, current_nonce, order.clone()) { if let Some(old) = self.current.insert(address, current_nonce, order.clone()) {
Self::replace_orders(address, current_nonce, old, order, &mut self.current, &mut self.by_hash); Self::replace_orders(address, current_nonce, old, order, &mut self.current, &mut self.by_hash, &mut self.local_transactions);
} }
update_last_nonce_to = Some(current_nonce); update_last_nonce_to = Some(current_nonce);
current_nonce = current_nonce + U256::one(); current_nonce = current_nonce + U256::one();
@ -953,13 +1059,19 @@ impl TransactionQueue {
.cloned() .cloned()
.map_or(state_nonce, |n| n + U256::one()); .map_or(state_nonce, |n| n + U256::one());
if tx.origin.is_local() {
self.mark_transactions_local(&address);
}
// Future transaction // Future transaction
if nonce > next_nonce { if nonce > next_nonce {
// We have a gap - put to future. // We have a gap - put to future.
// Insert transaction (or replace old one with lower gas price) // Insert transaction (or replace old one with lower gas price)
try!(check_too_cheap(Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.future, &mut self.by_hash))); try!(check_too_cheap(
Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.future, &mut self.by_hash, &mut self.local_transactions)
));
// Enforce limit in Future // Enforce limit in Future
let removed = self.future.enforce_limit(&mut self.by_hash); let removed = self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
// Return an error if this transaction was not imported because of limit. // Return an error if this transaction was not imported because of limit.
try!(check_if_removed(&address, &nonce, removed)); try!(check_if_removed(&address, &nonce, removed));
@ -973,13 +1085,15 @@ impl TransactionQueue {
self.move_matching_future_to_current(address, nonce + U256::one(), state_nonce); self.move_matching_future_to_current(address, nonce + U256::one(), state_nonce);
// Replace transaction if any // Replace transaction if any
try!(check_too_cheap(Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.current, &mut self.by_hash))); try!(check_too_cheap(
Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.current, &mut self.by_hash, &mut self.local_transactions)
));
// Keep track of highest nonce stored in current // Keep track of highest nonce stored in current
let new_max = self.last_nonces.get(&address).map_or(nonce, |n| cmp::max(nonce, *n)); let new_max = self.last_nonces.get(&address).map_or(nonce, |n| cmp::max(nonce, *n));
self.last_nonces.insert(address, new_max); self.last_nonces.insert(address, new_max);
// Also enforce the limit // Also enforce the limit
let removed = self.current.enforce_limit(&mut self.by_hash); let removed = self.current.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
// If some transaction were removed because of limit we need to update last_nonces also. // If some transaction were removed because of limit we need to update last_nonces also.
self.update_last_nonces(&removed); self.update_last_nonces(&removed);
// Trigger error if the transaction we are importing was removed. // Trigger error if the transaction we are importing was removed.
@ -1010,7 +1124,14 @@ impl TransactionQueue {
/// ///
/// Returns `true` if transaction actually got to the queue (`false` if there was already a transaction with higher /// Returns `true` if transaction actually got to the queue (`false` if there was already a transaction with higher
/// gas_price) /// gas_price)
fn replace_transaction(tx: VerifiedTransaction, base_nonce: U256, min_gas_price: (U256, PrioritizationStrategy), set: &mut TransactionSet, by_hash: &mut HashMap<H256, VerifiedTransaction>) -> bool { fn replace_transaction(
tx: VerifiedTransaction,
base_nonce: U256,
min_gas_price: (U256, PrioritizationStrategy),
set: &mut TransactionSet,
by_hash: &mut HashMap<H256, VerifiedTransaction>,
local: &mut LocalTransactionsList,
) -> bool {
let order = TransactionOrder::for_transaction(&tx, base_nonce, min_gas_price.0, min_gas_price.1); let order = TransactionOrder::for_transaction(&tx, base_nonce, min_gas_price.0, min_gas_price.1);
let hash = tx.hash(); let hash = tx.hash();
let address = tx.sender(); let address = tx.sender();
@ -1019,16 +1140,27 @@ impl TransactionQueue {
let old_hash = by_hash.insert(hash, tx); let old_hash = by_hash.insert(hash, tx);
assert!(old_hash.is_none(), "Each hash has to be inserted exactly once."); assert!(old_hash.is_none(), "Each hash has to be inserted exactly once.");
trace!(target: "txqueue", "Inserting: {:?}", order);
if let Some(old) = set.insert(address, nonce, order.clone()) { if let Some(old) = set.insert(address, nonce, order.clone()) {
Self::replace_orders(address, nonce, old, order, set, by_hash) Self::replace_orders(address, nonce, old, order, set, by_hash, local)
} else { } else {
true true
} }
} }
fn replace_orders(address: Address, nonce: U256, old: TransactionOrder, order: TransactionOrder, set: &mut TransactionSet, by_hash: &mut HashMap<H256, VerifiedTransaction>) -> bool { fn replace_orders(
address: Address,
nonce: U256,
old: TransactionOrder,
order: TransactionOrder,
set: &mut TransactionSet,
by_hash: &mut HashMap<H256, VerifiedTransaction>,
local: &mut LocalTransactionsList,
) -> bool {
// There was already transaction in queue. Let's check which one should stay // There was already transaction in queue. Let's check which one should stay
let old_hash = old.hash;
let new_hash = order.hash;
let old_fee = old.gas_price; let old_fee = old.gas_price;
let new_fee = order.gas_price; let new_fee = order.gas_price;
if old_fee.cmp(&new_fee) == Ordering::Greater { if old_fee.cmp(&new_fee) == Ordering::Greater {
@ -1036,12 +1168,18 @@ impl TransactionQueue {
// Put back old transaction since it has greater priority (higher gas_price) // Put back old transaction since it has greater priority (higher gas_price)
set.insert(address, nonce, old); set.insert(address, nonce, old);
// and remove new one // and remove new one
by_hash.remove(&order.hash).expect("The hash has been just inserted and no other line is altering `by_hash`."); let order = by_hash.remove(&order.hash).expect("The hash has been just inserted and no other line is altering `by_hash`.");
if order.origin.is_local() {
local.mark_replaced(order.transaction, old_fee, old_hash);
}
false false
} else { } else {
trace!(target: "txqueue", "Replaced transaction: {:?} with transaction with higher gas price: {:?}", old.hash, order.hash); trace!(target: "txqueue", "Replaced transaction: {:?} with transaction with higher gas price: {:?}", old.hash, order.hash);
// Make sure we remove old transaction entirely // Make sure we remove old transaction entirely
by_hash.remove(&old.hash).expect("The hash is coming from `future` so it has to be in `by_hash`."); let old = by_hash.remove(&old.hash).expect("The hash is coming from `future` so it has to be in `by_hash`.");
if old.origin.is_local() {
local.mark_replaced(old.transaction, new_fee, new_hash);
}
true true
} }
} }
@ -1078,6 +1216,7 @@ mod test {
use error::{Error, TransactionError}; use error::{Error, TransactionError};
use super::*; use super::*;
use super::{TransactionSet, TransactionOrder, VerifiedTransaction}; use super::{TransactionSet, TransactionOrder, VerifiedTransaction};
use miner::local_transactions::LocalTransactionsList;
use client::TransactionImportResult; use client::TransactionImportResult;
fn unwrap_tx_err(err: Result<TransactionImportResult, Error>) -> TransactionError { fn unwrap_tx_err(err: Result<TransactionImportResult, Error>) -> TransactionError {
@ -1208,6 +1347,7 @@ mod test {
#[test] #[test]
fn should_create_transaction_set() { fn should_create_transaction_set() {
// given // given
let mut local = LocalTransactionsList::default();
let mut set = TransactionSet { let mut set = TransactionSet {
by_priority: BTreeSet::new(), by_priority: BTreeSet::new(),
by_address: Table::new(), by_address: Table::new(),
@ -1235,7 +1375,7 @@ mod test {
assert_eq!(set.by_address.len(), 2); assert_eq!(set.by_address.len(), 2);
// when // when
set.enforce_limit(&mut by_hash); set.enforce_limit(&mut by_hash, &mut local);
// then // then
assert_eq!(by_hash.len(), 1); assert_eq!(by_hash.len(), 1);
@ -1628,6 +1768,31 @@ mod test {
assert_eq!(top.len(), 2); assert_eq!(top.len(), 2);
} }
#[test]
fn when_importing_local_should_mark_others_from_the_same_sender_as_local() {
// given
let mut txq = TransactionQueue::default();
let (tx1, tx2) = new_tx_pair_default(1.into(), 0.into());
// the second one has same nonce but higher `gas_price`
let (_, tx0) = new_similar_tx_pair();
txq.add(tx0.clone(), TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap();
txq.add(tx1.clone(), TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap();
// the one with higher gas price is first
assert_eq!(txq.top_transactions()[0], tx0);
assert_eq!(txq.top_transactions()[1], tx1);
// when
// insert second as local
txq.add(tx2.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap();
// then
// the order should be updated
assert_eq!(txq.top_transactions()[0], tx1);
assert_eq!(txq.top_transactions()[1], tx2);
assert_eq!(txq.top_transactions()[2], tx0);
}
#[test] #[test]
fn should_prioritize_reimported_transactions_within_same_nonce_height() { fn should_prioritize_reimported_transactions_within_same_nonce_height() {
// given // given
@ -1695,6 +1860,38 @@ mod test {
assert_eq!(top.len(), 4); assert_eq!(top.len(), 4);
} }
#[test]
fn should_not_penalize_local_transactions() {
// given
let mut txq = TransactionQueue::default();
// txa, txb - slightly bigger gas price to have consistent ordering
let (txa, txb) = new_tx_pair_default(1.into(), 0.into());
let (tx1, tx2) = new_tx_pair_with_gas_price_increment(3.into());
// insert everything
txq.add(txa.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap();
txq.add(txb.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap();
txq.add(tx1.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap();
txq.add(tx2.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap();
let top = txq.top_transactions();
assert_eq!(top[0], tx1);
assert_eq!(top[1], txa);
assert_eq!(top[2], tx2);
assert_eq!(top[3], txb);
assert_eq!(top.len(), 4);
// when
txq.penalize(&tx1.hash());
// then (order is the same)
let top = txq.top_transactions();
assert_eq!(top[0], tx1);
assert_eq!(top[1], txa);
assert_eq!(top[2], tx2);
assert_eq!(top[3], txb);
assert_eq!(top.len(), 4);
}
#[test] #[test]
fn should_penalize_transactions_from_sender() { fn should_penalize_transactions_from_sender() {
@ -1940,12 +2137,11 @@ mod test {
let mut txq = TransactionQueue::with_limits(PrioritizationStrategy::GasPriceOnly, 100, default_gas_val() * U256::from(2), !U256::zero()); let mut txq = TransactionQueue::with_limits(PrioritizationStrategy::GasPriceOnly, 100, default_gas_val() * U256::from(2), !U256::zero());
let (tx1, tx2) = new_tx_pair_default(U256::from(1), U256::from(1)); let (tx1, tx2) = new_tx_pair_default(U256::from(1), U256::from(1));
let (tx3, tx4) = new_tx_pair_default(U256::from(1), U256::from(2)); let (tx3, tx4) = new_tx_pair_default(U256::from(1), U256::from(2));
let (tx5, tx6) = new_tx_pair_default(U256::from(1), U256::from(2)); let (tx5, _) = new_tx_pair_default(U256::from(1), U256::from(2));
txq.add(tx1.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap(); txq.add(tx1.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap();
txq.add(tx2.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap(); txq.add(tx2.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap();
txq.add(tx5.clone(), TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap();
// Not accepted because of limit // Not accepted because of limit
txq.add(tx6.clone(), TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap_err(); txq.add(tx5.clone(), TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap_err();
txq.add(tx3.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap(); txq.add(tx3.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap();
txq.add(tx4.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap(); txq.add(tx4.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap();
assert_eq!(txq.status().pending, 4); assert_eq!(txq.status().pending, 4);

View File

@ -16,7 +16,7 @@
//! Binary representation of types //! Binary representation of types
use util::{U256, U512, H256, H2048, Address}; use util::{U256, U512, H256, H512, H2048, Address};
use std::mem; use std::mem;
use std::collections::{VecDeque, BTreeMap}; use std::collections::{VecDeque, BTreeMap};
use std::ops::Range; use std::ops::Range;
@ -800,6 +800,7 @@ binary_fixed_size!(bool);
binary_fixed_size!(U256); binary_fixed_size!(U256);
binary_fixed_size!(U512); binary_fixed_size!(U512);
binary_fixed_size!(H256); binary_fixed_size!(H256);
binary_fixed_size!(H512);
binary_fixed_size!(H2048); binary_fixed_size!(H2048);
binary_fixed_size!(Address); binary_fixed_size!(Address);
binary_fixed_size!(BinHandshake); binary_fixed_size!(BinHandshake);

View File

@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
import { inAddress, inData, inHex, inNumber16, inOptions } from '../../format/input'; import { inAddress, inData, inHex, inNumber16, inOptions } from '../../format/input';
import { outAccountInfo, outAddress, outHistogram, outNumber, outPeers } from '../../format/output'; import { outAccountInfo, outAddress, outHistogram, outNumber, outPeers, outTransaction } from '../../format/output';
export default class Parity { export default class Parity {
constructor (transport) { constructor (transport) {
@ -117,16 +117,29 @@ export default class Parity {
.execute('parity_hashContent', url); .execute('parity_hashContent', url);
} }
importGethAccounts (accounts) {
return this._transport
.execute('parity_importGethAccounts', (accounts || []).map(inAddress))
.then((accounts) => (accounts || []).map(outAddress));
}
listGethAccounts () { listGethAccounts () {
return this._transport return this._transport
.execute('parity_listGethAccounts') .execute('parity_listGethAccounts')
.then((accounts) => (accounts || []).map(outAddress)); .then((accounts) => (accounts || []).map(outAddress));
} }
importGethAccounts (accounts) { localTransactions () {
return this._transport return this._transport
.execute('parity_importGethAccounts', (accounts || []).map(inAddress)) .execute('parity_localTransactions')
.then((accounts) => (accounts || []).map(outAddress)); .then(transactions => {
Object.values(transactions)
.filter(tx => tx.transaction)
.map(tx => {
tx.transaction = outTransaction(tx.transaction);
});
return transactions;
});
} }
minGasPrice () { minGasPrice () {
@ -192,6 +205,17 @@ export default class Parity {
.execute('parity_nodeName'); .execute('parity_nodeName');
} }
pendingTransactions () {
return this._transport
.execute('parity_pendingTransactions')
.then(data => data.map(outTransaction));
}
pendingTransactionsStats () {
return this._transport
.execute('parity_pendingTransactionsStats');
}
phraseToAddress (phrase) { phraseToAddress (phrase) {
return this._transport return this._transport
.execute('parity_phraseToAddress', phrase) .execute('parity_phraseToAddress', phrase)

17
js/src/dapps/localtx.html Normal file
View File

@ -0,0 +1,17 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<link rel="icon" href="/parity-logo-black-no-text.png" type="image/png">
<title>Local transactions Viewer</title>
</head>
<body>
<div id="container"></div>
<script src="vendor.js"></script>
<script src="commons.js"></script>
<script src="/parity-utils/parity.js"></script>
<script src="localtx.js"></script>
</body>
</html>

33
js/src/dapps/localtx.js Normal file
View File

@ -0,0 +1,33 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
import ReactDOM from 'react-dom';
import React from 'react';
import injectTapEventPlugin from 'react-tap-event-plugin';
injectTapEventPlugin();
import Application from './localtx/Application';
import '../../assets/fonts/Roboto/font.css';
import '../../assets/fonts/RobotoMono/font.css';
import './style.css';
import './localtx.html';
ReactDOM.render(
<Application />,
document.querySelector('#container')
);

View File

@ -0,0 +1,19 @@
.container {
padding: 1rem 2rem;
text-align: center;
h1 {
margin-top: 3rem;
margin-bottom: 1rem;
}
table {
text-align: left;
margin: auto;
max-width: 90vw;
th {
text-align: center;
}
}
}

View File

@ -0,0 +1,203 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
import BigNumber from 'bignumber.js';
import React, { Component } from 'react';
import { api } from '../parity';
import styles from './application.css';
import { Transaction, LocalTransaction } from '../Transaction';
export default class Application extends Component {
state = {
loading: true,
transactions: [],
localTransactions: {},
blockNumber: 0
}
componentDidMount () {
const poll = () => this.fetchTransactionData().then(poll).catch(poll);
this._timeout = setTimeout(poll, 2000);
}
componentWillUnmount () {
clearTimeout(this._timeout);
}
fetchTransactionData () {
return Promise.all([
api.parity.pendingTransactions(),
api.parity.pendingTransactionsStats(),
api.parity.localTransactions(),
api.eth.blockNumber()
]).then(([pending, stats, local, blockNumber]) => {
// Combine results together
const transactions = pending.map(tx => {
return {
transaction: tx,
stats: stats[tx.hash],
isLocal: !!local[tx.hash]
};
});
// Add transaction data to locals
transactions
.filter(tx => tx.isLocal)
.map(data => {
const tx = data.transaction;
local[tx.hash].transaction = tx;
local[tx.hash].stats = data.stats;
});
// Convert local transactions to array
const localTransactions = Object.keys(local).map(hash => {
const data = local[hash];
data.txHash = hash;
return data;
});
// Sort local transactions by nonce (move future to the end)
localTransactions.sort((a, b) => {
a = a.transaction || {};
b = b.transaction || {};
if (a.from && b.from && a.from !== b.from) {
return a.from < b.from;
}
if (!a.nonce || !b.nonce) {
return !a.nonce ? 1 : -1;
}
return new BigNumber(a.nonce).comparedTo(new BigNumber(b.nonce));
});
this.setState({
loading: false,
transactions,
localTransactions,
blockNumber
});
});
}
render () {
const { loading } = this.state;
if (loading) {
return (
<div className={ styles.container }>Loading...</div>
);
}
return (
<div className={ styles.container }>
<h1>Your local transactions</h1>
{ this.renderLocals() }
<h1>Transactions in the queue</h1>
{ this.renderQueueSummary() }
{ this.renderQueue() }
</div>
);
}
renderQueueSummary () {
const { transactions } = this.state;
if (!transactions.length) {
return null;
}
const count = transactions.length;
const locals = transactions.filter(tx => tx.isLocal).length;
const fee = transactions
.map(tx => tx.transaction)
.map(tx => tx.gasPrice.mul(tx.gas))
.reduce((sum, fee) => sum.add(fee), new BigNumber(0));
return (
<h3>
Count: <strong>{ locals ? `${count} (${locals})` : count }</strong>
&nbsp;
Total Fee: <strong>{ api.util.fromWei(fee).toFixed(3) } ETH</strong>
</h3>
);
}
renderQueue () {
const { blockNumber, transactions } = this.state;
if (!transactions.length) {
return (
<h3>The queue seems is empty.</h3>
);
}
return (
<table cellSpacing='0'>
<thead>
{ Transaction.renderHeader() }
</thead>
<tbody>
{
transactions.map((tx, idx) => (
<Transaction
key={ tx.transaction.hash }
idx={ idx + 1 }
isLocal={ tx.isLocal }
transaction={ tx.transaction }
stats={ tx.stats }
blockNumber={ blockNumber }
/>
))
}
</tbody>
</table>
);
}
renderLocals () {
const { localTransactions } = this.state;
if (!localTransactions.length) {
return (
<h3>You haven't sent any transactions yet.</h3>
);
}
return (
<table cellSpacing='0'>
<thead>
{ LocalTransaction.renderHeader() }
</thead>
<tbody>
{
localTransactions.map(tx => (
<LocalTransaction
key={ tx.txHash }
hash={ tx.txHash }
transaction={ tx.transaction }
status={ tx.status }
stats={ tx.stats }
details={ tx }
/>
))
}
</tbody>
</table>
);
}
}

View File

@ -0,0 +1,32 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
import React from 'react';
import { shallow } from 'enzyme';
import '../../../environment/tests';
import Application from './application';
describe('localtx/Application', () => {
describe('rendering', () => {
it('renders without crashing', () => {
const rendered = shallow(<Application />);
expect(rendered).to.be.defined;
});
});
});

View File

@ -0,0 +1,17 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
export default from './application';

View File

@ -0,0 +1,17 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
export { Transaction, LocalTransaction } from './transaction';

View File

@ -0,0 +1,31 @@
.from {
white-space: nowrap;
img {
vertical-align: middle;
}
}
.transaction {
td {
padding: 7px 15px;
}
td:first-child {
padding: 7px 0;
}
&.local {
background: #8bc34a;
}
}
.nowrap {
white-space: nowrap;
}
.edit {
label, input {
display: block;
}
}

View File

@ -0,0 +1,382 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
import BigNumber from 'bignumber.js';
import React, { Component, PropTypes } from 'react';
import classnames from 'classnames';
import { api } from '../parity';
import styles from './transaction.css';
import IdentityIcon from '../../githubhint/IdentityIcon';
class BaseTransaction extends Component {
shortHash (hash) {
return `${hash.substr(0, 5)}..${hash.substr(hash.length - 3)}`;
}
renderHash (hash) {
return (
<code title={ hash }>
{ this.shortHash(hash) }
</code>
);
}
renderFrom (transaction) {
if (!transaction) {
return '-';
}
return (
<div title={ transaction.from } className={ styles.from }>
<IdentityIcon
address={ transaction.from }
/>
0x{ transaction.nonce.toString(16) }
</div>
);
}
renderGasPrice (transaction) {
if (!transaction) {
return '-';
}
return (
<span title={ `${transaction.gasPrice.toFormat(0)} wei` }>
{ api.util.fromWei(transaction.gasPrice, 'shannon').toFormat(2) }&nbsp;shannon
</span>
);
}
renderGas (transaction) {
if (!transaction) {
return '-';
}
return (
<span title={ `${transaction.gas.toFormat(0)} Gas` }>
{ transaction.gas.div(10 ** 6).toFormat(3) }&nbsp;MGas
</span>
);
}
renderPropagation (stats) {
const noOfPeers = Object.keys(stats.propagatedTo).length;
const noOfPropagations = Object.values(stats.propagatedTo).reduce((sum, val) => sum + val, 0);
return (
<span className={ styles.nowrap }>
{ noOfPropagations } ({ noOfPeers } peers)
</span>
);
}
}
export class Transaction extends BaseTransaction {
static propTypes = {
idx: PropTypes.number.isRequired,
transaction: PropTypes.object.isRequired,
blockNumber: PropTypes.object.isRequired,
isLocal: PropTypes.bool,
stats: PropTypes.object
};
static defaultProps = {
isLocal: false,
stats: {
firstSeen: 0,
propagatedTo: {}
}
};
static renderHeader () {
return (
<tr className={ styles.header }>
<th></th>
<th>
Transaction
</th>
<th>
From
</th>
<th>
Gas Price
</th>
<th>
Gas
</th>
<th>
First propagation
</th>
<th>
# Propagated
</th>
<th>
</th>
</tr>
);
}
render () {
const { isLocal, stats, transaction, idx } = this.props;
const blockNo = new BigNumber(stats.firstSeen);
const clazz = classnames(styles.transaction, {
[styles.local]: isLocal
});
return (
<tr className={ clazz }>
<td>
{ idx }.
</td>
<td>
{ this.renderHash(transaction.hash) }
</td>
<td>
{ this.renderFrom(transaction) }
</td>
<td>
{ this.renderGasPrice(transaction) }
</td>
<td>
{ this.renderGas(transaction) }
</td>
<td title={ blockNo.toFormat(0) }>
{ this.renderTime(stats.firstSeen) }
</td>
<td>
{ this.renderPropagation(stats) }
</td>
</tr>
);
}
renderTime (firstSeen) {
const { blockNumber } = this.props;
if (!firstSeen) {
return 'never';
}
const timeInMinutes = blockNumber.sub(firstSeen).mul(14).div(60).toFormat(1);
return `${timeInMinutes} minutes ago`;
}
}
export class LocalTransaction extends BaseTransaction {
static propTypes = {
hash: PropTypes.string.isRequired,
status: PropTypes.string.isRequired,
transaction: PropTypes.object,
isLocal: PropTypes.bool,
stats: PropTypes.object,
details: PropTypes.object
};
static defaultProps = {
stats: {
propagatedTo: {}
}
};
static renderHeader () {
return (
<tr className={ styles.header }>
<th></th>
<th>
Transaction
</th>
<th>
From
</th>
<th>
Gas Price / Gas
</th>
<th>
Status
</th>
</tr>
);
}
state = {
isSending: false,
isResubmitting: false,
gasPrice: null,
gas: null
};
toggleResubmit = () => {
const { transaction } = this.props;
const { isResubmitting, gasPrice } = this.state;
this.setState({
isResubmitting: !isResubmitting
});
if (gasPrice === null) {
this.setState({
gasPrice: `0x${transaction.gasPrice.toString(16)}`,
gas: `0x${transaction.gas.toString(16)}`
});
}
};
setGasPrice = el => {
this.setState({
gasPrice: el.target.value
});
};
setGas = el => {
this.setState({
gas: el.target.value
});
};
sendTransaction = () => {
const { transaction } = this.props;
const { gasPrice, gas } = this.state;
const newTransaction = {
from: transaction.from,
to: transaction.to,
nonce: transaction.nonce,
value: transaction.value,
data: transaction.data,
gasPrice, gas
};
this.setState({
isResubmitting: false,
isSending: true
});
const closeSending = () => this.setState({
isSending: false,
gasPrice: null,
gas: null
});
api.eth.sendTransaction(newTransaction)
.then(closeSending)
.catch(closeSending);
};
render () {
if (this.state.isResubmitting) {
return this.renderResubmit();
}
const { stats, transaction, hash, status } = this.props;
const { isSending } = this.state;
const resubmit = isSending ? (
'sending...'
) : (
<a href='javascript:void' onClick={ this.toggleResubmit }>
resubmit
</a>
);
return (
<tr className={ styles.transaction }>
<td>
{ !transaction ? null : resubmit }
</td>
<td>
{ this.renderHash(hash) }
</td>
<td>
{ this.renderFrom(transaction) }
</td>
<td>
{ this.renderGasPrice(transaction) }
<br />
{ this.renderGas(transaction) }
</td>
<td>
{ this.renderStatus() }
<br />
{ status === 'pending' ? this.renderPropagation(stats) : null }
</td>
</tr>
);
}
renderStatus () {
const { details } = this.props;
let state = {
'pending': () => 'In queue: Pending',
'future': () => 'In queue: Future',
'mined': () => 'Mined',
'dropped': () => 'Dropped because of queue limit',
'invalid': () => 'Transaction is invalid',
'rejected': () => `Rejected: ${details.error}`,
'replaced': () => `Replaced by ${this.shortHash(details.hash)}`
}[this.props.status];
return state ? state() : 'unknown';
}
// TODO [ToDr] Gas Price / Gas selection is not needed
// when signer supports gasPrice/gas tunning.
renderResubmit () {
const { transaction } = this.props;
const { gasPrice, gas } = this.state;
return (
<tr className={ styles.transaction }>
<td>
<a href='javascript:void' onClick={ this.toggleResubmit }>
cancel
</a>
</td>
<td>
{ this.renderHash(transaction.hash) }
</td>
<td>
{ this.renderFrom(transaction) }
</td>
<td className={ styles.edit }>
<input
type='text'
value={ gasPrice }
onChange={ this.setGasPrice }
/>
<input
type='text'
value={ gas }
onChange={ this.setGas }
/>
</td>
<td colSpan='2'>
<a href='javascript:void' onClick={ this.sendTransaction }>
Send
</a>
</td>
</tr>
);
}
}

View File

@ -0,0 +1,67 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
import React from 'react';
import { shallow } from 'enzyme';
import '../../../environment/tests';
import EthApi from '../../../api';
// Mock API for tests
import * as Api from '../parity';
Api.api = {
util: EthApi.prototype.util
};
import BigNumber from 'bignumber.js';
import { Transaction, LocalTransaction } from './transaction';
describe('localtx/Transaction', () => {
describe('rendering', () => {
it('renders without crashing', () => {
const transaction = {
hash: '0x1234567890',
nonce: 15,
gasPrice: new BigNumber(10),
gas: new BigNumber(10)
};
const rendered = shallow(
<Transaction
isLocal={ false }
transaction={ transaction }
blockNumber={ new BigNumber(0) }
/>
);
expect(rendered).to.be.defined;
});
});
});
describe('localtx/LocalTransaction', () => {
describe('rendering', () => {
it('renders without crashing', () => {
const rendered = shallow(
<LocalTransaction
hash={ '0x1234567890' }
status={ 'pending' }
/>
);
expect(rendered).to.be.defined;
});
});
});

View File

@ -0,0 +1,21 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
const api = window.parent.secureApi;
export {
api
};

View File

@ -224,15 +224,6 @@ export default {
} }
}, },
listGethAccounts: {
desc: 'Returns a list of the accounts available from Geth',
params: [],
returns: {
type: Array,
desc: '20 Bytes addresses owned by the client.'
}
},
importGethAccounts: { importGethAccounts: {
desc: 'Imports a list of accounts from geth', desc: 'Imports a list of accounts from geth',
params: [ params: [
@ -247,6 +238,24 @@ export default {
} }
}, },
listGethAccounts: {
desc: 'Returns a list of the accounts available from Geth',
params: [],
returns: {
type: Array,
desc: '20 Bytes addresses owned by the client.'
}
},
localTransactions: {
desc: 'Returns an object of current and past local transactions.',
params: [],
returns: {
type: Object,
desc: 'Mapping of `tx hash` into status object.'
}
},
minGasPrice: { minGasPrice: {
desc: 'Returns currently set minimal gas price', desc: 'Returns currently set minimal gas price',
params: [], params: [],
@ -379,6 +388,24 @@ export default {
} }
}, },
pendingTransactions: {
desc: 'Returns a list of transactions currently in the queue.',
params: [],
returns: {
type: Array,
desc: 'Transactions ordered by priority'
}
},
pendingTransactionsStats: {
desc: 'Returns propagation stats for transactions in the queue',
params: [],
returns: {
type: Object,
desc: 'mapping of `tx hash` into `stats`'
}
},
phraseToAddress: { phraseToAddress: {
desc: 'Converts a secret phrase into the corresponting address', desc: 'Converts a secret phrase into the corresponting address',
params: [ params: [

View File

@ -44,5 +44,14 @@
"version": "1.0.0", "version": "1.0.0",
"visible": false, "visible": false,
"secure": true "secure": true
},
{
"id": "0xae74ad174b95cdbd01c88ac5b73a296d33e9088fc2a200e76bcedf3a94a7815d",
"url": "localtx",
"name": "TxQueue Viewer",
"description": "Have a peak on internals of transaction queue of your node.",
"author": "Parity Team <admin@ethcore.io>",
"version": "1.0.0",
"secure": true
} }
] ]

View File

@ -40,6 +40,7 @@ module.exports = {
'githubhint': ['./dapps/githubhint.js'], 'githubhint': ['./dapps/githubhint.js'],
'registry': ['./dapps/registry.js'], 'registry': ['./dapps/registry.js'],
'signaturereg': ['./dapps/signaturereg.js'], 'signaturereg': ['./dapps/signaturereg.js'],
'localtx': ['./dapps/localtx.js'],
'tokenreg': ['./dapps/tokenreg.js'], 'tokenreg': ['./dapps/tokenreg.js'],
// app // app
'index': ['./index.js'] 'index': ['./index.js']

View File

@ -22,7 +22,7 @@ macro_rules! rpc_unimplemented {
use std::fmt; use std::fmt;
use rlp::DecoderError; use rlp::DecoderError;
use ethcore::error::{Error as EthcoreError, CallError}; use ethcore::error::{Error as EthcoreError, CallError, TransactionError};
use ethcore::account_provider::{Error as AccountError}; use ethcore::account_provider::{Error as AccountError};
use fetch::FetchError; use fetch::FetchError;
use jsonrpc_core::{Error, ErrorCode, Value}; use jsonrpc_core::{Error, ErrorCode, Value};
@ -227,11 +227,10 @@ pub fn from_password_error(error: AccountError) -> Error {
} }
} }
pub fn from_transaction_error(error: EthcoreError) -> Error { pub fn transaction_message(error: TransactionError) -> String {
use ethcore::error::TransactionError::*; use ethcore::error::TransactionError::*;
if let EthcoreError::Transaction(e) = error { match error {
let msg = match e {
AlreadyImported => "Transaction with the same hash was already imported.".into(), AlreadyImported => "Transaction with the same hash was already imported.".into(),
Old => "Transaction nonce is too low. Try incrementing the nonce.".into(), Old => "Transaction nonce is too low. Try incrementing the nonce.".into(),
TooCheapToReplace => { TooCheapToReplace => {
@ -252,15 +251,20 @@ pub fn from_transaction_error(error: EthcoreError) -> Error {
GasLimitExceeded { limit, got } => { GasLimitExceeded { limit, got } => {
format!("Transaction cost exceeds current gas limit. Limit: {}, got: {}. Try decreasing supplied gas.", limit, got) format!("Transaction cost exceeds current gas limit. Limit: {}, got: {}. Try decreasing supplied gas.", limit, got)
}, },
InvalidNetworkId => "Invalid network id.".into(),
InvalidGasLimit(_) => "Supplied gas is beyond limit.".into(), InvalidGasLimit(_) => "Supplied gas is beyond limit.".into(),
SenderBanned => "Sender is banned in local queue.".into(), SenderBanned => "Sender is banned in local queue.".into(),
RecipientBanned => "Recipient is banned in local queue.".into(), RecipientBanned => "Recipient is banned in local queue.".into(),
CodeBanned => "Code is banned in local queue.".into(), CodeBanned => "Code is banned in local queue.".into(),
e => format!("{}", e).into(), }
}; }
pub fn from_transaction_error(error: EthcoreError) -> Error {
if let EthcoreError::Transaction(e) = error {
Error { Error {
code: ErrorCode::ServerError(codes::TRANSACTION_ERROR), code: ErrorCode::ServerError(codes::TRANSACTION_ERROR),
message: msg, message: transaction_message(e),
data: None, data: None,
} }
} else { } else {

View File

@ -34,7 +34,11 @@ use ethcore::account_provider::AccountProvider;
use jsonrpc_core::Error; use jsonrpc_core::Error;
use v1::traits::Parity; use v1::traits::Parity;
use v1::types::{Bytes, U256, H160, H256, H512, Peers, Transaction, RpcSettings, Histogram}; use v1::types::{
Bytes, U256, H160, H256, H512,
Peers, Transaction, RpcSettings, Histogram,
TransactionStats, LocalTransactionStatus,
};
use v1::helpers::{errors, SigningQueue, SignerService, NetworkSettings}; use v1::helpers::{errors, SigningQueue, SignerService, NetworkSettings};
use v1::helpers::dispatch::DEFAULT_MAC; use v1::helpers::dispatch::DEFAULT_MAC;
@ -259,6 +263,27 @@ impl<C, M, S: ?Sized> Parity for ParityClient<C, M, S> where
Ok(take_weak!(self.miner).all_transactions().into_iter().map(Into::into).collect::<Vec<_>>()) Ok(take_weak!(self.miner).all_transactions().into_iter().map(Into::into).collect::<Vec<_>>())
} }
fn pending_transactions_stats(&self) -> Result<BTreeMap<H256, TransactionStats>, Error> {
try!(self.active());
let stats = take_weak!(self.sync).transactions_stats();
Ok(stats.into_iter()
.map(|(hash, stats)| (hash.into(), stats.into()))
.collect()
)
}
fn local_transactions(&self) -> Result<BTreeMap<H256, LocalTransactionStatus>, Error> {
try!(self.active());
let transactions = take_weak!(self.miner).local_transactions();
Ok(transactions
.into_iter()
.map(|(hash, status)| (hash.into(), status.into()))
.collect()
)
}
fn signer_port(&self) -> Result<u16, Error> { fn signer_port(&self) -> Result<u16, Error> {
try!(self.active()); try!(self.active());

View File

@ -24,7 +24,7 @@ use ethcore::block::{ClosedBlock, IsBlock};
use ethcore::header::BlockNumber; use ethcore::header::BlockNumber;
use ethcore::transaction::SignedTransaction; use ethcore::transaction::SignedTransaction;
use ethcore::receipt::{Receipt, RichReceipt}; use ethcore::receipt::{Receipt, RichReceipt};
use ethcore::miner::{MinerService, MinerStatus, TransactionImportResult}; use ethcore::miner::{MinerService, MinerStatus, TransactionImportResult, LocalTransactionStatus};
/// Test miner service. /// Test miner service.
pub struct TestMinerService { pub struct TestMinerService {
@ -34,6 +34,8 @@ pub struct TestMinerService {
pub latest_closed_block: Mutex<Option<ClosedBlock>>, pub latest_closed_block: Mutex<Option<ClosedBlock>>,
/// Pre-existed pending transactions /// Pre-existed pending transactions
pub pending_transactions: Mutex<HashMap<H256, SignedTransaction>>, pub pending_transactions: Mutex<HashMap<H256, SignedTransaction>>,
/// Pre-existed local transactions
pub local_transactions: Mutex<BTreeMap<H256, LocalTransactionStatus>>,
/// Pre-existed pending receipts /// Pre-existed pending receipts
pub pending_receipts: Mutex<BTreeMap<H256, Receipt>>, pub pending_receipts: Mutex<BTreeMap<H256, Receipt>>,
/// Last nonces. /// Last nonces.
@ -53,6 +55,7 @@ impl Default for TestMinerService {
imported_transactions: Mutex::new(Vec::new()), imported_transactions: Mutex::new(Vec::new()),
latest_closed_block: Mutex::new(None), latest_closed_block: Mutex::new(None),
pending_transactions: Mutex::new(HashMap::new()), pending_transactions: Mutex::new(HashMap::new()),
local_transactions: Mutex::new(BTreeMap::new()),
pending_receipts: Mutex::new(BTreeMap::new()), pending_receipts: Mutex::new(BTreeMap::new()),
last_nonces: RwLock::new(HashMap::new()), last_nonces: RwLock::new(HashMap::new()),
min_gas_price: RwLock::new(U256::from(20_000_000)), min_gas_price: RwLock::new(U256::from(20_000_000)),
@ -195,6 +198,10 @@ impl MinerService for TestMinerService {
self.pending_transactions.lock().values().cloned().collect() self.pending_transactions.lock().values().cloned().collect()
} }
fn local_transactions(&self) -> BTreeMap<H256, LocalTransactionStatus> {
self.local_transactions.lock().iter().map(|(hash, stats)| (*hash, stats.clone())).collect()
}
fn pending_transactions(&self, _best_block: BlockNumber) -> Vec<SignedTransaction> { fn pending_transactions(&self, _best_block: BlockNumber) -> Vec<SignedTransaction> {
self.pending_transactions.lock().values().cloned().collect() self.pending_transactions.lock().values().cloned().collect()
} }

View File

@ -16,8 +16,9 @@
//! Test implementation of SyncProvider. //! Test implementation of SyncProvider.
use util::{RwLock}; use std::collections::BTreeMap;
use ethsync::{SyncProvider, SyncStatus, SyncState, PeerInfo}; use util::{H256, RwLock};
use ethsync::{SyncProvider, SyncStatus, SyncState, PeerInfo, TransactionStats};
/// TestSyncProvider config. /// TestSyncProvider config.
pub struct Config { pub struct Config {
@ -97,5 +98,22 @@ impl SyncProvider for TestSyncProvider {
fn enode(&self) -> Option<String> { fn enode(&self) -> Option<String> {
None None
} }
fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats> {
map![
1.into() => TransactionStats {
first_seen: 10,
propagated_to: map![
128.into() => 16
]
},
5.into() => TransactionStats {
first_seen: 16,
propagated_to: map![
16.into() => 1
]
}
]
}
} }

View File

@ -18,8 +18,9 @@ use std::sync::Arc;
use util::log::RotatingLogger; use util::log::RotatingLogger;
use util::Address; use util::Address;
use ethsync::ManageNetwork; use ethsync::ManageNetwork;
use ethcore::client::{TestBlockChainClient};
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use ethcore::client::{TestBlockChainClient};
use ethcore::miner::LocalTransactionStatus;
use ethstore::ethkey::{Generator, Random}; use ethstore::ethkey::{Generator, Random};
use jsonrpc_core::IoHandler; use jsonrpc_core::IoHandler;
@ -355,3 +356,28 @@ fn rpc_parity_next_nonce() {
assert_eq!(io1.handle_request_sync(&request), Some(response1.to_owned())); assert_eq!(io1.handle_request_sync(&request), Some(response1.to_owned()));
assert_eq!(io2.handle_request_sync(&request), Some(response2.to_owned())); assert_eq!(io2.handle_request_sync(&request), Some(response2.to_owned()));
} }
#[test]
fn rpc_parity_transactions_stats() {
let deps = Dependencies::new();
let io = deps.default_client();
let request = r#"{"jsonrpc": "2.0", "method": "parity_pendingTransactionsStats", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":{"0x0000000000000000000000000000000000000000000000000000000000000001":{"firstSeen":10,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080":16}},"0x0000000000000000000000000000000000000000000000000000000000000005":{"firstSeen":16,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010":1}}},"id":1}"#;
assert_eq!(io.handle_request_sync(request), Some(response.to_owned()));
}
#[test]
fn rpc_parity_local_transactions() {
let deps = Dependencies::new();
let io = deps.default_client();
deps.miner.local_transactions.lock().insert(10.into(), LocalTransactionStatus::Pending);
deps.miner.local_transactions.lock().insert(15.into(), LocalTransactionStatus::Future);
let request = r#"{"jsonrpc": "2.0", "method": "parity_localTransactions", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":{"0x000000000000000000000000000000000000000000000000000000000000000a":{"status":"pending"},"0x000000000000000000000000000000000000000000000000000000000000000f":{"status":"future"}},"id":1}"#;
assert_eq!(io.handle_request_sync(request), Some(response.to_owned()));
}

View File

@ -19,7 +19,11 @@ use jsonrpc_core::Error;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use v1::helpers::auto_args::Wrap; use v1::helpers::auto_args::Wrap;
use v1::types::{H160, H256, H512, U256, Bytes, Peers, Transaction, RpcSettings, Histogram}; use v1::types::{
H160, H256, H512, U256, Bytes,
Peers, Transaction, RpcSettings, Histogram,
TransactionStats, LocalTransactionStatus,
};
build_rpc_trait! { build_rpc_trait! {
/// Parity-specific rpc interface. /// Parity-specific rpc interface.
@ -115,6 +119,14 @@ build_rpc_trait! {
#[rpc(name = "parity_pendingTransactions")] #[rpc(name = "parity_pendingTransactions")]
fn pending_transactions(&self) -> Result<Vec<Transaction>, Error>; fn pending_transactions(&self) -> Result<Vec<Transaction>, Error>;
/// Returns propagation statistics on transactions pending in the queue.
#[rpc(name = "parity_pendingTransactionsStats")]
fn pending_transactions_stats(&self) -> Result<BTreeMap<H256, TransactionStats>, Error>;
/// Returns a list of current and past local transactions with status details.
#[rpc(name = "parity_localTransactions")]
fn local_transactions(&self) -> Result<BTreeMap<H256, LocalTransactionStatus>, Error>;
/// Returns current Trusted Signer port or an error if signer is disabled. /// Returns current Trusted Signer port or an error if signer is disabled.
#[rpc(name = "parity_signerPort")] #[rpc(name = "parity_signerPort")]
fn signer_port(&self) -> Result<u16, Error>; fn signer_port(&self) -> Result<u16, Error>;

View File

@ -43,8 +43,8 @@ pub use self::filter::{Filter, FilterChanges};
pub use self::hash::{H64, H160, H256, H512, H520, H2048}; pub use self::hash::{H64, H160, H256, H512, H520, H2048};
pub use self::index::Index; pub use self::index::Index;
pub use self::log::Log; pub use self::log::Log;
pub use self::sync::{SyncStatus, SyncInfo, Peers, PeerInfo, PeerNetworkInfo, PeerProtocolsInfo, PeerEthereumProtocolInfo}; pub use self::sync::{SyncStatus, SyncInfo, Peers, PeerInfo, PeerNetworkInfo, PeerProtocolsInfo, PeerEthereumProtocolInfo, TransactionStats};
pub use self::transaction::{Transaction, RichRawTransaction}; pub use self::transaction::{Transaction, RichRawTransaction, LocalTransactionStatus};
pub use self::transaction_request::TransactionRequest; pub use self::transaction_request::TransactionRequest;
pub use self::receipt::Receipt; pub use self::receipt::Receipt;
pub use self::rpc_settings::RpcSettings; pub use self::rpc_settings::RpcSettings;

View File

@ -14,9 +14,10 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use ethsync::PeerInfo as SyncPeerInfo; use std::collections::BTreeMap;
use ethsync::{PeerInfo as SyncPeerInfo, TransactionStats as SyncTransactionStats};
use serde::{Serialize, Serializer}; use serde::{Serialize, Serializer};
use v1::types::U256; use v1::types::{U256, H512};
/// Sync info /// Sync info
#[derive(Default, Debug, Serialize, PartialEq)] #[derive(Default, Debug, Serialize, PartialEq)]
@ -117,8 +118,19 @@ impl Serialize for SyncStatus {
} }
} }
/// Propagation statistics for pending transaction.
#[derive(Default, Debug, Serialize)]
pub struct TransactionStats {
/// Block no this transaction was first seen.
#[serde(rename="firstSeen")]
pub first_seen: u64,
/// Peers this transaction was propagated to with count.
#[serde(rename="propagatedTo")]
pub propagated_to: BTreeMap<H512, usize>,
}
impl From<SyncPeerInfo> for PeerInfo { impl From<SyncPeerInfo> for PeerInfo {
fn from(p: SyncPeerInfo) -> PeerInfo { fn from(p: SyncPeerInfo) -> Self {
PeerInfo { PeerInfo {
id: p.id, id: p.id,
name: p.client_version, name: p.client_version,
@ -138,10 +150,23 @@ impl From<SyncPeerInfo> for PeerInfo {
} }
} }
impl From<SyncTransactionStats> for TransactionStats {
fn from(s: SyncTransactionStats) -> Self {
TransactionStats {
first_seen: s.first_seen,
propagated_to: s.propagated_to
.into_iter()
.map(|(id, count)| (id.into(), count))
.collect()
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use serde_json; use serde_json;
use super::{SyncInfo, SyncStatus, Peers}; use std::collections::BTreeMap;
use super::{SyncInfo, SyncStatus, Peers, TransactionStats};
#[test] #[test]
fn test_serialize_sync_info() { fn test_serialize_sync_info() {
@ -176,4 +201,17 @@ mod tests {
let serialized = serde_json::to_string(&t).unwrap(); let serialized = serde_json::to_string(&t).unwrap();
assert_eq!(serialized, r#"{"startingBlock":"0x0","currentBlock":"0x0","highestBlock":"0x0","warpChunksAmount":null,"warpChunksProcessed":null,"blockGap":["0x1","0x5"]}"#) assert_eq!(serialized, r#"{"startingBlock":"0x0","currentBlock":"0x0","highestBlock":"0x0","warpChunksAmount":null,"warpChunksProcessed":null,"blockGap":["0x1","0x5"]}"#)
} }
#[test]
fn test_serialize_transaction_stats() {
let stats = TransactionStats {
first_seen: 100,
propagated_to: map![
10.into() => 50
]
};
let serialized = serde_json::to_string(&stats).unwrap();
assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50}}"#)
}
} }

View File

@ -14,8 +14,11 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use serde::{Serialize, Serializer};
use ethcore::miner;
use ethcore::contract_address; use ethcore::contract_address;
use ethcore::transaction::{LocalizedTransaction, Action, SignedTransaction}; use ethcore::transaction::{LocalizedTransaction, Action, SignedTransaction};
use v1::helpers::errors;
use v1::types::{Bytes, H160, H256, U256, H512}; use v1::types::{Bytes, H160, H256, U256, H512};
/// Transaction /// Transaction
@ -62,6 +65,73 @@ pub struct Transaction {
pub s: H256, pub s: H256,
} }
/// Local Transaction Status
#[derive(Debug)]
pub enum LocalTransactionStatus {
/// Transaction is pending
Pending,
/// Transaction is in future part of the queue
Future,
/// Transaction is already mined.
Mined(Transaction),
/// Transaction was dropped because of limit.
Dropped(Transaction),
/// Transaction was replaced by transaction with higher gas price.
Replaced(Transaction, U256, H256),
/// Transaction never got into the queue.
Rejected(Transaction, String),
/// Transaction is invalid.
Invalid(Transaction),
}
impl Serialize for LocalTransactionStatus {
fn serialize<S>(&self, serializer: &mut S) -> Result<(), S::Error>
where S: Serializer
{
use self::LocalTransactionStatus::*;
let elems = match *self {
Pending | Future => 1,
Mined(..) | Dropped(..) | Invalid(..) => 2,
Rejected(..) => 3,
Replaced(..) => 4,
};
let status = "status";
let transaction = "transaction";
let mut state = try!(serializer.serialize_struct("LocalTransactionStatus", elems));
match *self {
Pending => try!(serializer.serialize_struct_elt(&mut state, status, "pending")),
Future => try!(serializer.serialize_struct_elt(&mut state, status, "future")),
Mined(ref tx) => {
try!(serializer.serialize_struct_elt(&mut state, status, "mined"));
try!(serializer.serialize_struct_elt(&mut state, transaction, tx));
},
Dropped(ref tx) => {
try!(serializer.serialize_struct_elt(&mut state, status, "dropped"));
try!(serializer.serialize_struct_elt(&mut state, transaction, tx));
},
Invalid(ref tx) => {
try!(serializer.serialize_struct_elt(&mut state, status, "invalid"));
try!(serializer.serialize_struct_elt(&mut state, transaction, tx));
},
Rejected(ref tx, ref reason) => {
try!(serializer.serialize_struct_elt(&mut state, status, "rejected"));
try!(serializer.serialize_struct_elt(&mut state, transaction, tx));
try!(serializer.serialize_struct_elt(&mut state, "error", reason));
},
Replaced(ref tx, ref gas_price, ref hash) => {
try!(serializer.serialize_struct_elt(&mut state, status, "replaced"));
try!(serializer.serialize_struct_elt(&mut state, transaction, tx));
try!(serializer.serialize_struct_elt(&mut state, "hash", hash));
try!(serializer.serialize_struct_elt(&mut state, "gasPrice", gas_price));
},
}
serializer.serialize_struct_end(state)
}
}
/// Geth-compatible output for eth_signTransaction method /// Geth-compatible output for eth_signTransaction method
#[derive(Debug, Default, Clone, PartialEq, Serialize)] #[derive(Debug, Default, Clone, PartialEq, Serialize)]
pub struct RichRawTransaction { pub struct RichRawTransaction {
@ -144,9 +214,24 @@ impl From<SignedTransaction> for Transaction {
} }
} }
impl From<miner::LocalTransactionStatus> for LocalTransactionStatus {
fn from(s: miner::LocalTransactionStatus) -> Self {
use ethcore::miner::LocalTransactionStatus::*;
match s {
Pending => LocalTransactionStatus::Pending,
Future => LocalTransactionStatus::Future,
Mined(tx) => LocalTransactionStatus::Mined(tx.into()),
Dropped(tx) => LocalTransactionStatus::Dropped(tx.into()),
Rejected(tx, err) => LocalTransactionStatus::Rejected(tx.into(), errors::transaction_message(err)),
Replaced(tx, gas_price, hash) => LocalTransactionStatus::Replaced(tx.into(), gas_price.into(), hash.into()),
Invalid(tx) => LocalTransactionStatus::Invalid(tx.into()),
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::Transaction; use super::{Transaction, LocalTransactionStatus};
use serde_json; use serde_json;
#[test] #[test]
@ -155,5 +240,50 @@ mod tests {
let serialized = serde_json::to_string(&t).unwrap(); let serialized = serde_json::to_string(&t).unwrap();
assert_eq!(serialized, r#"{"hash":"0x0000000000000000000000000000000000000000000000000000000000000000","nonce":"0x0","blockHash":null,"blockNumber":null,"transactionIndex":null,"from":"0x0000000000000000000000000000000000000000","to":null,"value":"0x0","gasPrice":"0x0","gas":"0x0","input":"0x","creates":null,"raw":"0x","publicKey":null,"v":0,"r":"0x0000000000000000000000000000000000000000000000000000000000000000","s":"0x0000000000000000000000000000000000000000000000000000000000000000"}"#); assert_eq!(serialized, r#"{"hash":"0x0000000000000000000000000000000000000000000000000000000000000000","nonce":"0x0","blockHash":null,"blockNumber":null,"transactionIndex":null,"from":"0x0000000000000000000000000000000000000000","to":null,"value":"0x0","gasPrice":"0x0","gas":"0x0","input":"0x","creates":null,"raw":"0x","publicKey":null,"v":0,"r":"0x0000000000000000000000000000000000000000000000000000000000000000","s":"0x0000000000000000000000000000000000000000000000000000000000000000"}"#);
} }
#[test]
fn test_local_transaction_status_serialize() {
let tx_ser = serde_json::to_string(&Transaction::default()).unwrap();
let status1 = LocalTransactionStatus::Pending;
let status2 = LocalTransactionStatus::Future;
let status3 = LocalTransactionStatus::Mined(Transaction::default());
let status4 = LocalTransactionStatus::Dropped(Transaction::default());
let status5 = LocalTransactionStatus::Invalid(Transaction::default());
let status6 = LocalTransactionStatus::Rejected(Transaction::default(), "Just because".into());
let status7 = LocalTransactionStatus::Replaced(Transaction::default(), 5.into(), 10.into());
assert_eq!(
serde_json::to_string(&status1).unwrap(),
r#"{"status":"pending"}"#
);
assert_eq!(
serde_json::to_string(&status2).unwrap(),
r#"{"status":"future"}"#
);
assert_eq!(
serde_json::to_string(&status3).unwrap(),
r#"{"status":"mined","transaction":"#.to_owned() + &format!("{}", tx_ser) + r#"}"#
);
assert_eq!(
serde_json::to_string(&status4).unwrap(),
r#"{"status":"dropped","transaction":"#.to_owned() + &format!("{}", tx_ser) + r#"}"#
);
assert_eq!(
serde_json::to_string(&status5).unwrap(),
r#"{"status":"invalid","transaction":"#.to_owned() + &format!("{}", tx_ser) + r#"}"#
);
assert_eq!(
serde_json::to_string(&status6).unwrap(),
r#"{"status":"rejected","transaction":"#.to_owned() +
&format!("{}", tx_ser) +
r#","error":"Just because"}"#
);
assert_eq!(
serde_json::to_string(&status7).unwrap(),
r#"{"status":"replaced","transaction":"#.to_owned() +
&format!("{}", tx_ser) +
r#","hash":"0x000000000000000000000000000000000000000000000000000000000000000a","gasPrice":"0x5"}"#
);
}
} }

View File

@ -15,13 +15,13 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc; use std::sync::Arc;
use std::collections::HashMap; use std::collections::{HashMap, BTreeMap};
use std::io; use std::io;
use util::Bytes; use util::Bytes;
use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, ProtocolId, use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, ProtocolId,
NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, NetworkError, NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, NetworkError,
AllowIP as NetworkAllowIP}; AllowIP as NetworkAllowIP};
use util::{U256, H256}; use util::{U256, H256, H512};
use io::{TimerToken}; use io::{TimerToken};
use ethcore::client::{BlockChainClient, ChainNotify}; use ethcore::client::{BlockChainClient, ChainNotify};
use ethcore::snapshot::SnapshotService; use ethcore::snapshot::SnapshotService;
@ -76,6 +76,16 @@ pub trait SyncProvider: Send + Sync {
/// Get the enode if available. /// Get the enode if available.
fn enode(&self) -> Option<String>; fn enode(&self) -> Option<String>;
/// Returns propagation count for pending transactions.
fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats>;
}
/// Transaction stats
#[derive(Debug, Binary)]
pub struct TransactionStats {
pub first_seen: u64,
pub propagated_to: BTreeMap<H512, usize>,
} }
/// Peer connection information /// Peer connection information
@ -150,6 +160,14 @@ impl SyncProvider for EthSync {
fn enode(&self) -> Option<String> { fn enode(&self) -> Option<String> {
self.network.external_url() self.network.external_url()
} }
fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats> {
let sync = self.handler.sync.read();
sync.transactions_stats()
.iter()
.map(|(hash, stats)| (*hash, stats.into()))
.collect()
}
} }
struct SyncProtocolHandler { struct SyncProtocolHandler {

View File

@ -104,6 +104,7 @@ use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as Do
use snapshot::{Snapshot, ChunkType}; use snapshot::{Snapshot, ChunkType};
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID}; use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID};
use transactions_stats::{TransactionsStats, Stats as TransactionStats};
known_heap_size!(0, PeerInfo); known_heap_size!(0, PeerInfo);
@ -259,7 +260,7 @@ enum ForkConfirmation {
Unconfirmed, Unconfirmed,
/// Peers chain is too short to confirm the fork. /// Peers chain is too short to confirm the fork.
TooShort, TooShort,
/// Fork is confurmed. /// Fork is confirmed.
Confirmed, Confirmed,
} }
@ -349,6 +350,8 @@ pub struct ChainSync {
handshaking_peers: HashMap<PeerId, u64>, handshaking_peers: HashMap<PeerId, u64>,
/// Sync start timestamp. Measured when first peer is connected /// Sync start timestamp. Measured when first peer is connected
sync_start_time: Option<u64>, sync_start_time: Option<u64>,
/// Transactions propagation statistics
transactions_stats: TransactionsStats,
} }
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>; type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
@ -371,6 +374,7 @@ impl ChainSync {
fork_block: config.fork_block, fork_block: config.fork_block,
snapshot: Snapshot::new(), snapshot: Snapshot::new(),
sync_start_time: None, sync_start_time: None,
transactions_stats: TransactionsStats::default(),
}; };
sync.update_targets(chain); sync.update_targets(chain);
sync sync
@ -419,6 +423,11 @@ impl ChainSync {
.collect() .collect()
} }
/// Returns transactions propagation statistics
pub fn transactions_stats(&self) -> &H256FastMap<TransactionStats> {
self.transactions_stats.stats()
}
/// Abort all sync activity /// Abort all sync activity
pub fn abort(&mut self, io: &mut SyncIo) { pub fn abort(&mut self, io: &mut SyncIo) {
self.reset_and_continue(io); self.reset_and_continue(io);
@ -1867,7 +1876,7 @@ impl ChainSync {
/// propagates new transactions to all peers /// propagates new transactions to all peers
pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize { pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize {
// Early out of nobody to send to. // Early out if nobody to send to.
if self.peers.is_empty() { if self.peers.is_empty() {
return 0; return 0;
} }
@ -1884,16 +1893,27 @@ impl ChainSync {
packet.out() packet.out()
}; };
// Clear old transactions from stats
self.transactions_stats.retain(&all_transactions_hashes);
// sqrt(x)/x scaled to max u32 // sqrt(x)/x scaled to max u32
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
let small = self.peers.len() < MIN_PEERS_PROPAGATION; let small = self.peers.len() < MIN_PEERS_PROPAGATION;
let block_number = io.chain().chain_info().best_block_number;
let lucky_peers = self.peers.iter_mut() let lucky_peers = {
let stats = &mut self.transactions_stats;
self.peers.iter_mut()
.filter(|_| small || ::rand::random::<u32>() < fraction) .filter(|_| small || ::rand::random::<u32>() < fraction)
.take(MAX_PEERS_PROPAGATION) .take(MAX_PEERS_PROPAGATION)
.filter_map(|(peer_id, mut peer_info)| { .filter_map(|(peer_id, mut peer_info)| {
// Send all transactions // Send all transactions
if peer_info.last_sent_transactions.is_empty() { if peer_info.last_sent_transactions.is_empty() {
// update stats
for hash in &all_transactions_hashes {
let id = io.peer_session_info(*peer_id).and_then(|info| info.id);
stats.propagated(*hash, id, block_number);
}
peer_info.last_sent_transactions = all_transactions_hashes.clone(); peer_info.last_sent_transactions = all_transactions_hashes.clone();
return Some((*peer_id, all_transactions_rlp.clone())); return Some((*peer_id, all_transactions_rlp.clone()));
} }
@ -1909,13 +1929,17 @@ impl ChainSync {
for tx in &transactions { for tx in &transactions {
if to_send.contains(&tx.hash()) { if to_send.contains(&tx.hash()) {
packet.append(tx); packet.append(tx);
// update stats
let id = io.peer_session_info(*peer_id).and_then(|info| info.id);
stats.propagated(tx.hash(), id, block_number);
} }
} }
peer_info.last_sent_transactions = all_transactions_hashes.clone(); peer_info.last_sent_transactions = all_transactions_hashes.clone();
Some((*peer_id, packet.out())) Some((*peer_id, packet.out()))
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>()
};
// Send RLPs // Send RLPs
let sent = lucky_peers.len(); let sent = lucky_peers.len();
@ -1965,9 +1989,6 @@ impl ChainSync {
trace!(target: "sync", "Bad blocks in the queue, restarting"); trace!(target: "sync", "Bad blocks in the queue, restarting");
self.restart(io); self.restart(io);
} }
for peer_info in self.peers.values_mut() {
peer_info.last_sent_transactions.clear();
}
} }
} }
@ -2293,18 +2314,23 @@ mod tests {
let peer_count = sync.propagate_new_transactions(&mut io); let peer_count = sync.propagate_new_transactions(&mut io);
// Try to propagate same transactions for the second time // Try to propagate same transactions for the second time
let peer_count2 = sync.propagate_new_transactions(&mut io); let peer_count2 = sync.propagate_new_transactions(&mut io);
// Even after new block transactions should not be propagated twice
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]);
// Try to propagate same transactions for the third time
let peer_count3 = sync.propagate_new_transactions(&mut io);
// 1 message should be send // 1 message should be send
assert_eq!(1, io.queue.len()); assert_eq!(1, io.queue.len());
// 1 peer should be updated but only once // 1 peer should be updated but only once
assert_eq!(1, peer_count); assert_eq!(1, peer_count);
assert_eq!(0, peer_count2); assert_eq!(0, peer_count2);
assert_eq!(0, peer_count3);
// TRANSACTIONS_PACKET // TRANSACTIONS_PACKET
assert_eq!(0x02, io.queue[0].packet_id); assert_eq!(0x02, io.queue[0].packet_id);
} }
#[test] #[test]
fn propagates_transactions_again_after_new_block() { fn propagates_new_transactions_after_new_block() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle); client.add_blocks(100, EachBlockWith::Uncle);
client.insert_transaction_to_queue(); client.insert_transaction_to_queue();
@ -2313,15 +2339,14 @@ mod tests {
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &mut queue, None); let mut io = TestIo::new(&mut client, &ss, &mut queue, None);
let peer_count = sync.propagate_new_transactions(&mut io); let peer_count = sync.propagate_new_transactions(&mut io);
io.chain.insert_transaction_to_queue();
// New block import should trigger propagation.
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]); sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]);
// Try to propagate same transactions for the second time
let peer_count2 = sync.propagate_new_transactions(&mut io);
// 2 message should be send // 2 message should be send
assert_eq!(2, io.queue.len()); assert_eq!(2, io.queue.len());
// 1 peer should be updated twice // 1 peer should receive the message
assert_eq!(1, peer_count); assert_eq!(1, peer_count);
assert_eq!(1, peer_count2);
// TRANSACTIONS_PACKET // TRANSACTIONS_PACKET
assert_eq!(0x02, io.queue[0].packet_id); assert_eq!(0x02, io.queue[0].packet_id);
assert_eq!(0x02, io.queue[1].packet_id); assert_eq!(0x02, io.queue[1].packet_id);
@ -2360,6 +2385,21 @@ mod tests {
assert_eq!(0x02, io.queue[1].packet_id); assert_eq!(0x02, io.queue[1].packet_id);
} }
#[test]
fn should_maintain_transations_propagation_stats() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.insert_transaction_to_queue();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
let mut queue = VecDeque::new();
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &mut queue, None);
sync.propagate_new_transactions(&mut io);
let stats = sync.transactions_stats();
assert_eq!(stats.len(), 1, "Should maintain stats for single transaction.")
}
#[test] #[test]
fn handles_peer_new_block_malformed() { fn handles_peer_new_block_malformed() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();

View File

@ -51,6 +51,7 @@ mod blocks;
mod block_sync; mod block_sync;
mod sync_io; mod sync_io;
mod snapshot; mod snapshot;
mod transactions_stats;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
@ -61,7 +62,7 @@ mod api {
} }
pub use api::{EthSync, SyncProvider, SyncClient, NetworkManagerClient, ManageNetwork, SyncConfig, pub use api::{EthSync, SyncProvider, SyncClient, NetworkManagerClient, ManageNetwork, SyncConfig,
ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP}; ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP, TransactionStats};
pub use chain::{SyncStatus, SyncState}; pub use chain::{SyncStatus, SyncState};
pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError}; pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError};

View File

@ -0,0 +1,134 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use api::TransactionStats;
use std::collections::{HashSet, HashMap};
use util::{H256, H512};
use util::hash::H256FastMap;
type NodeId = H512;
type BlockNumber = u64;
#[derive(Debug, PartialEq, Clone)]
pub struct Stats {
first_seen: BlockNumber,
propagated_to: HashMap<NodeId, usize>,
}
impl Stats {
pub fn new(number: BlockNumber) -> Self {
Stats {
first_seen: number,
propagated_to: Default::default(),
}
}
}
impl<'a> From<&'a Stats> for TransactionStats {
fn from(other: &'a Stats) -> Self {
TransactionStats {
first_seen: other.first_seen,
propagated_to: other.propagated_to
.iter()
.map(|(hash, size)| (*hash, *size))
.collect(),
}
}
}
#[derive(Debug, Default)]
pub struct TransactionsStats {
pending_transactions: H256FastMap<Stats>,
}
impl TransactionsStats {
/// Increases number of propagations to given `enodeid`.
pub fn propagated(&mut self, hash: H256, enode_id: Option<NodeId>, current_block_num: BlockNumber) {
let enode_id = enode_id.unwrap_or_default();
let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::new(current_block_num));
let mut count = stats.propagated_to.entry(enode_id).or_insert(0);
*count = count.saturating_add(1);
}
/// Returns propagation stats for given hash or `None` if hash is not known.
#[cfg(test)]
pub fn get(&self, hash: &H256) -> Option<&Stats> {
self.pending_transactions.get(hash)
}
pub fn stats(&self) -> &H256FastMap<Stats> {
&self.pending_transactions
}
/// Retains only transactions present in given `HashSet`.
pub fn retain(&mut self, hashes: &HashSet<H256>) {
let to_remove = self.pending_transactions.keys()
.filter(|hash| !hashes.contains(hash))
.cloned()
.collect::<Vec<_>>();
for hash in to_remove {
self.pending_transactions.remove(&hash);
}
}
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use super::{Stats, TransactionsStats};
#[test]
fn should_keep_track_of_propagations() {
// given
let mut stats = TransactionsStats::default();
let hash = 5.into();
let enodeid1 = 2.into();
let enodeid2 = 5.into();
// when
stats.propagated(hash, Some(enodeid1), 5);
stats.propagated(hash, Some(enodeid1), 10);
stats.propagated(hash, Some(enodeid2), 15);
// then
let stats = stats.get(&hash);
assert_eq!(stats, Some(&Stats {
first_seen: 5,
propagated_to: hash_map![
enodeid1 => 2,
enodeid2 => 1
]
}));
}
#[test]
fn should_remove_hash_from_tracking() {
// given
let mut stats = TransactionsStats::default();
let hash = 5.into();
let enodeid1 = 5.into();
stats.propagated(hash, Some(enodeid1), 10);
// when
stats.retain(&HashSet::new());
// then
let stats = stats.get(&hash);
assert_eq!(stats, None);
}
}