Maintaining a list of transactions propagated from other peers

Tomasz Drwięga 2016-12-10 14:56:41 +01:00
parent 6eb63a7316
commit e1ade5b375
12 changed files with 137 additions and 29 deletions

View File

@@ -15,7 +15,7 @@
 // along with Parity. If not, see <http://www.gnu.org/licenses/>.
 
 use ipc::IpcConfig;
-use util::H256;
+use util::{H256, H512};
 
 /// Represents what has to be handled by actor listening to chain events
 #[ipc]
@@ -40,6 +40,15 @@ pub trait ChainNotify : Send + Sync {
     fn stop(&self) {
         // does nothing by default
     }
 
+    /// fires when new transactions are imported
+    fn transactions_imported(&self,
+        _hashes: Vec<H256>,
+        _peer_id: Option<H512>,
+        _block_num: u64,
+    ) {
+        // does nothing by default
+    }
 }
 
 impl IpcConfig for ChainNotify { }
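
For orientation, a minimal sketch of a listener picking up the new hook. Only the `transactions_imported` signature comes from the hunk above; the struct, the logging, and the import paths are assumptions, and the other `ChainNotify` methods keep their default no-op bodies.

    use ethcore::client::ChainNotify; // re-export path assumed, not shown in this diff
    use util::{H256, H512};

    struct TxImportLogger;

    impl ChainNotify for TxImportLogger {
        fn transactions_imported(&self, hashes: Vec<H256>, peer_id: Option<H512>, block_num: u64) {
            // Log every imported transaction together with the peer it was heard from.
            for hash in &hashes {
                println!("imported tx {:?} from peer {:?} at block {}", hash, peer_id, block_num);
            }
        }
    }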

View File

@@ -25,7 +25,7 @@ use time::precise_time_ns;
 use util::{Bytes, PerfTimer, Itertools, Mutex, RwLock, Hashable};
 use util::{journaldb, TrieFactory, Trie};
 use util::trie::TrieSpec;
-use util::{U256, H256, Address, H2048, Uint, FixedHash};
+use util::{U256, H256, H512, Address, H2048, Uint, FixedHash};
 use util::kvdb::*;
 
 // other
@@ -559,11 +559,16 @@ impl Client {
     }
 
     /// Import transactions from the IO queue
-    pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize {
+    pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: Option<H512>) -> usize {
         trace!(target: "external_tx", "Importing queued");
         let _timer = PerfTimer::new("import_queued_transactions");
         self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst);
-        let txs = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect();
+        let txs: Vec<SignedTransaction> = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect();
+        let hashes: Vec<_> = txs.iter().map(|tx| tx.hash()).collect();
+        let block_number = self.chain_info().best_block_number;
+        self.notify(|notify| {
+            notify.transactions_imported(hashes.clone(), peer_id.clone(), block_number);
+        });
         let results = self.miner.import_external_transactions(self, txs);
         results.len()
     }
@@ -1264,14 +1269,14 @@ impl BlockChainClient for Client {
         (*self.build_last_hashes(self.chain.read().best_block_hash())).clone()
     }
 
-    fn queue_transactions(&self, transactions: Vec<Bytes>) {
+    fn queue_transactions(&self, transactions: Vec<Bytes>, node_id: Option<H512>) {
         let queue_size = self.queue_transactions.load(AtomicOrdering::Relaxed);
         trace!(target: "external_tx", "Queue size: {}", queue_size);
         if queue_size > MAX_TX_QUEUE_SIZE {
             debug!("Ignoring {} transactions: queue is full", transactions.len());
         } else {
             let len = transactions.len();
-            match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions)) {
+            match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions, node_id)) {
                 Ok(_) => {
                     self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst);
                 }

View File

@@ -657,7 +657,7 @@ impl BlockChainClient for TestBlockChainClient {
         unimplemented!();
     }
 
-    fn queue_transactions(&self, transactions: Vec<Bytes>) {
+    fn queue_transactions(&self, transactions: Vec<Bytes>, _peer_id: Option<H512>) {
         // import right here
         let txs = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
         self.miner.import_external_transactions(self, txs);

View File

@@ -15,7 +15,7 @@
 // along with Parity. If not, see <http://www.gnu.org/licenses/>.
 
 use std::collections::BTreeMap;
-use util::{U256, Address, H256, H2048, Bytes, Itertools};
+use util::{U256, Address, H256, H512, H2048, Bytes, Itertools};
 use util::stats::Histogram;
 use blockchain::TreeRoute;
 use verification::queue::QueueInfo as BlockQueueInfo;
@@ -200,7 +200,7 @@ pub trait BlockChainClient : Sync + Send {
     fn last_hashes(&self) -> LastHashes;
 
     /// Queue transactions for importing.
-    fn queue_transactions(&self, transactions: Vec<Bytes>);
+    fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: Option<H512>);
 
     /// list all transactions
     fn pending_transactions(&self) -> Vec<SignedTransaction>;
@@ -294,9 +294,9 @@ pub trait ProvingBlockChainClient: BlockChainClient {
     /// The key is the keccak hash of the account's address.
     /// Returns a vector of raw trie nodes (in order from the root) proving the query.
     /// Nodes after `from_level` may be omitted.
     /// An empty vector indicates unservable query.
     fn prove_account(&self, key1: H256, from_level: u32, id: BlockID) -> Vec<Bytes>;
 
     /// Get code by address hash.
     fn code_by_hash(&self, account_key: H256, id: BlockID) -> Bytes;
 }

View File

@@ -39,7 +39,7 @@ pub enum ClientIoMessage {
     /// A block is ready
     BlockVerified,
     /// New transaction RLPs are ready to be imported
-    NewTransactions(Vec<Bytes>),
+    NewTransactions(Vec<Bytes>, Option<H512>),
     /// Begin snapshot restoration
     BeginRestoration(ManifestData),
     /// Feed a state chunk to the snapshot service
@@ -196,7 +196,9 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
         match *net_message {
             ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); }
-            ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); }
+            ClientIoMessage::NewTransactions(ref transactions, ref peer_id) => {
+                self.client.import_queued_transactions(transactions, peer_id.clone());
+            }
             ClientIoMessage::BeginRestoration(ref manifest) => {
                 if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) {
                     warn!("Failed to initialize snapshot restoration: {}", e);

View File

@@ -48,7 +48,6 @@ class BaseTransaction extends Component {
         <IdentityIcon
           address={ transaction.from }
         />
-        0x{ transaction.nonce.toString(16) }
       </div>
     );
   }
@@ -87,6 +86,17 @@
       </span>
     );
   }
 
+  renderReceived (stats) {
+    const noOfPeers = Object.keys(stats.receivedFrom).length;
+    const noOfPropagations = Object.values(stats.receivedFrom).reduce((sum, val) => sum + val, 0);
+
+    return (
+      <span className={ styles.nowrap }>
+        { noOfPropagations } ({ noOfPeers } peers)
+      </span>
+    );
+  }
 }
 
 export class Transaction extends BaseTransaction {
@@ -103,7 +113,8 @@ export class Transaction extends BaseTransaction {
     isLocal: false,
     stats: {
       firstSeen: 0,
-      propagatedTo: {}
+      propagatedTo: {},
+      receivedFrom: {}
     }
   };
@@ -129,6 +140,9 @@
       <th>
         # Propagated
       </th>
+      <th>
+        # Received
+      </th>
       <th />
     </tr>
   );
@@ -165,6 +179,9 @@
       <td>
         { this.renderPropagation(stats) }
       </td>
+      <td>
+        { this.renderReceived(stats) }
+      </td>
     </tr>
   );
 }
@@ -193,7 +210,8 @@ export class LocalTransaction extends BaseTransaction {
   static defaultProps = {
     stats: {
-      propagatedTo: {}
+      propagatedTo: {},
+      receivedFrom: {}
     }
   };
@@ -317,6 +335,8 @@
         { this.renderStatus() }
         <br />
         { status === 'pending' ? this.renderPropagation(stats) : null }
+        <br />
+        { status === 'pending' ? this.renderReceived(stats) : null }
       </td>
     </tr>
   );

View File

@@ -34,7 +34,7 @@ describe('dapps/localtx/Transaction', () => {
   it('renders without crashing', () => {
     const transaction = {
       hash: '0x1234567890',
-      nonce: 15,
+      nonce: new BigNumber(15),
      gasPrice: new BigNumber(10),
      gas: new BigNumber(10)
    };

View File

@@ -105,13 +105,19 @@ impl SyncProvider for TestSyncProvider {
                 first_seen: 10,
                 propagated_to: map![
                     128.into() => 16
-                ]
+                ],
+                received_from: map![
+                    1.into() => 10
+                ],
             },
             5.into() => TransactionStats {
                 first_seen: 16,
                 propagated_to: map![
                     16.into() => 1
-                ]
+                ],
+                received_from: map![
+                    256.into() => 2
+                ],
             }
         ]
     }

View File

@@ -127,6 +127,9 @@ pub struct TransactionStats {
     /// Peers this transaction was propagated to with count.
     #[serde(rename="propagatedTo")]
     pub propagated_to: BTreeMap<H512, usize>,
+    /// Peers that propagated this transaction back.
+    #[serde(rename="receivedFrom")]
+    pub received_from: BTreeMap<H512, usize>,
 }
 
 impl From<SyncPeerInfo> for PeerInfo {
@@ -157,7 +160,11 @@ impl From<SyncTransactionStats> for TransactionStats {
             propagated_to: s.propagated_to
                 .into_iter()
                 .map(|(id, count)| (id.into(), count))
-                .collect()
+                .collect(),
+            received_from: s.received_from
+                .into_iter()
+                .map(|(id, count)| (id.into(), count))
+                .collect(),
         }
     }
 }
@@ -208,10 +215,13 @@ mod tests {
             first_seen: 100,
             propagated_to: map![
                 10.into() => 50
-            ]
+            ],
+            received_from: map![
+                1.into() => 1000
+            ],
         };
 
         let serialized = serde_json::to_string(&stats).unwrap();
-        assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50}}"#)
+        assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001":1000}}"#)
     }
 }
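
Over JSON-RPC the new field appears as a `receivedFrom` object keyed by peer enode IDs (full H512 hex strings, as in the test above) next to `propagatedTo`. A small self-contained sketch of consuming that shape with `serde_json`; the helper name and the shortened keys are illustrative only, not from the commit.

    use serde_json::Value;

    // Sum how many times a transaction was propagated back to us, across all peers.
    fn received_total(stats: &Value) -> u64 {
        stats["receivedFrom"]
            .as_object()
            .map(|peers| peers.values().filter_map(Value::as_u64).sum::<u64>())
            .unwrap_or(0)
    }

    fn main() {
        // Shortened keys for readability; real payloads use full 128-character H512 hex strings.
        let stats: Value = serde_json::from_str(
            r#"{"firstSeen":100,"propagatedTo":{},"receivedFrom":{"0x01":2,"0x02":3}}"#
        ).unwrap();
        assert_eq!(received_total(&stats), 5);
    }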

View File

@@ -99,6 +99,8 @@ pub struct TransactionStats {
     pub first_seen: u64,
     /// Peers it was propagated to.
     pub propagated_to: BTreeMap<H512, usize>,
+    /// Peers that propagated the transaction back.
+    pub received_from: BTreeMap<H512, usize>,
 }
 
 /// Peer connection information
@@ -144,7 +146,7 @@ pub struct EthSync {
     network: NetworkService,
     /// Main (eth/par) protocol handler
     sync_handler: Arc<SyncProtocolHandler>,
     /// Light (les) protocol handler
     light_proto: Option<Arc<LightProtocol>>,
     /// The main subprotocol name
     subprotocol_name: [u8; 3],
@@ -155,7 +157,7 @@
 impl EthSync {
     /// Creates and register protocol with the network service
     pub fn new(params: Params) -> Result<Arc<EthSync>, NetworkError> {
         let pruning_info = params.chain.pruning_info();
         let light_proto = match params.config.serve_light {
             false => None,
             true => Some({
@@ -297,7 +299,7 @@ impl ChainNotify for EthSync {
             Some(lp) => lp,
             None => return,
         };
 
         let chain_info = self.sync_handler.chain.chain_info();
         light_proto.make_announcement(context, Announcement {
             head_hash: chain_info.best_block_hash,
@@ -323,7 +325,7 @@
         // register the warp sync subprotocol
         self.network.register_protocol(self.sync_handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8])
             .unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e));
 
         // register the light protocol.
         if let Some(light_proto) = self.light_proto.as_ref().map(|x| x.clone()) {
             self.network.register_protocol(light_proto, self.light_subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS)
@@ -335,6 +337,11 @@
         self.sync_handler.snapshot_service.abort_restore();
         self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
     }
+
+    fn transactions_imported(&self, hashes: Vec<H256>, peer_id: Option<H512>, block_number: u64) {
+        let mut sync = self.sync_handler.sync.write();
+        sync.transactions_imported(hashes, peer_id, block_number);
+    }
 }
 
 /// LES event handler.
@@ -344,7 +351,8 @@ struct TxRelay(Arc<BlockChainClient>);
 
 impl LightHandler for TxRelay {
     fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) {
         trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer());
-        self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect())
+        // TODO [ToDr] Can we get a peer enode somehow?
+        self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), None)
     }
 }
@@ -547,4 +555,4 @@ pub struct ServiceConfiguration {
     pub net: NetworkConfiguration,
     /// IPC path.
     pub io_path: String,
 }
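
A hedged note on the wiring: the `transactions_imported` hook above only fires for objects registered with the client as `ChainNotify` listeners. A minimal sketch under that assumption; the `add_notify` registration call is assumed from the client API and does not appear in this diff.

    // Hedged sketch; `client` and `params` are placeholders for existing handles.
    let sync = EthSync::new(params)?;   // Arc<EthSync>, as constructed above
    client.add_notify(sync.clone());    // assumed registration call for ChainNotify listeners
    // From here on, Client::notify(..) inside import_queued_transactions reaches
    // EthSync::transactions_imported, which updates ChainSync's per-peer counters.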

View File

@@ -432,6 +432,13 @@ impl ChainSync {
         self.transactions_stats.stats()
     }
 
+    /// Updates statistics for imported transactions.
+    pub fn transactions_imported(&mut self, hashes: Vec<H256>, peer_id: Option<H512>, block_number: u64) {
+        for hash in hashes {
+            self.transactions_stats.received(hash, peer_id, block_number);
+        }
+    }
+
     /// Abort all sync activity
     pub fn abort(&mut self, io: &mut SyncIo) {
         self.reset_and_continue(io);
@@ -1409,7 +1416,8 @@ impl ChainSync {
             let tx = rlp.as_raw().to_vec();
             transactions.push(tx);
         }
-        io.chain().queue_transactions(transactions);
+        let id = io.peer_session_info(peer_id).and_then(|info| info.id);
+        io.chain().queue_transactions(transactions, id);
         Ok(())
     }

View File

@@ -26,6 +26,7 @@ type BlockNumber = u64;
 pub struct Stats {
     first_seen: BlockNumber,
     propagated_to: HashMap<NodeId, usize>,
+    received_from: HashMap<NodeId, usize>,
 }
 
 impl Stats {
@@ -33,6 +34,7 @@ impl Stats {
         Stats {
             first_seen: number,
             propagated_to: Default::default(),
+            received_from: Default::default(),
         }
     }
 }
@@ -45,6 +47,10 @@ impl<'a> From<&'a Stats> for TransactionStats {
                 .iter()
                 .map(|(hash, size)| (*hash, *size))
                 .collect(),
+            received_from: other.received_from
+                .iter()
+                .map(|(hash, size)| (*hash, *size))
+                .collect(),
         }
     }
 }
@@ -63,6 +69,14 @@ impl TransactionsStats {
         *count = count.saturating_add(1);
     }
 
+    /// Increase number of back-propagations from given `enodeid`.
+    pub fn received(&mut self, hash: H256, enode_id: Option<NodeId>, current_block_num: BlockNumber) {
+        let enode_id = enode_id.unwrap_or_default();
+        let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::new(current_block_num));
+        let mut count = stats.received_from.entry(enode_id).or_insert(0);
+        *count = count.saturating_add(1);
+    }
+
     /// Returns propagation stats for given hash or `None` if hash is not known.
     #[cfg(test)]
     pub fn get(&self, hash: &H256) -> Option<&Stats> {
@@ -112,6 +126,32 @@ mod tests {
             propagated_to: hash_map![
                 enodeid1 => 2,
                 enodeid2 => 1
+            ],
+            received_from: Default::default(),
+        }));
+    }
+
+    #[test]
+    fn should_keep_track_of_back_propagations() {
+        // given
+        let mut stats = TransactionsStats::default();
+        let hash = 5.into();
+        let enodeid1 = 2.into();
+        let enodeid2 = 5.into();
+
+        // when
+        stats.received(hash, Some(enodeid1), 5);
+        stats.received(hash, Some(enodeid1), 10);
+        stats.received(hash, Some(enodeid2), 15);
+
+        // then
+        let stats = stats.get(&hash);
+        assert_eq!(stats, Some(&Stats {
+            first_seen: 5,
+            propagated_to: Default::default(),
+            received_from: hash_map![
+                enodeid1 => 2,
+                enodeid2 => 1
             ]
         }));
    }
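
The bookkeeping added in `received` above is just a nested map bumped with `entry().or_insert(0)` and `saturating_add`. A self-contained sketch of the same pattern with plain std types (u64 stand-ins replace the real H256 transaction hash and H512 enode ID):

    use std::collections::HashMap;

    type TxHash = u64; // stand-in for H256 in the real code
    type NodeId = u64; // stand-in for the 512-bit enode ID

    // Mirrors TransactionsStats::received: bump a per-peer counter for the given hash.
    fn received(stats: &mut HashMap<TxHash, HashMap<NodeId, usize>>, hash: TxHash, peer: NodeId) {
        let count = stats.entry(hash).or_insert_with(HashMap::new).entry(peer).or_insert(0);
        *count = count.saturating_add(1);
    }

    fn main() {
        let mut received_from = HashMap::new();
        received(&mut received_from, 5, 2);
        received(&mut received_from, 5, 2);
        received(&mut received_from, 5, 7);
        assert_eq!(received_from[&5][&2], 2); // peer 2 sent the transaction back twice
        assert_eq!(received_from[&5][&7], 1);
    }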