|
|
|
@ -113,11 +113,12 @@ const MAX_HEADERS_TO_SEND: usize = 512;
|
|
|
|
|
const MAX_NODE_DATA_TO_SEND: usize = 1024;
|
|
|
|
|
const MAX_RECEIPTS_TO_SEND: usize = 1024;
|
|
|
|
|
const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256;
|
|
|
|
|
const MAX_HEADERS_TO_REQUEST: usize = 256;
|
|
|
|
|
const MAX_HEADERS_TO_REQUEST: usize = 128;
|
|
|
|
|
const MAX_BODIES_TO_REQUEST: usize = 64;
|
|
|
|
|
const MIN_PEERS_PROPAGATION: usize = 4;
|
|
|
|
|
const MAX_PEERS_PROPAGATION: usize = 128;
|
|
|
|
|
const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20;
|
|
|
|
|
const SUBCHAIN_SIZE: usize = 64;
|
|
|
|
|
|
|
|
|
|
const STATUS_PACKET: u8 = 0x00;
|
|
|
|
|
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
|
|
|
@ -133,7 +134,7 @@ const NODE_DATA_PACKET: u8 = 0x0e;
|
|
|
|
|
const GET_RECEIPTS_PACKET: u8 = 0x0f;
|
|
|
|
|
const RECEIPTS_PACKET: u8 = 0x10;
|
|
|
|
|
|
|
|
|
|
const CONNECTION_TIMEOUT_SEC: f64 = 10f64;
|
|
|
|
|
const CONNECTION_TIMEOUT_SEC: f64 = 15f64;
|
|
|
|
|
|
|
|
|
|
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
|
|
|
|
|
/// Sync state
|
|
|
|
@ -639,7 +640,7 @@ impl ChainSync {
|
|
|
|
|
self.sync_peer(io, p, false);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) {
|
|
|
|
|
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) {
|
|
|
|
|
self.complete_sync();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -665,7 +666,7 @@ impl ChainSync {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if self.state == SyncState::Waiting {
|
|
|
|
|
trace!(target: "sync", "Waiting for block queue");
|
|
|
|
|
trace!(target: "sync", "Waiting for the block queue");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
(peer.latest_hash.clone(), peer.difficulty.clone())
|
|
|
|
@ -689,7 +690,7 @@ impl ChainSync {
|
|
|
|
|
// Request subchain headers
|
|
|
|
|
trace!(target: "sync", "Starting sync with better chain");
|
|
|
|
|
let last = self.last_imported_hash.clone();
|
|
|
|
|
self.request_headers_by_hash(io, peer_id, &last, 128, 255, false, PeerAsking::Heads);
|
|
|
|
|
self.request_headers_by_hash(io, peer_id, &last, SUBCHAIN_SIZE, MAX_HEADERS_TO_REQUEST - 1, false, PeerAsking::Heads);
|
|
|
|
|
},
|
|
|
|
|
SyncState::Blocks | SyncState::NewBlocks => {
|
|
|
|
|
if io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown {
|
|
|
|
@ -704,6 +705,8 @@ impl ChainSync {
|
|
|
|
|
fn start_sync_round(&mut self, io: &mut SyncIo) {
|
|
|
|
|
self.state = SyncState::ChainHead;
|
|
|
|
|
trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block);
|
|
|
|
|
// Check if need to retract to find the common block. The problem is that the peers still return headers by hash even
|
|
|
|
|
// from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
|
|
|
|
|
if self.imported_this_round.is_some() && self.imported_this_round.unwrap() == 0 && self.last_imported_block > 0 {
|
|
|
|
|
match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) {
|
|
|
|
|
Some(h) => {
|
|
|
|
@ -781,9 +784,13 @@ impl ChainSync {
|
|
|
|
|
|
|
|
|
|
match io.chain().import_block(block) {
|
|
|
|
|
Err(Error::Import(ImportError::AlreadyInChain)) => {
|
|
|
|
|
self.last_imported_block = number;
|
|
|
|
|
self.last_imported_hash = h.clone();
|
|
|
|
|
trace!(target: "sync", "Block already in chain {:?}", h);
|
|
|
|
|
},
|
|
|
|
|
Err(Error::Import(ImportError::AlreadyQueued)) => {
|
|
|
|
|
self.last_imported_block = number;
|
|
|
|
|
self.last_imported_hash = h.clone();
|
|
|
|
|
trace!(target: "sync", "Block already queued {:?}", h);
|
|
|
|
|
},
|
|
|
|
|
Ok(_) => {
|
|
|
|
@ -856,22 +863,15 @@ impl ChainSync {
|
|
|
|
|
|
|
|
|
|
/// Generic request sender
|
|
|
|
|
fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) {
|
|
|
|
|
{
|
|
|
|
|
let peer = self.peers.get_mut(&peer_id).unwrap();
|
|
|
|
|
if peer.asking != PeerAsking::Nothing {
|
|
|
|
|
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
|
|
|
|
|
}
|
|
|
|
|
let peer = self.peers.get_mut(&peer_id).unwrap();
|
|
|
|
|
if peer.asking != PeerAsking::Nothing {
|
|
|
|
|
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
|
|
|
|
|
}
|
|
|
|
|
match sync.send(peer_id, packet_id, packet) {
|
|
|
|
|
Err(e) => {
|
|
|
|
|
debug!(target:"sync", "Error sending request: {:?}", e);
|
|
|
|
|
sync.disable_peer(peer_id);
|
|
|
|
|
}
|
|
|
|
|
Ok(_) => {
|
|
|
|
|
let mut peer = self.peers.get_mut(&peer_id).unwrap();
|
|
|
|
|
peer.asking = asking;
|
|
|
|
|
peer.ask_time = time::precise_time_s();
|
|
|
|
|
}
|
|
|
|
|
peer.asking = asking;
|
|
|
|
|
peer.ask_time = time::precise_time_s();
|
|
|
|
|
if let Err(e) = sync.send(peer_id, packet_id, packet) {
|
|
|
|
|
debug!(target:"sync", "Error sending request: {:?}", e);
|
|
|
|
|
sync.disable_peer(peer_id);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1099,6 +1099,7 @@ impl ChainSync {
|
|
|
|
|
let tick = time::precise_time_s();
|
|
|
|
|
for (peer_id, peer) in &self.peers {
|
|
|
|
|
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
|
|
|
|
|
trace!(target:"sync", "Timeouted {}", peer_id);
|
|
|
|
|
io.disconnect_peer(*peer_id);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1164,24 +1165,23 @@ impl ChainSync {
|
|
|
|
|
.collect::<Vec<_>>()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fn select_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> Vec<(PeerId, BlockNumber)> {
|
|
|
|
|
use rand::Rng;
|
|
|
|
|
let mut lagging_peers = self.get_lagging_peers(chain_info, io);
|
|
|
|
|
// take sqrt(x) peers
|
|
|
|
|
let mut count = (self.peers.len() as f64).powf(0.5).round() as usize;
|
|
|
|
|
count = min(count, MAX_PEERS_PROPAGATION);
|
|
|
|
|
count = max(count, MIN_PEERS_PROPAGATION);
|
|
|
|
|
::rand::thread_rng().shuffle(&mut lagging_peers);
|
|
|
|
|
lagging_peers.into_iter().take(count).collect::<Vec<_>>()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// propagates latest block to lagging peers
|
|
|
|
|
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
|
|
|
|
|
let updated_peers = {
|
|
|
|
|
let lagging_peers = self.get_lagging_peers(chain_info, io);
|
|
|
|
|
|
|
|
|
|
// sqrt(x)/x scaled to max u32
|
|
|
|
|
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
|
|
|
|
|
let lucky_peers = match lagging_peers.len() {
|
|
|
|
|
0 ... MIN_PEERS_PROPAGATION => lagging_peers,
|
|
|
|
|
_ => lagging_peers.into_iter().filter(|_| ::rand::random::<u32>() < fraction).collect::<Vec<_>>()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// taking at max of MAX_PEERS_PROPAGATION
|
|
|
|
|
lucky_peers.iter().map(|&(id, _)| id.clone()).take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::<Vec<PeerId>>()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let lucky_peers = self.select_lagging_peers(chain_info, io);
|
|
|
|
|
let mut sent = 0;
|
|
|
|
|
for peer_id in updated_peers {
|
|
|
|
|
for (peer_id, _) in lucky_peers {
|
|
|
|
|
let rlp = ChainSync::create_latest_block_rlp(io.chain());
|
|
|
|
|
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
|
|
|
|
|
self.peers.get_mut(&peer_id).unwrap().latest_hash = chain_info.best_block_hash.clone();
|
|
|
|
@ -1193,12 +1193,12 @@ impl ChainSync {
|
|
|
|
|
|
|
|
|
|
/// propagates new known hashes to all peers
|
|
|
|
|
fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
|
|
|
|
|
let updated_peers = self.get_lagging_peers(chain_info, io);
|
|
|
|
|
let lucky_peers = self.select_lagging_peers(chain_info, io);
|
|
|
|
|
let mut sent = 0;
|
|
|
|
|
let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash();
|
|
|
|
|
for (peer_id, peer_number) in updated_peers {
|
|
|
|
|
for (peer_id, peer_number) in lucky_peers {
|
|
|
|
|
let mut peer_best = self.peers.get(&peer_id).unwrap().latest_hash.clone();
|
|
|
|
|
if chain_info.best_block_number - peer_number > MAX_PEERS_PROPAGATION as BlockNumber {
|
|
|
|
|
if chain_info.best_block_number - peer_number > MAX_PEER_LAG_PROPAGATION as BlockNumber {
|
|
|
|
|
// If we think peer is too far behind just send one latest hash
|
|
|
|
|
peer_best = last_parent.clone();
|
|
|
|
|
}
|
|
|
|
@ -1259,15 +1259,15 @@ impl ChainSync {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
|
|
|
|
|
self.propagate_new_transactions(io);
|
|
|
|
|
let chain_info = io.chain().chain_info();
|
|
|
|
|
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
|
|
|
|
|
let blocks = self.propagate_blocks(&chain_info, io);
|
|
|
|
|
let hashes = self.propagate_new_hashes(&chain_info, io);
|
|
|
|
|
let blocks = self.propagate_blocks(&chain_info, io);
|
|
|
|
|
if blocks != 0 || hashes != 0 {
|
|
|
|
|
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
self.propagate_new_transactions(io);
|
|
|
|
|
self.last_sent_block_number = chain_info.best_block_number;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|