TestNet flushing and cleanup

This commit is contained in:
keorn 2016-12-12 19:44:24 +01:00
parent 722cd4d086
commit 19adb84527
4 changed files with 49 additions and 40 deletions

View File

@ -596,6 +596,7 @@ impl Engine for Tendermint {
let proposal = ConsensusMessage::new_proposal(header).expect("block went through full verification; this Engine verifies new_proposal creation; qed"); let proposal = ConsensusMessage::new_proposal(header).expect("block went through full verification; this Engine verifies new_proposal creation; qed");
if signatures_len != 1 { if signatures_len != 1 {
// New Commit received, skip to next height. // New Commit received, skip to next height.
trace!(target: "poa", "Received a commit for height {}, round {}.", proposal.height, proposal.round);
self.to_next_height(proposal.height); self.to_next_height(proposal.height);
return false; return false;
} }

View File

@ -1822,18 +1822,18 @@ impl ChainSync {
} }
/// returns peer ids that have different blocks than our chain /// returns peer ids that have different blocks than our chain
fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo) -> Vec<PeerId> { fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo) -> Vec<PeerId> {
let latest_hash = chain_info.best_block_hash; let latest_hash = chain_info.best_block_hash;
self.peers.iter_mut().filter_map(|(&id, ref mut peer_info)| self
match io.chain().block_status(BlockId::Hash(peer_info.latest_hash.clone())) { .peers
BlockStatus::InChain => { .iter_mut()
.filter_map(|(&id, ref mut peer_info)| {
trace!(target: "sync", "Checking peer our best {} their best {}", latest_hash, peer_info.latest_hash);
if peer_info.latest_hash != latest_hash { if peer_info.latest_hash != latest_hash {
Some(id) Some(id)
} else { } else {
None None
} }
},
_ => None
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>()
} }
@ -1980,7 +1980,7 @@ impl ChainSync {
fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) { fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) {
let chain_info = io.chain().chain_info(); let chain_info = io.chain().chain_info();
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let mut peers = self.get_lagging_peers(&chain_info, io); let mut peers = self.get_lagging_peers(&chain_info);
if sealed.is_empty() { if sealed.is_empty() {
let hashes = self.propagate_new_hashes(&chain_info, io, &peers); let hashes = self.propagate_new_hashes(&chain_info, io, &peers);
peers = ChainSync::select_random_peers(&peers); peers = ChainSync::select_random_peers(&peers);
@ -2272,13 +2272,10 @@ mod tests {
fn finds_lagging_peers() { fn finds_lagging_peers() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle); client.add_blocks(100, EachBlockWith::Uncle);
let queue = RwLock::new(VecDeque::new());
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10), &client);
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let ss = TestSnapshotService::new();
let io = TestIo::new(&mut client, &ss, &queue, None);
let lagging_peers = sync.get_lagging_peers(&chain_info, &io); let lagging_peers = sync.get_lagging_peers(&chain_info);
assert_eq!(1, lagging_peers.len()); assert_eq!(1, lagging_peers.len());
} }
@ -2310,7 +2307,7 @@ mod tests {
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);
let peers = sync.get_lagging_peers(&chain_info, &io); let peers = sync.get_lagging_peers(&chain_info);
let peer_count = sync.propagate_new_hashes(&chain_info, &mut io, &peers); let peer_count = sync.propagate_new_hashes(&chain_info, &mut io, &peers);
// 1 message should be send // 1 message should be send
@ -2330,7 +2327,7 @@ mod tests {
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);
let peers = sync.get_lagging_peers(&chain_info, &io); let peers = sync.get_lagging_peers(&chain_info);
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[], &peers); let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[], &peers);
// 1 message should be send // 1 message should be send
@ -2351,7 +2348,7 @@ mod tests {
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);
let peers = sync.get_lagging_peers(&chain_info, &io); let peers = sync.get_lagging_peers(&chain_info);
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()], &peers); let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()], &peers);
// 1 message should be send // 1 message should be send
@ -2365,15 +2362,35 @@ mod tests {
#[test] #[test]
fn sends_proposed_block() { fn sends_proposed_block() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle); client.add_blocks(2, EachBlockWith::Uncle);
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let block = client.block(BlockId::Latest).unwrap(); let block = client.block(BlockId::Latest).unwrap();
let mut sync = dummy_sync_with_peer(client.block_hash(BlockId::Latest).unwrap(), &client); let mut sync = ChainSync::new(SyncConfig::default(), &client);
sync.peers.insert(0,
PeerInfo {
// Messaging protocol
protocol_version: 2,
genesis: H256::zero(),
network_id: 0,
latest_hash: client.block_hash_delta_minus(1),
difficulty: None,
asking: PeerAsking::Nothing,
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: 0,
last_sent_transactions: HashSet::new(),
expired: false,
confirmation: super::ForkConfirmation::Confirmed,
snapshot_number: None,
snapshot_hash: None,
asking_snapshot_data: None,
block_set: None,
});
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);
sync.propagate_proposed_blocks(&mut io, &[block]); sync.propagate_proposed_blocks(&mut io, &[block]);
// 1 message should be send // 1 message should be sent
assert_eq!(1, io.packets.len()); assert_eq!(1, io.packets.len());
// NEW_BLOCK_PACKET // NEW_BLOCK_PACKET
assert_eq!(0x07, io.packets[0].packet_id); assert_eq!(0x07, io.packets[0].packet_id);
@ -2582,7 +2599,7 @@ mod tests {
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);
let peers = sync.get_lagging_peers(&chain_info, &io); let peers = sync.get_lagging_peers(&chain_info);
sync.propagate_new_hashes(&chain_info, &mut io, &peers); sync.propagate_new_hashes(&chain_info, &mut io, &peers);
let data = &io.packets[0].data.clone(); let data = &io.packets[0].data.clone();
@ -2602,7 +2619,7 @@ mod tests {
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);
let peers = sync.get_lagging_peers(&chain_info, &io); let peers = sync.get_lagging_peers(&chain_info);
sync.propagate_blocks(&chain_info, &mut io, &[], &peers); sync.propagate_blocks(&chain_info, &mut io, &[], &peers);
let data = &io.packets[0].data.clone(); let data = &io.packets[0].data.clone();

View File

@ -115,8 +115,6 @@ fn authority_round() {
#[test] #[test]
fn tendermint() { fn tendermint() {
::env_logger::init().ok();
let s0 = KeyPair::from_secret("1".sha3()).unwrap(); let s0 = KeyPair::from_secret("1".sha3()).unwrap();
let s1 = KeyPair::from_secret("0".sha3()).unwrap(); let s1 = KeyPair::from_secret("0".sha3()).unwrap();
let spec_factory = || { let spec_factory = || {
@ -167,7 +165,6 @@ fn tendermint() {
net.sync(); net.sync();
assert_eq!(net.peer(0).chain.chain_info().best_block_number, 2); assert_eq!(net.peer(0).chain.chain_info().best_block_number, 2);
assert_eq!(net.peer(1).chain.chain_info().best_block_number, 2); assert_eq!(net.peer(1).chain.chain_info().best_block_number, 2);
println!("HEIGHT 2");
net.peer(0).chain.miner().import_own_transaction(&*net.peer(0).chain, new_tx(s0.secret(), 1.into())).unwrap(); net.peer(0).chain.miner().import_own_transaction(&*net.peer(0).chain, new_tx(s0.secret(), 1.into())).unwrap();
net.peer(1).chain.miner().import_own_transaction(&*net.peer(1).chain, new_tx(s1.secret(), 1.into())).unwrap(); net.peer(1).chain.miner().import_own_transaction(&*net.peer(1).chain, new_tx(s1.secret(), 1.into())).unwrap();
@ -175,29 +172,21 @@ fn tendermint() {
// Commit // Commit
net.peer(0).chain.engine().step(); net.peer(0).chain.engine().step();
net.peer(1).chain.engine().step(); net.peer(1).chain.engine().step();
// Propose and Prevote // Propose
net.peer(0).chain.engine().step(); net.peer(0).chain.engine().step();
net.peer(1).chain.engine().step(); net.peer(1).chain.engine().step();
// Negotiate through a None round.
println!("RECONNECT");
net.peer(0).chain.miner().import_own_transaction(&*net.peer(0).chain, new_tx(s0.secret(), 2.into())).unwrap(); net.peer(0).chain.miner().import_own_transaction(&*net.peer(0).chain, new_tx(s0.secret(), 2.into())).unwrap();
net.peer(1).chain.miner().import_own_transaction(&*net.peer(1).chain, new_tx(s1.secret(), 2.into())).unwrap(); net.peer(1).chain.miner().import_own_transaction(&*net.peer(1).chain, new_tx(s1.secret(), 2.into())).unwrap();
// Send different prevotes
net.sync(); net.sync();
// Prevote timeout
net.peer(0).chain.engine().step(); net.peer(0).chain.engine().step();
net.peer(1).chain.engine().step(); net.peer(1).chain.engine().step();
// Precommit and commit
net.sync(); net.sync();
net.sync(); // Propose timeout
// Propose timeout.
println!("PROPOSE");
net.peer(0).chain.engine().step(); net.peer(0).chain.engine().step();
net.peer(1).chain.engine().step(); net.peer(1).chain.engine().step();
// Prevote, precommit and commit
net.sync();
net.peer(0).chain.engine().step();
net.peer(1).chain.engine().step();
::std::thread::sleep(::std::time::Duration::from_millis(1000));
net.sync(); net.sync();
let ci0 = net.peer(0).chain.chain_info(); let ci0 = net.peer(0).chain.chain_info();
let ci1 = net.peer(1).chain.chain_info(); let ci1 = net.peer(1).chain.chain_info();

View File

@ -250,6 +250,7 @@ impl<C> TestNet<C> where C: FlushingBlockChainClient {
pub fn sync_step(&mut self) { pub fn sync_step(&mut self) {
for peer in 0..self.peers.len() { for peer in 0..self.peers.len() {
self.peers[peer].chain.flush();
let packet = self.peers[peer].queue.write().pop_front(); let packet = self.peers[peer].queue.write().pop_front();
if let Some(packet) = packet { if let Some(packet) = packet {
let disconnecting = { let disconnecting = {
@ -258,6 +259,7 @@ impl<C> TestNet<C> where C: FlushingBlockChainClient {
let to_disconnect = { let to_disconnect = {
let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(peer as PeerId)); let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(peer as PeerId));
ChainSync::dispatch_packet(&p.sync, &mut io, peer as PeerId, packet.packet_id, &packet.data); ChainSync::dispatch_packet(&p.sync, &mut io, peer as PeerId, packet.packet_id, &packet.data);
p.chain.flush();
io.to_disconnect.clone() io.to_disconnect.clone()
}; };
for d in &to_disconnect { for d in &to_disconnect {