Propagate transaction queue

This commit is contained in:
arkpar 2016-04-06 23:03:07 +02:00
parent 5685fde606
commit 3438cda432
7 changed files with 29 additions and 36 deletions

View File

@ -105,9 +105,12 @@ pub trait MinerService : Send + Sync {
/// Get the sealing work package and if `Some`, apply some transform. /// Get the sealing work package and if `Some`, apply some transform.
fn map_sealing_work<F, T>(&self, chain: &BlockChainClient, f: F) -> Option<T> where F: FnOnce(&ClosedBlock) -> T; fn map_sealing_work<F, T>(&self, chain: &BlockChainClient, f: F) -> Option<T> where F: FnOnce(&ClosedBlock) -> T;
/// Query pending transactions for hash /// Query pending transactions for hash.
fn transaction(&self, hash: &H256) -> Option<SignedTransaction>; fn transaction(&self, hash: &H256) -> Option<SignedTransaction>;
/// Get a list of all pending transactions.
fn pending_transactions(&self) -> Vec<SignedTransaction>;
/// Returns highest transaction nonce for given address. /// Returns highest transaction nonce for given address.
fn last_nonce(&self, address: &Address) -> Option<U256>; fn last_nonce(&self, address: &Address) -> Option<U256>;

View File

@ -228,6 +228,11 @@ impl MinerService for Miner {
queue.find(hash) queue.find(hash)
} }
fn pending_transactions(&self) -> Vec<SignedTransaction> {
let queue = self.transaction_queue.lock().unwrap();
queue.top_transactions()
}
fn last_nonce(&self, address: &Address) -> Option<U256> { fn last_nonce(&self, address: &Address) -> Option<U256> {
self.transaction_queue.lock().unwrap().last_nonce(address) self.transaction_queue.lock().unwrap().last_nonce(address)
} }

View File

@ -186,7 +186,7 @@ impl<C, S, A, M, EM> EthClient<C, S, A, M, EM>
}.fake_sign(from)) }.fake_sign(from))
} }
fn dispatch_transaction(&self, signed_transaction: SignedTransaction, raw_transaction: Vec<u8>) -> Result<Value, Error> { fn dispatch_transaction(&self, signed_transaction: SignedTransaction) -> Result<Value, Error> {
let hash = signed_transaction.hash(); let hash = signed_transaction.hash();
let import = { let import = {
@ -203,7 +203,6 @@ impl<C, S, A, M, EM> EthClient<C, S, A, M, EM>
match import.into_iter().collect::<Result<Vec<_>, _>>() { match import.into_iter().collect::<Result<Vec<_>, _>>() {
Ok(_) => { Ok(_) => {
take_weak!(self.sync).new_transaction(raw_transaction);
to_value(&hash) to_value(&hash)
} }
Err(e) => { Err(e) => {
@ -504,8 +503,7 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>
data: request.data.map_or_else(Vec::new, |d| d.to_vec()), data: request.data.map_or_else(Vec::new, |d| d.to_vec()),
}.sign(&secret) }.sign(&secret)
}; };
let raw_transaction = encode(&signed_transaction).to_vec(); self.dispatch_transaction(signed_transaction)
self.dispatch_transaction(signed_transaction, raw_transaction)
}, },
Err(_) => { to_value(&H256::zero()) } Err(_) => { to_value(&H256::zero()) }
} }
@ -517,7 +515,7 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>
.and_then(|(raw_transaction, )| { .and_then(|(raw_transaction, )| {
let raw_transaction = raw_transaction.to_vec(); let raw_transaction = raw_transaction.to_vec();
match UntrustedRlp::new(&raw_transaction).as_val() { match UntrustedRlp::new(&raw_transaction).as_val() {
Ok(signed_transaction) => self.dispatch_transaction(signed_transaction, raw_transaction), Ok(signed_transaction) => self.dispatch_transaction(signed_transaction),
Err(_) => to_value(&H256::zero()), Err(_) => to_value(&H256::zero()),
} }
}) })

View File

@ -98,6 +98,10 @@ impl MinerService for TestMinerService {
self.pending_transactions.lock().unwrap().get(hash).cloned() self.pending_transactions.lock().unwrap().get(hash).cloned()
} }
fn pending_transactions(&self) -> Vec<SignedTransaction> {
self.pending_transactions.lock().unwrap().values().cloned().collect()
}
fn last_nonce(&self, address: &Address) -> Option<U256> { fn last_nonce(&self, address: &Address) -> Option<U256> {
self.last_nonces.read().unwrap().get(address).cloned() self.last_nonces.read().unwrap().get(address).cloned()
} }

View File

@ -16,7 +16,7 @@
//! Test implementation of SyncProvider. //! Test implementation of SyncProvider.
use util::{U256, Bytes}; use util::{U256};
use ethsync::{SyncProvider, SyncStatus, SyncState}; use ethsync::{SyncProvider, SyncStatus, SyncState};
use std::sync::{RwLock}; use std::sync::{RwLock};
@ -59,8 +59,5 @@ impl SyncProvider for TestSyncProvider {
fn status(&self) -> SyncStatus { fn status(&self) -> SyncStatus {
self.status.read().unwrap().clone() self.status.read().unwrap().clone()
} }
fn new_transaction(&self, _raw_transaction: Bytes) {
}
} }

View File

@ -217,10 +217,6 @@ pub struct ChainSync {
network_id: U256, network_id: U256,
/// Miner /// Miner
miner: Arc<Miner>, miner: Arc<Miner>,
/// Transactions to propagate
// TODO: reconsider where this is in the codebase - seems a little dodgy to have here.
transactions_to_send: Vec<Bytes>,
} }
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>; type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
@ -247,7 +243,6 @@ impl ChainSync {
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id, network_id: config.network_id,
miner: miner, miner: miner,
transactions_to_send: vec![],
} }
} }
@ -950,11 +945,6 @@ impl ChainSync {
} }
} }
/// Place a new transaction on the wire.
pub fn new_transaction(&mut self, raw_transaction: Bytes) {
self.transactions_to_send.push(raw_transaction);
}
/// Called when peer sends us new transactions /// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
// accepting transactions once only fully synced // accepting transactions once only fully synced
@ -1296,11 +1286,16 @@ impl ChainSync {
return 0; return 0;
} }
let mut packet = RlpStream::new_list(self.transactions_to_send.len()); let mut transactions = self.miner.pending_transactions();
for tx in &self.transactions_to_send { if transactions.is_empty() {
packet.append_raw(tx, 1); return 0;
}
let mut packet = RlpStream::new_list(transactions.len());
let tx_count = transactions.len();
for tx in transactions.drain(..) {
packet.append(&tx);
} }
self.transactions_to_send.clear();
let rlp = packet.out(); let rlp = packet.out();
let lucky_peers = { let lucky_peers = {
@ -1319,13 +1314,12 @@ impl ChainSync {
for peer_id in lucky_peers { for peer_id in lucky_peers {
self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp.clone()); self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp.clone());
} }
trace!(target: "sync", "Sent {} transactions to {} peers.", tx_count, sent);
sent sent
} }
fn propagate_latest_blocks(&mut self, io: &mut SyncIo) { fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
if !self.transactions_to_send.is_empty() {
self.propagate_new_transactions(io); self.propagate_new_transactions(io);
}
let chain_info = io.chain().chain_info(); let chain_info = io.chain().chain_info();
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let blocks = self.propagate_blocks(&chain_info, io); let blocks = self.propagate_blocks(&chain_info, io);

View File

@ -66,7 +66,7 @@ use std::ops::*;
use std::sync::*; use std::sync::*;
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
use util::TimerToken; use util::TimerToken;
use util::{U256, Bytes, ONE_U256}; use util::{U256, ONE_U256};
use ethcore::client::Client; use ethcore::client::Client;
use ethcore::service::SyncMessage; use ethcore::service::SyncMessage;
use ethminer::Miner; use ethminer::Miner;
@ -101,9 +101,6 @@ impl Default for SyncConfig {
pub trait SyncProvider: Send + Sync { pub trait SyncProvider: Send + Sync {
/// Get sync status /// Get sync status
fn status(&self) -> SyncStatus; fn status(&self) -> SyncStatus;
/// Note that a user has submitted a new transaction.
fn new_transaction(&self, raw_transaction: Bytes);
} }
/// Ethereum network protocol handler /// Ethereum network protocol handler
@ -143,11 +140,6 @@ impl SyncProvider for EthSync {
fn status(&self) -> SyncStatus { fn status(&self) -> SyncStatus {
self.sync.read().unwrap().status() self.sync.read().unwrap().status()
} }
/// Note that a user has submitted a new transaction.
fn new_transaction(&self, raw_transaction: Bytes) {
self.sync.write().unwrap().new_transaction(raw_transaction);
}
} }
impl NetworkProtocolHandler<SyncMessage> for EthSync { impl NetworkProtocolHandler<SyncMessage> for EthSync {