From b27259518389697128e7219224c4d3f0ab005a7f Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 8 Jan 2016 17:52:25 +0100 Subject: [PATCH] Rlp error handling --- src/sync/chain.rs | 143 +++++++++++++++++++++++++++------------------- 1 file changed, 83 insertions(+), 60 deletions(-) diff --git a/src/sync/chain.rs b/src/sync/chain.rs index 0d45cda5e..5d0798c80 100644 --- a/src/sync/chain.rs +++ b/src/sync/chain.rs @@ -5,11 +5,13 @@ use util::network::{PeerId, PacketId}; use util::hash::{H256, FixedHash}; use util::bytes::{Bytes}; use util::uint::{U256}; -use util::rlp::{Rlp, RlpStream, self}; //TODO: use UntrustedRlp +use util::rlp::{Rlp, UntrustedRlp, RlpStream, self}; use util::rlp::rlptraits::{Stream, View}; +use util::rlp::rlperrors::DecoderError; use util::sha3::Hashable; use client::{BlockNumber, BlockChainClient, BlockStatus, QueueStatus, ImportResult}; use views::{HeaderView}; +use header::{Header as BlockHeader}; use sync::range_collection::{RangeCollection, ToUsize, FromUsize}; use sync::{SyncIo}; @@ -25,6 +27,8 @@ impl FromUsize for BlockNumber { } } +type PacketDecodeError = DecoderError; + const PROTOCOL_VERSION: u8 = 63u8; const MAX_BODIES_TO_SEND: usize = 256; const MAX_HEADERS_TO_SEND: usize = 512; @@ -206,13 +210,13 @@ impl ChainSync { } /// Called by peer to report status - fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) { + fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { let peer = PeerInfo { - protocol_version: r.val_at(0), - network_id: r.val_at(1), - difficulty: r.val_at(2), - latest: r.val_at(3), - genesis: r.val_at(4), + protocol_version: try!(r.val_at(0)), + network_id: try!(r.val_at(1)), + difficulty: try!(r.val_at(2)), + latest: try!(r.val_at(3)), + genesis: try!(r.val_at(4)), asking: PeerAsking::Nothing, asking_blocks: Vec::new(), }; @@ -224,26 +228,27 @@ impl ChainSync { panic!("ChainSync: new peer already exists"); } self.sync_peer(io, peer_id, false); + Ok(()) } /// Called by peer once it has new block headers during sync - fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) { + fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders); let item_count = r.item_count(); trace!(target: "sync", "{} -> BlockHeaders ({} entries)", peer_id, item_count); self.clear_peer_download(peer_id); if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { trace!(target: "sync", "Ignored unexpected block headers"); - return; + return Ok(()); } if self.state == SyncState::Waiting { trace!(target: "sync", "Ignored block headers while waiting"); - return; + return Ok(()); } for i in 0..item_count { - let info = HeaderView::new_from_rlp(r.at(i)); - let number = BlockNumber::from(info.number()); + let info: BlockHeader = try!(r.val_at(i)); + let number = BlockNumber::from(info.number); if number <= self.last_imported_block || self.headers.have_item(&number) { trace!(target: "sync", "Skipping existing block header"); continue; @@ -251,7 +256,7 @@ impl ChainSync { if number > self.highest_block { self.highest_block = number; } - let hash = info.sha3(); + let hash = info.hash(); match io.chain().block_status(&hash) { BlockStatus::InChain => { self.have_common_block = true; @@ -262,12 +267,12 @@ impl ChainSync { _ => { if self.have_common_block { //validate chain - if self.have_common_block && number == self.last_imported_block + 1 && info.parent_hash() != self.last_imported_hash { + if self.have_common_block && number == self.last_imported_block + 1 && info.parent_hash != self.last_imported_hash { // TODO: lower peer rating debug!(target: "sync", "Mismatched block header {} {}", number, hash); continue; } - if self.headers.find_item(&(number - 1)).map_or(false, |p| p.hash != info.parent_hash()) { + if self.headers.find_item(&(number - 1)).map_or(false, |p| p.hash != info.parent_hash) { // mismatching parent id, delete the previous block and don't add this one // TODO: lower peer rating debug!(target: "sync", "Mismatched block header {} {}", number, hash); @@ -281,14 +286,14 @@ impl ChainSync { } } let hdr = Header { - data: r.at(i).as_raw().to_vec(), + data: try!(r.at(i)).as_raw().to_vec(), hash: hash.clone(), - parent: info.parent_hash(), + parent: info.parent_hash, }; self.headers.insert_item(number, hdr); let header_id = HeaderId { - transactions_root: info.transactions_root(), - uncles: info.uncles_hash() + transactions_root: info.transactions_root, + uncles: info.uncles_hash }; trace!(target: "sync", "Got header {} ({})", number, hash); if header_id.transactions_root == rlp::SHA3_NULL_RLP && header_id.uncles == rlp::SHA3_EMPTY_LIST_RLP { @@ -306,27 +311,29 @@ impl ChainSync { } self.collect_blocks(io); self.continue_sync(io); + Ok(()) } /// Called by peer once it has new block bodies - fn on_peer_block_bodies(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) { + fn on_peer_block_bodies(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + use util::triehash::ordered_trie_root; self.reset_peer_asking(peer_id, PeerAsking::BlockBodies); let item_count = r.item_count(); trace!(target: "sync", "{} -> BlockBodies ({} entries)", peer_id, item_count); self.clear_peer_download(peer_id); if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { trace!(target: "sync", "Ignored unexpected block bodies"); - return; + return Ok(()); } if self.state == SyncState::Waiting { trace!(target: "sync", "Ignored block bodies while waiting"); - return; + return Ok(()); } for i in 0..item_count { - let body: Rlp = r.at(i); - let tx = body.at(0); - let tx_root = ::util::triehash::ordered_trie_root(tx.iter().map(|r| r.as_raw().to_vec()).collect()); //TODO: get rid of vectors here - let uncles = body.at(1).as_raw().sha3(); + let body = try!(r.at(i)); + let tx = try!(body.at(0)); + let tx_root = ordered_trie_root(tx.iter().map(|r| r.as_raw().to_vec()).collect()); //TODO: get rid of vectors here + let uncles = try!(body.at(1)).as_raw().sha3(); let header_id = HeaderId { transactions_root: tx_root, uncles: uncles @@ -344,20 +351,21 @@ impl ChainSync { } self.collect_blocks(io); self.continue_sync(io); + Ok(()) } /// Called by peer once it has new block bodies - fn on_peer_new_block(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) { - let block_rlp = r.at(0); - let header_rlp = block_rlp.at(0); + fn on_peer_new_block(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let block_rlp = try!(r.at(0)); + let header_rlp = try!(block_rlp.at(0)); let h = header_rlp.as_raw().sha3(); trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); match io.chain().import_block(block_rlp.as_raw()) { - ImportResult::AlreadyInChain => { + ImportResult::AlreadyInChain => { trace!(target: "sync", "New block already in chain {:?}", h); }, - ImportResult::AlreadyQueued(_) => { + ImportResult::AlreadyQueued(_) => { trace!(target: "sync", "New block already queued {:?}", h); }, ImportResult::Queued(QueueStatus::Known) => { @@ -366,7 +374,7 @@ impl ChainSync { ImportResult::Queued(QueueStatus::Unknown) => { trace!(target: "sync", "New block unknown {:?}", h); //TODO: handle too many unknown blocks - let difficulty: U256 = r.val_at(1); + let difficulty: U256 = try!(r.val_at(1)); let peer_difficulty = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").difficulty; if difficulty > peer_difficulty { trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); @@ -377,18 +385,21 @@ impl ChainSync { debug!(target: "sync", "Bad new block {:?}", h); io.disable_peer(peer_id); } - } + }; + Ok(()) } - fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) { + fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { if self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking != PeerAsking::Nothing { trace!(target: "sync", "Ignoring new hashes since we're already downloading."); - return; + return Ok(()); } trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()); let hashes = r.iter().map(|item| (item.val_at::(0), item.val_at::(1))); let mut max_height: U256 = From::from(0); - for (h, d) in hashes { + for (rh, rd) in hashes { + let h = try!(rh); + let d = try!(rd); match io.chain().block_status(&h) { BlockStatus::InChain => { trace!(target: "sync", "New block hash already in chain {:?}", h); @@ -407,10 +418,11 @@ impl ChainSync { BlockStatus::Bad =>{ debug!(target: "sync", "Bad new block hash {:?}", h); io.disable_peer(peer_id); - return; + return Ok(()); } } - } + }; + Ok(()) } /// Called by peer when it is disconnecting @@ -714,7 +726,8 @@ impl ChainSync { } } - fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: &PeerId, _r: &Rlp) { + fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: &PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + Ok(()) } fn send_status(&mut self, io: &mut SyncIo, peer_id: &PeerId) { @@ -735,16 +748,16 @@ impl ChainSync { } } - fn return_block_headers(&self, io: &mut SyncIo, r: &Rlp) { + fn return_block_headers(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { // Packet layout: // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ] - let max_headers: usize = r.val_at(1); - let skip: usize = r.val_at(2); - let reverse: bool = r.val_at(3); + let max_headers: usize = try!(r.val_at(1)); + let skip: usize = try!(r.val_at(2)); + let reverse: bool = try!(r.val_at(3)); let last = io.chain().chain_info().best_block_number; - let mut number = if r.at(0).size() == 32 { + let mut number = if try!(r.at(0)).size() == 32 { // id is a hash - let hash: H256 = r.val_at(0); + let hash: H256 = try!(r.val_at(0)); trace!(target: "sync", "-> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse); match io.chain().block_header(&hash) { Some(hdr) => From::from(HeaderView::new(&hdr).number()), @@ -752,8 +765,8 @@ impl ChainSync { } } else { - trace!(target: "sync", "-> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", r.val_at::(0), max_headers, skip, reverse); - r.val_at(0) + trace!(target: "sync", "-> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", try!(r.val_at::(0)), max_headers, skip, reverse); + try!(r.val_at(0)) }; if reverse { @@ -788,20 +801,21 @@ impl ChainSync { io.respond(BLOCK_HEADERS_PACKET, rlp.out()).unwrap_or_else(|e| debug!(target: "sync", "Error sending headers: {:?}", e)); trace!(target: "sync", "-> GetBlockHeaders: returned {} entries", count); + Ok(()) } - fn return_block_bodies(&self, io: &mut SyncIo, r: &Rlp) { + fn return_block_bodies(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { let mut count = r.item_count(); if count == 0 { debug!(target: "sync", "Empty GetBlockBodies request, ignoring."); - return; + return Ok(()); } trace!(target: "sync", "-> GetBlockBodies: {} entries", count); count = min(count, MAX_BODIES_TO_SEND); let mut added = 0usize; let mut data = Bytes::new(); for i in 0..count { - match io.chain().block_body(&r.val_at::(i)) { + match io.chain().block_body(&try!(r.val_at::(i))) { Some(mut hdr) => { data.append(&mut hdr); added += 1; @@ -814,19 +828,20 @@ impl ChainSync { io.respond(BLOCK_BODIES_PACKET, rlp.out()).unwrap_or_else(|e| debug!(target: "sync", "Error sending headers: {:?}", e)); trace!(target: "sync", "-> GetBlockBodies: returned {} entries", added); + Ok(()) } - fn return_node_data(&self, io: &mut SyncIo, r: &Rlp) { + fn return_node_data(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { let mut count = r.item_count(); if count == 0 { debug!(target: "sync", "Empty GetNodeData request, ignoring."); - return; + return Ok(()); } count = min(count, MAX_NODE_DATA_TO_SEND); let mut added = 0usize; let mut data = Bytes::new(); for i in 0..count { - match io.chain().state_data(&r.val_at::(i)) { + match io.chain().state_data(&try!(r.val_at::(i))) { Some(mut hdr) => { data.append(&mut hdr); added += 1; @@ -838,19 +853,20 @@ impl ChainSync { rlp.append_raw(&data, added); io.respond(NODE_DATA_PACKET, rlp.out()).unwrap_or_else(|e| debug!(target: "sync", "Error sending headers: {:?}", e)); + Ok(()) } - fn return_receipts(&self, io: &mut SyncIo, r: &Rlp) { + fn return_receipts(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { let mut count = r.item_count(); if count == 0 { debug!(target: "sync", "Empty GetReceipts request, ignoring."); - return; + return Ok(()); } count = min(count, MAX_RECEIPTS_TO_SEND); let mut added = 0usize; let mut data = Bytes::new(); for i in 0..count { - match io.chain().block_receipts(&r.val_at::(i)) { + match io.chain().block_receipts(&try!(r.val_at::(i))) { Some(mut hdr) => { data.append(&mut hdr); added += 1; @@ -862,11 +878,12 @@ impl ChainSync { rlp.append_raw(&data, added); io.respond(RECEIPTS_PACKET, rlp.out()).unwrap_or_else(|e| debug!(target: "sync", "Error sending headers: {:?}", e)); + Ok(()) } pub fn on_packet(&mut self, io: &mut SyncIo, peer: &PeerId, packet_id: u8, data: &[u8]) { - let rlp = Rlp::new(data); - match packet_id { + let rlp = UntrustedRlp::new(data); + let result = match packet_id { STATUS_PACKET => self.on_peer_status(io, peer, &rlp), TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), GET_BLOCK_HEADERS_PACKET => self.return_block_headers(io, &rlp), @@ -877,8 +894,14 @@ impl ChainSync { NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp), GET_NODE_DATA_PACKET => self.return_node_data(io, &rlp), GET_RECEIPTS_PACKET => self.return_receipts(io, &rlp), - _ => debug!(target: "sync", "Unknown packet {}", packet_id) - } + _ => { + debug!(target: "sync", "Unknown packet {}", packet_id); + Ok(()) + } + }; + result.unwrap_or_else(|e| { + debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e); + }) } pub fn maintain_sync(&mut self, _io: &mut SyncIo) {