diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 277be7fea..90e18d8e8 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -319,6 +319,7 @@ impl ChainSync { /// Remove peer from active peer set fn deactivate_peer(&mut self, io: &mut SyncIo, peer_id: PeerId) { + trace!(target: "sync", "Deactivating peer {}", peer_id); self.active_peers.remove(&peer_id); if self.active_peers.is_empty() { trace!(target: "sync", "No more active peers"); @@ -383,6 +384,7 @@ impl ChainSync { /// Called by peer once it has new block headers during sync fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { self.clear_peer_download(peer_id); + let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash); let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders }; if !self.reset_peer_asking(peer_id, expected_asking) { trace!(target: "sync", "Ignored unexpected headers"); @@ -408,9 +410,16 @@ impl ChainSync { let mut headers = Vec::new(); let mut hashes = Vec::new(); + let mut valid_response = item_count == 0; //empty response is valid for i in 0..item_count { let info: BlockHeader = try!(r.val_at(i)); let number = BlockNumber::from(info.number); + // Check if any of the headers matches the hash we requested + if !valid_response { + if let Some(expected) = expected_hash { + valid_response = expected == info.hash() + } + } if self.blocks.contains(&info.hash()) { trace!(target: "sync", "Skipping existing block header {} ({:?})", number, info.hash()); continue; @@ -441,6 +450,11 @@ impl ChainSync { } } + // Disable the peer for this syncing round if it gives invalid chain + if !valid_response { + trace!(target: "sync", "{} Disabled for invalid headers response", peer_id); + self.deactivate_peer(io, peer_id); + } match self.state { SyncState::ChainHead => { if headers.is_empty() { @@ -831,6 +845,9 @@ impl ChainSync { rlp.append(&skip); rlp.append(&if reverse {1u32} else {0u32}); self.send_request(sync, peer_id, asking, GET_BLOCK_HEADERS_PACKET, rlp.out()); + self.peers.get_mut(&peer_id) + .expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed") + .asking_hash = Some(h.clone()); } /// Request block bodies from a peer @@ -911,7 +928,7 @@ impl ChainSync { } /// Respond to GetBlockHeaders request - fn return_block_headers(io: &SyncIo, r: &UntrustedRlp) -> RlpResponseResult { + fn return_block_headers(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult { // Packet layout: // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ] let max_headers: usize = try!(r.val_at(1)); @@ -921,13 +938,25 @@ impl ChainSync { let number = if try!(r.at(0)).size() == 32 { // id is a hash let hash: H256 = try!(r.val_at(0)); - trace!(target: "sync", "-> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse); + trace!(target: "sync", "{} -> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", peer_id, hash, max_headers, skip, reverse); match io.chain().block_header(BlockID::Hash(hash)) { - Some(hdr) => From::from(HeaderView::new(&hdr).number()), + Some(hdr) => { + let number = From::from(HeaderView::new(&hdr).number()); + debug_assert_eq!(HeaderView::new(&hdr).sha3(), hash); + if max_headers == 1 || io.chain().block_hash(BlockID::Number(number)) != Some(hash) { + // Non canonical header or single header requested + // TODO: handle single-step reverse hashchains of non-canon hashes + trace!(target:"sync", "Returning single header: {:?}", hash); + let mut rlp = RlpStream::new_list(1); + rlp.append_raw(&hdr, 1); + return Ok(Some((BLOCK_HEADERS_PACKET, rlp))); + } + number + } None => return Ok(Some((BLOCK_HEADERS_PACKET, RlpStream::new_list(0)))) //no such header, return nothing } } else { - trace!(target: "sync", "-> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", try!(r.val_at::(0)), max_headers, skip, reverse); + trace!(target: "sync", "{} -> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", peer_id, try!(r.val_at::(0)), max_headers, skip, reverse); try!(r.val_at(0)) }; @@ -962,13 +991,13 @@ impl ChainSync { } /// Respond to GetBlockBodies request - fn return_block_bodies(io: &SyncIo, r: &UntrustedRlp) -> RlpResponseResult { + fn return_block_bodies(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult { let mut count = r.item_count(); if count == 0 { debug!(target: "sync", "Empty GetBlockBodies request, ignoring."); return Ok(None); } - trace!(target: "sync", "-> GetBlockBodies: {} entries", count); + trace!(target: "sync", "{} -> GetBlockBodies: {} entries", peer_id, count); count = min(count, MAX_BODIES_TO_SEND); let mut added = 0usize; let mut data = Bytes::new(); @@ -985,8 +1014,9 @@ impl ChainSync { } /// Respond to GetNodeData request - fn return_node_data(io: &SyncIo, r: &UntrustedRlp) -> RlpResponseResult { + fn return_node_data(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult { let mut count = r.item_count(); + trace!(target: "sync", "{} -> GetNodeData: {} entries", peer_id, count); if count == 0 { debug!(target: "sync", "Empty GetNodeData request, ignoring."); return Ok(None); @@ -1000,13 +1030,15 @@ impl ChainSync { added += 1; } } + trace!(target: "sync", "{} -> GetNodeData: return {} entries", peer_id, added); let mut rlp = RlpStream::new_list(added); rlp.append_raw(&data, added); Ok(Some((NODE_DATA_PACKET, rlp))) } - fn return_receipts(io: &SyncIo, rlp: &UntrustedRlp) -> RlpResponseResult { + fn return_receipts(io: &SyncIo, rlp: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult { let mut count = rlp.item_count(); + trace!(target: "sync", "{} -> GetReceipts: {} entries", peer_id, count); if count == 0 { debug!(target: "sync", "Empty GetReceipts request, ignoring."); return Ok(None); @@ -1028,11 +1060,11 @@ impl ChainSync { Ok(Some((RECEIPTS_PACKET, rlp_result))) } - fn return_rlp(&self, io: &mut SyncIo, rlp: &UntrustedRlp, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError> - where FRlp : Fn(&SyncIo, &UntrustedRlp) -> RlpResponseResult, + fn return_rlp(&self, io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError> + where FRlp : Fn(&SyncIo, &UntrustedRlp, PeerId) -> RlpResponseResult, FError : FnOnce(UtilError) -> String { - let response = rlp_func(io, rlp); + let response = rlp_func(io, rlp, peer); match response { Err(e) => Err(e), Ok(Some((packet_id, rlp_stream))) => { @@ -1060,19 +1092,19 @@ impl ChainSync { NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp), NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp), - GET_BLOCK_BODIES_PACKET => self.return_rlp(io, &rlp, + GET_BLOCK_BODIES_PACKET => self.return_rlp(io, &rlp, peer, ChainSync::return_block_bodies, |e| format!("Error sending block bodies: {:?}", e)), - GET_BLOCK_HEADERS_PACKET => self.return_rlp(io, &rlp, + GET_BLOCK_HEADERS_PACKET => self.return_rlp(io, &rlp, peer, ChainSync::return_block_headers, |e| format!("Error sending block headers: {:?}", e)), - GET_RECEIPTS_PACKET => self.return_rlp(io, &rlp, + GET_RECEIPTS_PACKET => self.return_rlp(io, &rlp, peer, ChainSync::return_receipts, |e| format!("Error sending receipts: {:?}", e)), - GET_NODE_DATA_PACKET => self.return_rlp(io, &rlp, + GET_NODE_DATA_PACKET => self.return_rlp(io, &rlp, peer, ChainSync::return_node_data, |e| format!("Error sending nodes: {:?}", e)), @@ -1090,7 +1122,7 @@ impl ChainSync { let tick = time::precise_time_s(); for (peer_id, peer) in &self.peers { if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC { - trace!(target:"sync", "Timeouted {}", peer_id); + trace!(target:"sync", "Timeout {}", peer_id); io.disconnect_peer(*peer_id); } } @@ -1116,10 +1148,11 @@ impl ChainSync { let mut rlp_stream = RlpStream::new_list(blocks.len()); for block_hash in blocks { let mut hash_rlp = RlpStream::new_list(2); - let difficulty = chain.block_total_difficulty(BlockID::Hash(block_hash.clone())).expect("Malformed block without a difficulty on the chain!"); + let number = HeaderView::new(&chain.block_header(BlockID::Hash(block_hash.clone())) + .expect("chain.tree_route and chain.find_uncles only return hahses of blocks that are in the blockchain. qed.")).number(); hash_rlp.append(&block_hash); - hash_rlp.append(&difficulty); - rlp_stream.append_raw(&hash_rlp.out(), 1); + hash_rlp.append(&number); + rlp_stream.append_raw(&hash_rlp.as_raw(), 1); } Some(rlp_stream.out()) } @@ -1171,6 +1204,7 @@ impl ChainSync { /// propagates latest block to lagging peers fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize { let lucky_peers = self.select_lagging_peers(chain_info, io); + trace!("Sending NewBlocks to {:?}", lucky_peers); let mut sent = 0; for (peer_id, _) in lucky_peers { let rlp = ChainSync::create_latest_block_rlp(io.chain()); @@ -1185,6 +1219,7 @@ impl ChainSync { /// propagates new known hashes to all peers fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize { let lucky_peers = self.select_lagging_peers(chain_info, io); + trace!("Sending NewHashes to {:?}", lucky_peers); let mut sent = 0; let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash(); for (peer_id, peer_number) in lucky_peers { @@ -1338,7 +1373,7 @@ mod tests { let mut queue = VecDeque::new(); let io = TestIo::new(&mut client, &mut queue, None); - let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&[0xc0])); + let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&[0xc0]), 0); assert!(result.is_ok()); } @@ -1358,7 +1393,7 @@ mod tests { let receipts_request = receipt_list.out(); // it returns rlp ONLY for hashes started with "f" - let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&receipts_request.clone())); + let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&receipts_request.clone()), 0); assert!(result.is_ok()); let rlp_result = result.unwrap(); @@ -1406,33 +1441,33 @@ mod tests { let io = TestIo::new(&mut client, &mut queue, None); let unknown: H256 = H256::new(); - let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, false))); + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, false)), 0); assert!(to_header_vec(result).is_empty()); - let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, true))); + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, true)), 0); assert!(to_header_vec(result).is_empty()); - let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, true))); + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, true)), 0); assert_eq!(to_header_vec(result), vec![headers[2].clone()]); - let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, false))); + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, false)), 0); assert_eq!(to_header_vec(result), vec![headers[2].clone()]); - let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, false))); + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, false)), 0); assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[56].clone(), headers[62].clone()]); - let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, true))); + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, true)), 0); assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]); - let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, true))); + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, true)), 0); assert_eq!(to_header_vec(result), vec![headers[2].clone()]); - let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, false))); + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, false)), 0); assert_eq!(to_header_vec(result), vec![headers[2].clone()]); - let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, false))); + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, false)), 0); assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[56].clone(), headers[62].clone()]); - let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, true))); + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, true)), 0); assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]); } @@ -1450,7 +1485,7 @@ mod tests { let node_request = node_list.out(); // it returns rlp ONLY for hashes started with "f" - let result = ChainSync::return_node_data(&io, &UntrustedRlp::new(&node_request.clone())); + let result = ChainSync::return_node_data(&io, &UntrustedRlp::new(&node_request.clone()), 0); assert!(result.is_ok()); let rlp_result = result.unwrap(); diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index ade06b469..f4c4c2a8d 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -436,7 +436,7 @@ impl EncryptedConnection { /// Readable IO handler. Tracker receive status and returns decoded packet if avaialable. pub fn readable(&mut self, io: &IoContext) -> Result, UtilError> where Message: Send + Clone{ - io.clear_timer(self.connection.token).unwrap(); + try!(io.clear_timer(self.connection.token)); if let EncryptedConnectionState::Header = self.read_state { if let Some(data) = try!(self.connection.readable()) { try!(self.read_header(&data)); diff --git a/util/src/network/host.rs b/util/src/network/host.rs index e5853b8db..abace1983 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -683,6 +683,7 @@ impl Host where Message: Send + Sync + Clone { Some(_) => packet_data.push((protocol, packet_id, data)), } }, + Ok(SessionData::Continue) => (), Ok(SessionData::None) => break, } } diff --git a/util/src/network/session.rs b/util/src/network/session.rs index d5fd33813..c19dfbcf8 100644 --- a/util/src/network/session.rs +++ b/util/src/network/session.rs @@ -68,6 +68,8 @@ pub enum SessionData { /// Zero based packet ID packet_id: u8, }, + /// Session has more data to be read + Continue, } /// Shared session information @@ -329,16 +331,19 @@ impl Session { PACKET_DISCONNECT => { let rlp = UntrustedRlp::new(&packet.data[1..]); let reason: u8 = try!(rlp.val_at(0)); + if self.had_hello { + debug!("Disconnected: {}: {:?}", self.token(), DisconnectReason::from_u8(reason)); + } Err(From::from(NetworkError::Disconnect(DisconnectReason::from_u8(reason)))) } PACKET_PING => { try!(self.send_pong(io)); - Ok(SessionData::None) + Ok(SessionData::Continue) }, PACKET_PONG => { self.pong_time_ns = Some(time::precise_time_ns()); self.info.ping_ms = Some((self.pong_time_ns.unwrap() - self.ping_time_ns) / 1000_000); - Ok(SessionData::None) + Ok(SessionData::Continue) }, PACKET_GET_PEERS => Ok(SessionData::None), //TODO; PACKET_PEERS => Ok(SessionData::None), @@ -348,7 +353,7 @@ impl Session { i += 1; if i == self.info.capabilities.len() { debug!(target: "network", "Unknown packet: {:?}", packet_id); - return Ok(SessionData::None) + return Ok(SessionData::Continue) } } @@ -359,7 +364,7 @@ impl Session { }, _ => { debug!(target: "network", "Unknown packet: {:?}", packet_id); - Ok(SessionData::None) + Ok(SessionData::Continue) } } }