|
|
|
|
@@ -88,6 +88,7 @@
|
|
|
|
|
///
|
|
|
|
|
|
|
|
|
|
use util::*;
|
|
|
|
|
use rlp::*;
|
|
|
|
|
use network::*;
|
|
|
|
|
use std::mem::{replace};
|
|
|
|
|
use ethcore::views::{HeaderView, BlockView};
|
|
|
|
|
@@ -156,7 +157,7 @@ pub enum SyncState {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Syncing status and statistics
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
#[derive(Clone, Copy)]
|
|
|
|
|
pub struct SyncStatus {
|
|
|
|
|
/// State
|
|
|
|
|
pub state: SyncState,
|
|
|
|
|
@@ -241,7 +242,9 @@ struct PeerInfo {
|
|
|
|
|
asking_hash: Option<H256>,
|
|
|
|
|
/// Request timestamp
|
|
|
|
|
ask_time: f64,
|
|
|
|
|
/// Pending request is expird and result should be ignored
|
|
|
|
|
/// Holds a set of transactions recently sent to this peer to avoid spamming.
|
|
|
|
|
last_sent_transactions: HashSet<H256>,
|
|
|
|
|
/// Pending request is expired and result should be ignored
|
|
|
|
|
expired: bool,
|
|
|
|
|
/// Peer fork confirmation status
|
|
|
|
|
confirmation: ForkConfirmation,
|
|
|
|
|
@@ -406,6 +409,7 @@ impl ChainSync {
|
|
|
|
|
asking_blocks: Vec::new(),
|
|
|
|
|
asking_hash: None,
|
|
|
|
|
ask_time: 0f64,
|
|
|
|
|
last_sent_transactions: HashSet::new(),
|
|
|
|
|
expired: false,
|
|
|
|
|
confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
|
|
|
|
|
};
|
|
|
|
|
@@ -1447,42 +1451,67 @@ impl ChainSync {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// propagates new transactions to all peers
|
|
|
|
|
fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize {
|
|
|
|
|
pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize {
|
|
|
|
|
|
|
|
|
|
// Early out of nobody to send to.
|
|
|
|
|
if self.peers.is_empty() {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let mut transactions = io.chain().pending_transactions();
|
|
|
|
|
let transactions = io.chain().pending_transactions();
|
|
|
|
|
if transactions.is_empty() {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let mut packet = RlpStream::new_list(transactions.len());
|
|
|
|
|
let tx_count = transactions.len();
|
|
|
|
|
for tx in transactions.drain(..) {
|
|
|
|
|
packet.append(&tx);
|
|
|
|
|
}
|
|
|
|
|
let rlp = packet.out();
|
|
|
|
|
|
|
|
|
|
let lucky_peers = {
|
|
|
|
|
// sqrt(x)/x scaled to max u32
|
|
|
|
|
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
|
|
|
|
|
let small = self.peers.len() < MIN_PEERS_PROPAGATION;
|
|
|
|
|
let lucky_peers = self.peers.iter()
|
|
|
|
|
.filter_map(|(&p, _)| if small || ::rand::random::<u32>() < fraction { Some(p.clone()) } else { None })
|
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
|
|
|
|
|
|
// taking at max of MAX_PEERS_PROPAGATION
|
|
|
|
|
lucky_peers.iter().cloned().take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::<Vec<PeerId>>()
|
|
|
|
|
let all_transactions_hashes = transactions.iter().map(|ref tx| tx.hash()).collect::<HashSet<H256>>();
|
|
|
|
|
let all_transactions_rlp = {
|
|
|
|
|
let mut packet = RlpStream::new_list(transactions.len());
|
|
|
|
|
for tx in &transactions { packet.append(tx); }
|
|
|
|
|
packet.out()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// sqrt(x)/x scaled to max u32
|
|
|
|
|
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
|
|
|
|
|
let small = self.peers.len() < MIN_PEERS_PROPAGATION;
|
|
|
|
|
|
|
|
|
|
let lucky_peers = self.peers.iter_mut()
|
|
|
|
|
.filter(|_| small || ::rand::random::<u32>() < fraction)
|
|
|
|
|
.take(MAX_PEERS_PROPAGATION)
|
|
|
|
|
.filter_map(|(peer_id, mut peer_info)| {
|
|
|
|
|
// Send all transactions
|
|
|
|
|
if peer_info.last_sent_transactions.is_empty() {
|
|
|
|
|
peer_info.last_sent_transactions = all_transactions_hashes.clone();
|
|
|
|
|
return Some((*peer_id, all_transactions_rlp.clone()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Get hashes of all transactions to send to this peer
|
|
|
|
|
let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions).cloned().collect::<HashSet<_>>();
|
|
|
|
|
if to_send.is_empty() {
|
|
|
|
|
return None;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Construct RLP
|
|
|
|
|
let mut packet = RlpStream::new_list(to_send.len());
|
|
|
|
|
for tx in &transactions {
|
|
|
|
|
if to_send.contains(&tx.hash()) {
|
|
|
|
|
packet.append(tx);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
peer_info.last_sent_transactions = all_transactions_hashes.clone();
|
|
|
|
|
Some((*peer_id, packet.out()))
|
|
|
|
|
})
|
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
|
|
|
|
|
|
// Send RLPs
|
|
|
|
|
let sent = lucky_peers.len();
|
|
|
|
|
for peer_id in lucky_peers {
|
|
|
|
|
self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp.clone());
|
|
|
|
|
if sent > 0 {
|
|
|
|
|
for (peer_id, rlp) in lucky_peers.into_iter() {
|
|
|
|
|
self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trace!(target: "sync", "Sent up to {} transactions to {} peers.", transactions.len(), sent);
|
|
|
|
|
}
|
|
|
|
|
trace!(target: "sync", "Sent {} transactions to {} peers.", tx_count, sent);
|
|
|
|
|
sent
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1512,16 +1541,18 @@ impl ChainSync {
|
|
|
|
|
self.check_resume(io);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// called when block is imported to chain, updates transactions queue and propagates the blocks
|
|
|
|
|
/// called when block is imported to chain - propagates the blocks and updates transactions sent to peers
|
|
|
|
|
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256]) {
|
|
|
|
|
if io.is_chain_queue_empty() {
|
|
|
|
|
// Propagate latests blocks
|
|
|
|
|
self.propagate_latest_blocks(io, sealed);
|
|
|
|
|
}
|
|
|
|
|
if !invalid.is_empty() {
|
|
|
|
|
trace!(target: "sync", "Bad blocks in the queue, restarting");
|
|
|
|
|
self.restart_on_bad_block(io);
|
|
|
|
|
}
|
|
|
|
|
for peer_info in self.peers.values_mut() {
|
|
|
|
|
peer_info.last_sent_transactions.clear();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1531,6 +1562,7 @@ mod tests {
|
|
|
|
|
use super::*;
|
|
|
|
|
use ::SyncConfig;
|
|
|
|
|
use util::*;
|
|
|
|
|
use rlp::*;
|
|
|
|
|
use super::{PeerInfo, PeerAsking};
|
|
|
|
|
use ethcore::views::BlockView;
|
|
|
|
|
use ethcore::header::*;
|
|
|
|
|
@@ -1548,8 +1580,8 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
let mut rlp = RlpStream::new_list(3);
|
|
|
|
|
rlp.append(&header);
|
|
|
|
|
rlp.append_raw(&rlp::EMPTY_LIST_RLP, 1);
|
|
|
|
|
rlp.append_raw(&rlp::EMPTY_LIST_RLP, 1);
|
|
|
|
|
rlp.append_raw(&::rlp::EMPTY_LIST_RLP, 1);
|
|
|
|
|
rlp.append_raw(&::rlp::EMPTY_LIST_RLP, 1);
|
|
|
|
|
rlp.out()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1723,6 +1755,7 @@ mod tests {
|
|
|
|
|
asking_blocks: Vec::new(),
|
|
|
|
|
asking_hash: None,
|
|
|
|
|
ask_time: 0f64,
|
|
|
|
|
last_sent_transactions: HashSet::new(),
|
|
|
|
|
expired: false,
|
|
|
|
|
confirmation: super::ForkConfirmation::Confirmed,
|
|
|
|
|
});
|
|
|
|
|
@@ -1819,6 +1852,83 @@ mod tests {
|
|
|
|
|
assert_eq!(0x07, io.queue[0].packet_id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn propagates_transactions() {
|
|
|
|
|
let mut client = TestBlockChainClient::new();
|
|
|
|
|
client.add_blocks(100, EachBlockWith::Uncle);
|
|
|
|
|
client.insert_transaction_to_queue();
|
|
|
|
|
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
|
|
|
|
|
let mut queue = VecDeque::new();
|
|
|
|
|
let mut io = TestIo::new(&mut client, &mut queue, None);
|
|
|
|
|
let peer_count = sync.propagate_new_transactions(&mut io);
|
|
|
|
|
// Try to propagate same transactions for the second time
|
|
|
|
|
let peer_count2 = sync.propagate_new_transactions(&mut io);
|
|
|
|
|
|
|
|
|
|
// 1 message should be send
|
|
|
|
|
assert_eq!(1, io.queue.len());
|
|
|
|
|
// 1 peer should be updated but only once
|
|
|
|
|
assert_eq!(1, peer_count);
|
|
|
|
|
assert_eq!(0, peer_count2);
|
|
|
|
|
// TRANSACTIONS_PACKET
|
|
|
|
|
assert_eq!(0x02, io.queue[0].packet_id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn propagates_transactions_again_after_new_block() {
|
|
|
|
|
let mut client = TestBlockChainClient::new();
|
|
|
|
|
client.add_blocks(100, EachBlockWith::Uncle);
|
|
|
|
|
client.insert_transaction_to_queue();
|
|
|
|
|
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
|
|
|
|
|
let mut queue = VecDeque::new();
|
|
|
|
|
let mut io = TestIo::new(&mut client, &mut queue, None);
|
|
|
|
|
let peer_count = sync.propagate_new_transactions(&mut io);
|
|
|
|
|
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]);
|
|
|
|
|
// Try to propagate same transactions for the second time
|
|
|
|
|
let peer_count2 = sync.propagate_new_transactions(&mut io);
|
|
|
|
|
|
|
|
|
|
// 2 message should be send
|
|
|
|
|
assert_eq!(2, io.queue.len());
|
|
|
|
|
// 1 peer should be updated twice
|
|
|
|
|
assert_eq!(1, peer_count);
|
|
|
|
|
assert_eq!(1, peer_count2);
|
|
|
|
|
// TRANSACTIONS_PACKET
|
|
|
|
|
assert_eq!(0x02, io.queue[0].packet_id);
|
|
|
|
|
assert_eq!(0x02, io.queue[1].packet_id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn propagates_transactions_without_alternating() {
|
|
|
|
|
let mut client = TestBlockChainClient::new();
|
|
|
|
|
client.add_blocks(100, EachBlockWith::Uncle);
|
|
|
|
|
client.insert_transaction_to_queue();
|
|
|
|
|
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
|
|
|
|
|
let mut queue = VecDeque::new();
|
|
|
|
|
// should sent some
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
let mut io = TestIo::new(&mut client, &mut queue, None);
|
|
|
|
|
let peer_count = sync.propagate_new_transactions(&mut io);
|
|
|
|
|
assert_eq!(1, io.queue.len());
|
|
|
|
|
assert_eq!(1, peer_count);
|
|
|
|
|
}
|
|
|
|
|
// Insert some more
|
|
|
|
|
client.insert_transaction_to_queue();
|
|
|
|
|
let mut io = TestIo::new(&mut client, &mut queue, None);
|
|
|
|
|
// Propagate new transactions
|
|
|
|
|
let peer_count2 = sync.propagate_new_transactions(&mut io);
|
|
|
|
|
// And now the peer should have all transactions
|
|
|
|
|
let peer_count3 = sync.propagate_new_transactions(&mut io);
|
|
|
|
|
|
|
|
|
|
// 2 message should be send (in total)
|
|
|
|
|
assert_eq!(2, io.queue.len());
|
|
|
|
|
// 1 peer should be updated but only once after inserting new transaction
|
|
|
|
|
assert_eq!(1, peer_count2);
|
|
|
|
|
assert_eq!(0, peer_count3);
|
|
|
|
|
// TRANSACTIONS_PACKET
|
|
|
|
|
assert_eq!(0x02, io.queue[0].packet_id);
|
|
|
|
|
assert_eq!(0x02, io.queue[1].packet_id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn handles_peer_new_block_malformed() {
|
|
|
|
|
let mut client = TestBlockChainClient::new();
|
|
|
|
|
|