Propagate only one last hash for peers that are too far behind

This commit is contained in:
arkpar 2016-02-14 17:10:55 +01:00
parent dee375bfac
commit fc7483ab87

View File

@ -155,7 +155,9 @@ struct PeerInfo {
/// Peer network id /// Peer network id
network_id: U256, network_id: U256,
/// Peer best block hash /// Peer best block hash
latest: H256, latest_hash: H256,
/// Peer best block number if known
latest_number: Option<BlockNumber>,
/// Peer total difficulty /// Peer total difficulty
difficulty: U256, difficulty: U256,
/// Type of data currenty being requested from peer. /// Type of data currenty being requested from peer.
@ -282,7 +284,8 @@ impl ChainSync {
protocol_version: try!(r.val_at(0)), protocol_version: try!(r.val_at(0)),
network_id: try!(r.val_at(1)), network_id: try!(r.val_at(1)),
difficulty: try!(r.val_at(2)), difficulty: try!(r.val_at(2)),
latest: try!(r.val_at(3)), latest_hash: try!(r.val_at(3)),
latest_number: None,
genesis: try!(r.val_at(4)), genesis: try!(r.val_at(4)),
asking: PeerAsking::Nothing, asking: PeerAsking::Nothing,
asking_blocks: Vec::new(), asking_blocks: Vec::new(),
@ -290,7 +293,7 @@ impl ChainSync {
ask_time: 0f64, ask_time: 0f64,
}; };
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis); trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
if self.peers.contains_key(&peer_id) { if self.peers.contains_key(&peer_id) {
warn!("Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id)); warn!("Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
@ -450,7 +453,8 @@ impl ChainSync {
let mut unknown = false; let mut unknown = false;
{ {
let peer = self.peers.get_mut(&peer_id).unwrap(); let peer = self.peers.get_mut(&peer_id).unwrap();
peer.latest = header.hash(); peer.latest_hash = header.hash();
peer.latest_number = Some(header.number());
} }
// TODO: Decompose block and add to self.headers and self.bodies instead // TODO: Decompose block and add to self.headers and self.bodies instead
if header.number == From::from(self.current_base_block() + 1) { if header.number == From::from(self.current_base_block() + 1) {
@ -516,7 +520,8 @@ impl ChainSync {
if d > max_height { if d > max_height {
trace!(target: "sync", "New unknown block hash {:?}", h); trace!(target: "sync", "New unknown block hash {:?}", h);
let peer = self.peers.get_mut(&peer_id).unwrap(); let peer = self.peers.get_mut(&peer_id).unwrap();
peer.latest = h.clone(); peer.latest_hash = h.clone();
peer.latest_number = Some(d);
max_height = d; max_height = d;
} }
}, },
@ -583,7 +588,7 @@ impl ChainSync {
trace!(target: "sync", "Waiting for block queue"); trace!(target: "sync", "Waiting for block queue");
return; return;
} }
(peer.latest.clone(), peer.difficulty.clone()) (peer.latest_hash.clone(), peer.difficulty.clone())
}; };
let td = io.chain().chain_info().pending_total_difficulty; let td = io.chain().chain_info().pending_total_difficulty;
@ -1117,25 +1122,28 @@ impl ChainSync {
} }
/// returns peer ids that have less blocks than our chain /// returns peer ids that have less blocks than our chain
fn get_lagging_peers(&self, io: &SyncIo) -> Vec<PeerId> { fn get_lagging_peers(&mut self, io: &SyncIo) -> Vec<(PeerId, BlockNumber)> {
let chain = io.chain(); let chain = io.chain();
let chain_info = chain.chain_info(); let chain_info = chain.chain_info();
let latest_hash = chain_info.best_block_hash; let latest_hash = chain_info.best_block_hash;
let latest_number = chain_info.best_block_number; let latest_number = chain_info.best_block_number;
self.peers.iter().filter(|&(_, peer_info)| self.peers.iter_mut().filter_map(|(&id, ref mut peer_info)|
match io.chain().block_status(BlockId::Hash(peer_info.latest.clone())) { match io.chain().block_status(BlockId::Hash(peer_info.latest_hash.clone())) {
BlockStatus::InChain => { BlockStatus::InChain => {
let peer_number = HeaderView::new(&io.chain().block_header(BlockId::Hash(peer_info.latest.clone())).unwrap()).number(); if peer_info.latest_number.is_none() {
peer_info.latest != latest_hash && latest_number > peer_number peer_info.latest_number = Some(HeaderView::new(&io.chain().block_header(BlockId::Hash(peer_info.latest_hash.clone())).unwrap()).number());
}
if peer_info.latest_hash != latest_hash && latest_number > peer_info.latest_number.unwrap() {
Some((id, peer_info.latest_number.unwrap()))
} else { None }
}, },
_ => false _ => None
}) })
.map(|(peer_id, _)| peer_id) .collect::<Vec<_>>()
.cloned().collect::<Vec<PeerId>>()
} }
/// propagades latest block to lagging peers /// propagades latest block to lagging peers
fn propagade_blocks(&mut self, local_best: &H256, io: &mut SyncIo) -> usize { fn propagade_blocks(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize {
let updated_peers = { let updated_peers = {
let lagging_peers = self.get_lagging_peers(io); let lagging_peers = self.get_lagging_peers(io);
@ -1143,33 +1151,41 @@ impl ChainSync {
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
let lucky_peers = match lagging_peers.len() { let lucky_peers = match lagging_peers.len() {
0 ... MIN_PEERS_PROPAGATION => lagging_peers, 0 ... MIN_PEERS_PROPAGATION => lagging_peers,
_ => lagging_peers.iter().filter(|_| ::rand::random::<u32>() < fraction).cloned().collect::<Vec<PeerId>>() _ => lagging_peers.into_iter().filter(|_| ::rand::random::<u32>() < fraction).collect::<Vec<_>>()
}; };
// taking at max of MAX_PEERS_PROPAGATION // taking at max of MAX_PEERS_PROPAGATION
lucky_peers.iter().take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).cloned().collect::<Vec<PeerId>>() lucky_peers.iter().map(|&(id, _)| id.clone()).take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::<Vec<PeerId>>()
}; };
let mut sent = 0; let mut sent = 0;
for peer_id in updated_peers { for peer_id in updated_peers {
let rlp = ChainSync::create_latest_block_rlp(io.chain()); let rlp = ChainSync::create_latest_block_rlp(io.chain());
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
self.peers.get_mut(&peer_id).unwrap().latest = local_best.clone(); self.peers.get_mut(&peer_id).unwrap().latest_hash = local_best.clone();
self.peers.get_mut(&peer_id).unwrap().latest_number = Some(best_number);
sent = sent + 1; sent = sent + 1;
} }
sent sent
} }
/// propagades new known hashes to all peers /// propagades new known hashes to all peers
fn propagade_new_hashes(&mut self, local_best: &H256, io: &mut SyncIo) -> usize { fn propagade_new_hashes(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize {
let updated_peers = self.get_lagging_peers(io); let updated_peers = self.get_lagging_peers(io);
let mut sent = 0; let mut sent = 0;
for peer_id in updated_peers { let last_parent = HeaderView::new(&io.chain().block_header(BlockId::Hash(local_best.clone())).unwrap()).parent_hash();
sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers.get(&peer_id).unwrap().latest, &local_best) { for (peer_id, peer_number) in updated_peers {
let mut peer_best = self.peers.get(&peer_id).unwrap().latest_hash.clone();
if best_number - peer_number > MAX_PEERS_PROPAGATION as BlockNumber {
// If we think peer is too far behind just end one latest hash
peer_best = last_parent.clone();
}
sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &peer_best, &local_best) {
Some(rlp) => { Some(rlp) => {
{ {
let peer = self.peers.get_mut(&peer_id).unwrap(); let peer = self.peers.get_mut(&peer_id).unwrap();
peer.latest = local_best.clone(); peer.latest_hash = local_best.clone();
peer.latest_number = Some(best_number);
} }
self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp); self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp);
1 1
@ -1189,8 +1205,8 @@ impl ChainSync {
pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) { pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) {
let chain = io.chain().chain_info(); let chain = io.chain().chain_info();
if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let blocks = self.propagade_blocks(&chain.best_block_hash, io); let blocks = self.propagade_blocks(&chain.best_block_hash, chain.best_block_number, io);
let hashes = self.propagade_new_hashes(&chain.best_block_hash, io); let hashes = self.propagade_new_hashes(&chain.best_block_hash, chain.best_block_number, io);
if blocks != 0 || hashes != 0 { if blocks != 0 || hashes != 0 {
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
} }
@ -1322,7 +1338,8 @@ mod tests {
protocol_version: 0, protocol_version: 0,
genesis: H256::zero(), genesis: H256::zero(),
network_id: U256::zero(), network_id: U256::zero(),
latest: peer_latest_hash, latest_hash: peer_latest_hash,
latest_number: None,
difficulty: U256::zero(), difficulty: U256::zero(),
asking: PeerAsking::Nothing, asking: PeerAsking::Nothing,
asking_blocks: Vec::<BlockNumber>::new(), asking_blocks: Vec::<BlockNumber>::new(),
@ -1337,7 +1354,7 @@ mod tests {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(100, false); client.add_blocks(100, false);
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10));
let io = TestIo::new(&mut client, &mut queue, None); let io = TestIo::new(&mut client, &mut queue, None);
let lagging_peers = sync.get_lagging_peers(&io); let lagging_peers = sync.get_lagging_peers(&io);
@ -1369,9 +1386,10 @@ mod tests {
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let best_hash = client.chain_info().best_block_hash.clone(); let best_hash = client.chain_info().best_block_hash.clone();
let best_number = client.chain_info().best_block_number;
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagade_new_hashes(&best_hash, &mut io); let peer_count = sync.propagade_new_hashes(&best_hash, best_number, &mut io);
// 1 message should be send // 1 message should be send
assert_eq!(1, io.queue.len()); assert_eq!(1, io.queue.len());
@ -1388,9 +1406,10 @@ mod tests {
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let best_hash = client.chain_info().best_block_hash.clone(); let best_hash = client.chain_info().best_block_hash.clone();
let best_number = client.chain_info().best_block_number;
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagade_blocks(&best_hash, &mut io); let peer_count = sync.propagade_blocks(&best_hash, best_number, &mut io);
// 1 message should be send // 1 message should be send
assert_eq!(1, io.queue.len()); assert_eq!(1, io.queue.len());
@ -1493,9 +1512,10 @@ mod tests {
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let best_hash = client.chain_info().best_block_hash.clone(); let best_hash = client.chain_info().best_block_hash.clone();
let best_number = client.chain_info().best_block_number;
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
sync.propagade_new_hashes(&best_hash, &mut io); sync.propagade_new_hashes(&best_hash, best_number, &mut io);
let data = &io.queue[0].data.clone(); let data = &io.queue[0].data.clone();
let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(&data)); let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(&data));
@ -1511,9 +1531,10 @@ mod tests {
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let best_hash = client.chain_info().best_block_hash.clone(); let best_hash = client.chain_info().best_block_hash.clone();
let best_number = client.chain_info().best_block_number;
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
sync.propagade_blocks(&best_hash, &mut io); sync.propagade_blocks(&best_hash, best_number, &mut io);
let data = &io.queue[0].data.clone(); let data = &io.queue[0].data.clone();
let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(&data)); let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(&data));