openethereum/ethcore/sync/src/blocks.rs

708 lines
21 KiB
Rust
Raw Normal View History

// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
2016-05-16 14:36:35 +02:00
// Parity Ethereum is free software: you can redistribute it and/or modify
2016-05-16 14:36:35 +02:00
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
2016-05-16 14:36:35 +02:00
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
2016-05-16 14:36:35 +02:00
use std::collections::{HashSet, HashMap, hash_map};
2017-08-31 11:35:41 +02:00
use hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY_LIST_RLP};
use heapsize::HeapSizeOf;
use ethereum_types::H256;
use triehash_ethereum::ordered_trie_root;
use bytes::Bytes;
use rlp::{Rlp, RlpStream, DecoderError};
2017-11-13 14:37:08 +01:00
use network;
use ethcore::verification::queue::kind::blocks::Unverified;
use types::transaction::UnverifiedTransaction;
use types::header::Header as BlockHeader;
2016-05-16 14:36:35 +02:00
// Registers `HeaderId` with the `known_heap_size!` macro using a size of 0.
// NOTE(review): presumably this declares that `HeaderId` owns no heap allocations
// (it only holds two `H256` values) — confirm against the macro definition.
known_heap_size!(0, HeaderId);
2016-05-16 14:36:35 +02:00
/// A block header kept together with the raw bytes it was decoded from.
#[derive(PartialEq, Debug, Clone)]
pub struct SyncHeader {
/// Raw header bytes; `SyncHeader::from_rlp` decodes `header` from these.
pub bytes: Bytes,
/// Header decoded from `bytes`.
pub header: BlockHeader,
}
impl HeapSizeOf for SyncHeader {
    /// Heap usage of the raw bytes plus the heap usage of the decoded header.
    fn heap_size_of_children(&self) -> usize {
        let raw = self.bytes.heap_size_of_children();
        let decoded = self.header.heap_size_of_children();
        raw + decoded
    }
}
impl SyncHeader {
pub fn from_rlp(bytes: Bytes) -> Result<Self, DecoderError> {
let result = SyncHeader {
header: ::rlp::decode(&bytes)?,
bytes,
};
Ok(result)
}
}
/// A block body: the transaction and uncle lists, each kept both as raw RLP and decoded.
pub struct SyncBody {
/// Raw RLP of the transactions list (item 0 of the body RLP).
pub transactions_bytes: Bytes,
/// Transactions decoded from `transactions_bytes`.
pub transactions: Vec<UnverifiedTransaction>,
/// Raw RLP of the uncles list (item 1 of the body RLP).
pub uncles_bytes: Bytes,
/// Uncle headers decoded from `uncles_bytes`.
pub uncles: Vec<BlockHeader>,
}
impl SyncBody {
pub fn from_rlp(bytes: &[u8]) -> Result<Self, DecoderError> {
let rlp = Rlp::new(bytes);
let transactions_rlp = rlp.at(0)?;
let uncles_rlp = rlp.at(1)?;
let result = SyncBody {
transactions_bytes: transactions_rlp.as_raw().to_vec(),
transactions: transactions_rlp.as_list()?,
uncles_bytes: uncles_rlp.as_raw().to_vec(),
uncles: uncles_rlp.as_list()?,
};
Ok(result)
}
fn empty_body() -> Self {
SyncBody {
transactions_bytes: ::rlp::EMPTY_LIST_RLP.to_vec(),
transactions: Vec::with_capacity(0),
uncles_bytes: ::rlp::EMPTY_LIST_RLP.to_vec(),
uncles: Vec::with_capacity(0),
}
}
}
impl HeapSizeOf for SyncBody {
    /// Sum of the heap usage of all four fields (raw and decoded transactions and uncles).
    fn heap_size_of_children(&self) -> usize {
        let parts = [
            self.transactions_bytes.heap_size_of_children(),
            self.transactions.heap_size_of_children(),
            self.uncles_bytes.heap_size_of_children(),
            self.uncles.heap_size_of_children(),
        ];
        parts.iter().sum()
    }
}
2016-05-16 14:36:35 +02:00
/// Block data with optional body.
struct SyncBlock {
header: SyncHeader,
body: Option<SyncBody>,
receipts: Option<Bytes>,
2017-03-31 14:12:15 +02:00
receipts_root: H256,
}
impl HeapSizeOf for SyncBlock {
    /// Heap usage of the header, the optional body and the optional receipts bytes.
    ///
    /// `receipts_root` is a fixed-size hash stored inline, so it contributes nothing here.
    fn heap_size_of_children(&self) -> usize {
        self.header.heap_size_of_children()
            + self.body.heap_size_of_children()
            // Receipts were previously left out, under-reporting blocks that carry receipts.
            + self.receipts.heap_size_of_children()
    }
}
/// Assemble an `Unverified` block from a downloaded header and an optional body.
///
/// A missing body is treated as an empty one. The block bytes are rebuilt as a
/// three-item RLP list made of the raw header, raw transactions and raw uncles.
fn unverified_from_sync(header: SyncHeader, body: Option<SyncBody>) -> Unverified {
    let body = body.unwrap_or_else(SyncBody::empty_body);

    let mut stream = RlpStream::new_list(3);
    stream.append_raw(&header.bytes, 1);
    stream.append_raw(&body.transactions_bytes, 1);
    stream.append_raw(&body.uncles_bytes, 1);

    Unverified {
        header: header.header,
        transactions: body.transactions,
        uncles: body.uncles,
        // `out()` already hands back owned bytes; the former `.to_vec()` copied them a second time.
        bytes: stream.out(),
    }
}
/// Block with optional receipts, as produced by `BlockCollection::drain`.
pub struct BlockAndReceipts {
    /// Block data.
    pub block: Unverified,
    /// Block receipts RLP list.
    pub receipts: Option<Bytes>,
}
/// Used to identify header by transactions and uncles hashes
///
/// This is the key of `BlockCollection::header_ids`: it is built from a header's fields when
/// the header is inserted and recomputed from a received body to find the matching header.
#[derive(Eq, PartialEq, Hash)]
struct HeaderId {
/// Transactions root: read from the header, or computed as the ordered trie root of a body's transactions.
transactions_root: H256,
/// Uncles hash: read from the header, or computed as keccak of a body's raw uncles bytes.
uncles: H256
}
/// A collection of blocks and subchain pointers being downloaded. This keeps track of
/// which headers/bodies need to be downloaded, which are being downloaded and also holds
/// the downloaded blocks.
#[derive(Default)]
2016-05-16 14:36:35 +02:00
pub struct BlockCollection {
/// Does this collection need block receipts.
need_receipts: bool,
2016-05-16 14:36:35 +02:00
/// Heads of subchains to download
heads: Vec<H256>,
/// Downloaded blocks.
blocks: HashMap<H256, SyncBlock>,
/// Downloaded blocks by parent.
parents: HashMap<H256, H256>,
/// Used to map body to header.
header_ids: HashMap<HeaderId, H256>,
2017-03-29 19:59:20 +02:00
/// Used to map receipts root to headers.
receipt_ids: HashMap<H256, Vec<H256>>,
2016-05-16 14:36:35 +02:00
/// First block in `blocks`.
head: Option<H256>,
/// Set of block header hashes being downloaded
downloading_headers: HashSet<H256>,
/// Set of block bodies being downloaded identified by block hash.
downloading_bodies: HashSet<H256>,
2017-03-31 14:12:15 +02:00
/// Set of block receipts being downloaded identified by receipt root.
downloading_receipts: HashSet<H256>,
2016-05-16 14:36:35 +02:00
}
impl BlockCollection {
/// Create a new instance.
pub fn new(download_receipts: bool) -> BlockCollection {
2016-05-16 14:36:35 +02:00
BlockCollection {
need_receipts: download_receipts,
2016-05-16 14:36:35 +02:00
blocks: HashMap::new(),
header_ids: HashMap::new(),
receipt_ids: HashMap::new(),
2016-05-16 14:36:35 +02:00
heads: Vec::new(),
parents: HashMap::new(),
head: None,
downloading_headers: HashSet::new(),
downloading_bodies: HashSet::new(),
downloading_receipts: HashSet::new(),
2016-05-16 14:36:35 +02:00
}
}
/// Clear everything.
pub fn clear(&mut self) {
self.blocks.clear();
self.parents.clear();
self.header_ids.clear();
self.receipt_ids.clear();
2016-05-16 14:36:35 +02:00
self.heads.clear();
self.head = None;
self.downloading_headers.clear();
self.downloading_bodies.clear();
self.downloading_receipts.clear();
2016-05-16 14:36:35 +02:00
}
/// Reset collection for a new sync round with given subchain block hashes.
///
/// All existing state is dropped via `clear` and `hashes` becomes the new list of subchain heads.
pub fn reset_to(&mut self, hashes: Vec<H256>) {
self.clear();
self.heads = hashes;
}
2016-05-16 19:46:09 +02:00
/// Insert a set of headers into collection and advance subchain head pointers.
pub fn insert_headers(&mut self, headers: Vec<SyncHeader>) {
for h in headers {
2016-05-16 14:36:35 +02:00
if let Err(e) = self.insert_header(h) {
trace!(target: "sync", "Ignored invalid header: {:?}", e);
}
}
self.update_heads();
}
2016-05-16 19:46:09 +02:00
2016-05-16 14:36:35 +02:00
/// Insert a collection of block bodies for previously downloaded headers.
///
/// Returns the hashes of the blocks whose body was accepted; bodies that do not
/// match a pending header are logged and skipped.
pub fn insert_bodies(&mut self, bodies: Vec<SyncBody>) -> Vec<H256> {
    let mut accepted = Vec::new();
    for body in bodies {
        match self.insert_body(body) {
            Ok(hash) => accepted.push(hash),
            Err(e) => trace!(target: "sync", "Ignored invalid body: {:?}", e),
        }
    }
    accepted
}
/// Insert a collection of block receipts for previously downloaded headers.
pub fn insert_receipts(&mut self, receipts: Vec<Bytes>) -> Vec<Vec<H256>> {
if !self.need_receipts {
return Vec::new();
2016-05-16 14:36:35 +02:00
}
receipts.into_iter()
.filter_map(|r| {
self.insert_receipt(r)
.map_err(|e| trace!(target: "sync", "Ignored invalid receipt: {:?}", e))
.ok()
})
.collect()
2016-05-16 14:36:35 +02:00
}
/// Returns a set of block hashes that require a body download. The returned set is marked as being downloaded.
///
/// At most `count` hashes are returned. The chain following `head` is walked first,
/// then any remaining headers still waiting for a body are considered.
/// `_ignore_downloading` is not used.
pub fn needed_bodies(&mut self, count: usize, _ignore_downloading: bool) -> Vec<H256> {
    let mut wanted: Vec<H256> = Vec::new();
    let mut cursor = self.head;
    if cursor.is_none() {
        return wanted;
    }

    // Follow parent -> child links starting at the current head.
    while let Some(parent) = cursor {
        if wanted.len() >= count {
            break;
        }
        cursor = self.parents.get(&parent).cloned();
        if let Some(hash) = cursor {
            let body_missing = self.blocks.get(&hash).map_or(false, |block| block.body.is_none());
            if body_missing && !self.downloading_bodies.contains(&hash) {
                self.downloading_bodies.insert(hash);
                wanted.push(hash);
            }
        }
    }

    // Top up with any other header that still waits for its body.
    for hash in self.header_ids.values() {
        if wanted.len() >= count {
            break;
        }
        // `insert` reports whether the hash was newly marked as downloading.
        if self.downloading_bodies.insert(*hash) {
            wanted.push(*hash);
        }
    }
    wanted
}
/// Returns a set of block hashes that require a receipt download. The returned set is marked as being downloaded.
///
/// At most `count` hashes are returned, and nothing is returned when receipts are not
/// needed or no head is set. In-flight receipts are tracked by receipts root.
/// `_ignore_downloading` is not used.
pub fn needed_receipts(&mut self, count: usize, _ignore_downloading: bool) -> Vec<H256> {
    if !self.need_receipts || self.head.is_none() {
        return Vec::new();
    }

    let mut wanted: Vec<H256> = Vec::new();
    let mut cursor = self.head;

    // Follow parent -> child links starting at the current head.
    while let Some(parent) = cursor {
        if wanted.len() >= count {
            break;
        }
        cursor = self.parents.get(&parent).cloned();
        if let Some(hash) = cursor {
            if let Some(block) = self.blocks.get(&hash) {
                // `insert` is only reached for blocks without receipts and reports
                // whether the root was newly marked as downloading.
                if block.receipts.is_none() && self.downloading_receipts.insert(block.receipts_root) {
                    wanted.push(hash);
                }
            }
        }
    }

    // If there are multiple blocks per receipt, only request one of them.
    for (root, hashes) in self.receipt_ids.iter() {
        let first = hashes[0];
        if wanted.len() >= count {
            break;
        }
        if self.downloading_receipts.insert(*root) {
            wanted.push(first);
        }
    }
    wanted
}
2016-05-16 14:36:35 +02:00
/// Pick the next subchain head to download headers from and mark it as being downloaded.
///
/// Returns the chosen head together with `count`, or `None` when every head is
/// already being downloaded (unless `ignore_downloading` is set, in which case the
/// first head is always chosen).
pub fn needed_headers(&mut self, count: usize, ignore_downloading: bool) -> Option<(H256, usize)> {
    // find subchain to download
    let candidate = self
        .heads
        .iter()
        .find(|head| ignore_downloading || !self.downloading_headers.contains(*head))
        .cloned();

    match candidate {
        Some(head) => {
            self.downloading_headers.insert(head);
            Some((head, count))
        }
        None => None,
    }
}
/// Unmark header as being downloaded.
2016-05-16 14:36:35 +02:00
pub fn clear_header_download(&mut self, hash: &H256) {
self.downloading_headers.remove(hash);
}
/// Unmark block bodies as being downloaded. Unknown hashes are ignored.
pub fn clear_body_download(&mut self, hashes: &[H256]) {
    hashes.iter().for_each(|hash| {
        self.downloading_bodies.remove(hash);
    });
}
/// Unmark block receipts as being downloaded.
///
/// `hashes` are block hashes; in-flight receipts are tracked by receipts root, so each
/// hash is resolved through its stored block. Hashes of unknown blocks are skipped.
pub fn clear_receipt_download(&mut self, hashes: &[H256]) {
    for hash in hashes {
        let root = match self.blocks.get(hash) {
            Some(block) => block.receipts_root,
            None => continue,
        };
        self.downloading_receipts.remove(&root);
    }
}
2018-05-17 10:58:35 +02:00
/// Get a valid chain of blocks ordered in ascending order and ready for importing into blockchain.
pub fn drain(&mut self) -> Vec<BlockAndReceipts> {
2016-05-16 14:36:35 +02:00
if self.blocks.is_empty() || self.head.is_none() {
return Vec::new();
}
let mut drained = Vec::new();
let mut hashes = Vec::new();
{
let mut blocks = Vec::new();
let mut head = self.head;
while let Some(h) = head {
head = self.parents.get(&h).cloned();
2016-05-16 14:36:35 +02:00
if let Some(head) = head {
match self.blocks.remove(&head) {
Some(block) => {
if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) {
blocks.push(block);
hashes.push(head);
self.head = Some(head);
} else {
self.blocks.insert(head, block);
break;
}
},
_ => {
break;
},
2016-05-16 14:36:35 +02:00
}
}
}
for block in blocks.into_iter() {
let unverified = unverified_from_sync(block.header, block.body);
drained.push(BlockAndReceipts {
block: unverified,
receipts: block.receipts.clone(),
});
2016-05-16 14:36:35 +02:00
}
}
trace!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head);
2016-05-16 14:36:35 +02:00
drained
}
/// Check if the collection is empty. We consider the syncing round complete once
/// there is no block data left and only a single or none head pointer remains.
pub fn is_empty(&self) -> bool {
self.heads.len() == 0 || (self.heads.len() == 1 && self.head.map_or(false, |h| h == self.heads[0]))
2016-05-16 14:36:35 +02:00
}
2016-11-16 19:34:12 +01:00
/// Check if collection contains a block header.
2016-05-16 14:36:35 +02:00
pub fn contains(&self, hash: &H256) -> bool {
self.blocks.contains_key(hash)
}
Fix ancient blocks sync (#9531) * Log block set in block_sync for easier debugging * logging macros * Match no args in sync logging macros * Add QueueFull error * Only allow importing headers if the first matches requested * WIP * Test for chain head gaps and log * Calc distance even with 2 heads * Revert previous commits, preparing simple fix This reverts commit 5f38aa885b22ebb0e3a1d60120cea69f9f322628. * Reject headers with no gaps when ChainHead * Reset block sync download when queue full * Simplify check for subchain heads * Add comment to explain subchain heads filter * Fix is_subchain_heads check and comment * Prevent premature round completion after restart This is a problem on mainnet where multiple stale peer requests will force many rounds to complete quickly, forcing the retraction. * Reset stale old blocks request after queue full * Revert "Reject headers with no gaps when ChainHead" This reverts commit 0eb865539e5dee37ab34f168f5fb643300de5ace. * Add BlockSet to BlockDownloader logging Currently it is difficult to debug this because there are two instances, one for OldBlocks and one for NewBlocks. This adds the BlockSet to all log messages for easy log filtering. * Reset OldBlocks download from last enqueued Previously when the ancient block queue was full it would restart the download from the last imported block, so the ones still in the queue would be redownloaded. Keeping the existing downloader instance and just resetting it will start again from the last enqueued block.:wq * Ignore expired Body and Receipt requests * Log when ancient block download being restarted * Only request old blocks from peers with >= difficulty https://github.com/paritytech/parity-ethereum/pull/9226 might be too permissive and causing the behaviour of the retraction soon after the fork block. 
With this change the peer difficulty has to be greater than or equal to our syncing difficulty, so should still fix https://github.com/paritytech/parity-ethereum/issues/9225 * Some logging and clear stalled blocks head * Revert "Some logging and clear stalled blocks head" This reverts commit 757641d9b817ae8b63fec684759b0815af9c4d0e. * Reset stalled header if useless more than once * Store useless headers in HashSet * Add sync target to logging macro * Don't disable useless peer and fix log macro * Clear useless headers on reset and comments * Use custom error for collecting blocks Previously we reused BlockImportError, however only the Invalid case and this made little sense with the QueueFull error. * Remove blank line * Test for reset sync after consecutive useless headers * Don't reset after consecutive headers when chain head * Delete commented out imports * Return DownloadAction from collect_blocks instead of error * Don't reset after round complete, was causing test hangs * Add comment explaining reset after useless * Replace HashSet with counter for useless headers * Refactor sync reset on bad block/queue full * Add missing target for log message * Fix compiler errors and test after merge * ethcore: revert ethereum tests submodule update
2018-10-09 15:31:40 +02:00
/// Check the number of heads, i.e. how many subchain head pointers are currently held.
pub fn heads_len(&self) -> usize {
self.heads.len()
}
/// Return used heap size.
2016-05-16 14:36:35 +02:00
pub fn heap_size(&self) -> usize {
self.heads.heap_size_of_children()
+ self.blocks.heap_size_of_children()
+ self.parents.heap_size_of_children()
+ self.header_ids.heap_size_of_children()
+ self.downloading_headers.heap_size_of_children()
+ self.downloading_bodies.heap_size_of_children()
2016-05-16 14:36:35 +02:00
}
/// Check if given block hash is marked as being downloaded, either as a header or as a body.
pub fn is_downloading(&self, hash: &H256) -> bool {
    if self.downloading_headers.contains(hash) {
        return true;
    }
    self.downloading_bodies.contains(hash)
}
fn insert_body(&mut self, body: SyncBody) -> Result<H256, network::Error> {
let header_id = {
let tx_root = ordered_trie_root(Rlp::new(&body.transactions_bytes).iter().map(|r| r.as_raw()));
let uncles = keccak(&body.uncles_bytes);
HeaderId {
transactions_root: tx_root,
uncles: uncles
}
2016-05-16 14:36:35 +02:00
};
match self.header_ids.remove(&header_id) {
2016-05-16 14:36:35 +02:00
Some(h) => {
self.downloading_bodies.remove(&h);
match self.blocks.get_mut(&h) {
Some(ref mut block) => {
trace!(target: "sync", "Got body {}", h);
block.body = Some(body);
Ok(h)
2016-05-16 14:36:35 +02:00
},
None => {
warn!("Got body with no header {}", h);
2017-11-13 14:37:08 +01:00
Err(network::ErrorKind::BadProtocol.into())
}
2016-05-16 14:36:35 +02:00
}
}
None => {
trace!(target: "sync", "Ignored unknown/stale block body. tx_root = {:?}, uncles = {:?}", header_id.transactions_root, header_id.uncles);
2017-11-13 14:37:08 +01:00
Err(network::ErrorKind::BadProtocol.into())
}
}
}
fn insert_receipt(&mut self, r: Bytes) -> Result<Vec<H256>, network::Error> {
let receipt_root = {
let receipts = Rlp::new(&r);
2018-02-16 20:24:16 +01:00
ordered_trie_root(receipts.iter().map(|r| r.as_raw()))
};
2017-03-31 14:12:15 +02:00
self.downloading_receipts.remove(&receipt_root);
2017-03-29 19:59:20 +02:00
match self.receipt_ids.entry(receipt_root) {
hash_map::Entry::Occupied(entry) => {
let block_hashes = entry.remove();
for h in block_hashes.iter() {
2017-03-31 14:12:15 +02:00
match self.blocks.get_mut(&h) {
Some(ref mut block) => {
trace!(target: "sync", "Got receipt {}", h);
block.receipts = Some(r.clone());
},
None => {
warn!("Got receipt with no header {}", h);
2017-11-13 14:37:08 +01:00
return Err(network::ErrorKind::BadProtocol.into())
2017-03-31 14:12:15 +02:00
}
}
}
Ok(block_hashes)
},
hash_map::Entry::Vacant(_) => {
trace!(target: "sync", "Ignored unknown/stale block receipt {:?}", receipt_root);
2017-11-13 14:37:08 +01:00
Err(network::ErrorKind::BadProtocol.into())
}
}
2016-05-16 14:36:35 +02:00
}
fn insert_header(&mut self, info: SyncHeader) -> Result<H256, DecoderError> {
let hash = info.header.hash();
2016-05-16 14:36:35 +02:00
if self.blocks.contains_key(&hash) {
return Ok(hash);
}
2016-05-16 14:36:35 +02:00
match self.head {
None if hash == self.heads[0] => {
trace!(target: "sync", "New head {}", hash);
self.head = Some(info.header.parent_hash().clone());
2016-05-16 14:36:35 +02:00
},
_ => ()
}
let header_id = HeaderId {
transactions_root: *info.header.transactions_root(),
uncles: *info.header.uncles_hash(),
2016-05-16 14:36:35 +02:00
};
let body = if header_id.transactions_root == KECCAK_NULL_RLP && header_id.uncles == KECCAK_EMPTY_LIST_RLP {
2016-05-16 14:36:35 +02:00
// empty body, just mark as downloaded
Some(SyncBody::empty_body())
} else {
trace!(
"Queueing body tx_root = {:?}, uncles = {:?}, block = {:?}, number = {}",
header_id.transactions_root,
header_id.uncles,
hash,
info.header.number()
);
self.header_ids.insert(header_id, hash);
None
};
let (receipts, receipts_root) = if self.need_receipts {
let receipt_root = *info.header.receipts_root();
2017-08-31 11:35:41 +02:00
if receipt_root == KECCAK_NULL_RLP {
let receipts_stream = RlpStream::new_list(0);
(Some(receipts_stream.out()), receipt_root)
} else {
self.receipt_ids.entry(receipt_root).or_insert_with(Vec::new).push(hash);
(None, receipt_root)
}
} else {
(None, H256::new())
};
self.parents.insert(*info.header.parent_hash(), hash);
let block = SyncBlock {
header: info,
body,
receipts,
receipts_root,
};
2016-05-16 14:36:35 +02:00
self.blocks.insert(hash, block);
trace!(target: "sync", "New header: {:x}", hash);
2016-05-16 14:36:35 +02:00
Ok(hash)
}
// update subchain headers
fn update_heads(&mut self) {
let mut new_heads = Vec::new();
let old_subchains: HashSet<_> = { self.heads.iter().cloned().collect() };
2016-05-16 14:36:35 +02:00
for s in self.heads.drain(..) {
let mut h = s.clone();
if !self.blocks.contains_key(&h) {
new_heads.push(h);
continue;
}
2016-05-16 14:36:35 +02:00
loop {
match self.parents.get(&h) {
Some(next) => {
h = next.clone();
if old_subchains.contains(&h) {
trace!(target: "sync", "Completed subchain {:?}", s);
2016-05-16 14:36:35 +02:00
break; // reached head of the other subchain, merge by not adding
}
},
_ => {
new_heads.push(h);
break;
}
}
}
}
self.heads = new_heads;
}
}
#[cfg(test)]
mod test {
use super::{BlockCollection, SyncHeader};
use ethcore::client::{TestBlockChainClient, EachBlockWith, BlockId, BlockChainClient};
use types::BlockNumber;
use ethcore::verification::queue::kind::blocks::Unverified;
2016-09-01 14:49:12 +02:00
use rlp::*;
2016-05-16 14:36:35 +02:00
/// True when every piece of state in the collection is empty, including the
/// receipt bookkeeping, so that `clear()` is checked against all fields.
fn is_empty(bc: &BlockCollection) -> bool {
    bc.heads.is_empty() &&
    bc.blocks.is_empty() &&
    bc.parents.is_empty() &&
    bc.header_ids.is_empty() &&
    bc.receipt_ids.is_empty() &&
    bc.head.is_none() &&
    bc.downloading_headers.is_empty() &&
    bc.downloading_bodies.is_empty() &&
    bc.downloading_receipts.is_empty()
}
// A fresh collection is empty, stops being empty after `reset_to`, and is empty again after `clear`.
#[test]
fn create_clear() {
    let mut collection = BlockCollection::new(false);
    assert!(is_empty(&collection));

    let client = TestBlockChainClient::new();
    client.add_blocks(100, EachBlockWith::Nothing);
    let hashes = (0 .. 100)
        .map(|number| (&client as &BlockChainClient).block_hash(BlockId::Number(number)).unwrap())
        .collect();

    collection.reset_to(hashes);
    assert!(!is_empty(&collection));

    collection.clear();
    assert!(is_empty(&collection));
}
// Walks a full round: request headers, insert them in and out of order, drain the
// resulting blocks and check that head pointers advance as expected.
#[test]
fn insert_headers() {
    let mut bc = BlockCollection::new(false);
    assert!(is_empty(&bc));

    let client = TestBlockChainClient::new();
    let nblocks = 200;
    client.add_blocks(nblocks, EachBlockWith::Nothing);
    let blocks: Vec<_> = (0..nblocks)
        .map(|i| (&client as &BlockChainClient).block(BlockId::Number(i as BlockNumber)).unwrap().into_inner())
        .collect();
    let headers: Vec<_> = blocks
        .iter()
        .map(|b| SyncHeader::from_rlp(Rlp::new(b).at(0).unwrap().as_raw().to_vec()).unwrap())
        .collect();
    let hashes: Vec<_> = headers.iter().map(|h| h.header.hash()).collect();
    // Every 20th block is a subchain head.
    let heads: Vec<_> = hashes
        .iter()
        .enumerate()
        .filter(|&(i, _)| i % 20 == 0)
        .map(|(_, h)| *h)
        .collect();
    bc.reset_to(heads);

    // Nothing downloaded yet.
    assert!(!bc.is_empty());
    assert_eq!(hashes[0], bc.heads[0]);
    assert!(bc.needed_bodies(1, false).is_empty());
    assert!(!bc.contains(&hashes[0]));
    assert!(!bc.is_downloading(&hashes[0]));

    // The first request targets the first subchain head.
    let (h, n) = bc.needed_headers(6, false).unwrap();
    assert!(bc.is_downloading(&hashes[0]));
    assert_eq!(hashes[0], h);
    assert_eq!(n, 6);
    assert_eq!(bc.downloading_headers.len(), 1);
    assert!(bc.drain().is_empty());

    // First six headers arrive.
    bc.insert_headers(headers[0..6].to_vec());
    assert_eq!(hashes[5], bc.heads[0]);
    for h in &hashes[0..6] {
        bc.clear_header_download(h)
    }
    assert_eq!(bc.downloading_headers.len(), 0);
    assert!(!bc.is_downloading(&hashes[0]));
    assert!(bc.contains(&hashes[0]));

    assert_eq!(
        bc.drain().into_iter().map(|b| b.block).collect::<Vec<_>>(),
        blocks[0..6].iter().map(|b| Unverified::from_rlp(b.to_vec()).unwrap()).collect::<Vec<_>>()
    );
    assert!(!bc.contains(&hashes[0]));
    assert_eq!(hashes[5], bc.head.unwrap());

    let (h, _) = bc.needed_headers(6, false).unwrap();
    assert_eq!(hashes[5], h);
    let (h, _) = bc.needed_headers(6, false).unwrap();
    assert_eq!(hashes[20], h);

    // Headers that do not connect to the head cannot be drained yet.
    bc.insert_headers(headers[10..16].to_vec());
    assert!(bc.drain().is_empty());

    // Filling the gap makes the whole range drainable.
    bc.insert_headers(headers[5..10].to_vec());
    assert_eq!(
        bc.drain().into_iter().map(|b| b.block).collect::<Vec<_>>(),
        blocks[6..16].iter().map(|b| Unverified::from_rlp(b.to_vec()).unwrap()).collect::<Vec<_>>()
    );
    assert_eq!(hashes[15], bc.heads[0]);

    // The rest of the chain completes the round.
    bc.insert_headers(headers[15..].to_vec());
    bc.drain();
    assert!(bc.is_empty());
}
// Headers inserted before the first subchain head leave `head` unset until the gap is filled.
#[test]
fn insert_headers_with_gap() {
    let mut bc = BlockCollection::new(false);
    assert!(is_empty(&bc));

    let client = TestBlockChainClient::new();
    let nblocks = 200;
    client.add_blocks(nblocks, EachBlockWith::Nothing);
    let blocks: Vec<_> = (0..nblocks)
        .map(|i| (&client as &BlockChainClient).block(BlockId::Number(i as BlockNumber)).unwrap().into_inner())
        .collect();
    let headers: Vec<_> = blocks
        .iter()
        .map(|b| SyncHeader::from_rlp(Rlp::new(b).at(0).unwrap().as_raw().to_vec()).unwrap())
        .collect();
    let hashes: Vec<_> = headers.iter().map(|h| h.header.hash()).collect();
    // Every 20th block is a subchain head.
    let heads: Vec<_> = hashes
        .iter()
        .enumerate()
        .filter(|&(i, _)| i % 20 == 0)
        .map(|(_, h)| *h)
        .collect();
    bc.reset_to(heads);

    // Blocks 0 and 1 are missing.
    bc.insert_headers(headers[2..22].to_vec());
    assert_eq!(hashes[0], bc.heads[0]);
    assert_eq!(hashes[21], bc.heads[1]);
    assert!(bc.head.is_none());

    // Fill the gap.
    bc.insert_headers(headers[0..2].to_vec());
    assert!(bc.head.is_some());
    assert_eq!(hashes[21], bc.heads[0]);
}
// A header inserted ahead of the head becomes drainable once the head header itself arrives.
#[test]
fn insert_headers_no_gap() {
    let mut bc = BlockCollection::new(false);
    assert!(is_empty(&bc));

    let client = TestBlockChainClient::new();
    let nblocks = 200;
    client.add_blocks(nblocks, EachBlockWith::Nothing);
    let blocks: Vec<_> = (0..nblocks)
        .map(|i| (&client as &BlockChainClient).block(BlockId::Number(i as BlockNumber)).unwrap().into_inner())
        .collect();
    let headers: Vec<_> = blocks
        .iter()
        .map(|b| SyncHeader::from_rlp(Rlp::new(b).at(0).unwrap().as_raw().to_vec()).unwrap())
        .collect();
    let hashes: Vec<_> = headers.iter().map(|h| h.header.hash()).collect();
    // Every 20th block is a subchain head.
    let heads: Vec<_> = hashes
        .iter()
        .enumerate()
        .filter(|&(i, _)| i % 20 == 0)
        .map(|(_, h)| *h)
        .collect();
    bc.reset_to(heads);

    bc.insert_headers(headers[1..2].to_vec());
    assert!(bc.drain().is_empty());

    bc.insert_headers(headers[0..1].to_vec());
    assert_eq!(bc.drain().len(), 2);
}
2016-05-16 14:36:35 +02:00
}