Sync: Cache last sync round block parents (#1331)

* Cache last sync round block parents

* Limit incoming transactions and new hashes
This commit is contained in:
Arkadiy Paronyan 2016-06-20 00:40:11 +02:00 committed by Gav Wood
parent 6b074e8fb2
commit bf6308312e

View File

@ -118,6 +118,9 @@ const MIN_PEERS_PROPAGATION: usize = 4;
const MAX_PEERS_PROPAGATION: usize = 128; const MAX_PEERS_PROPAGATION: usize = 128;
const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20; const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20;
const SUBCHAIN_SIZE: usize = 64; const SUBCHAIN_SIZE: usize = 64;
const MAX_ROUND_PARENTS: usize = 32;
const MAX_NEW_HASHES: usize = 64;
const MAX_TX_TO_IMPORT: usize = 512;
const STATUS_PACKET: u8 = 0x00; const STATUS_PACKET: u8 = 0x00;
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
@ -238,6 +241,8 @@ pub struct ChainSync {
_max_download_ahead_blocks: usize, _max_download_ahead_blocks: usize,
/// Number of blocks imported this round /// Number of blocks imported this round
imported_this_round: Option<usize>, imported_this_round: Option<usize>,
/// Block parents imported this round (hash, parent)
round_parents: VecDeque<(H256, H256)>,
/// Network ID /// Network ID
network_id: U256, network_id: U256,
} }
@ -260,6 +265,7 @@ impl ChainSync {
syncing_difficulty: U256::from(0u64), syncing_difficulty: U256::from(0u64),
last_sent_block_number: 0, last_sent_block_number: 0,
imported_this_round: None, imported_this_round: None,
round_parents: VecDeque::new(),
_max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), _max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id, network_id: config.network_id,
}; };
@ -281,11 +287,9 @@ impl ChainSync {
num_peers: self.peers.len(), num_peers: self.peers.len(),
num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(), num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(),
mem_used: mem_used:
// TODO: https://github.com/servo/heapsize/pull/50
//+ self.downloading_bodies.heap_size_of_children()
//+ self.downloading_headers.heap_size_of_children()
self.blocks.heap_size() self.blocks.heap_size()
+ self.peers.heap_size_of_children(), + self.peers.heap_size_of_children()
+ self.round_parents.heap_size_of_children(),
} }
} }
@ -589,7 +593,7 @@ impl ChainSync {
return Ok(()); return Ok(());
} }
trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()); trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count());
let hashes = r.iter().map(|item| (item.val_at::<H256>(0), item.val_at::<BlockNumber>(1))); let hashes = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::<H256>(0), item.val_at::<BlockNumber>(1)));
let mut max_height: BlockNumber = 0; let mut max_height: BlockNumber = 0;
let mut new_hashes = Vec::new(); let mut new_hashes = Vec::new();
for (rh, rd) in hashes { for (rh, rd) in hashes {
@ -729,20 +733,28 @@ impl ChainSync {
trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block); trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block);
// Check if need to retract to find the common block. The problem is that the peers still return headers by hash even // Check if need to retract to find the common block. The problem is that the peers still return headers by hash even
// from the non-canonical part of the tree. So we also retract if nothing has been imported last round. // from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
if self.imported_this_round.is_some() && self.imported_this_round.unwrap() == 0 && self.last_imported_block > 0 { match self.imported_this_round {
match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) { Some(n) if n == 0 && self.last_imported_block > 0 => {
Some(h) => { // nothing was imported last round, step back to a previous block
// search parent in last round known parents first
if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == self.last_imported_hash) {
self.last_imported_block -= 1; self.last_imported_block -= 1;
self.last_imported_hash = h; self.last_imported_hash = p.clone();
trace!(target: "sync", "Searching common header {} ({})", self.last_imported_block, self.last_imported_hash); trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
} else {
match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) {
Some(h) => {
self.last_imported_block -= 1;
self.last_imported_hash = h;
trace!(target: "sync", "Searching common header in the blockchain {} ({})", self.last_imported_block, self.last_imported_hash);
}
None => {
debug!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash);
}
}
} }
None => { },
// TODO: get hash by number from the block queue _ => (),
trace!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash);
// just wait for full sync to complete
self.pause_sync();
}
}
} }
self.imported_this_round = None; self.imported_this_round = None;
} }
@ -789,6 +801,15 @@ impl ChainSync {
peer.asking_blocks.clear(); peer.asking_blocks.clear();
} }
fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) {
self.last_imported_block = number;
self.last_imported_hash = hash.clone();
self.round_parents.push_back((hash.clone(), parent.clone()));
if self.round_parents.len() > MAX_ROUND_PARENTS {
self.round_parents.pop_front();
}
}
/// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import. /// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import.
fn collect_blocks(&mut self, io: &mut SyncIo) { fn collect_blocks(&mut self, io: &mut SyncIo) {
let mut restart = false; let mut restart = false;
@ -796,8 +817,10 @@ impl ChainSync {
let blocks = self.blocks.drain(); let blocks = self.blocks.drain();
let count = blocks.len(); let count = blocks.len();
for block in blocks { for block in blocks {
let number = BlockView::new(&block).header_view().number(); let (h, number, parent) = {
let h = BlockView::new(&block).header_view().sha3(); let header = BlockView::new(&block).header_view();
(header.sha3(), header.number(), header.parent_hash())
};
// Perform basic block verification // Perform basic block verification
if !Block::is_good(&block) { if !Block::is_good(&block) {
@ -808,20 +831,17 @@ impl ChainSync {
match io.chain().import_block(block) { match io.chain().import_block(block) {
Err(Error::Import(ImportError::AlreadyInChain)) => { Err(Error::Import(ImportError::AlreadyInChain)) => {
self.last_imported_block = number;
self.last_imported_hash = h.clone();
trace!(target: "sync", "Block already in chain {:?}", h); trace!(target: "sync", "Block already in chain {:?}", h);
self.block_imported(&h, number, &parent);
}, },
Err(Error::Import(ImportError::AlreadyQueued)) => { Err(Error::Import(ImportError::AlreadyQueued)) => {
self.last_imported_block = number;
self.last_imported_hash = h.clone();
trace!(target: "sync", "Block already queued {:?}", h); trace!(target: "sync", "Block already queued {:?}", h);
self.block_imported(&h, number, &parent);
}, },
Ok(_) => { Ok(_) => {
trace!(target: "sync", "Block queued {:?}", h); trace!(target: "sync", "Block queued {:?}", h);
self.last_imported_block = number;
self.last_imported_hash = h.clone();
imported.insert(h.clone()); imported.insert(h.clone());
self.block_imported(&h, number, &parent);
}, },
Err(Error::Block(BlockError::UnknownParent(_))) if self.state == SyncState::NewBlocks => { Err(Error::Block(BlockError::UnknownParent(_))) if self.state == SyncState::NewBlocks => {
trace!(target: "sync", "Unknown new block parent, restarting sync"); trace!(target: "sync", "Unknown new block parent, restarting sync");
@ -921,7 +941,7 @@ impl ChainSync {
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
let mut transactions = Vec::with_capacity(item_count); let mut transactions = Vec::with_capacity(item_count);
for i in 0..item_count { for i in 0 .. min(item_count, MAX_TX_TO_IMPORT) {
let tx: SignedTransaction = try!(r.val_at(i)); let tx: SignedTransaction = try!(r.val_at(i));
transactions.push(tx); transactions.push(tx);
} }