Beta backports (#4067)

* Improving logs for transaction propagation

Conflicts:
	sync/src/chain.rs

* Propagate only on timer

Conflicts:
	sync/src/chain.rs

* Maintaining a list of transactions propagated from other peers

Conflicts:
	ethcore/src/client/client.rs
	ethcore/src/client/traits.rs
	js/src/dapps/localtx/Transaction/transaction.js
	js/src/dapps/localtx/Transaction/transaction.spec.js
	rpc/src/v1/tests/helpers/sync_provider.rs
	rpc/src/v1/types/sync.rs
	sync/src/api.rs
	sync/src/chain.rs
	sync/src/transactions_stats.rs

* fixing test

Conflicts:
	rpc/src/v1/tests/mocked/parity.rs

* Returning persistent node id

Conflicts:
	ethcore/light/src/net/context.rs
	ethcore/light/src/net/tests/mod.rs
	sync/src/api.rs

* Prevent broadcasting transactions to peer that send them.

Conflicts:
	js/src/dapps/localtx/Transaction/transaction.js
	rpc/src/v1/tests/helpers/sync_provider.rs
	rpc/src/v1/tests/mocked/parity.rs
	rpc/src/v1/types/sync.rs
	sync/src/api.rs
	sync/src/chain.rs
	sync/src/transactions_stats.rs

* Bumping versions

* Fixing broken classic.json

Former-commit-id: 2cadea67a0a2f60dd11e0fb943bb0c79b42ab4eb
This commit is contained in:
Tomasz Drwięga
2017-01-06 16:05:35 +01:00
committed by Gav Wood
parent a6204ab7bf
commit 091a59ef0b
15 changed files with 82 additions and 47 deletions

View File

@@ -73,7 +73,7 @@ pub trait SyncProvider: Send + Sync {
/// Get peers information
fn peers(&self) -> Vec<PeerInfo>;
/// Get the enode if available.
fn enode(&self) -> Option<String>;
}
@@ -233,6 +233,10 @@ impl ChainNotify for EthSync {
fn stop(&self) {
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
}
fn transactions_received(&self, hashes: Vec<H256>, peer_id: PeerId) {
self.handler.sync.write().transactions_received(hashes, peer_id);
}
}
impl IpcConfig for ManageNetwork { }

View File

@@ -438,6 +438,13 @@ impl ChainSync {
.collect()
}
/// Updates transactions were received by a peer
pub fn transactions_received(&mut self, hashes: Vec<H256>, peer_id: PeerId) {
if let Some(mut peer_info) = self.peers.get_mut(&peer_id) {
peer_info.last_sent_transactions.extend(&hashes);
}
}
/// Abort all sync activity
pub fn abort(&mut self, io: &mut SyncIo) {
self.reset_and_continue(io);
@@ -1422,7 +1429,7 @@ impl ChainSync {
}
let mut item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
trace!(target: "sync", "{:02} -> Transactions ({} entries)", peer_id, item_count);
item_count = min(item_count, MAX_TX_TO_IMPORT);
let mut transactions = Vec::with_capacity(item_count);
for i in 0 .. item_count {
@@ -1434,7 +1441,7 @@ impl ChainSync {
let tx = rlp.as_raw().to_vec();
transactions.push(tx);
}
io.chain().queue_transactions(transactions);
io.chain().queue_transactions(transactions, peer_id);
Ok(())
}
@@ -1935,7 +1942,7 @@ impl ChainSync {
// Send all transactions
if peer_info.last_sent_transactions.is_empty() {
peer_info.last_sent_transactions = all_transactions_hashes.clone();
return Some((*peer_id, all_transactions_rlp.clone()));
return Some((*peer_id, all_transactions_hashes.len(), all_transactions_rlp.clone()));
}
// Get hashes of all transactions to send to this peer
@@ -1953,20 +1960,22 @@ impl ChainSync {
}
peer_info.last_sent_transactions = all_transactions_hashes.clone();
Some((*peer_id, packet.out()))
Some((*peer_id, to_send.len(), packet.out()))
})
.collect::<Vec<_>>();
// Send RLPs
let sent = lucky_peers.len();
if sent > 0 {
for (peer_id, rlp) in lucky_peers {
let peers = lucky_peers.len();
if peers > 0 {
let mut max_sent = 0;
for (peer_id, sent, rlp) in lucky_peers {
self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp);
trace!(target: "sync", "{:02} <- Transactions ({} entries)", peer_id, sent);
max_sent = max(max_sent, sent);
}
trace!(target: "sync", "Sent up to {} transactions to {} peers.", transactions.len(), sent);
debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, peers);
}
sent
peers
}
fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) {
@@ -1986,7 +1995,6 @@ impl ChainSync {
trace!(target: "sync", "Sent sealed block to all peers");
};
}
self.propagate_new_transactions(io);
self.last_sent_block_number = chain_info.best_block_number;
}
@@ -1999,7 +2007,9 @@ impl ChainSync {
/// called when block is imported to chain - propagates the blocks and updates transactions sent to peers
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], enacted: &[H256], _retracted: &[H256], sealed: &[H256]) {
let queue_info = io.chain().queue_info();
if !self.status().is_syncing(queue_info) || !sealed.is_empty() {
let is_syncing = self.status().is_syncing(queue_info);
if !is_syncing || !sealed.is_empty() {
trace!(target: "sync", "Propagating blocks, state={:?}", self.state);
self.propagate_latest_blocks(io, sealed);
}
@@ -2008,7 +2018,7 @@ impl ChainSync {
self.restart(io);
}
if !enacted.is_empty() {
if !is_syncing && !enacted.is_empty() {
// Select random peers to re-broadcast transactions to.
let mut random = random::new();
let len = self.peers.len();
@@ -2358,7 +2368,7 @@ mod tests {
}
#[test]
fn should_not_propagate_transactions_again_after_new_block() {
fn does_not_propagate_new_transactions_after_new_block() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.insert_transaction_to_queue();
@@ -2367,6 +2377,8 @@ mod tests {
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &mut queue, None);
let peer_count = sync.propagate_new_transactions(&mut io);
// New block import should not trigger propagation.
// (we only propagate on timeout)
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]);
// Try to propagate same transactions for the second time
let peer_count2 = sync.propagate_new_transactions(&mut io);