ethcore sync decodes rlp less often (#9264)

* deserialize block only once during verification

* ethcore-sync uses Unverified

* ethcore-sync uses Unverified

* fixed build error

* removed Block::is_good

* applied review suggestions

* ethcore-sync deserializes headers and blocks only once
This commit is contained in:
Marek Kotewicz 2018-08-08 10:56:54 +02:00 committed by GitHub
parent 712101b63d
commit 78a38e9825
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 196 additions and 118 deletions

View File

@ -113,6 +113,7 @@ pub mod blocks {
} }
/// An unverified block. /// An unverified block.
#[derive(PartialEq, Debug)]
pub struct Unverified { pub struct Unverified {
/// Unverified block header. /// Unverified block header.
pub header: Header, pub header: Header,

View File

@ -23,12 +23,11 @@ use std::cmp;
use heapsize::HeapSizeOf; use heapsize::HeapSizeOf;
use ethereum_types::H256; use ethereum_types::H256;
use rlp::{self, Rlp}; use rlp::{self, Rlp};
use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::header::BlockNumber;
use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKind}; use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKind};
use ethcore::error::{ImportErrorKind, BlockError}; use ethcore::error::{ImportErrorKind, BlockError};
use ethcore::verification::queue::kind::blocks::Unverified;
use sync_io::SyncIo; use sync_io::SyncIo;
use blocks::BlockCollection; use blocks::{BlockCollection, SyncBody, SyncHeader};
const MAX_HEADERS_TO_REQUEST: usize = 128; const MAX_HEADERS_TO_REQUEST: usize = 128;
const MAX_BODIES_TO_REQUEST: usize = 32; const MAX_BODIES_TO_REQUEST: usize = 32;
@ -236,45 +235,39 @@ impl BlockDownloader {
let mut valid_response = item_count == 0; //empty response is valid let mut valid_response = item_count == 0; //empty response is valid
let mut any_known = false; let mut any_known = false;
for i in 0..item_count { for i in 0..item_count {
let info: BlockHeader = r.val_at(i).map_err(|e| { let info = SyncHeader::from_rlp(r.at(i)?.as_raw().to_vec())?;
trace!(target: "sync", "Error decoding block header RLP: {:?}", e); let number = BlockNumber::from(info.header.number());
BlockDownloaderImportError::Invalid let hash = info.header.hash();
})?;
let number = BlockNumber::from(info.number());
// Check if any of the headers matches the hash we requested // Check if any of the headers matches the hash we requested
if !valid_response { if !valid_response {
if let Some(expected) = expected_hash { if let Some(expected) = expected_hash {
valid_response = expected == info.hash() valid_response = expected == hash;
} }
} }
any_known = any_known || self.blocks.contains_head(&info.hash()); any_known = any_known || self.blocks.contains_head(&hash);
if self.blocks.contains(&info.hash()) { if self.blocks.contains(&hash) {
trace!(target: "sync", "Skipping existing block header {} ({:?})", number, info.hash()); trace!(target: "sync", "Skipping existing block header {} ({:?})", number, hash);
continue; continue;
} }
if self.highest_block.as_ref().map_or(true, |n| number > *n) { if self.highest_block.as_ref().map_or(true, |n| number > *n) {
self.highest_block = Some(number); self.highest_block = Some(number);
} }
let hash = info.hash();
let hdr = r.at(i).map_err(|e| {
trace!(target: "sync", "Error decoding block header RLP: {:?}", e);
BlockDownloaderImportError::Invalid
})?;
match io.chain().block_status(BlockId::Hash(hash.clone())) { match io.chain().block_status(BlockId::Hash(hash.clone())) {
BlockStatus::InChain | BlockStatus::Queued => { BlockStatus::InChain | BlockStatus::Queued => {
match self.state { match self.state {
State::Blocks => trace!(target: "sync", "Header already in chain {} ({})", number, hash), State::Blocks => trace!(target: "sync", "Header already in chain {} ({})", number, hash),
_ => trace!(target: "sync", "Header already in chain {} ({}), state = {:?}", number, hash, self.state), _ => trace!(target: "sync", "Header already in chain {} ({}), state = {:?}", number, hash, self.state),
} }
headers.push(hdr.as_raw().to_vec()); headers.push(info);
hashes.push(hash); hashes.push(hash);
}, },
BlockStatus::Bad => { BlockStatus::Bad => {
return Err(BlockDownloaderImportError::Invalid); return Err(BlockDownloaderImportError::Invalid);
}, },
BlockStatus::Unknown | BlockStatus::Pending => { BlockStatus::Unknown | BlockStatus::Pending => {
headers.push(hdr.as_raw().to_vec()); headers.push(info);
hashes.push(hash); hashes.push(hash);
} }
} }
@ -325,19 +318,15 @@ impl BlockDownloader {
let item_count = r.item_count().unwrap_or(0); let item_count = r.item_count().unwrap_or(0);
if item_count == 0 { if item_count == 0 {
return Err(BlockDownloaderImportError::Useless); return Err(BlockDownloaderImportError::Useless);
} } else if self.state != State::Blocks {
else if self.state != State::Blocks {
trace!(target: "sync", "Ignored unexpected block bodies"); trace!(target: "sync", "Ignored unexpected block bodies");
} } else {
else {
let mut bodies = Vec::with_capacity(item_count); let mut bodies = Vec::with_capacity(item_count);
for i in 0..item_count { for i in 0..item_count {
let body = r.at(i).map_err(|e| { let body = SyncBody::from_rlp(r.at(i)?.as_raw())?;
trace!(target: "sync", "Error decoding block boides RLP: {:?}", e); bodies.push(body);
BlockDownloaderImportError::Invalid
})?;
bodies.push(body.as_raw().to_vec());
} }
if self.blocks.insert_bodies(bodies) != item_count { if self.blocks.insert_bodies(bodies) != item_count {
trace!(target: "sync", "Deactivating peer for giving invalid block bodies"); trace!(target: "sync", "Deactivating peer for giving invalid block bodies");
return Err(BlockDownloaderImportError::Invalid); return Err(BlockDownloaderImportError::Invalid);
@ -483,15 +472,6 @@ impl BlockDownloader {
let block = block_and_receipts.block; let block = block_and_receipts.block;
let receipts = block_and_receipts.receipts; let receipts = block_and_receipts.receipts;
let block = match Unverified::from_rlp(block) {
Ok(block) => block,
Err(_) => {
debug!(target: "sync", "Bad block rlp");
bad = true;
break;
}
};
let h = block.header.hash(); let h = block.header.hash();
let number = block.header.number(); let number = block.header.number();
let parent = *block.header.parent_hash(); let parent = *block.header.parent_hash();

View File

@ -23,28 +23,85 @@ use triehash_ethereum::ordered_trie_root;
use bytes::Bytes; use bytes::Bytes;
use rlp::{Rlp, RlpStream, DecoderError}; use rlp::{Rlp, RlpStream, DecoderError};
use network; use network;
use ethcore::encoded::Block;
use ethcore::views::{HeaderView, BodyView};
use ethcore::header::Header as BlockHeader; use ethcore::header::Header as BlockHeader;
use ethcore::verification::queue::kind::blocks::Unverified;
use transaction::UnverifiedTransaction;
known_heap_size!(0, HeaderId); known_heap_size!(0, HeaderId);
type SmallHashVec = SmallVec<[H256; 1]>; type SmallHashVec = SmallVec<[H256; 1]>;
/// Block data with optional body. pub struct SyncHeader {
struct SyncBlock { pub bytes: Bytes,
header: Bytes, pub header: BlockHeader,
body: Option<Bytes>,
receipts: Option<Bytes>,
receipts_root: H256,
} }
/// Block with optional receipt impl HeapSizeOf for SyncHeader {
pub struct BlockAndReceipts { fn heap_size_of_children(&self) -> usize {
/// Block data. self.bytes.heap_size_of_children()
pub block: Bytes, + self.header.heap_size_of_children()
/// Block receipts RLP list. }
pub receipts: Option<Bytes>, }
impl SyncHeader {
pub fn from_rlp(bytes: Bytes) -> Result<Self, DecoderError> {
let result = SyncHeader {
header: ::rlp::decode(&bytes)?,
bytes,
};
Ok(result)
}
}
/// Block body received during sync: the decoded transactions and uncles, kept
/// together with the raw RLP each list was decoded from.
pub struct SyncBody {
/// Raw RLP of the transactions list (item 0 of the body RLP).
pub transactions_bytes: Bytes,
/// Transactions decoded from the transactions list.
pub transactions: Vec<UnverifiedTransaction>,
/// Raw RLP of the uncles list (item 1 of the body RLP).
pub uncles_bytes: Bytes,
/// Uncle headers decoded from the uncles list.
pub uncles: Vec<BlockHeader>,
}
impl SyncBody {
pub fn from_rlp(bytes: &[u8]) -> Result<Self, DecoderError> {
let rlp = Rlp::new(bytes);
let transactions_rlp = rlp.at(0)?;
let uncles_rlp = rlp.at(1)?;
let result = SyncBody {
transactions_bytes: transactions_rlp.as_raw().to_vec(),
transactions: transactions_rlp.as_list()?,
uncles_bytes: uncles_rlp.as_raw().to_vec(),
uncles: uncles_rlp.as_list()?,
};
Ok(result)
}
fn empty_body() -> Self {
SyncBody {
transactions_bytes: ::rlp::EMPTY_LIST_RLP.to_vec(),
transactions: Vec::with_capacity(0),
uncles_bytes: ::rlp::EMPTY_LIST_RLP.to_vec(),
uncles: Vec::with_capacity(0),
}
}
}
impl HeapSizeOf for SyncBody {
	/// Sums the heap usage of the raw and decoded transactions and uncles.
	fn heap_size_of_children(&self) -> usize {
		let transactions_size = self.transactions_bytes.heap_size_of_children()
			+ self.transactions.heap_size_of_children();
		let uncles_size = self.uncles_bytes.heap_size_of_children()
			+ self.uncles.heap_size_of_children();
		transactions_size + uncles_size
	}
}
/// Block data with optional body.
struct SyncBlock {
header: SyncHeader,
body: Option<SyncBody>,
receipts: Option<Bytes>,
receipts_root: H256,
} }
impl HeapSizeOf for SyncBlock { impl HeapSizeOf for SyncBlock {
@ -53,6 +110,29 @@ impl HeapSizeOf for SyncBlock {
} }
} }
/// Assembles an `Unverified` block from an already-decoded header and body.
///
/// The block RLP is rebuilt as a three-item list by splicing in the raw
/// header, transactions and uncles bytes, so nothing is decoded again here.
/// The decoded header, transactions and uncles are moved into the result
/// as-is. A missing body is treated as an empty one.
fn unverified_from_sync(header: SyncHeader, body: Option<SyncBody>) -> Unverified {
	let body = body.unwrap_or_else(SyncBody::empty_body);

	let mut stream = RlpStream::new_list(3);
	stream.append_raw(&header.bytes, 1);
	stream.append_raw(&body.transactions_bytes, 1);
	stream.append_raw(&body.uncles_bytes, 1);

	Unverified {
		header: header.header,
		transactions: body.transactions,
		uncles: body.uncles,
		// `out()` hands back the owned buffer, so the encoded block is moved
		// in directly rather than copied a second time.
		bytes: stream.out(),
	}
}
/// Block with optional receipts.
pub struct BlockAndReceipts {
/// Block data, already decoded into an `Unverified` block.
pub block: Unverified,
/// Block receipts RLP list.
pub receipts: Option<Bytes>,
}
/// Used to identify header by transactions and uncles hashes /// Used to identify header by transactions and uncles hashes
#[derive(Eq, PartialEq, Hash)] #[derive(Eq, PartialEq, Hash)]
struct HeaderId { struct HeaderId {
@ -124,7 +204,7 @@ impl BlockCollection {
} }
/// Insert a set of headers into collection and advance subchain head pointers. /// Insert a set of headers into collection and advance subchain head pointers.
pub fn insert_headers(&mut self, headers: Vec<Bytes>) { pub fn insert_headers(&mut self, headers: Vec<SyncHeader>) {
for h in headers { for h in headers {
if let Err(e) = self.insert_header(h) { if let Err(e) = self.insert_header(h) {
trace!(target: "sync", "Ignored invalid header: {:?}", e); trace!(target: "sync", "Ignored invalid header: {:?}", e);
@ -134,7 +214,7 @@ impl BlockCollection {
} }
/// Insert a collection of block bodies for previously downloaded headers. /// Insert a collection of block bodies for previously downloaded headers.
pub fn insert_bodies(&mut self, bodies: Vec<Bytes>) -> usize { pub fn insert_bodies(&mut self, bodies: Vec<SyncBody>) -> usize {
let mut inserted = 0; let mut inserted = 0;
for b in bodies { for b in bodies {
if let Err(e) = self.insert_body(b) { if let Err(e) = self.insert_body(b) {
@ -278,30 +358,33 @@ impl BlockCollection {
while let Some(h) = head { while let Some(h) = head {
head = self.parents.get(&h).cloned(); head = self.parents.get(&h).cloned();
if let Some(head) = head { if let Some(head) = head {
match self.blocks.get(&head) { match self.blocks.remove(&head) {
Some(block) if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) => { Some(block) => {
blocks.push(block); if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) {
hashes.push(head); blocks.push(block);
self.head = Some(head); hashes.push(head);
} self.head = Some(head);
_ => break, } else {
self.blocks.insert(head, block);
break;
}
},
_ => {
break;
},
} }
} }
} }
for block in blocks { for block in blocks.into_iter() {
let body = view!(BodyView, block.body.as_ref().expect("blocks contains only full blocks; qed")); let unverified = unverified_from_sync(block.header, block.body);
let header = view!(HeaderView, &block.header);
let block_view = Block::new_from_header_and_body(&header, &body);
drained.push(BlockAndReceipts { drained.push(BlockAndReceipts {
block: block_view.into_inner(), block: unverified,
receipts: block.receipts.clone(), receipts: block.receipts.clone(),
}); });
} }
} }
for h in hashes {
self.blocks.remove(&h);
}
trace!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head); trace!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head);
drained drained
} }
@ -337,26 +420,23 @@ impl BlockCollection {
self.downloading_headers.contains(hash) || self.downloading_bodies.contains(hash) self.downloading_headers.contains(hash) || self.downloading_bodies.contains(hash)
} }
fn insert_body(&mut self, b: Bytes) -> Result<(), network::Error> { fn insert_body(&mut self, body: SyncBody) -> Result<(), network::Error> {
let header_id = { let header_id = {
let body = Rlp::new(&b); let tx_root = ordered_trie_root(Rlp::new(&body.transactions_bytes).iter().map(|r| r.as_raw()));
let tx = body.at(0)?; let uncles = keccak(&body.uncles_bytes);
let tx_root = ordered_trie_root(tx.iter().map(|r| r.as_raw()));
let uncles = keccak(body.at(1)?.as_raw());
HeaderId { HeaderId {
transactions_root: tx_root, transactions_root: tx_root,
uncles: uncles uncles: uncles
} }
}; };
match self.header_ids.get(&header_id).cloned() { match self.header_ids.remove(&header_id) {
Some(h) => { Some(h) => {
self.header_ids.remove(&header_id);
self.downloading_bodies.remove(&h); self.downloading_bodies.remove(&h);
match self.blocks.get_mut(&h) { match self.blocks.get_mut(&h) {
Some(ref mut block) => { Some(ref mut block) => {
trace!(target: "sync", "Got body {}", h); trace!(target: "sync", "Got body {}", h);
block.body = Some(b); block.body = Some(body);
Ok(()) Ok(())
}, },
None => { None => {
@ -401,54 +481,63 @@ impl BlockCollection {
} }
} }
fn insert_header(&mut self, header: Bytes) -> Result<H256, DecoderError> { fn insert_header(&mut self, info: SyncHeader) -> Result<H256, DecoderError> {
let info: BlockHeader = Rlp::new(&header).as_val()?; let hash = info.header.hash();
let hash = info.hash();
if self.blocks.contains_key(&hash) { if self.blocks.contains_key(&hash) {
return Ok(hash); return Ok(hash);
} }
match self.head { match self.head {
None if hash == self.heads[0] => { None if hash == self.heads[0] => {
trace!(target: "sync", "New head {}", hash); trace!(target: "sync", "New head {}", hash);
self.head = Some(info.parent_hash().clone()); self.head = Some(info.header.parent_hash().clone());
}, },
_ => () _ => ()
} }
let mut block = SyncBlock {
header: header,
body: None,
receipts: None,
receipts_root: H256::new(),
};
let header_id = HeaderId { let header_id = HeaderId {
transactions_root: info.transactions_root().clone(), transactions_root: *info.header.transactions_root(),
uncles: info.uncles_hash().clone(), uncles: *info.header.uncles_hash(),
}; };
if header_id.transactions_root == KECCAK_NULL_RLP && header_id.uncles == KECCAK_EMPTY_LIST_RLP {
let body = if header_id.transactions_root == KECCAK_NULL_RLP && header_id.uncles == KECCAK_EMPTY_LIST_RLP {
// empty body, just mark as downloaded // empty body, just mark as downloaded
let mut body_stream = RlpStream::new_list(2); Some(SyncBody::empty_body())
body_stream.append_raw(&::rlp::EMPTY_LIST_RLP, 1); } else {
body_stream.append_raw(&::rlp::EMPTY_LIST_RLP, 1); trace!(
block.body = Some(body_stream.out()); "Queueing body tx_root = {:?}, uncles = {:?}, block = {:?}, number = {}",
} header_id.transactions_root,
else { header_id.uncles,
trace!("Queueing body tx_root = {:?}, uncles = {:?}, block = {:?}, number = {}", header_id.transactions_root, header_id.uncles, hash, info.number()); hash,
self.header_ids.insert(header_id, hash.clone()); info.header.number()
} );
if self.need_receipts { self.header_ids.insert(header_id, hash);
let receipt_root = info.receipts_root().clone(); None
};
let (receipts, receipts_root) = if self.need_receipts {
let receipt_root = *info.header.receipts_root();
if receipt_root == KECCAK_NULL_RLP { if receipt_root == KECCAK_NULL_RLP {
let receipts_stream = RlpStream::new_list(0); let receipts_stream = RlpStream::new_list(0);
block.receipts = Some(receipts_stream.out()); (Some(receipts_stream.out()), receipt_root)
} else { } else {
self.receipt_ids.entry(receipt_root).or_insert_with(|| SmallHashVec::new()).push(hash.clone()); self.receipt_ids.entry(receipt_root).or_insert_with(|| SmallHashVec::new()).push(hash);
(None, receipt_root)
} }
block.receipts_root = receipt_root; } else {
} (None, H256::new())
};
self.parents.insert(info.parent_hash().clone(), hash.clone()); self.parents.insert(*info.header.parent_hash(), hash);
self.blocks.insert(hash.clone(), block);
let block = SyncBlock {
header: info,
body,
receipts,
receipts_root,
};
self.blocks.insert(hash, block);
trace!(target: "sync", "New header: {:x}", hash); trace!(target: "sync", "New header: {:x}", hash);
Ok(hash) Ok(hash)
} }
@ -485,10 +574,11 @@ impl BlockCollection {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::BlockCollection; use super::{BlockCollection, SyncHeader};
use ethcore::client::{TestBlockChainClient, EachBlockWith, BlockId, BlockChainClient}; use ethcore::client::{TestBlockChainClient, EachBlockWith, BlockId, BlockChainClient};
use ethcore::views::HeaderView;
use ethcore::header::BlockNumber; use ethcore::header::BlockNumber;
use ethcore::verification::queue::kind::blocks::Unverified;
use ethcore::views::HeaderView;
use rlp::*; use rlp::*;
fn is_empty(bc: &BlockCollection) -> bool { fn is_empty(bc: &BlockCollection) -> bool {
@ -541,7 +631,7 @@ mod test {
assert_eq!(bc.downloading_headers.len(), 1); assert_eq!(bc.downloading_headers.len(), 1);
assert!(bc.drain().is_empty()); assert!(bc.drain().is_empty());
bc.insert_headers(headers[0..6].to_vec()); bc.insert_headers(headers[0..6].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect());
assert_eq!(hashes[5], bc.heads[0]); assert_eq!(hashes[5], bc.heads[0]);
for h in &hashes[0..6] { for h in &hashes[0..6] {
bc.clear_header_download(h) bc.clear_header_download(h)
@ -550,7 +640,10 @@ mod test {
assert!(!bc.is_downloading(&hashes[0])); assert!(!bc.is_downloading(&hashes[0]));
assert!(bc.contains(&hashes[0])); assert!(bc.contains(&hashes[0]));
assert_eq!(&bc.drain().into_iter().map(|b| b.block).collect::<Vec<_>>()[..], &blocks[0..6]); assert_eq!(
bc.drain().into_iter().map(|b| b.block).collect::<Vec<_>>(),
blocks[0..6].iter().map(|b| Unverified::from_rlp(b.to_vec()).unwrap()).collect::<Vec<_>>()
);
assert!(!bc.contains(&hashes[0])); assert!(!bc.contains(&hashes[0]));
assert_eq!(hashes[5], bc.head.unwrap()); assert_eq!(hashes[5], bc.head.unwrap());
@ -558,13 +651,17 @@ mod test {
assert_eq!(hashes[5], h); assert_eq!(hashes[5], h);
let (h, _) = bc.needed_headers(6, false).unwrap(); let (h, _) = bc.needed_headers(6, false).unwrap();
assert_eq!(hashes[20], h); assert_eq!(hashes[20], h);
bc.insert_headers(headers[10..16].to_vec()); bc.insert_headers(headers[10..16].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect());
assert!(bc.drain().is_empty()); assert!(bc.drain().is_empty());
bc.insert_headers(headers[5..10].to_vec()); bc.insert_headers(headers[5..10].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect());
assert_eq!(&bc.drain().into_iter().map(|b| b.block).collect::<Vec<_>>()[..], &blocks[6..16]); assert_eq!(
bc.drain().into_iter().map(|b| b.block).collect::<Vec<_>>(),
blocks[6..16].iter().map(|b| Unverified::from_rlp(b.to_vec()).unwrap()).collect::<Vec<_>>()
);
assert_eq!(hashes[15], bc.heads[0]); assert_eq!(hashes[15], bc.heads[0]);
bc.insert_headers(headers[15..].to_vec()); bc.insert_headers(headers[15..].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect());
bc.drain(); bc.drain();
assert!(bc.is_empty()); assert!(bc.is_empty());
} }
@ -584,11 +681,11 @@ mod test {
let heads: Vec<_> = hashes.iter().enumerate().filter_map(|(i, h)| if i % 20 == 0 { Some(h.clone()) } else { None }).collect(); let heads: Vec<_> = hashes.iter().enumerate().filter_map(|(i, h)| if i % 20 == 0 { Some(h.clone()) } else { None }).collect();
bc.reset_to(heads); bc.reset_to(heads);
bc.insert_headers(headers[2..22].to_vec()); bc.insert_headers(headers[2..22].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect());
assert_eq!(hashes[0], bc.heads[0]); assert_eq!(hashes[0], bc.heads[0]);
assert_eq!(hashes[21], bc.heads[1]); assert_eq!(hashes[21], bc.heads[1]);
assert!(bc.head.is_none()); assert!(bc.head.is_none());
bc.insert_headers(headers[0..2].to_vec()); bc.insert_headers(headers[0..2].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect());
assert!(bc.head.is_some()); assert!(bc.head.is_some());
assert_eq!(hashes[21], bc.heads[0]); assert_eq!(hashes[21], bc.heads[0]);
} }
@ -608,9 +705,9 @@ mod test {
let heads: Vec<_> = hashes.iter().enumerate().filter_map(|(i, h)| if i % 20 == 0 { Some(h.clone()) } else { None }).collect(); let heads: Vec<_> = hashes.iter().enumerate().filter_map(|(i, h)| if i % 20 == 0 { Some(h.clone()) } else { None }).collect();
bc.reset_to(heads); bc.reset_to(heads);
bc.insert_headers(headers[1..2].to_vec()); bc.insert_headers(headers[1..2].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect());
assert!(bc.drain().is_empty()); assert!(bc.drain().is_empty());
bc.insert_headers(headers[0..1].to_vec()); bc.insert_headers(headers[0..1].iter().map(|h| SyncHeader::from_rlp(h.to_vec()).unwrap()).collect());
assert_eq!(bc.drain().len(), 2); assert_eq!(bc.drain().len(), 2);
} }
} }