From bf6308312ed4601d4489490ebd84454759f64599 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Mon, 20 Jun 2016 00:40:11 +0200 Subject: [PATCH] Sync: Cache last sync round block parents (#1331) * Cache last sync round block parents * Limit incoming transactions and new hashes --- sync/src/chain.rs | 72 ++++++++++++++++++++++++++++++----------------- 1 file changed, 46 insertions(+), 26 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index cb781ea22..caed95e0c 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -118,6 +118,9 @@ const MIN_PEERS_PROPAGATION: usize = 4; const MAX_PEERS_PROPAGATION: usize = 128; const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20; const SUBCHAIN_SIZE: usize = 64; +const MAX_ROUND_PARENTS: usize = 32; +const MAX_NEW_HASHES: usize = 64; +const MAX_TX_TO_IMPORT: usize = 512; const STATUS_PACKET: u8 = 0x00; const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; @@ -238,6 +241,8 @@ pub struct ChainSync { _max_download_ahead_blocks: usize, /// Number of blocks imported this round imported_this_round: Option, + /// Block parents imported this round (hash, parent) + round_parents: VecDeque<(H256, H256)>, /// Network ID network_id: U256, } @@ -260,6 +265,7 @@ impl ChainSync { syncing_difficulty: U256::from(0u64), last_sent_block_number: 0, imported_this_round: None, + round_parents: VecDeque::new(), _max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), network_id: config.network_id, }; @@ -281,11 +287,9 @@ impl ChainSync { num_peers: self.peers.len(), num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(), mem_used: - // TODO: https://github.com/servo/heapsize/pull/50 - //+ self.downloading_bodies.heap_size_of_children() - //+ self.downloading_headers.heap_size_of_children() self.blocks.heap_size() - + self.peers.heap_size_of_children(), + + self.peers.heap_size_of_children() + + self.round_parents.heap_size_of_children(), } } @@ -589,7 +593,7 @@ impl ChainSync { return Ok(()); } trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()); - let hashes = r.iter().map(|item| (item.val_at::(0), item.val_at::(1))); + let hashes = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::(0), item.val_at::(1))); let mut max_height: BlockNumber = 0; let mut new_hashes = Vec::new(); for (rh, rd) in hashes { @@ -729,20 +733,28 @@ impl ChainSync { trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block); // Check if need to retract to find the common block. The problem is that the peers still return headers by hash even // from the non-canonical part of the tree. So we also retract if nothing has been imported last round. - if self.imported_this_round.is_some() && self.imported_this_round.unwrap() == 0 && self.last_imported_block > 0 { - match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) { - Some(h) => { + match self.imported_this_round { + Some(n) if n == 0 && self.last_imported_block > 0 => { + // nothing was imported last round, step back to a previous block + // search parent in last round known parents first + if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == self.last_imported_hash) { self.last_imported_block -= 1; - self.last_imported_hash = h; - trace!(target: "sync", "Searching common header {} ({})", self.last_imported_block, self.last_imported_hash); + self.last_imported_hash = p.clone(); + trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash); + } else { + match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) { + Some(h) => { + self.last_imported_block -= 1; + self.last_imported_hash = h; + trace!(target: "sync", "Searching common header in the blockchain {} ({})", self.last_imported_block, self.last_imported_hash); + } + None => { + debug!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash); + } + } } - None => { - // TODO: get hash by number from the block queue - trace!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash); - // just wait for full sync to complete - self.pause_sync(); - } - } + }, + _ => (), } self.imported_this_round = None; } @@ -789,6 +801,15 @@ impl ChainSync { peer.asking_blocks.clear(); } + fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) { + self.last_imported_block = number; + self.last_imported_hash = hash.clone(); + self.round_parents.push_back((hash.clone(), parent.clone())); + if self.round_parents.len() > MAX_ROUND_PARENTS { + self.round_parents.pop_front(); + } + } + /// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import. fn collect_blocks(&mut self, io: &mut SyncIo) { let mut restart = false; @@ -796,8 +817,10 @@ impl ChainSync { let blocks = self.blocks.drain(); let count = blocks.len(); for block in blocks { - let number = BlockView::new(&block).header_view().number(); - let h = BlockView::new(&block).header_view().sha3(); + let (h, number, parent) = { + let header = BlockView::new(&block).header_view(); + (header.sha3(), header.number(), header.parent_hash()) + }; // Perform basic block verification if !Block::is_good(&block) { @@ -808,20 +831,17 @@ impl ChainSync { match io.chain().import_block(block) { Err(Error::Import(ImportError::AlreadyInChain)) => { - self.last_imported_block = number; - self.last_imported_hash = h.clone(); trace!(target: "sync", "Block already in chain {:?}", h); + self.block_imported(&h, number, &parent); }, Err(Error::Import(ImportError::AlreadyQueued)) => { - self.last_imported_block = number; - self.last_imported_hash = h.clone(); trace!(target: "sync", "Block already queued {:?}", h); + self.block_imported(&h, number, &parent); }, Ok(_) => { trace!(target: "sync", "Block queued {:?}", h); - self.last_imported_block = number; - self.last_imported_hash = h.clone(); imported.insert(h.clone()); + self.block_imported(&h, number, &parent); }, Err(Error::Block(BlockError::UnknownParent(_))) if self.state == SyncState::NewBlocks => { trace!(target: "sync", "Unknown new block parent, restarting sync"); @@ -921,7 +941,7 @@ impl ChainSync { trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); let mut transactions = Vec::with_capacity(item_count); - for i in 0..item_count { + for i in 0 .. min(item_count, MAX_TX_TO_IMPORT) { let tx: SignedTransaction = try!(r.val_at(i)); transactions.push(tx); }