Propagate transactions for next 4 blocks. (#9265)

Closes #9255 

This PR also removes the limit of max 64 transactions per packet, currently we only attempt to prevent the packet size to go over 8MB. This will only be the case for super-large transactions or high-block-gas-limit chains.

Patching this is important only for chains that have blocks that can fit more than 4k transactions (over 86M block gas limit)

For mainnet, we should actually see a tiny bit faster propagation since instead of computing 4k pending set, we only need `4 * 8M / 21k = 1523` transactions.

Running some tests on `dekompile` node right now, to check how it performs in the wild.
This commit is contained in:
Tomasz Drwięga 2018-08-02 12:58:02 +02:00 committed by André Silva
parent b4ae1b6528
commit 90d7823acb
9 changed files with 37 additions and 32 deletions

View File

@ -70,9 +70,6 @@ const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3; const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3;
const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60); const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);
/// Max number of transactions in a single packet.
const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64;
// minimum interval between updates. // minimum interval between updates.
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000); const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);
@ -648,7 +645,7 @@ impl LightProtocol {
fn propagate_transactions(&self, io: &IoContext) { fn propagate_transactions(&self, io: &IoContext) {
if self.capabilities.read().tx_relay { return } if self.capabilities.read().tx_relay { return }
let ready_transactions = self.provider.ready_transactions(MAX_TRANSACTIONS_TO_PROPAGATE); let ready_transactions = self.provider.transactions_to_propagate();
if ready_transactions.is_empty() { return } if ready_transactions.is_empty() { return }
trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len()); trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len());

View File

@ -171,8 +171,8 @@ impl Provider for TestProvider {
}) })
} }
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> { fn transactions_to_propagate(&self) -> Vec<PendingTransaction> {
self.0.client.ready_transactions(max_len) self.0.client.transactions_to_propagate()
} }
} }

View File

@ -128,7 +128,7 @@ pub trait Provider: Send + Sync {
fn header_proof(&self, req: request::CompleteHeaderProofRequest) -> Option<request::HeaderProofResponse>; fn header_proof(&self, req: request::CompleteHeaderProofRequest) -> Option<request::HeaderProofResponse>;
/// Provide pending transactions. /// Provide pending transactions.
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction>; fn transactions_to_propagate(&self) -> Vec<PendingTransaction>;
/// Provide a proof-of-execution for the given transaction proof request. /// Provide a proof-of-execution for the given transaction proof request.
/// Returns a vector of all state items necessary to execute the transaction. /// Returns a vector of all state items necessary to execute the transaction.
@ -283,8 +283,8 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
.map(|(_, proof)| ::request::ExecutionResponse { items: proof }) .map(|(_, proof)| ::request::ExecutionResponse { items: proof })
} }
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> { fn transactions_to_propagate(&self) -> Vec<PendingTransaction> {
BlockChainClient::ready_transactions(self, max_len) BlockChainClient::transactions_to_propagate(self)
.into_iter() .into_iter()
.map(|tx| tx.pending().clone()) .map(|tx| tx.pending().clone())
.collect() .collect()
@ -370,12 +370,10 @@ impl<L: AsLightClient + Send + Sync> Provider for LightProvider<L> {
None None
} }
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> { fn transactions_to_propagate(&self) -> Vec<PendingTransaction> {
let chain_info = self.chain_info(); let chain_info = self.chain_info();
let mut transactions = self.txqueue.read() self.txqueue.read()
.ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp); .ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp)
transactions.truncate(max_len);
transactions
} }
} }

View File

@ -15,6 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashSet, BTreeMap, VecDeque}; use std::collections::{HashSet, BTreeMap, VecDeque};
use std::cmp;
use std::fmt; use std::fmt;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
@ -1945,7 +1946,25 @@ impl BlockChainClient for Client {
(*self.build_last_hashes(&self.chain.read().best_block_hash())).clone() (*self.build_last_hashes(&self.chain.read().best_block_hash())).clone()
} }
fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>> { fn transactions_to_propagate(&self) -> Vec<Arc<VerifiedTransaction>> {
const PROPAGATE_FOR_BLOCKS: u32 = 4;
const MIN_TX_TO_PROPAGATE: usize = 256;
let block_gas_limit = *self.best_block_header().gas_limit();
let min_tx_gas: U256 = self.latest_schedule().tx_gas.into();
let max_len = if min_tx_gas.is_zero() {
usize::max_value()
} else {
cmp::max(
MIN_TX_TO_PROPAGATE,
cmp::min(
(block_gas_limit / min_tx_gas) * PROPAGATE_FOR_BLOCKS,
// never more than usize
usize::max_value().into()
).as_u64() as usize
)
};
self.importer.miner.ready_transactions(self, max_len, ::miner::PendingOrdering::Priority) self.importer.miner.ready_transactions(self, max_len, ::miner::PendingOrdering::Priority)
} }

View File

@ -807,8 +807,8 @@ impl BlockChainClient for TestBlockChainClient {
self.traces.read().clone() self.traces.read().clone()
} }
fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>> { fn transactions_to_propagate(&self) -> Vec<Arc<VerifiedTransaction>> {
self.miner.ready_transactions(self, max_len, miner::PendingOrdering::Priority) self.miner.ready_transactions(self, 4096, miner::PendingOrdering::Priority)
} }
fn signing_chain_id(&self) -> Option<u64> { None } fn signing_chain_id(&self) -> Option<u64> { None }

View File

@ -321,8 +321,8 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
/// Get last hashes starting from best block. /// Get last hashes starting from best block.
fn last_hashes(&self) -> LastHashes; fn last_hashes(&self) -> LastHashes;
/// List all transactions that are allowed into the next block. /// List all ready transactions that should be propagated to other peers.
fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>>; fn transactions_to_propagate(&self) -> Vec<Arc<VerifiedTransaction>>;
/// Sorted list of transaction gas prices from at least last sample_size blocks. /// Sorted list of transaction gas prices from at least last sample_size blocks.
fn gas_price_corpus(&self, sample_size: usize) -> ::stats::Corpus<U256> { fn gas_price_corpus(&self, sample_size: usize) -> ::stats::Corpus<U256> {

View File

@ -316,11 +316,11 @@ fn does_not_propagate_delayed_transactions() {
client.miner().import_own_transaction(&*client, tx0).unwrap(); client.miner().import_own_transaction(&*client, tx0).unwrap();
client.miner().import_own_transaction(&*client, tx1).unwrap(); client.miner().import_own_transaction(&*client, tx1).unwrap();
assert_eq!(0, client.ready_transactions(10).len()); assert_eq!(0, client.transactions_to_propagate().len());
assert_eq!(0, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len()); assert_eq!(0, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len());
push_blocks_to_client(&client, 53, 2, 2); push_blocks_to_client(&client, 53, 2, 2);
client.flush_queue(); client.flush_queue();
assert_eq!(2, client.ready_transactions(10).len()); assert_eq!(2, client.transactions_to_propagate().len());
assert_eq!(2, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len()); assert_eq!(2, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len());
} }

View File

@ -149,12 +149,6 @@ const MAX_NEW_HASHES: usize = 64;
const MAX_NEW_BLOCK_AGE: BlockNumber = 20; const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
// maximal packet size with transactions (cannot be greater than 16MB - protocol limitation). // maximal packet size with transactions (cannot be greater than 16MB - protocol limitation).
const MAX_TRANSACTION_PACKET_SIZE: usize = 8 * 1024 * 1024; const MAX_TRANSACTION_PACKET_SIZE: usize = 8 * 1024 * 1024;
// Maximal number of transactions queried from miner to propagate.
// This set is used to diff with transactions known by the peer and
// we will send a difference of length up to `MAX_TRANSACTIONS_TO_PROPAGATE`.
const MAX_TRANSACTIONS_TO_QUERY: usize = 4096;
// Maximal number of transactions in sent in single packet.
const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64;
// Min number of blocks to be behind for a snapshot sync // Min number of blocks to be behind for a snapshot sync
const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000; const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000;
const SNAPSHOT_MIN_PEERS: usize = 3; const SNAPSHOT_MIN_PEERS: usize = 3;

View File

@ -29,11 +29,9 @@ use transaction::SignedTransaction;
use super::{ use super::{
random, random,
ChainSync, ChainSync,
MAX_TRANSACTION_PACKET_SIZE,
MAX_PEER_LAG_PROPAGATION, MAX_PEER_LAG_PROPAGATION,
MAX_PEERS_PROPAGATION, MAX_PEERS_PROPAGATION,
MAX_TRANSACTION_PACKET_SIZE,
MAX_TRANSACTIONS_TO_PROPAGATE,
MAX_TRANSACTIONS_TO_QUERY,
MIN_PEERS_PROPAGATION, MIN_PEERS_PROPAGATION,
CONSENSUS_DATA_PACKET, CONSENSUS_DATA_PACKET,
NEW_BLOCK_HASHES_PACKET, NEW_BLOCK_HASHES_PACKET,
@ -121,7 +119,7 @@ impl SyncPropagator {
return 0; return 0;
} }
let transactions = io.chain().ready_transactions(MAX_TRANSACTIONS_TO_QUERY); let transactions = io.chain().transactions_to_propagate();
if transactions.is_empty() { if transactions.is_empty() {
return 0; return 0;
} }
@ -184,7 +182,6 @@ impl SyncPropagator {
// Get hashes of all transactions to send to this peer // Get hashes of all transactions to send to this peer
let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions) let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions)
.take(MAX_TRANSACTIONS_TO_PROPAGATE)
.cloned() .cloned()
.collect::<HashSet<_>>(); .collect::<HashSet<_>>();
if to_send.is_empty() { if to_send.is_empty() {