From 2fc1679886958674fb2b0fe49b34aee81c46da27 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Wed, 3 Oct 2018 03:35:10 -0700 Subject: [PATCH] Verify block syncing responses against requests (#9670) * sync: Validate received BlockHeaders packets against stored request. * sync: Validate received BlockBodies and BlockReceipts. * sync: Fix broken tests. * sync: Unit tests for BlockDownloader::import_headers. * sync: Unit tests for import_{bodies,receipts}. * tests: Add missing method doc. --- ethcore/src/client/test_client.rs | 119 +++++----- ethcore/sync/src/block_sync.rs | 370 ++++++++++++++++++++++++++++-- ethcore/sync/src/blocks.rs | 54 ++--- ethcore/sync/src/chain/handler.rs | 36 ++- ethcore/sync/src/tests/chain.rs | 28 +-- 5 files changed, 472 insertions(+), 135 deletions(-) diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index c1f755444..c3d8c127b 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -118,7 +118,7 @@ pub struct TestBlockChainClient { } /// Used for generating test client blocks. -#[derive(Clone)] +#[derive(Clone, Copy)] pub enum EachBlockWith { /// Plain block. Nothing, @@ -242,69 +242,68 @@ impl TestBlockChainClient { *self.error_on_logs.write() = val; } - /// Add blocks to test client. 
- pub fn add_blocks(&self, count: usize, with: EachBlockWith) { - let len = self.numbers.read().len(); - for n in len..(len + count) { - let mut header = BlockHeader::new(); - header.set_difficulty(From::from(n)); - header.set_parent_hash(self.last_hash.read().clone()); - header.set_number(n as BlockNumber); - header.set_gas_limit(U256::from(1_000_000)); - header.set_extra_data(self.extra_data.clone()); - let uncles = match with { - EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => { - let mut uncles = RlpStream::new_list(1); - let mut uncle_header = BlockHeader::new(); - uncle_header.set_difficulty(From::from(n)); - uncle_header.set_parent_hash(self.last_hash.read().clone()); - uncle_header.set_number(n as BlockNumber); - uncles.append(&uncle_header); - header.set_uncles_hash(keccak(uncles.as_raw())); - uncles - }, - _ => RlpStream::new_list(0) - }; - let txs = match with { - EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => { - let mut txs = RlpStream::new_list(1); - let keypair = Random.generate().unwrap(); - // Update nonces value - self.nonces.write().insert(keypair.address(), U256::one()); - let tx = Transaction { - action: Action::Create, - value: U256::from(100), - data: "3331600055".from_hex().unwrap(), - gas: U256::from(100_000), - gas_price: U256::from(200_000_000_000u64), - nonce: U256::zero() - }; - let signed_tx = tx.sign(keypair.secret(), None); - txs.append(&signed_tx); - txs.out() - }, - _ => ::rlp::EMPTY_LIST_RLP.to_vec() - }; + /// Add a block to test client. 
+ pub fn add_block(&self, with: EachBlockWith, hook: F) + where F: Fn(BlockHeader) -> BlockHeader + { + let n = self.numbers.read().len(); - let mut rlp = RlpStream::new_list(3); - rlp.append(&header); - rlp.append_raw(&txs, 1); - rlp.append_raw(uncles.as_raw(), 1); - let unverified = Unverified::from_rlp(rlp.out()).unwrap(); - self.import_block(unverified).unwrap(); - } - } + let mut header = BlockHeader::new(); + header.set_difficulty(From::from(n)); + header.set_parent_hash(self.last_hash.read().clone()); + header.set_number(n as BlockNumber); + header.set_gas_limit(U256::from(1_000_000)); + header.set_extra_data(self.extra_data.clone()); + + header = hook(header); + + let uncles = match with { + EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => { + let mut uncles = RlpStream::new_list(1); + let mut uncle_header = BlockHeader::new(); + uncle_header.set_difficulty(From::from(n)); + uncle_header.set_parent_hash(self.last_hash.read().clone()); + uncle_header.set_number(n as BlockNumber); + uncles.append(&uncle_header); + header.set_uncles_hash(keccak(uncles.as_raw())); + uncles + }, + _ => RlpStream::new_list(0) + }; + let txs = match with { + EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => { + let mut txs = RlpStream::new_list(1); + let keypair = Random.generate().unwrap(); + // Update nonces value + self.nonces.write().insert(keypair.address(), U256::one()); + let tx = Transaction { + action: Action::Create, + value: U256::from(100), + data: "3331600055".from_hex().unwrap(), + gas: U256::from(100_000), + gas_price: U256::from(200_000_000_000u64), + nonce: U256::zero() + }; + let signed_tx = tx.sign(keypair.secret(), None); + txs.append(&signed_tx); + txs.out() + }, + _ => ::rlp::EMPTY_LIST_RLP.to_vec() + }; - /// Make a bad block by setting invalid extra data. 
- pub fn corrupt_block(&self, n: BlockNumber) { - let hash = self.block_hash(BlockId::Number(n)).unwrap(); - let mut header: BlockHeader = self.block_header(BlockId::Number(n)).unwrap().decode().expect("decoding failed"); - header.set_extra_data(b"This extra data is way too long to be considered valid".to_vec()); let mut rlp = RlpStream::new_list(3); rlp.append(&header); - rlp.append_raw(&::rlp::NULL_RLP, 1); - rlp.append_raw(&::rlp::NULL_RLP, 1); - self.blocks.write().insert(hash, rlp.out()); + rlp.append_raw(&txs, 1); + rlp.append_raw(uncles.as_raw(), 1); + let unverified = Unverified::from_rlp(rlp.out()).unwrap(); + self.import_block(unverified).unwrap(); + } + + /// Add a sequence of blocks to test client. + pub fn add_blocks(&self, count: usize, with: EachBlockWith) { + for _ in 0..count { + self.add_block(with, |header| header); + } } /// Make a bad block by setting invalid parent hash. diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 727e5e23e..9afdf3441 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -220,7 +220,7 @@ impl BlockDownloader { } /// Add new block headers. - pub fn import_headers(&mut self, io: &mut SyncIo, r: &Rlp, expected_hash: Option) -> Result { + pub fn import_headers(&mut self, io: &mut SyncIo, r: &Rlp, expected_hash: H256) -> Result { let item_count = r.item_count().unwrap_or(0); if self.state == State::Idle { trace!(target: "sync", "Ignored unexpected block headers"); @@ -230,30 +230,50 @@ impl BlockDownloader { return Err(BlockDownloaderImportError::Invalid); } + // The request is generated in ::request_blocks. 
+ let (max_count, skip) = if self.state == State::ChainHead { + (SUBCHAIN_SIZE as usize, (MAX_HEADERS_TO_REQUEST - 2) as u64) + } else { + (MAX_HEADERS_TO_REQUEST, 0) + }; + + if item_count > max_count { + debug!(target: "sync", "Headers response is larger than expected"); + return Err(BlockDownloaderImportError::Invalid); + } + let mut headers = Vec::new(); let mut hashes = Vec::new(); - let mut valid_response = item_count == 0; //empty response is valid - let mut any_known = false; + let mut last_header = None; for i in 0..item_count { let info = SyncHeader::from_rlp(r.at(i)?.as_raw().to_vec())?; let number = BlockNumber::from(info.header.number()); let hash = info.header.hash(); - // Check if any of the headers matches the hash we requested - if !valid_response { - if let Some(expected) = expected_hash { - valid_response = expected == hash; + + let valid_response = match last_header { + // First header must match expected hash. + None => expected_hash == hash, + Some((last_number, last_hash)) => { + // Subsequent headers must be spaced by skip interval. + let skip_valid = number == last_number + skip + 1; + // Consecutive headers must be linked by parent hash. 
+ let parent_valid = (number != last_number + 1) || *info.header.parent_hash() == last_hash; + skip_valid && parent_valid } + }; + + // Disable the peer for this syncing round if it gives invalid chain + if !valid_response { + debug!(target: "sync", "Invalid headers response"); + return Err(BlockDownloaderImportError::Invalid); } - any_known = any_known || self.blocks.contains_head(&hash); + + last_header = Some((number, hash)); if self.blocks.contains(&hash) { trace!(target: "sync", "Skipping existing block header {} ({:?})", number, hash); continue; } - if self.highest_block.as_ref().map_or(true, |n| number > *n) { - self.highest_block = Some(number); - } - match io.chain().block_status(BlockId::Hash(hash.clone())) { BlockStatus::InChain | BlockStatus::Queued => { match self.state { @@ -273,16 +293,15 @@ impl BlockDownloader { } } - // Disable the peer for this syncing round if it gives invalid chain - if !valid_response { - trace!(target: "sync", "Invalid headers response"); - return Err(BlockDownloaderImportError::Invalid); + if let Some((number, _)) = last_header { + if self.highest_block.as_ref().map_or(true, |n| number > *n) { + self.highest_block = Some(number); + } } match self.state { State::ChainHead => { if !headers.is_empty() { - // TODO: validate heads better. E.g. check that there is enough distance between blocks. trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); self.blocks.reset_to(hashes); self.state = State::Blocks; @@ -299,8 +318,7 @@ impl BlockDownloader { }, State::Blocks => { let count = headers.len(); - // At least one of the heades must advance the subchain. Otherwise they are all useless. 
- if count == 0 || !any_known { + if count == 0 { trace!(target: "sync", "No useful headers"); return Err(BlockDownloaderImportError::Useless); } @@ -314,7 +332,7 @@ impl BlockDownloader { } /// Called by peer once it has new block bodies - pub fn import_bodies(&mut self, r: &Rlp) -> Result<(), BlockDownloaderImportError> { + pub fn import_bodies(&mut self, r: &Rlp, expected_hashes: &[H256]) -> Result<(), BlockDownloaderImportError> { let item_count = r.item_count().unwrap_or(0); if item_count == 0 { return Err(BlockDownloaderImportError::Useless); @@ -327,16 +345,21 @@ impl BlockDownloader { bodies.push(body); } - if self.blocks.insert_bodies(bodies) != item_count { + let hashes = self.blocks.insert_bodies(bodies); + if hashes.len() != item_count { trace!(target: "sync", "Deactivating peer for giving invalid block bodies"); return Err(BlockDownloaderImportError::Invalid); } + if !all_expected(hashes.as_slice(), expected_hashes, |&a, &b| a == b) { + trace!(target: "sync", "Deactivating peer for giving unexpected block bodies"); + return Err(BlockDownloaderImportError::Invalid); + } } Ok(()) } /// Called by peer once it has new block bodies - pub fn import_receipts(&mut self, _io: &mut SyncIo, r: &Rlp) -> Result<(), BlockDownloaderImportError> { + pub fn import_receipts(&mut self, r: &Rlp, expected_hashes: &[H256]) -> Result<(), BlockDownloaderImportError> { let item_count = r.item_count().unwrap_or(0); if item_count == 0 { return Err(BlockDownloaderImportError::Useless); @@ -353,10 +376,15 @@ impl BlockDownloader { })?; receipts.push(receipt.as_raw().to_vec()); } - if self.blocks.insert_receipts(receipts) != item_count { + let hashes = self.blocks.insert_receipts(receipts); + if hashes.len() != item_count { trace!(target: "sync", "Deactivating peer for giving invalid block receipts"); return Err(BlockDownloaderImportError::Invalid); } + if !all_expected(hashes.as_slice(), expected_hashes, |a, b| a.contains(b)) { + trace!(target: "sync", "Deactivating peer for 
giving unexpected block receipts"); + return Err(BlockDownloaderImportError::Invalid); + } } Ok(()) } @@ -549,4 +577,298 @@ impl BlockDownloader { } } -//TODO: module tests +// Determines if the first argument matches an ordered subset of the second, according to some predicate. +fn all_expected(values: &[A], expected_values: &[B], is_expected: F) -> bool + where F: Fn(&A, &B) -> bool +{ + let mut expected_iter = expected_values.iter(); + values.iter().all(|val1| { + while let Some(val2) = expected_iter.next() { + if is_expected(val1, val2) { + return true; + } + } + false + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use ethcore::client::TestBlockChainClient; + use ethcore::header::Header as BlockHeader; + use ethcore::spec::Spec; + use ethkey::{Generator,Random}; + use hash::keccak; + use parking_lot::RwLock; + use rlp::{encode_list,RlpStream}; + use tests::helpers::TestIo; + use tests::snapshot::TestSnapshotService; + use transaction::{Transaction,SignedTransaction}; + use triehash_ethereum::ordered_trie_root; + + fn dummy_header(number: u64, parent_hash: H256) -> BlockHeader { + let mut header = BlockHeader::new(); + header.set_gas_limit(0.into()); + header.set_difficulty((number * 100).into()); + header.set_timestamp(number * 10); + header.set_number(number); + header.set_parent_hash(parent_hash); + header.set_state_root(H256::zero()); + header + } + + fn dummy_signed_tx() -> SignedTransaction { + let keypair = Random.generate().unwrap(); + Transaction::default().sign(keypair.secret(), None) + } + + #[test] + fn import_headers_in_chain_head_state() { + ::env_logger::try_init().ok(); + + let spec = Spec::new_test(); + let genesis_hash = spec.genesis_header().hash(); + + let mut downloader = BlockDownloader::new(false, &genesis_hash, 0); + downloader.state = State::ChainHead; + + let mut chain = TestBlockChainClient::new(); + let snapshot_service = TestSnapshotService::new(); + let queue = RwLock::new(VecDeque::new()); + let mut io = TestIo::new(&mut 
chain, &snapshot_service, &queue, None); + + // Valid headers sequence. + let valid_headers = [ + spec.genesis_header(), + dummy_header(127, H256::random()), + dummy_header(254, H256::random()), + ]; + let rlp_data = encode_list(&valid_headers); + let valid_rlp = Rlp::new(&rlp_data); + + match downloader.import_headers(&mut io, &valid_rlp, genesis_hash) { + Ok(DownloadAction::Reset) => assert_eq!(downloader.state, State::Blocks), + _ => panic!("expected transition to Blocks state"), + }; + + // Headers are rejected because the expected hash does not match. + let invalid_start_block_headers = [ + dummy_header(0, H256::random()), + dummy_header(127, H256::random()), + dummy_header(254, H256::random()), + ]; + let rlp_data = encode_list(&invalid_start_block_headers); + let invalid_start_block_rlp = Rlp::new(&rlp_data); + + match downloader.import_headers(&mut io, &invalid_start_block_rlp, genesis_hash) { + Err(BlockDownloaderImportError::Invalid) => (), + _ => panic!("expected BlockDownloaderImportError"), + }; + + // Headers are rejected because they are not spaced as expected. + let invalid_skip_headers = [ + spec.genesis_header(), + dummy_header(128, H256::random()), + dummy_header(256, H256::random()), + ]; + let rlp_data = encode_list(&invalid_skip_headers); + let invalid_skip_rlp = Rlp::new(&rlp_data); + + match downloader.import_headers(&mut io, &invalid_skip_rlp, genesis_hash) { + Err(BlockDownloaderImportError::Invalid) => (), + _ => panic!("expected BlockDownloaderImportError"), + }; + + // Invalid because the packet size is too large. 
+ let mut too_many_headers = Vec::with_capacity((SUBCHAIN_SIZE + 1) as usize); + too_many_headers.push(spec.genesis_header()); + for i in 1..(SUBCHAIN_SIZE + 1) { + too_many_headers.push(dummy_header((MAX_HEADERS_TO_REQUEST as u64 - 1) * i, H256::random())); + } + let rlp_data = encode_list(&too_many_headers); + + let too_many_rlp = Rlp::new(&rlp_data); + match downloader.import_headers(&mut io, &too_many_rlp, genesis_hash) { + Err(BlockDownloaderImportError::Invalid) => (), + _ => panic!("expected BlockDownloaderImportError"), + }; + } + + #[test] + fn import_headers_in_blocks_state() { + ::env_logger::try_init().ok(); + + let mut chain = TestBlockChainClient::new(); + let snapshot_service = TestSnapshotService::new(); + let queue = RwLock::new(VecDeque::new()); + let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); + + let mut headers = Vec::with_capacity(3); + let parent_hash = H256::random(); + headers.push(dummy_header(127, parent_hash)); + let parent_hash = headers[0].hash(); + headers.push(dummy_header(128, parent_hash)); + let parent_hash = headers[1].hash(); + headers.push(dummy_header(129, parent_hash)); + + let mut downloader = BlockDownloader::new(false, &H256::random(), 0); + downloader.state = State::Blocks; + downloader.blocks.reset_to(vec![headers[0].hash()]); + + let rlp_data = encode_list(&headers); + let headers_rlp = Rlp::new(&rlp_data); + + match downloader.import_headers(&mut io, &headers_rlp, headers[0].hash()) { + Ok(DownloadAction::None) => (), + _ => panic!("expected successful import"), + }; + + // Invalidate parent_hash link. + headers[2] = dummy_header(129, H256::random()); + let rlp_data = encode_list(&headers); + let headers_rlp = Rlp::new(&rlp_data); + + match downloader.import_headers(&mut io, &headers_rlp, headers[0].hash()) { + Err(BlockDownloaderImportError::Invalid) => (), + _ => panic!("expected BlockDownloaderImportError"), + }; + + // Invalidate header sequence by skipping a header. 
+ headers[2] = dummy_header(130, headers[1].hash()); + let rlp_data = encode_list(&headers); + let headers_rlp = Rlp::new(&rlp_data); + + match downloader.import_headers(&mut io, &headers_rlp, headers[0].hash()) { + Err(BlockDownloaderImportError::Invalid) => (), + _ => panic!("expected BlockDownloaderImportError"), + }; + } + + #[test] + fn import_bodies() { + ::env_logger::try_init().ok(); + + let mut chain = TestBlockChainClient::new(); + let snapshot_service = TestSnapshotService::new(); + let queue = RwLock::new(VecDeque::new()); + let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); + + // Import block headers. + let mut headers = Vec::with_capacity(4); + let mut bodies = Vec::with_capacity(4); + let mut parent_hash = H256::zero(); + for i in 0..4 { + // Construct the block body + let mut uncles = if i > 0 { + encode_list(&[dummy_header(i - 1, H256::random())]).into_vec() + } else { + ::rlp::EMPTY_LIST_RLP.to_vec() + }; + + let mut txs = encode_list(&[dummy_signed_tx()]); + let tx_root = ordered_trie_root(Rlp::new(&txs).iter().map(|r| r.as_raw())); + + let mut rlp = RlpStream::new_list(2); + rlp.append_raw(&txs, 1); + rlp.append_raw(&uncles, 1); + bodies.push(rlp.out()); + + // Construct the block header + let mut header = dummy_header(i, parent_hash); + header.set_transactions_root(tx_root); + header.set_uncles_hash(keccak(&uncles)); + parent_hash = header.hash(); + headers.push(header); + } + + let mut downloader = BlockDownloader::new(false, &headers[0].hash(), 0); + downloader.state = State::Blocks; + downloader.blocks.reset_to(vec![headers[0].hash()]); + + // Only import the first three block headers. + let rlp_data = encode_list(&headers[0..3]); + let headers_rlp = Rlp::new(&rlp_data); + assert!(downloader.import_headers(&mut io, &headers_rlp, headers[0].hash()).is_ok()); + + // Import first body successfully. 
+ let mut rlp_data = RlpStream::new_list(1); + rlp_data.append_raw(&bodies[0], 1); + let bodies_rlp = Rlp::new(rlp_data.as_raw()); + assert!(downloader.import_bodies(&bodies_rlp, &[headers[0].hash(), headers[1].hash()]).is_ok()); + + // Import second body successfully. + let mut rlp_data = RlpStream::new_list(1); + rlp_data.append_raw(&bodies[1], 1); + let bodies_rlp = Rlp::new(rlp_data.as_raw()); + assert!(downloader.import_bodies(&bodies_rlp, &[headers[0].hash(), headers[1].hash()]).is_ok()); + + // Import unexpected third body. + let mut rlp_data = RlpStream::new_list(1); + rlp_data.append_raw(&bodies[2], 1); + let bodies_rlp = Rlp::new(rlp_data.as_raw()); + match downloader.import_bodies(&bodies_rlp, &[headers[0].hash(), headers[1].hash()]) { + Err(BlockDownloaderImportError::Invalid) => (), + _ => panic!("expected BlockDownloaderImportError"), + }; + } + + #[test] + fn import_receipts() { + ::env_logger::try_init().ok(); + + let mut chain = TestBlockChainClient::new(); + let snapshot_service = TestSnapshotService::new(); + let queue = RwLock::new(VecDeque::new()); + let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); + + // Import block headers. + let mut headers = Vec::with_capacity(4); + let mut receipts = Vec::with_capacity(4); + let mut parent_hash = H256::zero(); + for i in 0..4 { + // Construct the receipts. Receipt root for the first two blocks is the same. + // + // The RLP-encoded integers are clearly not receipts, but the BlockDownloader treats + // all receipts as byte blobs, so it does not matter. + let mut receipts_rlp = if i < 2 { + encode_list(&[0u32]) + } else { + encode_list(&[i as u32]) + }; + let receipts_root = ordered_trie_root(Rlp::new(&receipts_rlp).iter().map(|r| r.as_raw())); + receipts.push(receipts_rlp); + + // Construct the block header. 
+ let mut header = dummy_header(i, parent_hash);
+ header.set_receipts_root(receipts_root);
+ parent_hash = header.hash();
+ headers.push(header);
+ }
+
+ let mut downloader = BlockDownloader::new(true, &headers[0].hash(), 0);
+ downloader.state = State::Blocks;
+ downloader.blocks.reset_to(vec![headers[0].hash()]);
+
+ // Only import the first three block headers.
+ let rlp_data = encode_list(&headers[0..3]);
+ let headers_rlp = Rlp::new(&rlp_data);
+ assert!(downloader.import_headers(&mut io, &headers_rlp, headers[0].hash()).is_ok());
+
+ // Import second and third receipts successfully.
+ let mut rlp_data = RlpStream::new_list(2);
+ rlp_data.append_raw(&receipts[1], 1);
+ rlp_data.append_raw(&receipts[2], 1);
+ let receipts_rlp = Rlp::new(rlp_data.as_raw());
+ assert!(downloader.import_receipts(&receipts_rlp, &[headers[1].hash(), headers[2].hash()]).is_ok());
+
+ // Import unexpected fourth receipt.
+ let mut rlp_data = RlpStream::new_list(1);
+ rlp_data.append_raw(&receipts[3], 1);
+ let receipts_rlp = Rlp::new(rlp_data.as_raw());
+ match downloader.import_receipts(&receipts_rlp, &[headers[1].hash(), headers[2].hash()]) {
+ Err(BlockDownloaderImportError::Invalid) => (),
+ _ => panic!("expected BlockDownloaderImportError"),
+ };
+ }
+}
diff --git a/ethcore/sync/src/blocks.rs b/ethcore/sync/src/blocks.rs
index 4a6805a2b..18ec6e29b 100644
--- a/ethcore/sync/src/blocks.rs
+++ b/ethcore/sync/src/blocks.rs
@@ -212,32 +212,28 @@ impl BlockCollection {
 }
 /// Insert a collection of block bodies for previously downloaded headers.
- pub fn insert_bodies(&mut self, bodies: Vec) -> usize { - let mut inserted = 0; - for b in bodies { - if let Err(e) = self.insert_body(b) { - trace!(target: "sync", "Ignored invalid body: {:?}", e); - } else { - inserted += 1; - } - } - inserted + pub fn insert_bodies(&mut self, bodies: Vec) -> Vec { + bodies.into_iter() + .filter_map(|b| { + self.insert_body(b) + .map_err(|e| trace!(target: "sync", "Ignored invalid body: {:?}", e)) + .ok() + }) + .collect() } /// Insert a collection of block receipts for previously downloaded headers. - pub fn insert_receipts(&mut self, receipts: Vec) -> usize { + pub fn insert_receipts(&mut self, receipts: Vec) -> Vec> { if !self.need_receipts { - return 0; + return Vec::new(); } - let mut inserted = 0; - for r in receipts { - if let Err(e) = self.insert_receipt(r) { - trace!(target: "sync", "Ignored invalid receipt: {:?}", e); - } else { - inserted += 1; - } - } - inserted + receipts.into_iter() + .filter_map(|r| { + self.insert_receipt(r) + .map_err(|e| trace!(target: "sync", "Ignored invalid receipt: {:?}", e)) + .ok() + }) + .collect() } /// Returns a set of block hashes that require a body download. The returned set is marked as being downloaded. @@ -398,11 +394,6 @@ impl BlockCollection { self.blocks.contains_key(hash) } - /// Check if collection contains a block header. - pub fn contains_head(&self, hash: &H256) -> bool { - self.heads.contains(hash) - } - /// Return used heap size. 
pub fn heap_size(&self) -> usize { self.heads.heap_size_of_children() @@ -418,7 +409,7 @@ impl BlockCollection { self.downloading_headers.contains(hash) || self.downloading_bodies.contains(hash) } - fn insert_body(&mut self, body: SyncBody) -> Result<(), network::Error> { + fn insert_body(&mut self, body: SyncBody) -> Result { let header_id = { let tx_root = ordered_trie_root(Rlp::new(&body.transactions_bytes).iter().map(|r| r.as_raw())); let uncles = keccak(&body.uncles_bytes); @@ -435,7 +426,7 @@ impl BlockCollection { Some(ref mut block) => { trace!(target: "sync", "Got body {}", h); block.body = Some(body); - Ok(()) + Ok(h) }, None => { warn!("Got body with no header {}", h); @@ -450,7 +441,7 @@ impl BlockCollection { } } - fn insert_receipt(&mut self, r: Bytes) -> Result<(), network::Error> { + fn insert_receipt(&mut self, r: Bytes) -> Result, network::Error> { let receipt_root = { let receipts = Rlp::new(&r); ordered_trie_root(receipts.iter().map(|r| r.as_raw())) @@ -458,7 +449,8 @@ impl BlockCollection { self.downloading_receipts.remove(&receipt_root); match self.receipt_ids.entry(receipt_root) { hash_map::Entry::Occupied(entry) => { - for h in entry.remove() { + let block_hashes = entry.remove(); + for h in block_hashes.iter() { match self.blocks.get_mut(&h) { Some(ref mut block) => { trace!(target: "sync", "Got receipt {}", h); @@ -470,7 +462,7 @@ impl BlockCollection { } } } - Ok(()) + Ok(block_hashes) }, hash_map::Entry::Vacant(_) => { trace!(target: "sync", "Ignored unknown/stale block receipt {:?}", receipt_root); diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index be95a3c28..c1671f81e 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -28,6 +28,7 @@ use network::PeerId; use rlp::Rlp; use snapshot::ChunkType; use std::cmp; +use std::mem; use std::collections::HashSet; use std::time::Instant; use sync_io::SyncIo; @@ -296,6 +297,13 @@ impl SyncHandler { trace!(target: "sync", 
"{}: Ignored unexpected bodies", peer_id); return Ok(()); } + let expected_blocks = match sync.peers.get_mut(&peer_id) { + Some(peer) => mem::replace(&mut peer.asking_blocks, Vec::new()), + None => { + trace!(target: "sync", "{}: Ignored unexpected bodies (peer not found)", peer_id); + return Ok(()); + } + }; let item_count = r.item_count()?; trace!(target: "sync", "{} -> BlockBodies ({} entries), set = {:?}", peer_id, item_count, block_set); if item_count == 0 { @@ -315,7 +323,7 @@ impl SyncHandler { Some(ref mut blocks) => blocks, } }; - downloader.import_bodies(r)?; + downloader.import_bodies(r, expected_blocks.as_slice())?; } sync.collect_blocks(io, block_set); Ok(()) @@ -368,10 +376,23 @@ impl SyncHandler { let expected_hash = sync.peers.get(&peer_id).and_then(|p| p.asking_hash); let allowed = sync.peers.get(&peer_id).map(|p| p.is_allowed()).unwrap_or(false); let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks); - if !sync.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) || expected_hash.is_none() || !allowed { - trace!(target: "sync", "{}: Ignored unexpected headers, expected_hash = {:?}", peer_id, expected_hash); + + if !sync.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) { + debug!(target: "sync", "{}: Ignored unexpected headers", peer_id); return Ok(()); } + let expected_hash = match expected_hash { + Some(hash) => hash, + None => { + debug!(target: "sync", "{}: Ignored unexpected headers (expected_hash is None)", peer_id); + return Ok(()); + } + }; + if !allowed { + debug!(target: "sync", "{}: Ignored unexpected headers (peer not allowed)", peer_id); + return Ok(()); + } + let item_count = r.item_count()?; trace!(target: "sync", "{} -> BlockHeaders ({} entries), state = {:?}, set = {:?}", peer_id, item_count, sync.state, block_set); if (sync.state == SyncState::Idle || sync.state == SyncState::WaitingPeers) && sync.old_blocks.is_none() { @@ -419,6 +440,13 @@ impl SyncHandler { trace!(target: "sync", 
"{}: Ignored unexpected receipts", peer_id);
 return Ok(());
 }
+ let expected_blocks = match sync.peers.get_mut(&peer_id) {
+ Some(peer) => mem::replace(&mut peer.asking_blocks, Vec::new()),
+ None => {
+ trace!(target: "sync", "{}: Ignored unexpected receipts (peer not found)", peer_id);
+ return Ok(());
+ }
+ };
 let item_count = r.item_count()?;
 trace!(target: "sync", "{} -> BlockReceipts ({} entries)", peer_id, item_count);
 if item_count == 0 {
@@ -438,7 +466,7 @@
 Some(ref mut blocks) => blocks,
 }
 };
- downloader.import_receipts(io, r)?;
+ downloader.import_receipts(r, expected_blocks.as_slice())?;
 }
 sync.collect_blocks(io, block_set);
 Ok(())
 }
diff --git a/ethcore/sync/src/tests/chain.rs b/ethcore/sync/src/tests/chain.rs
index 0b6c8f7c2..06118df66 100644
--- a/ethcore/sync/src/tests/chain.rs
+++ b/ethcore/sync/src/tests/chain.rs
@@ -225,32 +225,28 @@ fn propagate_blocks() {
 #[test]
 fn restart_on_malformed_block() {
+ ::env_logger::try_init().ok();
 let mut net = TestNet::new(2);
- net.peer(1).chain.add_blocks(10, EachBlockWith::Uncle);
- net.peer(1).chain.corrupt_block(6);
+ net.peer(1).chain.add_blocks(5, EachBlockWith::Nothing);
+ net.peer(1).chain.add_block(EachBlockWith::Nothing, |mut header| {
+ header.set_extra_data(b"This extra data is way too long to be considered valid".to_vec());
+ header
+ });
 net.sync_steps(20);
- assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5);
+ // This gets accepted just fine since the TestBlockChainClient performs no validation.
+ // Probably remove this test?
+ assert_eq!(net.peer(0).chain.chain_info().best_block_number, 6); } #[test] -fn restart_on_broken_chain() { +fn reject_on_broken_chain() { let mut net = TestNet::new(2); - net.peer(1).chain.add_blocks(10, EachBlockWith::Uncle); + net.peer(1).chain.add_blocks(10, EachBlockWith::Nothing); net.peer(1).chain.corrupt_block_parent(6); net.sync_steps(20); - assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5); -} - -#[test] -fn high_td_attach() { - let mut net = TestNet::new(2); - net.peer(1).chain.add_blocks(10, EachBlockWith::Uncle); - net.peer(1).chain.corrupt_block_parent(6); - net.sync_steps(20); - - assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5); + assert_eq!(net.peer(0).chain.chain_info().best_block_number, 0); } #[test]