Propagating transactions to peers on timer. (#2035)

This commit is contained in:
Tomasz Drwięga 2016-09-01 19:07:58 +02:00 committed by Arkadiy Paronyan
parent 9a5668f802
commit cabb028c1c
3 changed files with 119 additions and 26 deletions

View File

@ -33,7 +33,7 @@ use receipt::{Receipt, LocalizedReceipt};
use blockchain::extras::BlockReceipts;
use error::{ImportResult};
use evm::{Factory as EvmFactory, VMType};
use miner::{Miner, MinerService};
use miner::{Miner, MinerService, TransactionImportResult};
use spec::Spec;
use block_queue::BlockQueueInfo;
@ -254,6 +254,24 @@ impl TestBlockChainClient {
BlockID::Latest | BlockID::Pending => self.numbers.read().get(&(self.numbers.read().len() - 1)).cloned()
}
}
/// Inserts a transaction to miners transactions queue.
pub fn insert_transaction_to_queue(&self) {
let keypair = Random.generate().unwrap();
let tx = Transaction {
action: Action::Create,
value: U256::from(100),
data: "3331600055".from_hex().unwrap(),
gas: U256::from(100_000),
gas_price: U256::one(),
nonce: U256::zero()
};
let signed_tx = tx.sign(keypair.secret());
self.set_balance(signed_tx.sender().unwrap(), 10_000_000.into());
let res = self.miner.import_external_transactions(self, vec![signed_tx]);
let res = res.into_iter().next().unwrap().expect("Successful import");
assert_eq!(res, TransactionImportResult::Current);
}
}
pub fn get_temp_journal_db() -> GuardedTempResult<Box<JournalDB>> {

View File

@ -119,6 +119,7 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
self.sync.write().maintain_peers(&mut NetSyncIo::new(io, &*self.chain));
self.sync.write().maintain_sync(&mut NetSyncIo::new(io, &*self.chain));
self.sync.write().propagate_new_transactions(&mut NetSyncIo::new(io, &*self.chain));
}
}

View File

@ -241,7 +241,9 @@ struct PeerInfo {
asking_hash: Option<H256>,
/// Request timestamp
ask_time: f64,
/// Pending request is expird and result should be ignored
/// Holds a set of transactions recently sent to this peer to avoid spamming.
last_sent_transactions: HashSet<H256>,
/// Pending request is expired and result should be ignored
expired: bool,
/// Peer fork confirmation status
confirmation: ForkConfirmation,
@ -406,6 +408,7 @@ impl ChainSync {
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: 0f64,
last_sent_transactions: HashSet::new(),
expired: false,
confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
};
@ -1447,42 +1450,65 @@ impl ChainSync {
}
/// propagates new transactions to all peers
fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize {
pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize {
// Early out of nobody to send to.
if self.peers.is_empty() {
return 0;
}
let mut transactions = io.chain().pending_transactions();
let transactions = io.chain().pending_transactions();
if transactions.is_empty() {
return 0;
}
let all_transactions_hashes = transactions.iter().map(|ref tx| tx.hash()).collect::<HashSet<H256>>();
let all_transactions_rlp = {
let mut packet = RlpStream::new_list(transactions.len());
let tx_count = transactions.len();
for tx in transactions.drain(..) {
packet.append(&tx);
}
let rlp = packet.out();
for tx in &transactions { packet.append(tx); }
packet.out()
};
let lucky_peers = {
// sqrt(x)/x scaled to max u32
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
let small = self.peers.len() < MIN_PEERS_PROPAGATION;
let lucky_peers = self.peers.iter()
.filter_map(|(&p, _)| if small || ::rand::random::<u32>() < fraction { Some(p.clone()) } else { None })
let lucky_peers = self.peers.iter_mut()
.filter(|_| small || ::rand::random::<u32>() < fraction)
.take(MAX_PEERS_PROPAGATION)
.filter_map(|(peer_id, mut peer_info)| {
// Send all transactions
if peer_info.last_sent_transactions.is_empty() {
peer_info.last_sent_transactions = all_transactions_hashes.clone();
return Some((*peer_id, all_transactions_rlp.clone()));
}
// Get hashes of all transactions to send to this peer
let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions).cloned().collect::<HashSet<_>>();
if to_send.is_empty() {
return None;
}
// Construct RLP
let mut packet = RlpStream::new_list(to_send.len());
for tx in &transactions {
if to_send.contains(&tx.hash()) {
packet.append(tx);
}
}
peer_info.last_sent_transactions = to_send;
Some((*peer_id, packet.out()))
})
.collect::<Vec<_>>();
// taking at max of MAX_PEERS_PROPAGATION
lucky_peers.iter().cloned().take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::<Vec<PeerId>>()
};
// Send RLPs
let sent = lucky_peers.len();
for peer_id in lucky_peers {
self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp.clone());
for (peer_id, rlp) in lucky_peers.into_iter() {
self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp);
}
trace!(target: "sync", "Sent {} transactions to {} peers.", tx_count, sent);
trace!(target: "sync", "Sent up to {} transactions to {} peers.", transactions.len(), sent);
sent
}
@ -1512,16 +1538,18 @@ impl ChainSync {
self.check_resume(io);
}
/// called when block is imported to chain, updates transactions queue and propagates the blocks
/// called when block is imported to chain - propagates the blocks and updates transactions sent to peers
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256]) {
if io.is_chain_queue_empty() {
// Propagate latests blocks
self.propagate_latest_blocks(io, sealed);
}
if !invalid.is_empty() {
trace!(target: "sync", "Bad blocks in the queue, restarting");
self.restart_on_bad_block(io);
}
for peer_info in self.peers.values_mut() {
peer_info.last_sent_transactions.clear();
}
}
}
@ -1723,6 +1751,7 @@ mod tests {
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: 0f64,
last_sent_transactions: HashSet::new(),
expired: false,
confirmation: super::ForkConfirmation::Confirmed,
});
@ -1819,6 +1848,51 @@ mod tests {
assert_eq!(0x07, io.queue[0].packet_id);
}
#[test]
fn propagates_transactions() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.insert_transaction_to_queue();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
let mut queue = VecDeque::new();
let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagate_new_transactions(&mut io);
// Try to propagate same transactions for the second time
let peer_count2 = sync.propagate_new_transactions(&mut io);
// 1 message should be send
assert_eq!(1, io.queue.len());
// 1 peer should be updated but only once
assert_eq!(1, peer_count);
assert_eq!(0, peer_count2);
// TRANSACTIONS_PACKET
assert_eq!(0x02, io.queue[0].packet_id);
}
#[test]
fn propagates_transactions_again_after_new_block() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.insert_transaction_to_queue();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
let mut queue = VecDeque::new();
let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagate_new_transactions(&mut io);
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]);
// Try to propagate same transactions for the second time
let peer_count2 = sync.propagate_new_transactions(&mut io);
// 1 message should be send
assert_eq!(2, io.queue.len());
// 1 peer should be updated but only once
assert_eq!(1, peer_count);
assert_eq!(1, peer_count2);
// TRANSACTIONS_PACKET
assert_eq!(0x02, io.queue[0].packet_id);
assert_eq!(0x02, io.queue[1].packet_id);
}
#[test]
fn handles_peer_new_block_malformed() {
let mut client = TestBlockChainClient::new();