diff --git a/sync/src/chain.rs b/sync/src/chain.rs index fc0a19aba..e598c4572 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -207,7 +207,7 @@ pub struct ChainSync { /// True if common block for our and remote chain has been found have_common_block: bool, /// Last propagated block number - last_send_block_number: BlockNumber, + last_sent_block_number: BlockNumber, /// Max blocks to download ahead max_download_ahead_blocks: usize, /// Network ID @@ -236,7 +236,7 @@ impl ChainSync { last_imported_hash: None, syncing_difficulty: U256::from(0u64), have_common_block: false, - last_send_block_number: 0, + last_sent_block_number: 0, max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), network_id: config.network_id, transaction_queue: Mutex::new(TransactionQueue::new()), @@ -1248,26 +1248,25 @@ impl ChainSync { sent } + fn propagate_latest_blocks(&mut self, io: &mut SyncIo) { + let chain_info = io.chain().chain_info(); + if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { + let blocks = self.propagate_blocks(&chain_info, io); + let hashes = self.propagate_new_hashes(&chain_info, io); + if blocks != 0 || hashes != 0 { + trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); + } + } + self.last_sent_block_number = chain_info.best_block_number; + } + /// Maintain other peers. Send out any new blocks and transactions pub fn maintain_sync(&mut self, io: &mut SyncIo) { self.check_resume(io); } - /// should be called once chain has new block, triggers the latest block propagation - pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) { - let chain = io.chain().chain_info(); - if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { - let blocks = self.propagate_blocks(&chain, io); - let hashes = self.propagate_new_hashes(&chain, io); - if blocks != 0 || hashes != 0 { - trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); - } - } - self.last_send_block_number = chain.best_block_number; - } - - /// called when block is imported to chain, updates transactions queue - pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) { + /// called when block is imported to chain, updates transactions queue and propagates the blocks + pub fn chain_new_blocks(&mut self, io: &mut SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) { fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec { let block = chain .block(BlockId::Hash(hash.clone())) @@ -1278,24 +1277,31 @@ impl ChainSync { } - let chain = io.chain(); - let good = good.par_iter().map(|h| fetch_transactions(chain, h)); - let bad = bad.par_iter().map(|h| fetch_transactions(chain, h)); + { + let chain = io.chain(); + let good = good.par_iter().map(|h| fetch_transactions(chain, h)); + let bad = bad.par_iter().map(|h| fetch_transactions(chain, h)); - good.for_each(|txs| { - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); - transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); - }); - bad.for_each(|txs| { - // populate sender - for tx in &txs { - let _sender = tx.sender(); - } - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - transaction_queue.add_all(txs, |a| chain.nonce(a)); - }); + good.for_each(|txs| { + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); + transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); + }); + bad.for_each(|txs| { + // populate sender + for tx in &txs { + let _sender = tx.sender(); + } + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.add_all(txs, |a| chain.nonce(a)); + }); + } + + // Propagate latests blocks + self.propagate_latest_blocks(io); + // TODO [todr] propagate transactions? } + } #[cfg(test)] @@ -1634,13 +1640,13 @@ mod tests { let retracted_blocks = vec![client.block_hash_delta_minus(1)]; let mut queue = VecDeque::new(); - let io = TestIo::new(&mut client, &mut queue, None); + let mut io = TestIo::new(&mut client, &mut queue, None); // when - sync.chain_new_blocks(&io, &[], &good_blocks, &[]); + sync.chain_new_blocks(&mut io, &[], &good_blocks, &[]); assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0); assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1); - sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks, &[]); + sync.chain_new_blocks(&mut io, &good_blocks, &retracted_blocks, &[]); // then let status = sync.transaction_queue.lock().unwrap().status(); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 8a30385a2..b5869642c 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -154,13 +154,11 @@ impl NetworkProtocolHandler for EthSync { fn message(&self, io: &NetworkContext, message: &SyncMessage) { match *message { - SyncMessage::BlockVerified => { - self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); - }, SyncMessage::NewChainBlocks { ref good, ref bad, ref retracted } => { - let sync_io = NetSyncIo::new(io, self.chain.deref()); - self.sync.write().unwrap().chain_new_blocks(&sync_io, good, bad, retracted); - } + let mut sync_io = NetSyncIo::new(io, self.chain.deref()); + self.sync.write().unwrap().chain_new_blocks(&mut sync_io, good, bad, retracted); + }, + _ => {/* Ignore other messages */}, } } } diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 58f50916e..855aa79a6 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -129,8 +129,8 @@ fn propagate_hashes() { net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); net.sync(); - net.trigger_block_verified(0); //first event just sets the marker - net.trigger_block_verified(0); + net.trigger_chain_new_blocks(0); //first event just sets the marker + net.trigger_chain_new_blocks(0); // 5 peers to sync assert_eq!(5, net.peer(0).queue.len()); @@ -154,8 +154,8 @@ fn propagate_blocks() { net.sync(); net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); - net.trigger_block_verified(0); //first event just sets the marker - net.trigger_block_verified(0); + net.trigger_chain_new_blocks(0); //first event just sets the marker + net.trigger_chain_new_blocks(0); assert!(!net.peer(0).queue.is_empty()); // NEW_BLOCK_PACKET diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 5b53ad90b..d01dba0b2 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -455,8 +455,8 @@ impl TestNet { self.peers.iter().all(|p| p.queue.is_empty()) } - pub fn trigger_block_verified(&mut self, peer_id: usize) { + pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { let mut peer = self.peer_mut(peer_id); - peer.sync.chain_blocks_verified(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); + peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[]); } }