From f74c5dc92110cf8d85b98f79d478524c8460dfe1 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 12 Feb 2016 13:07:02 +0100 Subject: [PATCH 1/6] More sync and propagation fixes --- sync/src/chain.rs | 99 +++++++++++++++++++++++++++------------- sync/src/tests/chain.rs | 30 ++++++++---- util/src/network/host.rs | 1 + 3 files changed, 89 insertions(+), 41 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index f0c0347a9..6717c0814 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -162,6 +162,8 @@ struct PeerInfo { asking: PeerAsking, /// A set of block numbers being requested asking_blocks: Vec, + /// Holds requested header hash if currently requesting block header by hash + asking_hash: Option, /// Request timestamp ask_time: f64, } @@ -179,6 +181,8 @@ pub struct ChainSync { downloading_headers: HashSet, /// Set of block body numbers being downloaded downloading_bodies: HashSet, + /// Set of block headers being downloaded by hash + downloading_hashes: HashSet, /// Downloaded headers. headers: Vec<(BlockNumber, Vec
)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order /// Downloaded bodies @@ -195,6 +199,8 @@ pub struct ChainSync { syncing_difficulty: U256, /// True if common block for our and remote chain has been found have_common_block: bool, + /// Last propagated block number + last_send_block_number: BlockNumber, } type RlpResponseResult = Result, PacketDecodeError>; @@ -208,6 +214,7 @@ impl ChainSync { highest_block: None, downloading_headers: HashSet::new(), downloading_bodies: HashSet::new(), + downloading_hashes: HashSet::new(), headers: Vec::new(), bodies: Vec::new(), peers: HashMap::new(), @@ -216,6 +223,7 @@ impl ChainSync { last_imported_hash: None, syncing_difficulty: U256::from(0u64), have_common_block: false, + last_send_block_number: 0, } } @@ -248,6 +256,7 @@ impl ChainSync { self.bodies.clear(); for (_, ref mut p) in &mut self.peers { p.asking_blocks.clear(); + p.asking_hash = None; } self.header_ids.clear(); self.syncing_difficulty = From::from(0u64); @@ -277,11 +286,16 @@ impl ChainSync { genesis: try!(r.val_at(4)), asking: PeerAsking::Nothing, asking_blocks: Vec::new(), + asking_hash: None, ask_time: 0f64, }; trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis); + if self.peers.contains_key(&peer_id) { + warn!("Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id)); + return Ok(()); + } let chain_info = io.chain().chain_info(); if peer.genesis != chain_info.genesis_hash { io.disable_peer(peer_id); @@ -294,10 +308,7 @@ impl ChainSync { return Ok(()); } - let old = self.peers.insert(peer_id.clone(), peer); - if old.is_some() { - panic!("ChainSync: new peer already exists"); - } + self.peers.insert(peer_id.clone(), peer); info!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); self.sync_peer(io, peer_id, false); Ok(()) @@ -437,6 +448,10 @@ impl ChainSync { trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); let header: BlockHeader = try!(header_rlp.as_val()); let mut unknown = false; + { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + peer.latest = header.hash(); + } // TODO: Decompose block and add to self.headers and self.bodies instead if header.number == From::from(self.current_base_block() + 1) { match io.chain().import_block(block_rlp.as_raw().to_vec()) { @@ -469,10 +484,6 @@ impl ChainSync { let peer_difficulty = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").difficulty; if difficulty > peer_difficulty { trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); - { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - peer.latest = header.hash(); - } self.sync_peer(io, peer_id, true); } } @@ -486,11 +497,14 @@ impl ChainSync { return Ok(()); } trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()); - let hashes = r.iter().map(|item| (item.val_at::(0), item.val_at::(1))); - let mut max_height: U256 = From::from(0); + let hashes = r.iter().map(|item| (item.val_at::(0), item.val_at::(1))); + let mut max_height: BlockNumber = 0; for (rh, rd) in hashes { let h = try!(rh); let d = try!(rd); + if self.downloading_hashes.contains(&h) { + continue; + } match io.chain().block_status(BlockId::Hash(h.clone())) { BlockStatus::InChain => { trace!(target: "sync", "New block hash already in chain {:?}", h); @@ -499,8 +513,8 @@ impl ChainSync { trace!(target: "sync", "New hash block already queued {:?}", h); }, BlockStatus::Unknown => { - trace!(target: "sync", "New unknown block hash {:?}", h); if d > max_height { + trace!(target: "sync", "New unknown block hash {:?}", h); let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); peer.latest = h.clone(); max_height = d; @@ -513,7 +527,7 @@ impl ChainSync { } } }; - if max_height != x!(0) { + if max_height != 0 { self.sync_peer(io, peer_id, true); } Ok(()) @@ -523,7 +537,7 @@ impl ChainSync { pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) { trace!(target: "sync", "== Disconnecting {}", peer); if self.peers.contains_key(&peer) { - info!(target: "sync", "Disconnected {}:{}", peer, io.peer_info(peer)); + info!(target: "sync", "Disconnected {}", peer); self.clear_peer_download(peer); self.peers.remove(&peer); self.continue_sync(io); @@ -581,6 +595,8 @@ impl ChainSync { self.state = SyncState::Blocks; } trace!(target: "sync", "Starting sync with better chain"); + self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking_hash = Some(peer_latest.clone()); + self.downloading_hashes.insert(peer_latest.clone()); self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); } else if self.state == SyncState::Blocks && io.chain().block_status(BlockId::Hash(peer_latest)) == BlockStatus::Unknown { @@ -673,6 +689,8 @@ impl ChainSync { } } else { + // continue search for common block + self.downloading_headers.insert(start as BlockNumber); self.request_headers_by_number(io, peer_id, start as BlockNumber, 1, 0, false); } } @@ -681,6 +699,9 @@ impl ChainSync { /// Clear all blocks/headers marked as being downloaded by a peer. fn clear_peer_download(&mut self, peer_id: PeerId) { let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + if let Some(hash) = peer.asking_hash.take() { + self.downloading_hashes.remove(&hash); + } for b in &peer.asking_blocks { self.downloading_headers.remove(&b); self.downloading_bodies.remove(&b); @@ -827,7 +848,7 @@ impl ChainSync { { let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); if peer.asking != PeerAsking::Nothing { - warn!(target:"sync", "Asking {:?} while requesting {:?}", asking, peer.asking); + warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); } } match sync.send(peer_id, packet_id, packet) { @@ -844,6 +865,14 @@ impl ChainSync { } } + /// Generic packet sender + fn send_packet(&mut self, sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { + if let Err(e) = sync.send(peer_id, packet_id, packet) { + warn!(target:"sync", "Error sending packet: {:?}", e); + sync.disable_peer(peer_id); + self.on_peer_aborting(sync, peer_id); + } + } /// Called when peer sends us new transactions fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { Ok(()) @@ -1089,11 +1118,10 @@ impl ChainSync { let latest_hash = chain_info.best_block_hash; let latest_number = chain_info.best_block_number; self.peers.iter().filter(|&(_, peer_info)| - match io.chain().block_status(BlockId::Hash(peer_info.latest.clone())) - { + match io.chain().block_status(BlockId::Hash(peer_info.latest.clone())) { BlockStatus::InChain => { let peer_number = HeaderView::new(&io.chain().block_header(BlockId::Hash(peer_info.latest.clone())).unwrap()).number(); - peer_info.latest != latest_hash && latest_number > peer_number && latest_number - peer_number < MAX_PEER_LAG_PROPAGATION + peer_info.latest != latest_hash && latest_number > peer_number }, _ => false }) @@ -1102,7 +1130,7 @@ impl ChainSync { } /// propagades latest block to lagging peers - fn propagade_blocks(&mut self, io: &mut SyncIo) -> usize { + fn propagade_blocks(&mut self, local_best: &H256, io: &mut SyncIo) -> usize { let updated_peers = { let lagging_peers = self.get_lagging_peers(io); @@ -1118,10 +1146,9 @@ impl ChainSync { }; let mut sent = 0; - let local_best = io.chain().chain_info().best_block_hash; for peer_id in updated_peers { let rlp = ChainSync::create_latest_block_rlp(io.chain()); - self.send_request(io, peer_id, PeerAsking::Nothing, NEW_BLOCK_PACKET, rlp); + self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").latest = local_best.clone(); sent = sent + 1; } @@ -1129,10 +1156,9 @@ impl ChainSync { } /// propagades new known hashes to all peers - fn propagade_new_hashes(&mut self, io: &mut SyncIo) -> usize { + fn propagade_new_hashes(&mut self, local_best: &H256, io: &mut SyncIo) -> usize { let updated_peers = self.get_lagging_peers(io); let mut sent = 0; - let local_best = io.chain().chain_info().best_block_hash; for peer_id in updated_peers { sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers.get(&peer_id).expect("ChainSync: unknown peer").latest, &local_best) { Some(rlp) => { @@ -1140,7 +1166,7 @@ impl ChainSync { let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); peer.latest = local_best.clone(); } - self.send_request(io, peer_id, PeerAsking::Nothing, NEW_BLOCK_HASHES_PACKET, rlp); + self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp); 1 }, None => 0 @@ -1152,15 +1178,19 @@ impl ChainSync { /// Maintain other peers. Send out any new blocks and transactions pub fn maintain_sync(&mut self, io: &mut SyncIo) { self.check_resume(io); - - let peers = self.propagade_new_hashes(io); - trace!(target: "sync", "Sent new hashes to peers: {:?}", peers); } /// should be called once chain has new block, triggers the latest block propagation pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) { - let peers = self.propagade_blocks(io); - trace!(target: "sync", "Sent latest block to peers: {:?}", peers); + let chain = io.chain().chain_info(); + if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { + let blocks = self.propagade_blocks(&chain.best_block_hash, io); + let hashes = self.propagade_new_hashes(&chain.best_block_hash, io); + if blocks != 0 || hashes != 0 { + trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); + } + } + self.last_send_block_number = chain.best_block_number; } } @@ -1291,6 +1321,7 @@ mod tests { difficulty: U256::zero(), asking: PeerAsking::Nothing, asking_blocks: Vec::::new(), + asking_hash: None, ask_time: 0f64, }); sync @@ -1332,9 +1363,10 @@ mod tests { client.add_blocks(100, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let best_hash = client.chain_info().best_block_hash.clone(); let mut io = TestIo::new(&mut client, &mut queue, None); - let peer_count = sync.propagade_new_hashes(&mut io); + let peer_count = sync.propagade_new_hashes(&best_hash, &mut io); // 1 message should be send assert_eq!(1, io.queue.len()); @@ -1350,9 +1382,10 @@ mod tests { client.add_blocks(100, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let best_hash = client.chain_info().best_block_hash.clone(); let mut io = TestIo::new(&mut client, &mut queue, None); - let peer_count = sync.propagade_blocks(&mut io); + let peer_count = sync.propagade_blocks(&best_hash, &mut io); // 1 message should be send assert_eq!(1, io.queue.len()); @@ -1454,9 +1487,10 @@ mod tests { client.add_blocks(100, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let best_hash = client.chain_info().best_block_hash.clone(); let mut io = TestIo::new(&mut client, &mut queue, None); - sync.propagade_new_hashes(&mut io); + sync.propagade_new_hashes(&best_hash, &mut io); let data = &io.queue[0].data.clone(); let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(&data)); @@ -1471,9 +1505,10 @@ mod tests { client.add_blocks(100, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let best_hash = client.chain_info().best_block_hash.clone(); let mut io = TestIo::new(&mut client, &mut queue, None); - sync.propagade_blocks(&mut io); + sync.propagade_blocks(&best_hash, &mut io); let data = &io.queue[0].data.clone(); let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(&data)); diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index f560f4ca6..a5244f26a 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -109,7 +109,7 @@ fn status_empty() { #[test] fn status_packet() { let mut net = TestNet::new(2); - net.peer_mut(0).chain.add_blocks(1000, false); + net.peer_mut(0).chain.add_blocks(100, false); net.peer_mut(1).chain.add_blocks(1, false); net.start(); @@ -122,18 +122,29 @@ fn status_packet() { #[test] fn propagade_hashes() { - let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); + let mut net = TestNet::new(6); + net.peer_mut(1).chain.add_blocks(10, false); net.sync(); net.peer_mut(0).chain.add_blocks(10, false); - net.sync_step_peer(0); + net.sync(); + net.trigger_block_verified(0); //first event just sets the marker + net.trigger_block_verified(0); - // 2 peers to sync - assert_eq!(2, net.peer(0).queue.len()); - // NEW_BLOCK_HASHES_PACKET - assert_eq!(0x01, net.peer(0).queue[0].packet_id); + // 5 peers to sync + assert_eq!(5, net.peer(0).queue.len()); + let mut hashes = 0; + let mut blocks = 0; + for i in 0..5 { + if net.peer(0).queue[i].packet_id == 0x1 { + hashes += 1; + } + if net.peer(0).queue[i].packet_id == 0x7 { + blocks += 1; + } + } + assert!(blocks > 0); + assert!(hashes > 0); } #[test] @@ -143,6 +154,7 @@ fn propagade_blocks() { net.sync(); net.peer_mut(0).chain.add_blocks(10, false); + net.trigger_block_verified(0); //first event just sets the marker net.trigger_block_verified(0); assert!(!net.peer(0).queue.is_empty()); diff --git a/util/src/network/host.rs b/util/src/network/host.rs index c1423dbb3..430850453 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -217,6 +217,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { warn!(target: "net", "Send error: {:?}", e); }); //TODO: don't copy vector data + try!(self.io.update_registration(peer)); }, _ => warn!(target: "net", "Send: Peer is not connected yet") } From 34b465a125621c8d5c1bdec0e122524894168ba2 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 12 Feb 2016 14:20:18 +0100 Subject: [PATCH 2/6] Check for peer registration --- sync/src/chain.rs | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 6717c0814..f82162b79 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -449,7 +449,7 @@ impl ChainSync { let header: BlockHeader = try!(header_rlp.as_val()); let mut unknown = false; { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); peer.latest = header.hash(); } // TODO: Decompose block and add to self.headers and self.bodies instead @@ -481,7 +481,7 @@ impl ChainSync { trace!(target: "sync", "New block unknown {:?}", h); //TODO: handle too many unknown blocks let difficulty: U256 = try!(r.val_at(1)); - let peer_difficulty = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").difficulty; + let peer_difficulty = self.peers.get_mut(&peer_id).unwrap().difficulty; if difficulty > peer_difficulty { trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); self.sync_peer(io, peer_id, true); @@ -492,7 +492,7 @@ impl ChainSync { /// Handles NewHashes packet. Initiates headers download for any unknown hashes. fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - if self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking != PeerAsking::Nothing { + if self.peers.get_mut(&peer_id).unwrap().asking != PeerAsking::Nothing { trace!(target: "sync", "Ignoring new hashes since we're already downloading."); return Ok(()); } @@ -515,7 +515,7 @@ impl ChainSync { BlockStatus::Unknown => { if d > max_height { trace!(target: "sync", "New unknown block hash {:?}", h); - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); peer.latest = h.clone(); max_height = d; } @@ -575,7 +575,7 @@ impl ChainSync { /// Find something to do for a peer. Called for a new peer or when a peer is done with it's task. fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { let (peer_latest, peer_difficulty) = { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); if peer.asking != PeerAsking::Nothing { return; } @@ -595,7 +595,7 @@ impl ChainSync { self.state = SyncState::Blocks; } trace!(target: "sync", "Starting sync with better chain"); - self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking_hash = Some(peer_latest.clone()); + self.peers.get_mut(&peer_id).unwrap().asking_hash = Some(peer_latest.clone()); self.downloading_hashes.insert(peer_latest.clone()); self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); } @@ -698,7 +698,7 @@ impl ChainSync { /// Clear all blocks/headers marked as being downloaded by a peer. fn clear_peer_download(&mut self, peer_id: PeerId) { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); if let Some(hash) = peer.asking_hash.take() { self.downloading_hashes.remove(&hash); } @@ -834,7 +834,7 @@ impl ChainSync { /// Reset peer status after request is complete. fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); if peer.asking != asking { warn!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); } @@ -846,7 +846,7 @@ impl ChainSync { /// Generic request sender fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); if peer.asking != PeerAsking::Nothing { warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); } @@ -1029,6 +1029,11 @@ impl ChainSync { /// Dispatch incoming requests and responses pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { let rlp = UntrustedRlp::new(data); + + if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) { + warn!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer)); + return; + } let result = match packet_id { STATUS_PACKET => self.on_peer_status(io, peer, &rlp), TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), @@ -1089,7 +1094,7 @@ impl ChainSync { let mut rlp_stream = RlpStream::new_list(route.blocks.len()); for block_hash in route.blocks { let mut hash_rlp = RlpStream::new_list(2); - let difficulty = chain.block_total_difficulty(BlockId::Hash(block_hash.clone())).expect("Mallformed block without a difficulty on the chain!"); + let difficulty = chain.block_total_difficulty(BlockId::Hash(block_hash.clone())).unwrap(); hash_rlp.append(&block_hash); hash_rlp.append(&difficulty); @@ -1106,7 +1111,7 @@ impl ChainSync { /// creates latest block rlp for the given client fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes { let mut rlp_stream = RlpStream::new_list(2); - rlp_stream.append_raw(&chain.block(BlockId::Hash(chain.chain_info().best_block_hash)).expect("Creating latest block when there is none"), 1); + rlp_stream.append_raw(&chain.block(BlockId::Hash(chain.chain_info().best_block_hash)).unwrap(), 1); rlp_stream.append(&chain.chain_info().total_difficulty); rlp_stream.out() } @@ -1149,7 +1154,7 @@ impl ChainSync { for peer_id in updated_peers { let rlp = ChainSync::create_latest_block_rlp(io.chain()); self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); - self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").latest = local_best.clone(); + self.peers.get_mut(&peer_id).unwrap().latest = local_best.clone(); sent = sent + 1; } sent @@ -1160,10 +1165,10 @@ impl ChainSync { let updated_peers = self.get_lagging_peers(io); let mut sent = 0; for peer_id in updated_peers { - sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers.get(&peer_id).expect("ChainSync: unknown peer").latest, &local_best) { + sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers.get(&peer_id).unwrap().latest, &local_best) { Some(rlp) => { { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); peer.latest = local_best.clone(); } self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp); @@ -1276,9 +1281,9 @@ mod tests { // the length of two rlp-encoded receipts assert_eq!(597, rlp_result.unwrap().1.out().len()); - let mut sync = ChainSync::new(); + let mut sync = dummy_sync_with_peer(H256::new()); io.sender = Some(2usize); - sync.on_packet(&mut io, 1usize, super::GET_RECEIPTS_PACKET, &receipts_request); + sync.on_packet(&mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request); assert_eq!(1, io.queue.len()); } @@ -1304,9 +1309,9 @@ mod tests { // the length of one rlp-encoded hashe assert_eq!(34, rlp_result.unwrap().1.out().len()); - let mut sync = ChainSync::new(); + let mut sync = dummy_sync_with_peer(H256::new()); io.sender = Some(2usize); - sync.on_packet(&mut io, 1usize, super::GET_NODE_DATA_PACKET, &node_request); + sync.on_packet(&mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request); assert_eq!(1, io.queue.len()); } From fcd0dafbe43c6fee532c18b66c1c5203c14b4718 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 12 Feb 2016 15:48:26 +0100 Subject: [PATCH 3/6] Fixed random failing test --- sync/src/tests/chain.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index a5244f26a..5ee5df831 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -143,8 +143,7 @@ fn propagade_hashes() { blocks += 1; } } - assert!(blocks > 0); - assert!(hashes > 0); + assert!(blocks + hashes == 5); } #[test] From debf1ed9342f0cc0c00abe70d985881ce46231a2 Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 14 Feb 2016 17:10:55 +0100 Subject: [PATCH 4/6] Propagate only one last hash for peers that are too far behind --- sync/src/chain.rs | 81 +++++++++++++++++++++++++++++------------------ 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index f82162b79..632d99749 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -155,7 +155,9 @@ struct PeerInfo { /// Peer network id network_id: U256, /// Peer best block hash - latest: H256, + latest_hash: H256, + /// Peer best block number if known + latest_number: Option, /// Peer total difficulty difficulty: U256, /// Type of data currenty being requested from peer. @@ -282,7 +284,8 @@ impl ChainSync { protocol_version: try!(r.val_at(0)), network_id: try!(r.val_at(1)), difficulty: try!(r.val_at(2)), - latest: try!(r.val_at(3)), + latest_hash: try!(r.val_at(3)), + latest_number: None, genesis: try!(r.val_at(4)), asking: PeerAsking::Nothing, asking_blocks: Vec::new(), @@ -290,7 +293,7 @@ impl ChainSync { ask_time: 0f64, }; - trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis); + trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis); if self.peers.contains_key(&peer_id) { warn!("Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id)); @@ -450,7 +453,8 @@ impl ChainSync { let mut unknown = false; { let peer = self.peers.get_mut(&peer_id).unwrap(); - peer.latest = header.hash(); + peer.latest_hash = header.hash(); + peer.latest_number = Some(header.number()); } // TODO: Decompose block and add to self.headers and self.bodies instead if header.number == From::from(self.current_base_block() + 1) { @@ -516,7 +520,8 @@ impl ChainSync { if d > max_height { trace!(target: "sync", "New unknown block hash {:?}", h); let peer = self.peers.get_mut(&peer_id).unwrap(); - peer.latest = h.clone(); + peer.latest_hash = h.clone(); + peer.latest_number = Some(d); max_height = d; } }, @@ -583,7 +588,7 @@ impl ChainSync { trace!(target: "sync", "Waiting for block queue"); return; } - (peer.latest.clone(), peer.difficulty.clone()) + (peer.latest_hash.clone(), peer.difficulty.clone()) }; let td = io.chain().chain_info().pending_total_difficulty; @@ -1117,25 +1122,28 @@ impl ChainSync { } /// returns peer ids that have less blocks than our chain - fn get_lagging_peers(&self, io: &SyncIo) -> Vec { + fn get_lagging_peers(&mut self, io: &SyncIo) -> Vec<(PeerId, BlockNumber)> { let chain = io.chain(); let chain_info = chain.chain_info(); let latest_hash = chain_info.best_block_hash; let latest_number = chain_info.best_block_number; - self.peers.iter().filter(|&(_, peer_info)| - match io.chain().block_status(BlockId::Hash(peer_info.latest.clone())) { + self.peers.iter_mut().filter_map(|(&id, ref mut peer_info)| + match io.chain().block_status(BlockId::Hash(peer_info.latest_hash.clone())) { BlockStatus::InChain => { - let peer_number = HeaderView::new(&io.chain().block_header(BlockId::Hash(peer_info.latest.clone())).unwrap()).number(); - peer_info.latest != latest_hash && latest_number > peer_number + if peer_info.latest_number.is_none() { + peer_info.latest_number = Some(HeaderView::new(&io.chain().block_header(BlockId::Hash(peer_info.latest_hash.clone())).unwrap()).number()); + } + if peer_info.latest_hash != latest_hash && latest_number > peer_info.latest_number.unwrap() { + Some((id, peer_info.latest_number.unwrap())) + } else { None } }, - _ => false + _ => None }) - .map(|(peer_id, _)| peer_id) - .cloned().collect::>() + .collect::>() } /// propagades latest block to lagging peers - fn propagade_blocks(&mut self, local_best: &H256, io: &mut SyncIo) -> usize { + fn propagade_blocks(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize { let updated_peers = { let lagging_peers = self.get_lagging_peers(io); @@ -1143,33 +1151,41 @@ impl ChainSync { let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; let lucky_peers = match lagging_peers.len() { 0 ... MIN_PEERS_PROPAGATION => lagging_peers, - _ => lagging_peers.iter().filter(|_| ::rand::random::() < fraction).cloned().collect::>() + _ => lagging_peers.into_iter().filter(|_| ::rand::random::() < fraction).collect::>() }; // taking at max of MAX_PEERS_PROPAGATION - lucky_peers.iter().take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).cloned().collect::>() + lucky_peers.iter().map(|&(id, _)| id.clone()).take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::>() }; let mut sent = 0; for peer_id in updated_peers { let rlp = ChainSync::create_latest_block_rlp(io.chain()); self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); - self.peers.get_mut(&peer_id).unwrap().latest = local_best.clone(); + self.peers.get_mut(&peer_id).unwrap().latest_hash = local_best.clone(); + self.peers.get_mut(&peer_id).unwrap().latest_number = Some(best_number); sent = sent + 1; } sent } /// propagades new known hashes to all peers - fn propagade_new_hashes(&mut self, local_best: &H256, io: &mut SyncIo) -> usize { + fn propagade_new_hashes(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize { let updated_peers = self.get_lagging_peers(io); let mut sent = 0; - for peer_id in updated_peers { - sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers.get(&peer_id).unwrap().latest, &local_best) { + let last_parent = HeaderView::new(&io.chain().block_header(BlockId::Hash(local_best.clone())).unwrap()).parent_hash(); + for (peer_id, peer_number) in updated_peers { + let mut peer_best = self.peers.get(&peer_id).unwrap().latest_hash.clone(); + if best_number - peer_number > MAX_PEERS_PROPAGATION as BlockNumber { + // If we think peer is too far behind just end one latest hash + peer_best = last_parent.clone(); + } + sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &peer_best, &local_best) { Some(rlp) => { { let peer = self.peers.get_mut(&peer_id).unwrap(); - peer.latest = local_best.clone(); + peer.latest_hash = local_best.clone(); + peer.latest_number = Some(best_number); } self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp); 1 @@ -1189,8 +1205,8 @@ impl ChainSync { pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) { let chain = io.chain().chain_info(); if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { - let blocks = self.propagade_blocks(&chain.best_block_hash, io); - let hashes = self.propagade_new_hashes(&chain.best_block_hash, io); + let blocks = self.propagade_blocks(&chain.best_block_hash, chain.best_block_number, io); + let hashes = self.propagade_new_hashes(&chain.best_block_hash, chain.best_block_number, io); if blocks != 0 || hashes != 0 { trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); } @@ -1322,7 +1338,8 @@ mod tests { protocol_version: 0, genesis: H256::zero(), network_id: U256::zero(), - latest: peer_latest_hash, + latest_hash: peer_latest_hash, + latest_number: None, difficulty: U256::zero(), asking: PeerAsking::Nothing, asking_blocks: Vec::::new(), @@ -1337,7 +1354,7 @@ mod tests { let mut client = TestBlockChainClient::new(); client.add_blocks(100, false); let mut queue = VecDeque::new(); - let sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); let io = TestIo::new(&mut client, &mut queue, None); let lagging_peers = sync.get_lagging_peers(&io); @@ -1369,9 +1386,10 @@ mod tests { let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let best_hash = client.chain_info().best_block_hash.clone(); + let best_number = client.chain_info().best_block_number; let mut io = TestIo::new(&mut client, &mut queue, None); - let peer_count = sync.propagade_new_hashes(&best_hash, &mut io); + let peer_count = sync.propagade_new_hashes(&best_hash, best_number, &mut io); // 1 message should be send assert_eq!(1, io.queue.len()); @@ -1388,9 +1406,10 @@ mod tests { let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let best_hash = client.chain_info().best_block_hash.clone(); + let best_number = client.chain_info().best_block_number; let mut io = TestIo::new(&mut client, &mut queue, None); - let peer_count = sync.propagade_blocks(&best_hash, &mut io); + let peer_count = sync.propagade_blocks(&best_hash, best_number, &mut io); // 1 message should be send assert_eq!(1, io.queue.len()); @@ -1493,9 +1512,10 @@ mod tests { let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let best_hash = client.chain_info().best_block_hash.clone(); + let best_number = client.chain_info().best_block_number; let mut io = TestIo::new(&mut client, &mut queue, None); - sync.propagade_new_hashes(&best_hash, &mut io); + sync.propagade_new_hashes(&best_hash, best_number, &mut io); let data = &io.queue[0].data.clone(); let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(&data)); @@ -1511,9 +1531,10 @@ mod tests { let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let best_hash = client.chain_info().best_block_hash.clone(); + let best_number = client.chain_info().best_block_number; let mut io = TestIo::new(&mut client, &mut queue, None); - sync.propagade_blocks(&best_hash, &mut io); + sync.propagade_blocks(&best_hash, best_number, &mut io); let data = &io.queue[0].data.clone(); let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(&data)); From 8b0ec51c0fb546bd87fa536e4237c6e8bfa06dc1 Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 14 Feb 2016 18:08:30 +0100 Subject: [PATCH 5/6] Update last imported number on new block --- sync/src/chain.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 632d99749..e78f488eb 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -466,6 +466,7 @@ impl ChainSync { trace!(target: "sync", "New block already queued {:?}", h); }, Ok(_) => { + self.last_imported_block = Some(header.number); trace!(target: "sync", "New block queued {:?}", h); }, Err(ImportError::UnknownParent) => { From f141e696711e7409955cf5cde216bd3b0d37212f Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 15 Feb 2016 13:34:58 +0100 Subject: [PATCH 6/6] Added test for restart on malformed block --- sync/src/tests/chain.rs | 11 +++++++++++ sync/src/tests/helpers.rs | 11 +++++++++++ 2 files changed, 22 insertions(+) diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 5ee5df831..1dd9a1e78 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -160,3 +160,14 @@ fn propagade_blocks() { // NEW_BLOCK_PACKET assert_eq!(0x07, net.peer(0).queue[0].packet_id); } + +#[test] +fn restart_on_malformed_block() { + let mut net = TestNet::new(2); + net.peer_mut(1).chain.add_blocks(10, false); + net.peer_mut(1).chain.corrupt_block(6); + net.sync_steps(10); + + assert_eq!(net.peer(0).chain.chain_info().best_block_number, 4); +} + diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index d8cd5e54a..c561b65a3 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -71,6 +71,17 @@ impl TestBlockChainClient { } } + pub fn corrupt_block(&mut self, n: BlockNumber) { + let hash = self.block_hash(BlockId::Number(n)).unwrap(); + let mut header: BlockHeader = decode(&self.block_header(BlockId::Number(n)).unwrap()); + header.parent_hash = H256::new(); + let mut rlp = RlpStream::new_list(3); + rlp.append(&header); + rlp.append_raw(&rlp::NULL_RLP, 1); + rlp.append_raw(&rlp::NULL_RLP, 1); + self.blocks.write().unwrap().insert(hash, rlp.out()); + } + pub fn block_hash_delta_minus(&mut self, delta: usize) -> H256 { let blocks_read = self.numbers.read().unwrap(); let index = blocks_read.len() - delta;