diff --git a/sync/src/chain.rs b/sync/src/chain.rs index ec0659a6e..e853cf4e3 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1074,18 +1074,22 @@ impl ChainSync { self.peers.iter().map(|peer| (peer.0.clone(), peer.1.latest.clone())).collect() } + fn get_lagging_peers(&self, io: &SyncIo) -> Vec<(usize, H256)> { + let chain = io.chain(); + let chain_info = chain.chain_info(); + let latest_hash = chain_info.best_block_hash; + self.query_peer_latest_blocks().iter().filter(|peer| + match io.chain().block_status(&peer.1) + { + BlockStatus::InChain => peer.1 != latest_hash, + _ => false + }).cloned().collect::>() + } + fn propagade_blocks(&mut self, io: &mut SyncIo) -> usize { let updated_peers = { - let chain = io.chain(); - let chain_info = chain.chain_info(); - let latest_hash = chain_info.best_block_hash; - let lagging_peers = self.query_peer_latest_blocks().iter().filter(|peer| - match io.chain().block_status(&peer.1) - { - BlockStatus::InChain => peer.1 != latest_hash, - _ => false - }).cloned().collect::>(); + let lagging_peers = self.get_lagging_peers(io); let lucky_peers = match lagging_peers.len() { 0 ... MIN_PEERS_PROPAGATION => lagging_peers, @@ -1117,7 +1121,7 @@ impl ChainSync { if self.state == SyncState::Idle { let blocks_propagaded = self.propagade_blocks(io); - debug!(target: "sync", "Sent new blocks to peers: {:?}", blocks_propagaded); + trace!(target: "sync", "Sent new blocks to peers: {:?}", blocks_propagaded); } } } @@ -1127,6 +1131,8 @@ mod tests { use tests::helpers::*; use super::*; use util::*; + use super::{PeerInfo, PeerAsking}; + use ethcore::header::{BlockNumber}; #[test] fn return_receipts_empty() { @@ -1195,4 +1201,68 @@ mod tests { sync.on_packet(&mut io, 1usize, super::GET_NODE_DATA_PACKET, &node_request); assert_eq!(1, io.queue.len()); } + + fn dummy_sync_with_peer(peer_latest_hash: H256) -> ChainSync { + let mut sync = ChainSync::new(); + sync.peers.insert(0, + PeerInfo { + protocol_version: 0, + genesis: H256::zero(), + network_id: U256::zero(), + latest: peer_latest_hash, + difficulty: U256::zero(), + asking: PeerAsking::Nothing, + asking_blocks: Vec::::new(), + ask_time: 0f64, + }); + sync + } + + #[test] + fn finds_lagging_peers() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(100, false); + let mut queue = VecDeque::new(); + let sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); + let io = TestIo::new(&mut client, &mut queue, None); + + let lagging_peers = sync.get_lagging_peers(&io); + + assert_eq!(1, lagging_peers.len()) + } + + #[test] + fn calculates_tree_for_lagging_peer() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(15, false); + + let start = client.block_hash_delta_minus(4); + let end = client.block_hash_delta_minus(2); + + // wrong way end -> start, should be None + let rlp = ChainSync::create_new_hashes_rlp(&client, &end, &start); + assert!(rlp.is_none()); + + let rlp = ChainSync::create_new_hashes_rlp(&client, &start, &end).unwrap(); + // size of three rlp encoded hash + assert_eq!(101, rlp.len()); + } + + #[test] + fn sends_packet_to_lagging_peer() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(20, false); + let mut queue = VecDeque::new(); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut io = TestIo::new(&mut client, &mut queue, None); + + let block_count = sync.propagade_blocks(&mut io); + + // 1 message should be send + assert_eq!(1, io.queue.len()); + // 1 peer should be updated + assert_eq!(1, block_count); + // NEW_BLOCK_HASHES_PACKET + assert_eq!(0x01, io.queue[0].packet_id); + } } \ No newline at end of file diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 43db4c428..a9aeb2e34 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -122,14 +122,18 @@ fn status_packet() { #[test] fn propagade() { - let mut net = TestNet::new(2); - net.peer_mut(0).chain.add_blocks(100, false); - net.peer_mut(1).chain.add_blocks(100, false); + let mut net = TestNet::new(3); + net.peer_mut(1).chain.add_blocks(1000, false); + net.peer_mut(2).chain.add_blocks(1000, false); net.sync(); + let status = net.peer(0).sync.status(); + assert_eq!(status.state, SyncState::Idle); net.peer_mut(0).chain.add_blocks(10, false); net.sync_step_peer(0); - assert_eq!(1, net.peer(0).queue.len()); + // 2 peers to sync + assert_eq!(2, net.peer(0).queue.len()); + // NEW_BLOCK_HASHES_PACKET assert_eq!(0x01, net.peer(0).queue[0].packet_id); } \ No newline at end of file diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 1e9e70c2f..2be501ebb 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -69,6 +69,12 @@ impl TestBlockChainClient { self.import_block(rlp.as_raw().to_vec()).unwrap(); } } + + pub fn block_hash_delta_minus(&mut self, delta: usize) -> H256 { + let blocks_read = self.numbers.read().unwrap(); + let index = blocks_read.len() - delta; + blocks_read[&index].clone() + } } impl BlockChainClient for TestBlockChainClient { @@ -125,11 +131,33 @@ impl BlockChainClient for TestBlockChainClient { } } - fn tree_route(&self, _from: &H256, _to: &H256) -> Option { + // works only if blocks are one after another 1 -> 2 -> 3 + fn tree_route(&self, from: &H256, to: &H256) -> Option { Some(TreeRoute { - blocks: Vec::new(), ancestor: H256::new(), - index: 0 + index: 0, + blocks: { + let numbers_read = self.numbers.read().unwrap(); + let mut adding = false; + + let mut blocks = Vec::new(); + for (_, hash) in numbers_read.iter().sort_by(|tuple1, tuple2| tuple1.0.cmp(tuple2.0)) { + if hash == to { + if adding { + blocks.push(hash.clone()); + } + adding = false; + break; + } + if hash == from { + adding = true; + } + if adding { + blocks.push(hash.clone()); + } + } + if adding { Vec::new() } else { blocks } + } }) }