From e1ade5b3750561101ce8519e3767ae7b63886e05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 10 Dec 2016 14:56:41 +0100 Subject: [PATCH 1/7] Maintaining a list of transactions propagated from other peers --- ethcore/src/client/chain_notify.rs | 11 ++++- ethcore/src/client/client.rs | 15 ++++--- ethcore/src/client/test_client.rs | 2 +- ethcore/src/client/traits.rs | 8 ++-- ethcore/src/service.rs | 6 ++- .../dapps/localtx/Transaction/transaction.js | 26 ++++++++++-- .../localtx/Transaction/transaction.spec.js | 2 +- rpc/src/v1/tests/helpers/sync_provider.rs | 10 ++++- rpc/src/v1/types/sync.rs | 16 ++++++-- sync/src/api.rs | 20 +++++++--- sync/src/chain.rs | 10 ++++- sync/src/transactions_stats.rs | 40 +++++++++++++++++++ 12 files changed, 137 insertions(+), 29 deletions(-) diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index e0282d460..50ff20e38 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use ipc::IpcConfig; -use util::H256; +use util::{H256, H512}; /// Represents what has to be handled by actor listening to chain events #[ipc] @@ -40,6 +40,15 @@ pub trait ChainNotify : Send + Sync { fn stop(&self) { // does nothing by default } + + /// fires when new transactions are imported + fn transactions_imported(&self, + _hashes: Vec, + _peer_id: Option, + _block_num: u64, + ) { + // does nothing by default + } } impl IpcConfig for ChainNotify { } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index dfd899b29..9add41e4f 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -25,7 +25,7 @@ use time::precise_time_ns; use util::{Bytes, PerfTimer, Itertools, Mutex, RwLock, Hashable}; use util::{journaldb, TrieFactory, Trie}; use util::trie::TrieSpec; -use util::{U256, H256, Address, H2048, Uint, FixedHash}; +use util::{U256, H256, H512, Address, H2048, Uint, FixedHash}; use util::kvdb::*; // other @@ -559,11 +559,16 @@ impl Client { } /// Import transactions from the IO queue - pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize { + pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: Option) -> usize { trace!(target: "external_tx", "Importing queued"); let _timer = PerfTimer::new("import_queued_transactions"); self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst); - let txs = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect(); + let txs: Vec = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect(); + let hashes: Vec<_> = txs.iter().map(|tx| tx.hash()).collect(); + let block_number = self.chain_info().best_block_number; + self.notify(|notify| { + notify.transactions_imported(hashes.clone(), peer_id.clone(), block_number); + }); let results = self.miner.import_external_transactions(self, txs); results.len() } @@ -1264,14 +1269,14 @@ impl BlockChainClient for Client { (*self.build_last_hashes(self.chain.read().best_block_hash())).clone() } - fn queue_transactions(&self, transactions: Vec) { + fn queue_transactions(&self, transactions: Vec, node_id: Option) { let queue_size = self.queue_transactions.load(AtomicOrdering::Relaxed); trace!(target: "external_tx", "Queue size: {}", queue_size); if queue_size > MAX_TX_QUEUE_SIZE { debug!("Ignoring {} transactions: queue is full", transactions.len()); } else { let len = transactions.len(); - match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions)) { + match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions, node_id)) { Ok(_) => { self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst); } diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index 317a481c7..44efade66 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -657,7 +657,7 @@ impl BlockChainClient for TestBlockChainClient { unimplemented!(); } - fn queue_transactions(&self, transactions: Vec) { + fn queue_transactions(&self, transactions: Vec, _peer_id: Option) { // import right here let txs = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect(); self.miner.import_external_transactions(self, txs); diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index e23a564d4..c032d4059 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use std::collections::BTreeMap; -use util::{U256, Address, H256, H2048, Bytes, Itertools}; +use util::{U256, Address, H256, H512, H2048, Bytes, Itertools}; use util::stats::Histogram; use blockchain::TreeRoute; use verification::queue::QueueInfo as BlockQueueInfo; @@ -200,7 +200,7 @@ pub trait BlockChainClient : Sync + Send { fn last_hashes(&self) -> LastHashes; /// Queue transactions for importing. - fn queue_transactions(&self, transactions: Vec); + fn queue_transactions(&self, transactions: Vec, peer_id: Option); /// list all transactions fn pending_transactions(&self) -> Vec; @@ -294,9 +294,9 @@ pub trait ProvingBlockChainClient: BlockChainClient { /// The key is the keccak hash of the account's address. /// Returns a vector of raw trie nodes (in order from the root) proving the query. /// Nodes after `from_level` may be omitted. - /// An empty vector indicates unservable query. + /// An empty vector indicates unservable query. fn prove_account(&self, key1: H256, from_level: u32, id: BlockID) -> Vec; /// Get code by address hash. fn code_by_hash(&self, account_key: H256, id: BlockID) -> Bytes; -} \ No newline at end of file +} diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 36b5e7157..b595843a8 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -39,7 +39,7 @@ pub enum ClientIoMessage { /// A block is ready BlockVerified, /// New transaction RLPs are ready to be imported - NewTransactions(Vec), + NewTransactions(Vec, Option), /// Begin snapshot restoration BeginRestoration(ManifestData), /// Feed a state chunk to the snapshot service @@ -196,7 +196,9 @@ impl IoHandler for ClientIoHandler { match *net_message { ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); } - ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); } + ClientIoMessage::NewTransactions(ref transactions, ref peer_id) => { + self.client.import_queued_transactions(transactions, peer_id.clone()); + } ClientIoMessage::BeginRestoration(ref manifest) => { if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) { warn!("Failed to initialize snapshot restoration: {}", e); diff --git a/js/src/dapps/localtx/Transaction/transaction.js b/js/src/dapps/localtx/Transaction/transaction.js index 17a45ecd6..c9ca10ba5 100644 --- a/js/src/dapps/localtx/Transaction/transaction.js +++ b/js/src/dapps/localtx/Transaction/transaction.js @@ -48,7 +48,6 @@ class BaseTransaction extends Component { - 0x{ transaction.nonce.toString(16) } ); } @@ -87,6 +86,17 @@ class BaseTransaction extends Component { ); } + + renderReceived (stats) { + const noOfPeers = Object.keys(stats.receivedFrom).length; + const noOfPropagations = Object.values(stats.receivedFrom).reduce((sum, val) => sum + val, 0); + + return ( + + { noOfPropagations } ({ noOfPeers } peers) + + ); + } } export class Transaction extends BaseTransaction { @@ -103,7 +113,8 @@ export class Transaction extends BaseTransaction { isLocal: false, stats: { firstSeen: 0, - propagatedTo: {} + propagatedTo: {}, + receivedFrom: {} } }; @@ -129,6 +140,9 @@ export class Transaction extends BaseTransaction { # Propagated + + # Received + ); @@ -165,6 +179,9 @@ export class Transaction extends BaseTransaction { { this.renderPropagation(stats) } + + { this.renderReceived(stats) } + ); } @@ -193,7 +210,8 @@ export class LocalTransaction extends BaseTransaction { static defaultProps = { stats: { - propagatedTo: {} + propagatedTo: {}, + receivedFrom: {} } }; @@ -317,6 +335,8 @@ export class LocalTransaction extends BaseTransaction { { this.renderStatus() }
{ status === 'pending' ? this.renderPropagation(stats) : null } +
+ { status === 'pending' ? this.renderReceived(stats) : null } ); diff --git a/js/src/dapps/localtx/Transaction/transaction.spec.js b/js/src/dapps/localtx/Transaction/transaction.spec.js index 04f2f8de8..2bd3691db 100644 --- a/js/src/dapps/localtx/Transaction/transaction.spec.js +++ b/js/src/dapps/localtx/Transaction/transaction.spec.js @@ -34,7 +34,7 @@ describe('dapps/localtx/Transaction', () => { it('renders without crashing', () => { const transaction = { hash: '0x1234567890', - nonce: 15, + nonce: new BigNumber(15), gasPrice: new BigNumber(10), gas: new BigNumber(10) }; diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index 8800d926a..aa7e8d849 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -105,13 +105,19 @@ impl SyncProvider for TestSyncProvider { first_seen: 10, propagated_to: map![ 128.into() => 16 - ] + ], + received_from: map![ + 1.into() => 10 + ], }, 5.into() => TransactionStats { first_seen: 16, propagated_to: map![ 16.into() => 1 - ] + ], + received_from: map![ + 256.into() => 2 + ], } ] } diff --git a/rpc/src/v1/types/sync.rs b/rpc/src/v1/types/sync.rs index 6f8938be9..65d989156 100644 --- a/rpc/src/v1/types/sync.rs +++ b/rpc/src/v1/types/sync.rs @@ -127,6 +127,9 @@ pub struct TransactionStats { /// Peers this transaction was propagated to with count. #[serde(rename="propagatedTo")] pub propagated_to: BTreeMap, + /// Peers that propagated this transaction back. + #[serde(rename="receivedFrom")] + pub received_from: BTreeMap, } impl From for PeerInfo { @@ -157,7 +160,11 @@ impl From for TransactionStats { propagated_to: s.propagated_to .into_iter() .map(|(id, count)| (id.into(), count)) - .collect() + .collect(), + received_from: s.received_from + .into_iter() + .map(|(id, count)| (id.into(), count)) + .collect(), } } } @@ -208,10 +215,13 @@ mod tests { first_seen: 100, propagated_to: map![ 10.into() => 50 - ] + ], + received_from: map![ + 1.into() => 1000 + ], }; let serialized = serde_json::to_string(&stats).unwrap(); - assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50}}"#) + assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001":1000}}"#) } } diff --git a/sync/src/api.rs b/sync/src/api.rs index 7c531bf7c..10434ce26 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -99,6 +99,8 @@ pub struct TransactionStats { pub first_seen: u64, /// Peers it was propagated to. pub propagated_to: BTreeMap, + /// Peers that propagated the transaction back. + pub received_from: BTreeMap, } /// Peer connection information @@ -144,7 +146,7 @@ pub struct EthSync { network: NetworkService, /// Main (eth/par) protocol handler sync_handler: Arc, - /// Light (les) protocol handler + /// Light (les) protocol handler light_proto: Option>, /// The main subprotocol name subprotocol_name: [u8; 3], @@ -155,7 +157,7 @@ pub struct EthSync { impl EthSync { /// Creates and register protocol with the network service pub fn new(params: Params) -> Result, NetworkError> { - let pruning_info = params.chain.pruning_info(); + let pruning_info = params.chain.pruning_info(); let light_proto = match params.config.serve_light { false => None, true => Some({ @@ -297,7 +299,7 @@ impl ChainNotify for EthSync { Some(lp) => lp, None => return, }; - + let chain_info = self.sync_handler.chain.chain_info(); light_proto.make_announcement(context, Announcement { head_hash: chain_info.best_block_hash, @@ -323,7 +325,7 @@ impl ChainNotify for EthSync { // register the warp sync subprotocol self.network.register_protocol(self.sync_handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8]) .unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e)); - + // register the light protocol. if let Some(light_proto) = self.light_proto.as_ref().map(|x| x.clone()) { self.network.register_protocol(light_proto, self.light_subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS) @@ -335,6 +337,11 @@ impl ChainNotify for EthSync { self.sync_handler.snapshot_service.abort_restore(); self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); } + + fn transactions_imported(&self, hashes: Vec, peer_id: Option, block_number: u64) { + let mut sync = self.sync_handler.sync.write(); + sync.transactions_imported(hashes, peer_id, block_number); + } } /// LES event handler. @@ -344,7 +351,8 @@ struct TxRelay(Arc); impl LightHandler for TxRelay { fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) { trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer()); - self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect()) + // TODO [ToDr] Can we get a peer enode somehow? + self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), None) } } @@ -547,4 +555,4 @@ pub struct ServiceConfiguration { pub net: NetworkConfiguration, /// IPC path. pub io_path: String, -} \ No newline at end of file +} diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 2d53ad5ee..9115ac297 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -432,6 +432,13 @@ impl ChainSync { self.transactions_stats.stats() } + /// Updates statistics for imported transactions. + pub fn transactions_imported(&mut self, hashes: Vec, peer_id: Option, block_number: u64) { + for hash in hashes { + self.transactions_stats.received(hash, peer_id, block_number); + } + } + /// Abort all sync activity pub fn abort(&mut self, io: &mut SyncIo) { self.reset_and_continue(io); @@ -1409,7 +1416,8 @@ impl ChainSync { let tx = rlp.as_raw().to_vec(); transactions.push(tx); } - io.chain().queue_transactions(transactions); + let id = io.peer_session_info(peer_id).and_then(|info| info.id); + io.chain().queue_transactions(transactions, id); Ok(()) } diff --git a/sync/src/transactions_stats.rs b/sync/src/transactions_stats.rs index 8c5eb6dda..a91a860e5 100644 --- a/sync/src/transactions_stats.rs +++ b/sync/src/transactions_stats.rs @@ -26,6 +26,7 @@ type BlockNumber = u64; pub struct Stats { first_seen: BlockNumber, propagated_to: HashMap, + received_from: HashMap, } impl Stats { @@ -33,6 +34,7 @@ impl Stats { Stats { first_seen: number, propagated_to: Default::default(), + received_from: Default::default(), } } } @@ -45,6 +47,10 @@ impl<'a> From<&'a Stats> for TransactionStats { .iter() .map(|(hash, size)| (*hash, *size)) .collect(), + received_from: other.received_from + .iter() + .map(|(hash, size)| (*hash, *size)) + .collect(), } } } @@ -63,6 +69,14 @@ impl TransactionsStats { *count = count.saturating_add(1); } + /// Increase number of back-propagations from given `enodeid`. + pub fn received(&mut self, hash: H256, enode_id: Option, current_block_num: BlockNumber) { + let enode_id = enode_id.unwrap_or_default(); + let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::new(current_block_num)); + let mut count = stats.received_from.entry(enode_id).or_insert(0); + *count = count.saturating_add(1); + } + /// Returns propagation stats for given hash or `None` if hash is not known. #[cfg(test)] pub fn get(&self, hash: &H256) -> Option<&Stats> { @@ -112,6 +126,32 @@ mod tests { propagated_to: hash_map![ enodeid1 => 2, enodeid2 => 1 + ], + received_from: Default::default(), + })); + } + + #[test] + fn should_keep_track_of_back_propagations() { + // given + let mut stats = TransactionsStats::default(); + let hash = 5.into(); + let enodeid1 = 2.into(); + let enodeid2 = 5.into(); + + // when + stats.received(hash, Some(enodeid1), 5); + stats.received(hash, Some(enodeid1), 10); + stats.received(hash, Some(enodeid2), 15); + + // then + let stats = stats.get(&hash); + assert_eq!(stats, Some(&Stats { + first_seen: 5, + propagated_to: Default::default(), + received_from: hash_map![ + enodeid1 => 2, + enodeid2 => 1 ] })); } From e66157f9227606d8c4b07115b0ac2d91faebc7fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 10 Dec 2016 16:40:32 +0100 Subject: [PATCH 2/7] fixing test --- rpc/src/v1/tests/mocked/parity.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/src/v1/tests/mocked/parity.rs b/rpc/src/v1/tests/mocked/parity.rs index 9b4daaccd..45ee4aa75 100644 --- a/rpc/src/v1/tests/mocked/parity.rs +++ b/rpc/src/v1/tests/mocked/parity.rs @@ -363,7 +363,7 @@ fn rpc_parity_transactions_stats() { let io = deps.default_client(); let request = r#"{"jsonrpc": "2.0", "method": "parity_pendingTransactionsStats", "params":[], "id": 1}"#; - let response = r#"{"jsonrpc":"2.0","result":{"0x0000000000000000000000000000000000000000000000000000000000000001":{"firstSeen":10,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080":16}},"0x0000000000000000000000000000000000000000000000000000000000000005":{"firstSeen":16,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010":1}}},"id":1}"#; + let response = r#"{"jsonrpc":"2.0","result":{"0x0000000000000000000000000000000000000000000000000000000000000001":{"firstSeen":10,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080":16},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001":10}},"0x0000000000000000000000000000000000000000000000000000000000000005":{"firstSeen":16,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010":1},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100":2}}},"id":1}"#; assert_eq!(io.handle_request_sync(request), Some(response.to_owned())); } From aaf6da4c0003aeee8778b7954a4f48f055603351 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 10 Dec 2016 16:51:50 +0100 Subject: [PATCH 3/7] Returning persistent node id --- ethcore/light/src/net/context.rs | 29 +++++++--- ethcore/light/src/net/tests/mod.rs | 86 ++++++++++++++++-------------- sync/src/api.rs | 3 +- util/network/src/lib.rs | 2 +- 4 files changed, 70 insertions(+), 50 deletions(-) diff --git a/ethcore/light/src/net/context.rs b/ethcore/light/src/net/context.rs index c05e69b0f..af1f4c677 100644 --- a/ethcore/light/src/net/context.rs +++ b/ethcore/light/src/net/context.rs @@ -16,13 +16,13 @@ //! I/O and event context generalizations. -use network::{NetworkContext, PeerId}; +use network::{NetworkContext, PeerId, NodeId}; use super::{Announcement, LightProtocol, ReqId}; use super::error::Error; use request::Request; -/// An I/O context which allows sending and receiving packets as well as +/// An I/O context which allows sending and receiving packets as well as /// disconnecting peers. This is used as a generalization of the portions /// of a p2p network which the light protocol structure makes use of. pub trait IoContext { @@ -41,6 +41,9 @@ pub trait IoContext { /// Get a peer's protocol version. fn protocol_version(&self, peer: PeerId) -> Option; + + /// Persistent peer id + fn persistent_peer_id(&self, peer: PeerId) -> Option; } impl<'a> IoContext for NetworkContext<'a> { @@ -67,6 +70,10 @@ impl<'a> IoContext for NetworkContext<'a> { fn protocol_version(&self, peer: PeerId) -> Option { self.protocol_version(self.subprotocol_name(), peer) } + + fn persistent_peer_id(&self, peer: PeerId) -> Option { + self.session_info(peer).and_then(|info| info.id) + } } /// Context for a protocol event. @@ -75,6 +82,9 @@ pub trait EventContext { /// disconnected/connected peer. fn peer(&self) -> PeerId; + /// Returns the relevant's peer persistent Id (aka NodeId). + fn persistent_peer_id(&self) -> Option; + /// Make a request from a peer. fn request_from(&self, peer: PeerId, request: Request) -> Result; @@ -89,7 +99,7 @@ pub trait EventContext { fn disable_peer(&self, peer: PeerId); } -/// Concrete implementation of `EventContext` over the light protocol struct and +/// Concrete implementation of `EventContext` over the light protocol struct and /// an io context. pub struct Ctx<'a> { /// Io context to enable immediate response to events. @@ -97,11 +107,18 @@ pub struct Ctx<'a> { /// Protocol implementation. pub proto: &'a LightProtocol, /// Relevant peer for event. - pub peer: PeerId, + pub peer: PeerId, } impl<'a> EventContext for Ctx<'a> { - fn peer(&self) -> PeerId { self.peer } + + fn peer(&self) -> PeerId { + self.peer + } + + fn persistent_peer_id(&self) -> Option { + self.io.persistent_peer_id(self.peer) + } fn request_from(&self, peer: PeerId, request: Request) -> Result { self.proto.request_from(self.io, &peer, request) } @@ -117,4 +134,4 @@ impl<'a> EventContext for Ctx<'a> { fn disable_peer(&self, peer: PeerId) { self.io.disable_peer(peer); } -} \ No newline at end of file +} diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index 876432ce2..e2a17a41e 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -15,13 +15,13 @@ // along with Parity. If not, see . //! Tests for the `LightProtocol` implementation. -//! These don't test of the higher level logic on top of +//! These don't test of the higher level logic on top of use ethcore::blockchain_info::BlockChainInfo; use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient}; use ethcore::ids::BlockId; use ethcore::transaction::SignedTransaction; -use network::PeerId; +use network::{PeerId, NodeId}; use net::buffer_flow::FlowParams; use net::context::IoContext; @@ -68,6 +68,10 @@ impl IoContext for Expect { fn protocol_version(&self, _peer: PeerId) -> Option { Some(super::MAX_PROTOCOL_VERSION) } + + fn persistent_peer_id(&self, _peer: PeerId) -> Option { + None + } } // can't implement directly for Arc due to cross-crate orphan rules. @@ -106,7 +110,7 @@ impl Provider for TestProvider { .map(|x: u64| x.saturating_mul(req.skip + 1)) .take_while(|x| if req.reverse { x < &start_num } else { best_num - start_num >= *x }) .map(|x| if req.reverse { start_num - x } else { start_num + x }) - .map(|x| self.0.client.block_header(BlockId::Number(x))) + .map(|x| self.0.client.block_header(BlockId::Number(x))) .take_while(|x| x.is_some()) .flat_map(|x| x) .collect() @@ -139,12 +143,12 @@ impl Provider for TestProvider { } } }) - .collect() + .collect() } fn contract_code(&self, req: request::ContractCodes) -> Vec { req.code_requests.into_iter() - .map(|req| { + .map(|req| { req.account_key.iter().chain(req.account_key.iter()).cloned().collect() }) .collect() @@ -202,9 +206,9 @@ fn status(chain_info: BlockChainInfo) -> Status { #[test] fn handshake_expected() { let flow_params = make_flow_params(); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let status = status(provider.client.chain_info()); @@ -217,9 +221,9 @@ fn handshake_expected() { #[should_panic] fn genesis_mismatch() { let flow_params = make_flow_params(); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let mut status = status(provider.client.chain_info()); status.genesis_hash = H256::default(); @@ -232,15 +236,15 @@ fn genesis_mismatch() { #[test] fn buffer_overflow() { let flow_params = make_flow_params(); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&status, &capabilities, Some(&flow_params)); - proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); + let packet_body = write_handshake(&status, &capabilities, Some(&flow_params)); + proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); } { @@ -266,9 +270,9 @@ fn buffer_overflow() { #[test] fn get_block_headers() { let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let cur_status = status(provider.client.chain_info()); let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); @@ -278,8 +282,8 @@ fn get_block_headers() { let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); - proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); } @@ -300,7 +304,7 @@ fn get_block_headers() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Headers, 10); let mut response_stream = RlpStream::new_list(12); - + response_stream.append(&req_id).append(&new_buf); for header in headers { response_stream.append_raw(&header, 1); @@ -316,9 +320,9 @@ fn get_block_headers() { #[test] fn get_block_bodies() { let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let cur_status = status(provider.client.chain_info()); let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); @@ -328,8 +332,8 @@ fn get_block_bodies() { let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); - proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); } @@ -347,7 +351,7 @@ fn get_block_bodies() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Bodies, 10); let mut response_stream = RlpStream::new_list(12); - + response_stream.append(&req_id).append(&new_buf); for body in bodies { response_stream.append_raw(&body, 1); @@ -363,9 +367,9 @@ fn get_block_bodies() { #[test] fn get_block_receipts() { let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let cur_status = status(provider.client.chain_info()); let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); @@ -375,8 +379,8 @@ fn get_block_receipts() { let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); - proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); } @@ -400,7 +404,7 @@ fn get_block_receipts() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Receipts, receipts.len()); let mut response_stream = RlpStream::new_list(2 + receipts.len()); - + response_stream.append(&req_id).append(&new_buf); for block_receipts in receipts { response_stream.append_raw(&block_receipts, 1); @@ -416,15 +420,15 @@ fn get_block_receipts() { #[test] fn get_state_proofs() { let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); - proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body); } @@ -432,7 +436,7 @@ fn get_state_proofs() { let key1 = U256::from(11223344).into(); let key2 = U256::from(99988887).into(); - let request = Request::StateProofs (request::StateProofs { + let request = Request::StateProofs (request::StateProofs { requests: vec![ request::StateProof { block: H256::default(), key1: key1, key2: None, from_level: 0 }, request::StateProof { block: H256::default(), key1: key1, key2: Some(key2), from_level: 0}, @@ -449,7 +453,7 @@ fn get_state_proofs() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::StateProofs, 2); let mut response_stream = RlpStream::new_list(4); - + response_stream.append(&req_id).append(&new_buf); for proof in proofs { response_stream.append_raw(&proof, 1); @@ -465,15 +469,15 @@ fn get_state_proofs() { #[test] fn get_contract_code() { let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); - proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body); } @@ -481,7 +485,7 @@ fn get_contract_code() { let key1 = U256::from(11223344).into(); let key2 = U256::from(99988887).into(); - let request = Request::Codes (request::ContractCodes { + let request = Request::Codes (request::ContractCodes { code_requests: vec![ request::ContractCode { block_hash: H256::default(), account_key: key1 }, request::ContractCode { block_hash: H256::default(), account_key: key2 }, @@ -498,7 +502,7 @@ fn get_contract_code() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Codes, 2); let mut response_stream = RlpStream::new_list(4); - + response_stream.append(&req_id).append(&new_buf); for code in codes { response_stream.append(&code); @@ -509,4 +513,4 @@ fn get_contract_code() { let expected = Expect::Respond(packet::CONTRACT_CODES, response); proto.handle_packet(&expected, &1, packet::GET_CONTRACT_CODES, &request_body); -} \ No newline at end of file +} diff --git a/sync/src/api.rs b/sync/src/api.rs index 10434ce26..0f3695fe9 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -351,8 +351,7 @@ struct TxRelay(Arc); impl LightHandler for TxRelay { fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) { trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer()); - // TODO [ToDr] Can we get a peer enode somehow? - self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), None) + self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), ctx.persistent_peer_id()) } } diff --git a/util/network/src/lib.rs b/util/network/src/lib.rs index f21cb498d..a1eef68fa 100644 --- a/util/network/src/lib.rs +++ b/util/network/src/lib.rs @@ -99,7 +99,7 @@ pub use stats::NetworkStats; pub use session::SessionInfo; use io::TimerToken; -pub use node_table::is_valid_node_url; +pub use node_table::{is_valid_node_url, NodeId}; const PROTOCOL_VERSION: u32 = 4; From 19ca9ad460cbf1fa3fced7197420935473580b41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 10 Dec 2016 21:22:19 +0100 Subject: [PATCH 4/7] Prevent broadcasting transactions to peer that send them. --- ethcore/src/client/chain_notify.rs | 7 ++-- ethcore/src/client/client.rs | 9 ++--- ethcore/src/client/test_client.rs | 2 +- ethcore/src/client/traits.rs | 4 +- ethcore/src/service.rs | 6 +-- .../dapps/localtx/Transaction/transaction.js | 25 +----------- rpc/src/v1/tests/helpers/sync_provider.rs | 6 --- rpc/src/v1/tests/mocked/parity.rs | 2 +- rpc/src/v1/types/sync.rs | 12 +----- sync/src/api.rs | 8 ++-- sync/src/chain.rs | 11 +++-- sync/src/transactions_stats.rs | 40 ------------------- 12 files changed, 25 insertions(+), 107 deletions(-) diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index 50ff20e38..ddab542fb 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -41,11 +41,10 @@ pub trait ChainNotify : Send + Sync { // does nothing by default } - /// fires when new transactions are imported - fn transactions_imported(&self, + /// fires when new transactions are received from a peer + fn transactions_received(&self, _hashes: Vec, - _peer_id: Option, - _block_num: u64, + _peer_id: usize, ) { // does nothing by default } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 1387dad9a..c258ed1eb 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -559,15 +559,14 @@ impl Client { } /// Import transactions from the IO queue - pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: Option) -> usize { + pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: usize) -> usize { trace!(target: "external_tx", "Importing queued"); let _timer = PerfTimer::new("import_queued_transactions"); self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst); let txs: Vec = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect(); let hashes: Vec<_> = txs.iter().map(|tx| tx.hash()).collect(); - let block_number = self.chain_info().best_block_number; self.notify(|notify| { - notify.transactions_imported(hashes.clone(), peer_id.clone(), block_number); + notify.transactions_received(hashes.clone(), peer_id); }); let results = self.miner.import_external_transactions(self, txs); results.len() @@ -1269,14 +1268,14 @@ impl BlockChainClient for Client { (*self.build_last_hashes(self.chain.read().best_block_hash())).clone() } - fn queue_transactions(&self, transactions: Vec, node_id: Option) { + fn queue_transactions(&self, transactions: Vec, peer_id: usize) { let queue_size = self.queue_transactions.load(AtomicOrdering::Relaxed); trace!(target: "external_tx", "Queue size: {}", queue_size); if queue_size > MAX_TX_QUEUE_SIZE { debug!("Ignoring {} transactions: queue is full", transactions.len()); } else { let len = transactions.len(); - match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions, node_id)) { + match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions, peer_id)) { Ok(_) => { self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst); } diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index a2af13794..7f27c9151 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -657,7 +657,7 @@ impl BlockChainClient for TestBlockChainClient { unimplemented!(); } - fn queue_transactions(&self, transactions: Vec, _peer_id: Option) { + fn queue_transactions(&self, transactions: Vec, _peer_id: usize) { // import right here let txs = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect(); self.miner.import_external_transactions(self, txs); diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index ccf07ea3f..fed864607 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use std::collections::BTreeMap; -use util::{U256, Address, H256, H512, H2048, Bytes, Itertools}; +use util::{U256, Address, H256, H2048, Bytes, Itertools}; use util::stats::Histogram; use blockchain::TreeRoute; use verification::queue::QueueInfo as BlockQueueInfo; @@ -200,7 +200,7 @@ pub trait BlockChainClient : Sync + Send { fn last_hashes(&self) -> LastHashes; /// Queue transactions for importing. - fn queue_transactions(&self, transactions: Vec, peer_id: Option); + fn queue_transactions(&self, transactions: Vec, peer_id: usize); /// list all transactions fn pending_transactions(&self) -> Vec; diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index b595843a8..9b96911e4 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -39,7 +39,7 @@ pub enum ClientIoMessage { /// A block is ready BlockVerified, /// New transaction RLPs are ready to be imported - NewTransactions(Vec, Option), + NewTransactions(Vec, usize), /// Begin snapshot restoration BeginRestoration(ManifestData), /// Feed a state chunk to the snapshot service @@ -196,8 +196,8 @@ impl IoHandler for ClientIoHandler { match *net_message { ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); } - ClientIoMessage::NewTransactions(ref transactions, ref peer_id) => { - self.client.import_queued_transactions(transactions, peer_id.clone()); + ClientIoMessage::NewTransactions(ref transactions, peer_id) => { + self.client.import_queued_transactions(transactions, peer_id); } ClientIoMessage::BeginRestoration(ref manifest) => { if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) { diff --git a/js/src/dapps/localtx/Transaction/transaction.js b/js/src/dapps/localtx/Transaction/transaction.js index c9ca10ba5..d1c98f360 100644 --- a/js/src/dapps/localtx/Transaction/transaction.js +++ b/js/src/dapps/localtx/Transaction/transaction.js @@ -86,17 +86,6 @@ class BaseTransaction extends Component { ); } - - renderReceived (stats) { - const noOfPeers = Object.keys(stats.receivedFrom).length; - const noOfPropagations = Object.values(stats.receivedFrom).reduce((sum, val) => sum + val, 0); - - return ( - - { noOfPropagations } ({ noOfPeers } peers) - - ); - } } export class Transaction extends BaseTransaction { @@ -113,8 +102,7 @@ export class Transaction extends BaseTransaction { isLocal: false, stats: { firstSeen: 0, - propagatedTo: {}, - receivedFrom: {} + propagatedTo: {} } }; @@ -140,9 +128,6 @@ export class Transaction extends BaseTransaction { # Propagated - - # Received - ); @@ -179,9 +164,6 @@ export class Transaction extends BaseTransaction { { this.renderPropagation(stats) } - - { this.renderReceived(stats) } - ); } @@ -210,8 +192,7 @@ export class LocalTransaction extends BaseTransaction { static defaultProps = { stats: { - propagatedTo: {}, - receivedFrom: {} + propagatedTo: {} } }; @@ -335,8 +316,6 @@ export class LocalTransaction extends BaseTransaction { { this.renderStatus() }
{ status === 'pending' ? this.renderPropagation(stats) : null } -
- { status === 'pending' ? this.renderReceived(stats) : null } ); diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index aa7e8d849..2517abd46 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -106,18 +106,12 @@ impl SyncProvider for TestSyncProvider { propagated_to: map![ 128.into() => 16 ], - received_from: map![ - 1.into() => 10 - ], }, 5.into() => TransactionStats { first_seen: 16, propagated_to: map![ 16.into() => 1 ], - received_from: map![ - 256.into() => 2 - ], } ] } diff --git a/rpc/src/v1/tests/mocked/parity.rs b/rpc/src/v1/tests/mocked/parity.rs index 45ee4aa75..9b4daaccd 100644 --- a/rpc/src/v1/tests/mocked/parity.rs +++ b/rpc/src/v1/tests/mocked/parity.rs @@ -363,7 +363,7 @@ fn rpc_parity_transactions_stats() { let io = deps.default_client(); let request = r#"{"jsonrpc": "2.0", "method": "parity_pendingTransactionsStats", "params":[], "id": 1}"#; - let response = r#"{"jsonrpc":"2.0","result":{"0x0000000000000000000000000000000000000000000000000000000000000001":{"firstSeen":10,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080":16},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001":10}},"0x0000000000000000000000000000000000000000000000000000000000000005":{"firstSeen":16,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010":1},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100":2}}},"id":1}"#; + let response = r#"{"jsonrpc":"2.0","result":{"0x0000000000000000000000000000000000000000000000000000000000000001":{"firstSeen":10,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080":16}},"0x0000000000000000000000000000000000000000000000000000000000000005":{"firstSeen":16,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010":1}}},"id":1}"#; assert_eq!(io.handle_request_sync(request), Some(response.to_owned())); } diff --git a/rpc/src/v1/types/sync.rs b/rpc/src/v1/types/sync.rs index 65d989156..8d3726e7a 100644 --- a/rpc/src/v1/types/sync.rs +++ b/rpc/src/v1/types/sync.rs @@ -127,9 +127,6 @@ pub struct TransactionStats { /// Peers this transaction was propagated to with count. #[serde(rename="propagatedTo")] pub propagated_to: BTreeMap, - /// Peers that propagated this transaction back. - #[serde(rename="receivedFrom")] - pub received_from: BTreeMap, } impl From for PeerInfo { @@ -161,10 +158,6 @@ impl From for TransactionStats { .into_iter() .map(|(id, count)| (id.into(), count)) .collect(), - received_from: s.received_from - .into_iter() - .map(|(id, count)| (id.into(), count)) - .collect(), } } } @@ -216,12 +209,9 @@ mod tests { propagated_to: map![ 10.into() => 50 ], - received_from: map![ - 1.into() => 1000 - ], }; let serialized = serde_json::to_string(&stats).unwrap(); - assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001":1000}}"#) + assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50}}"#) } } diff --git a/sync/src/api.rs b/sync/src/api.rs index 0f3695fe9..c675ee6d3 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -99,8 +99,6 @@ pub struct TransactionStats { pub first_seen: u64, /// Peers it was propagated to. pub propagated_to: BTreeMap, - /// Peers that propagated the transaction back. - pub received_from: BTreeMap, } /// Peer connection information @@ -338,9 +336,9 @@ impl ChainNotify for EthSync { self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); } - fn transactions_imported(&self, hashes: Vec, peer_id: Option, block_number: u64) { + fn transactions_received(&self, hashes: Vec, peer_id: PeerId) { let mut sync = self.sync_handler.sync.write(); - sync.transactions_imported(hashes, peer_id, block_number); + sync.transactions_received(hashes, peer_id); } } @@ -351,7 +349,7 @@ struct TxRelay(Arc); impl LightHandler for TxRelay { fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) { trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer()); - self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), ctx.persistent_peer_id()) + self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), ctx.peer()) } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 243c6b431..ecd95c68a 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -432,10 +432,10 @@ impl ChainSync { self.transactions_stats.stats() } - /// Updates statistics for imported transactions. - pub fn transactions_imported(&mut self, hashes: Vec, peer_id: Option, block_number: u64) { - for hash in hashes { - self.transactions_stats.received(hash, peer_id, block_number); + /// Updates transactions were received by a peer + pub fn transactions_received(&mut self, hashes: Vec, peer_id: PeerId) { + if let Some(mut peer_info) = self.peers.get_mut(&peer_id) { + peer_info.last_sent_transactions.extend(&hashes); } } @@ -1416,8 +1416,7 @@ impl ChainSync { let tx = rlp.as_raw().to_vec(); transactions.push(tx); } - let id = io.peer_session_info(peer_id).and_then(|info| info.id); - io.chain().queue_transactions(transactions, id); + io.chain().queue_transactions(transactions, peer_id); Ok(()) } diff --git a/sync/src/transactions_stats.rs b/sync/src/transactions_stats.rs index a91a860e5..fa8eb6e82 100644 --- a/sync/src/transactions_stats.rs +++ b/sync/src/transactions_stats.rs @@ -26,7 +26,6 @@ type BlockNumber = u64; pub struct Stats { first_seen: BlockNumber, propagated_to: HashMap, - received_from: HashMap, } impl Stats { @@ -34,7 +33,6 @@ impl Stats { Stats { first_seen: number, propagated_to: Default::default(), - received_from: Default::default(), } } } @@ -47,10 +45,6 @@ impl<'a> From<&'a Stats> for TransactionStats { .iter() .map(|(hash, size)| (*hash, *size)) .collect(), - received_from: other.received_from - .iter() - .map(|(hash, size)| (*hash, *size)) - .collect(), } } } @@ -69,14 +63,6 @@ impl TransactionsStats { *count = count.saturating_add(1); } - /// Increase number of back-propagations from given `enodeid`. - pub fn received(&mut self, hash: H256, enode_id: Option, current_block_num: BlockNumber) { - let enode_id = enode_id.unwrap_or_default(); - let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::new(current_block_num)); - let mut count = stats.received_from.entry(enode_id).or_insert(0); - *count = count.saturating_add(1); - } - /// Returns propagation stats for given hash or `None` if hash is not known. #[cfg(test)] pub fn get(&self, hash: &H256) -> Option<&Stats> { @@ -127,32 +113,6 @@ mod tests { enodeid1 => 2, enodeid2 => 1 ], - received_from: Default::default(), - })); - } - - #[test] - fn should_keep_track_of_back_propagations() { - // given - let mut stats = TransactionsStats::default(); - let hash = 5.into(); - let enodeid1 = 2.into(); - let enodeid2 = 5.into(); - - // when - stats.received(hash, Some(enodeid1), 5); - stats.received(hash, Some(enodeid1), 10); - stats.received(hash, Some(enodeid2), 15); - - // then - let stats = stats.get(&hash); - assert_eq!(stats, Some(&Stats { - first_seen: 5, - propagated_to: Default::default(), - received_from: hash_map![ - enodeid1 => 2, - enodeid2 => 1 - ] })); } From b5020d3c8d387ed3a51a1779688eb904e1b165d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 10 Dec 2016 21:25:08 +0100 Subject: [PATCH 5/7] Fixing Light context API --- ethcore/light/src/net/context.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ethcore/light/src/net/context.rs b/ethcore/light/src/net/context.rs index af1f4c677..522722bcd 100644 --- a/ethcore/light/src/net/context.rs +++ b/ethcore/light/src/net/context.rs @@ -43,7 +43,7 @@ pub trait IoContext { fn protocol_version(&self, peer: PeerId) -> Option; /// Persistent peer id - fn persistent_peer_id(&self, peer: PeerId) -> Option; + fn persistent_peer_id(&self, peer: &PeerId) -> Option; } impl<'a> IoContext for NetworkContext<'a> { @@ -71,8 +71,8 @@ impl<'a> IoContext for NetworkContext<'a> { self.protocol_version(self.subprotocol_name(), peer) } - fn persistent_peer_id(&self, peer: PeerId) -> Option { - self.session_info(peer).and_then(|info| info.id) + fn persistent_peer_id(&self, peer: &PeerId) -> Option { + self.session_info(*peer).and_then(|info| info.id) } } @@ -83,7 +83,7 @@ pub trait EventContext { fn peer(&self) -> PeerId; /// Returns the relevant's peer persistent Id (aka NodeId). - fn persistent_peer_id(&self) -> Option; + fn persistent_peer_id(&self, id: &PeerId) -> Option; /// Make a request from a peer. fn request_from(&self, peer: PeerId, request: Request) -> Result; @@ -116,8 +116,8 @@ impl<'a> EventContext for Ctx<'a> { self.peer } - fn persistent_peer_id(&self) -> Option { - self.io.persistent_peer_id(self.peer) + fn persistent_peer_id(&self, id: &PeerId) -> Option { + self.io.persistent_peer_id(id) } fn request_from(&self, peer: PeerId, request: Request) -> Result { self.proto.request_from(self.io, &peer, request) From 1e8638608c3ccc5db718bf3f6d2e77918e585abe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 10 Dec 2016 22:59:35 +0100 Subject: [PATCH 6/7] fixing tests --- ethcore/light/src/net/tests/mod.rs | 2 +- ethcore/src/client/chain_notify.rs | 2 +- ethcore/src/client/client.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index e2a17a41e..227eb9b7c 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -69,7 +69,7 @@ impl IoContext for Expect { Some(super::MAX_PROTOCOL_VERSION) } - fn persistent_peer_id(&self, _peer: PeerId) -> Option { + fn persistent_peer_id(&self, _peer: &PeerId) -> Option { None } } diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index ddab542fb..eb695e063 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use ipc::IpcConfig; -use util::{H256, H512}; +use util::H256; /// Represents what has to be handled by actor listening to chain events #[ipc] diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index c258ed1eb..70f198987 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -25,7 +25,7 @@ use time::precise_time_ns; use util::{Bytes, PerfTimer, Itertools, Mutex, RwLock, Hashable}; use util::{journaldb, TrieFactory, Trie}; use util::trie::TrieSpec; -use util::{U256, H256, H512, Address, H2048, Uint, FixedHash}; +use util::{U256, H256, Address, H2048, Uint, FixedHash}; use util::kvdb::*; // other From 83d9bc189b6febc532cdbb5894fbcc08aa7f9ba7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sun, 11 Dec 2016 21:08:15 +0100 Subject: [PATCH 7/7] Fixing test --- ethcore/light/src/net/tests/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index 0b3226f0d..46e47bb3c 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -69,7 +69,7 @@ impl IoContext for Expect { Some(super::MAX_PROTOCOL_VERSION) } - fn persistent_peer_id(&self, _peer: &PeerId) -> Option { + fn persistent_peer_id(&self, _peer: PeerId) -> Option { None } }