diff --git a/ethcore/src/engines/tendermint/mod.rs b/ethcore/src/engines/tendermint/mod.rs index 1e7f9eb63..de89658ab 100644 --- a/ethcore/src/engines/tendermint/mod.rs +++ b/ethcore/src/engines/tendermint/mod.rs @@ -596,6 +596,7 @@ impl Engine for Tendermint { let proposal = ConsensusMessage::new_proposal(header).expect("block went through full verification; this Engine verifies new_proposal creation; qed"); if signatures_len != 1 { // New Commit received, skip to next height. + trace!(target: "poa", "Received a commit for height {}, round {}.", proposal.height, proposal.round); self.to_next_height(proposal.height); return false; } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index e30a60595..9c3ef7132 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1822,18 +1822,18 @@ impl ChainSync { } /// returns peer ids that have different blocks than our chain - fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo) -> Vec { + fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo) -> Vec { let latest_hash = chain_info.best_block_hash; - self.peers.iter_mut().filter_map(|(&id, ref mut peer_info)| - match io.chain().block_status(BlockId::Hash(peer_info.latest_hash.clone())) { - BlockStatus::InChain => { - if peer_info.latest_hash != latest_hash { - Some(id) - } else { - None - } - }, - _ => None + self + .peers + .iter_mut() + .filter_map(|(&id, ref mut peer_info)| { + trace!(target: "sync", "Checking peer our best {} their best {}", latest_hash, peer_info.latest_hash); + if peer_info.latest_hash != latest_hash { + Some(id) + } else { + None + } }) .collect::>() } @@ -1980,7 +1980,7 @@ impl ChainSync { fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) { let chain_info = io.chain().chain_info(); if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { - let mut peers = self.get_lagging_peers(&chain_info, io); + let mut peers = self.get_lagging_peers(&chain_info); if sealed.is_empty() { let hashes = self.propagate_new_hashes(&chain_info, io, &peers); peers = ChainSync::select_random_peers(&peers); @@ -2272,13 +2272,10 @@ mod tests { fn finds_lagging_peers() { let mut client = TestBlockChainClient::new(); client.add_blocks(100, EachBlockWith::Uncle); - let queue = RwLock::new(VecDeque::new()); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10), &client); let chain_info = client.chain_info(); - let ss = TestSnapshotService::new(); - let io = TestIo::new(&mut client, &ss, &queue, None); - let lagging_peers = sync.get_lagging_peers(&chain_info, &io); + let lagging_peers = sync.get_lagging_peers(&chain_info); assert_eq!(1, lagging_peers.len()); } @@ -2310,7 +2307,7 @@ mod tests { let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peers = sync.get_lagging_peers(&chain_info, &io); + let peers = sync.get_lagging_peers(&chain_info); let peer_count = sync.propagate_new_hashes(&chain_info, &mut io, &peers); // 1 message should be send @@ -2330,7 +2327,7 @@ mod tests { let chain_info = client.chain_info(); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peers = sync.get_lagging_peers(&chain_info, &io); + let peers = sync.get_lagging_peers(&chain_info); let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[], &peers); // 1 message should be send @@ -2351,7 +2348,7 @@ mod tests { let chain_info = client.chain_info(); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peers = sync.get_lagging_peers(&chain_info, &io); + let peers = sync.get_lagging_peers(&chain_info); let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()], &peers); // 1 message should be send @@ -2365,15 +2362,35 @@ mod tests { #[test] fn sends_proposed_block() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, EachBlockWith::Uncle); + client.add_blocks(2, EachBlockWith::Uncle); let queue = RwLock::new(VecDeque::new()); let block = client.block(BlockId::Latest).unwrap(); - let mut sync = dummy_sync_with_peer(client.block_hash(BlockId::Latest).unwrap(), &client); + let mut sync = ChainSync::new(SyncConfig::default(), &client); + sync.peers.insert(0, + PeerInfo { + // Messaging protocol + protocol_version: 2, + genesis: H256::zero(), + network_id: 0, + latest_hash: client.block_hash_delta_minus(1), + difficulty: None, + asking: PeerAsking::Nothing, + asking_blocks: Vec::new(), + asking_hash: None, + ask_time: 0, + last_sent_transactions: HashSet::new(), + expired: false, + confirmation: super::ForkConfirmation::Confirmed, + snapshot_number: None, + snapshot_hash: None, + asking_snapshot_data: None, + block_set: None, + }); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); sync.propagate_proposed_blocks(&mut io, &[block]); - // 1 message should be send + // 1 message should be sent assert_eq!(1, io.packets.len()); // NEW_BLOCK_PACKET assert_eq!(0x07, io.packets[0].packet_id); @@ -2582,7 +2599,7 @@ mod tests { let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peers = sync.get_lagging_peers(&chain_info, &io); + let peers = sync.get_lagging_peers(&chain_info); sync.propagate_new_hashes(&chain_info, &mut io, &peers); let data = &io.packets[0].data.clone(); @@ -2602,7 +2619,7 @@ mod tests { let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peers = sync.get_lagging_peers(&chain_info, &io); + let peers = sync.get_lagging_peers(&chain_info); sync.propagate_blocks(&chain_info, &mut io, &[], &peers); let data = &io.packets[0].data.clone(); diff --git a/sync/src/tests/consensus.rs b/sync/src/tests/consensus.rs index 2e98dfac5..df845ad2c 100644 --- a/sync/src/tests/consensus.rs +++ b/sync/src/tests/consensus.rs @@ -115,8 +115,6 @@ fn authority_round() { #[test] fn tendermint() { - ::env_logger::init().ok(); - let s0 = KeyPair::from_secret("1".sha3()).unwrap(); let s1 = KeyPair::from_secret("0".sha3()).unwrap(); let spec_factory = || { @@ -167,7 +165,6 @@ fn tendermint() { net.sync(); assert_eq!(net.peer(0).chain.chain_info().best_block_number, 2); assert_eq!(net.peer(1).chain.chain_info().best_block_number, 2); - println!("HEIGHT 2"); net.peer(0).chain.miner().import_own_transaction(&*net.peer(0).chain, new_tx(s0.secret(), 1.into())).unwrap(); net.peer(1).chain.miner().import_own_transaction(&*net.peer(1).chain, new_tx(s1.secret(), 1.into())).unwrap(); @@ -175,29 +172,21 @@ fn tendermint() { // Commit net.peer(0).chain.engine().step(); net.peer(1).chain.engine().step(); - // Propose and Prevote + // Propose net.peer(0).chain.engine().step(); net.peer(1).chain.engine().step(); - // Negotiate through a None round. - println!("RECONNECT"); net.peer(0).chain.miner().import_own_transaction(&*net.peer(0).chain, new_tx(s0.secret(), 2.into())).unwrap(); net.peer(1).chain.miner().import_own_transaction(&*net.peer(1).chain, new_tx(s1.secret(), 2.into())).unwrap(); - - + // Send different prevotes net.sync(); + // Prevote timeout net.peer(0).chain.engine().step(); net.peer(1).chain.engine().step(); + // Precommit and commit net.sync(); - net.sync(); - // Propose timeout. - println!("PROPOSE"); + // Propose timeout net.peer(0).chain.engine().step(); net.peer(1).chain.engine().step(); - // Prevote, precommit and commit - net.sync(); - net.peer(0).chain.engine().step(); - net.peer(1).chain.engine().step(); - ::std::thread::sleep(::std::time::Duration::from_millis(1000)); net.sync(); let ci0 = net.peer(0).chain.chain_info(); let ci1 = net.peer(1).chain.chain_info(); diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index c6b2ec949..e9983edcf 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -250,6 +250,7 @@ impl TestNet where C: FlushingBlockChainClient { pub fn sync_step(&mut self) { for peer in 0..self.peers.len() { + self.peers[peer].chain.flush(); let packet = self.peers[peer].queue.write().pop_front(); if let Some(packet) = packet { let disconnecting = { @@ -258,6 +259,7 @@ impl TestNet where C: FlushingBlockChainClient { let to_disconnect = { let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(peer as PeerId)); ChainSync::dispatch_packet(&p.sync, &mut io, peer as PeerId, packet.packet_id, &packet.data); + p.chain.flush(); io.to_disconnect.clone() }; for d in &to_disconnect {