From 3438cda4324106c164eac3686ff1a60efc7a3372 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 6 Apr 2016 23:03:07 +0200 Subject: [PATCH] Propagate transaction queue --- miner/src/lib.rs | 5 +++- miner/src/miner.rs | 5 ++++ rpc/src/v1/impls/eth.rs | 8 +++---- rpc/src/v1/tests/helpers/miner_service.rs | 4 ++++ rpc/src/v1/tests/helpers/sync_provider.rs | 5 +--- sync/src/chain.rs | 28 +++++++++-------------- sync/src/lib.rs | 10 +------- 7 files changed, 29 insertions(+), 36 deletions(-) diff --git a/miner/src/lib.rs b/miner/src/lib.rs index 9f757fb67..c6e07a953 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -105,9 +105,12 @@ pub trait MinerService : Send + Sync { /// Get the sealing work package and if `Some`, apply some transform. fn map_sealing_work(&self, chain: &BlockChainClient, f: F) -> Option where F: FnOnce(&ClosedBlock) -> T; - /// Query pending transactions for hash + /// Query pending transactions for hash. fn transaction(&self, hash: &H256) -> Option; + /// Get a list of all pending transactions. + fn pending_transactions(&self) -> Vec; + /// Returns highest transaction nonce for given address. fn last_nonce(&self, address: &Address) -> Option; diff --git a/miner/src/miner.rs b/miner/src/miner.rs index 0e31c504f..3b4d1f32d 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -228,6 +228,11 @@ impl MinerService for Miner { queue.find(hash) } + fn pending_transactions(&self) -> Vec { + let queue = self.transaction_queue.lock().unwrap(); + queue.top_transactions() + } + fn last_nonce(&self, address: &Address) -> Option { self.transaction_queue.lock().unwrap().last_nonce(address) } diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 30dc79662..ad4b037df 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -186,7 +186,7 @@ impl EthClient }.fake_sign(from)) } - fn dispatch_transaction(&self, signed_transaction: SignedTransaction, raw_transaction: Vec) -> Result { + fn dispatch_transaction(&self, signed_transaction: SignedTransaction) -> Result { let hash = signed_transaction.hash(); let import = { @@ -203,7 +203,6 @@ impl EthClient match import.into_iter().collect::, _>>() { Ok(_) => { - take_weak!(self.sync).new_transaction(raw_transaction); to_value(&hash) } Err(e) => { @@ -504,8 +503,7 @@ impl Eth for EthClient data: request.data.map_or_else(Vec::new, |d| d.to_vec()), }.sign(&secret) }; - let raw_transaction = encode(&signed_transaction).to_vec(); - self.dispatch_transaction(signed_transaction, raw_transaction) + self.dispatch_transaction(signed_transaction) }, Err(_) => { to_value(&H256::zero()) } } @@ -517,7 +515,7 @@ impl Eth for EthClient .and_then(|(raw_transaction, )| { let raw_transaction = raw_transaction.to_vec(); match UntrustedRlp::new(&raw_transaction).as_val() { - Ok(signed_transaction) => self.dispatch_transaction(signed_transaction, raw_transaction), + Ok(signed_transaction) => self.dispatch_transaction(signed_transaction), Err(_) => to_value(&H256::zero()), } }) diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index 80a5e356d..815085d3b 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -98,6 +98,10 @@ impl MinerService for TestMinerService { self.pending_transactions.lock().unwrap().get(hash).cloned() } + fn pending_transactions(&self) -> Vec { + self.pending_transactions.lock().unwrap().values().cloned().collect() + } + fn last_nonce(&self, address: &Address) -> Option { self.last_nonces.read().unwrap().get(address).cloned() } diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index 59188f0a7..633e0d45b 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -16,7 +16,7 @@ //! Test implementation of SyncProvider. -use util::{U256, Bytes}; +use util::{U256}; use ethsync::{SyncProvider, SyncStatus, SyncState}; use std::sync::{RwLock}; @@ -59,8 +59,5 @@ impl SyncProvider for TestSyncProvider { fn status(&self) -> SyncStatus { self.status.read().unwrap().clone() } - - fn new_transaction(&self, _raw_transaction: Bytes) { - } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 1a7d11f51..74bcb8d38 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -217,10 +217,6 @@ pub struct ChainSync { network_id: U256, /// Miner miner: Arc, - - /// Transactions to propagate - // TODO: reconsider where this is in the codebase - seems a little dodgy to have here. - transactions_to_send: Vec, } type RlpResponseResult = Result, PacketDecodeError>; @@ -247,7 +243,6 @@ impl ChainSync { max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), network_id: config.network_id, miner: miner, - transactions_to_send: vec![], } } @@ -950,11 +945,6 @@ impl ChainSync { } } - /// Place a new transaction on the wire. - pub fn new_transaction(&mut self, raw_transaction: Bytes) { - self.transactions_to_send.push(raw_transaction); - } - /// Called when peer sends us new transactions fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { // accepting transactions once only fully synced @@ -1296,11 +1286,16 @@ impl ChainSync { return 0; } - let mut packet = RlpStream::new_list(self.transactions_to_send.len()); - for tx in &self.transactions_to_send { - packet.append_raw(tx, 1); + let mut transactions = self.miner.pending_transactions(); + if transactions.is_empty() { + return 0; + } + + let mut packet = RlpStream::new_list(transactions.len()); + let tx_count = transactions.len(); + for tx in transactions.drain(..) { + packet.append(&tx); } - self.transactions_to_send.clear(); let rlp = packet.out(); let lucky_peers = { @@ -1319,13 +1314,12 @@ impl ChainSync { for peer_id in lucky_peers { self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp.clone()); } + trace!(target: "sync", "Sent {} transactions to {} peers.", tx_count, sent); sent } fn propagate_latest_blocks(&mut self, io: &mut SyncIo) { - if !self.transactions_to_send.is_empty() { - self.propagate_new_transactions(io); - } + self.propagate_new_transactions(io); let chain_info = io.chain().chain_info(); if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { let blocks = self.propagate_blocks(&chain_info, io); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index ea4a1daea..a4f6eff38 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -66,7 +66,7 @@ use std::ops::*; use std::sync::*; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; use util::TimerToken; -use util::{U256, Bytes, ONE_U256}; +use util::{U256, ONE_U256}; use ethcore::client::Client; use ethcore::service::SyncMessage; use ethminer::Miner; @@ -101,9 +101,6 @@ impl Default for SyncConfig { pub trait SyncProvider: Send + Sync { /// Get sync status fn status(&self) -> SyncStatus; - - /// Note that a user has submitted a new transaction. - fn new_transaction(&self, raw_transaction: Bytes); } /// Ethereum network protocol handler @@ -143,11 +140,6 @@ impl SyncProvider for EthSync { fn status(&self) -> SyncStatus { self.sync.read().unwrap().status() } - - /// Note that a user has submitted a new transaction. - fn new_transaction(&self, raw_transaction: Bytes) { - self.sync.write().unwrap().new_transaction(raw_transaction); - } } impl NetworkProtocolHandler for EthSync {