Verify block syncing responses against requests (#9670)

* sync: Validate received BlockHeaders packets against stored request.

* sync: Validate received BlockBodies and BlockReceipts.

* sync: Fix broken tests.

* sync: Unit tests for BlockDownloader::import_headers.

* sync: Unit tests for import_{bodies,receipts}.

* tests: Add missing method doc.
Jim Posen, 2018-10-03 03:35:10 -07:00 (committed by Wei Tang)
parent 7ba5652bea
commit 2fc1679886
5 changed files with 472 additions and 135 deletions
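At the heart of the change is a predicate checked while walking a BlockHeaders response against the stored request: the first header must carry the hash that was requested, every later header must sit exactly `skip + 1` block numbers after the previous one, and directly consecutive headers must be linked by parent hash. A minimal, self-contained sketch of that check (the `Header` type and integer hashes are simplified stand-ins for the real `SyncHeader`/`H256`, not code from this commit):

// Simplified stand-ins; only the fields the check needs.
#[derive(Clone)]
struct Header {
	number: u64,
	hash: u64,        // stand-in for H256
	parent_hash: u64, // stand-in for H256
}

// Mirrors the validation loop added to BlockDownloader::import_headers.
fn valid_headers_response(headers: &[Header], expected_hash: u64, skip: u64) -> bool {
	let mut last: Option<(u64, u64)> = None; // (number, hash)
	for h in headers {
		let ok = match last {
			// First header must match the hash we asked for.
			None => h.hash == expected_hash,
			Some((last_number, last_hash)) => {
				// Headers must be spaced by the skip interval...
				let skip_valid = h.number == last_number + skip + 1;
				// ...and directly consecutive headers must link by parent hash.
				let parent_valid = h.number != last_number + 1 || h.parent_hash == last_hash;
				skip_valid && parent_valid
			}
		};
		if !ok {
			return false;
		}
		last = Some((h.number, h.hash));
	}
	true
}

fn main() {
	let a = Header { number: 0, hash: 10, parent_hash: 0 };
	let b = Header { number: 1, hash: 11, parent_hash: 10 };
	assert!(valid_headers_response(&[a.clone(), b], 10, 0));
	// A response whose first header is not the requested hash is rejected.
	assert!(!valid_headers_response(&[a], 99, 0));
	println!("ok");
}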


@@ -118,7 +118,7 @@ pub struct TestBlockChainClient {
 }

 /// Used for generating test client blocks.
-#[derive(Clone)]
+#[derive(Clone, Copy)]
 pub enum EachBlockWith {
 	/// Plain block.
 	Nothing,
@@ -242,16 +242,21 @@ impl TestBlockChainClient {
 		*self.error_on_logs.write() = val;
 	}

-	/// Add blocks to test client.
-	pub fn add_blocks(&self, count: usize, with: EachBlockWith) {
-		let len = self.numbers.read().len();
-		for n in len..(len + count) {
-			let mut header = BlockHeader::new();
-			header.set_difficulty(From::from(n));
-			header.set_parent_hash(self.last_hash.read().clone());
-			header.set_number(n as BlockNumber);
-			header.set_gas_limit(U256::from(1_000_000));
-			header.set_extra_data(self.extra_data.clone());
+	/// Add a block to test client.
+	pub fn add_block<F>(&self, with: EachBlockWith, hook: F)
+		where F: Fn(BlockHeader) -> BlockHeader
+	{
+		let n = self.numbers.read().len();
+		let mut header = BlockHeader::new();
+		header.set_difficulty(From::from(n));
+		header.set_parent_hash(self.last_hash.read().clone());
+		header.set_number(n as BlockNumber);
+		header.set_gas_limit(U256::from(1_000_000));
+		header.set_extra_data(self.extra_data.clone());
+		header = hook(header);
 		let uncles = match with {
 			EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => {
 				let mut uncles = RlpStream::new_list(1);
@@ -293,18 +298,12 @@ impl TestBlockChainClient {
 		let unverified = Unverified::from_rlp(rlp.out()).unwrap();
 		self.import_block(unverified).unwrap();
 	}
-	}

-	/// Make a bad block by setting invalid extra data.
-	pub fn corrupt_block(&self, n: BlockNumber) {
-		let hash = self.block_hash(BlockId::Number(n)).unwrap();
-		let mut header: BlockHeader = self.block_header(BlockId::Number(n)).unwrap().decode().expect("decoding failed");
-		header.set_extra_data(b"This extra data is way too long to be considered valid".to_vec());
-		let mut rlp = RlpStream::new_list(3);
-		rlp.append(&header);
-		rlp.append_raw(&::rlp::NULL_RLP, 1);
-		rlp.append_raw(&::rlp::NULL_RLP, 1);
-		self.blocks.write().insert(hash, rlp.out());
+	/// Add a sequence of blocks to test client.
+	pub fn add_blocks(&self, count: usize, with: EachBlockWith) {
+		for _ in 0..count {
+			self.add_block(with, |header| header);
+		}
 	}

 	/// Make a bad block by setting invalid parent hash.
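The hook parameter on the new add_block is what replaces the removed corrupt_block helper: a test can rewrite any header field before the block is encoded. A small runnable sketch of the pattern (the Header and add_block below are simplified stand-ins, not the client's real types):

// Stand-ins illustrating the header-hook pattern used by add_block.
#[derive(Debug, Default)]
struct Header {
	number: u64,
	extra_data: Vec<u8>,
}

// Build a header, let the caller's hook rewrite it, then return the result.
fn add_block<F>(number: u64, hook: F) -> Header
	where F: Fn(Header) -> Header
{
	let mut header = Header::default();
	header.number = number;
	hook(header)
}

fn main() {
	// The identity hook leaves the header untouched (what add_blocks passes).
	let plain = add_block(1, |header| header);
	assert_eq!(plain.number, 1);
	assert!(plain.extra_data.is_empty());

	// A custom hook injects a corrupt field, as the malformed-block test does.
	let bad = add_block(2, |mut header| {
		header.extra_data = b"way too long".to_vec();
		header
	});
	assert_eq!(bad.extra_data, b"way too long");
	println!("ok");
}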


@@ -220,7 +220,7 @@ impl BlockDownloader {
 	}

 	/// Add new block headers.
-	pub fn import_headers(&mut self, io: &mut SyncIo, r: &Rlp, expected_hash: Option<H256>) -> Result<DownloadAction, BlockDownloaderImportError> {
+	pub fn import_headers(&mut self, io: &mut SyncIo, r: &Rlp, expected_hash: H256) -> Result<DownloadAction, BlockDownloaderImportError> {
 		let item_count = r.item_count().unwrap_or(0);
 		if self.state == State::Idle {
 			trace!(target: "sync", "Ignored unexpected block headers");
@@ -230,30 +230,50 @@
 			return Err(BlockDownloaderImportError::Invalid);
 		}

+		// The request is generated in ::request_blocks.
+		let (max_count, skip) = if self.state == State::ChainHead {
+			(SUBCHAIN_SIZE as usize, (MAX_HEADERS_TO_REQUEST - 2) as u64)
+		} else {
+			(MAX_HEADERS_TO_REQUEST, 0)
+		};
+
+		if item_count > max_count {
+			debug!(target: "sync", "Headers response is larger than expected");
+			return Err(BlockDownloaderImportError::Invalid);
+		}
+
 		let mut headers = Vec::new();
 		let mut hashes = Vec::new();
-		let mut valid_response = item_count == 0; //empty response is valid
-		let mut any_known = false;
+		let mut last_header = None;
 		for i in 0..item_count {
 			let info = SyncHeader::from_rlp(r.at(i)?.as_raw().to_vec())?;
 			let number = BlockNumber::from(info.header.number());
 			let hash = info.header.hash();
-			// Check if any of the headers matches the hash we requested
-			if !valid_response {
-				if let Some(expected) = expected_hash {
-					valid_response = expected == hash;
-				}
-			}
-			any_known = any_known || self.blocks.contains_head(&hash);
+
+			let valid_response = match last_header {
+				// First header must match expected hash.
+				None => expected_hash == hash,
+				Some((last_number, last_hash)) => {
+					// Subsequent headers must be spaced by skip interval.
+					let skip_valid = number == last_number + skip + 1;
+					// Consecutive headers must be linked by parent hash.
+					let parent_valid = (number != last_number + 1) || *info.header.parent_hash() == last_hash;
+					skip_valid && parent_valid
+				}
+			};
+
+			// Disable the peer for this syncing round if it gives invalid chain
+			if !valid_response {
+				debug!(target: "sync", "Invalid headers response");
+				return Err(BlockDownloaderImportError::Invalid);
+			}
+
+			last_header = Some((number, hash));
 			if self.blocks.contains(&hash) {
 				trace!(target: "sync", "Skipping existing block header {} ({:?})", number, hash);
 				continue;
 			}
-
-			if self.highest_block.as_ref().map_or(true, |n| number > *n) {
-				self.highest_block = Some(number);
-			}
-
 			match io.chain().block_status(BlockId::Hash(hash.clone())) {
 				BlockStatus::InChain | BlockStatus::Queued => {
 					match self.state {
@@ -273,16 +293,15 @@
 			}
 		}

-		// Disable the peer for this syncing round if it gives invalid chain
-		if !valid_response {
-			trace!(target: "sync", "Invalid headers response");
-			return Err(BlockDownloaderImportError::Invalid);
+		if let Some((number, _)) = last_header {
+			if self.highest_block.as_ref().map_or(true, |n| number > *n) {
+				self.highest_block = Some(number);
+			}
 		}

 		match self.state {
 			State::ChainHead => {
 				if !headers.is_empty() {
-					// TODO: validate heads better. E.g. check that there is enough distance between blocks.
 					trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len());
 					self.blocks.reset_to(hashes);
 					self.state = State::Blocks;
@@ -299,8 +318,7 @@
 			},
 			State::Blocks => {
 				let count = headers.len();
-				// At least one of the heades must advance the subchain. Otherwise they are all useless.
-				if count == 0 || !any_known {
+				if count == 0 {
 					trace!(target: "sync", "No useful headers");
 					return Err(BlockDownloaderImportError::Useless);
 				}
@@ -314,7 +332,7 @@
 	}

 	/// Called by peer once it has new block bodies
-	pub fn import_bodies(&mut self, r: &Rlp) -> Result<(), BlockDownloaderImportError> {
+	pub fn import_bodies(&mut self, r: &Rlp, expected_hashes: &[H256]) -> Result<(), BlockDownloaderImportError> {
 		let item_count = r.item_count().unwrap_or(0);
 		if item_count == 0 {
 			return Err(BlockDownloaderImportError::Useless);
@@ -327,16 +345,21 @@
 				bodies.push(body);
 			}

-			if self.blocks.insert_bodies(bodies) != item_count {
+			let hashes = self.blocks.insert_bodies(bodies);
+			if hashes.len() != item_count {
 				trace!(target: "sync", "Deactivating peer for giving invalid block bodies");
 				return Err(BlockDownloaderImportError::Invalid);
 			}
+			if !all_expected(hashes.as_slice(), expected_hashes, |&a, &b| a == b) {
+				trace!(target: "sync", "Deactivating peer for giving unexpected block bodies");
+				return Err(BlockDownloaderImportError::Invalid);
+			}
 		}
 		Ok(())
 	}

 	/// Called by peer once it has new block bodies
-	pub fn import_receipts(&mut self, _io: &mut SyncIo, r: &Rlp) -> Result<(), BlockDownloaderImportError> {
+	pub fn import_receipts(&mut self, r: &Rlp, expected_hashes: &[H256]) -> Result<(), BlockDownloaderImportError> {
 		let item_count = r.item_count().unwrap_or(0);
 		if item_count == 0 {
 			return Err(BlockDownloaderImportError::Useless);
@@ -353,10 +376,15 @@
 				})?;
 				receipts.push(receipt.as_raw().to_vec());
 			}

-			if self.blocks.insert_receipts(receipts) != item_count {
+			let hashes = self.blocks.insert_receipts(receipts);
+			if hashes.len() != item_count {
 				trace!(target: "sync", "Deactivating peer for giving invalid block receipts");
 				return Err(BlockDownloaderImportError::Invalid);
 			}
+			if !all_expected(hashes.as_slice(), expected_hashes, |a, b| a.contains(b)) {
+				trace!(target: "sync", "Deactivating peer for giving unexpected block receipts");
+				return Err(BlockDownloaderImportError::Invalid);
+			}
 		}
 		Ok(())
 	}
@@ -549,4 +577,298 @@
 	}
 }

-//TODO: module tests
+// Determines if the first argument matches an ordered subset of the second, according to some predicate.
+fn all_expected<A, B, F>(values: &[A], expected_values: &[B], is_expected: F) -> bool
+	where F: Fn(&A, &B) -> bool
+{
+	let mut expected_iter = expected_values.iter();
+	values.iter().all(|val1| {
+		while let Some(val2) = expected_iter.next() {
+			if is_expected(val1, val2) {
+				return true;
+			}
+		}
+		false
+	})
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use ethcore::client::TestBlockChainClient;
+	use ethcore::header::Header as BlockHeader;
+	use ethcore::spec::Spec;
+	use ethkey::{Generator, Random};
+	use hash::keccak;
+	use parking_lot::RwLock;
+	use rlp::{encode_list, RlpStream};
+	use tests::helpers::TestIo;
+	use tests::snapshot::TestSnapshotService;
+	use transaction::{Transaction, SignedTransaction};
+	use triehash_ethereum::ordered_trie_root;
+
+	fn dummy_header(number: u64, parent_hash: H256) -> BlockHeader {
+		let mut header = BlockHeader::new();
+		header.set_gas_limit(0.into());
+		header.set_difficulty((number * 100).into());
+		header.set_timestamp(number * 10);
+		header.set_number(number);
+		header.set_parent_hash(parent_hash);
+		header.set_state_root(H256::zero());
+		header
+	}
+
+	fn dummy_signed_tx() -> SignedTransaction {
+		let keypair = Random.generate().unwrap();
+		Transaction::default().sign(keypair.secret(), None)
+	}
+
+	#[test]
+	fn import_headers_in_chain_head_state() {
+		::env_logger::try_init().ok();
+
+		let spec = Spec::new_test();
+		let genesis_hash = spec.genesis_header().hash();
+
+		let mut downloader = BlockDownloader::new(false, &genesis_hash, 0);
+		downloader.state = State::ChainHead;
+
+		let mut chain = TestBlockChainClient::new();
+		let snapshot_service = TestSnapshotService::new();
+		let queue = RwLock::new(VecDeque::new());
+		let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None);
+
+		// Valid headers sequence.
+		let valid_headers = [
+			spec.genesis_header(),
+			dummy_header(127, H256::random()),
+			dummy_header(254, H256::random()),
+		];
+		let rlp_data = encode_list(&valid_headers);
+		let valid_rlp = Rlp::new(&rlp_data);
+		match downloader.import_headers(&mut io, &valid_rlp, genesis_hash) {
+			Ok(DownloadAction::Reset) => assert_eq!(downloader.state, State::Blocks),
+			_ => panic!("expected transition to Blocks state"),
+		};
+
+		// Headers are rejected because the expected hash does not match.
+		let invalid_start_block_headers = [
+			dummy_header(0, H256::random()),
+			dummy_header(127, H256::random()),
+			dummy_header(254, H256::random()),
+		];
+		let rlp_data = encode_list(&invalid_start_block_headers);
+		let invalid_start_block_rlp = Rlp::new(&rlp_data);
+		match downloader.import_headers(&mut io, &invalid_start_block_rlp, genesis_hash) {
+			Err(BlockDownloaderImportError::Invalid) => (),
+			_ => panic!("expected BlockDownloaderImportError"),
+		};
+
+		// Headers are rejected because they are not spaced as expected.
+		let invalid_skip_headers = [
+			spec.genesis_header(),
+			dummy_header(128, H256::random()),
+			dummy_header(256, H256::random()),
+		];
+		let rlp_data = encode_list(&invalid_skip_headers);
+		let invalid_skip_rlp = Rlp::new(&rlp_data);
+		match downloader.import_headers(&mut io, &invalid_skip_rlp, genesis_hash) {
+			Err(BlockDownloaderImportError::Invalid) => (),
+			_ => panic!("expected BlockDownloaderImportError"),
+		};
+
+		// Invalid because the packet size is too large.
+		let mut too_many_headers = Vec::with_capacity((SUBCHAIN_SIZE + 1) as usize);
+		too_many_headers.push(spec.genesis_header());
+		for i in 1..(SUBCHAIN_SIZE + 1) {
+			too_many_headers.push(dummy_header((MAX_HEADERS_TO_REQUEST as u64 - 1) * i, H256::random()));
+		}
+		let rlp_data = encode_list(&too_many_headers);
+		let too_many_rlp = Rlp::new(&rlp_data);
+		match downloader.import_headers(&mut io, &too_many_rlp, genesis_hash) {
+			Err(BlockDownloaderImportError::Invalid) => (),
+			_ => panic!("expected BlockDownloaderImportError"),
+		};
+	}
+
+	#[test]
+	fn import_headers_in_blocks_state() {
+		::env_logger::try_init().ok();
+
+		let mut chain = TestBlockChainClient::new();
+		let snapshot_service = TestSnapshotService::new();
+		let queue = RwLock::new(VecDeque::new());
+		let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None);
+
+		let mut headers = Vec::with_capacity(3);
+		let parent_hash = H256::random();
+		headers.push(dummy_header(127, parent_hash));
+		let parent_hash = headers[0].hash();
+		headers.push(dummy_header(128, parent_hash));
+		let parent_hash = headers[1].hash();
+		headers.push(dummy_header(129, parent_hash));
+
+		let mut downloader = BlockDownloader::new(false, &H256::random(), 0);
+		downloader.state = State::Blocks;
+		downloader.blocks.reset_to(vec![headers[0].hash()]);
+
+		let rlp_data = encode_list(&headers);
+		let headers_rlp = Rlp::new(&rlp_data);
+		match downloader.import_headers(&mut io, &headers_rlp, headers[0].hash()) {
+			Ok(DownloadAction::None) => (),
+			_ => panic!("expected successful import"),
+		};
+
+		// Invalidate parent_hash link.
+		headers[2] = dummy_header(129, H256::random());
+		let rlp_data = encode_list(&headers);
+		let headers_rlp = Rlp::new(&rlp_data);
+		match downloader.import_headers(&mut io, &headers_rlp, headers[0].hash()) {
+			Err(BlockDownloaderImportError::Invalid) => (),
+			_ => panic!("expected BlockDownloaderImportError"),
+		};
+
+		// Invalidate header sequence by skipping a header.
+		headers[2] = dummy_header(130, headers[1].hash());
+		let rlp_data = encode_list(&headers);
+		let headers_rlp = Rlp::new(&rlp_data);
+		match downloader.import_headers(&mut io, &headers_rlp, headers[0].hash()) {
+			Err(BlockDownloaderImportError::Invalid) => (),
+			_ => panic!("expected BlockDownloaderImportError"),
+		};
+	}
+
+	#[test]
+	fn import_bodies() {
+		::env_logger::try_init().ok();
+
+		let mut chain = TestBlockChainClient::new();
+		let snapshot_service = TestSnapshotService::new();
+		let queue = RwLock::new(VecDeque::new());
+		let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None);
+
+		// Import block headers.
+		let mut headers = Vec::with_capacity(4);
+		let mut bodies = Vec::with_capacity(4);
+		let mut parent_hash = H256::zero();
+		for i in 0..4 {
+			// Construct the block body.
+			let mut uncles = if i > 0 {
+				encode_list(&[dummy_header(i - 1, H256::random())]).into_vec()
+			} else {
+				::rlp::EMPTY_LIST_RLP.to_vec()
+			};
+
+			let mut txs = encode_list(&[dummy_signed_tx()]);
+			let tx_root = ordered_trie_root(Rlp::new(&txs).iter().map(|r| r.as_raw()));
+
+			let mut rlp = RlpStream::new_list(2);
+			rlp.append_raw(&txs, 1);
+			rlp.append_raw(&uncles, 1);
+			bodies.push(rlp.out());
+
+			// Construct the block header.
+			let mut header = dummy_header(i, parent_hash);
+			header.set_transactions_root(tx_root);
+			header.set_uncles_hash(keccak(&uncles));
+			parent_hash = header.hash();
+			headers.push(header);
+		}
+
+		let mut downloader = BlockDownloader::new(false, &headers[0].hash(), 0);
+		downloader.state = State::Blocks;
+		downloader.blocks.reset_to(vec![headers[0].hash()]);
+
+		// Only import the first three block headers.
+		let rlp_data = encode_list(&headers[0..3]);
+		let headers_rlp = Rlp::new(&rlp_data);
+		assert!(downloader.import_headers(&mut io, &headers_rlp, headers[0].hash()).is_ok());
+
+		// Import first body successfully.
+		let mut rlp_data = RlpStream::new_list(1);
+		rlp_data.append_raw(&bodies[0], 1);
+		let bodies_rlp = Rlp::new(rlp_data.as_raw());
+		assert!(downloader.import_bodies(&bodies_rlp, &[headers[0].hash(), headers[1].hash()]).is_ok());
+
+		// Import second body successfully.
+		let mut rlp_data = RlpStream::new_list(1);
+		rlp_data.append_raw(&bodies[1], 1);
+		let bodies_rlp = Rlp::new(rlp_data.as_raw());
+		assert!(downloader.import_bodies(&bodies_rlp, &[headers[0].hash(), headers[1].hash()]).is_ok());
+
+		// Import unexpected third body.
+		let mut rlp_data = RlpStream::new_list(1);
+		rlp_data.append_raw(&bodies[2], 1);
+		let bodies_rlp = Rlp::new(rlp_data.as_raw());
+		match downloader.import_bodies(&bodies_rlp, &[headers[0].hash(), headers[1].hash()]) {
+			Err(BlockDownloaderImportError::Invalid) => (),
+			_ => panic!("expected BlockDownloaderImportError"),
+		};
+	}
+
+	#[test]
+	fn import_receipts() {
+		::env_logger::try_init().ok();
+
+		let mut chain = TestBlockChainClient::new();
+		let snapshot_service = TestSnapshotService::new();
+		let queue = RwLock::new(VecDeque::new());
+		let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None);
+
+		// Import block headers.
+		let mut headers = Vec::with_capacity(4);
+		let mut receipts = Vec::with_capacity(4);
+		let mut parent_hash = H256::zero();
+		for i in 0..4 {
+			// Construct the receipts. Receipt root for the first two blocks is the same.
+			//
+			// The RLP-encoded integers are clearly not receipts, but the BlockDownloader treats
+			// all receipts as byte blobs, so it does not matter.
+			let mut receipts_rlp = if i < 2 {
+				encode_list(&[0u32])
+			} else {
+				encode_list(&[i as u32])
+			};
+			let receipts_root = ordered_trie_root(Rlp::new(&receipts_rlp).iter().map(|r| r.as_raw()));
+			receipts.push(receipts_rlp);
+
+			// Construct the block header.
+			let mut header = dummy_header(i, parent_hash);
+			header.set_receipts_root(receipts_root);
+			parent_hash = header.hash();
+			headers.push(header);
+		}
+
+		let mut downloader = BlockDownloader::new(true, &headers[0].hash(), 0);
+		downloader.state = State::Blocks;
+		downloader.blocks.reset_to(vec![headers[0].hash()]);
+
+		// Only import the first three block headers.
+		let rlp_data = encode_list(&headers[0..3]);
+		let headers_rlp = Rlp::new(&rlp_data);
+		assert!(downloader.import_headers(&mut io, &headers_rlp, headers[0].hash()).is_ok());
+
+		// Import second and third receipts successfully.
+		let mut rlp_data = RlpStream::new_list(2);
+		rlp_data.append_raw(&receipts[1], 1);
+		rlp_data.append_raw(&receipts[2], 1);
+		let receipts_rlp = Rlp::new(rlp_data.as_raw());
+		assert!(downloader.import_receipts(&receipts_rlp, &[headers[1].hash(), headers[2].hash()]).is_ok());
+
+		// Import unexpected fourth receipt.
+		let mut rlp_data = RlpStream::new_list(1);
+		rlp_data.append_raw(&receipts[3], 1);
+		let bodies_rlp = Rlp::new(rlp_data.as_raw());
+		match downloader.import_bodies(&bodies_rlp, &[headers[1].hash(), headers[2].hash()]) {
+			Err(BlockDownloaderImportError::Invalid) => (),
+			_ => panic!("expected BlockDownloaderImportError"),
+		};
+	}
+}
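For reference, all_expected (added above) accepts a response exactly when its items appear, in order, as a subset of the expected list; the single shared iterator is what enforces the ordering, since it never rewinds. A runnable illustration with integers standing in for hashes:

// Same helper as in the diff above, exercised with plain integers.
fn all_expected<A, B, F>(values: &[A], expected_values: &[B], is_expected: F) -> bool
	where F: Fn(&A, &B) -> bool
{
	let mut expected_iter = expected_values.iter();
	values.iter().all(|val1| {
		while let Some(val2) = expected_iter.next() {
			if is_expected(val1, val2) {
				return true;
			}
		}
		false
	})
}

fn main() {
	let expected = [1, 2, 3, 4];
	// An ordered subset of the expected values is accepted...
	assert!(all_expected(&[2, 4], &expected, |&a, &b| a == b));
	// ...but out-of-order values are rejected.
	assert!(!all_expected(&[4, 2], &expected, |&a, &b| a == b));
	println!("ok");
}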


@@ -212,32 +212,28 @@ impl BlockCollection {
 	}

 	/// Insert a collection of block bodies for previously downloaded headers.
-	pub fn insert_bodies(&mut self, bodies: Vec<SyncBody>) -> usize {
-		let mut inserted = 0;
-		for b in bodies {
-			if let Err(e) = self.insert_body(b) {
-				trace!(target: "sync", "Ignored invalid body: {:?}", e);
-			} else {
-				inserted += 1;
-			}
-		}
-		inserted
+	pub fn insert_bodies(&mut self, bodies: Vec<SyncBody>) -> Vec<H256> {
+		bodies.into_iter()
+			.filter_map(|b| {
+				self.insert_body(b)
+					.map_err(|e| trace!(target: "sync", "Ignored invalid body: {:?}", e))
+					.ok()
+			})
+			.collect()
 	}

 	/// Insert a collection of block receipts for previously downloaded headers.
-	pub fn insert_receipts(&mut self, receipts: Vec<Bytes>) -> usize {
+	pub fn insert_receipts(&mut self, receipts: Vec<Bytes>) -> Vec<Vec<H256>> {
 		if !self.need_receipts {
-			return 0;
+			return Vec::new();
 		}
-		let mut inserted = 0;
-		for r in receipts {
-			if let Err(e) = self.insert_receipt(r) {
-				trace!(target: "sync", "Ignored invalid receipt: {:?}", e);
-			} else {
-				inserted += 1;
-			}
-		}
-		inserted
+		receipts.into_iter()
+			.filter_map(|r| {
+				self.insert_receipt(r)
+					.map_err(|e| trace!(target: "sync", "Ignored invalid receipt: {:?}", e))
+					.ok()
+			})
+			.collect()
 	}

 	/// Returns a set of block hashes that require a body download. The returned set is marked as being downloaded.
@@ -398,11 +394,6 @@
 		self.blocks.contains_key(hash)
 	}

-	/// Check if collection contains a block header.
-	pub fn contains_head(&self, hash: &H256) -> bool {
-		self.heads.contains(hash)
-	}
-
 	/// Return used heap size.
 	pub fn heap_size(&self) -> usize {
 		self.heads.heap_size_of_children()
@@ -418,7 +409,7 @@
 		self.downloading_headers.contains(hash) || self.downloading_bodies.contains(hash)
 	}

-	fn insert_body(&mut self, body: SyncBody) -> Result<(), network::Error> {
+	fn insert_body(&mut self, body: SyncBody) -> Result<H256, network::Error> {
 		let header_id = {
 			let tx_root = ordered_trie_root(Rlp::new(&body.transactions_bytes).iter().map(|r| r.as_raw()));
 			let uncles = keccak(&body.uncles_bytes);
@@ -435,7 +426,7 @@
 			Some(ref mut block) => {
 				trace!(target: "sync", "Got body {}", h);
 				block.body = Some(body);
-				Ok(())
+				Ok(h)
 			},
 			None => {
 				warn!("Got body with no header {}", h);
@@ -450,7 +441,7 @@
 		}
 	}

-	fn insert_receipt(&mut self, r: Bytes) -> Result<(), network::Error> {
+	fn insert_receipt(&mut self, r: Bytes) -> Result<Vec<H256>, network::Error> {
 		let receipt_root = {
 			let receipts = Rlp::new(&r);
 			ordered_trie_root(receipts.iter().map(|r| r.as_raw()))
@@ -458,7 +449,8 @@
 		self.downloading_receipts.remove(&receipt_root);
 		match self.receipt_ids.entry(receipt_root) {
 			hash_map::Entry::Occupied(entry) => {
-				for h in entry.remove() {
+				let block_hashes = entry.remove();
+				for h in block_hashes.iter() {
 					match self.blocks.get_mut(&h) {
 						Some(ref mut block) => {
 							trace!(target: "sync", "Got receipt {}", h);
@@ -470,7 +462,7 @@
 						}
 					}
 				}
-				Ok(())
+				Ok(block_hashes)
 			},
 			hash_map::Entry::Vacant(_) => {
 				trace!(target: "sync", "Ignored unknown/stale block receipt {:?}", receipt_root);
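The rewritten insert_bodies/insert_receipts lean on a standard Rust idiom: map_err logs the failure as a side effect, ok() turns the Result into an Option, and filter_map keeps only the successes. A generic, runnable sketch of the idiom (parsing integers here is just a placeholder for inserting bodies):

// Collect the successful results, logging and dropping the failures.
fn parse_all(inputs: &[&str]) -> Vec<i32> {
	inputs.iter()
		.filter_map(|s| {
			s.parse::<i32>()
				.map_err(|e| eprintln!("ignored invalid entry {:?}: {}", s, e))
				.ok()
		})
		.collect()
}

fn main() {
	// Keeps 1 and 3; logs and drops "x".
	assert_eq!(parse_all(&["1", "x", "3"]), vec![1, 3]);
	println!("ok");
}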


@@ -28,6 +28,7 @@ use network::PeerId;
 use rlp::Rlp;
 use snapshot::ChunkType;
 use std::cmp;
+use std::mem;
 use std::collections::HashSet;
 use std::time::Instant;
 use sync_io::SyncIo;
@@ -296,6 +297,13 @@ impl SyncHandler {
 			trace!(target: "sync", "{}: Ignored unexpected bodies", peer_id);
 			return Ok(());
 		}
+		let expected_blocks = match sync.peers.get_mut(&peer_id) {
+			Some(peer) => mem::replace(&mut peer.asking_blocks, Vec::new()),
+			None => {
+				trace!(target: "sync", "{}: Ignored unexpected bodies (peer not found)", peer_id);
+				return Ok(());
+			}
+		};
 		let item_count = r.item_count()?;
 		trace!(target: "sync", "{} -> BlockBodies ({} entries), set = {:?}", peer_id, item_count, block_set);
 		if item_count == 0 {
@@ -315,7 +323,7 @@
 					Some(ref mut blocks) => blocks,
 				}
 			};
-			downloader.import_bodies(r)?;
+			downloader.import_bodies(r, expected_blocks.as_slice())?;
 		}
 		sync.collect_blocks(io, block_set);
 		Ok(())
@@ -368,10 +376,23 @@
 		let expected_hash = sync.peers.get(&peer_id).and_then(|p| p.asking_hash);
 		let allowed = sync.peers.get(&peer_id).map(|p| p.is_allowed()).unwrap_or(false);
 		let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);
-		if !sync.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) || expected_hash.is_none() || !allowed {
-			trace!(target: "sync", "{}: Ignored unexpected headers, expected_hash = {:?}", peer_id, expected_hash);
+
+		if !sync.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) {
+			debug!(target: "sync", "{}: Ignored unexpected headers", peer_id);
 			return Ok(());
 		}
+		let expected_hash = match expected_hash {
+			Some(hash) => hash,
+			None => {
+				debug!(target: "sync", "{}: Ignored unexpected headers (expected_hash is None)", peer_id);
+				return Ok(());
+			}
+		};
+		if !allowed {
+			debug!(target: "sync", "{}: Ignored unexpected headers (peer not allowed)", peer_id);
+			return Ok(());
+		}
+
 		let item_count = r.item_count()?;
 		trace!(target: "sync", "{} -> BlockHeaders ({} entries), state = {:?}, set = {:?}", peer_id, item_count, sync.state, block_set);
 		if (sync.state == SyncState::Idle || sync.state == SyncState::WaitingPeers) && sync.old_blocks.is_none() {
@@ -419,6 +440,13 @@
 			trace!(target: "sync", "{}: Ignored unexpected receipts", peer_id);
 			return Ok(());
 		}
+		let expected_blocks = match sync.peers.get_mut(&peer_id) {
+			Some(peer) => mem::replace(&mut peer.asking_blocks, Vec::new()),
+			None => {
+				trace!(target: "sync", "{}: Ignored unexpected bodies (peer not found)", peer_id);
+				return Ok(());
+			}
+		};
 		let item_count = r.item_count()?;
 		trace!(target: "sync", "{} -> BlockReceipts ({} entries)", peer_id, item_count);
 		if item_count == 0 {
@@ -438,7 +466,7 @@
 					Some(ref mut blocks) => blocks,
 				}
 			};
-			downloader.import_receipts(io, r)?;
+			downloader.import_receipts(r, expected_blocks.as_slice())?;
 		}
 		sync.collect_blocks(io, block_set);
 		Ok(())
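Both handlers take ownership of the peer's asking_blocks with mem::replace, which swaps an empty Vec into the peer entry and returns the stored hashes by value; the expected list is consumed exactly once per response and no clone is needed. A minimal sketch of the pattern (the Peer struct here is illustrative, not the sync code's real type):

use std::mem;

struct Peer {
	asking_blocks: Vec<u64>, // stand-in for Vec<H256>
}

fn take_expected(peer: &mut Peer) -> Vec<u64> {
	// Swap an empty Vec into place and get the old contents back by value.
	mem::replace(&mut peer.asking_blocks, Vec::new())
}

fn main() {
	let mut peer = Peer { asking_blocks: vec![1, 2, 3] };
	let expected = take_expected(&mut peer);
	assert_eq!(expected, vec![1, 2, 3]);
	assert!(peer.asking_blocks.is_empty());
	println!("ok");
}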


@@ -225,32 +225,28 @@ fn propagate_blocks() {
 #[test]
 fn restart_on_malformed_block() {
+	::env_logger::try_init().ok();
 	let mut net = TestNet::new(2);
-	net.peer(1).chain.add_blocks(10, EachBlockWith::Uncle);
-	net.peer(1).chain.corrupt_block(6);
+	net.peer(1).chain.add_blocks(5, EachBlockWith::Nothing);
+	net.peer(1).chain.add_block(EachBlockWith::Nothing, |mut header| {
+		header.set_extra_data(b"This extra data is way too long to be considered valid".to_vec());
+		header
+	});
 	net.sync_steps(20);
-	assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5);
+	// This gets accepted just fine since the TestBlockChainClient performs no validation.
+	// Probably remove this test?
+	assert_eq!(net.peer(0).chain.chain_info().best_block_number, 6);
 }

 #[test]
-fn restart_on_broken_chain() {
+fn reject_on_broken_chain() {
 	let mut net = TestNet::new(2);
-	net.peer(1).chain.add_blocks(10, EachBlockWith::Uncle);
+	net.peer(1).chain.add_blocks(10, EachBlockWith::Nothing);
 	net.peer(1).chain.corrupt_block_parent(6);
 	net.sync_steps(20);
-	assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5);
-}
-
-#[test]
-fn high_td_attach() {
-	let mut net = TestNet::new(2);
-	net.peer(1).chain.add_blocks(10, EachBlockWith::Uncle);
-	net.peer(1).chain.corrupt_block_parent(6);
-	net.sync_steps(20);
-	assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5);
+	assert_eq!(net.peer(0).chain.chain_info().best_block_number, 0);
 }

 #[test]