Lowering complexity of request_blocks
This commit is contained in:
parent
30e7ac8d6d
commit
083747dc67
@ -249,14 +249,14 @@ impl ChainSync {
|
||||
blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 },
|
||||
num_peers: self.peers.len(),
|
||||
num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(),
|
||||
mem_used:
|
||||
mem_used:
|
||||
// TODO: https://github.com/servo/heapsize/pull/50
|
||||
// self.downloading_hashes.heap_size_of_children()
|
||||
//+ self.downloading_bodies.heap_size_of_children()
|
||||
//+ self.downloading_hashes.heap_size_of_children()
|
||||
self.headers.heap_size_of_children()
|
||||
+ self.bodies.heap_size_of_children()
|
||||
+ self.peers.heap_size_of_children()
|
||||
// self.downloading_hashes.heap_size_of_children()
|
||||
//+ self.downloading_bodies.heap_size_of_children()
|
||||
//+ self.downloading_hashes.heap_size_of_children()
|
||||
self.headers.heap_size_of_children()
|
||||
+ self.bodies.heap_size_of_children()
|
||||
+ self.peers.heap_size_of_children()
|
||||
+ self.header_ids.heap_size_of_children(),
|
||||
}
|
||||
}
|
||||
@ -635,16 +635,7 @@ impl ChainSync {
|
||||
match self.last_imported_block { None => 0, Some(x) => x }
|
||||
}
|
||||
|
||||
/// Find some headers or blocks to download for a peer.
|
||||
fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId, ignore_others: bool) {
|
||||
self.clear_peer_download(peer_id);
|
||||
|
||||
if io.chain().queue_info().is_full() {
|
||||
self.pause_sync();
|
||||
return;
|
||||
}
|
||||
|
||||
// check to see if we need to download any block bodies first
|
||||
fn find_block_bodies_hashes_to_request(&self, ignore_others: bool) -> (Vec<H256>, Vec<BlockNumber>) {
|
||||
let mut needed_bodies: Vec<H256> = Vec::new();
|
||||
let mut needed_numbers: Vec<BlockNumber> = Vec::new();
|
||||
|
||||
@ -664,74 +655,88 @@ impl ChainSync {
|
||||
}
|
||||
}
|
||||
}
|
||||
(needed_bodies, needed_numbers)
|
||||
}
|
||||
|
||||
/// Find some headers or blocks to download for a peer.
|
||||
fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId, ignore_others: bool) {
|
||||
self.clear_peer_download(peer_id);
|
||||
|
||||
if io.chain().queue_info().is_full() {
|
||||
self.pause_sync();
|
||||
return;
|
||||
}
|
||||
|
||||
// check to see if we need to download any block bodies first
|
||||
let (needed_bodies, needed_numbers) = self.find_block_bodies_hashes_to_request(ignore_others);
|
||||
if !needed_bodies.is_empty() {
|
||||
let (head, _) = self.headers.range_iter().next().unwrap();
|
||||
if needed_numbers.first().unwrap() - head > self.max_download_ahead_blocks as BlockNumber {
|
||||
trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading block bodies", peer_id, needed_numbers.first().unwrap(), head);
|
||||
self.request_blocks(io, peer_id, true);
|
||||
return;
|
||||
} else {
|
||||
self.downloading_bodies.extend(needed_numbers.iter());
|
||||
replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers);
|
||||
self.request_bodies(io, peer_id, needed_bodies);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// check if need to download headers
|
||||
let mut start = 0;
|
||||
if !self.have_common_block {
|
||||
// download backwards until common block is found 1 header at a time
|
||||
let chain_info = io.chain().chain_info();
|
||||
start = chain_info.best_block_number;
|
||||
if !self.headers.is_empty() {
|
||||
start = min(start, self.headers.range_iter().next().unwrap().0 - 1);
|
||||
}
|
||||
if start == 0 {
|
||||
self.have_common_block = true; //reached genesis
|
||||
self.last_imported_hash = Some(chain_info.genesis_hash);
|
||||
self.last_imported_block = Some(0);
|
||||
}
|
||||
}
|
||||
if self.have_common_block {
|
||||
let mut headers: Vec<BlockNumber> = Vec::new();
|
||||
let mut prev = self.current_base_block() + 1;
|
||||
let head = self.headers.range_iter().next().map(|(h, _)| h);
|
||||
for (next, ref items) in self.headers.range_iter() {
|
||||
if !headers.is_empty() {
|
||||
break;
|
||||
}
|
||||
if next <= prev {
|
||||
prev = next + items.len() as BlockNumber;
|
||||
continue;
|
||||
}
|
||||
let mut block = prev;
|
||||
while block < next && headers.len() < MAX_HEADERS_TO_REQUEST {
|
||||
if ignore_others || !self.downloading_headers.contains(&(block as BlockNumber)) {
|
||||
headers.push(block as BlockNumber);
|
||||
}
|
||||
block += 1;
|
||||
}
|
||||
prev = next + items.len() as BlockNumber;
|
||||
}
|
||||
|
||||
if !headers.is_empty() {
|
||||
start = headers[0];
|
||||
if head.is_some() && start > head.unwrap() && start - head.unwrap() > self.max_download_ahead_blocks as BlockNumber {
|
||||
trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading headers", peer_id, start, head.unwrap());
|
||||
self.request_blocks(io, peer_id, true);
|
||||
return;
|
||||
}
|
||||
let count = headers.len();
|
||||
self.downloading_headers.extend(headers.iter());
|
||||
replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers);
|
||||
assert!(!self.headers.have_item(&start));
|
||||
self.request_headers_by_number(io, peer_id, start, count, 0, false);
|
||||
}
|
||||
self.downloading_bodies.extend(needed_numbers.iter());
|
||||
replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers);
|
||||
self.request_bodies(io, peer_id, needed_bodies);
|
||||
}
|
||||
else {
|
||||
// check if need to download headers
|
||||
let mut start = 0;
|
||||
if !self.have_common_block {
|
||||
// download backwards until common block is found 1 header at a time
|
||||
let chain_info = io.chain().chain_info();
|
||||
start = chain_info.best_block_number;
|
||||
if !self.headers.is_empty() {
|
||||
start = min(start, self.headers.range_iter().next().unwrap().0 - 1);
|
||||
}
|
||||
if start == 0 {
|
||||
self.have_common_block = true; //reached genesis
|
||||
self.last_imported_hash = Some(chain_info.genesis_hash);
|
||||
self.last_imported_block = Some(0);
|
||||
}
|
||||
}
|
||||
if self.have_common_block {
|
||||
let mut headers: Vec<BlockNumber> = Vec::new();
|
||||
let mut prev = self.current_base_block() + 1;
|
||||
let head = self.headers.range_iter().next().map(|(h, _)| h);
|
||||
for (next, ref items) in self.headers.range_iter() {
|
||||
if !headers.is_empty() {
|
||||
break;
|
||||
}
|
||||
if next <= prev {
|
||||
prev = next + items.len() as BlockNumber;
|
||||
continue;
|
||||
}
|
||||
let mut block = prev;
|
||||
while block < next && headers.len() < MAX_HEADERS_TO_REQUEST {
|
||||
if ignore_others || !self.downloading_headers.contains(&(block as BlockNumber)) {
|
||||
headers.push(block as BlockNumber);
|
||||
}
|
||||
block += 1;
|
||||
}
|
||||
prev = next + items.len() as BlockNumber;
|
||||
}
|
||||
|
||||
if !headers.is_empty() {
|
||||
start = headers[0];
|
||||
if head.is_some() && start > head.unwrap() && start - head.unwrap() > self.max_download_ahead_blocks as BlockNumber {
|
||||
trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading headers", peer_id, start, head.unwrap());
|
||||
self.request_blocks(io, peer_id, true);
|
||||
return;
|
||||
}
|
||||
let count = headers.len();
|
||||
self.downloading_headers.extend(headers.iter());
|
||||
replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers);
|
||||
assert!(!self.headers.have_item(&start));
|
||||
self.request_headers_by_number(io, peer_id, start, count, 0, false);
|
||||
}
|
||||
}
|
||||
else {
|
||||
// continue search for common block
|
||||
self.downloading_headers.insert(start);
|
||||
self.request_headers_by_number(io, peer_id, start, 1, 0, false);
|
||||
}
|
||||
// continue search for common block
|
||||
self.downloading_headers.insert(start);
|
||||
self.request_headers_by_number(io, peer_id, start, 1, 0, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user