Backporting to beta (#4418)
* v1.5.1 * Disable notifications (#4243) * Fix wrong token handling (#4254) * Fixing wrong token displayed * Linting * Revert filtering out * Revert the revert * Don't panic on uknown git commit hash (#4231) * Additional logs for own transactions (#4278) * Integration with zgp whitelist contract (#4215) * zgp-transactions checker * polishing * rename + refactor * refuse-service-transactions cl option * fixed tests compilation * Renaming signAndSendTransaction to sendTransaction (#4351) * Fixed deadlock in external_url (#4354) * Fixing web3 in console (#4382) * Fixing estimate gas in case histogram is not available (#4387) * Restarting fetch client every now and then (#4399)
This commit is contained in:
committed by
Gav Wood
parent
b09cfe1885
commit
c7dbd87f8e
@@ -96,6 +96,7 @@ use ethcore::header::{BlockNumber, Header as BlockHeader};
|
||||
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockImportError, BlockQueueInfo};
|
||||
use ethcore::error::*;
|
||||
use ethcore::snapshot::{ManifestData, RestorationStatus};
|
||||
use ethcore::transaction::PendingTransaction;
|
||||
use sync_io::SyncIo;
|
||||
use time;
|
||||
use super::SyncConfig;
|
||||
@@ -1959,7 +1960,46 @@ impl ChainSync {
|
||||
return 0;
|
||||
}
|
||||
|
||||
let all_transactions_hashes = transactions.iter().map(|tx| tx.transaction.hash()).collect::<HashSet<H256>>();
|
||||
let (transactions, service_transactions): (Vec<_>, Vec<_>) = transactions.into_iter()
|
||||
.partition(|tx| !tx.transaction.gas_price.is_zero());
|
||||
|
||||
// usual transactions could be propagated to all peers
|
||||
let mut affected_peers = HashSet::new();
|
||||
if !transactions.is_empty() {
|
||||
let peers = self.select_peers_for_transactions(|_| true);
|
||||
affected_peers = self.propagate_transactions_to_peers(io, peers, transactions);
|
||||
}
|
||||
|
||||
// most of times service_transactions will be empty
|
||||
// => there's no need to merge packets
|
||||
if !service_transactions.is_empty() {
|
||||
let service_transactions_peers = self.select_peers_for_transactions(|peer_id| accepts_service_transaction(&io.peer_info(*peer_id)));
|
||||
let service_transactions_affected_peers = self.propagate_transactions_to_peers(io, service_transactions_peers, service_transactions);
|
||||
affected_peers.extend(&service_transactions_affected_peers);
|
||||
}
|
||||
|
||||
affected_peers.len()
|
||||
}
|
||||
|
||||
fn select_peers_for_transactions<F>(&self, filter: F) -> Vec<PeerId>
|
||||
where F: Fn(&PeerId) -> bool {
|
||||
// sqrt(x)/x scaled to max u32
|
||||
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
|
||||
let small = self.peers.len() < MIN_PEERS_PROPAGATION;
|
||||
|
||||
let mut random = random::new();
|
||||
self.peers.keys()
|
||||
.cloned()
|
||||
.filter(filter)
|
||||
.filter(|_| small || random.next_u32() < fraction)
|
||||
.take(MAX_PEERS_PROPAGATION)
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn propagate_transactions_to_peers(&mut self, io: &mut SyncIo, peers: Vec<PeerId>, transactions: Vec<PendingTransaction>) -> HashSet<PeerId> {
|
||||
let all_transactions_hashes = transactions.iter()
|
||||
.map(|tx| tx.transaction.hash())
|
||||
.collect::<HashSet<H256>>();
|
||||
let all_transactions_rlp = {
|
||||
let mut packet = RlpStream::new_list(transactions.len());
|
||||
for tx in &transactions { packet.append(&tx.transaction); }
|
||||
@@ -1970,26 +2010,24 @@ impl ChainSync {
|
||||
self.transactions_stats.retain(&all_transactions_hashes);
|
||||
|
||||
// sqrt(x)/x scaled to max u32
|
||||
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
|
||||
let small = self.peers.len() < MIN_PEERS_PROPAGATION;
|
||||
let block_number = io.chain().chain_info().best_block_number;
|
||||
|
||||
let mut random = random::new();
|
||||
let lucky_peers = {
|
||||
let stats = &mut self.transactions_stats;
|
||||
self.peers.iter_mut()
|
||||
.filter(|_| small || random.next_u32() < fraction)
|
||||
.take(MAX_PEERS_PROPAGATION)
|
||||
.filter_map(|(peer_id, mut peer_info)| {
|
||||
peers.into_iter()
|
||||
.filter_map(|peer_id| {
|
||||
let stats = &mut self.transactions_stats;
|
||||
let peer_info = self.peers.get_mut(&peer_id)
|
||||
.expect("peer_id is form peers; peers is result of select_peers_for_transactions; select_peers_for_transactions selects peers from self.peers; qed");
|
||||
|
||||
// Send all transactions
|
||||
if peer_info.last_sent_transactions.is_empty() {
|
||||
// update stats
|
||||
for hash in &all_transactions_hashes {
|
||||
let id = io.peer_session_info(*peer_id).and_then(|info| info.id);
|
||||
let id = io.peer_session_info(peer_id).and_then(|info| info.id);
|
||||
stats.propagated(*hash, id, block_number);
|
||||
}
|
||||
peer_info.last_sent_transactions = all_transactions_hashes.clone();
|
||||
return Some((*peer_id, all_transactions_hashes.len(), all_transactions_rlp.clone()));
|
||||
return Some((peer_id, all_transactions_hashes.len(), all_transactions_rlp.clone()));
|
||||
}
|
||||
|
||||
// Get hashes of all transactions to send to this peer
|
||||
@@ -2007,7 +2045,7 @@ impl ChainSync {
|
||||
if to_send.contains(&tx.transaction.hash()) {
|
||||
packet.append(&tx.transaction);
|
||||
// update stats
|
||||
let id = io.peer_session_info(*peer_id).and_then(|info| info.id);
|
||||
let id = io.peer_session_info(peer_id).and_then(|info| info.id);
|
||||
stats.propagated(tx.transaction.hash(), id, block_number);
|
||||
}
|
||||
}
|
||||
@@ -2017,22 +2055,25 @@ impl ChainSync {
|
||||
.chain(&to_send)
|
||||
.cloned()
|
||||
.collect();
|
||||
Some((*peer_id, to_send.len(), packet.out()))
|
||||
Some((peer_id, to_send.len(), packet.out()))
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
// Send RLPs
|
||||
let peers = lucky_peers.len();
|
||||
if peers > 0 {
|
||||
let mut peers = HashSet::new();
|
||||
if lucky_peers.len() > 0 {
|
||||
let mut max_sent = 0;
|
||||
let lucky_peers_len = lucky_peers.len();
|
||||
for (peer_id, sent, rlp) in lucky_peers {
|
||||
peers.insert(peer_id);
|
||||
self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp);
|
||||
trace!(target: "sync", "{:02} <- Transactions ({} entries)", peer_id, sent);
|
||||
max_sent = max(max_sent, sent);
|
||||
}
|
||||
debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, peers);
|
||||
debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, lucky_peers_len);
|
||||
}
|
||||
|
||||
peers
|
||||
}
|
||||
|
||||
@@ -2119,12 +2160,30 @@ impl ChainSync {
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if peer is able to process service transactions
|
||||
fn accepts_service_transaction(client_id: &str) -> bool {
|
||||
// Parity versions starting from this will accept service-transactions
|
||||
const SERVICE_TRANSACTIONS_VERSION: (u32, u32) = (1u32, 6u32);
|
||||
// Parity client string prefix
|
||||
const PARITY_CLIENT_ID_PREFIX: &'static str = "Parity/v";
|
||||
|
||||
if !client_id.starts_with(PARITY_CLIENT_ID_PREFIX) {
|
||||
return false;
|
||||
}
|
||||
let ver: Vec<u32> = client_id[PARITY_CLIENT_ID_PREFIX.len()..].split('.')
|
||||
.take(2)
|
||||
.filter_map(|s| s.parse().ok())
|
||||
.collect();
|
||||
ver.len() == 2 && (ver[0] > SERVICE_TRANSACTIONS_VERSION.0 || (ver[0] == SERVICE_TRANSACTIONS_VERSION.0 && ver[1] >= SERVICE_TRANSACTIONS_VERSION.1))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::{HashSet, VecDeque};
|
||||
use network::PeerId;
|
||||
use tests::helpers::*;
|
||||
use tests::snapshot::TestSnapshotService;
|
||||
use util::{U256, RwLock};
|
||||
use util::{Uint, U256, RwLock};
|
||||
use util::sha3::Hashable;
|
||||
use util::hash::{H256, FixedHash};
|
||||
use util::bytes::Bytes;
|
||||
@@ -2134,6 +2193,7 @@ mod tests {
|
||||
use super::{PeerInfo, PeerAsking};
|
||||
use ethcore::header::*;
|
||||
use ethcore::client::*;
|
||||
use ethcore::transaction::SignedTransaction;
|
||||
use ethcore::miner::MinerService;
|
||||
|
||||
fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes {
|
||||
@@ -2359,7 +2419,12 @@ mod tests {
|
||||
|
||||
fn dummy_sync_with_peer(peer_latest_hash: H256, client: &BlockChainClient) -> ChainSync {
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), client);
|
||||
sync.peers.insert(0,
|
||||
insert_dummy_peer(&mut sync, 0, peer_latest_hash);
|
||||
sync
|
||||
}
|
||||
|
||||
fn insert_dummy_peer(sync: &mut ChainSync, peer_id: PeerId, peer_latest_hash: H256) {
|
||||
sync.peers.insert(peer_id,
|
||||
PeerInfo {
|
||||
protocol_version: 0,
|
||||
genesis: H256::zero(),
|
||||
@@ -2378,7 +2443,7 @@ mod tests {
|
||||
asking_snapshot_data: None,
|
||||
block_set: None,
|
||||
});
|
||||
sync
|
||||
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -2630,6 +2695,79 @@ mod tests {
|
||||
assert_eq!(stats.len(), 1, "Should maintain stats for single transaction.")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_propagate_service_transaction_to_selected_peers_only() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.insert_transaction_with_gas_price_to_queue(U256::zero());
|
||||
let block_hash = client.block_hash_delta_minus(1);
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
// when peer#1 is Geth
|
||||
insert_dummy_peer(&mut sync, 1, block_hash);
|
||||
io.peers_info.insert(1, "Geth".to_owned());
|
||||
// and peer#2 is Parity, accepting service transactions
|
||||
insert_dummy_peer(&mut sync, 2, block_hash);
|
||||
io.peers_info.insert(2, "Parity/v1.6".to_owned());
|
||||
// and peer#3 is Parity, discarding service transactions
|
||||
insert_dummy_peer(&mut sync, 3, block_hash);
|
||||
io.peers_info.insert(3, "Parity/v1.5".to_owned());
|
||||
// and peer#4 is Parity, accepting service transactions
|
||||
insert_dummy_peer(&mut sync, 4, block_hash);
|
||||
io.peers_info.insert(4, "Parity/v1.7.3-ABCDEFGH".to_owned());
|
||||
|
||||
// and new service transaction is propagated to peers
|
||||
sync.propagate_new_transactions(&mut io);
|
||||
|
||||
// peer#2 && peer#4 are receiving service transaction
|
||||
assert!(io.packets.iter().any(|p| p.packet_id == 0x02 && p.recipient == 2)); // TRANSACTIONS_PACKET
|
||||
assert!(io.packets.iter().any(|p| p.packet_id == 0x02 && p.recipient == 4)); // TRANSACTIONS_PACKET
|
||||
assert_eq!(io.packets.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_propagate_service_transaction_is_sent_as_separate_message() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
let tx1_hash = client.insert_transaction_to_queue();
|
||||
let tx2_hash = client.insert_transaction_with_gas_price_to_queue(U256::zero());
|
||||
let block_hash = client.block_hash_delta_minus(1);
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
// when peer#1 is Parity, accepting service transactions
|
||||
insert_dummy_peer(&mut sync, 1, block_hash);
|
||||
io.peers_info.insert(1, "Parity/v1.6".to_owned());
|
||||
|
||||
// and service + non-service transactions are propagated to peers
|
||||
sync.propagate_new_transactions(&mut io);
|
||||
|
||||
// two separate packets for peer are queued:
|
||||
// 1) with non-service-transaction
|
||||
// 2) with service transaction
|
||||
let sent_transactions: Vec<SignedTransaction> = io.packets.iter()
|
||||
.filter_map(|p| {
|
||||
if p.packet_id != 0x02 || p.recipient != 1 { // TRANSACTIONS_PACKET
|
||||
return None;
|
||||
}
|
||||
|
||||
let rlp = UntrustedRlp::new(&*p.data);
|
||||
let item_count = rlp.item_count();
|
||||
if item_count != 1 {
|
||||
return None;
|
||||
}
|
||||
|
||||
rlp.at(0).ok().and_then(|r| r.as_val().ok())
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(sent_transactions.len(), 2);
|
||||
assert!(sent_transactions.iter().any(|tx| tx.hash() == tx1_hash));
|
||||
assert!(sent_transactions.iter().any(|tx| tx.hash() == tx2_hash));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handles_peer_new_block_malformed() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
|
||||
@@ -49,6 +49,7 @@ pub struct TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
||||
pub sender: Option<PeerId>,
|
||||
pub to_disconnect: HashSet<PeerId>,
|
||||
pub packets: Vec<TestPacket>,
|
||||
pub peers_info: HashMap<PeerId, String>,
|
||||
overlay: RwLock<HashMap<BlockNumber, Bytes>>,
|
||||
}
|
||||
|
||||
@@ -62,6 +63,7 @@ impl<'p, C> TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
||||
to_disconnect: HashSet::new(),
|
||||
overlay: RwLock::new(HashMap::new()),
|
||||
packets: Vec::new(),
|
||||
peers_info: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -111,6 +113,12 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
||||
&*self.chain
|
||||
}
|
||||
|
||||
fn peer_info(&self, peer_id: PeerId) -> String {
|
||||
self.peers_info.get(&peer_id)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| peer_id.to_string())
|
||||
}
|
||||
|
||||
fn snapshot_service(&self) -> &SnapshotService {
|
||||
self.snapshot_service
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user