diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 6a7add27f..1d90e2d03 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -249,14 +249,14 @@ impl ChainSync { blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 }, num_peers: self.peers.len(), num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(), - mem_used: + mem_used: // TODO: https://github.com/servo/heapsize/pull/50 - // self.downloading_hashes.heap_size_of_children() - //+ self.downloading_bodies.heap_size_of_children() - //+ self.downloading_hashes.heap_size_of_children() - self.headers.heap_size_of_children() - + self.bodies.heap_size_of_children() - + self.peers.heap_size_of_children() + // self.downloading_hashes.heap_size_of_children() + //+ self.downloading_bodies.heap_size_of_children() + //+ self.downloading_hashes.heap_size_of_children() + self.headers.heap_size_of_children() + + self.bodies.heap_size_of_children() + + self.peers.heap_size_of_children() + self.header_ids.heap_size_of_children(), } } @@ -635,16 +635,7 @@ impl ChainSync { match self.last_imported_block { None => 0, Some(x) => x } } - /// Find some headers or blocks to download for a peer. - fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId, ignore_others: bool) { - self.clear_peer_download(peer_id); - - if io.chain().queue_info().is_full() { - self.pause_sync(); - return; - } - - // check to see if we need to download any block bodies first + fn find_block_bodies_hashes_to_request(&self, ignore_others: bool) -> (Vec, Vec) { let mut needed_bodies: Vec = Vec::new(); let mut needed_numbers: Vec = Vec::new(); @@ -664,74 +655,88 @@ impl ChainSync { } } } + (needed_bodies, needed_numbers) + } + + /// Find some headers or blocks to download for a peer. + fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId, ignore_others: bool) { + self.clear_peer_download(peer_id); + + if io.chain().queue_info().is_full() { + self.pause_sync(); + return; + } + + // check to see if we need to download any block bodies first + let (needed_bodies, needed_numbers) = self.find_block_bodies_hashes_to_request(ignore_others); if !needed_bodies.is_empty() { let (head, _) = self.headers.range_iter().next().unwrap(); if needed_numbers.first().unwrap() - head > self.max_download_ahead_blocks as BlockNumber { trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading block bodies", peer_id, needed_numbers.first().unwrap(), head); self.request_blocks(io, peer_id, true); - return; + } else { + self.downloading_bodies.extend(needed_numbers.iter()); + replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers); + self.request_bodies(io, peer_id, needed_bodies); + } + return; + } + + // check if need to download headers + let mut start = 0; + if !self.have_common_block { + // download backwards until common block is found 1 header at a time + let chain_info = io.chain().chain_info(); + start = chain_info.best_block_number; + if !self.headers.is_empty() { + start = min(start, self.headers.range_iter().next().unwrap().0 - 1); + } + if start == 0 { + self.have_common_block = true; //reached genesis + self.last_imported_hash = Some(chain_info.genesis_hash); + self.last_imported_block = Some(0); + } + } + if self.have_common_block { + let mut headers: Vec = Vec::new(); + let mut prev = self.current_base_block() + 1; + let head = self.headers.range_iter().next().map(|(h, _)| h); + for (next, ref items) in self.headers.range_iter() { + if !headers.is_empty() { + break; + } + if next <= prev { + prev = next + items.len() as BlockNumber; + continue; + } + let mut block = prev; + while block < next && headers.len() < MAX_HEADERS_TO_REQUEST { + if ignore_others || !self.downloading_headers.contains(&(block as BlockNumber)) { + headers.push(block as BlockNumber); + } + block += 1; + } + prev = next + items.len() as BlockNumber; + } + + if !headers.is_empty() { + start = headers[0]; + if head.is_some() && start > head.unwrap() && start - head.unwrap() > self.max_download_ahead_blocks as BlockNumber { + trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading headers", peer_id, start, head.unwrap()); + self.request_blocks(io, peer_id, true); + return; + } + let count = headers.len(); + self.downloading_headers.extend(headers.iter()); + replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers); + assert!(!self.headers.have_item(&start)); + self.request_headers_by_number(io, peer_id, start, count, 0, false); } - self.downloading_bodies.extend(needed_numbers.iter()); - replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers); - self.request_bodies(io, peer_id, needed_bodies); } else { - // check if need to download headers - let mut start = 0; - if !self.have_common_block { - // download backwards until common block is found 1 header at a time - let chain_info = io.chain().chain_info(); - start = chain_info.best_block_number; - if !self.headers.is_empty() { - start = min(start, self.headers.range_iter().next().unwrap().0 - 1); - } - if start == 0 { - self.have_common_block = true; //reached genesis - self.last_imported_hash = Some(chain_info.genesis_hash); - self.last_imported_block = Some(0); - } - } - if self.have_common_block { - let mut headers: Vec = Vec::new(); - let mut prev = self.current_base_block() + 1; - let head = self.headers.range_iter().next().map(|(h, _)| h); - for (next, ref items) in self.headers.range_iter() { - if !headers.is_empty() { - break; - } - if next <= prev { - prev = next + items.len() as BlockNumber; - continue; - } - let mut block = prev; - while block < next && headers.len() < MAX_HEADERS_TO_REQUEST { - if ignore_others || !self.downloading_headers.contains(&(block as BlockNumber)) { - headers.push(block as BlockNumber); - } - block += 1; - } - prev = next + items.len() as BlockNumber; - } - - if !headers.is_empty() { - start = headers[0]; - if head.is_some() && start > head.unwrap() && start - head.unwrap() > self.max_download_ahead_blocks as BlockNumber { - trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading headers", peer_id, start, head.unwrap()); - self.request_blocks(io, peer_id, true); - return; - } - let count = headers.len(); - self.downloading_headers.extend(headers.iter()); - replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers); - assert!(!self.headers.have_item(&start)); - self.request_headers_by_number(io, peer_id, start, count, 0, false); - } - } - else { - // continue search for common block - self.downloading_headers.insert(start); - self.request_headers_by_number(io, peer_id, start, 1, 0, false); - } + // continue search for common block + self.downloading_headers.insert(start); + self.request_headers_by_number(io, peer_id, start, 1, 0, false); } }