Fixed NewHashes response

This commit is contained in:
arkpar 2016-06-13 18:49:40 +02:00
parent 8c7bcdafdb
commit 1dac2e3b23

View File

@ -915,7 +915,7 @@ impl ChainSync {
} }
/// Respond to GetBlockHeaders request /// Respond to GetBlockHeaders request
fn return_block_headers(io: &SyncIo, r: &UntrustedRlp) -> RlpResponseResult { fn return_block_headers(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
// Packet layout: // Packet layout:
// [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ] // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ]
let max_headers: usize = try!(r.val_at(1)); let max_headers: usize = try!(r.val_at(1));
@ -925,12 +925,14 @@ impl ChainSync {
let number = if try!(r.at(0)).size() == 32 { let number = if try!(r.at(0)).size() == 32 {
// id is a hash // id is a hash
let hash: H256 = try!(r.val_at(0)); let hash: H256 = try!(r.val_at(0));
trace!(target: "sync", "-> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse); trace!(target: "sync", "{} -> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", peer_id, hash, max_headers, skip, reverse);
match io.chain().block_header(BlockID::Hash(hash)) { match io.chain().block_header(BlockID::Hash(hash)) {
Some(hdr) => { Some(hdr) => {
let number = From::from(HeaderView::new(&hdr).number()); let number = From::from(HeaderView::new(&hdr).number());
if io.chain().block_hash(BlockID::Number(number)) != Some(hash) { assert_eq!(HeaderView::new(&hdr).sha3(), hash);
// Non canonical header requested, return just a single header if max_headers == 1 || io.chain().block_hash(BlockID::Number(number)) != Some(hash) {
// Non canonical header or single header requested
trace!("Returning single header: {:?}", hash);
let mut rlp = RlpStream::new_list(1); let mut rlp = RlpStream::new_list(1);
rlp.append_raw(&hdr, 1); rlp.append_raw(&hdr, 1);
return Ok(Some((BLOCK_HEADERS_PACKET, rlp))); return Ok(Some((BLOCK_HEADERS_PACKET, rlp)));
@ -940,7 +942,7 @@ impl ChainSync {
None => return Ok(Some((BLOCK_HEADERS_PACKET, RlpStream::new_list(0)))) //no such header, return nothing None => return Ok(Some((BLOCK_HEADERS_PACKET, RlpStream::new_list(0)))) //no such header, return nothing
} }
} else { } else {
trace!(target: "sync", "-> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", try!(r.val_at::<BlockNumber>(0)), max_headers, skip, reverse); trace!(target: "sync", "{} -> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", peer_id, try!(r.val_at::<BlockNumber>(0)), max_headers, skip, reverse);
try!(r.val_at(0)) try!(r.val_at(0))
}; };
@ -975,13 +977,13 @@ impl ChainSync {
} }
/// Respond to GetBlockBodies request /// Respond to GetBlockBodies request
fn return_block_bodies(io: &SyncIo, r: &UntrustedRlp) -> RlpResponseResult { fn return_block_bodies(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
let mut count = r.item_count(); let mut count = r.item_count();
if count == 0 { if count == 0 {
debug!(target: "sync", "Empty GetBlockBodies request, ignoring."); debug!(target: "sync", "Empty GetBlockBodies request, ignoring.");
return Ok(None); return Ok(None);
} }
trace!(target: "sync", "-> GetBlockBodies: {} entries", count); trace!(target: "sync", "{} -> GetBlockBodies: {} entries", peer_id, count);
count = min(count, MAX_BODIES_TO_SEND); count = min(count, MAX_BODIES_TO_SEND);
let mut added = 0usize; let mut added = 0usize;
let mut data = Bytes::new(); let mut data = Bytes::new();
@ -998,8 +1000,9 @@ impl ChainSync {
} }
/// Respond to GetNodeData request /// Respond to GetNodeData request
fn return_node_data(io: &SyncIo, r: &UntrustedRlp) -> RlpResponseResult { fn return_node_data(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
let mut count = r.item_count(); let mut count = r.item_count();
trace!(target: "sync", "{} -> GetNodeData: {} entries", peer_id, count);
if count == 0 { if count == 0 {
debug!(target: "sync", "Empty GetNodeData request, ignoring."); debug!(target: "sync", "Empty GetNodeData request, ignoring.");
return Ok(None); return Ok(None);
@ -1013,13 +1016,15 @@ impl ChainSync {
added += 1; added += 1;
} }
} }
trace!(target: "sync", "{} -> GetNodeData: return {} entries", peer_id, added);
let mut rlp = RlpStream::new_list(added); let mut rlp = RlpStream::new_list(added);
rlp.append_raw(&data, added); rlp.append_raw(&data, added);
Ok(Some((NODE_DATA_PACKET, rlp))) Ok(Some((NODE_DATA_PACKET, rlp)))
} }
fn return_receipts(io: &SyncIo, rlp: &UntrustedRlp) -> RlpResponseResult { fn return_receipts(io: &SyncIo, rlp: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
let mut count = rlp.item_count(); let mut count = rlp.item_count();
trace!(target: "sync", "{} -> GetReceipts: {} entries", peer_id, count);
if count == 0 { if count == 0 {
debug!(target: "sync", "Empty GetReceipts request, ignoring."); debug!(target: "sync", "Empty GetReceipts request, ignoring.");
return Ok(None); return Ok(None);
@ -1041,11 +1046,11 @@ impl ChainSync {
Ok(Some((RECEIPTS_PACKET, rlp_result))) Ok(Some((RECEIPTS_PACKET, rlp_result)))
} }
fn return_rlp<FRlp, FError>(&self, io: &mut SyncIo, rlp: &UntrustedRlp, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError> fn return_rlp<FRlp, FError>(&self, io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
where FRlp : Fn(&SyncIo, &UntrustedRlp) -> RlpResponseResult, where FRlp : Fn(&SyncIo, &UntrustedRlp, PeerId) -> RlpResponseResult,
FError : FnOnce(UtilError) -> String FError : FnOnce(UtilError) -> String
{ {
let response = rlp_func(io, rlp); let response = rlp_func(io, rlp, peer);
match response { match response {
Err(e) => Err(e), Err(e) => Err(e),
Ok(Some((packet_id, rlp_stream))) => { Ok(Some((packet_id, rlp_stream))) => {
@ -1073,19 +1078,19 @@ impl ChainSync {
NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp), NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp),
NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp), NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp),
GET_BLOCK_BODIES_PACKET => self.return_rlp(io, &rlp, GET_BLOCK_BODIES_PACKET => self.return_rlp(io, &rlp, peer,
ChainSync::return_block_bodies, ChainSync::return_block_bodies,
|e| format!("Error sending block bodies: {:?}", e)), |e| format!("Error sending block bodies: {:?}", e)),
GET_BLOCK_HEADERS_PACKET => self.return_rlp(io, &rlp, GET_BLOCK_HEADERS_PACKET => self.return_rlp(io, &rlp, peer,
ChainSync::return_block_headers, ChainSync::return_block_headers,
|e| format!("Error sending block headers: {:?}", e)), |e| format!("Error sending block headers: {:?}", e)),
GET_RECEIPTS_PACKET => self.return_rlp(io, &rlp, GET_RECEIPTS_PACKET => self.return_rlp(io, &rlp, peer,
ChainSync::return_receipts, ChainSync::return_receipts,
|e| format!("Error sending receipts: {:?}", e)), |e| format!("Error sending receipts: {:?}", e)),
GET_NODE_DATA_PACKET => self.return_rlp(io, &rlp, GET_NODE_DATA_PACKET => self.return_rlp(io, &rlp, peer,
ChainSync::return_node_data, ChainSync::return_node_data,
|e| format!("Error sending nodes: {:?}", e)), |e| format!("Error sending nodes: {:?}", e)),
@ -1103,7 +1108,7 @@ impl ChainSync {
let tick = time::precise_time_s(); let tick = time::precise_time_s();
for (peer_id, peer) in &self.peers { for (peer_id, peer) in &self.peers {
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC { if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
trace!(target:"sync", "Timeouted {}", peer_id); trace!(target:"sync", "Timeout {}", peer_id);
io.disconnect_peer(*peer_id); io.disconnect_peer(*peer_id);
} }
} }
@ -1129,10 +1134,10 @@ impl ChainSync {
let mut rlp_stream = RlpStream::new_list(blocks.len()); let mut rlp_stream = RlpStream::new_list(blocks.len());
for block_hash in blocks { for block_hash in blocks {
let mut hash_rlp = RlpStream::new_list(2); let mut hash_rlp = RlpStream::new_list(2);
let difficulty = chain.block_total_difficulty(BlockID::Hash(block_hash.clone())).expect("Malformed block without a difficulty on the chain!"); let number = HeaderView::new(&chain.block_header(BlockID::Hash(block_hash.clone())).expect("Malformed block without a header on the chain!")).number();
hash_rlp.append(&block_hash); hash_rlp.append(&block_hash);
hash_rlp.append(&difficulty); hash_rlp.append(&number);
rlp_stream.append_raw(&hash_rlp.out(), 1); rlp_stream.append_raw(&hash_rlp.as_raw(), 1);
} }
Some(rlp_stream.out()) Some(rlp_stream.out())
} }
@ -1184,6 +1189,7 @@ impl ChainSync {
/// propagates latest block to lagging peers /// propagates latest block to lagging peers
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize { fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
let lucky_peers = self.select_lagging_peers(chain_info, io); let lucky_peers = self.select_lagging_peers(chain_info, io);
trace!("Sending NewBlocks to {:?}", lucky_peers);
let mut sent = 0; let mut sent = 0;
for (peer_id, _) in lucky_peers { for (peer_id, _) in lucky_peers {
let rlp = ChainSync::create_latest_block_rlp(io.chain()); let rlp = ChainSync::create_latest_block_rlp(io.chain());
@ -1198,6 +1204,7 @@ impl ChainSync {
/// propagates new known hashes to all peers /// propagates new known hashes to all peers
fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize { fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
let lucky_peers = self.select_lagging_peers(chain_info, io); let lucky_peers = self.select_lagging_peers(chain_info, io);
trace!("Sending NewHashes to {:?}", lucky_peers);
let mut sent = 0; let mut sent = 0;
let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash(); let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash();
for (peer_id, peer_number) in lucky_peers { for (peer_id, peer_number) in lucky_peers {
@ -1351,7 +1358,7 @@ mod tests {
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None); let io = TestIo::new(&mut client, &mut queue, None);
let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&[0xc0])); let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&[0xc0]), 0);
assert!(result.is_ok()); assert!(result.is_ok());
} }
@ -1371,7 +1378,7 @@ mod tests {
let receipts_request = receipt_list.out(); let receipts_request = receipt_list.out();
// it returns rlp ONLY for hashes started with "f" // it returns rlp ONLY for hashes started with "f"
let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&receipts_request.clone())); let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&receipts_request.clone()), 0);
assert!(result.is_ok()); assert!(result.is_ok());
let rlp_result = result.unwrap(); let rlp_result = result.unwrap();
@ -1419,33 +1426,33 @@ mod tests {
let io = TestIo::new(&mut client, &mut queue, None); let io = TestIo::new(&mut client, &mut queue, None);
let unknown: H256 = H256::new(); let unknown: H256 = H256::new();
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, false))); let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, false)), 0);
assert!(to_header_vec(result).is_empty()); assert!(to_header_vec(result).is_empty());
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, true))); let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, true)), 0);
assert!(to_header_vec(result).is_empty()); assert!(to_header_vec(result).is_empty());
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, true))); let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, true)), 0);
assert_eq!(to_header_vec(result), vec![headers[2].clone()]); assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, false))); let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, false)), 0);
assert_eq!(to_header_vec(result), vec![headers[2].clone()]); assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, false))); let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, false)), 0);
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[56].clone(), headers[62].clone()]); assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[56].clone(), headers[62].clone()]);
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, true))); let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, true)), 0);
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]); assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]);
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, true))); let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, true)), 0);
assert_eq!(to_header_vec(result), vec![headers[2].clone()]); assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, false))); let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, false)), 0);
assert_eq!(to_header_vec(result), vec![headers[2].clone()]); assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, false))); let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, false)), 0);
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[56].clone(), headers[62].clone()]); assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[56].clone(), headers[62].clone()]);
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, true))); let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, true)), 0);
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]); assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]);
} }
@ -1463,7 +1470,7 @@ mod tests {
let node_request = node_list.out(); let node_request = node_list.out();
// it returns rlp ONLY for hashes started with "f" // it returns rlp ONLY for hashes started with "f"
let result = ChainSync::return_node_data(&io, &UntrustedRlp::new(&node_request.clone())); let result = ChainSync::return_node_data(&io, &UntrustedRlp::new(&node_request.clone()), 0);
assert!(result.is_ok()); assert!(result.is_ok());
let rlp_result = result.unwrap(); let rlp_result = result.unwrap();