Snapshot sync part 2 (#2098)

* Split block downloader into a module

* Snapshot sync progress

* Warp sync CLI option

* Increased snapshot chunk and ping timeouts

* Fixed an issue with delayed writes

* Updated bootnodes

* Don't run pending IO tasks on shutdown

* Optional first_block; removed insert_snapshot_block

* Fixing expect calls

* Fixed stalled sync

* style and docs

* Update block_sync.rs

[ci:skip]
Arkadiy Paronyan authored on 2016-10-18 18:16:00 +02:00, committed by GitHub
parent dba2d79b56
commit 487dfb0208
36 changed files with 1347 additions and 528 deletions


@@ -15,7 +15,8 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use std::str;
use std::collections::HashMap;
use util::Bytes;
use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId,
NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, NetworkError};
use util::{U256, H256};
@@ -41,6 +42,8 @@ pub struct SyncConfig {
pub subprotocol_name: [u8; 3],
/// Fork block to check
pub fork_block: Option<(BlockNumber, H256)>,
/// Enable snapshot sync
pub warp_sync: bool,
}
impl Default for SyncConfig {
@@ -50,6 +53,7 @@ impl Default for SyncConfig {
network_id: U256::from(1),
subprotocol_name: *b"eth",
fork_block: None,
warp_sync: true,
}
}
}
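For illustration, a consumer of this crate might wire the new option roughly as follows (a minimal sketch; only `SyncConfig` and `warp_sync` come from this diff, the crate path and the source of the flag are assumptions):

use ethsync::SyncConfig; // assumed public re-export
let warp = true; // e.g. parsed from a warp-sync CLI switch (hypothetical name)
let mut config = SyncConfig::default(); // warp_sync defaults to true
config.warp_sync = warp;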
@@ -104,7 +108,12 @@ impl EthSync {
let service = try!(NetworkService::new(try!(network_config.into_basic())));
let sync = Arc::new(EthSync{
network: service,
handler: Arc::new(SyncProtocolHandler { sync: RwLock::new(chain_sync), chain: chain, snapshot_service: snapshot_service }),
handler: Arc::new(SyncProtocolHandler {
sync: RwLock::new(chain_sync),
chain: chain,
snapshot_service: snapshot_service,
overlay: RwLock::new(HashMap::new()),
}),
subprotocol_name: config.subprotocol_name,
});
@@ -122,7 +131,7 @@ impl SyncProvider for EthSync {
/// Get sync peers
fn peers(&self) -> Vec<PeerInfo> {
self.network.with_context_eval(self.subprotocol_name, |context| {
let sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service);
let sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay);
self.handler.sync.write().peers(&sync_io)
}).unwrap_or(Vec::new())
}
@@ -135,6 +144,8 @@ struct SyncProtocolHandler {
snapshot_service: Arc<SnapshotService>,
/// Sync strategy
sync: RwLock<ChainSync>,
/// Chain overlay used to cache data such as fork block.
overlay: RwLock<HashMap<BlockNumber, Bytes>>,
}
impl NetworkProtocolHandler for SyncProtocolHandler {
@@ -143,21 +154,21 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
}
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service), *peer, packet_id, data);
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
}
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service), *peer);
self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer);
}
fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service), *peer);
self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer);
}
fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
self.sync.write().maintain_peers(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service));
self.sync.write().maintain_sync(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service));
self.sync.write().propagate_new_transactions(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service));
self.sync.write().maintain_peers(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay));
self.sync.write().maintain_sync(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay));
self.sync.write().propagate_new_transactions(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay));
}
}
@@ -171,7 +182,7 @@ impl ChainNotify for EthSync {
_duration: u64)
{
self.network.with_context(self.subprotocol_name, |context| {
let mut sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service);
let mut sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay);
self.handler.sync.write().chain_new_blocks(
&mut sync_io,
&imported,
@@ -239,7 +250,7 @@ impl ManageNetwork for EthSync {
fn stop_network(&self) {
self.network.with_context(self.subprotocol_name, |context| {
let mut sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service);
let mut sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay);
self.handler.sync.write().abort(&mut sync_io);
});
self.stop();

sync/src/block_sync.rs (new file, 477 lines)

@@ -0,0 +1,477 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
///
/// Blockchain downloader
///
use util::*;
use rlp::*;
use ethcore::views::{BlockView};
use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockStatus, BlockID, BlockImportError};
use ethcore::block::Block;
use ethcore::error::{ImportError, BlockError};
use sync_io::SyncIo;
use blocks::BlockCollection;
const MAX_HEADERS_TO_REQUEST: usize = 128;
const MAX_BODIES_TO_REQUEST: usize = 128;
const MAX_RECEPITS_TO_REQUEST: usize = 128;
const SUBCHAIN_SIZE: u64 = 256;
const MAX_ROUND_PARENTS: usize = 32;
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
/// Downloader state
pub enum State {
/// No active downloads.
Idle,
/// Downloading subchain heads
ChainHead,
/// Downloading blocks
Blocks,
/// Download is complete
Complete,
}
/// Data that needs to be requested from a peer.
pub enum BlockRequest {
Headers {
start: H256,
count: u64,
skip: u64,
},
Bodies {
hashes: Vec<H256>,
},
Receipts {
hashes: Vec<H256>,
},
}
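// Illustrative only (not part of this file): the caller, e.g. the chain sync
// strategy, is expected to translate a returned `BlockRequest` into the
// corresponding eth wire request, roughly:
//
// match downloader.request_blocks(io) {
// Some(BlockRequest::Headers { start, count, skip }) => { /* GetBlockHeaders(start, count, skip) */ }
// Some(BlockRequest::Bodies { hashes }) => { /* GetBlockBodies(hashes) */ }
// Some(BlockRequest::Receipts { hashes }) => { /* GetReceipts(hashes) */ }
// None => { /* nothing to request from this peer right now */ }
// }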
#[derive(Eq, PartialEq, Debug)]
pub enum BlockDownloaderImportError {
/// Imported data is rejected as invalid.
Invalid,
/// Imported data is valid but rejected because the downloader does not need it.
Useless,
}
/// Block downloader strategy.
/// Manages state and block data for a block download process.
pub struct BlockDownloader {
/// Downloader state
state: State,
/// Highest block number seen
highest_block: Option<BlockNumber>,
/// Downloaded blocks collection, holding headers, bodies and (optionally) receipts
blocks: BlockCollection,
/// Last imported block number
last_imported_block: BlockNumber,
/// Last imported block hash
last_imported_hash: H256,
/// Number of blocks imported this round
imported_this_round: Option<usize>,
/// Block parents imported this round (hash, parent)
round_parents: VecDeque<(H256, H256)>,
/// Do we need to download block receipts.
download_receipts: bool,
/// Sync up to the block with this hash.
target_hash: Option<H256>,
}
impl BlockDownloader {
/// Create a new instance of syncing strategy.
pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> BlockDownloader {
BlockDownloader {
state: State::Idle,
highest_block: None,
last_imported_block: start_number,
last_imported_hash: start_hash.clone(),
blocks: BlockCollection::new(sync_receipts),
imported_this_round: None,
round_parents: VecDeque::new(),
download_receipts: sync_receipts,
target_hash: None,
}
}
/// Reset sync. Clear all local downloaded data.
pub fn reset(&mut self) {
self.blocks.clear();
self.state = State::Idle;
}
/// Mark a block as known in the chain
pub fn mark_as_known(&mut self, hash: &H256, number: BlockNumber) {
if number == self.last_imported_block + 1 {
self.last_imported_block = number;
self.last_imported_hash = hash.clone();
}
}
/// Check if download is complete
pub fn is_complete(&self) -> bool {
self.state == State::Complete
}
/// Check if particular block hash is being downloaded
pub fn is_downloading(&self, hash: &H256) -> bool {
self.blocks.is_downloading(hash)
}
/// Set the sync target block hash.
pub fn set_target(&mut self, hash: &H256) {
self.target_hash = Some(hash.clone());
}
/// Set starting sync block
pub fn _set_start(&mut self, hash: &H256, number: BlockNumber) {
self.last_imported_hash = hash.clone();
self.last_imported_block = number;
}
/// Unmark header as being downloaded.
pub fn clear_header_download(&mut self, hash: &H256) {
self.blocks.clear_header_download(hash)
}
/// Unmark block body as being downloaded.
pub fn clear_body_download(&mut self, hashes: &[H256]) {
self.blocks.clear_body_download(hashes)
}
/// Unmark block receipt as being downloaded.
pub fn clear_receipt_download(&mut self, hashes: &[H256]) {
self.blocks.clear_receipt_download(hashes)
}
/// Reset collection for a new sync round with given subchain block hashes.
pub fn reset_to(&mut self, hashes: Vec<H256>) {
self.reset();
self.blocks.reset_to(hashes);
}
/// Returns used heap memory size.
pub fn heap_size(&self) -> usize {
self.blocks.heap_size() + self.round_parents.heap_size_of_children()
}
/// Returns best imported block number.
pub fn last_imported_block_number(&self) -> BlockNumber {
self.last_imported_block
}
/// Add new block headers.
pub fn import_headers(&mut self, io: &mut SyncIo, r: &UntrustedRlp, expected_hash: Option<H256>) -> Result<(), BlockDownloaderImportError> {
let item_count = r.item_count();
if self.state == State::Idle {
trace!(target: "sync", "Ignored unexpected block headers");
return Ok(())
}
if item_count == 0 && (self.state == State::Blocks) {
return Err(BlockDownloaderImportError::Invalid);
}
let mut headers = Vec::new();
let mut hashes = Vec::new();
let mut valid_response = item_count == 0; //empty response is valid
for i in 0..item_count {
let info: BlockHeader = try!(r.val_at(i).map_err(|e| {
trace!(target: "sync", "Error decoding block header RLP: {:?}", e);
BlockDownloaderImportError::Invalid
}));
let number = BlockNumber::from(info.number());
// Check if any of the headers matches the hash we requested
if !valid_response {
if let Some(expected) = expected_hash {
valid_response = expected == info.hash()
}
}
if self.blocks.contains(&info.hash()) {
trace!(target: "sync", "Skipping existing block header {} ({:?})", number, info.hash());
continue;
}
if self.highest_block.as_ref().map_or(true, |n| number > *n) {
self.highest_block = Some(number);
}
let hash = info.hash();
let hdr = try!(r.at(i).map_err(|e| {
trace!(target: "sync", "Error decoding block header RLP: {:?}", e);
BlockDownloaderImportError::Invalid
}));
match io.chain().block_status(BlockID::Hash(hash.clone())) {
BlockStatus::InChain | BlockStatus::Queued => {
match self.state {
State::Blocks => trace!(target: "sync", "Header already in chain {} ({})", number, hash),
_ => trace!(target: "sync", "Header already in chain {} ({}), state = {:?}", number, hash, self.state),
}
headers.push(hdr.as_raw().to_vec());
hashes.push(hash);
},
BlockStatus::Bad => {
return Err(BlockDownloaderImportError::Invalid);
},
BlockStatus::Unknown => {
headers.push(hdr.as_raw().to_vec());
hashes.push(hash);
}
}
}
// Disable the peer for this syncing round if it gives an invalid chain
if !valid_response {
trace!(target: "sync", "Invalid headers response");
return Err(BlockDownloaderImportError::Invalid);
}
match self.state {
State::ChainHead => {
if !headers.is_empty() {
// TODO: validate heads better. E.g. check that there is enough distance between blocks.
trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len());
self.blocks.reset_to(hashes);
self.state = State::Blocks;
}
},
State::Blocks => {
let count = headers.len();
self.blocks.insert_headers(headers);
trace!(target: "sync", "Inserted {} headers", count);
},
_ => trace!(target: "sync", "Unexpected headers({})", headers.len()),
}
Ok(())
}
/// Called by peer once it has new block bodies
pub fn import_bodies(&mut self, _io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), BlockDownloaderImportError> {
let item_count = r.item_count();
if item_count == 0 {
return Err(BlockDownloaderImportError::Useless);
}
else if self.state != State::Blocks {
trace!(target: "sync", "Ignored unexpected block bodies");
}
else {
let mut bodies = Vec::with_capacity(item_count);
for i in 0..item_count {
let body = try!(r.at(i).map_err(|e| {
trace!(target: "sync", "Error decoding block boides RLP: {:?}", e);
BlockDownloaderImportError::Invalid
}));
bodies.push(body.as_raw().to_vec());
}
if self.blocks.insert_bodies(bodies) != item_count {
trace!(target: "sync", "Deactivating peer for giving invalid block bodies");
return Err(BlockDownloaderImportError::Invalid);
}
}
Ok(())
}
/// Called by peer once it has new block receipts
pub fn import_receipts(&mut self, _io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), BlockDownloaderImportError> {
let item_count = r.item_count();
if item_count == 0 {
return Err(BlockDownloaderImportError::Useless);
}
else if self.state != State::Blocks {
trace!(target: "sync", "Ignored unexpected block receipts");
}
else {
let mut receipts = Vec::with_capacity(item_count);
for i in 0..item_count {
let receipt = try!(r.at(i).map_err(|e| {
trace!(target: "sync", "Error decoding block receipts RLP: {:?}", e);
BlockDownloaderImportError::Invalid
}));
receipts.push(receipt.as_raw().to_vec());
}
if self.blocks.insert_receipts(receipts) != item_count {
trace!(target: "sync", "Deactivating peer for giving invalid block receipts");
return Err(BlockDownloaderImportError::Invalid);
}
}
Ok(())
}
fn start_sync_round(&mut self, io: &mut SyncIo) {
self.state = State::ChainHead;
trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block);
// Check if we need to retract to find the common block. The problem is that peers still return headers by hash even
// from the non-canonical part of the tree, so we also retract if nothing was imported last round.
match self.imported_this_round {
Some(n) if n == 0 && self.last_imported_block > 0 => {
// nothing was imported last round, step back to a previous block
// search parent in last round known parents first
if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == self.last_imported_hash) {
self.last_imported_block -= 1;
self.last_imported_hash = p.clone();
trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
} else {
match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) {
Some(h) => {
self.last_imported_block -= 1;
self.last_imported_hash = h;
trace!(target: "sync", "Searching common header in the blockchain {} ({})", self.last_imported_block, self.last_imported_hash);
}
None => {
debug!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash);
}
}
}
},
_ => (),
}
self.imported_this_round = None;
}
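// Worked example (illustrative): if the previous round imported nothing and
// `last_imported_block` is 1000, the code above first looks up the hash of
// block 1000 in `round_parents`; if found, it retracts to block 999 using the
// recorded parent hash, otherwise it asks the local chain for the hash of
// block 999. Repeated empty rounds therefore step back one block at a time
// until a header the peer agrees on is found.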
/// Find some headers or blocks to download for a peer.
pub fn request_blocks(&mut self, io: &mut SyncIo) -> Option<BlockRequest> {
match self.state {
State::Idle => {
self.start_sync_round(io);
return self.request_blocks(io);
},
State::ChainHead => {
// Request subchain headers
trace!(target: "sync", "Starting sync with better chain");
// Request MAX_HEADERS_TO_REQUEST - 2 headers apart so that
// MAX_HEADERS_TO_REQUEST would include headers for neighbouring subchains
return Some(BlockRequest::Headers {
start: self.last_imported_hash.clone(),
count: SUBCHAIN_SIZE,
skip: (MAX_HEADERS_TO_REQUEST - 2) as u64,
});
},
State::Blocks => {
// check to see if we need to download any block bodies first
let needed_bodies = self.blocks.needed_bodies(MAX_BODIES_TO_REQUEST, false);
if !needed_bodies.is_empty() {
return Some(BlockRequest::Bodies {
hashes: needed_bodies,
});
}
if self.download_receipts {
let needed_receipts = self.blocks.needed_receipts(MAX_RECEPITS_TO_REQUEST, false);
if !needed_receipts.is_empty() {
return Some(BlockRequest::Receipts {
hashes: needed_receipts,
});
}
}
// find subchain to download
if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST, false) {
return Some(BlockRequest::Headers {
start: h,
count: count as u64,
skip: 0,
});
}
},
State::Complete => (),
}
None
}
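// Worked example (illustrative) for the ChainHead request above, assuming the
// eth protocol's skip semantics (skip = number of blocks omitted between
// returned headers): with MAX_HEADERS_TO_REQUEST = 128 the request uses
// skip = 126, so subchain heads come back 127 blocks apart. A Blocks-state
// request of up to 128 consecutive headers starting at one head then ends
// exactly on the neighbouring head, so adjacent subchains join without gaps.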
/// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import.
pub fn collect_blocks(&mut self, io: &mut SyncIo, allow_out_of_order: bool) -> Result<(), BlockDownloaderImportError> {
let mut bad = false;
let mut imported = HashSet::new();
let blocks = self.blocks.drain();
let count = blocks.len();
for block_and_receipts in blocks {
let block = block_and_receipts.block;
let receipts = block_and_receipts.receipts;
let (h, number, parent) = {
let header = BlockView::new(&block).header_view();
(header.sha3(), header.number(), header.parent_hash())
};
// Perform basic block verification
if !Block::is_good(&block) {
debug!(target: "sync", "Bad block rlp {:?} : {:?}", h, block);
bad = true;
break;
}
if self.target_hash.as_ref().map_or(false, |t| t == &h) {
self.state = State::Complete;
trace!(target: "sync", "Sync target reached");
return Ok(());
}
let result = if let Some(receipts) = receipts {
io.chain().import_block_with_receipts(block, receipts)
} else {
io.chain().import_block(block)
};
match result {
Err(BlockImportError::Import(ImportError::AlreadyInChain)) => {
trace!(target: "sync", "Block already in chain {:?}", h);
self.block_imported(&h, number, &parent);
},
Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {
trace!(target: "sync", "Block already queued {:?}", h);
self.block_imported(&h, number, &parent);
},
Ok(_) => {
trace!(target: "sync", "Block queued {:?}", h);
imported.insert(h.clone());
self.block_imported(&h, number, &parent);
},
Err(BlockImportError::Block(BlockError::UnknownParent(_))) if allow_out_of_order => {
trace!(target: "sync", "Unknown new block parent, restarting sync");
break;
},
Err(e) => {
debug!(target: "sync", "Bad block {:?} : {:?}", h, e);
bad = true;
break;
}
}
}
trace!(target: "sync", "Imported {} of {}", imported.len(), count);
self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + imported.len());
if bad {
return Err(BlockDownloaderImportError::Invalid);
}
if self.blocks.is_empty() {
// complete sync round
trace!(target: "sync", "Sync round complete");
self.reset();
}
Ok(())
}
fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) {
self.last_imported_block = number;
self.last_imported_hash = hash.clone();
self.round_parents.push_back((hash.clone(), parent.clone()));
if self.round_parents.len() > MAX_ROUND_PARENTS {
self.round_parents.pop_front();
}
}
}
//TODO: module tests

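For orientation, the intended driving loop for `BlockDownloader` looks roughly like the sketch below (hedged: `io` is any `SyncIo` implementation, the RLP inputs and `best_hash`/`best_number` are placeholders, and error handling is elided):

let mut downloader = BlockDownloader::new(false, &best_hash, best_number); // headers + bodies only

// For an idle peer, decide what to request next.
if let Some(request) = downloader.request_blocks(io) {
// translate into GetBlockHeaders / GetBlockBodies / GetReceipts and send to the peer
}

// On responses from the peer:
let _ = downloader.import_headers(io, &headers_rlp, expected_head_hash);
let _ = downloader.import_bodies(io, &bodies_rlp);

// Periodically move fully downloaded blocks into the client.
let _ = downloader.collect_blocks(io, false);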

@@ -25,6 +25,15 @@ known_heap_size!(0, HeaderId);
struct SyncBlock {
header: Bytes,
body: Option<Bytes>,
receipts: Option<Bytes>,
}
/// Block with optional receipt
pub struct BlockAndReceipts {
/// Block data.
pub block: Bytes,
/// Block receipts RLP list.
pub receipts: Option<Bytes>,
}
impl HeapSizeOf for SyncBlock {
@@ -45,6 +54,8 @@ struct HeaderId {
/// the downloaded blocks.
#[derive(Default)]
pub struct BlockCollection {
/// Does this collection need block receipts.
need_receipts: bool,
/// Heads of subchains to download
heads: Vec<H256>,
/// Downloaded blocks.
@@ -53,25 +64,32 @@ pub struct BlockCollection {
parents: HashMap<H256, H256>,
/// Used to map body to header.
header_ids: HashMap<HeaderId, H256>,
/// Used to map receipts root to header.
receipt_ids: HashMap<H256, H256>,
/// First block in `blocks`.
head: Option<H256>,
/// Set of block header hashes being downloaded
downloading_headers: HashSet<H256>,
/// Set of block bodies being downloaded identified by block hash.
downloading_bodies: HashSet<H256>,
/// Set of block receipts being downloaded identified by block hash.
downloading_receipts: HashSet<H256>,
}
impl BlockCollection {
/// Create a new instance.
pub fn new() -> BlockCollection {
pub fn new(download_receipts: bool) -> BlockCollection {
BlockCollection {
need_receipts: download_receipts,
blocks: HashMap::new(),
header_ids: HashMap::new(),
receipt_ids: HashMap::new(),
heads: Vec::new(),
parents: HashMap::new(),
head: None,
downloading_headers: HashSet::new(),
downloading_bodies: HashSet::new(),
downloading_receipts: HashSet::new(),
}
}
@@ -80,10 +98,12 @@ impl BlockCollection {
self.blocks.clear();
self.parents.clear();
self.header_ids.clear();
self.receipt_ids.clear();
self.heads.clear();
self.head = None;
self.downloading_headers.clear();
self.downloading_bodies.clear();
self.downloading_receipts.clear();
}
/// Reset collection for a new sync round with given subchain block hashes.
@@ -108,8 +128,23 @@ impl BlockCollection {
for b in bodies.into_iter() {
if let Err(e) = self.insert_body(b) {
trace!(target: "sync", "Ignored invalid body: {:?}", e);
} else {
inserted += 1;
}
}
inserted
}
/// Insert a collection of block receipts for previously downloaded headers.
pub fn insert_receipts(&mut self, receipts: Vec<Bytes>) -> usize {
if !self.need_receipts {
return 0;
}
let mut inserted = 0;
for r in receipts.into_iter() {
if let Err(e) = self.insert_receipt(r) {
trace!(target: "sync", "Ignored invalid receipt: {:?}", e);
} else {
inserted += 1;
}
}
@@ -147,6 +182,38 @@ impl BlockCollection {
needed_bodies
}
/// Returns a set of block hashes that require a receipt download. The returned set is marked as being downloaded.
pub fn needed_receipts(&mut self, count: usize, _ignore_downloading: bool) -> Vec<H256> {
if self.head.is_none() || !self.need_receipts {
return Vec::new();
}
let mut needed_receipts: Vec<H256> = Vec::new();
let mut head = self.head;
while head.is_some() && needed_receipts.len() < count {
head = self.parents.get(&head.unwrap()).cloned();
if let Some(head) = head {
match self.blocks.get(&head) {
Some(block) if block.receipts.is_none() && !self.downloading_receipts.contains(&head) => {
self.downloading_receipts.insert(head.clone());
needed_receipts.push(head.clone());
}
_ => (),
}
}
}
for h in self.receipt_ids.values() {
if needed_receipts.len() >= count {
break;
}
if !self.downloading_receipts.contains(h) {
needed_receipts.push(h.clone());
self.downloading_receipts.insert(h.clone());
}
}
needed_receipts
}
/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
pub fn needed_headers(&mut self, count: usize, ignore_downloading: bool) -> Option<(H256, usize)> {
// find subchain to download
@@ -163,18 +230,27 @@ impl BlockCollection {
download.map(|h| (h, count))
}
/// Unmark a header as being downloaded.
/// Unmark header as being downloaded.
pub fn clear_header_download(&mut self, hash: &H256) {
self.downloading_headers.remove(hash);
}
/// Unmark a block body as being downloaded.
pub fn clear_body_download(&mut self, hash: &H256) {
self.downloading_bodies.remove(hash);
/// Unmark block body as being downloaded.
pub fn clear_body_download(&mut self, hashes: &[H256]) {
for h in hashes {
self.downloading_bodies.remove(h);
}
}
/// Unmark block receipt as being downloaded.
pub fn clear_receipt_download(&mut self, hashes: &[H256]) {
for h in hashes {
self.downloading_receipts.remove(h);
}
}
/// Get a valid chain of blocks ordered in descending order and ready for importing into blockchain.
pub fn drain(&mut self) -> Vec<Bytes> {
pub fn drain(&mut self) -> Vec<BlockAndReceipts> {
if self.blocks.is_empty() || self.head.is_none() {
return Vec::new();
}
@@ -188,7 +264,7 @@ impl BlockCollection {
head = self.parents.get(&h).cloned();
if let Some(head) = head {
match self.blocks.get(&head) {
Some(block) if block.body.is_some() => {
Some(block) if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) => {
blocks.push(block);
hashes.push(head);
self.head = Some(head);
@@ -198,19 +274,24 @@ impl BlockCollection {
}
}
for block in blocks.drain(..) {
for block in blocks {
let mut block_rlp = RlpStream::new_list(3);
block_rlp.append_raw(&block.header, 1);
let body = Rlp::new(block.body.as_ref().expect("blocks contains only full blocks; qed"));
block_rlp.append_raw(body.at(0).as_raw(), 1);
block_rlp.append_raw(body.at(1).as_raw(), 1);
drained.push(block_rlp.out());
{
let body = Rlp::new(block.body.as_ref().expect("blocks contains only full blocks; qed"));
block_rlp.append_raw(body.at(0).as_raw(), 1);
block_rlp.append_raw(body.at(1).as_raw(), 1);
}
drained.push(BlockAndReceipts {
block: block_rlp.out(),
receipts: block.receipts.clone(),
});
}
}
for h in hashes {
self.blocks.remove(&h);
}
trace!("Drained {} blocks, new head :{:?}", drained.len(), self.head);
trace!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head);
drained
}
@@ -241,14 +322,17 @@ impl BlockCollection {
}
fn insert_body(&mut self, b: Bytes) -> Result<(), NetworkError> {
let body = UntrustedRlp::new(&b);
let tx = try!(body.at(0));
let tx_root = ordered_trie_root(tx.iter().map(|r| r.as_raw().to_vec())); //TODO: get rid of vectors here
let uncles = try!(body.at(1)).as_raw().sha3();
let header_id = HeaderId {
transactions_root: tx_root,
uncles: uncles
let header_id = {
let body = UntrustedRlp::new(&b);
let tx = try!(body.at(0));
let tx_root = ordered_trie_root(tx.iter().map(|r| r.as_raw().to_vec())); //TODO: get rid of vectors here
let uncles = try!(body.at(1)).as_raw().sha3();
HeaderId {
transactions_root: tx_root,
uncles: uncles
}
};
match self.header_ids.get(&header_id).cloned() {
Some(h) => {
self.header_ids.remove(&header_id);
@@ -256,7 +340,7 @@ impl BlockCollection {
match self.blocks.get_mut(&h) {
Some(ref mut block) => {
trace!(target: "sync", "Got body {}", h);
block.body = Some(body.as_raw().to_vec());
block.body = Some(b);
Ok(())
},
None => {
@@ -266,7 +350,35 @@ impl BlockCollection {
}
}
None => {
trace!(target: "sync", "Ignored unknown/stale block body");
trace!(target: "sync", "Ignored unknown/stale block body. tx_root = {:?}, uncles = {:?}", header_id.transactions_root, header_id.uncles);
Err(NetworkError::BadProtocol)
}
}
}
fn insert_receipt(&mut self, r: Bytes) -> Result<(), NetworkError> {
let receipt_root = {
let receipts = UntrustedRlp::new(&r);
ordered_trie_root(receipts.iter().map(|r| r.as_raw().to_vec())) //TODO: get rid of vectors here
};
match self.receipt_ids.get(&receipt_root).cloned() {
Some(h) => {
self.receipt_ids.remove(&receipt_root);
self.downloading_receipts.remove(&h);
match self.blocks.get_mut(&h) {
Some(ref mut block) => {
trace!(target: "sync", "Got receipt {}", h);
block.receipts = Some(r);
Ok(())
},
None => {
warn!("Got receipt with no header {}", h);
Err(NetworkError::BadProtocol)
}
}
}
None => {
trace!(target: "sync", "Ignored unknown/stale block receipt {:?}", receipt_root);
Err(NetworkError::BadProtocol)
}
}
@@ -280,7 +392,7 @@ impl BlockCollection {
}
match self.head {
None if hash == self.heads[0] => {
trace!("New head {}", hash);
trace!(target: "sync", "New head {}", hash);
self.head = Some(info.parent_hash().clone());
},
_ => ()
@@ -289,6 +401,7 @@ impl BlockCollection {
let mut block = SyncBlock {
header: header,
body: None,
receipts: None,
};
let header_id = HeaderId {
transactions_root: info.transactions_root().clone(),
@@ -302,8 +415,21 @@ impl BlockCollection {
block.body = Some(body_stream.out());
}
else {
trace!("Queueing body tx_root = {:?}, uncles = {:?}, block = {:?}, number = {}", header_id.transactions_root, header_id.uncles, hash, info.number());
self.header_ids.insert(header_id, hash.clone());
}
if self.need_receipts {
let receipt_root = info.receipts_root().clone();
if receipt_root == sha3::SHA3_NULL_RLP {
let receipts_stream = RlpStream::new_list(0);
block.receipts = Some(receipts_stream.out());
} else {
if self.receipt_ids.contains_key(&receipt_root) {
warn!(target: "sync", "Duplicate receipt root {:?}, block: {:?}", receipt_root, hash);
}
self.receipt_ids.insert(receipt_root, hash.clone());
}
}
self.parents.insert(info.parent_hash().clone(), hash.clone());
self.blocks.insert(hash.clone(), block);
@@ -326,7 +452,7 @@ impl BlockCollection {
Some(next) => {
h = next.clone();
if old_subchains.contains(&h) {
trace!("Completed subchain {:?}", s);
trace!(target: "sync", "Completed subchain {:?}", s);
break; // reached head of the other subchain, merge by not adding
}
},
@@ -362,7 +488,7 @@ mod test {
#[test]
fn create_clear() {
let mut bc = BlockCollection::new();
let mut bc = BlockCollection::new(false);
assert!(is_empty(&bc));
let client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Nothing);
@@ -375,7 +501,7 @@ mod test {
#[test]
fn insert_headers() {
let mut bc = BlockCollection::new();
let mut bc = BlockCollection::new(false);
assert!(is_empty(&bc));
let client = TestBlockChainClient::new();
let nblocks = 200;
@@ -407,7 +533,7 @@ mod test {
assert!(!bc.is_downloading(&hashes[0]));
assert!(bc.contains(&hashes[0]));
assert_eq!(&bc.drain()[..], &blocks[0..6]);
assert_eq!(&bc.drain().into_iter().map(|b| b.block).collect::<Vec<_>>()[..], &blocks[0..6]);
assert!(!bc.contains(&hashes[0]));
assert_eq!(hashes[5], bc.head.unwrap());
@@ -418,7 +544,7 @@ mod test {
bc.insert_headers(headers[10..16].to_vec());
assert!(bc.drain().is_empty());
bc.insert_headers(headers[5..10].to_vec());
assert_eq!(&bc.drain()[..], &blocks[6..16]);
assert_eq!(&bc.drain().into_iter().map(|b| b.block).collect::<Vec<_>>()[..], &blocks[6..16]);
assert_eq!(hashes[15], bc.heads[0]);
bc.insert_headers(headers[15..].to_vec());
@@ -428,7 +554,7 @@ mod test {
#[test]
fn insert_headers_with_gap() {
let mut bc = BlockCollection::new();
let mut bc = BlockCollection::new(false);
assert!(is_empty(&bc));
let client = TestBlockChainClient::new();
let nblocks = 200;
@@ -450,7 +576,7 @@ mod test {
#[test]
fn insert_headers_no_gap() {
let mut bc = BlockCollection::new();
let mut bc = BlockCollection::new(false);
assert!(is_empty(&bc));
let client = TestBlockChainClient::new();
let nblocks = 200;

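With warp sync the collection is created with receipt tracking enabled, and each drained entry now pairs the block RLP with its receipts so that the import path can use `import_block_with_receipts`. A hedged usage sketch (inputs such as `subchain_heads`, `headers` and `receipt_rlps` are placeholders):

let mut bc = BlockCollection::new(true); // also track receipts
bc.reset_to(subchain_heads); // heads found during the ChainHead round
bc.insert_headers(headers);
let wanted = bc.needed_receipts(128, false); // block hashes for a GetReceipts request
bc.insert_receipts(receipt_rlps); // matched back to headers via the receipts root
for item in bc.drain() {
// item.block is the assembled block RLP, item.receipts the matching receipts list (when tracked)
}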
File diff suppressed because it is too large.


@@ -48,6 +48,7 @@ extern crate ethcore_ipc as ipc;
mod chain;
mod blocks;
mod block_sync;
mod sync_io;
mod snapshot;


@@ -14,9 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use network::{NetworkContext, PeerId, PacketId, NetworkError, SessionInfo};
use util::Bytes;
use ethcore::client::BlockChainClient;
use ethcore::header::BlockNumber;
use ethcore::snapshot::SnapshotService;
use parking_lot::RwLock;
/// IO interface for the syncing handler.
/// Provides peer connection management and an interface to the blockchain client.
@@ -48,6 +52,8 @@ pub trait SyncIo {
}
/// Check if the session is expired
fn is_expired(&self) -> bool;
/// Return sync overlay
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>>;
}
/// Wraps `NetworkContext` and the blockchain client
@@ -55,15 +61,20 @@ pub struct NetSyncIo<'s, 'h> where 'h: 's {
network: &'s NetworkContext<'h>,
chain: &'s BlockChainClient,
snapshot_service: &'s SnapshotService,
chain_overlay: &'s RwLock<HashMap<BlockNumber, Bytes>>,
}
impl<'s, 'h> NetSyncIo<'s, 'h> {
/// Creates a new instance from the `NetworkContext` and the blockchain client reference.
pub fn new(network: &'s NetworkContext<'h>, chain: &'s BlockChainClient, snapshot_service: &'s SnapshotService) -> NetSyncIo<'s, 'h> {
pub fn new(network: &'s NetworkContext<'h>,
chain: &'s BlockChainClient,
snapshot_service: &'s SnapshotService,
chain_overlay: &'s RwLock<HashMap<BlockNumber, Bytes>>) -> NetSyncIo<'s, 'h> {
NetSyncIo {
network: network,
chain: chain,
snapshot_service: snapshot_service,
chain_overlay: chain_overlay,
}
}
}
@@ -89,6 +100,10 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
self.chain
}
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
self.chain_overlay
}
fn snapshot_service(&self) -> &SnapshotService {
self.snapshot_service
}

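The new `chain_overlay` accessor exposes a shared cache keyed by block number; per the handler comment it holds data such as the fork block header. A hedged sketch of how a packet handler might read and write it (only the trait method itself comes from this diff, the helper names are illustrative):

fn cache_fork_header(io: &SyncIo, number: BlockNumber, header_rlp: Bytes) {
io.chain_overlay().write().insert(number, header_rlp);
}

fn read_fork_header(io: &SyncIo, number: BlockNumber) -> Option<Bytes> {
io.chain_overlay().read().get(&number).cloned()
}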

@@ -30,6 +30,7 @@ pub struct TestIo<'p> {
pub queue: &'p mut VecDeque<TestPacket>,
pub sender: Option<PeerId>,
pub to_disconnect: HashSet<PeerId>,
overlay: RwLock<HashMap<BlockNumber, Bytes>>,
}
impl<'p> TestIo<'p> {
@@ -40,6 +41,7 @@ impl<'p> TestIo<'p> {
queue: queue,
sender: sender,
to_disconnect: HashSet::new(),
overlay: RwLock::new(HashMap::new()),
}
}
}
@@ -90,6 +92,10 @@ impl<'p> SyncIo for TestIo<'p> {
fn eth_protocol_version(&self, _peer: PeerId) -> u8 {
64
}
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
&self.overlay
}
}
pub struct TestPacket {
@@ -149,6 +155,7 @@ impl TestNet {
for client in 0..self.peers.len() {
if peer != client {
let mut p = self.peers.get_mut(peer).unwrap();
p.sync.write().restart(&mut TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, Some(client as PeerId)));
p.sync.write().on_peer_connected(&mut TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, Some(client as PeerId)), client as PeerId);
}
}


@@ -77,7 +77,9 @@ impl SnapshotService for TestSnapshotService {
match *self.restoration_manifest.lock() {
Some(ref manifest) if self.state_restoration_chunks.lock().len() == manifest.state_hashes.len() &&
self.block_restoration_chunks.lock().len() == manifest.block_hashes.len() => RestorationStatus::Inactive,
Some(_) => RestorationStatus::Ongoing {
Some(ref manifest) => RestorationStatus::Ongoing {
state_chunks: manifest.state_hashes.len() as u32,
block_chunks: manifest.block_hashes.len() as u32,
state_chunks_done: self.state_restoration_chunks.lock().len() as u32,
block_chunks_done: self.block_restoration_chunks.lock().len() as u32,
},
@@ -114,7 +116,7 @@ impl SnapshotService for TestSnapshotService {
fn snapshot_sync() {
::env_logger::init().ok();
let mut net = TestNet::new(2);
net.peer_mut(0).snapshot_service = Arc::new(TestSnapshotService::new_with_snapshot(16, H256::new(), 1));
net.peer_mut(0).snapshot_service = Arc::new(TestSnapshotService::new_with_snapshot(16, H256::new(), 500000));
net.peer_mut(0).chain.add_blocks(1, EachBlockWith::Nothing);
net.sync_steps(19); // status + manifest + chunks
assert_eq!(net.peer(1).snapshot_service.state_restoration_chunks.lock().len(), net.peer(0).snapshot_service.manifest.as_ref().unwrap().state_hashes.len());