Download unique receipts only

This commit is contained in:
arkpar 2017-03-31 14:12:15 +02:00
parent 3915943f57
commit deef600caf

View File

@ -30,6 +30,7 @@ struct SyncBlock {
header: Bytes, header: Bytes,
body: Option<Bytes>, body: Option<Bytes>,
receipts: Option<Bytes>, receipts: Option<Bytes>,
receipts_root: H256,
} }
/// Block with optional receipt /// Block with optional receipt
@ -76,7 +77,7 @@ pub struct BlockCollection {
downloading_headers: HashSet<H256>, downloading_headers: HashSet<H256>,
/// Set of block bodies being downloaded identified by block hash. /// Set of block bodies being downloaded identified by block hash.
downloading_bodies: HashSet<H256>, downloading_bodies: HashSet<H256>,
/// Set of block receipts being downloaded identified by block hash. /// Set of block receipts being downloaded identified by receipt root.
downloading_receipts: HashSet<H256>, downloading_receipts: HashSet<H256>,
} }
@ -198,21 +199,24 @@ impl BlockCollection {
head = self.parents.get(&head.unwrap()).cloned(); head = self.parents.get(&head.unwrap()).cloned();
if let Some(head) = head { if let Some(head) = head {
match self.blocks.get(&head) { match self.blocks.get(&head) {
Some(block) if block.receipts.is_none() && !self.downloading_receipts.contains(&head) => { Some(block) => {
self.downloading_receipts.insert(head.clone()); if block.receipts.is_none() && !self.downloading_receipts.contains(&block.receipts_root) {
needed_receipts.push(head.clone()); self.downloading_receipts.insert(block.receipts_root);
needed_receipts.push(head.clone());
}
} }
_ => (), _ => (),
} }
} }
} }
for h in self.receipt_ids.values().flat_map(|hashes| hashes) { // If there are multiple blocks per receipt, only request one of them.
for (root, h) in self.receipt_ids.iter().map(|(root, hashes)| (root, hashes[0])) {
if needed_receipts.len() >= count { if needed_receipts.len() >= count {
break; break;
} }
if !self.downloading_receipts.contains(h) { if !self.downloading_receipts.contains(root) {
needed_receipts.push(h.clone()); needed_receipts.push(h.clone());
self.downloading_receipts.insert(h.clone()); self.downloading_receipts.insert(*root);
} }
} }
needed_receipts needed_receipts
@ -249,7 +253,9 @@ impl BlockCollection {
/// Unmark block receipt as being downloaded. /// Unmark block receipt as being downloaded.
pub fn clear_receipt_download(&mut self, hashes: &[H256]) { pub fn clear_receipt_download(&mut self, hashes: &[H256]) {
for h in hashes { for h in hashes {
self.downloading_receipts.remove(h); if let Some(ref block) = self.blocks.get(h) {
self.downloading_receipts.remove(&block.receipts_root);
}
} }
} }
@ -370,24 +376,22 @@ impl BlockCollection {
let receipts = UntrustedRlp::new(&r); let receipts = UntrustedRlp::new(&r);
ordered_trie_root(receipts.iter().map(|r| r.as_raw().to_vec())) //TODO: get rid of vectors here ordered_trie_root(receipts.iter().map(|r| r.as_raw().to_vec())) //TODO: get rid of vectors here
}; };
self.downloading_receipts.remove(&receipt_root);
match self.receipt_ids.entry(receipt_root) { match self.receipt_ids.entry(receipt_root) {
Entry::Occupied(mut entry) => { Entry::Occupied(entry) => {
let h = entry.get_mut().pop().expect("Empty vectors are not allowed in insert_receipt; qed"); for h in entry.remove() {
if entry.get().is_empty() { match self.blocks.get_mut(&h) {
entry.remove(); Some(ref mut block) => {
} trace!(target: "sync", "Got receipt {}", h);
self.downloading_receipts.remove(&h); block.receipts = Some(r.clone());
match self.blocks.get_mut(&h) { },
Some(ref mut block) => { None => {
trace!(target: "sync", "Got receipt {}", h); warn!("Got receipt with no header {}", h);
block.receipts = Some(r.clone()); return Err(NetworkError::BadProtocol)
Ok(()) }
},
None => {
debug!("Got receipt with no header {}", h);
Err(NetworkError::BadProtocol)
} }
} }
Ok(())
} }
_ => { _ => {
trace!(target: "sync", "Ignored unknown/stale block receipt {:?}", receipt_root); trace!(target: "sync", "Ignored unknown/stale block receipt {:?}", receipt_root);
@ -414,6 +418,7 @@ impl BlockCollection {
header: header, header: header,
body: None, body: None,
receipts: None, receipts: None,
receipts_root: H256::new(),
}; };
let header_id = HeaderId { let header_id = HeaderId {
transactions_root: info.transactions_root().clone(), transactions_root: info.transactions_root().clone(),
@ -438,6 +443,7 @@ impl BlockCollection {
} else { } else {
self.receipt_ids.entry(receipt_root).or_insert_with(|| SmallHashVec::new()).push(hash.clone()); self.receipt_ids.entry(receipt_root).or_insert_with(|| SmallHashVec::new()).push(hash.clone());
} }
block.receipts_root = receipt_root;
} }
self.parents.insert(info.parent_hash().clone(), hash.clone()); self.parents.insert(info.parent_hash().clone(), hash.clone());