diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index e0282d460..50ff20e38 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use ipc::IpcConfig; -use util::H256; +use util::{H256, H512}; /// Represents what has to be handled by actor listening to chain events #[ipc] @@ -40,6 +40,15 @@ pub trait ChainNotify : Send + Sync { fn stop(&self) { // does nothing by default } + + /// fires when new transactions are imported + fn transactions_imported(&self, + _hashes: Vec, + _peer_id: Option, + _block_num: u64, + ) { + // does nothing by default + } } impl IpcConfig for ChainNotify { } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index dfd899b29..9add41e4f 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -25,7 +25,7 @@ use time::precise_time_ns; use util::{Bytes, PerfTimer, Itertools, Mutex, RwLock, Hashable}; use util::{journaldb, TrieFactory, Trie}; use util::trie::TrieSpec; -use util::{U256, H256, Address, H2048, Uint, FixedHash}; +use util::{U256, H256, H512, Address, H2048, Uint, FixedHash}; use util::kvdb::*; // other @@ -559,11 +559,16 @@ impl Client { } /// Import transactions from the IO queue - pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize { + pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: Option) -> usize { trace!(target: "external_tx", "Importing queued"); let _timer = PerfTimer::new("import_queued_transactions"); self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst); - let txs = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect(); + let txs: Vec = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect(); + let hashes: Vec<_> = txs.iter().map(|tx| tx.hash()).collect(); + let block_number = self.chain_info().best_block_number; + self.notify(|notify| { + notify.transactions_imported(hashes.clone(), peer_id.clone(), block_number); + }); let results = self.miner.import_external_transactions(self, txs); results.len() } @@ -1264,14 +1269,14 @@ impl BlockChainClient for Client { (*self.build_last_hashes(self.chain.read().best_block_hash())).clone() } - fn queue_transactions(&self, transactions: Vec) { + fn queue_transactions(&self, transactions: Vec, node_id: Option) { let queue_size = self.queue_transactions.load(AtomicOrdering::Relaxed); trace!(target: "external_tx", "Queue size: {}", queue_size); if queue_size > MAX_TX_QUEUE_SIZE { debug!("Ignoring {} transactions: queue is full", transactions.len()); } else { let len = transactions.len(); - match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions)) { + match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions, node_id)) { Ok(_) => { self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst); } diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index 317a481c7..44efade66 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -657,7 +657,7 @@ impl BlockChainClient for TestBlockChainClient { unimplemented!(); } - fn queue_transactions(&self, transactions: Vec) { + fn queue_transactions(&self, transactions: Vec, _peer_id: Option) { // import right here let txs = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect(); self.miner.import_external_transactions(self, txs); diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index e23a564d4..c032d4059 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use std::collections::BTreeMap; -use util::{U256, Address, H256, H2048, Bytes, Itertools}; +use util::{U256, Address, H256, H512, H2048, Bytes, Itertools}; use util::stats::Histogram; use blockchain::TreeRoute; use verification::queue::QueueInfo as BlockQueueInfo; @@ -200,7 +200,7 @@ pub trait BlockChainClient : Sync + Send { fn last_hashes(&self) -> LastHashes; /// Queue transactions for importing. - fn queue_transactions(&self, transactions: Vec); + fn queue_transactions(&self, transactions: Vec, peer_id: Option); /// list all transactions fn pending_transactions(&self) -> Vec; @@ -294,9 +294,9 @@ pub trait ProvingBlockChainClient: BlockChainClient { /// The key is the keccak hash of the account's address. /// Returns a vector of raw trie nodes (in order from the root) proving the query. /// Nodes after `from_level` may be omitted. - /// An empty vector indicates unservable query. + /// An empty vector indicates unservable query. fn prove_account(&self, key1: H256, from_level: u32, id: BlockID) -> Vec; /// Get code by address hash. fn code_by_hash(&self, account_key: H256, id: BlockID) -> Bytes; -} \ No newline at end of file +} diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 36b5e7157..b595843a8 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -39,7 +39,7 @@ pub enum ClientIoMessage { /// A block is ready BlockVerified, /// New transaction RLPs are ready to be imported - NewTransactions(Vec), + NewTransactions(Vec, Option), /// Begin snapshot restoration BeginRestoration(ManifestData), /// Feed a state chunk to the snapshot service @@ -196,7 +196,9 @@ impl IoHandler for ClientIoHandler { match *net_message { ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); } - ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); } + ClientIoMessage::NewTransactions(ref transactions, ref peer_id) => { + self.client.import_queued_transactions(transactions, peer_id.clone()); + } ClientIoMessage::BeginRestoration(ref manifest) => { if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) { warn!("Failed to initialize snapshot restoration: {}", e); diff --git a/js/src/dapps/localtx/Transaction/transaction.js b/js/src/dapps/localtx/Transaction/transaction.js index 17a45ecd6..c9ca10ba5 100644 --- a/js/src/dapps/localtx/Transaction/transaction.js +++ b/js/src/dapps/localtx/Transaction/transaction.js @@ -48,7 +48,6 @@ class BaseTransaction extends Component { - 0x{ transaction.nonce.toString(16) } ); } @@ -87,6 +86,17 @@ class BaseTransaction extends Component { ); } + + renderReceived (stats) { + const noOfPeers = Object.keys(stats.receivedFrom).length; + const noOfPropagations = Object.values(stats.receivedFrom).reduce((sum, val) => sum + val, 0); + + return ( + + { noOfPropagations } ({ noOfPeers } peers) + + ); + } } export class Transaction extends BaseTransaction { @@ -103,7 +113,8 @@ export class Transaction extends BaseTransaction { isLocal: false, stats: { firstSeen: 0, - propagatedTo: {} + propagatedTo: {}, + receivedFrom: {} } }; @@ -129,6 +140,9 @@ export class Transaction extends BaseTransaction { # Propagated + + # Received + ); @@ -165,6 +179,9 @@ export class Transaction extends BaseTransaction { { this.renderPropagation(stats) } + + { this.renderReceived(stats) } + ); } @@ -193,7 +210,8 @@ export class LocalTransaction extends BaseTransaction { static defaultProps = { stats: { - propagatedTo: {} + propagatedTo: {}, + receivedFrom: {} } }; @@ -317,6 +335,8 @@ export class LocalTransaction extends BaseTransaction { { this.renderStatus() }
{ status === 'pending' ? this.renderPropagation(stats) : null } +
+ { status === 'pending' ? this.renderReceived(stats) : null } ); diff --git a/js/src/dapps/localtx/Transaction/transaction.spec.js b/js/src/dapps/localtx/Transaction/transaction.spec.js index 04f2f8de8..2bd3691db 100644 --- a/js/src/dapps/localtx/Transaction/transaction.spec.js +++ b/js/src/dapps/localtx/Transaction/transaction.spec.js @@ -34,7 +34,7 @@ describe('dapps/localtx/Transaction', () => { it('renders without crashing', () => { const transaction = { hash: '0x1234567890', - nonce: 15, + nonce: new BigNumber(15), gasPrice: new BigNumber(10), gas: new BigNumber(10) }; diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index 8800d926a..aa7e8d849 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -105,13 +105,19 @@ impl SyncProvider for TestSyncProvider { first_seen: 10, propagated_to: map![ 128.into() => 16 - ] + ], + received_from: map![ + 1.into() => 10 + ], }, 5.into() => TransactionStats { first_seen: 16, propagated_to: map![ 16.into() => 1 - ] + ], + received_from: map![ + 256.into() => 2 + ], } ] } diff --git a/rpc/src/v1/types/sync.rs b/rpc/src/v1/types/sync.rs index 6f8938be9..65d989156 100644 --- a/rpc/src/v1/types/sync.rs +++ b/rpc/src/v1/types/sync.rs @@ -127,6 +127,9 @@ pub struct TransactionStats { /// Peers this transaction was propagated to with count. #[serde(rename="propagatedTo")] pub propagated_to: BTreeMap, + /// Peers that propagated this transaction back. + #[serde(rename="receivedFrom")] + pub received_from: BTreeMap, } impl From for PeerInfo { @@ -157,7 +160,11 @@ impl From for TransactionStats { propagated_to: s.propagated_to .into_iter() .map(|(id, count)| (id.into(), count)) - .collect() + .collect(), + received_from: s.received_from + .into_iter() + .map(|(id, count)| (id.into(), count)) + .collect(), } } } @@ -208,10 +215,13 @@ mod tests { first_seen: 100, propagated_to: map![ 10.into() => 50 - ] + ], + received_from: map![ + 1.into() => 1000 + ], }; let serialized = serde_json::to_string(&stats).unwrap(); - assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50}}"#) + assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001":1000}}"#) } } diff --git a/sync/src/api.rs b/sync/src/api.rs index 7c531bf7c..10434ce26 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -99,6 +99,8 @@ pub struct TransactionStats { pub first_seen: u64, /// Peers it was propagated to. pub propagated_to: BTreeMap, + /// Peers that propagated the transaction back. + pub received_from: BTreeMap, } /// Peer connection information @@ -144,7 +146,7 @@ pub struct EthSync { network: NetworkService, /// Main (eth/par) protocol handler sync_handler: Arc, - /// Light (les) protocol handler + /// Light (les) protocol handler light_proto: Option>, /// The main subprotocol name subprotocol_name: [u8; 3], @@ -155,7 +157,7 @@ pub struct EthSync { impl EthSync { /// Creates and register protocol with the network service pub fn new(params: Params) -> Result, NetworkError> { - let pruning_info = params.chain.pruning_info(); + let pruning_info = params.chain.pruning_info(); let light_proto = match params.config.serve_light { false => None, true => Some({ @@ -297,7 +299,7 @@ impl ChainNotify for EthSync { Some(lp) => lp, None => return, }; - + let chain_info = self.sync_handler.chain.chain_info(); light_proto.make_announcement(context, Announcement { head_hash: chain_info.best_block_hash, @@ -323,7 +325,7 @@ impl ChainNotify for EthSync { // register the warp sync subprotocol self.network.register_protocol(self.sync_handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8]) .unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e)); - + // register the light protocol. if let Some(light_proto) = self.light_proto.as_ref().map(|x| x.clone()) { self.network.register_protocol(light_proto, self.light_subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS) @@ -335,6 +337,11 @@ impl ChainNotify for EthSync { self.sync_handler.snapshot_service.abort_restore(); self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); } + + fn transactions_imported(&self, hashes: Vec, peer_id: Option, block_number: u64) { + let mut sync = self.sync_handler.sync.write(); + sync.transactions_imported(hashes, peer_id, block_number); + } } /// LES event handler. @@ -344,7 +351,8 @@ struct TxRelay(Arc); impl LightHandler for TxRelay { fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) { trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer()); - self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect()) + // TODO [ToDr] Can we get a peer enode somehow? + self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), None) } } @@ -547,4 +555,4 @@ pub struct ServiceConfiguration { pub net: NetworkConfiguration, /// IPC path. pub io_path: String, -} \ No newline at end of file +} diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 2d53ad5ee..9115ac297 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -432,6 +432,13 @@ impl ChainSync { self.transactions_stats.stats() } + /// Updates statistics for imported transactions. + pub fn transactions_imported(&mut self, hashes: Vec, peer_id: Option, block_number: u64) { + for hash in hashes { + self.transactions_stats.received(hash, peer_id, block_number); + } + } + /// Abort all sync activity pub fn abort(&mut self, io: &mut SyncIo) { self.reset_and_continue(io); @@ -1409,7 +1416,8 @@ impl ChainSync { let tx = rlp.as_raw().to_vec(); transactions.push(tx); } - io.chain().queue_transactions(transactions); + let id = io.peer_session_info(peer_id).and_then(|info| info.id); + io.chain().queue_transactions(transactions, id); Ok(()) } diff --git a/sync/src/transactions_stats.rs b/sync/src/transactions_stats.rs index 8c5eb6dda..a91a860e5 100644 --- a/sync/src/transactions_stats.rs +++ b/sync/src/transactions_stats.rs @@ -26,6 +26,7 @@ type BlockNumber = u64; pub struct Stats { first_seen: BlockNumber, propagated_to: HashMap, + received_from: HashMap, } impl Stats { @@ -33,6 +34,7 @@ impl Stats { Stats { first_seen: number, propagated_to: Default::default(), + received_from: Default::default(), } } } @@ -45,6 +47,10 @@ impl<'a> From<&'a Stats> for TransactionStats { .iter() .map(|(hash, size)| (*hash, *size)) .collect(), + received_from: other.received_from + .iter() + .map(|(hash, size)| (*hash, *size)) + .collect(), } } } @@ -63,6 +69,14 @@ impl TransactionsStats { *count = count.saturating_add(1); } + /// Increase number of back-propagations from given `enodeid`. + pub fn received(&mut self, hash: H256, enode_id: Option, current_block_num: BlockNumber) { + let enode_id = enode_id.unwrap_or_default(); + let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::new(current_block_num)); + let mut count = stats.received_from.entry(enode_id).or_insert(0); + *count = count.saturating_add(1); + } + /// Returns propagation stats for given hash or `None` if hash is not known. #[cfg(test)] pub fn get(&self, hash: &H256) -> Option<&Stats> { @@ -112,6 +126,32 @@ mod tests { propagated_to: hash_map![ enodeid1 => 2, enodeid2 => 1 + ], + received_from: Default::default(), + })); + } + + #[test] + fn should_keep_track_of_back_propagations() { + // given + let mut stats = TransactionsStats::default(); + let hash = 5.into(); + let enodeid1 = 2.into(); + let enodeid2 = 5.into(); + + // when + stats.received(hash, Some(enodeid1), 5); + stats.received(hash, Some(enodeid1), 10); + stats.received(hash, Some(enodeid2), 15); + + // then + let stats = stats.get(&hash); + assert_eq!(stats, Some(&Stats { + first_seen: 5, + propagated_to: Default::default(), + received_from: hash_map![ + enodeid1 => 2, + enodeid2 => 1 ] })); }