commit
a03da30510
@ -319,6 +319,7 @@ impl ChainSync {
|
||||
|
||||
/// Remove peer from active peer set
|
||||
fn deactivate_peer(&mut self, io: &mut SyncIo, peer_id: PeerId) {
|
||||
trace!(target: "sync", "Deactivating peer {}", peer_id);
|
||||
self.active_peers.remove(&peer_id);
|
||||
if self.active_peers.is_empty() {
|
||||
trace!(target: "sync", "No more active peers");
|
||||
@ -383,6 +384,7 @@ impl ChainSync {
|
||||
/// Called by peer once it has new block headers during sync
|
||||
fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||
self.clear_peer_download(peer_id);
|
||||
let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
|
||||
let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders };
|
||||
if !self.reset_peer_asking(peer_id, expected_asking) {
|
||||
trace!(target: "sync", "Ignored unexpected headers");
|
||||
@ -408,9 +410,16 @@ impl ChainSync {
|
||||
|
||||
let mut headers = Vec::new();
|
||||
let mut hashes = Vec::new();
|
||||
let mut valid_response = item_count == 0; //empty response is valid
|
||||
for i in 0..item_count {
|
||||
let info: BlockHeader = try!(r.val_at(i));
|
||||
let number = BlockNumber::from(info.number);
|
||||
// Check if any of the headers matches the hash we requested
|
||||
if !valid_response {
|
||||
if let Some(expected) = expected_hash {
|
||||
valid_response = expected == info.hash()
|
||||
}
|
||||
}
|
||||
if self.blocks.contains(&info.hash()) {
|
||||
trace!(target: "sync", "Skipping existing block header {} ({:?})", number, info.hash());
|
||||
continue;
|
||||
@ -441,6 +450,11 @@ impl ChainSync {
|
||||
}
|
||||
}
|
||||
|
||||
// Disable the peer for this syncing round if it gives invalid chain
|
||||
if !valid_response {
|
||||
trace!(target: "sync", "{} Disabled for invalid headers response", peer_id);
|
||||
self.deactivate_peer(io, peer_id);
|
||||
}
|
||||
match self.state {
|
||||
SyncState::ChainHead => {
|
||||
if headers.is_empty() {
|
||||
@ -831,6 +845,9 @@ impl ChainSync {
|
||||
rlp.append(&skip);
|
||||
rlp.append(&if reverse {1u32} else {0u32});
|
||||
self.send_request(sync, peer_id, asking, GET_BLOCK_HEADERS_PACKET, rlp.out());
|
||||
self.peers.get_mut(&peer_id)
|
||||
.expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed")
|
||||
.asking_hash = Some(h.clone());
|
||||
}
|
||||
|
||||
/// Request block bodies from a peer
|
||||
@ -911,7 +928,7 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Respond to GetBlockHeaders request
|
||||
fn return_block_headers(io: &SyncIo, r: &UntrustedRlp) -> RlpResponseResult {
|
||||
fn return_block_headers(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
// Packet layout:
|
||||
// [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ]
|
||||
let max_headers: usize = try!(r.val_at(1));
|
||||
@ -921,13 +938,25 @@ impl ChainSync {
|
||||
let number = if try!(r.at(0)).size() == 32 {
|
||||
// id is a hash
|
||||
let hash: H256 = try!(r.val_at(0));
|
||||
trace!(target: "sync", "-> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse);
|
||||
trace!(target: "sync", "{} -> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", peer_id, hash, max_headers, skip, reverse);
|
||||
match io.chain().block_header(BlockID::Hash(hash)) {
|
||||
Some(hdr) => From::from(HeaderView::new(&hdr).number()),
|
||||
Some(hdr) => {
|
||||
let number = From::from(HeaderView::new(&hdr).number());
|
||||
debug_assert_eq!(HeaderView::new(&hdr).sha3(), hash);
|
||||
if max_headers == 1 || io.chain().block_hash(BlockID::Number(number)) != Some(hash) {
|
||||
// Non canonical header or single header requested
|
||||
// TODO: handle single-step reverse hashchains of non-canon hashes
|
||||
trace!(target:"sync", "Returning single header: {:?}", hash);
|
||||
let mut rlp = RlpStream::new_list(1);
|
||||
rlp.append_raw(&hdr, 1);
|
||||
return Ok(Some((BLOCK_HEADERS_PACKET, rlp)));
|
||||
}
|
||||
number
|
||||
}
|
||||
None => return Ok(Some((BLOCK_HEADERS_PACKET, RlpStream::new_list(0)))) //no such header, return nothing
|
||||
}
|
||||
} else {
|
||||
trace!(target: "sync", "-> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", try!(r.val_at::<BlockNumber>(0)), max_headers, skip, reverse);
|
||||
trace!(target: "sync", "{} -> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", peer_id, try!(r.val_at::<BlockNumber>(0)), max_headers, skip, reverse);
|
||||
try!(r.val_at(0))
|
||||
};
|
||||
|
||||
@ -962,13 +991,13 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Respond to GetBlockBodies request
|
||||
fn return_block_bodies(io: &SyncIo, r: &UntrustedRlp) -> RlpResponseResult {
|
||||
fn return_block_bodies(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let mut count = r.item_count();
|
||||
if count == 0 {
|
||||
debug!(target: "sync", "Empty GetBlockBodies request, ignoring.");
|
||||
return Ok(None);
|
||||
}
|
||||
trace!(target: "sync", "-> GetBlockBodies: {} entries", count);
|
||||
trace!(target: "sync", "{} -> GetBlockBodies: {} entries", peer_id, count);
|
||||
count = min(count, MAX_BODIES_TO_SEND);
|
||||
let mut added = 0usize;
|
||||
let mut data = Bytes::new();
|
||||
@ -985,8 +1014,9 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Respond to GetNodeData request
|
||||
fn return_node_data(io: &SyncIo, r: &UntrustedRlp) -> RlpResponseResult {
|
||||
fn return_node_data(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let mut count = r.item_count();
|
||||
trace!(target: "sync", "{} -> GetNodeData: {} entries", peer_id, count);
|
||||
if count == 0 {
|
||||
debug!(target: "sync", "Empty GetNodeData request, ignoring.");
|
||||
return Ok(None);
|
||||
@ -1000,13 +1030,15 @@ impl ChainSync {
|
||||
added += 1;
|
||||
}
|
||||
}
|
||||
trace!(target: "sync", "{} -> GetNodeData: return {} entries", peer_id, added);
|
||||
let mut rlp = RlpStream::new_list(added);
|
||||
rlp.append_raw(&data, added);
|
||||
Ok(Some((NODE_DATA_PACKET, rlp)))
|
||||
}
|
||||
|
||||
fn return_receipts(io: &SyncIo, rlp: &UntrustedRlp) -> RlpResponseResult {
|
||||
fn return_receipts(io: &SyncIo, rlp: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let mut count = rlp.item_count();
|
||||
trace!(target: "sync", "{} -> GetReceipts: {} entries", peer_id, count);
|
||||
if count == 0 {
|
||||
debug!(target: "sync", "Empty GetReceipts request, ignoring.");
|
||||
return Ok(None);
|
||||
@ -1028,11 +1060,11 @@ impl ChainSync {
|
||||
Ok(Some((RECEIPTS_PACKET, rlp_result)))
|
||||
}
|
||||
|
||||
fn return_rlp<FRlp, FError>(&self, io: &mut SyncIo, rlp: &UntrustedRlp, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
|
||||
where FRlp : Fn(&SyncIo, &UntrustedRlp) -> RlpResponseResult,
|
||||
fn return_rlp<FRlp, FError>(&self, io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
|
||||
where FRlp : Fn(&SyncIo, &UntrustedRlp, PeerId) -> RlpResponseResult,
|
||||
FError : FnOnce(UtilError) -> String
|
||||
{
|
||||
let response = rlp_func(io, rlp);
|
||||
let response = rlp_func(io, rlp, peer);
|
||||
match response {
|
||||
Err(e) => Err(e),
|
||||
Ok(Some((packet_id, rlp_stream))) => {
|
||||
@ -1060,19 +1092,19 @@ impl ChainSync {
|
||||
NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp),
|
||||
NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp),
|
||||
|
||||
GET_BLOCK_BODIES_PACKET => self.return_rlp(io, &rlp,
|
||||
GET_BLOCK_BODIES_PACKET => self.return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_block_bodies,
|
||||
|e| format!("Error sending block bodies: {:?}", e)),
|
||||
|
||||
GET_BLOCK_HEADERS_PACKET => self.return_rlp(io, &rlp,
|
||||
GET_BLOCK_HEADERS_PACKET => self.return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_block_headers,
|
||||
|e| format!("Error sending block headers: {:?}", e)),
|
||||
|
||||
GET_RECEIPTS_PACKET => self.return_rlp(io, &rlp,
|
||||
GET_RECEIPTS_PACKET => self.return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_receipts,
|
||||
|e| format!("Error sending receipts: {:?}", e)),
|
||||
|
||||
GET_NODE_DATA_PACKET => self.return_rlp(io, &rlp,
|
||||
GET_NODE_DATA_PACKET => self.return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_node_data,
|
||||
|e| format!("Error sending nodes: {:?}", e)),
|
||||
|
||||
@ -1090,7 +1122,7 @@ impl ChainSync {
|
||||
let tick = time::precise_time_s();
|
||||
for (peer_id, peer) in &self.peers {
|
||||
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
|
||||
trace!(target:"sync", "Timeouted {}", peer_id);
|
||||
trace!(target:"sync", "Timeout {}", peer_id);
|
||||
io.disconnect_peer(*peer_id);
|
||||
}
|
||||
}
|
||||
@ -1116,10 +1148,11 @@ impl ChainSync {
|
||||
let mut rlp_stream = RlpStream::new_list(blocks.len());
|
||||
for block_hash in blocks {
|
||||
let mut hash_rlp = RlpStream::new_list(2);
|
||||
let difficulty = chain.block_total_difficulty(BlockID::Hash(block_hash.clone())).expect("Malformed block without a difficulty on the chain!");
|
||||
let number = HeaderView::new(&chain.block_header(BlockID::Hash(block_hash.clone()))
|
||||
.expect("chain.tree_route and chain.find_uncles only return hahses of blocks that are in the blockchain. qed.")).number();
|
||||
hash_rlp.append(&block_hash);
|
||||
hash_rlp.append(&difficulty);
|
||||
rlp_stream.append_raw(&hash_rlp.out(), 1);
|
||||
hash_rlp.append(&number);
|
||||
rlp_stream.append_raw(&hash_rlp.as_raw(), 1);
|
||||
}
|
||||
Some(rlp_stream.out())
|
||||
}
|
||||
@ -1171,6 +1204,7 @@ impl ChainSync {
|
||||
/// propagates latest block to lagging peers
|
||||
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
|
||||
let lucky_peers = self.select_lagging_peers(chain_info, io);
|
||||
trace!("Sending NewBlocks to {:?}", lucky_peers);
|
||||
let mut sent = 0;
|
||||
for (peer_id, _) in lucky_peers {
|
||||
let rlp = ChainSync::create_latest_block_rlp(io.chain());
|
||||
@ -1185,6 +1219,7 @@ impl ChainSync {
|
||||
/// propagates new known hashes to all peers
|
||||
fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
|
||||
let lucky_peers = self.select_lagging_peers(chain_info, io);
|
||||
trace!("Sending NewHashes to {:?}", lucky_peers);
|
||||
let mut sent = 0;
|
||||
let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash();
|
||||
for (peer_id, peer_number) in lucky_peers {
|
||||
@ -1338,7 +1373,7 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&[0xc0]));
|
||||
let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&[0xc0]), 0);
|
||||
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
@ -1358,7 +1393,7 @@ mod tests {
|
||||
|
||||
let receipts_request = receipt_list.out();
|
||||
// it returns rlp ONLY for hashes started with "f"
|
||||
let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&receipts_request.clone()));
|
||||
let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&receipts_request.clone()), 0);
|
||||
|
||||
assert!(result.is_ok());
|
||||
let rlp_result = result.unwrap();
|
||||
@ -1406,33 +1441,33 @@ mod tests {
|
||||
let io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
let unknown: H256 = H256::new();
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, false)));
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, false)), 0);
|
||||
assert!(to_header_vec(result).is_empty());
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, true)));
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, true)), 0);
|
||||
assert!(to_header_vec(result).is_empty());
|
||||
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, true)));
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, true)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
|
||||
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, false)));
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, false)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
|
||||
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, false)));
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, false)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[56].clone(), headers[62].clone()]);
|
||||
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, true)));
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, true)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]);
|
||||
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, true)));
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, true)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
|
||||
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, false)));
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, false)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
|
||||
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, false)));
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, false)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[56].clone(), headers[62].clone()]);
|
||||
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, true)));
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, true)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]);
|
||||
}
|
||||
|
||||
@ -1450,7 +1485,7 @@ mod tests {
|
||||
|
||||
let node_request = node_list.out();
|
||||
// it returns rlp ONLY for hashes started with "f"
|
||||
let result = ChainSync::return_node_data(&io, &UntrustedRlp::new(&node_request.clone()));
|
||||
let result = ChainSync::return_node_data(&io, &UntrustedRlp::new(&node_request.clone()), 0);
|
||||
|
||||
assert!(result.is_ok());
|
||||
let rlp_result = result.unwrap();
|
||||
|
@ -436,7 +436,7 @@ impl EncryptedConnection {
|
||||
|
||||
/// Readable IO handler. Tracker receive status and returns decoded packet if avaialable.
|
||||
pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Result<Option<Packet>, UtilError> where Message: Send + Clone{
|
||||
io.clear_timer(self.connection.token).unwrap();
|
||||
try!(io.clear_timer(self.connection.token));
|
||||
if let EncryptedConnectionState::Header = self.read_state {
|
||||
if let Some(data) = try!(self.connection.readable()) {
|
||||
try!(self.read_header(&data));
|
||||
|
@ -683,6 +683,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
Some(_) => packet_data.push((protocol, packet_id, data)),
|
||||
}
|
||||
},
|
||||
Ok(SessionData::Continue) => (),
|
||||
Ok(SessionData::None) => break,
|
||||
}
|
||||
}
|
||||
|
@ -68,6 +68,8 @@ pub enum SessionData {
|
||||
/// Zero based packet ID
|
||||
packet_id: u8,
|
||||
},
|
||||
/// Session has more data to be read
|
||||
Continue,
|
||||
}
|
||||
|
||||
/// Shared session information
|
||||
@ -329,16 +331,19 @@ impl Session {
|
||||
PACKET_DISCONNECT => {
|
||||
let rlp = UntrustedRlp::new(&packet.data[1..]);
|
||||
let reason: u8 = try!(rlp.val_at(0));
|
||||
if self.had_hello {
|
||||
debug!("Disconnected: {}: {:?}", self.token(), DisconnectReason::from_u8(reason));
|
||||
}
|
||||
Err(From::from(NetworkError::Disconnect(DisconnectReason::from_u8(reason))))
|
||||
}
|
||||
PACKET_PING => {
|
||||
try!(self.send_pong(io));
|
||||
Ok(SessionData::None)
|
||||
Ok(SessionData::Continue)
|
||||
},
|
||||
PACKET_PONG => {
|
||||
self.pong_time_ns = Some(time::precise_time_ns());
|
||||
self.info.ping_ms = Some((self.pong_time_ns.unwrap() - self.ping_time_ns) / 1000_000);
|
||||
Ok(SessionData::None)
|
||||
Ok(SessionData::Continue)
|
||||
},
|
||||
PACKET_GET_PEERS => Ok(SessionData::None), //TODO;
|
||||
PACKET_PEERS => Ok(SessionData::None),
|
||||
@ -348,7 +353,7 @@ impl Session {
|
||||
i += 1;
|
||||
if i == self.info.capabilities.len() {
|
||||
debug!(target: "network", "Unknown packet: {:?}", packet_id);
|
||||
return Ok(SessionData::None)
|
||||
return Ok(SessionData::Continue)
|
||||
}
|
||||
}
|
||||
|
||||
@ -359,7 +364,7 @@ impl Session {
|
||||
},
|
||||
_ => {
|
||||
debug!(target: "network", "Unknown packet: {:?}", packet_id);
|
||||
Ok(SessionData::None)
|
||||
Ok(SessionData::Continue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user