From cabb028c1ce67b5e6f4d9fb3c224dc73ee100e31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 1 Sep 2016 19:07:58 +0200 Subject: [PATCH] Propagating transactions to peers on timer. (#2035) --- ethcore/src/client/test_client.rs | 20 ++++- sync/src/api.rs | 1 + sync/src/chain.rs | 124 ++++++++++++++++++++++++------ 3 files changed, 119 insertions(+), 26 deletions(-) diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index fb7f9083e..9ee7f0933 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -33,7 +33,7 @@ use receipt::{Receipt, LocalizedReceipt}; use blockchain::extras::BlockReceipts; use error::{ImportResult}; use evm::{Factory as EvmFactory, VMType}; -use miner::{Miner, MinerService}; +use miner::{Miner, MinerService, TransactionImportResult}; use spec::Spec; use block_queue::BlockQueueInfo; @@ -254,6 +254,24 @@ impl TestBlockChainClient { BlockID::Latest | BlockID::Pending => self.numbers.read().get(&(self.numbers.read().len() - 1)).cloned() } } + + /// Inserts a transaction to miners transactions queue. + pub fn insert_transaction_to_queue(&self) { + let keypair = Random.generate().unwrap(); + let tx = Transaction { + action: Action::Create, + value: U256::from(100), + data: "3331600055".from_hex().unwrap(), + gas: U256::from(100_000), + gas_price: U256::one(), + nonce: U256::zero() + }; + let signed_tx = tx.sign(keypair.secret()); + self.set_balance(signed_tx.sender().unwrap(), 10_000_000.into()); + let res = self.miner.import_external_transactions(self, vec![signed_tx]); + let res = res.into_iter().next().unwrap().expect("Successful import"); + assert_eq!(res, TransactionImportResult::Current); + } } pub fn get_temp_journal_db() -> GuardedTempResult> { diff --git a/sync/src/api.rs b/sync/src/api.rs index 7b866d2bf..a19004642 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -119,6 +119,7 @@ impl NetworkProtocolHandler for SyncProtocolHandler { fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { self.sync.write().maintain_peers(&mut NetSyncIo::new(io, &*self.chain)); self.sync.write().maintain_sync(&mut NetSyncIo::new(io, &*self.chain)); + self.sync.write().propagate_new_transactions(&mut NetSyncIo::new(io, &*self.chain)); } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index f5a54a0ba..10d90707a 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -241,7 +241,9 @@ struct PeerInfo { asking_hash: Option, /// Request timestamp ask_time: f64, - /// Pending request is expird and result should be ignored + /// Holds a set of transactions recently sent to this peer to avoid spamming. + last_sent_transactions: HashSet, + /// Pending request is expired and result should be ignored expired: bool, /// Peer fork confirmation status confirmation: ForkConfirmation, @@ -406,6 +408,7 @@ impl ChainSync { asking_blocks: Vec::new(), asking_hash: None, ask_time: 0f64, + last_sent_transactions: HashSet::new(), expired: false, confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed }, }; @@ -1447,42 +1450,65 @@ impl ChainSync { } /// propagates new transactions to all peers - fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize { + pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize { // Early out of nobody to send to. if self.peers.is_empty() { return 0; } - let mut transactions = io.chain().pending_transactions(); + let transactions = io.chain().pending_transactions(); if transactions.is_empty() { return 0; } - let mut packet = RlpStream::new_list(transactions.len()); - let tx_count = transactions.len(); - for tx in transactions.drain(..) { - packet.append(&tx); - } - let rlp = packet.out(); - - let lucky_peers = { - // sqrt(x)/x scaled to max u32 - let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; - let small = self.peers.len() < MIN_PEERS_PROPAGATION; - let lucky_peers = self.peers.iter() - .filter_map(|(&p, _)| if small || ::rand::random::() < fraction { Some(p.clone()) } else { None }) - .collect::>(); - - // taking at max of MAX_PEERS_PROPAGATION - lucky_peers.iter().cloned().take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::>() + let all_transactions_hashes = transactions.iter().map(|ref tx| tx.hash()).collect::>(); + let all_transactions_rlp = { + let mut packet = RlpStream::new_list(transactions.len()); + for tx in &transactions { packet.append(tx); } + packet.out() }; + // sqrt(x)/x scaled to max u32 + let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; + let small = self.peers.len() < MIN_PEERS_PROPAGATION; + + let lucky_peers = self.peers.iter_mut() + .filter(|_| small || ::rand::random::() < fraction) + .take(MAX_PEERS_PROPAGATION) + .filter_map(|(peer_id, mut peer_info)| { + // Send all transactions + if peer_info.last_sent_transactions.is_empty() { + peer_info.last_sent_transactions = all_transactions_hashes.clone(); + return Some((*peer_id, all_transactions_rlp.clone())); + } + + // Get hashes of all transactions to send to this peer + let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions).cloned().collect::>(); + if to_send.is_empty() { + return None; + } + + // Construct RLP + let mut packet = RlpStream::new_list(to_send.len()); + for tx in &transactions { + if to_send.contains(&tx.hash()) { + packet.append(tx); + } + } + + peer_info.last_sent_transactions = to_send; + Some((*peer_id, packet.out())) + }) + .collect::>(); + + // Send RLPs let sent = lucky_peers.len(); - for peer_id in lucky_peers { - self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp.clone()); + for (peer_id, rlp) in lucky_peers.into_iter() { + self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp); } - trace!(target: "sync", "Sent {} transactions to {} peers.", tx_count, sent); + + trace!(target: "sync", "Sent up to {} transactions to {} peers.", transactions.len(), sent); sent } @@ -1512,16 +1538,18 @@ impl ChainSync { self.check_resume(io); } - /// called when block is imported to chain, updates transactions queue and propagates the blocks + /// called when block is imported to chain - propagates the blocks and updates transactions sent to peers pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256]) { if io.is_chain_queue_empty() { - // Propagate latests blocks self.propagate_latest_blocks(io, sealed); } if !invalid.is_empty() { trace!(target: "sync", "Bad blocks in the queue, restarting"); self.restart_on_bad_block(io); } + for peer_info in self.peers.values_mut() { + peer_info.last_sent_transactions.clear(); + } } } @@ -1723,6 +1751,7 @@ mod tests { asking_blocks: Vec::new(), asking_hash: None, ask_time: 0f64, + last_sent_transactions: HashSet::new(), expired: false, confirmation: super::ForkConfirmation::Confirmed, }); @@ -1819,6 +1848,51 @@ mod tests { assert_eq!(0x07, io.queue[0].packet_id); } + #[test] + fn propagates_transactions() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(100, EachBlockWith::Uncle); + client.insert_transaction_to_queue(); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client); + let mut queue = VecDeque::new(); + let mut io = TestIo::new(&mut client, &mut queue, None); + let peer_count = sync.propagate_new_transactions(&mut io); + // Try to propagate same transactions for the second time + let peer_count2 = sync.propagate_new_transactions(&mut io); + + // 1 message should be send + assert_eq!(1, io.queue.len()); + // 1 peer should be updated but only once + assert_eq!(1, peer_count); + assert_eq!(0, peer_count2); + // TRANSACTIONS_PACKET + assert_eq!(0x02, io.queue[0].packet_id); + } + + #[test] + fn propagates_transactions_again_after_new_block() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(100, EachBlockWith::Uncle); + client.insert_transaction_to_queue(); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client); + let mut queue = VecDeque::new(); + let mut io = TestIo::new(&mut client, &mut queue, None); + let peer_count = sync.propagate_new_transactions(&mut io); + sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]); + // Try to propagate same transactions for the second time + let peer_count2 = sync.propagate_new_transactions(&mut io); + + // 1 message should be send + assert_eq!(2, io.queue.len()); + // 1 peer should be updated but only once + assert_eq!(1, peer_count); + assert_eq!(1, peer_count2); + // TRANSACTIONS_PACKET + assert_eq!(0x02, io.queue[0].packet_id); + assert_eq!(0x02, io.queue[1].packet_id); + } + + #[test] fn handles_peer_new_block_malformed() { let mut client = TestBlockChainClient::new();