diff --git a/Cargo.lock b/Cargo.lock index fb441ef96..dafac3d7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -297,6 +297,7 @@ dependencies = [ "heapsize 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.9.4 (git+https://github.com/ethcore/hyper)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "lru-cache 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index 667b40ace..48fce064a 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -27,6 +27,7 @@ time = "0.1" rand = "0.3" byteorder = "0.5" transient-hashmap = "0.1" +linked-hash-map = "0.3.0" evmjit = { path = "../evmjit", optional = true } clippy = { version = "0.0.96", optional = true} ethash = { path = "../ethash" } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 1c5be7509..0e0b292f9 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -262,7 +262,7 @@ impl Client { } } - /// Register an action to be done if a mode change happens. + /// Register an action to be done if a mode change happens. pub fn on_mode_change(&self, f: F) where F: 'static + FnMut(&Mode) + Send { *self.on_mode_change.lock() = Some(Box::new(f)); } @@ -891,12 +891,9 @@ impl BlockChainClient for Client { let mut mode = self.mode.lock(); *mode = new_mode.clone().into(); trace!(target: "mode", "Mode now {:?}", &*mode); - match *self.on_mode_change.lock() { - Some(ref mut f) => { - trace!(target: "mode", "Making callback..."); - f(&*mode) - }, - _ => {} + if let Some(ref mut f) = *self.on_mode_change.lock() { + trace!(target: "mode", "Making callback..."); + f(&*mode) } } match new_mode { diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index bf3e59171..ca343b1a7 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -102,6 +102,7 @@ extern crate rlp; extern crate ethcore_bloom_journal as bloom_journal; extern crate byteorder; extern crate transient_hashmap; +extern crate linked_hash_map; #[macro_use] extern crate log; diff --git a/ethcore/src/miner/local_transactions.rs b/ethcore/src/miner/local_transactions.rs new file mode 100644 index 000000000..c8afcc0d5 --- /dev/null +++ b/ethcore/src/miner/local_transactions.rs @@ -0,0 +1,196 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Local Transactions List. + +use linked_hash_map::LinkedHashMap; +use transaction::SignedTransaction; +use error::TransactionError; +use util::{U256, H256}; + +/// Status of local transaction. +/// Can indicate that the transaction is currently part of the queue (`Pending/Future`) +/// or gives a reason why the transaction was removed. +#[derive(Debug, PartialEq, Clone)] +pub enum Status { + /// The transaction is currently in the transaction queue. + Pending, + /// The transaction is in future part of the queue. + Future, + /// Transaction is already mined. + Mined(SignedTransaction), + /// Transaction is dropped because of limit + Dropped(SignedTransaction), + /// Replaced because of higher gas price of another transaction. + Replaced(SignedTransaction, U256, H256), + /// Transaction was never accepted to the queue. + Rejected(SignedTransaction, TransactionError), + /// Transaction is invalid. + Invalid(SignedTransaction), +} + +impl Status { + fn is_current(&self) -> bool { + *self == Status::Pending || *self == Status::Future + } +} + +/// Keeps track of local transactions that are in the queue or were mined/dropped recently. +#[derive(Debug)] +pub struct LocalTransactionsList { + max_old: usize, + transactions: LinkedHashMap, +} + +impl Default for LocalTransactionsList { + fn default() -> Self { + Self::new(10) + } +} + +impl LocalTransactionsList { + pub fn new(max_old: usize) -> Self { + LocalTransactionsList { + max_old: max_old, + transactions: Default::default(), + } + } + + pub fn mark_pending(&mut self, hash: H256) { + self.clear_old(); + self.transactions.insert(hash, Status::Pending); + } + + pub fn mark_future(&mut self, hash: H256) { + self.transactions.insert(hash, Status::Future); + self.clear_old(); + } + + pub fn mark_rejected(&mut self, tx: SignedTransaction, err: TransactionError) { + self.transactions.insert(tx.hash(), Status::Rejected(tx, err)); + self.clear_old(); + } + + pub fn mark_replaced(&mut self, tx: SignedTransaction, gas_price: U256, hash: H256) { + self.transactions.insert(tx.hash(), Status::Replaced(tx, gas_price, hash)); + self.clear_old(); + } + + pub fn mark_invalid(&mut self, tx: SignedTransaction) { + self.transactions.insert(tx.hash(), Status::Invalid(tx)); + self.clear_old(); + } + + pub fn mark_dropped(&mut self, tx: SignedTransaction) { + self.transactions.insert(tx.hash(), Status::Dropped(tx)); + self.clear_old(); + } + + pub fn mark_mined(&mut self, tx: SignedTransaction) { + self.transactions.insert(tx.hash(), Status::Mined(tx)); + self.clear_old(); + } + + pub fn contains(&self, hash: &H256) -> bool { + self.transactions.contains_key(hash) + } + + pub fn all_transactions(&self) -> &LinkedHashMap { + &self.transactions + } + + fn clear_old(&mut self) { + let number_of_old = self.transactions + .values() + .filter(|status| !status.is_current()) + .count(); + + if self.max_old >= number_of_old { + return; + } + + let to_remove = self.transactions + .iter() + .filter(|&(_, status)| !status.is_current()) + .map(|(hash, _)| *hash) + .take(number_of_old - self.max_old) + .collect::>(); + + for hash in to_remove { + self.transactions.remove(&hash); + } + } +} + +#[cfg(test)] +mod tests { + use util::U256; + use ethkey::{Random, Generator}; + use transaction::{Action, Transaction, SignedTransaction}; + use super::{LocalTransactionsList, Status}; + + #[test] + fn should_add_transaction_as_pending() { + // given + let mut list = LocalTransactionsList::default(); + + // when + list.mark_pending(10.into()); + list.mark_future(20.into()); + + // then + assert!(list.contains(&10.into()), "Should contain the transaction."); + assert!(list.contains(&20.into()), "Should contain the transaction."); + let statuses = list.all_transactions().values().cloned().collect::>(); + assert_eq!(statuses, vec![Status::Pending, Status::Future]); + } + + #[test] + fn should_clear_old_transactions() { + // given + let mut list = LocalTransactionsList::new(1); + let tx1 = new_tx(10.into()); + let tx1_hash = tx1.hash(); + let tx2 = new_tx(50.into()); + let tx2_hash = tx2.hash(); + + list.mark_pending(10.into()); + list.mark_invalid(tx1); + list.mark_dropped(tx2); + assert!(list.contains(&tx2_hash)); + assert!(!list.contains(&tx1_hash)); + assert!(list.contains(&10.into())); + + // when + list.mark_future(15.into()); + + // then + assert!(list.contains(&10.into())); + assert!(list.contains(&15.into())); + } + + fn new_tx(nonce: U256) -> SignedTransaction { + let keypair = Random.generate().unwrap(); + Transaction { + action: Action::Create, + value: U256::from(100), + data: Default::default(), + gas: U256::from(10), + gas_price: U256::from(1245), + nonce: nonce + }.sign(keypair.secret(), None) + } +} diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 8a87ea189..7ad18ebfc 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -24,6 +24,7 @@ use views::{BlockView, HeaderView}; use header::Header; use state::{State, CleanupMode}; use client::{MiningBlockChainClient, Executive, Executed, EnvInfo, TransactOptions, BlockID, CallAnalytics}; +use client::TransactionImportResult; use executive::contract_address; use block::{ClosedBlock, SealedBlock, IsBlock, Block}; use error::*; @@ -34,8 +35,8 @@ use engines::Engine; use miner::{MinerService, MinerStatus, TransactionQueue, PrioritizationStrategy, AccountDetails, TransactionOrigin}; use miner::banning_queue::{BanningTransactionQueue, Threshold}; use miner::work_notify::WorkPoster; -use client::TransactionImportResult; use miner::price_info::PriceInfo; +use miner::local_transactions::{Status as LocalTransactionStatus}; use header::BlockNumber; /// Different possible definitions for pending transaction set. @@ -563,7 +564,7 @@ impl Miner { prepare_new } - fn add_transactions_to_queue(&self, chain: &MiningBlockChainClient, transactions: Vec, origin: TransactionOrigin, transaction_queue: &mut BanningTransactionQueue) -> + fn add_transactions_to_queue(&self, chain: &MiningBlockChainClient, transactions: Vec, default_origin: TransactionOrigin, transaction_queue: &mut BanningTransactionQueue) -> Vec> { let fetch_account = |a: &Address| AccountDetails { @@ -571,6 +572,10 @@ impl Miner { balance: chain.latest_balance(a), }; + let accounts = self.accounts.as_ref() + .and_then(|provider| provider.accounts().ok()) + .map(|accounts| accounts.into_iter().collect::>()); + let schedule = chain.latest_schedule(); let gas_required = |tx: &SignedTransaction| tx.gas_required(&schedule).into(); let best_block_header: Header = ::rlp::decode(&chain.best_block_header()); @@ -583,12 +588,21 @@ impl Miner { } } ) - .map(|tx| match origin { - TransactionOrigin::Local | TransactionOrigin::RetractedBlock => { - transaction_queue.add(tx, origin, &fetch_account, &gas_required) - }, - TransactionOrigin::External => { - transaction_queue.add_with_banlist(tx, &fetch_account, &gas_required) + .map(|tx| { + let origin = accounts.as_ref().and_then(|accounts| { + tx.sender().ok().and_then(|sender| match accounts.contains(&sender) { + true => Some(TransactionOrigin::Local), + false => None, + }) + }).unwrap_or(default_origin); + + match origin { + TransactionOrigin::Local | TransactionOrigin::RetractedBlock => { + transaction_queue.add(tx, origin, &fetch_account, &gas_required) + }, + TransactionOrigin::External => { + transaction_queue.add_with_banlist(tx, &fetch_account, &gas_required) + } } }) .collect() @@ -863,6 +877,14 @@ impl MinerService for Miner { queue.top_transactions() } + fn local_transactions(&self) -> BTreeMap { + let queue = self.transaction_queue.lock(); + queue.local_transactions() + .iter() + .map(|(hash, status)| (*hash, status.clone())) + .collect() + } + fn pending_transactions(&self, best_block: BlockNumber) -> Vec { let queue = self.transaction_queue.lock(); match self.options.pending_set { diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index da93dc0b7..1fb2244fd 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -41,16 +41,18 @@ //! } //! ``` -mod miner; -mod external; -mod transaction_queue; mod banning_queue; -mod work_notify; +mod external; +mod local_transactions; +mod miner; mod price_info; +mod transaction_queue; +mod work_notify; -pub use self::transaction_queue::{TransactionQueue, PrioritizationStrategy, AccountDetails, TransactionOrigin}; -pub use self::miner::{Miner, MinerOptions, Banning, PendingSet, GasPricer, GasPriceCalibratorOptions, GasLimit}; pub use self::external::{ExternalMiner, ExternalMinerService}; +pub use self::miner::{Miner, MinerOptions, Banning, PendingSet, GasPricer, GasPriceCalibratorOptions, GasLimit}; +pub use self::transaction_queue::{TransactionQueue, PrioritizationStrategy, AccountDetails, TransactionOrigin}; +pub use self::local_transactions::{Status as LocalTransactionStatus}; pub use client::TransactionImportResult; use std::collections::BTreeMap; @@ -145,6 +147,9 @@ pub trait MinerService : Send + Sync { /// Get a list of all pending transactions. fn pending_transactions(&self, best_block: BlockNumber) -> Vec; + /// Get a list of local transactions with statuses. + fn local_transactions(&self) -> BTreeMap; + /// Get a list of all pending receipts. fn pending_receipts(&self, best_block: BlockNumber) -> BTreeMap; diff --git a/ethcore/src/miner/transaction_queue.rs b/ethcore/src/miner/transaction_queue.rs index cc10bbe98..bfbd3fade 100644 --- a/ethcore/src/miner/transaction_queue.rs +++ b/ethcore/src/miner/transaction_queue.rs @@ -86,11 +86,13 @@ use std::ops::Deref; use std::cmp::Ordering; use std::cmp; use std::collections::{HashSet, HashMap, BTreeSet, BTreeMap}; +use linked_hash_map::LinkedHashMap; use util::{Address, H256, Uint, U256}; use util::table::Table; use transaction::*; use error::{Error, TransactionError}; use client::TransactionImportResult; +use miner::local_transactions::{LocalTransactionsList, Status as LocalTransactionStatus}; /// Transaction origin #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -125,6 +127,12 @@ impl Ord for TransactionOrigin { } } +impl TransactionOrigin { + fn is_local(&self) -> bool { + *self == TransactionOrigin::Local + } +} + #[derive(Clone, Debug)] /// Light structure used to identify transaction and its order struct TransactionOrder { @@ -201,17 +209,16 @@ impl Ord for TransactionOrder { return self.penalties.cmp(&b.penalties); } - // First check nonce_height - if self.nonce_height != b.nonce_height { - return self.nonce_height.cmp(&b.nonce_height); - } - // Local transactions should always have priority - // NOTE nonce has to be checked first, cause otherwise the order might be wrong. if self.origin != b.origin { return self.origin.cmp(&b.origin); } + // Check nonce_height + if self.nonce_height != b.nonce_height { + return self.nonce_height.cmp(&b.nonce_height); + } + match self.strategy { PrioritizationStrategy::GasAndGasPrice => { if self.gas != b.gas { @@ -242,6 +249,7 @@ impl Ord for TransactionOrder { } /// Verified transaction (with sender) +#[derive(Debug)] struct VerifiedTransaction { /// Transaction transaction: SignedTransaction, @@ -352,7 +360,7 @@ impl TransactionSet { /// /// It drops transactions from this set but also removes associated `VerifiedTransaction`. /// Returns addresses and lowest nonces of transactions removed because of limit. - fn enforce_limit(&mut self, by_hash: &mut HashMap) -> Option> { + fn enforce_limit(&mut self, by_hash: &mut HashMap, local: &mut LocalTransactionsList) -> Option> { let mut count = 0; let mut gas: U256 = 0.into(); let to_drop : Vec<(Address, U256)> = { @@ -379,9 +387,13 @@ impl TransactionSet { .expect("Transaction has just been found in `by_priority`; so it is in `by_address` also."); trace!(target: "txqueue", "Dropped out of limit transaction: {:?}", order.hash); - by_hash.remove(&order.hash) + let order = by_hash.remove(&order.hash) .expect("hash is in `by_priorty`; all hashes in `by_priority` must be in `by_hash`; qed"); + if order.origin.is_local() { + local.mark_dropped(order.transaction); + } + let min = removed.get(&sender).map_or(nonce, |val| cmp::min(*val, nonce)); removed.insert(sender, min); removed @@ -488,6 +500,8 @@ pub struct TransactionQueue { by_hash: HashMap, /// Last nonce of transaction in current (to quickly check next expected transaction) last_nonces: HashMap, + /// List of local transactions and their statuses. + local_transactions: LocalTransactionsList, } impl Default for TransactionQueue { @@ -529,6 +543,7 @@ impl TransactionQueue { future: future, by_hash: HashMap::new(), last_nonces: HashMap::new(), + local_transactions: LocalTransactionsList::default(), } } @@ -537,8 +552,8 @@ impl TransactionQueue { self.current.set_limit(limit); self.future.set_limit(limit); // And ensure the limits - self.current.enforce_limit(&mut self.by_hash); - self.future.enforce_limit(&mut self.by_hash); + self.current.enforce_limit(&mut self.by_hash, &mut self.local_transactions); + self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions); } /// Returns current limit of transactions in the queue. @@ -578,7 +593,7 @@ impl TransactionQueue { pub fn set_total_gas_limit(&mut self, gas_limit: U256) { self.future.gas_limit = gas_limit; self.current.gas_limit = gas_limit; - self.future.enforce_limit(&mut self.by_hash); + self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions); } /// Set the new limit for the amount of gas any individual transaction may have. @@ -609,6 +624,46 @@ impl TransactionQueue { F: Fn(&Address) -> AccountDetails, G: Fn(&SignedTransaction) -> U256, { + if origin == TransactionOrigin::Local { + let hash = tx.hash(); + let cloned_tx = tx.clone(); + + let result = self.add_internal(tx, origin, fetch_account, gas_estimator); + match result { + Ok(TransactionImportResult::Current) => { + self.local_transactions.mark_pending(hash); + }, + Ok(TransactionImportResult::Future) => { + self.local_transactions.mark_future(hash); + }, + Err(Error::Transaction(ref err)) => { + // Sometimes transactions are re-imported, so + // don't overwrite transactions if they are already on the list + if !self.local_transactions.contains(&cloned_tx.hash()) { + self.local_transactions.mark_rejected(cloned_tx, err.clone()); + } + }, + Err(_) => { + self.local_transactions.mark_invalid(cloned_tx); + }, + } + result + } else { + self.add_internal(tx, origin, fetch_account, gas_estimator) + } + } + + /// Adds signed transaction to the queue. + fn add_internal( + &mut self, + tx: SignedTransaction, + origin: TransactionOrigin, + fetch_account: &F, + gas_estimator: &G, + ) -> Result where + F: Fn(&Address) -> AccountDetails, + G: Fn(&SignedTransaction) -> U256, + { if tx.gas_price < self.minimal_gas_price && origin != TransactionOrigin::Local { trace!(target: "txqueue", @@ -647,7 +702,6 @@ impl TransactionQueue { self.gas_limit, self.tx_gas_limit ); - return Err(Error::Transaction(TransactionError::GasLimitExceeded { limit: self.gas_limit, got: tx.gas, @@ -722,6 +776,12 @@ impl TransactionQueue { None => return, Some(t) => t, }; + + // Never penalize local transactions + if transaction.origin.is_local() { + return; + } + let sender = transaction.sender(); // Penalize all transactions from this sender @@ -766,6 +826,11 @@ impl TransactionQueue { trace!(target: "txqueue", "Removing invalid transaction: {:?}", transaction.hash()); + // Mark in locals + if self.local_transactions.contains(transaction_hash) { + self.local_transactions.mark_invalid(transaction.transaction.clone()); + } + // Remove from future let order = self.future.drop(&sender, &nonce); if order.is_some() { @@ -788,6 +853,33 @@ impl TransactionQueue { } } + /// Marks all transactions from particular sender as local transactions + fn mark_transactions_local(&mut self, sender: &Address) { + fn mark_local(sender: &Address, set: &mut TransactionSet, mut mark: F) { + // Mark all transactions from this sender as local + let nonces_from_sender = set.by_address.row(sender) + .map(|row_map| { + row_map.iter().filter_map(|(nonce, order)| if order.origin.is_local() { + None + } else { + Some(*nonce) + }).collect::>() + }) + .unwrap_or_else(Vec::new); + + for k in nonces_from_sender { + let mut order = set.drop(sender, &k).expect("transaction known to be in self.current/self.future; qed"); + order.origin = TransactionOrigin::Local; + mark(order.hash); + set.insert(*sender, k, order); + } + } + + let local = &mut self.local_transactions; + mark_local(sender, &mut self.current, |hash| local.mark_pending(hash)); + mark_local(sender, &mut self.future, |hash| local.mark_future(hash)); + } + /// Update height of all transactions in future transactions set. fn update_future(&mut self, sender: &Address, current_nonce: U256) { // We need to drain all transactions for current sender from future and reinsert them with updated height @@ -821,15 +913,21 @@ impl TransactionQueue { qed"); if k >= current_nonce { let order = order.update_height(k, current_nonce); + if order.origin.is_local() { + self.local_transactions.mark_future(order.hash); + } if let Some(old) = self.future.insert(*sender, k, order.clone()) { - Self::replace_orders(*sender, k, old, order, &mut self.future, &mut self.by_hash); + Self::replace_orders(*sender, k, old, order, &mut self.future, &mut self.by_hash, &mut self.local_transactions); } } else { trace!(target: "txqueue", "Removing old transaction: {:?} (nonce: {} < {})", order.hash, k, current_nonce); - self.by_hash.remove(&order.hash).expect("All transactions in `future` are also in `by_hash`"); + let tx = self.by_hash.remove(&order.hash).expect("All transactions in `future` are also in `by_hash`"); + if tx.origin.is_local() { + self.local_transactions.mark_mined(tx.transaction); + } } } - self.future.enforce_limit(&mut self.by_hash); + self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions); } /// Returns top transactions from the queue ordered by priority. @@ -841,6 +939,11 @@ impl TransactionQueue { .collect() } + /// Returns local transactions (some of them might not be part of the queue anymore). + pub fn local_transactions(&self) -> &LinkedHashMap { + self.local_transactions.all_transactions() + } + #[cfg(test)] fn future_transactions(&self) -> Vec { self.future.by_priority @@ -897,8 +1000,11 @@ impl TransactionQueue { self.future.by_gas_price.remove(&order.gas_price, &order.hash); // Put to current let order = order.update_height(current_nonce, first_nonce); + if order.origin.is_local() { + self.local_transactions.mark_pending(order.hash); + } if let Some(old) = self.current.insert(address, current_nonce, order.clone()) { - Self::replace_orders(address, current_nonce, old, order, &mut self.current, &mut self.by_hash); + Self::replace_orders(address, current_nonce, old, order, &mut self.current, &mut self.by_hash, &mut self.local_transactions); } update_last_nonce_to = Some(current_nonce); current_nonce = current_nonce + U256::one(); @@ -953,13 +1059,19 @@ impl TransactionQueue { .cloned() .map_or(state_nonce, |n| n + U256::one()); + if tx.origin.is_local() { + self.mark_transactions_local(&address); + } + // Future transaction if nonce > next_nonce { // We have a gap - put to future. // Insert transaction (or replace old one with lower gas price) - try!(check_too_cheap(Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.future, &mut self.by_hash))); + try!(check_too_cheap( + Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.future, &mut self.by_hash, &mut self.local_transactions) + )); // Enforce limit in Future - let removed = self.future.enforce_limit(&mut self.by_hash); + let removed = self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions); // Return an error if this transaction was not imported because of limit. try!(check_if_removed(&address, &nonce, removed)); @@ -973,13 +1085,15 @@ impl TransactionQueue { self.move_matching_future_to_current(address, nonce + U256::one(), state_nonce); // Replace transaction if any - try!(check_too_cheap(Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.current, &mut self.by_hash))); + try!(check_too_cheap( + Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.current, &mut self.by_hash, &mut self.local_transactions) + )); // Keep track of highest nonce stored in current let new_max = self.last_nonces.get(&address).map_or(nonce, |n| cmp::max(nonce, *n)); self.last_nonces.insert(address, new_max); // Also enforce the limit - let removed = self.current.enforce_limit(&mut self.by_hash); + let removed = self.current.enforce_limit(&mut self.by_hash, &mut self.local_transactions); // If some transaction were removed because of limit we need to update last_nonces also. self.update_last_nonces(&removed); // Trigger error if the transaction we are importing was removed. @@ -1010,7 +1124,14 @@ impl TransactionQueue { /// /// Returns `true` if transaction actually got to the queue (`false` if there was already a transaction with higher /// gas_price) - fn replace_transaction(tx: VerifiedTransaction, base_nonce: U256, min_gas_price: (U256, PrioritizationStrategy), set: &mut TransactionSet, by_hash: &mut HashMap) -> bool { + fn replace_transaction( + tx: VerifiedTransaction, + base_nonce: U256, + min_gas_price: (U256, PrioritizationStrategy), + set: &mut TransactionSet, + by_hash: &mut HashMap, + local: &mut LocalTransactionsList, + ) -> bool { let order = TransactionOrder::for_transaction(&tx, base_nonce, min_gas_price.0, min_gas_price.1); let hash = tx.hash(); let address = tx.sender(); @@ -1019,16 +1140,27 @@ impl TransactionQueue { let old_hash = by_hash.insert(hash, tx); assert!(old_hash.is_none(), "Each hash has to be inserted exactly once."); + trace!(target: "txqueue", "Inserting: {:?}", order); if let Some(old) = set.insert(address, nonce, order.clone()) { - Self::replace_orders(address, nonce, old, order, set, by_hash) + Self::replace_orders(address, nonce, old, order, set, by_hash, local) } else { true } } - fn replace_orders(address: Address, nonce: U256, old: TransactionOrder, order: TransactionOrder, set: &mut TransactionSet, by_hash: &mut HashMap) -> bool { + fn replace_orders( + address: Address, + nonce: U256, + old: TransactionOrder, + order: TransactionOrder, + set: &mut TransactionSet, + by_hash: &mut HashMap, + local: &mut LocalTransactionsList, + ) -> bool { // There was already transaction in queue. Let's check which one should stay + let old_hash = old.hash; + let new_hash = order.hash; let old_fee = old.gas_price; let new_fee = order.gas_price; if old_fee.cmp(&new_fee) == Ordering::Greater { @@ -1036,12 +1168,18 @@ impl TransactionQueue { // Put back old transaction since it has greater priority (higher gas_price) set.insert(address, nonce, old); // and remove new one - by_hash.remove(&order.hash).expect("The hash has been just inserted and no other line is altering `by_hash`."); + let order = by_hash.remove(&order.hash).expect("The hash has been just inserted and no other line is altering `by_hash`."); + if order.origin.is_local() { + local.mark_replaced(order.transaction, old_fee, old_hash); + } false } else { trace!(target: "txqueue", "Replaced transaction: {:?} with transaction with higher gas price: {:?}", old.hash, order.hash); // Make sure we remove old transaction entirely - by_hash.remove(&old.hash).expect("The hash is coming from `future` so it has to be in `by_hash`."); + let old = by_hash.remove(&old.hash).expect("The hash is coming from `future` so it has to be in `by_hash`."); + if old.origin.is_local() { + local.mark_replaced(old.transaction, new_fee, new_hash); + } true } } @@ -1078,6 +1216,7 @@ mod test { use error::{Error, TransactionError}; use super::*; use super::{TransactionSet, TransactionOrder, VerifiedTransaction}; + use miner::local_transactions::LocalTransactionsList; use client::TransactionImportResult; fn unwrap_tx_err(err: Result) -> TransactionError { @@ -1208,6 +1347,7 @@ mod test { #[test] fn should_create_transaction_set() { // given + let mut local = LocalTransactionsList::default(); let mut set = TransactionSet { by_priority: BTreeSet::new(), by_address: Table::new(), @@ -1235,7 +1375,7 @@ mod test { assert_eq!(set.by_address.len(), 2); // when - set.enforce_limit(&mut by_hash); + set.enforce_limit(&mut by_hash, &mut local); // then assert_eq!(by_hash.len(), 1); @@ -1628,6 +1768,31 @@ mod test { assert_eq!(top.len(), 2); } + #[test] + fn when_importing_local_should_mark_others_from_the_same_sender_as_local() { + // given + let mut txq = TransactionQueue::default(); + let (tx1, tx2) = new_tx_pair_default(1.into(), 0.into()); + // the second one has same nonce but higher `gas_price` + let (_, tx0) = new_similar_tx_pair(); + + txq.add(tx0.clone(), TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap(); + txq.add(tx1.clone(), TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap(); + // the one with higher gas price is first + assert_eq!(txq.top_transactions()[0], tx0); + assert_eq!(txq.top_transactions()[1], tx1); + + // when + // insert second as local + txq.add(tx2.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap(); + + // then + // the order should be updated + assert_eq!(txq.top_transactions()[0], tx1); + assert_eq!(txq.top_transactions()[1], tx2); + assert_eq!(txq.top_transactions()[2], tx0); + } + #[test] fn should_prioritize_reimported_transactions_within_same_nonce_height() { // given @@ -1695,6 +1860,38 @@ mod test { assert_eq!(top.len(), 4); } + #[test] + fn should_not_penalize_local_transactions() { + // given + let mut txq = TransactionQueue::default(); + // txa, txb - slightly bigger gas price to have consistent ordering + let (txa, txb) = new_tx_pair_default(1.into(), 0.into()); + let (tx1, tx2) = new_tx_pair_with_gas_price_increment(3.into()); + + // insert everything + txq.add(txa.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap(); + txq.add(txb.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap(); + txq.add(tx1.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap(); + txq.add(tx2.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap(); + + let top = txq.top_transactions(); + assert_eq!(top[0], tx1); + assert_eq!(top[1], txa); + assert_eq!(top[2], tx2); + assert_eq!(top[3], txb); + assert_eq!(top.len(), 4); + + // when + txq.penalize(&tx1.hash()); + + // then (order is the same) + let top = txq.top_transactions(); + assert_eq!(top[0], tx1); + assert_eq!(top[1], txa); + assert_eq!(top[2], tx2); + assert_eq!(top[3], txb); + assert_eq!(top.len(), 4); + } #[test] fn should_penalize_transactions_from_sender() { @@ -1940,12 +2137,11 @@ mod test { let mut txq = TransactionQueue::with_limits(PrioritizationStrategy::GasPriceOnly, 100, default_gas_val() * U256::from(2), !U256::zero()); let (tx1, tx2) = new_tx_pair_default(U256::from(1), U256::from(1)); let (tx3, tx4) = new_tx_pair_default(U256::from(1), U256::from(2)); - let (tx5, tx6) = new_tx_pair_default(U256::from(1), U256::from(2)); + let (tx5, _) = new_tx_pair_default(U256::from(1), U256::from(2)); txq.add(tx1.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap(); txq.add(tx2.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap(); - txq.add(tx5.clone(), TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap(); // Not accepted because of limit - txq.add(tx6.clone(), TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap_err(); + txq.add(tx5.clone(), TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap_err(); txq.add(tx3.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap(); txq.add(tx4.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap(); assert_eq!(txq.status().pending, 4); diff --git a/ipc/rpc/src/binary.rs b/ipc/rpc/src/binary.rs index 3908992d1..e974626d0 100644 --- a/ipc/rpc/src/binary.rs +++ b/ipc/rpc/src/binary.rs @@ -16,7 +16,7 @@ //! Binary representation of types -use util::{U256, U512, H256, H2048, Address}; +use util::{U256, U512, H256, H512, H2048, Address}; use std::mem; use std::collections::{VecDeque, BTreeMap}; use std::ops::Range; @@ -800,6 +800,7 @@ binary_fixed_size!(bool); binary_fixed_size!(U256); binary_fixed_size!(U512); binary_fixed_size!(H256); +binary_fixed_size!(H512); binary_fixed_size!(H2048); binary_fixed_size!(Address); binary_fixed_size!(BinHandshake); diff --git a/js/src/api/rpc/parity/parity.js b/js/src/api/rpc/parity/parity.js index a33828b80..d14cc6554 100644 --- a/js/src/api/rpc/parity/parity.js +++ b/js/src/api/rpc/parity/parity.js @@ -15,7 +15,7 @@ // along with Parity. If not, see . import { inAddress, inData, inHex, inNumber16, inOptions } from '../../format/input'; -import { outAccountInfo, outAddress, outHistogram, outNumber, outPeers } from '../../format/output'; +import { outAccountInfo, outAddress, outHistogram, outNumber, outPeers, outTransaction } from '../../format/output'; export default class Parity { constructor (transport) { @@ -117,16 +117,29 @@ export default class Parity { .execute('parity_hashContent', url); } + importGethAccounts (accounts) { + return this._transport + .execute('parity_importGethAccounts', (accounts || []).map(inAddress)) + .then((accounts) => (accounts || []).map(outAddress)); + } + listGethAccounts () { return this._transport .execute('parity_listGethAccounts') .then((accounts) => (accounts || []).map(outAddress)); } - importGethAccounts (accounts) { + localTransactions () { return this._transport - .execute('parity_importGethAccounts', (accounts || []).map(inAddress)) - .then((accounts) => (accounts || []).map(outAddress)); + .execute('parity_localTransactions') + .then(transactions => { + Object.values(transactions) + .filter(tx => tx.transaction) + .map(tx => { + tx.transaction = outTransaction(tx.transaction); + }); + return transactions; + }); } minGasPrice () { @@ -192,6 +205,17 @@ export default class Parity { .execute('parity_nodeName'); } + pendingTransactions () { + return this._transport + .execute('parity_pendingTransactions') + .then(data => data.map(outTransaction)); + } + + pendingTransactionsStats () { + return this._transport + .execute('parity_pendingTransactionsStats'); + } + phraseToAddress (phrase) { return this._transport .execute('parity_phraseToAddress', phrase) diff --git a/js/src/dapps/localtx.html b/js/src/dapps/localtx.html new file mode 100644 index 000000000..d1e6fed05 --- /dev/null +++ b/js/src/dapps/localtx.html @@ -0,0 +1,17 @@ + + + + + + + + Local transactions Viewer + + +
+ + + + + + diff --git a/js/src/dapps/localtx.js b/js/src/dapps/localtx.js new file mode 100644 index 000000000..98561f33f --- /dev/null +++ b/js/src/dapps/localtx.js @@ -0,0 +1,33 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +import ReactDOM from 'react-dom'; +import React from 'react'; + +import injectTapEventPlugin from 'react-tap-event-plugin'; +injectTapEventPlugin(); + +import Application from './localtx/Application'; + +import '../../assets/fonts/Roboto/font.css'; +import '../../assets/fonts/RobotoMono/font.css'; +import './style.css'; +import './localtx.html'; + +ReactDOM.render( + , + document.querySelector('#container') +); diff --git a/js/src/dapps/localtx/Application/application.css b/js/src/dapps/localtx/Application/application.css new file mode 100644 index 000000000..4b5f0bc31 --- /dev/null +++ b/js/src/dapps/localtx/Application/application.css @@ -0,0 +1,19 @@ +.container { + padding: 1rem 2rem; + text-align: center; + + h1 { + margin-top: 3rem; + margin-bottom: 1rem; + } + + table { + text-align: left; + margin: auto; + max-width: 90vw; + + th { + text-align: center; + } + } +} diff --git a/js/src/dapps/localtx/Application/application.js b/js/src/dapps/localtx/Application/application.js new file mode 100644 index 000000000..89b0b73b4 --- /dev/null +++ b/js/src/dapps/localtx/Application/application.js @@ -0,0 +1,203 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +import BigNumber from 'bignumber.js'; +import React, { Component } from 'react'; + +import { api } from '../parity'; + +import styles from './application.css'; + +import { Transaction, LocalTransaction } from '../Transaction'; + +export default class Application extends Component { + state = { + loading: true, + transactions: [], + localTransactions: {}, + blockNumber: 0 + } + + componentDidMount () { + const poll = () => this.fetchTransactionData().then(poll).catch(poll); + this._timeout = setTimeout(poll, 2000); + } + + componentWillUnmount () { + clearTimeout(this._timeout); + } + + fetchTransactionData () { + return Promise.all([ + api.parity.pendingTransactions(), + api.parity.pendingTransactionsStats(), + api.parity.localTransactions(), + api.eth.blockNumber() + ]).then(([pending, stats, local, blockNumber]) => { + // Combine results together + const transactions = pending.map(tx => { + return { + transaction: tx, + stats: stats[tx.hash], + isLocal: !!local[tx.hash] + }; + }); + + // Add transaction data to locals + transactions + .filter(tx => tx.isLocal) + .map(data => { + const tx = data.transaction; + local[tx.hash].transaction = tx; + local[tx.hash].stats = data.stats; + }); + + // Convert local transactions to array + const localTransactions = Object.keys(local).map(hash => { + const data = local[hash]; + data.txHash = hash; + return data; + }); + + // Sort local transactions by nonce (move future to the end) + localTransactions.sort((a, b) => { + a = a.transaction || {}; + b = b.transaction || {}; + + if (a.from && b.from && a.from !== b.from) { + return a.from < b.from; + } + + if (!a.nonce || !b.nonce) { + return !a.nonce ? 1 : -1; + } + + return new BigNumber(a.nonce).comparedTo(new BigNumber(b.nonce)); + }); + + this.setState({ + loading: false, + transactions, + localTransactions, + blockNumber + }); + }); + } + + render () { + const { loading } = this.state; + + if (loading) { + return ( +
Loading...
+ ); + } + + return ( +
+

Your local transactions

+ { this.renderLocals() } +

Transactions in the queue

+ { this.renderQueueSummary() } + { this.renderQueue() } +
+ ); + } + + renderQueueSummary () { + const { transactions } = this.state; + if (!transactions.length) { + return null; + } + + const count = transactions.length; + const locals = transactions.filter(tx => tx.isLocal).length; + const fee = transactions + .map(tx => tx.transaction) + .map(tx => tx.gasPrice.mul(tx.gas)) + .reduce((sum, fee) => sum.add(fee), new BigNumber(0)); + + return ( +

+ Count: { locals ? `${count} (${locals})` : count } +   + Total Fee: { api.util.fromWei(fee).toFixed(3) } ETH +

+ ); + } + + renderQueue () { + const { blockNumber, transactions } = this.state; + if (!transactions.length) { + return ( +

The queue seems is empty.

+ ); + } + + return ( + + + { Transaction.renderHeader() } + + + { + transactions.map((tx, idx) => ( + + )) + } + +
+ ); + } + + renderLocals () { + const { localTransactions } = this.state; + if (!localTransactions.length) { + return ( +

You haven't sent any transactions yet.

+ ); + } + + return ( + + + { LocalTransaction.renderHeader() } + + + { + localTransactions.map(tx => ( + + )) + } + +
+ ); + } +} diff --git a/js/src/dapps/localtx/Application/application.spec.js b/js/src/dapps/localtx/Application/application.spec.js new file mode 100644 index 000000000..2044b4e14 --- /dev/null +++ b/js/src/dapps/localtx/Application/application.spec.js @@ -0,0 +1,32 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +import React from 'react'; +import { shallow } from 'enzyme'; + +import '../../../environment/tests'; + +import Application from './application'; + +describe('localtx/Application', () => { + describe('rendering', () => { + it('renders without crashing', () => { + const rendered = shallow(); + + expect(rendered).to.be.defined; + }); + }); +}); diff --git a/js/src/dapps/localtx/Application/index.js b/js/src/dapps/localtx/Application/index.js new file mode 100644 index 000000000..236578226 --- /dev/null +++ b/js/src/dapps/localtx/Application/index.js @@ -0,0 +1,17 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +export default from './application'; diff --git a/js/src/dapps/localtx/Transaction/index.js b/js/src/dapps/localtx/Transaction/index.js new file mode 100644 index 000000000..56854f412 --- /dev/null +++ b/js/src/dapps/localtx/Transaction/index.js @@ -0,0 +1,17 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +export { Transaction, LocalTransaction } from './transaction'; diff --git a/js/src/dapps/localtx/Transaction/transaction.css b/js/src/dapps/localtx/Transaction/transaction.css new file mode 100644 index 000000000..c49d9479f --- /dev/null +++ b/js/src/dapps/localtx/Transaction/transaction.css @@ -0,0 +1,31 @@ +.from { + white-space: nowrap; + + img { + vertical-align: middle; + } +} + +.transaction { + td { + padding: 7px 15px; + } + + td:first-child { + padding: 7px 0; + } + + &.local { + background: #8bc34a; + } +} + +.nowrap { + white-space: nowrap; +} + +.edit { + label, input { + display: block; + } +} diff --git a/js/src/dapps/localtx/Transaction/transaction.js b/js/src/dapps/localtx/Transaction/transaction.js new file mode 100644 index 000000000..90981304b --- /dev/null +++ b/js/src/dapps/localtx/Transaction/transaction.js @@ -0,0 +1,382 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +import BigNumber from 'bignumber.js'; +import React, { Component, PropTypes } from 'react'; +import classnames from 'classnames'; + +import { api } from '../parity'; + +import styles from './transaction.css'; + +import IdentityIcon from '../../githubhint/IdentityIcon'; + +class BaseTransaction extends Component { + + shortHash (hash) { + return `${hash.substr(0, 5)}..${hash.substr(hash.length - 3)}`; + } + + renderHash (hash) { + return ( + + { this.shortHash(hash) } + + ); + } + + renderFrom (transaction) { + if (!transaction) { + return '-'; + } + + return ( +
+ + 0x{ transaction.nonce.toString(16) } +
+ ); + } + + renderGasPrice (transaction) { + if (!transaction) { + return '-'; + } + + return ( + + { api.util.fromWei(transaction.gasPrice, 'shannon').toFormat(2) } shannon + + ); + } + + renderGas (transaction) { + if (!transaction) { + return '-'; + } + + return ( + + { transaction.gas.div(10 ** 6).toFormat(3) } MGas + + ); + } + + renderPropagation (stats) { + const noOfPeers = Object.keys(stats.propagatedTo).length; + const noOfPropagations = Object.values(stats.propagatedTo).reduce((sum, val) => sum + val, 0); + + return ( + + { noOfPropagations } ({ noOfPeers } peers) + + ); + } +} + +export class Transaction extends BaseTransaction { + + static propTypes = { + idx: PropTypes.number.isRequired, + transaction: PropTypes.object.isRequired, + blockNumber: PropTypes.object.isRequired, + isLocal: PropTypes.bool, + stats: PropTypes.object + }; + + static defaultProps = { + isLocal: false, + stats: { + firstSeen: 0, + propagatedTo: {} + } + }; + + static renderHeader () { + return ( + + + + Transaction + + + From + + + Gas Price + + + Gas + + + First propagation + + + # Propagated + + + + + ); + } + + render () { + const { isLocal, stats, transaction, idx } = this.props; + const blockNo = new BigNumber(stats.firstSeen); + + const clazz = classnames(styles.transaction, { + [styles.local]: isLocal + }); + + return ( + + + { idx }. + + + { this.renderHash(transaction.hash) } + + + { this.renderFrom(transaction) } + + + { this.renderGasPrice(transaction) } + + + { this.renderGas(transaction) } + + + { this.renderTime(stats.firstSeen) } + + + { this.renderPropagation(stats) } + + + ); + } + + renderTime (firstSeen) { + const { blockNumber } = this.props; + if (!firstSeen) { + return 'never'; + } + + const timeInMinutes = blockNumber.sub(firstSeen).mul(14).div(60).toFormat(1); + return `${timeInMinutes} minutes ago`; + } +} + +export class LocalTransaction extends BaseTransaction { + + static propTypes = { + hash: PropTypes.string.isRequired, + status: PropTypes.string.isRequired, + transaction: PropTypes.object, + isLocal: PropTypes.bool, + stats: PropTypes.object, + details: PropTypes.object + }; + + static defaultProps = { + stats: { + propagatedTo: {} + } + }; + + static renderHeader () { + return ( + + + + Transaction + + + From + + + Gas Price / Gas + + + Status + + + ); + } + + state = { + isSending: false, + isResubmitting: false, + gasPrice: null, + gas: null + }; + + toggleResubmit = () => { + const { transaction } = this.props; + const { isResubmitting, gasPrice } = this.state; + + this.setState({ + isResubmitting: !isResubmitting + }); + + if (gasPrice === null) { + this.setState({ + gasPrice: `0x${transaction.gasPrice.toString(16)}`, + gas: `0x${transaction.gas.toString(16)}` + }); + } + }; + + setGasPrice = el => { + this.setState({ + gasPrice: el.target.value + }); + }; + + setGas = el => { + this.setState({ + gas: el.target.value + }); + }; + + sendTransaction = () => { + const { transaction } = this.props; + const { gasPrice, gas } = this.state; + + const newTransaction = { + from: transaction.from, + to: transaction.to, + nonce: transaction.nonce, + value: transaction.value, + data: transaction.data, + gasPrice, gas + }; + + this.setState({ + isResubmitting: false, + isSending: true + }); + + const closeSending = () => this.setState({ + isSending: false, + gasPrice: null, + gas: null + }); + + api.eth.sendTransaction(newTransaction) + .then(closeSending) + .catch(closeSending); + }; + + render () { + if (this.state.isResubmitting) { + return this.renderResubmit(); + } + + const { stats, transaction, hash, status } = this.props; + const { isSending } = this.state; + + const resubmit = isSending ? ( + 'sending...' + ) : ( + + resubmit + + ); + + return ( + + + { !transaction ? null : resubmit } + + + { this.renderHash(hash) } + + + { this.renderFrom(transaction) } + + + { this.renderGasPrice(transaction) } +
+ { this.renderGas(transaction) } + + + { this.renderStatus() } +
+ { status === 'pending' ? this.renderPropagation(stats) : null } + + + ); + } + + renderStatus () { + const { details } = this.props; + + let state = { + 'pending': () => 'In queue: Pending', + 'future': () => 'In queue: Future', + 'mined': () => 'Mined', + 'dropped': () => 'Dropped because of queue limit', + 'invalid': () => 'Transaction is invalid', + 'rejected': () => `Rejected: ${details.error}`, + 'replaced': () => `Replaced by ${this.shortHash(details.hash)}` + }[this.props.status]; + + return state ? state() : 'unknown'; + } + + // TODO [ToDr] Gas Price / Gas selection is not needed + // when signer supports gasPrice/gas tunning. + renderResubmit () { + const { transaction } = this.props; + const { gasPrice, gas } = this.state; + + return ( + + + + cancel + + + + { this.renderHash(transaction.hash) } + + + { this.renderFrom(transaction) } + + + + + + + + Send + + + + ); + } + +} diff --git a/js/src/dapps/localtx/Transaction/transaction.spec.js b/js/src/dapps/localtx/Transaction/transaction.spec.js new file mode 100644 index 000000000..5e9c39147 --- /dev/null +++ b/js/src/dapps/localtx/Transaction/transaction.spec.js @@ -0,0 +1,67 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +import React from 'react'; +import { shallow } from 'enzyme'; + +import '../../../environment/tests'; +import EthApi from '../../../api'; + +// Mock API for tests +import * as Api from '../parity'; +Api.api = { + util: EthApi.prototype.util +}; + +import BigNumber from 'bignumber.js'; +import { Transaction, LocalTransaction } from './transaction'; + +describe('localtx/Transaction', () => { + describe('rendering', () => { + it('renders without crashing', () => { + const transaction = { + hash: '0x1234567890', + nonce: 15, + gasPrice: new BigNumber(10), + gas: new BigNumber(10) + }; + const rendered = shallow( + + ); + + expect(rendered).to.be.defined; + }); + }); +}); + +describe('localtx/LocalTransaction', () => { + describe('rendering', () => { + it('renders without crashing', () => { + const rendered = shallow( + + ); + + expect(rendered).to.be.defined; + }); + }); +}); diff --git a/js/src/dapps/localtx/parity.js b/js/src/dapps/localtx/parity.js new file mode 100644 index 000000000..acee4dee0 --- /dev/null +++ b/js/src/dapps/localtx/parity.js @@ -0,0 +1,21 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +const api = window.parent.secureApi; + +export { + api +}; diff --git a/js/src/jsonrpc/interfaces/parity.js b/js/src/jsonrpc/interfaces/parity.js index 5dd313e00..a5717f502 100644 --- a/js/src/jsonrpc/interfaces/parity.js +++ b/js/src/jsonrpc/interfaces/parity.js @@ -224,15 +224,6 @@ export default { } }, - listGethAccounts: { - desc: 'Returns a list of the accounts available from Geth', - params: [], - returns: { - type: Array, - desc: '20 Bytes addresses owned by the client.' - } - }, - importGethAccounts: { desc: 'Imports a list of accounts from geth', params: [ @@ -247,6 +238,24 @@ export default { } }, + listGethAccounts: { + desc: 'Returns a list of the accounts available from Geth', + params: [], + returns: { + type: Array, + desc: '20 Bytes addresses owned by the client.' + } + }, + + localTransactions: { + desc: 'Returns an object of current and past local transactions.', + params: [], + returns: { + type: Object, + desc: 'Mapping of `tx hash` into status object.' + } + }, + minGasPrice: { desc: 'Returns currently set minimal gas price', params: [], @@ -379,6 +388,24 @@ export default { } }, + pendingTransactions: { + desc: 'Returns a list of transactions currently in the queue.', + params: [], + returns: { + type: Array, + desc: 'Transactions ordered by priority' + } + }, + + pendingTransactionsStats: { + desc: 'Returns propagation stats for transactions in the queue', + params: [], + returns: { + type: Object, + desc: 'mapping of `tx hash` into `stats`' + } + }, + phraseToAddress: { desc: 'Converts a secret phrase into the corresponting address', params: [ diff --git a/js/src/views/Dapps/builtin.json b/js/src/views/Dapps/builtin.json index 80b18c065..9aef97615 100644 --- a/js/src/views/Dapps/builtin.json +++ b/js/src/views/Dapps/builtin.json @@ -44,5 +44,14 @@ "version": "1.0.0", "visible": false, "secure": true + }, + { + "id": "0xae74ad174b95cdbd01c88ac5b73a296d33e9088fc2a200e76bcedf3a94a7815d", + "url": "localtx", + "name": "TxQueue Viewer", + "description": "Have a peak on internals of transaction queue of your node.", + "author": "Parity Team ", + "version": "1.0.0", + "secure": true } ] diff --git a/js/webpack.config.js b/js/webpack.config.js index 4413299fa..7d445262e 100644 --- a/js/webpack.config.js +++ b/js/webpack.config.js @@ -40,6 +40,7 @@ module.exports = { 'githubhint': ['./dapps/githubhint.js'], 'registry': ['./dapps/registry.js'], 'signaturereg': ['./dapps/signaturereg.js'], + 'localtx': ['./dapps/localtx.js'], 'tokenreg': ['./dapps/tokenreg.js'], // app 'index': ['./index.js'] diff --git a/rpc/src/v1/helpers/errors.rs b/rpc/src/v1/helpers/errors.rs index d36feca4b..673987084 100644 --- a/rpc/src/v1/helpers/errors.rs +++ b/rpc/src/v1/helpers/errors.rs @@ -22,7 +22,7 @@ macro_rules! rpc_unimplemented { use std::fmt; use rlp::DecoderError; -use ethcore::error::{Error as EthcoreError, CallError}; +use ethcore::error::{Error as EthcoreError, CallError, TransactionError}; use ethcore::account_provider::{Error as AccountError}; use fetch::FetchError; use jsonrpc_core::{Error, ErrorCode, Value}; @@ -227,40 +227,44 @@ pub fn from_password_error(error: AccountError) -> Error { } } -pub fn from_transaction_error(error: EthcoreError) -> Error { +pub fn transaction_message(error: TransactionError) -> String { use ethcore::error::TransactionError::*; + match error { + AlreadyImported => "Transaction with the same hash was already imported.".into(), + Old => "Transaction nonce is too low. Try incrementing the nonce.".into(), + TooCheapToReplace => { + "Transaction gas price is too low. There is another transaction with same nonce in the queue. Try increasing the gas price or incrementing the nonce.".into() + }, + LimitReached => { + "There are too many transactions in the queue. Your transaction was dropped due to limit. Try increasing the fee.".into() + }, + InsufficientGas { minimal, got } => { + format!("Transaction gas is too low. There is not enough gas to cover minimal cost of the transaction (minimal: {}, got: {}). Try increasing supplied gas.", minimal, got) + }, + InsufficientGasPrice { minimal, got } => { + format!("Transaction gas price is too low. It does not satisfy your node's minimal gas price (minimal: {}, got: {}). Try increasing the gas price.", minimal, got) + }, + InsufficientBalance { balance, cost } => { + format!("Insufficient funds. Account you try to send transaction from does not have enough funds. Required {} and got: {}.", cost, balance) + }, + GasLimitExceeded { limit, got } => { + format!("Transaction cost exceeds current gas limit. Limit: {}, got: {}. Try decreasing supplied gas.", limit, got) + }, + InvalidNetworkId => "Invalid network id.".into(), + InvalidGasLimit(_) => "Supplied gas is beyond limit.".into(), + SenderBanned => "Sender is banned in local queue.".into(), + RecipientBanned => "Recipient is banned in local queue.".into(), + CodeBanned => "Code is banned in local queue.".into(), + } +} + +pub fn from_transaction_error(error: EthcoreError) -> Error { + if let EthcoreError::Transaction(e) = error { - let msg = match e { - AlreadyImported => "Transaction with the same hash was already imported.".into(), - Old => "Transaction nonce is too low. Try incrementing the nonce.".into(), - TooCheapToReplace => { - "Transaction gas price is too low. There is another transaction with same nonce in the queue. Try increasing the gas price or incrementing the nonce.".into() - }, - LimitReached => { - "There are too many transactions in the queue. Your transaction was dropped due to limit. Try increasing the fee.".into() - }, - InsufficientGas { minimal, got } => { - format!("Transaction gas is too low. There is not enough gas to cover minimal cost of the transaction (minimal: {}, got: {}). Try increasing supplied gas.", minimal, got) - }, - InsufficientGasPrice { minimal, got } => { - format!("Transaction gas price is too low. It does not satisfy your node's minimal gas price (minimal: {}, got: {}). Try increasing the gas price.", minimal, got) - }, - InsufficientBalance { balance, cost } => { - format!("Insufficient funds. Account you try to send transaction from does not have enough funds. Required {} and got: {}.", cost, balance) - }, - GasLimitExceeded { limit, got } => { - format!("Transaction cost exceeds current gas limit. Limit: {}, got: {}. Try decreasing supplied gas.", limit, got) - }, - InvalidGasLimit(_) => "Supplied gas is beyond limit.".into(), - SenderBanned => "Sender is banned in local queue.".into(), - RecipientBanned => "Recipient is banned in local queue.".into(), - CodeBanned => "Code is banned in local queue.".into(), - e => format!("{}", e).into(), - }; Error { code: ErrorCode::ServerError(codes::TRANSACTION_ERROR), - message: msg, + message: transaction_message(e), data: None, } } else { diff --git a/rpc/src/v1/impls/parity.rs b/rpc/src/v1/impls/parity.rs index 1b8ee9695..1fdcbdef8 100644 --- a/rpc/src/v1/impls/parity.rs +++ b/rpc/src/v1/impls/parity.rs @@ -34,7 +34,11 @@ use ethcore::account_provider::AccountProvider; use jsonrpc_core::Error; use v1::traits::Parity; -use v1::types::{Bytes, U256, H160, H256, H512, Peers, Transaction, RpcSettings, Histogram}; +use v1::types::{ + Bytes, U256, H160, H256, H512, + Peers, Transaction, RpcSettings, Histogram, + TransactionStats, LocalTransactionStatus, +}; use v1::helpers::{errors, SigningQueue, SignerService, NetworkSettings}; use v1::helpers::dispatch::DEFAULT_MAC; @@ -259,6 +263,27 @@ impl Parity for ParityClient where Ok(take_weak!(self.miner).all_transactions().into_iter().map(Into::into).collect::>()) } + fn pending_transactions_stats(&self) -> Result, Error> { + try!(self.active()); + + let stats = take_weak!(self.sync).transactions_stats(); + Ok(stats.into_iter() + .map(|(hash, stats)| (hash.into(), stats.into())) + .collect() + ) + } + + fn local_transactions(&self) -> Result, Error> { + try!(self.active()); + + let transactions = take_weak!(self.miner).local_transactions(); + Ok(transactions + .into_iter() + .map(|(hash, status)| (hash.into(), status.into())) + .collect() + ) + } + fn signer_port(&self) -> Result { try!(self.active()); diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index af158d564..ad55faa7b 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -24,7 +24,7 @@ use ethcore::block::{ClosedBlock, IsBlock}; use ethcore::header::BlockNumber; use ethcore::transaction::SignedTransaction; use ethcore::receipt::{Receipt, RichReceipt}; -use ethcore::miner::{MinerService, MinerStatus, TransactionImportResult}; +use ethcore::miner::{MinerService, MinerStatus, TransactionImportResult, LocalTransactionStatus}; /// Test miner service. pub struct TestMinerService { @@ -34,6 +34,8 @@ pub struct TestMinerService { pub latest_closed_block: Mutex>, /// Pre-existed pending transactions pub pending_transactions: Mutex>, + /// Pre-existed local transactions + pub local_transactions: Mutex>, /// Pre-existed pending receipts pub pending_receipts: Mutex>, /// Last nonces. @@ -53,6 +55,7 @@ impl Default for TestMinerService { imported_transactions: Mutex::new(Vec::new()), latest_closed_block: Mutex::new(None), pending_transactions: Mutex::new(HashMap::new()), + local_transactions: Mutex::new(BTreeMap::new()), pending_receipts: Mutex::new(BTreeMap::new()), last_nonces: RwLock::new(HashMap::new()), min_gas_price: RwLock::new(U256::from(20_000_000)), @@ -195,6 +198,10 @@ impl MinerService for TestMinerService { self.pending_transactions.lock().values().cloned().collect() } + fn local_transactions(&self) -> BTreeMap { + self.local_transactions.lock().iter().map(|(hash, stats)| (*hash, stats.clone())).collect() + } + fn pending_transactions(&self, _best_block: BlockNumber) -> Vec { self.pending_transactions.lock().values().cloned().collect() } diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index e0f811fc0..24be33417 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -16,8 +16,9 @@ //! Test implementation of SyncProvider. -use util::{RwLock}; -use ethsync::{SyncProvider, SyncStatus, SyncState, PeerInfo}; +use std::collections::BTreeMap; +use util::{H256, RwLock}; +use ethsync::{SyncProvider, SyncStatus, SyncState, PeerInfo, TransactionStats}; /// TestSyncProvider config. pub struct Config { @@ -74,7 +75,7 @@ impl SyncProvider for TestSyncProvider { PeerInfo { id: Some("node1".to_owned()), client_version: "Parity/1".to_owned(), - capabilities: vec!["eth/62".to_owned(), "eth/63".to_owned()], + capabilities: vec!["eth/62".to_owned(), "eth/63".to_owned()], remote_address: "127.0.0.1:7777".to_owned(), local_address: "127.0.0.1:8888".to_owned(), eth_version: 62, @@ -84,7 +85,7 @@ impl SyncProvider for TestSyncProvider { PeerInfo { id: None, client_version: "Parity/2".to_owned(), - capabilities: vec!["eth/63".to_owned(), "eth/64".to_owned()], + capabilities: vec!["eth/63".to_owned(), "eth/64".to_owned()], remote_address: "Handshake".to_owned(), local_address: "127.0.0.1:3333".to_owned(), eth_version: 64, @@ -97,5 +98,22 @@ impl SyncProvider for TestSyncProvider { fn enode(&self) -> Option { None } + + fn transactions_stats(&self) -> BTreeMap { + map![ + 1.into() => TransactionStats { + first_seen: 10, + propagated_to: map![ + 128.into() => 16 + ] + }, + 5.into() => TransactionStats { + first_seen: 16, + propagated_to: map![ + 16.into() => 1 + ] + } + ] + } } diff --git a/rpc/src/v1/tests/mocked/parity.rs b/rpc/src/v1/tests/mocked/parity.rs index b5c8187c7..5226e2f96 100644 --- a/rpc/src/v1/tests/mocked/parity.rs +++ b/rpc/src/v1/tests/mocked/parity.rs @@ -18,8 +18,9 @@ use std::sync::Arc; use util::log::RotatingLogger; use util::Address; use ethsync::ManageNetwork; -use ethcore::client::{TestBlockChainClient}; use ethcore::account_provider::AccountProvider; +use ethcore::client::{TestBlockChainClient}; +use ethcore::miner::LocalTransactionStatus; use ethstore::ethkey::{Generator, Random}; use jsonrpc_core::IoHandler; @@ -355,3 +356,28 @@ fn rpc_parity_next_nonce() { assert_eq!(io1.handle_request_sync(&request), Some(response1.to_owned())); assert_eq!(io2.handle_request_sync(&request), Some(response2.to_owned())); } + +#[test] +fn rpc_parity_transactions_stats() { + let deps = Dependencies::new(); + let io = deps.default_client(); + + let request = r#"{"jsonrpc": "2.0", "method": "parity_pendingTransactionsStats", "params":[], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":{"0x0000000000000000000000000000000000000000000000000000000000000001":{"firstSeen":10,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080":16}},"0x0000000000000000000000000000000000000000000000000000000000000005":{"firstSeen":16,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010":1}}},"id":1}"#; + + assert_eq!(io.handle_request_sync(request), Some(response.to_owned())); +} + +#[test] +fn rpc_parity_local_transactions() { + let deps = Dependencies::new(); + let io = deps.default_client(); + deps.miner.local_transactions.lock().insert(10.into(), LocalTransactionStatus::Pending); + deps.miner.local_transactions.lock().insert(15.into(), LocalTransactionStatus::Future); + + let request = r#"{"jsonrpc": "2.0", "method": "parity_localTransactions", "params":[], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":{"0x000000000000000000000000000000000000000000000000000000000000000a":{"status":"pending"},"0x000000000000000000000000000000000000000000000000000000000000000f":{"status":"future"}},"id":1}"#; + + assert_eq!(io.handle_request_sync(request), Some(response.to_owned())); +} + diff --git a/rpc/src/v1/traits/parity.rs b/rpc/src/v1/traits/parity.rs index f8c219a89..b4df594e8 100644 --- a/rpc/src/v1/traits/parity.rs +++ b/rpc/src/v1/traits/parity.rs @@ -19,7 +19,11 @@ use jsonrpc_core::Error; use std::collections::BTreeMap; use v1::helpers::auto_args::Wrap; -use v1::types::{H160, H256, H512, U256, Bytes, Peers, Transaction, RpcSettings, Histogram}; +use v1::types::{ + H160, H256, H512, U256, Bytes, + Peers, Transaction, RpcSettings, Histogram, + TransactionStats, LocalTransactionStatus, +}; build_rpc_trait! { /// Parity-specific rpc interface. @@ -115,6 +119,14 @@ build_rpc_trait! { #[rpc(name = "parity_pendingTransactions")] fn pending_transactions(&self) -> Result, Error>; + /// Returns propagation statistics on transactions pending in the queue. + #[rpc(name = "parity_pendingTransactionsStats")] + fn pending_transactions_stats(&self) -> Result, Error>; + + /// Returns a list of current and past local transactions with status details. + #[rpc(name = "parity_localTransactions")] + fn local_transactions(&self) -> Result, Error>; + /// Returns current Trusted Signer port or an error if signer is disabled. #[rpc(name = "parity_signerPort")] fn signer_port(&self) -> Result; diff --git a/rpc/src/v1/types/mod.rs.in b/rpc/src/v1/types/mod.rs.in index 924f12884..c10d6e36f 100644 --- a/rpc/src/v1/types/mod.rs.in +++ b/rpc/src/v1/types/mod.rs.in @@ -43,8 +43,8 @@ pub use self::filter::{Filter, FilterChanges}; pub use self::hash::{H64, H160, H256, H512, H520, H2048}; pub use self::index::Index; pub use self::log::Log; -pub use self::sync::{SyncStatus, SyncInfo, Peers, PeerInfo, PeerNetworkInfo, PeerProtocolsInfo, PeerEthereumProtocolInfo}; -pub use self::transaction::{Transaction, RichRawTransaction}; +pub use self::sync::{SyncStatus, SyncInfo, Peers, PeerInfo, PeerNetworkInfo, PeerProtocolsInfo, PeerEthereumProtocolInfo, TransactionStats}; +pub use self::transaction::{Transaction, RichRawTransaction, LocalTransactionStatus}; pub use self::transaction_request::TransactionRequest; pub use self::receipt::Receipt; pub use self::rpc_settings::RpcSettings; diff --git a/rpc/src/v1/types/sync.rs b/rpc/src/v1/types/sync.rs index a0f61e799..6f8938be9 100644 --- a/rpc/src/v1/types/sync.rs +++ b/rpc/src/v1/types/sync.rs @@ -14,9 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use ethsync::PeerInfo as SyncPeerInfo; +use std::collections::BTreeMap; +use ethsync::{PeerInfo as SyncPeerInfo, TransactionStats as SyncTransactionStats}; use serde::{Serialize, Serializer}; -use v1::types::U256; +use v1::types::{U256, H512}; /// Sync info #[derive(Default, Debug, Serialize, PartialEq)] @@ -117,8 +118,19 @@ impl Serialize for SyncStatus { } } +/// Propagation statistics for pending transaction. +#[derive(Default, Debug, Serialize)] +pub struct TransactionStats { + /// Block no this transaction was first seen. + #[serde(rename="firstSeen")] + pub first_seen: u64, + /// Peers this transaction was propagated to with count. + #[serde(rename="propagatedTo")] + pub propagated_to: BTreeMap, +} + impl From for PeerInfo { - fn from(p: SyncPeerInfo) -> PeerInfo { + fn from(p: SyncPeerInfo) -> Self { PeerInfo { id: p.id, name: p.client_version, @@ -138,10 +150,23 @@ impl From for PeerInfo { } } +impl From for TransactionStats { + fn from(s: SyncTransactionStats) -> Self { + TransactionStats { + first_seen: s.first_seen, + propagated_to: s.propagated_to + .into_iter() + .map(|(id, count)| (id.into(), count)) + .collect() + } + } +} + #[cfg(test)] mod tests { use serde_json; - use super::{SyncInfo, SyncStatus, Peers}; + use std::collections::BTreeMap; + use super::{SyncInfo, SyncStatus, Peers, TransactionStats}; #[test] fn test_serialize_sync_info() { @@ -176,4 +201,17 @@ mod tests { let serialized = serde_json::to_string(&t).unwrap(); assert_eq!(serialized, r#"{"startingBlock":"0x0","currentBlock":"0x0","highestBlock":"0x0","warpChunksAmount":null,"warpChunksProcessed":null,"blockGap":["0x1","0x5"]}"#) } + + #[test] + fn test_serialize_transaction_stats() { + let stats = TransactionStats { + first_seen: 100, + propagated_to: map![ + 10.into() => 50 + ] + }; + + let serialized = serde_json::to_string(&stats).unwrap(); + assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50}}"#) + } } diff --git a/rpc/src/v1/types/transaction.rs b/rpc/src/v1/types/transaction.rs index 1f1ef1787..f566f9b20 100644 --- a/rpc/src/v1/types/transaction.rs +++ b/rpc/src/v1/types/transaction.rs @@ -14,8 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +use serde::{Serialize, Serializer}; +use ethcore::miner; use ethcore::contract_address; use ethcore::transaction::{LocalizedTransaction, Action, SignedTransaction}; +use v1::helpers::errors; use v1::types::{Bytes, H160, H256, U256, H512}; /// Transaction @@ -62,6 +65,73 @@ pub struct Transaction { pub s: H256, } +/// Local Transaction Status +#[derive(Debug)] +pub enum LocalTransactionStatus { + /// Transaction is pending + Pending, + /// Transaction is in future part of the queue + Future, + /// Transaction is already mined. + Mined(Transaction), + /// Transaction was dropped because of limit. + Dropped(Transaction), + /// Transaction was replaced by transaction with higher gas price. + Replaced(Transaction, U256, H256), + /// Transaction never got into the queue. + Rejected(Transaction, String), + /// Transaction is invalid. + Invalid(Transaction), +} + +impl Serialize for LocalTransactionStatus { + fn serialize(&self, serializer: &mut S) -> Result<(), S::Error> + where S: Serializer + { + use self::LocalTransactionStatus::*; + + let elems = match *self { + Pending | Future => 1, + Mined(..) | Dropped(..) | Invalid(..) => 2, + Rejected(..) => 3, + Replaced(..) => 4, + }; + + let status = "status"; + let transaction = "transaction"; + + let mut state = try!(serializer.serialize_struct("LocalTransactionStatus", elems)); + match *self { + Pending => try!(serializer.serialize_struct_elt(&mut state, status, "pending")), + Future => try!(serializer.serialize_struct_elt(&mut state, status, "future")), + Mined(ref tx) => { + try!(serializer.serialize_struct_elt(&mut state, status, "mined")); + try!(serializer.serialize_struct_elt(&mut state, transaction, tx)); + }, + Dropped(ref tx) => { + try!(serializer.serialize_struct_elt(&mut state, status, "dropped")); + try!(serializer.serialize_struct_elt(&mut state, transaction, tx)); + }, + Invalid(ref tx) => { + try!(serializer.serialize_struct_elt(&mut state, status, "invalid")); + try!(serializer.serialize_struct_elt(&mut state, transaction, tx)); + }, + Rejected(ref tx, ref reason) => { + try!(serializer.serialize_struct_elt(&mut state, status, "rejected")); + try!(serializer.serialize_struct_elt(&mut state, transaction, tx)); + try!(serializer.serialize_struct_elt(&mut state, "error", reason)); + }, + Replaced(ref tx, ref gas_price, ref hash) => { + try!(serializer.serialize_struct_elt(&mut state, status, "replaced")); + try!(serializer.serialize_struct_elt(&mut state, transaction, tx)); + try!(serializer.serialize_struct_elt(&mut state, "hash", hash)); + try!(serializer.serialize_struct_elt(&mut state, "gasPrice", gas_price)); + }, + } + serializer.serialize_struct_end(state) + } +} + /// Geth-compatible output for eth_signTransaction method #[derive(Debug, Default, Clone, PartialEq, Serialize)] pub struct RichRawTransaction { @@ -144,9 +214,24 @@ impl From for Transaction { } } +impl From for LocalTransactionStatus { + fn from(s: miner::LocalTransactionStatus) -> Self { + use ethcore::miner::LocalTransactionStatus::*; + match s { + Pending => LocalTransactionStatus::Pending, + Future => LocalTransactionStatus::Future, + Mined(tx) => LocalTransactionStatus::Mined(tx.into()), + Dropped(tx) => LocalTransactionStatus::Dropped(tx.into()), + Rejected(tx, err) => LocalTransactionStatus::Rejected(tx.into(), errors::transaction_message(err)), + Replaced(tx, gas_price, hash) => LocalTransactionStatus::Replaced(tx.into(), gas_price.into(), hash.into()), + Invalid(tx) => LocalTransactionStatus::Invalid(tx.into()), + } + } +} + #[cfg(test)] mod tests { - use super::Transaction; + use super::{Transaction, LocalTransactionStatus}; use serde_json; #[test] @@ -155,5 +240,50 @@ mod tests { let serialized = serde_json::to_string(&t).unwrap(); assert_eq!(serialized, r#"{"hash":"0x0000000000000000000000000000000000000000000000000000000000000000","nonce":"0x0","blockHash":null,"blockNumber":null,"transactionIndex":null,"from":"0x0000000000000000000000000000000000000000","to":null,"value":"0x0","gasPrice":"0x0","gas":"0x0","input":"0x","creates":null,"raw":"0x","publicKey":null,"v":0,"r":"0x0000000000000000000000000000000000000000000000000000000000000000","s":"0x0000000000000000000000000000000000000000000000000000000000000000"}"#); } + + #[test] + fn test_local_transaction_status_serialize() { + let tx_ser = serde_json::to_string(&Transaction::default()).unwrap(); + let status1 = LocalTransactionStatus::Pending; + let status2 = LocalTransactionStatus::Future; + let status3 = LocalTransactionStatus::Mined(Transaction::default()); + let status4 = LocalTransactionStatus::Dropped(Transaction::default()); + let status5 = LocalTransactionStatus::Invalid(Transaction::default()); + let status6 = LocalTransactionStatus::Rejected(Transaction::default(), "Just because".into()); + let status7 = LocalTransactionStatus::Replaced(Transaction::default(), 5.into(), 10.into()); + + assert_eq!( + serde_json::to_string(&status1).unwrap(), + r#"{"status":"pending"}"# + ); + assert_eq!( + serde_json::to_string(&status2).unwrap(), + r#"{"status":"future"}"# + ); + assert_eq!( + serde_json::to_string(&status3).unwrap(), + r#"{"status":"mined","transaction":"#.to_owned() + &format!("{}", tx_ser) + r#"}"# + ); + assert_eq!( + serde_json::to_string(&status4).unwrap(), + r#"{"status":"dropped","transaction":"#.to_owned() + &format!("{}", tx_ser) + r#"}"# + ); + assert_eq!( + serde_json::to_string(&status5).unwrap(), + r#"{"status":"invalid","transaction":"#.to_owned() + &format!("{}", tx_ser) + r#"}"# + ); + assert_eq!( + serde_json::to_string(&status6).unwrap(), + r#"{"status":"rejected","transaction":"#.to_owned() + + &format!("{}", tx_ser) + + r#","error":"Just because"}"# + ); + assert_eq!( + serde_json::to_string(&status7).unwrap(), + r#"{"status":"replaced","transaction":"#.to_owned() + + &format!("{}", tx_ser) + + r#","hash":"0x000000000000000000000000000000000000000000000000000000000000000a","gasPrice":"0x5"}"# + ); + } } diff --git a/sync/src/api.rs b/sync/src/api.rs index d9dbbd263..3191483e4 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -15,13 +15,13 @@ // along with Parity. If not, see . use std::sync::Arc; -use std::collections::HashMap; +use std::collections::{HashMap, BTreeMap}; use std::io; use util::Bytes; use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, ProtocolId, NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, NetworkError, AllowIP as NetworkAllowIP}; -use util::{U256, H256}; +use util::{U256, H256, H512}; use io::{TimerToken}; use ethcore::client::{BlockChainClient, ChainNotify}; use ethcore::snapshot::SnapshotService; @@ -76,6 +76,16 @@ pub trait SyncProvider: Send + Sync { /// Get the enode if available. fn enode(&self) -> Option; + + /// Returns propagation count for pending transactions. + fn transactions_stats(&self) -> BTreeMap; +} + +/// Transaction stats +#[derive(Debug, Binary)] +pub struct TransactionStats { + pub first_seen: u64, + pub propagated_to: BTreeMap, } /// Peer connection information @@ -150,6 +160,14 @@ impl SyncProvider for EthSync { fn enode(&self) -> Option { self.network.external_url() } + + fn transactions_stats(&self) -> BTreeMap { + let sync = self.handler.sync.read(); + sync.transactions_stats() + .iter() + .map(|(hash, stats)| (*hash, stats.into())) + .collect() + } } struct SyncProtocolHandler { diff --git a/sync/src/chain.rs b/sync/src/chain.rs index ffd89ecdd..d98b142cb 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -104,6 +104,7 @@ use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as Do use snapshot::{Snapshot, ChunkType}; use rand::{thread_rng, Rng}; use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID}; +use transactions_stats::{TransactionsStats, Stats as TransactionStats}; known_heap_size!(0, PeerInfo); @@ -259,7 +260,7 @@ enum ForkConfirmation { Unconfirmed, /// Peers chain is too short to confirm the fork. TooShort, - /// Fork is confurmed. + /// Fork is confirmed. Confirmed, } @@ -349,6 +350,8 @@ pub struct ChainSync { handshaking_peers: HashMap, /// Sync start timestamp. Measured when first peer is connected sync_start_time: Option, + /// Transactions propagation statistics + transactions_stats: TransactionsStats, } type RlpResponseResult = Result, PacketDecodeError>; @@ -371,6 +374,7 @@ impl ChainSync { fork_block: config.fork_block, snapshot: Snapshot::new(), sync_start_time: None, + transactions_stats: TransactionsStats::default(), }; sync.update_targets(chain); sync @@ -419,6 +423,11 @@ impl ChainSync { .collect() } + /// Returns transactions propagation statistics + pub fn transactions_stats(&self) -> &H256FastMap { + self.transactions_stats.stats() + } + /// Abort all sync activity pub fn abort(&mut self, io: &mut SyncIo) { self.reset_and_continue(io); @@ -1867,7 +1876,7 @@ impl ChainSync { /// propagates new transactions to all peers pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize { - // Early out of nobody to send to. + // Early out if nobody to send to. if self.peers.is_empty() { return 0; } @@ -1884,38 +1893,53 @@ impl ChainSync { packet.out() }; + // Clear old transactions from stats + self.transactions_stats.retain(&all_transactions_hashes); + // sqrt(x)/x scaled to max u32 let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; let small = self.peers.len() < MIN_PEERS_PROPAGATION; + let block_number = io.chain().chain_info().best_block_number; - let lucky_peers = self.peers.iter_mut() - .filter(|_| small || ::rand::random::() < fraction) - .take(MAX_PEERS_PROPAGATION) - .filter_map(|(peer_id, mut peer_info)| { - // Send all transactions - if peer_info.last_sent_transactions.is_empty() { - peer_info.last_sent_transactions = all_transactions_hashes.clone(); - return Some((*peer_id, all_transactions_rlp.clone())); - } - - // Get hashes of all transactions to send to this peer - let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions).cloned().collect::>(); - if to_send.is_empty() { - return None; - } - - // Construct RLP - let mut packet = RlpStream::new_list(to_send.len()); - for tx in &transactions { - if to_send.contains(&tx.hash()) { - packet.append(tx); + let lucky_peers = { + let stats = &mut self.transactions_stats; + self.peers.iter_mut() + .filter(|_| small || ::rand::random::() < fraction) + .take(MAX_PEERS_PROPAGATION) + .filter_map(|(peer_id, mut peer_info)| { + // Send all transactions + if peer_info.last_sent_transactions.is_empty() { + // update stats + for hash in &all_transactions_hashes { + let id = io.peer_session_info(*peer_id).and_then(|info| info.id); + stats.propagated(*hash, id, block_number); + } + peer_info.last_sent_transactions = all_transactions_hashes.clone(); + return Some((*peer_id, all_transactions_rlp.clone())); } - } - peer_info.last_sent_transactions = all_transactions_hashes.clone(); - Some((*peer_id, packet.out())) - }) - .collect::>(); + // Get hashes of all transactions to send to this peer + let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions).cloned().collect::>(); + if to_send.is_empty() { + return None; + } + + // Construct RLP + let mut packet = RlpStream::new_list(to_send.len()); + for tx in &transactions { + if to_send.contains(&tx.hash()) { + packet.append(tx); + // update stats + let id = io.peer_session_info(*peer_id).and_then(|info| info.id); + stats.propagated(tx.hash(), id, block_number); + } + } + + peer_info.last_sent_transactions = all_transactions_hashes.clone(); + Some((*peer_id, packet.out())) + }) + .collect::>() + }; // Send RLPs let sent = lucky_peers.len(); @@ -1965,9 +1989,6 @@ impl ChainSync { trace!(target: "sync", "Bad blocks in the queue, restarting"); self.restart(io); } - for peer_info in self.peers.values_mut() { - peer_info.last_sent_transactions.clear(); - } } } @@ -2293,18 +2314,23 @@ mod tests { let peer_count = sync.propagate_new_transactions(&mut io); // Try to propagate same transactions for the second time let peer_count2 = sync.propagate_new_transactions(&mut io); + // Even after new block transactions should not be propagated twice + sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]); + // Try to propagate same transactions for the third time + let peer_count3 = sync.propagate_new_transactions(&mut io); // 1 message should be send assert_eq!(1, io.queue.len()); // 1 peer should be updated but only once assert_eq!(1, peer_count); assert_eq!(0, peer_count2); + assert_eq!(0, peer_count3); // TRANSACTIONS_PACKET assert_eq!(0x02, io.queue[0].packet_id); } #[test] - fn propagates_transactions_again_after_new_block() { + fn propagates_new_transactions_after_new_block() { let mut client = TestBlockChainClient::new(); client.add_blocks(100, EachBlockWith::Uncle); client.insert_transaction_to_queue(); @@ -2313,15 +2339,14 @@ mod tests { let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &mut queue, None); let peer_count = sync.propagate_new_transactions(&mut io); + io.chain.insert_transaction_to_queue(); + // New block import should trigger propagation. sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]); - // Try to propagate same transactions for the second time - let peer_count2 = sync.propagate_new_transactions(&mut io); // 2 message should be send assert_eq!(2, io.queue.len()); - // 1 peer should be updated twice + // 1 peer should receive the message assert_eq!(1, peer_count); - assert_eq!(1, peer_count2); // TRANSACTIONS_PACKET assert_eq!(0x02, io.queue[0].packet_id); assert_eq!(0x02, io.queue[1].packet_id); @@ -2360,6 +2385,21 @@ mod tests { assert_eq!(0x02, io.queue[1].packet_id); } + #[test] + fn should_maintain_transations_propagation_stats() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(100, EachBlockWith::Uncle); + client.insert_transaction_to_queue(); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client); + let mut queue = VecDeque::new(); + let ss = TestSnapshotService::new(); + let mut io = TestIo::new(&mut client, &ss, &mut queue, None); + sync.propagate_new_transactions(&mut io); + + let stats = sync.transactions_stats(); + assert_eq!(stats.len(), 1, "Should maintain stats for single transaction.") + } + #[test] fn handles_peer_new_block_malformed() { let mut client = TestBlockChainClient::new(); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 532c05711..2061e4e3a 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -51,6 +51,7 @@ mod blocks; mod block_sync; mod sync_io; mod snapshot; +mod transactions_stats; #[cfg(test)] mod tests; @@ -61,7 +62,7 @@ mod api { } pub use api::{EthSync, SyncProvider, SyncClient, NetworkManagerClient, ManageNetwork, SyncConfig, - ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP}; + ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP, TransactionStats}; pub use chain::{SyncStatus, SyncState}; pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError}; diff --git a/sync/src/transactions_stats.rs b/sync/src/transactions_stats.rs new file mode 100644 index 000000000..8c5eb6dda --- /dev/null +++ b/sync/src/transactions_stats.rs @@ -0,0 +1,134 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use api::TransactionStats; +use std::collections::{HashSet, HashMap}; +use util::{H256, H512}; +use util::hash::H256FastMap; + +type NodeId = H512; +type BlockNumber = u64; + +#[derive(Debug, PartialEq, Clone)] +pub struct Stats { + first_seen: BlockNumber, + propagated_to: HashMap, +} + +impl Stats { + pub fn new(number: BlockNumber) -> Self { + Stats { + first_seen: number, + propagated_to: Default::default(), + } + } +} + +impl<'a> From<&'a Stats> for TransactionStats { + fn from(other: &'a Stats) -> Self { + TransactionStats { + first_seen: other.first_seen, + propagated_to: other.propagated_to + .iter() + .map(|(hash, size)| (*hash, *size)) + .collect(), + } + } +} + +#[derive(Debug, Default)] +pub struct TransactionsStats { + pending_transactions: H256FastMap, +} + +impl TransactionsStats { + /// Increases number of propagations to given `enodeid`. + pub fn propagated(&mut self, hash: H256, enode_id: Option, current_block_num: BlockNumber) { + let enode_id = enode_id.unwrap_or_default(); + let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::new(current_block_num)); + let mut count = stats.propagated_to.entry(enode_id).or_insert(0); + *count = count.saturating_add(1); + } + + /// Returns propagation stats for given hash or `None` if hash is not known. + #[cfg(test)] + pub fn get(&self, hash: &H256) -> Option<&Stats> { + self.pending_transactions.get(hash) + } + + pub fn stats(&self) -> &H256FastMap { + &self.pending_transactions + } + + /// Retains only transactions present in given `HashSet`. + pub fn retain(&mut self, hashes: &HashSet) { + let to_remove = self.pending_transactions.keys() + .filter(|hash| !hashes.contains(hash)) + .cloned() + .collect::>(); + + for hash in to_remove { + self.pending_transactions.remove(&hash); + } + } +} + +#[cfg(test)] +mod tests { + + use std::collections::{HashMap, HashSet}; + use super::{Stats, TransactionsStats}; + + #[test] + fn should_keep_track_of_propagations() { + // given + let mut stats = TransactionsStats::default(); + let hash = 5.into(); + let enodeid1 = 2.into(); + let enodeid2 = 5.into(); + + // when + stats.propagated(hash, Some(enodeid1), 5); + stats.propagated(hash, Some(enodeid1), 10); + stats.propagated(hash, Some(enodeid2), 15); + + // then + let stats = stats.get(&hash); + assert_eq!(stats, Some(&Stats { + first_seen: 5, + propagated_to: hash_map![ + enodeid1 => 2, + enodeid2 => 1 + ] + })); + } + + #[test] + fn should_remove_hash_from_tracking() { + // given + let mut stats = TransactionsStats::default(); + let hash = 5.into(); + let enodeid1 = 5.into(); + stats.propagated(hash, Some(enodeid1), 10); + + // when + stats.retain(&HashSet::new()); + + // then + let stats = stats.get(&hash); + assert_eq!(stats, None); + } +}